Source code for sails.modelfit

#!/usr/bin/python

# vim: set expandtab ts=4 sw=4:

import warnings

import numpy as np

from .stats import find_cov_multitrial
from .diags import ModelDiagnostics
from .utils import get_valid_samples

__all__ = []


def A_to_companion(A):
    """Transforms a [nsignals x nsignals x order] MVAR parameter array into the
    [nsignals*order x nsignals*order] companion form. This assumes there is no
    leading identity matrix in the A form.

    Parameters
    ----------
    A : ndarray
        MVAR parameter array of size [nsignals, nsignals, order]

    Returns
    -------
    ndarray
        MVAR parameter matrix in companion form

    """

    nsignals = A.shape[0]
    order = A.shape[2]
    companion = np.eye(nsignals*(order), k=nsignals, dtype=A.dtype).T
    companion[:nsignals, :] = A.reshape((nsignals, nsignals*order), order='F')

    return companion
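

# Editor's usage sketch (not part of the library API): the companion form
# packs all lags into one square matrix, so model stability can be checked by
# testing whether every eigenvalue lies inside the unit circle. The helper
# name and array shapes below are illustrative assumptions.
def _example_companion_stability(A):
    """Minimal sketch: return True if the MVAR parameters A (of size
    [nsignals x nsignals x order], no leading identity) describe a stable
    model."""
    companion = A_to_companion(A)
    eigenvalues = np.linalg.eigvals(companion)
    return bool(np.all(np.abs(eigenvalues) < 1))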


def get_residuals(data, parameters, delay_vect, backwards=False, mode='valid'):
    """This is a helper function for computing the residuals of a dataset
    after the MVAR predictions have been removed.

    Parameters
    ----------
    data : ndarray
        Data to compute the residuals from, of size
        [nchannels x nsamples x nrealisations]
    parameters : ndarray
        MVAR parameter matrix, of size [nchannels x nchannels x model order]
    delay_vect : ndarray
        Vector of lag indices corresponding to the third dimension of the
        parameter matrix
    backwards : bool
        Flag indicating whether the forwards or backwards parameters have
        been passed (Default value = False)
    mode : {'valid', 'full_nan', 'full'}
        Options for excluding or replacing residuals which do not have a full
        model prediction, e.g. the third sample of an order 5 model. 'valid'
        removes samples without a full model prediction, 'full_nan' returns
        residuals of the same size as the data with NaNs replacing excluded
        samples and 'full' returns residuals keeping non-full model samples
        in place.

    Returns
    -------
    ndarray
        Residual data, of size [nchannels x nsamples x nrealisations]

    """
    # TODO: double check that we are including all the order
    resid = np.zeros_like(data, dtype=data.dtype)

    # for each epoch...
    for ntrial in range(data.shape[2]):

        pred = np.zeros_like(data[:, :, 0], dtype=data.dtype)

        if backwards:
            # Backwards parameters predict each sample from later samples
            for m in range(1, len(delay_vect)):
                tmp = parameters[:, :, m].dot(data[:, :, ntrial])
                pred[:, :-delay_vect[m]] += tmp[:, delay_vect[m]:]
        else:
            # Forwards parameters predict each sample from earlier samples
            for m in range(1, len(delay_vect)):
                tmp = parameters[:, :, m].dot(data[:, :, ntrial])
                pred[:, delay_vect[m]:] += tmp[:, :-delay_vect[m]]

        resid[:, :, ntrial] = data[:, :, ntrial] - pred

    resid = get_valid_samples(resid, delay_vect, mode=mode)

    return resid

__all__.append('get_residuals')
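

# Editor's usage sketch (not part of the library API): a quick look at how the
# residual modes behave. The white-noise data, toy parameters and helper name
# below are illustrative assumptions only.
def _example_residual_modes():
    """Minimal sketch comparing mode='valid' and mode='full_nan'."""
    rng = np.random.default_rng(42)
    data = rng.standard_normal((2, 100, 1))   # [nchannels x nsamples x ntrials]
    params = np.zeros((2, 2, 4))              # lag-zero slot plus an order-3 model
    params[:, :, 1] = 0.2 * np.eye(2)         # weak self-connection at lag 1
    delay_vect = np.arange(4)

    trimmed = get_residuals(data, params, delay_vect, mode='valid')
    padded = get_residuals(data, params, delay_vect, mode='full_nan')

    # 'valid' drops the leading samples without a full prediction,
    # 'full_nan' keeps the input length and marks those samples with NaN
    return trimmed.shape, padded.shape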


class AbstractLinearModel(object):
    """This is a base class defining data storage and generic methods for
    LinearModel classes. New LinearModels should inherit from
    AbstractLinearModel and overload the fit_model method."""

    # This is only used within Anamnesis
    hdf5_outputs = ['data_cov', 'resid_cov', 'parameters',
                    'maxorder', 'delay_vect']

    @property
    def nsignals(self):
        """Number of signals in fitted model"""
        return self.parameters.shape[0]

    @property
    def order(self):
        """Order of fitted model"""
        return self.parameters.shape[2] - 1

    @property
    def companion(self):
        """Parameter matrix in companion form"""
        return A_to_companion(self.parameters[:, :, 1:])

    def compute_diagnostics(self, data, compute_pc=False):
        """Compute several diagnostic metrics from a fitted MVAR model and a
        dataset.

        Parameters
        ----------
        data : ndarray
            The data to compute the model diagnostics with, typically the
            same data used during model fit.
        compute_pc : bool
            Flag indicating whether to compute the percent consistency; this
            is typically the longest computation of all the metrics here
            (Default value = False)

        Returns
        -------
        sails.ModelDiagnostics
            Object containing LL, AIC, BIC, stability ratio, Durbin-Watson,
            R-squared and percent consistency measures

        """
        from .diags import ModelDiagnostics
        return ModelDiagnostics.compute(self, data, compute_pc=compute_pc)

    def get_residuals(self, data, mode='valid'):
        """Returns the prediction error from a fitted model. This is a
        wrapper function for get_residuals().

        Parameters
        ----------
        data : ndarray
            Data to compute the residuals of, of size
            [nchannels x nsamples x nrealisations]
        mode : {'valid', 'full_nan', 'full'}
            Option for handling samples without a full model prediction,
            passed through to get_residuals() (Default value = 'valid')

        Returns
        -------
        ndarray
            Residual data

        """
        if self.parameters.ndim == 3:
            resid = get_residuals(data, self.parameters, self.delay_vect,
                                  backwards=False, mode=mode)
        elif self.parameters.ndim == 4:
            # We have different parameters for each epoch
            resid = get_residuals(np.atleast_3d(data[..., 0]),
                                  self.parameters[..., 0],
                                  self.delay_vect,
                                  backwards=False, mode=mode)
            for e in range(1, self.parameters.shape[3]):
                tmp = get_residuals(np.atleast_3d(data[..., e]),
                                    self.parameters[..., e],
                                    self.delay_vect,
                                    backwards=False, mode=mode)
                resid = np.concatenate((resid, tmp), axis=2)

        return resid

__all__.append('AbstractLinearModel')
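

# Editor's sketch (not part of the library): a minimal subclass illustrating
# the pattern described in the AbstractLinearModel docstring, i.e. overloading
# fit_model and populating the attributes the base class expects. The class
# itself is hypothetical; the negated lag-zero identity mirrors the convention
# used by the other models in this module.
class _ExampleNullModel(AbstractLinearModel):
    """Toy model whose prediction is always zero, so the residuals are just a
    trimmed copy of the data. Only intended to show which attributes
    fit_model should populate."""

    @classmethod
    def fit_model(cls, data, delay_vect):
        ret = cls()
        nsignals = data.shape[0]
        maxorder = delay_vect.shape[0] - 1

        # Lag-zero identity, all other lags zero -> zero prediction
        ret.parameters = np.zeros((nsignals, nsignals, maxorder + 1))
        ret.parameters[:, :, 0] = -np.eye(nsignals)
        ret.delay_vect = delay_vect.astype(int)
        ret.maxorder = maxorder

        ret.data_cov = find_cov_multitrial(data, data)
        resids = ret.get_residuals(data)
        ret.resid_cov = find_cov_multitrial(resids, resids)
        return ret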


class VieiraMorfLinearModel(AbstractLinearModel):
    """A class implementing the Vieira-Morf linear model fit"""

    # This is only used within Anamnesis
    hdf5_outputs = AbstractLinearModel.hdf5_outputs + ['bwd_parameters']

    def get_residuals(self, data, forward_parameters=True, mode='valid'):
        """Returns the prediction error from a fitted model. This is a
        wrapper function for get_residuals().

        Parameters
        ----------
        data : ndarray
            Data to compute the residuals from, of size
            [nchannels x nsamples x nrealisations]
        forward_parameters : bool
            If True, use forward parameters, otherwise use backward
            parameters (Default value = True)
        mode : {'valid', 'full_nan', 'full'}
            Option for handling samples without a full model prediction,
            passed through to get_residuals() (Default value = 'valid')

        Returns
        -------
        ndarray
            Residual data

        """
        if not forward_parameters:
            if self.bwd_parameters.ndim == 3:
                resid = get_residuals(data, self.bwd_parameters,
                                      self.delay_vect,
                                      backwards=True, mode=mode)
            elif self.bwd_parameters.ndim == 4:
                # We have different parameters for each epoch
                resid = get_residuals(data[..., 0],
                                      self.bwd_parameters[..., 0],
                                      self.delay_vect,
                                      backwards=True, mode=mode)
                for e in range(1, self.bwd_parameters.shape[3]):
                    tmp = get_residuals(data[..., e],
                                        self.bwd_parameters[..., e],
                                        self.delay_vect,
                                        backwards=True, mode=mode)
                    resid = np.concatenate((resid, tmp), axis=2)
        else:
            resid = AbstractLinearModel.get_residuals(self, data, mode=mode)

        return resid

    @classmethod
    def fit_model(cls, data, delay_vect):
        """Estimates a multichannel autoregressive model using the
        Vieira-Morf algorithm. Equations are referenced to [Marple1987]_,
        appendix 15.B. This is the multitrial version of the algorithm,
        using the AMVAR method outlined in [Ding2000]_.

        Parameters
        ----------
        data : numpy.ndarray
            array of shape [nsignals, nsamples, ntrials]
        delay_vect : numpy.ndarray
            Vector containing evenly spaced delays to be assessed in the
            model. Delays are represented in samples. Must start with zero.

        Returns
        -------
        sails.VieiraMorfLinearModel
            A populated object containing the fitted forward and backwards
            coefficients and several other useful variables and methods.

        """
        # Create object
        ret = cls()

        # Shorter reference
        X = data

        # Set-up initial parameters
        delay_vect = delay_vect.astype(int)
        nsignals = X.shape[0]
        ntrials = X.shape[2]
        maxorder = delay_vect.shape[0]-1

        # Begin model fitting

        # Relevant parameters:
        # A    - Forward linear prediction coefficients matrix
        # B    - Backward linear prediction coefficients matrix
        # PF   - Forward linear prediction error covariance
        # PB   - Backward linear prediction error covariance
        # PFH  - Estimate of forward linear prediction error covariance
        # PBH  - Estimate of backward linear prediction error covariance
        # PFBH - Estimate of linear prediction error covariance
        # RHO  - Estimate of the partial correlation matrix
        # EF   - Forward linear prediction error
        # EB   - Backward linear prediction error

        # Initialisation

        # Eq 15.91
        # Initialise prediction error as the data
        EF = X.copy().astype(X.dtype)
        EB = X.copy().astype(X.dtype)

        # Eq 15.90
        # Initialise error covariance as the data covariance
        PF = find_cov_multitrial(X, X)
        PB = find_cov_multitrial(X, X)

        # Set order zero coefficients to identity
        A = np.ndarray((nsignals, nsignals, maxorder+1), dtype=X.dtype)
        A[:, :, 0] = np.eye(nsignals)

        # TODO: double check that this is correct behaviour
        B = np.ndarray((nsignals, nsignals, maxorder+1), dtype=X.dtype)
        B[:, :, 0] = np.zeros(nsignals)

        # Main Loop
        M = 0
        while M < maxorder:

            # Create clean arrays for the estimates of the error covariance
            PFH = np.zeros((nsignals, nsignals))
            PBH = np.zeros((nsignals, nsignals))
            PFBH = np.zeros((nsignals, nsignals))

            # Eq 15.89 - get estimates of forward and backwards covariance
            PFH = find_cov_multitrial(EF[:, delay_vect[M+1]:, :],
                                      EF[:, delay_vect[M+1]:, :])
            PBH = find_cov_multitrial(EB[:, delay_vect[M]:-delay_vect[1], :],
                                      EB[:, delay_vect[M]:-delay_vect[1], :])
            PFBH = find_cov_multitrial(EF[:, delay_vect[M+1]:, :],
                                       EB[:, delay_vect[M]:-delay_vect[1], :])

            # Eq 15.88 - compute estimated normalised partial correlation
            # matrix
            tmp = np.linalg.inv(np.linalg.cholesky(PFH)).dot(PFBH)
            chol = np.linalg.cholesky(PBH)
            RHO = tmp.dot(np.array(np.mat(np.linalg.inv(chol)).H))

            M += 1

            # Eq 15.82 & 15.83 - Update forward and backward reflection
            # coefficients
            tmp = np.linalg.cholesky(PF).dot(RHO)
            A[:, :, M] = - tmp.dot(np.linalg.inv(np.linalg.cholesky(PB)))

            tmp = np.linalg.cholesky(PB).dot(np.array(np.mat(RHO).H))
            B[:, :, M] = - tmp.dot(np.linalg.inv(np.linalg.cholesky(PF)))

            # Eq 15.75 & 15.76 - Update forward and backward error covariances
            tmp = np.eye(nsignals) - (A[:, :, M].dot(B[:, :, M]))
            PF = tmp.dot(PF)

            tmp = np.eye(nsignals) - (B[:, :, M].dot(A[:, :, M]))
            PB = tmp.dot(PB)

            # might not need this if statement, subsequent range will handle
            # it, though this is clearer
            if M > 1:
                A_t = A.copy()
                B_t = B.copy()

                # We are working from the equations - the FORTRAN is wrong!

                # Eq 15.71 & 15.72 - Update forward and backward predictor
                # coefficients
                for k in range(1, M):
                    mk = M-k
                    # TODO - double check backwards coeffs against Ding2000
                    # simulations
                    A[:, :, k] = A_t[:, :, k] + A[:, :, M].dot(B_t[:, :, mk])
                    B[:, :, k] = B_t[:, :, k] + B[:, :, M].dot(A_t[:, :, mk])

            # Eq 15.84 & 15.85 - Update the residuals
            for t in range(ntrials):
                tmp = EF[:, :, t].copy()
                tmp2 = EB[:, :, t].copy()

                update_A = A[:, :, M].dot(tmp2[:, :-delay_vect[1]])
                EF[:, delay_vect[1]:, t] = tmp[:, delay_vect[1]:] + update_A

                update_B = B[:, :, M].dot(tmp[:, delay_vect[1]:])
                EB[:, delay_vect[1]:, t] = tmp2[:, :-delay_vect[1]] + update_B

        # Sanity check for single channel case
        if nsignals == 1 and not np.allclose(PF, PB):
            ret.status = -1
            warnings.warn("Warning: problem with model fit. PF != PB. "
                          "See single channel identity in Marple "
                          "top of p405")

        # Correct for sign flip in VM estimation
        ret.parameters = -A
        ret.bwd_parameters = -B

        # Populate return object
        ret._data = X
        ret.maxorder = maxorder
        ret.delay_vect = delay_vect
        ret.data_cov = find_cov_multitrial(X, X)
        resids = ret.get_residuals(X)
        ret.resid_cov = find_cov_multitrial(resids, resids)

        return ret

__all__.append('VieiraMorfLinearModel')
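

# Editor's usage sketch (not part of the library API): fitting a Vieira-Morf
# model to epoched data and computing its diagnostics. The random data,
# dimensions and model order are illustrative assumptions.
def _example_vieira_morf_fit():
    rng = np.random.default_rng(0)
    data = rng.standard_normal((4, 500, 10))   # [nchannels x nsamples x ntrials]
    delay_vect = np.arange(6)                  # lags 0..5 -> order-5 model

    model = VieiraMorfLinearModel.fit_model(data, delay_vect)
    diag = model.compute_diagnostics(data)     # AIC, BIC, R squared, stability...

    return model.parameters.shape, diag        # parameters are [4 x 4 x 6]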


class OLSLinearModel(AbstractLinearModel):
    """A class implementing an ordinary least squares linear model fit"""

    @classmethod
    def fit_model(cls, data, delay_vect):
        """This is a class method which fits a linear model and returns a
        populated :class:`OLSLinearModel` instance containing the fitted
        model.

        Parameters
        ----------
        data : ndarray
            The data to be used to compute the model fit
        delay_vect : ndarray
            A vector of lag indices defining the lags to fit

        Returns
        -------
        sails.OLSLinearModel
            A populated object containing the fitted coefficients and several
            other useful variables and methods.

        """
        # Create object
        ret = cls()

        # Set-up initial parameters
        nsignals = data.shape[0]
        nsamples = data.shape[1]
        maxorder = delay_vect.shape[0]-1
        maxlag = delay_vect[-1].astype(int)

        # Preallocate design matrix
        X = np.zeros((nsamples-maxlag, nsignals*maxorder))
        # Preallocate observation matrix
        Y = np.zeros((nsamples-maxlag, nsignals))

        # Create design matrix
        for idx in range(nsamples-maxlag):
            X[idx, :] = data[:, -delay_vect[1::].astype(int)+idx].reshape(-1)
            Y[idx, :] = data[:, idx, 0]

        # B is shaped [maxorder*nsignals, nsignals]
        #
        # [ A[1, 1, 1] A[1, 1, 2] ... A[1, 1, p] A[1, 2, 1] A[1, 2, 2] ... A[1, 2, p] ]
        # [ A[2, 1, 1] A[2, 1, 2] ... A[2, 1, p] A[2, 2, 1] A[2, 2, 2] ... A[2, 2, p] ]

        # Normal equations
        B = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y)

        # Reshape output
        parameters = np.zeros((nsignals, nsignals, maxorder+1))
        parameters[..., 0] = -np.eye(nsignals)
        for idx in range(nsignals):
            parameters[:, idx, 1:] = B[idx*maxorder:(idx+1)*maxorder].T

        resids = get_residuals(data, -parameters, delay_vect)

        ret.data_cov = find_cov_multitrial(data, data)
        ret.resid_cov = find_cov_multitrial(resids, resids)
        ret.parameters = parameters
        ret.maxorder = maxorder
        ret.delay_vect = delay_vect

        return ret

__all__.append('OLSLinearModel')
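

# Editor's usage sketch (not part of the library API): the OLS fit builds a
# lagged design matrix from a single realisation, so the data are passed as
# [nchannels x nsamples x 1]. Values below are illustrative assumptions.
def _example_ols_fit():
    rng = np.random.default_rng(1)
    data = rng.standard_normal((3, 1000, 1))   # single-trial continuous data
    delay_vect = np.arange(4)                  # lags 0..3 -> order-3 model

    model = OLSLinearModel.fit_model(data, delay_vect)
    resid = model.get_residuals(data)          # prediction error per channel

    return model.parameters.shape, resid.shape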


def sliding_window_fit(model_class, data, delay_vect, win_len_samp,
                       win_step_samp, compute_diagnostics=True):
    """A helper function for fitting many MVAR models within sliding windows
    across a dataset.

    Parameters
    ----------
    model_class : LinearModel
        The SAILS linear model type to fit
    data : ndarray
        Data to fit the model on, of size
        [nchannels x nsamples x nrealisations]
    delay_vect : ndarray
        A vector of lags specifying which lags to fit
    win_len_samp : int
        The window length in samples
    win_step_samp : int
        The step size between windows in samples
    compute_diagnostics : bool
        Flag indicating whether to compute model diagnostics at each window
        (Default value = True)

    Returns
    -------
    LinearModel
        An instance of the linear model class passed into the function,
        containing MVAR parameters for all windows. The parameters are
        stored in model.parameters, which is of size:
        [nchannels x nchannels x model order x nwindows]
    ModelDiagnostics
        Only returned if compute_diagnostics is True; the diagnostics from
        each window combined into a single object.

    """
    # Compute start and end samples of our windows
    start_pts = list(range(0,
                           data.shape[1] - max(delay_vect) - win_len_samp,
                           win_step_samp))
    end_pts = [x + win_len_samp for x in start_pts]

    # Preallocate model outputs
    params = np.zeros((data.shape[0], data.shape[0],
                       len(delay_vect), len(start_pts)))
    resid_cov = np.zeros((data.shape[0], data.shape[0], len(start_pts)))

    # Preallocate diagnostic outputs
    if compute_diagnostics:
        diag = []

    # Main model loop
    x = np.zeros((win_len_samp, len(start_pts)))
    for ii in range(len(start_pts)):
        x = data[:, start_pts[ii]:end_pts[ii], :]

        # Fit model
        m = model_class.fit_model(x, delay_vect)
        params[:, :, :, ii] = m.parameters
        resid_cov[:, :, ii] = m.resid_cov

        # Compute diagnostics
        if compute_diagnostics:
            diag.append(m.compute_diagnostics(x))

    # Create output class
    M = model_class()
    M.parameters = params
    M.resid_cov = resid_cov
    M.delay_vect = delay_vect
    M.win_len_samp = win_len_samp
    M.win_step_samp = win_step_samp
    M.time_vect = (np.array(start_pts) + np.array(end_pts)) / 2

    if compute_diagnostics:
        D = ModelDiagnostics.combine_diag_list(diag)
        return M, D
    else:
        return M


__all__.append('sliding_window_fit')
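

# Editor's usage sketch (not part of the library API): a sliding-window fit
# over a longer recording. The window and step sizes are arbitrary choices;
# the fitted parameters gain a trailing window dimension as described in the
# docstring above.
def _example_sliding_window():
    rng = np.random.default_rng(2)
    data = rng.standard_normal((2, 2000, 5))
    delay_vect = np.arange(4)

    model, diag = sliding_window_fit(VieiraMorfLinearModel, data, delay_vect,
                                     win_len_samp=250, win_step_samp=125)

    return model.parameters.shape              # [2 x 2 x 4 x nwindows]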


def coloured_noise_fit(X, model_order):
    """A helper function to fit an autoregressive model whose parameters are
    constrained to be coloured noise as defined by [Kasdin1995]_.

    Parameters
    ----------
    X : ndarray
        The data to compute the model fit on
    model_order : int
        The maximum lag to be computed

    Returns
    -------
    LinearModel
        A SAILS linear model instance containing the fitted model
    ndarray
        The fitted 1/f^alpha power for each channel.

    """
    from .tutorial_utils import generate_pink_roots, model_from_roots

    # Initialise returned model
    M = AbstractLinearModel()
    M.parameters = np.zeros((X.shape[0], X.shape[0], model_order, X.shape[2]))
    M.resid_cov = np.eye(X.shape[0])  # Assumed for now
    M.delay_vect = np.arange(model_order)

    # Range of 1/f^alpha values to explore
    powers = np.linspace(.05, 1.95, 200)

    fit_inds = np.zeros((X.shape[0],), dtype=int)

    for ichan in range(X.shape[0]):
        ss = np.zeros(powers.shape)
        for ii in range(len(powers)):
            rts = generate_pink_roots(powers[ii], order=model_order)
            m = model_from_roots(rts)

            resid = get_residuals(X[None, ichan, :, :], m.parameters,
                                  np.arange(model_order))
            ss[ii] = np.power(resid, 2).sum()

        fit_inds[ichan] = np.argmin(ss).astype(int)

        rts = generate_pink_roots(powers[fit_inds[ichan]], order=model_order)
        M.parameters[ichan, ichan, :, 0] = -np.poly(rts)

    fit_powers = powers[fit_inds]

    return M, fit_powers


__all__.append('coloured_noise_fit')
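

# Editor's usage sketch (not part of the library API): fitting the constrained
# coloured-noise model channel by channel. The white-noise input is an
# illustrative assumption; one fitted 1/f^alpha exponent is returned per
# channel alongside the constrained model.
def _example_coloured_noise_fit():
    rng = np.random.default_rng(3)
    X = rng.standard_normal((2, 1024, 3))      # [nchannels x nsamples x ntrials]

    model, fit_powers = coloured_noise_fit(X, model_order=20)

    return model.parameters.shape, fit_powers  # one exponent per channel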