Source code for sails.modal

#!/usr/bin/python

# vim: set expandtab ts=4 sw=4:

import scipy.linalg  # scipy.linalg.eig is used below; import the submodule explicitly
from scipy import signal
import numpy as np
import matplotlib.pyplot as plt

from sails.support import ensure_leading_negative
from sails.modelfit import coloured_noise_fit

__all__ = []


def adjust_phase(x):
    """
    Method for normalising eigenvector matrices to meet the conditions in
    eqn 21 of Neumaier & Schneider 2001.

    :param x: Vector of complex numbers to adjust the phase of

    :returns: Vector of numbers with adjusted phase
    """
    ox = np.zeros_like(x)

    # Rows of x
    for j in range(x.shape[0]):
        a = np.atleast_2d(np.real(x[j, :])).T
        b = np.atleast_2d(np.imag(x[j, :])).T

        tmp = np.dot(b.T, b) - np.dot(a.T, a)
        phi = .5 * np.arctan(2 * np.sum(a*b / tmp))

        bnorm = np.linalg.norm(np.sin(phi) * a + np.cos(phi) * b)
        anorm = np.linalg.norm(np.cos(phi) * a + np.sin(phi) * b)

        if bnorm > anorm:
            if phi < 0:
                phi = phi - (np.pi/2.)
            else:
                phi = phi + (np.pi/2.)

        ox[j, :] = x[j, :] * np.exp((0+1j) * phi)

    return ox


__all__.append('adjust_phase')
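

# Usage sketch (illustrative only; the helper below is not part of the sails
# API). adjust_phase normalises row-wise, whereas scipy.linalg.eig returns
# eigenvectors in columns, so the matrix is transposed before and after the
# call, mirroring how MvarModalDecomposition.initialise applies it below.
def _adjust_phase_example(nmodes=6, seed=42):
    """Illustrative helper: phase-normalise the right eigenvectors of a random matrix."""
    rng = np.random.default_rng(seed)
    companion = rng.standard_normal((nmodes, nmodes))
    evals, evecsl, evecsr = scipy.linalg.eig(companion, left=True, right=True)
    # Cast to complex so the phase rotation is well defined, transpose so each
    # eigenvector is a row, normalise, then transpose back
    return adjust_phase(evecsr.astype(complex).T).T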


class MvarModalDecomposition(object):
    """
    Object which calculates the modal decomposition of a fitted model derived
    from the AbstractLinearModel class.
    """

    # This is only used when used within NAF
    hdf5_outputs = ['nsignals', 'order', 'companion', 'evals', 'evecsr',
                    'evecsl', 'decoupled_cov', 'abs_lambda_sq', 'excitation',
                    'modeinds', 'freq_vect', 'period', 'dampening_time',
                    'freq_vect_hz', 'peak_frequency', 'dampening_time_hz',
                    'freq_response', 'phase_response', 'pole_pairs', 'residue',
                    'freq_response_modal', 'phase_response_modal', 'nmodes',
                    'delay_vect']

    @classmethod
    def initialise(cls, model, sample_rate=None, normalise=False):
        """
        Compute the modal pole-residue decomposition of the A matrix from a
        fitted MVAR model. Currently only implemented for a single
        realisation (A.ndim == 3)

        :param model: Fitted model object. Must be derived from
                      AbstractLinearModel.
        :param sample_rate: sample rate of system in Hz
        :param normalise: Whether to adjust the phase of the eigenvectors

        :returns: Modal decomposition object
        """
        # Create object
        ret = cls()

        A = model.parameters
        nsignals = ret.nsignals = A.shape[0]
        ret.order = A.shape[2] - 1
        nmodes = nsignals * ret.order

        if A.ndim == 4:
            pass
        else:
            # Add dummy epoch
            A = A[..., None]

        A = ensure_leading_negative(A)

        if model.resid_cov.ndim == 2:
            # Add a dummy epoch
            model.resid_cov = model.resid_cov[..., None]

        nrealisations = A.shape[3]

        # preallocate arrays
        ret.evals = np.zeros((nmodes, nrealisations), dtype=complex)
        ret.evecsr = np.zeros((nmodes, nmodes, nrealisations), dtype=complex)
        ret.evecsl = np.zeros((nmodes, nmodes, nrealisations), dtype=complex)
        ret.modeinds = np.zeros((nmodes, nrealisations), dtype=int)
        ret.pole_pairs = np.zeros_like(ret.evals, dtype=int) * np.nan

        for ii in range(nrealisations):
            # Generate companion form
            from .modelfit import A_to_companion
            ret.companion = A_to_companion(A[:, :, 1:, ii])

            # Roots are now the eigenvalues of the companion matrix
            ret.evals[:, ii], ret.evecsl[:, :, ii], ret.evecsr[:, :, ii] = \
                scipy.linalg.eig(ret.companion, left=True, right=True)

            # Apply normalisation from
            # http://climate-dynamics.org/wp-content/uploads/2015/08/arfit.pdf
            if normalise:
                ret.evecsr[:, :, ii] = adjust_phase(ret.evecsr[:, :, ii].T).T

            # take left eigenvecs from right, might preserve scaling better
            ret.evecsl[:, :, ii] = np.linalg.inv(ret.evecsr[:, :, ii].conj().T)

            ret.modeinds[:, ii] = np.argsort(np.abs(ret.evals[:, ii]))[::-1]

        # Loop through poles with an imaginary part and find pairings, poles
        # with a zero imaginary part are left indexed as nan
        ppidx = 0
        for idx in range(len(ret.pole_pairs)):
            if ret.evals[idx].imag > 0:
                # Find the conjugate pair
                cidx = (abs(ret.evals[idx] - ret.evals.conj())).argmin()
                ret.pole_pairs[idx] = ppidx
                ret.pole_pairs[cidx] = ppidx
                ppidx += 1

        # Compute the period and dampening time per mode
        # We ignore /0 warnings as they are expected for real modes
        old_warning = np.geterr()['divide']
        np.seterr(divide='ignore')
        ret.period = 2*np.pi / np.abs(np.angle(ret.evals))
        ret.dampening_time = -1.0 / np.log(np.abs(ret.evals))
        np.seterr(divide=old_warning)

        if sample_rate is not None:
            ret.peak_frequency = sample_rate / ret.period
            ret.dampening_time_hz = sample_rate / ret.dampening_time
        else:
            ret.peak_frequency = None
            ret.dampening_time_hz = None

        ret.nmodes = nmodes
        ret.delay_vect = model.delay_vect

        return ret

    @property
    def period_hz(self):
        """This is an old and deprecated way of accessing peak_frequency"""
        return self.peak_frequency

    def compute_residues(self):
        """
        Compute the residue matrix from the eigenvectors associated with each
        pole.
        """
        # Compute the residue matrices and frequency responses per mode
        residue = np.zeros((self.nsignals, self.nsignals, self.nmodes),
                           dtype=complex)

        for imode in range(len(self.evals)):
            # Compute residue matrix from left and right eigenvectors
            l = np.atleast_2d(self.evecsl[:self.nsignals, imode]).conj()
            r = np.atleast_2d(self.evecsr[:self.nsignals, imode]).T
            residue[:, :, imode] = l.dot(r).T

        return residue

    def compute_modal_parameters(self):
        """
        Compute the order 1 or 2 autoregressive coefficients associated with
        each pole or conjugate pair.
        """
        # Compute the residue matrices and frequency responses per mode
        residue = self.compute_residues()

        A = np.zeros((self.nsignals, self.nsignals, 3,
                      self.pole_pairs.max()+1))

        # Loop through nodes and connections
        for ii in np.arange(self.nsignals):
            for jj in np.arange(self.nsignals):
                # For each pole pair
                for kk in np.arange(self.pole_pairs.max()+1):
                    inds = self.pole_pairs[:, 0] == kk
                    b, a = signal.invresz(residue[ii, jj, inds].squeeze(),
                                          self.evals[inds, 0], [0])
                    # I'm scaling the ar coefficients by the leading numerator
                    # coeff. This scaling doesn't affect the roots (ie peak
                    # frequency) but will show up in the relative measures.
                    A[ii, jj, :len(a), kk] = a * b[0]

        return A

    def modal_freqz(self):
        """
        Compute filter characteristics for each pole
        """
        # Compute the frequency response for the whole system
        w, h = signal.freqz(1, self.evals)
        mag_response = np.abs(h)
        phase_response = np.unwrap(np.angle(h))

        return w, mag_response, phase_response

    def excitation(self, resid_cov):
        """
        Compute the relative excitation of each mode, following the approach
        described in the ARfit manuscript:
        http://climate-dynamics.org/wp-content/uploads/2015/08/arfit.pdf

        :param resid_cov: Residual covariance array from the fitted model
        """
        # Compute the excitation per mode
        inv_evecsr = np.linalg.inv(self.evecsr)

        decoupled_cov = inv_evecsr[:, :self.nsignals].dot(resid_cov[:, :, 0])
        decoupled_cov = decoupled_cov.dot(inv_evecsr[:, :self.nsignals].conj().T)

        abs_lambda_sq = np.power(np.abs(self.evals), 2)

        excitation = (np.diag(decoupled_cov) / (1 - abs_lambda_sq)).real
        excitation = excitation / np.sum(excitation)

        return excitation

    def transfer_function(self, sample_rate, freq_vect, modes=None,
                          sum_modes=True):
        """
        Compute the transfer function in pole-residue form, splitting the
        system into modes with separate transfer functions. The full system
        transfer function is then a linear sum of each modal transfer
        function.

        :param sample_rate: sample rate of system in Hz
        :param freq_vect: Vector of frequencies at which to evaluate function
        :param modes: List of mode indices to evaluate over (optional)
        :param sum_modes: Boolean indicating whether to sum modes or return all
        """
        if (sample_rate is None and freq_vect is not None) or \
           (sample_rate is not None and freq_vect is None):
            raise ValueError('Please define both sample_rate and '
                             'freq_vect (or neither to return '
                             'normalised frequency 0->pi)')

        if freq_vect is None and sample_rate is None:
            # Use normalised frequency
            freq_vect_rads = np.linspace(0, np.pi, 512)
        else:
            # convert user defined frequencies to radians
            freq_vect_rads = (freq_vect / (sample_rate / 2.)) * np.pi

        # Allow us to pick out which modes we want
        if modes is None:
            modes = list(range(len(self.evals)))

        if sum_modes is False:
            nmodes = len(modes)
        else:
            nmodes = 1

        # Compute transfer function per mode
        residue = self.compute_residues()
        H = np.zeros((residue.shape[0], residue.shape[0],
                      len(freq_vect_rads), nmodes), dtype=complex)

        # Compute point on unit circle for each frequency
        z = np.cos(freq_vect_rads) + 1j*np.sin(freq_vect_rads)

        idx = 0
        for imode in modes:
            for ifreq in range(len(freq_vect_rads)):
                # Compute transfer function for this frequency and this mode
                num = residue[:, :, imode] * z[ifreq]
                dom = z[ifreq] - self.evals[imode]

                if sum_modes:
                    H[:, :, ifreq, 0] += num / dom
                else:
                    H[:, :, ifreq, idx] = num / dom
            idx += 1

        return H

    def per_mode_transfer_function(self, sample_rate, freq_vect):
        """
        Extracts the transfer function for each mode by summing the transfer
        function across pole-pairs where necessary

        :param sample_rate: sample rate of system in Hz
        :param freq_vect: Vector of frequencies at which to evaluate function
        """
        H = self.transfer_function(sample_rate, freq_vect, sum_modes=False)

        mi = self.mode_indices

        ret = np.zeros((len(freq_vect), len(mi)), dtype=complex)

        for idx, i in enumerate(mi):
            ret[:, idx] = np.sum(H[..., i], axis=3).squeeze()

        return ret

    def modal_transfer_function(self, sample_rate, modes=None):
        """
        Compute a transfer function matrix based on the excitation period for
        each mode

        :param sample_rate: sample rate of system in Hz
        :param modes: List of mode indices to evaluate over (optional)
        """
        # Allow us to pick out which modes we want
        if modes is None:
            modes = list(range(len(self.evals)))

        # Compute transfer function per mode
        H = np.zeros((self.residue.shape[0], self.residue.shape[0],
                      len(modes)), dtype=complex)

        idx = 0
        for imode in modes:
            fv_rad = (self.peak_frequency[imode] / (sample_rate / 2.)) * np.pi

            # Compute point on unit circle for the relevant frequency
            z = np.cos(fv_rad) + 1j*np.sin(fv_rad)

            # Compute transfer function for this frequency and this mode
            num = self.residue[:, :, imode] * z
            dom = z - self.evals[imode]

            H[:, :, idx] = num / dom
            idx += 1

        return H

    def get_mode_inds(self, fmin=0, fmax=None, mag_thresh=0,
                      resid_thresh=0, index_mode='inclusive'):
        """
        Find the indices of modes whose peak frequency lies between fmin and
        fmax and whose pole magnitude and residue norm meet the given
        thresholds.

        :param fmin: Minimum peak frequency to accept
        :param fmax: Maximum peak frequency to accept (defaults to half the
                     final value of freq_vect_hz)
        :param mag_thresh: Minimum pole magnitude to accept (None to skip)
        :param resid_thresh: Minimum residue norm to accept (None to skip)
        :param index_mode: 'inclusive' or 'exclusive' threshold comparisons

        :returns: Array of mode indices meeting all criteria
        """
        if fmax is None:
            fmax = self.freq_vect_hz[-1]/2

        if index_mode == 'inclusive':
            inds = (self.peak_frequency >= fmin) * (self.peak_frequency <= fmax)
            if mag_thresh is not None:
                inds = inds * (np.abs(self.evals) >= mag_thresh)
            if resid_thresh is not None:
                inds = inds * (np.abs(self.residue_norm[:, None]) >= resid_thresh)
        elif index_mode == 'exclusive':
            inds = (self.peak_frequency > fmin) * (self.peak_frequency < fmax)
            if mag_thresh is not None:
                inds = inds * (np.abs(self.evals) > mag_thresh)
            if resid_thresh is not None:
                inds = inds * (np.abs(self.residue_norm[:, None]) > resid_thresh)
        else:
            raise ValueError("index_mode {0} not recognised, please use "
                             "'inclusive' or 'exclusive'".format(index_mode))

        return np.where(inds)[0]

    # TODO: Validate this function

    def modal_timecourse(self, data):
        """
        Compute the modal time-course from a dataset and a fitted model. This
        is the transformation of a set of [nsignals x time] data observations
        into modal coordinates of shape [nmodes x time].

        :param data: Data array of shape [nsignals x nsamples x nepochs]
        """
        # Preallocate modal time-course array
        r = np.zeros((self.nmodes, data.shape[1], data.shape[2]))

        for idx in range(self.order, data.shape[1]):
            # Get modal time-course for this sample
            for iep in range(data.shape[2]):
                # Generate delay embedded data for this sample; length must
                # match the companion matrix dimension (nsignals * order)
                deldat = np.zeros((self.nsignals*self.order,))
                for idelay in range(1, len(self.delay_vect)):
                    deldat[(idelay-1)*self.nsignals:idelay*self.nsignals] = \
                        data[:, idx-idelay, iep]

                r[:, idx, iep] = self.companion.dot(deldat)

        return r

    @property
    def residue_norm(self):
        """
        The norm of the residue matrix associated with each mode.
        """
        nmodes = len(self.modeinds)
        residue = self.compute_residues()
        return np.linalg.norm(residue.reshape(-1, nmodes), axis=0)

    @property
    def mode_indices(self):
        """
        Returns a list of tuples of mode indices where each tuple contains
        the indices of either a pole-pair or an unpaired pole

        :return: list of tuples containing indices into the modes
        """
        modes = []

        handled_nodes = np.zeros_like(self.pole_pairs)

        for k in range(self.pole_pairs.shape[0]):
            # Deal with nodes we've already handled as conjugate pairs
            if handled_nodes[k] == 1:
                continue

            # Is this a real mode
            if np.isnan(self.pole_pairs[k]):
                # Single, real mode
                modes.append((k,))
                handled_nodes[k] = 1
            else:
                # Need to find pair index
                pidx = self.pole_pairs[k]
                indices = np.where(self.pole_pairs == pidx)[0]
                modes.append(tuple(indices))
                for idx in indices:
                    handled_nodes[idx] = 1

        return modes

    def pole_plot(self, ax=None, normscale=50, plottype='unitcircle'):
        """
        Plot the poles of the current system.

        :param ax: Optional axis on which to plot
        :param normscale: Scaling factor for points on plot
        :param plottype: String defining which plot to make, can be
                         'unitcircle' or 'eigenspectrum'

        :return: Reference to axes on which plot was drawn
        """
        if ax is None:
            plt.figure()
            ax = plt.subplot(111)

        norm_factors = self.residue_norm / self.residue_norm.max()

        if plottype == 'unitcircle':
            x = np.cos(np.linspace(0, 2*np.pi, 128))
            y = np.sin(np.linspace(0, 2*np.pi, 128))

            plt.grid(True)

            ax.plot(x, y, 'k')
            ax.plot(.75*x, .75*y, 'k--', linewidth=.2)
            ax.plot(.5*x, .5*y, 'k--', linewidth=.2)
            ax.plot(.25*x, .25*y, 'k--', linewidth=.2)
            ax.plot(1.15*x[8:25], 1.15*y[8:25], 'k')
            ax.arrow(1.15*x[23], 1.15*y[23], x[24]-x[23], y[24]-y[23],
                     head_width=.05, color='k')

            handled_nodes = np.zeros_like(self.pole_pairs)

            for k in range(self.pole_pairs.shape[0]):
                # Deal with nodes we've already handled as conjugate pairs
                if handled_nodes[k] == 1:
                    continue

                # Need to find pair index
                pidx = self.pole_pairs[k]

                # Deal with real pole case
                if np.isnan(pidx):
                    indices = [k]
                else:
                    indices = np.where(self.pole_pairs == pidx)[0]

                # Stash the real and imaginary parts
                r = []
                i = []

                ms = None

                for idx in indices:
                    handled_nodes[idx] = 1
                    r.append(self.evals[idx].real)
                    i.append(self.evals[idx].imag)

                    # The scaling should be the same for the conjugate pair
                    if ms is None:
                        ms = norm_factors[idx]*normscale

                # Plot them all together to maintain the same colour
                # for each pair
                plt.plot(r, i, '+', ms=ms, markeredgewidth=2)

            ax.set_xlabel('Real')
            ax.set_ylabel('Imaginary')
            ax.set_ylim(-1.2, 1.2)
            ax.set_xlim(-1.2, 1.2)
            ax.annotate('Frequency', xy=(.56, 1.02))

        elif plottype == 'eigenspectrum':
            mags = np.abs(self.evals)
            mags = mags / (1-mags)

            if self.peak_frequency is None:
                x_vect = self.period
                xlabel = 'Period (samples)'
            else:
                x_vect = self.peak_frequency
                xlabel = 'Frequency (Hz)'

            yt = np.linspace(0, 1., 11)
            yt_scaled = yt / (1-yt)

            ax.scatter(x_vect, mags, c='k', marker='o',
                       s=norm_factors*normscale)
            ax.set_yticks(yt_scaled[:-2])
            ax.set_yticklabels(yt[:-2])
            ax.grid(True)

            ax.set_xlabel(xlabel)
            ax.set_ylabel('Pole Magnitude')

        return ax


__all__.append('MvarModalDecomposition')


def find_noise_poles(modes, model, X, sample_rate,
                     metric='diff', include_real_poles=True):
    """
    Split modes into 'good' and 'bad' sets by comparing each pole magnitude
    against an interpolated pole-magnitude spectrum obtained from a
    coloured-noise fit to the same data. Returns (good_modes, bad_modes, vals).
    """
    coloured_noise, f_alphas = coloured_noise_fit(X, model.order)

    coloured_modes = MvarModalDecomposition.initialise(coloured_noise,
                                                       sample_rate=sample_rate)

    from scipy import interpolate
    hz, I = np.unique(coloured_modes.peak_frequency[:, 0], return_index=True)
    pchip = interpolate.pchip(coloured_modes.peak_frequency[I, 0],
                              np.abs(coloured_modes.evals[I, 0]))

    I = np.argsort(modes.peak_frequency[:, 0])
    interp_poles = pchip(modes.peak_frequency[:, 0])
    orig_poles = np.abs(modes.evals[:, 0])

    if metric == 'diff':
        vals = (orig_poles-interp_poles)
    elif metric == 'diff_noisenorm':
        vals = (orig_poles-interp_poles) / interp_poles
    elif metric == 'diff_datanorm':
        vals = (orig_poles-interp_poles) / orig_poles
    elif metric == 'diff_sumnorm':
        vals = (orig_poles-interp_poles) / (orig_poles+interp_poles)

    thresh = 0

    if include_real_poles:
        good_modes = np.where(vals > thresh)[0]
    else:
        good_modes = np.where((vals > thresh) &
                              (np.abs(modes.evals[:, 0].imag) > 1e-10))[0]

    bad_modes = np.setdiff1d(np.arange(modes.nmodes), good_modes)

    return good_modes, bad_modes, vals


def get_noise_thresh_gamma(modes, fmin, fmax):
    """
    Estimate a frequency-dependent pole-magnitude threshold by fitting a gamma
    distribution to 1 - |pole| within sliding windows of +/-2 around each
    frequency centre between fmin and fmax.
    """
    from scipy.stats import gamma

    x = np.linspace(0, 1, 1024)
    freq_centres = np.linspace(fmin, fmax, 64)
    thresh = np.zeros_like(freq_centres)

    for ii in range(len(freq_centres)):
        inds = modes.get_mode_inds(fmin=freq_centres[ii]-2,
                                   fmax=freq_centres[ii]+2)

        m = np.abs(modes.evals[inds])
        mle_tuple = gamma.fit(1-m, 1)
        rv1 = gamma.pdf(x=x, a=mle_tuple[0], loc=mle_tuple[1],
                        scale=mle_tuple[2])

        thresh[ii] = 1-x[np.argmax(rv1)]

    from scipy import interpolate
    f = interpolate.interp1d(freq_centres, thresh, bounds_error=False)

    return freq_centres, thresh, f(modes.peak_frequency)


def cluster_modes(X, n_clusters, pca_dims):
    """
    X: array [vals,nmodes]
    """
    from sklearn.decomposition import PCA
    from sklearn.cluster import KMeans

    pc = PCA(n_components=pca_dims, whiten=True, copy=True).fit(X)

    km = KMeans(n_clusters=n_clusters)
    clusters = km.fit_predict(pc.components_.T)

    return clusters, km.inertia_


def get_cluster_average(clusters, vals):
    """
    clusters: int array [nmodes,] (cluster inds include zero)
    vals: array [...,nmodes]
    """
    # Output keeps the leading dimensions of vals with one column per cluster
    out = np.zeros(vals.shape[:-1] + (clusters.max() + 1,))

    for c in range(clusters.max()+1):
        out[..., c] = vals[..., clusters == c].mean(axis=-1)

    return out
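

# Example usage (illustrative sketch only). It assumes a model fitted with
# sails.modelfit.VieiraMorfLinearModel, as in the sails tutorials; any class
# derived from AbstractLinearModel with a fit_model classmethod should work.
# The data shapes, sample rate and model order below are arbitrary.
if __name__ == '__main__':
    from sails.modelfit import VieiraMorfLinearModel

    sample_rate = 128.

    # Toy data of shape [nsignals x nsamples x nepochs]
    rng = np.random.default_rng(0)
    X = rng.standard_normal((4, 1024, 5))

    # Fit an order-5 MVAR model and compute its modal decomposition
    delay_vect = np.arange(6)
    model = VieiraMorfLinearModel.fit_model(X, delay_vect)
    modes = MvarModalDecomposition.initialise(model, sample_rate=sample_rate)

    # Evaluate the pole-residue transfer function on a frequency grid and
    # inspect the pole magnitudes of the fitted system
    freq_vect = np.linspace(0, sample_rate / 2., 64)
    H = modes.transfer_function(sample_rate, freq_vect, sum_modes=True)
    print('Transfer function shape:', H.shape)

    modes.pole_plot(plottype='eigenspectrum')
    plt.show()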