Source code for sails.mvar_metrics

#!/usr/bin/python

# vim: set expandtab ts=4 sw=4:

import numpy as np

from sails.support import ensure_leading_positive, ensure_leading_negative
from sails import plotting

__all__ = []


[docs]class AbstractMVARMetrics(object): """This is a base class for computing various connectivity metrics from fitted autoregressive models""" @property def S(self): """Spectral Matrix of shape [nsignals, nsignals, order, epochs]""" return spectral_matrix(self.H, self.resid_cov) @property def PSD(self): """Power Spectral Density matrix""" return np.abs(psd_matrix(self.S, self.sample_rate)) @property def inv_S(self): """Inverse Power Spectrl Density matrix""" return inv_spectral_matrix(self.H, self.resid_cov) @property def coherency(self): """Complex-valued coherency""" return coherency(self.S) @property def magnitude_squared_coherence(self): """Magnitude squared coherence""" return magnitude_squared_coherence(self.S) @property def imaginary_coherence(self): """Imaginary part of complex coherency""" return imaginary_coherence(self.S) @property def phase_coherence(self): """Phase angle of complex coherency""" return phase_coherence(self.S) @property def partial_coherence(self): """Partialled magnitude-squared coherence""" return partial_coherence(self.inv_S) @property def ff_directed_transfer_function(self): """Full-frequency directed transfer function""" return ff_directed_transfer_function(self.H) @property def d_directed_transfer_function(self): """Direct directed transfer function""" return d_directed_transfer_function(self.H, self.inv_S) @property def directed_transfer_function(self): """Directed transfer function""" return directed_transfer_function(self.H) @property def partial_directed_coherence(self): """Partial directed coherence""" return partial_directed_coherence(self.Af) @property def isolated_effective_coherence(self): """Isolated effective coherence""" return isolated_effective_coherence(self.Af, self.resid_cov) @property def geweke_granger_causality(self): """Geweke granger causality""" return geweke_granger_causality(self.S, self.H, self.resid_cov)
[docs] def plot_diags(self, metric='S', inds=None, F=None, ax=None): """Helper function for plotting the diagonal part of a connectivty metric. Calls sails.plotting.plot_diagonal. Parameters ---------- metric : str Name of the connectivity metric to plot (Default value = 'S') F : matplotlib figure handle Handle for matplotlib figure to plot in (Default value = None) ax : matplotlib axes handle Handle for matplotlib axes to plot in (Default value = None) """ met = getattr(self, metric) plotting.plot_diagonal(self.freq_vect, met, title=metric, F=F, ax=ax)
[docs] def plot_summary(self, metric='S', ind=1): """Helper function for plotting a summary of a connectivty metric. Calls sails.plotting.plot_summary. Parameters ---------- metric : str Name of the connectivity metric to plot (Default value = 'S') ind : int Index into frequency dimension to plot connectivity matrix (Default value = 1) """ met = getattr(self, metric) plotting.plot_metric_summary(self.freq_vect, met, ind=ind)
[docs] def get_spectral_generalisation(self, metric='S'): """Compute the spectral generalisation matrix for a given metric. Returns the spatial similarity of different frequencies. Parameters ---------- metric : str Name of the connectivity metric to plot (Default value = 'S') Returns ------- ndarray Matrix of spectral generalisation [nfrequencies x nfrequencies] """ met = np.abs(getattr(self, metric)[:, :, :, 0]) met = met.reshape((met.shape[0]**2, met.shape[2])) return met.T.dot(met)
[docs] def get_node_weight(self, metric='S'): """Compute the node weight for a given connectivity metric. Parameters ---------- metric : str Name of the connectivity metric to plot (Default value = 'S') Returns ------- ndarray Node-weight matrix of size [nchannels x nchannels] """ met = np.abs(getattr(self, metric)[:, :, :, 0]) for ii in range(met.shape[2]): met[:, :, ii] = met[:, :, ii] - np.diag(np.diag(met[:, :, ii])) nconnections = met.shape[0]+met.shape[1]-1 return met[:, 0, :] + met[0, :, :] / nconnections
__all__.append('AbstractMVARMetrics')
[docs]class ModalMvarMetrics(AbstractMVARMetrics): """This class for computing various connectivity metrics based on a Modal decomposition of a fitted MVAR model This is typically called using the :meth:`initialise` or :meth:`initialise_from_modes` classmethods to compute the decomposition and return a class instance containing them. """ # This is only used when used within Anamnesis hdf5_outputs = ['resid_cov', 'freq_vect', 'H', 'modes']
[docs] @classmethod def initialise(cls, model, sample_rate, freq_vect, sum_modes=True): """Compute some basic values from the pole-residue decomposition of the A matrix. This is a class method which creates and returns a :class:`ModalMvarMetrics` instance containing the decomposition parameters with methods computing various connectivity metrics. This class computes the transfer function H using modal parameters. Currently only implemented for a single realisation (A.ndim == 3) Parameters ---------- model : sails LinearModel SAILS class instance containing a fitted linear model. sample_rate : float Sample rate of the dataset freq_vect : ndarray Vector of specifying which frequencies to compute connectivity at. sum_modes : bool Flag indicating whether to sum across modes (Default value = True) Returns ------- sails.ModalMvarMetrics instance """ # Create object ret = cls() A = model.parameters resid_cov = model.resid_cov if A.ndim == 4: NotImplementedError('Modal Metrics only implemented for single ' 'realisations of A') else: # Add dummy epoch A = A[..., None] A = ensure_leading_negative(A) if resid_cov.ndim == 2: resid_cov = resid_cov[..., None] # Add a dummy epoch if resid_cov.shape[2] != A.shape[0]*A.shape[2]: # Repeat resid cov per epoch/mode (probably only valid for # modes...) resid_cov = np.repeat(resid_cov, A.shape[0]*A.shape[2], axis=-1) ret.resid_cov = resid_cov ret.freq_vect = freq_vect ret.sample_rate = sample_rate # Compute transfer function from .modal import MvarModalDecomposition ret.modes = MvarModalDecomposition.initialise(model, sample_rate=sample_rate, normalise=False) ret.H = ret.modes.transfer_function(sample_rate, freq_vect, sum_modes=sum_modes) return ret
[docs] @classmethod def initialise_from_modes(cls, model, modes, sample_rate, freq_vect, mode_inds=None, sum_modes=True): """Compute some basic values from the pole-residue decomposition of the A matrix. This is a class method which creates and returns a :class:`sails.ModalMvarMetrics` instance containing the decomposition parameters with methods computing various connectivity metrics. This class computes the transfer function H using modal parameters. A sub-set of modes can be used to compute a reduced H. Currently only implemented for a single realisation (A.ndim == 3) Parameters ---------- model : sails LinearModel SAILS class instance containing a fitted linear model. modes : sails MvarModalDecomposition SAILS class instance containing a fitted modal decomposition sample_rate : float The sample rate of the fitted data freq_vect : ndarray Vector of specifying which frequencies to compute connectivity at. mode_inds : ndarray Which modes to use when constructing H (Default value = None) sum_modes :Boolean Flag indicating whether to sum across modes when constructing H (Default value = True) Returns ------- sails.ModalMvarMetrics instance """ # Create object ret = cls() ret.modes = modes ret.freq_vect = freq_vect ret.sample_rate = sample_rate A = model.parameters resid_cov = model.resid_cov if A.ndim == 4: NotImplementedError('Modal Metrics only implemented for single ' 'realisations of A') else: # Add dummy epoch A = A[..., None] A = ensure_leading_positive(A) if resid_cov.ndim == 2: resid_cov = resid_cov[..., None] # Add a dummy epoch if resid_cov.shape[2] != A.shape[0]*A.shape[2]: # Repeat resid cov per epoch/mode (probably only valid for # modes...) resid_cov = np.repeat(resid_cov, A.shape[0]*A.shape[2], axis=-1) ret.resid_cov = resid_cov if mode_inds is None: mode_inds = np.arange(modes.nmodes) # Compute transfer function ret.H = ret.modes.transfer_function(sample_rate, freq_vect, modes=mode_inds, sum_modes=sum_modes) return ret
__all__.append('ModalMvarMetrics')
[docs]class FourierMvarMetrics(AbstractMVARMetrics): """This class for computing various connectivity metrics based on the Fourier transform of the MVAR co-efficients. This is typically called using the :meth:`initialise` classmethod to compute the frequency transform and return a class instance with methods for computing different connectivity estimators """ # This is only used when used within NAF hdf5_outputs = ['resid_cov', 'freq_vect', 'H', '_A', 'Af']
[docs] @classmethod def initialise(cls, model, sample_rate, freq_vect, nmodes=None): """Compute some basic values from the Fourier transform of the A matrix. This is a class method which creates and returns a :class:`FourierMvarMetrics` instance with methods computing various connectivity metrics. This class computes the transfer function H the Fourier transform. Parameters ---------- model : sails LinearModel SAILS class instance containing a fitted linear model. sample_rate : float Sample rate of the dataset freq_vect : ndarray Vector of specifying which frequencies to compute connectivity at. Returns ------- sails.FourierMvarMetrics instance """ # Create object ret = cls() A = model.parameters resid_cov = model.resid_cov if A.ndim == 3: A = A[..., None] # Add a dummy epoch ret._A = A A = ensure_leading_positive(A) if resid_cov.ndim == 2: resid_cov = resid_cov[..., None] # Add a dummy epoch if resid_cov.shape[2] != A.shape[3]: # Repeat resid cov per epoch/mode (probably only valid for # modes...) resid_cov = np.repeat(resid_cov, A.shape[3], axis=-1) ret.resid_cov = resid_cov ret.freq_vect = freq_vect ret.sample_rate = sample_rate # Get frequency transform of A ret.Af = ar_spectrum(A, resid_cov, sample_rate, freq_vect) # Get transfer function ret.H = transfer_function(ret.Af) return ret
__all__.append('FourierMvarMetrics') __all__.append('modal_transfer_function') # Spectrum estimators
[docs]def sdf_spectrum(A, sigma, sample_rate, freq_vect): """Estimate of the Spectral Density as found on wikipedia and [Quirk1983]_. This assumes that the spectral representation of A is invertable Parameters ---------- A : ndarray Matrix of autoregressive parameters, of size [nchannels x nchannels x model order] sigma : ndarray Residual covariance matrix of the modelled system sample_rate : float The samplingfrequency of the modelled system freq_vect : ndarray Vector of frequencies at which to evaluate the spectrum Returns ------- ndarray Frequenecy transform of input A, of size [nchannels x nchannels x nfrequencies] """ # model order order = A.shape[2] - 1 nsignals = A.shape[0] T = 1./sample_rate spectrum = np.zeros((nsignals, nsignals, len(freq_vect)), dtype=complex) eye = np.eye(nsignals) for node1 in range(nsignals): for node2 in range(nsignals): for idx, f in zip(list(range(len(freq_vect))), freq_vect): est = eye[node1, node2] for k in range(1, order+1): est = A[node1, node2, k] * np.exp(0-1j*2*np.pi*k*f*T) num = sigma[node1, node2] * T dom = np.abs(est)**2 spectrum[node1, node2, idx] = num / dom return spectrum
__all__.append('sdf_spectrum')
[docs]def psd_spectrum(A, sigma, sample_rate, freq_vect): """Estimate the PSD representation of a set of MVAR coefficients as stated in [Penny2009]_ section 7.4.4. This assumes that the spectral representation of A is invertable WARNING: does not behave as expected for some data, use with caution. ar_spectrum is recommended. Parameters ---------- A : ndarray Matrix of autoregressive parameters, of size [nchannels x nchannels x model order] sigma : ndarray Residual covariance matrix of the modelled system sample_rate : float The samplingfrequency of the modelled system freq_vect : ndarray Vector of frequencies at which to evaluate the spectrum Returns ------- ndarray Frequenecy transform of input A, of size [nchannels x nchannels x nfrequencies] """ # model order order = A.shape[2] - 1 nsignals = A.shape[0] T = 1./sample_rate spectrum = np.zeros((nsignals, nsignals, len(freq_vect)), dtype=complex) eye = np.eye(nsignals) for node1 in range(nsignals): for node2 in range(nsignals): for idx, f in zip(list(range(len(freq_vect))), freq_vect): # Normalised digital frequency f = f / sample_rate est = eye[node1, node2] for k in range(1, order+1): est += A[node1, node2, k] * np.exp(0-1j*2*np.pi*k*f) spectrum[node1, node2, idx] = est # Compute the PSD_mvar psd = np.zeros_like(spectrum) for idx, f in zip(list(range(len(freq_vect))), freq_vect): A_inv = np.linalg.inv(spectrum[..., idx]) # Hermitian of the inverse A_inv_herm = np.array(np.mat(A_inv).H) psd[:, :, idx] = T * (A_inv.dot(sigma).dot(A_inv_herm)) return psd
__all__.append('psd_spectrum')
[docs]def ar_spectrum(A, sigma, sample_rate, freq_vect): """Estimate the spectral representation of a set of MVAR coefficients as suggested by [Baccala2001]_, the equation without a number just below equation 13. Parameters ---------- A : ndarray Matrix of autoregressive parameters, of size [nchannels x nchannels x model order] sigma : ndarray Residual covariance matrix of the modelled system sample_rate : float The samplingfrequency of the modelled system freq_vect : ndarray Vector of frequencies at which to evaluate the spectrum Returns ------- ndarray Frequenecy transform of input A, of size [nchannels x nchannels x nfrequencies] """ # This routine assumes that our parameter matrix is of the form where # we have a leading positive. Ensure that this is the case. A = ensure_leading_positive(A) if A.ndim == 3: A = A[..., None] # model order order = A.shape[2] - 1 nsignals = A.shape[0] nepochs = A.shape[3] spectrum = np.zeros((nsignals, nsignals, len(freq_vect), nepochs), dtype=complex) for node1 in range(nsignals): for node2 in range(nsignals): for e in range(nepochs): for idx, f in zip(list(range(len(freq_vect))), freq_vect): # Normalised digital frequency f = f / sample_rate est = 0+0j for k in range(1, order+1): est -= A[node1, node2, k, e] * np.exp(0-1j*2*np.pi*k*f) spectrum[node1, node2, idx, e] = est return spectrum
__all__.append('ar_spectrum')
[docs]def transfer_function(Af): """Function for computing the transfer function of a system from the frequency transform of the autoregressive parameters. Parameters ---------- Af : Frequency domain version of parameters (can be calculated using ar_spectrum function) [nchannels x nchannels x nfreqs x nepochs] Returns ------- ndarray System transfer function, of size [nchannels x nchannels x nfreqs x nepochs] Note ---- This function computes this the transfer function :math:`H` as defined by: .. math:: H(f) = ( I - Af(f) )^{-1} where :math:`Af` is the frequency transformed autoregessive parameter matrix and :math:`I` is identity. """ # Remove identity from Af Afi = np.repeat(np.eye(Af.shape[0])[:, :, None], Af.shape[2], axis=-1)[..., None] - Af nchannels = Af.shape[0] nfreqs = Af.shape[2] nepochs = Af.shape[-1] tf = np.zeros((nchannels, nchannels, nfreqs, nepochs), dtype=complex) for e in range(nepochs): for f in range(nfreqs): tf[:, :, f, e] = np.linalg.inv(Afi[:, :, f, e]) return tf
__all__.append('transfer_function')
[docs]def spectral_matrix(H, noise_cov): """Function for computing the spectral matrix, the matrix of spectra and cross-spectra. Parameters ---------- H : ndarray The transfer matrix [nchannels x nchannels x nfreqs x nepochs] noise_cov : ndarray The noise covariance matrix of the system [nchannels x nchannels x nepochs] Returns ------- ndarray System spectral matrix, of size [nchannels x nchannels x nfreqs x nepochs] Note ---- This function computes the system spectral matrix as defined by: .. math:: S(f) = H(f)\\Sigma H(f)^H where :math:`H` is the transfer function, :math:`\\Sigma` is the residual covariance and :math:`^H` the Hermitian transpose. """ # Make sure that we have the 4th dimension if H.ndim < 4: H = H[:, :, :, None] S = np.zeros_like(H) for i in range(H.shape[2]): for j in range(H.shape[3]): S[:, :, i, j] = H[:, :, i, j].dot( noise_cov[:, :, j]).dot(H[:, :, i, j].T.conj()) return S
__all__.append('spectral_matrix') def psd_matrix(S, sample_rate): """Function for computing the power spectral density matrix with units matched to scipy.signal.welch(X, axis=1, scaling='spectrum') Parameters ---------- S : The spectral matrix [nchannels x nchannels x nfreqs x nepochs] sample_rate : The sampling rate of the system Returns ------- """ return S / sample_rate __all__.append('psd_matrix') def inv_spectral_matrix(H, noise_cov): """Function for computing the inverse spectral matrix. Parameters ---------- H : The transfer matrix [nchannels x nchannels x nfreqs x nepochs] noise_cov : The noise covariance matrix of the system [nchannels x nchannels x nepochs] Returns ------- ndarray Inverse system spectral matrix, of size [nchannels x nchannels x nfreqs x nepochs] Note ---- This function computes the inverse spectral matrix as defined by: .. math:: S(f) = ( H(f)\\Sigma H(f)^H )^{-1} where :math:`H` is the transfer function, :math:`\\Sigma` is the residual covariance and :math:`^H` the Hermitian transpose. """ inv_S = np.zeros_like(H) S = spectral_matrix(H, noise_cov) for f in range(S.shape[2]): for e in range(S.shape[3]): inv_S[:, :, f, e] = np.linalg.pinv(np.abs(S[:, :, f, e])) return inv_S __all__.append('inv_spectral_matrix')
[docs]def coherency(S): """Method for computing the Coherency. This is the complex form of coherence, from which metrics such as magnitude squared coherence can be derived Parameters ---------- S : ndarray The system spectral matrix in 3D or 4D form, of size [nchannels x nchannels x nfreqs x nepochs] Returns ------- ndarray Complex-valued coherency matrix, of size [nchannels x nchannels x nfreqs x nepochs] Note ---- This function computes the coherency as defined by: .. math:: Coh_{ij}(f) = \\frac{S_{ij}(f)}{ \\sqrt{ | S_{ii}(f)S_{jj}(f) | } } where :math:`S` is the system spectral matrix. """ coh = np.zeros_like(S) for i in range(S.shape[0]): for j in range(S.shape[0]): coh[i, j, :, :] = S[i, j, :, :] / \ (np.sqrt(np.abs(S[i, i, :, :] * S[j, j, :, :]))) return coh
__all__.append('coherency') def magnitude_squared_coherence(S): """Method for computing the Magnitude Squred Coherence. Parameters ---------- S : ndarray The system spectral matrix in 3D or 4D form, of size [nchannels x nchannels x nfreqs x nepochs] Returns ------- ndarray Magnitude squared coherence matrix, of size [nchannels x nchannels x nfreqs x nepochs] Note ---- This function computes this magnitude squared coherence as defined by: .. math:: MSC_{ij}(f) = \\frac{ |S_{ij}(f)|^2 }{ \\sqrt{ | S_{ii}(f)S_{jj}(f) | } } where :math:`S` is the system spectral matrix. This is closely related to the complex valued-coherency. """ return np.power(np.abs(coherency(S)), 2) __all__.append('magnitude_squared_coherence') def imaginary_coherence(S): """Method for computing the imaginary coherence Parameters ---------- S : ndarray The system spectral matrix in 3D or 4D form, of size [nchannels x nchannels x nfreqs x nepochs] Returns ------- ndarray Imaginary coherence matrix, of size [nchannels x nchannels x nfreqs x nepochs] Note ---- This function computes the imaginary cohereence as defined by: .. math:: iCoh_{ij}(f) = \\Im \\bigg( \\frac{ S_{ij}(f) }{ \\sqrt{ | S_{ii}(f)S_{jj}(f) | } } \\bigg) where :math:`S` is the system spectral matrix. This is closely related to the complex valued-coherency. """ return coherency(S).imag __all__.append('imaginary_coherence') def phase_coherence(S): """Method for computing the phase coherence Parameters ---------- S : ndarray The system spectral matrix in 3D or 4D form, of size [nchannels x nchannels x nfreqs x nepochs] Returns ------- ndarray Phase coherence matrix, of size [nchannels x nchannels x nfreqs x nepochs] Note ---- This function computes the phase-coherence as defined by: .. math:: pCoh_{ij}(f) = \\arg \\bigg( \\frac{ S_{ij}(f) }{ \\sqrt{ | S_{ii}(f)S_{jj}(f) | } } \\bigg ) where :math:`S` is the system spectral matrix. This is closely related to the complex valued-coherency. """ return np.angle(coherency(S)) __all__.append('phase_coherence')
[docs]def partial_coherence(inv_S): """Method for computing the Partial Coherence. Parameters ---------- inv_S : ndarray The system spectral matrix in 3D or 4D form, of size [nchannels x nchannels x nfreqs x nepochs] Returns ------- ndarray Complex-valued coherency matrix, of size [nchannels x nchannels x nfreqs x nepochs] Note ---- This function computes the partial coherence using the inverse spectral matrix :math:`P = S^{-1}` where :math:`S` is the system spectral matrix. The partial coherence is then computed as: .. math:: PCoh_{ij}(f) = \\frac{ |P_{ij}(f)|^2 }{ \\sqrt{ | P_{ii}(f)P_{jj}(f) | } } """ pcoh = np.zeros_like(inv_S) for i in range(inv_S.shape[0]): for j in range(inv_S.shape[0]): pcoh[i, j, :, :] = np.power(np.abs(inv_S[i, j, :, :]), 2) / \ (inv_S[i, i, :, :] * inv_S[j, j, :, :]) return pcoh
__all__.append('partial_coherence') def ff_directed_transfer_function(H): """Function for computing the full-frequency directed transfer function as defined in [Korzeniewska2003]_. Parameters ---------- H : ndarray The system transfer function [nchannels x nchannels x nfreqs x nepochs] Returns ------- ndarray The full-frequency directed transfer function Note ---- This function computes the directed transfer function from the system transfer function :math:`H`. .. math:: ffDTF_{ij}(f) = \\frac{ | H_{ij}(f) |^2 }{ \\sqrt{ \\sum_f \\sum_f | \\bar{H}_{ij}(f) |^2 } } This is similar to the directed transfer function with the exceptiont hat the normalisation in the denominator is summed across frequency. """ dtf = np.zeros_like(H) for e in range(H.shape[3]): for i in range(H.shape[0]): # Make the denominator for this channel over all frequencies dtf_denom = np.sum(np.power(np.abs(H[i, :, :, e]), 2)) # Estimate ffDTF for f in range(H.shape[2]): for j in range(H.shape[0]): dtf[i, j, f, e] = np.power( np.abs(H[i, j, f, e]), 2) / dtf_denom return dtf __all__.append('ff_directed_transfer_function') def d_directed_transfer_function(H, inv_S): """Function for computing the direct directed transfer function as defined in [Korzeniewska2003]_. Parameters ---------- H : The transfer matrix [nchannels x nchannels x nfreqs x nepochs] s : inv_S: The inverse spectral matrix [nchannels x nchannels x nfreqs x nepochs] inv_S : Returns ------- ndarray The direct directed transfer function Note ---- This function computes the direct directed transfer function by point-wise multiplicationo of the partial coherence :math:`P` and the full-frequency directed transfer function :math:`dDTF`. .. math:: dDTF(F) = P(f) * ffDTF(f) This is similar to the directed transfer function with the exceptiont hat the normalisation in the denominator is summed across frequency. """ return partial_coherence(inv_S) * ff_directed_transfer_function(H) __all__.append('d_directed_transfer_function')
[docs]def partial_directed_coherence(Af): """Function to estimate the partial directed coherence from a set of multivariate parameters as defined in [Baccala2001]_. Parameters ---------- Af : ndarray Frequency domain version of parameters (can be calculated using ar_spectrum function), of size [nchannels x nchannels x nfrequencies x nrealisations] Returns ------- ndarray The partial directed coherence of the system. Note ---- This function computes the partial coherence using the frequency transform of the autoregressive parameters :math:`Af` subtracted from Identity. .. math:: \\bar{Af}(f) = I - Af(f) The partial directed coherence is then .. math:: PDC_{ij}(f) = \\frac{ | \\bar{Af}_{ij}(f)| }{ \\sqrt{ \\sum_i | \\bar{Af}_{ij}(f) |^2 } } """ pdc = np.zeros_like(Af) # Remove identity from Af Afi = np.repeat(np.eye(Af.shape[0])[:, :, None], Af.shape[2], axis=-1)[..., None] - Af for e in range(Afi.shape[3]): for j in range(Afi.shape[0]): for f in range(Afi.shape[2]): pdc_denom = np.sqrt( (np.abs(Afi[:, j, f, e])**2).sum()) for i in range(Afi.shape[0]): pdc[i, j, f, e] = np.abs( Afi[i, j, f, e]) / pdc_denom return pdc
__all__.append('partial_directed_coherence')
[docs]def isolated_effective_coherence(Af, noise_cov): """Function for estimating the Isolated Effective Coherence as defined in [Pascual-Marqui2014]_. Parameters ---------- Af : ndarray Frequency domain version of parameters (can be calculated using ar_spectrum function) [nchannels x nchannels x nfreqs x nepochs] noise_cov : ndarray The noise covariance matrix of the system [nchannels x nchannels x nepochs] Returns ------- ndarray The isolated effective coherence. """ # Remove identity from Af Afi = np.repeat(np.eye(Af.shape[0])[:, :, None], Af.shape[2], axis=-1)[..., None] - Af nchannels = Afi.shape[0] nfreqs = Afi.shape[2] nepochs = Afi.shape[3] iec = np.zeros((nchannels, nchannels, nfreqs, nepochs), dtype=complex) for e in range(nepochs): # Get inverse of the residual noise covariance and set off-diagonal # elements to zero S = noise_cov[..., e] # S = np.linalg.inv(noise_cov[..., e]) iso_s = np.linalg.inv(np.diag(np.diag(S))) for f in range(nfreqs): for i in range(nchannels): for j in range(nchannels): iso_params = np.diag(np.diag(Afi[:, :, f, e])) # Set all parameters except the diagonal and connection of # interest to zero iso_params[i, j] = Afi[i, j, f, e] denom_col = iso_s[j, j] * np.abs(iso_params[j, j])**2 numerator = iso_s[i, i] * np.abs(iso_params[i, j])**2 iec[i, j, f, e] = numerator / (numerator + denom_col) return iec.real
__all__.append('isolated_effective_coherence')
[docs]def directed_transfer_function(H): """Method for computing the Directed Transfer Function as defined in [Kaminski1991]_. Parameters ---------- H : ndarray The transfer matrix [nchannels x nchannels x nfreqs x nepochs] Returns ------- ndarray The directed transfer function of the system. Note ---- This function computes the directed transfer function from the system transfer function :math:`H`. .. math:: DTF_{ij}(f) = \\frac{ | H_{ij}(f) |^2 }{ \\sqrt{ \\sum_j | \\bar{H}_{ij}(f) |^2 } } """ dtf = np.zeros_like(H) for e in range(H.shape[3]): for i in range(H.shape[0]): for f in range(H.shape[2]): # Make the denominator for this channel dtf_denom = np.sum(np.power(np.abs(H[i, :, f, e]), 2)) for j in range(H.shape[0]): dtf[i, j, f, e] = np.power( np.abs(H[i, j, f, e]), 2) / dtf_denom return dtf
__all__.append('directed_transfer_function')
[docs]def geweke_granger_causality(S, H, sigma): """This function computes the Geweke-Granger causality as defined in [Barrett2010]_ Parameters --------- S : ndarray Spectral matrix [nchannels x nchannels x nfreqs x nepochs] H : ndarray The transfer matrix [nchannels x nchannels x nfreqs x nepochs] sigma : ndarray Residual noise covariance matrix Returns ------- ndarray The Geweke-Granger causality """ nchannels = S.shape[0] nfreqs = S.shape[2] nepochs = S.shape[3] ggc = np.zeros((nchannels, nchannels, nfreqs, nepochs), dtype=complex) for e in range(nepochs): # Compute conditional sigma matrix cond_sigma = np.zeros((nchannels, nchannels)) inv_sigma = np.linalg.inv(sigma[:, :, e]) for ii in range(nchannels): for jj in range(nchannels): cond_sigma[ii, jj] = sigma[jj, jj, e] - \ sigma[ii, jj, e]*inv_sigma[ii, ii]*sigma[jj, ii, e] for f in range(nfreqs): for ii in range(nchannels): for jj in range(nchannels): denom = H[ii, jj, f, e] * cond_sigma[ii, jj] * H[ii, jj, f, e].conj() denom = np.abs(S[ii, ii, f, e] - denom) ggc[ii, jj, f, e] = np.log(np.abs(S[ii, ii, f, e]) / denom) return ggc
__all__.append('geweke_granger_causality')