#!/usr/bin/python
# vim: set expandtab ts=4 sw=4:
import math
import numpy as np
from scipy import signal, stats
from .anam import AbstractAnam, register_class
__all__ = []
def fast_resample(X, ds_factor, axis=1):
"""Fast resampling of a timeseries array. This function pads the
time-dimension to the nearest power of 2 to ensure that the resampling can
use efficient FFT routines. The padding is removed before the downsampled
data are returned.
Parameters
----------
X : ndarray
Data to be resampled
ds_factor : float
Factor of downsampling to apply. Output sample rate will be sample_rate/ds_factor
axis : int
Axis along which to perform resampling. Defaults to 1 for compatibility reasons
Returns
-------
ndarray
Resampled dataset, of size [nchannels x nsamples]
"""
orig_size = X.shape[axis]
target_size = int(X.shape[axis] // ds_factor)
tmp_size = int(2**math.ceil(math.log(X.shape[axis], 2)))
tmp_target_size = int(tmp_size // ds_factor)
# We zero-pad the data in the time dimension up to the temporary size
# Start by creating an array of appropriate shape
new_shape = list(X.shape)
new_shape[axis] = tmp_size
tmp_X = np.zeros(new_shape, dtype=X.dtype)
# Now we copy the data into the relevant part of the temporary array
slic = [slice(None)] * len(X.shape)
slic[axis] = slice(0, orig_size)
slic = tuple(slic)
tmp_X[slic] = X
# Finally, we resample along the relevant axis
resampled_X = signal.resample(tmp_X, tmp_target_size, axis=axis)
# And extract just the parts we want
slic = [slice(None)] * len(X.shape)
slic[axis] = slice(0, target_size)
slic = tuple(slic)
return resampled_X[slic]
__all__.append('fast_resample')
def epoch_data(X, segment_len=None, trials=None):
"""Epoch a continuous data array. Takes a 2d [channels x samples] array and
returns a [channels x segment_len x ntrials] array.
Data can EITHER be epoched by specifying a segment_len value or a trials
array. If a segment_len is passed then the samples array is split into
none-overlapping segments of the specified length. Any samples not fitting
into the final epoch are dropped. If a trials array (of shape [ntrials x 2]
each row containing a start_ind and a stop_ind) then X is split into the
trails it specifies.
Parameters
----------
X : 2d data array
Data to be epoched
segment_len : int > 0
Length of continuous data segments to split dataset into
trials : 2d array
Trial start and stop indices contained in an [ntrials x 2] array
Returns
-------
3d array
Epoched dataset
"""
if (X.ndim == 3) and (X.shape[0] > 0):
raise ValueError('Data is already epoched')
if (segment_len is not None) and (trials is not None):
raise ValueError("Please specify either 'segment_len' or 'trials'. Can only apply one or the other")
if segment_len is not None:
nsegs = X.shape[1] // segment_len
# Doing the reshape with FORTRAN ordering to get the desired behaviour.
# Force the array back to be memory-contiguous afterwards. (Might be a better way to do this)
Xout = X[:, :nsegs * segment_len].reshape(X.shape[0], segment_len, nsegs, order='F')
return np.ascontiguousarray(Xout)
if trials is not None:
if np.any(trials < 0):
raise ValueError("Negative trial index found, 'trials' should be strictly positive")
if np.any(trials > X.shape[1]):
outs = trials[trials > X.shape[1]]
raise ValueError("Trial indices {0} are out of bounds for axis 1 of X (size={1})".format(outs, X.shape[1]))
trial_len = np.abs(np.diff(trials[0, :])[0])
Xout = np.zeros((X.shape[0], trial_len, trials.shape[0]))
for ii in range(trials.shape[0]):
Xout[:, :, ii] = X[:, trials[ii, 0]:trials[ii, 1]]
return Xout
__all__.append('epoch_data')
def get_valid_samples(data, delay_vect, mode='valid', backwards=False):
"""
Helper function for excluding or replacing data samples which are not
preceded by a full delay vect of samples for prediction.
Parameters
----------
data : ndarray
Array of data to trim or change, of size [nchannels x nsamples x ntrials]
delay_vect : ndarray
Vector of lags included in model
mode : {'valid','full_nan','full'}
Options for excluding or replacing residuals which do not have a full
model prediction ie the third sample of an order 5 model. 'valid'
removes samples without a full model prediction, 'full_nan' returns
resids of the same size as the data with nans replacing excluded
samples and 'full' returns resids keeping non-full model samples in
place. (Default is 'valid')
backwards : Boolean
Flag indicating whether to edit the first or last samples
Returns
-------
ndarray
data with non-valid samples removed or replaced with nans
"""
if mode == 'full':
# just pass through
return data
# Don't work in place
ret = data.copy()
if mode == 'valid':
if backwards:
ret = ret[:, :-len(delay_vect), :]
else:
ret = ret[:, len(delay_vect):, :]
elif mode == 'full_nan':
if backwards:
ret[:, -len(delay_vect):, :] = np.nan
else:
ret[:, :len(delay_vect), :] = np.nan
else:
raise ValueError("Residual return mode is not recognised, please use \
'valid', 'full_nan' or 'full'")
return ret
__all__.append('get_valid_samples')
def gesd(x, alpha=0.05, p_out=.1, outlier_side=0):
"""Detect outliers using Generalized ESD test
Parameters
----------
x : vector
Data set containing outliers
alpha : scalar
Significance level to detect at (default = 0.05)
p_out : int
Maximum number of outliers to detect (default = 10% of data set)
outlier_side : {-1,0,1}
Specify sidedness of the test
- outlier_side = -1 -> outliers are all smaller
- outlier_side = 0 -> outliers could be small/negative or large/positive (default)
- outlier_side = 1 -> outliers are all larger
Returns
-------
idx : boolean vector
Boolean array with TRUE wherever a sample is an outlier
x2 : vector
Input array with outliers removed
References
----------
B. Rosner (1983). Percentage Points for a Generalized ESD Many-Outlier Procedure. Technometrics 25(2), pp. 165-172.
http://www.jstor.org/stable/1268549?seq=1
"""
if outlier_side == 0:
alpha = alpha/2
if not isinstance(x, np.ndarray):
x = np.asarray(x)
n_out = int(np.ceil(len(x)*p_out))
if np.any(np.isnan(x)):
# Need to find outliers only in finite x
y = np.where(np.isnan(x))[0]
idx1, x2 = gesd(x[np.isfinite(x)], alpha, n_out, outlier_side)
# idx1 has the indexes of y which were marked as outliers
# the value of y contains the corresponding indexes of x that are outliers
idx = np.zeros_like(x).astype(bool)
idx[y[idx1]] = True
n = len(x)
temp = x.copy()
R = np.zeros((n_out,))
rm_idx = np.zeros((n_out,), dtype=int)
lam = np.zeros((n_out,))
for j in range(0, int(n_out)):
i = j+1
if outlier_side == -1:
rm_idx[j] = np.nanargmin(temp)
sample = np.nanmin(temp)
R[j] = np.nanmean(temp) - sample
elif outlier_side == 0:
rm_idx[j] = int(np.nanargmax(abs(temp-np.nanmean(temp))))
R[j] = np.nanmax(abs(temp-np.nanmean(temp)))
elif outlier_side == 1:
rm_idx[j] = np.nanargmax(temp)
sample = np.nanmax(temp)
R[j] = sample - np.nanmean(temp)
R[j] = R[j] / np.nanstd(temp)
temp[int(rm_idx[j])] = np.nan
p = 1-alpha/(n-i+1)
t = stats.t.ppf(p, n-i-1)
lam[j] = ((n-i) * t) / (np.sqrt((n-i-1+t**2)*(n-i+1)))
# Create a boolean array of outliers
idx = np.zeros((n,)).astype(bool)
idx[rm_idx[np.where(R > lam)[0]]] = True
x2 = x[~idx]
return idx, x2
def _find_outliers_in_dims(X, axis=-1, metric_func=np.std, gesd_args=None):
"""Find outliers across specified dimensions of an array"""
if gesd_args is None:
gesd_args = {}
if axis == -1:
axis = np.arange(X.ndim)[axis]
squashed_axes = tuple(np.setdiff1d(np.arange(X.ndim), axis))
metric = metric_func(X, axis=squashed_axes)
rm_ind, _ = gesd(metric, **gesd_args)
return rm_ind
def _find_outliers_in_segments(X, axis=-1, segment_len=100,
metric_func=np.std, gesd_args=None):
"""Create dummy-segments in a dimension of an array and find outliers in it"""
if gesd_args is None:
gesd_args = {}
if axis == -1:
axis = np.arange(X.ndim)[axis]
# Prepare to slice data array
slc = []
for ii in range(X.ndim):
if ii == axis:
slc.append(slice(0, segment_len))
else:
slc.append(slice(None))
# Preallocate some variables
starts = np.arange(0, X.shape[axis], segment_len)
metric = np.zeros((len(starts), ))
bad_inds = np.zeros(X.shape[axis])*np.nan
# Main loop
for ii in range(len(starts)):
if ii == len(starts)-1:
stop = None
else:
stop = starts[ii]+segment_len
# Update slice on dim of interest
slc[axis] = slice(starts[ii], stop)
# Compute metric for current chunk
metric[ii] = metric_func(X[tuple(slc)])
# Store which chunk we've used
bad_inds[slc[axis]] = ii
# Get bad segments
rm_ind, _ = gesd(metric, **gesd_args)
# Convert to int indices
rm_ind = np.where(rm_ind)[0]
# Convert to bool in original space of defined axis
bads = np.isin(bad_inds, rm_ind)
return bads
def detect_artefacts(X, axis=None, reject_mode='dim', metric_func=np.std,
segment_len=100, gesd_args=None, ret_mode='bad_inds'):
"""Detect bad observations or segments in a dataset
Parameters
----------
X : ndarray
Array to find artefacts in.
axis : int
Index of the axis to detect artefacts in
reject_mode : {'dim' | 'segments'}
Flag indicating whether to detect outliers across a dimension (dim;
default) or whether to split a dim into segments and detect outliers in
the them (segments)
metric_func : function
Function defining metric to detect outliers on. Defaults to np.std but
can be any function taking an array and returning a single number.
segement_len : int > 0
Integer window length of dummy epochs for bad_segment detection
gesd_args : dict
Dictionary of arguments to pass to gesd
ret_mode : {'good_inds','bad_inds','zero_bads','nan_bads'}
Flag indicating whether to return the indices for good observations,
indices for bad observations (default), the input data with outliers
removed (zero_bads) or the input data with outliers replaced with nans
(nan_bads)
Returns
-------
ndarray
If ret_mode is 'bad_inds' or 'good_inds', this returns a boolean vector
of length X.shape[axis] indicating good or bad samples. If ret_mode if
'zero_bads' or 'nan_bads' this returns an array copy of the input data
X with bad samples set to zero or np.nan respectively.
"""
if reject_mode not in ['dim', 'segments']:
raise ValueError("reject_mode: '{0}' not recognised".format(reject_mode))
if ret_mode not in ['bad_inds', 'good_inds', 'zero_bads', 'nan_bads']:
raise ValueError("ret_mode: '{0}' not recognised")
if axis is None or axis > X.ndim:
raise ValueError('bad axis')
if reject_mode == 'dim':
bad_inds = _find_outliers_in_dims(X, axis=axis, metric_func=metric_func, gesd_args=None)
elif reject_mode == 'segments':
bad_inds = _find_outliers_in_segments(X, axis=axis,
segment_len=segment_len,
metric_func=metric_func,
gesd_args=None)
if ret_mode == 'bad_inds':
return bad_inds
elif ret_mode == 'good_inds':
return bad_inds == False # noqa: E712
elif ret_mode in ['zero_bads', 'nan_bads']:
out = X.copy()
slc = []
for ii in range(X.ndim):
if ii == axis:
slc.append(bad_inds)
else:
slc.append(slice(None))
slc = tuple(slc)
if ret_mode == 'zero_bads':
out[slc] = 0
return out
elif ret_mode == 'nan_bads':
out[slc] = np.nan
return out
[docs]class PCA(AbstractAnam):
"""
Class for handling PCA-based reduction before fitting a model
"""
hdf5_outputs = ['npcs', 'data_mean', '_U', '_s', '_VT',
'explained_variance', 'explained_variance_ratio',
'components', 'loadings', 'scores']
def __init__(self, data=None, npcs=None):
AbstractAnam.__init__(self)
# Need to cope with no-argument initialisation for anamnesis
if data is not None:
self.data_mean = data.mean(axis=0)
self._pca_svd(data, npcs)
def _pca_svd(self, data, npcs=10):
"""
Compute a Principal Components Analysis on a given dataset using the SVD
method.
Parameters
----------
X : ndarray
Input data of shape [nsamples x nfeatures]. The second dimension is
reduced by the PCA.
Returns
-------
scores : ndarray [nsamples x npcs]
The dimensionality-reduced data. This contains the value of each
observation in the new PCA-space.
components : ndarray [npcs x nfeatures]
The eigenvectors describing how each feature (node or connection) loads
onto the latent PCA dimensions.
loadings : ndarray [npcs x nfeatures]
The components scaled by their contribution to the variance in the
original data
explained_variance_ratio : ndarray [npcs]
The proportion of variance explained by each PC
"""
self.npcs = npcs
# Demean observations
self.data_mean = data.mean(axis=0)[None, :]
# compute
self._U, self._s, self._VT = np.linalg.svd(data - self.data_mean, full_matrices=False)
# Variance explained metrics
var = self._s ** 2 / (data.shape[0]-1)
self.explained_variance = var[:npcs]
self.explained_variance_ratio = var[:npcs] / var.sum()
# The weights for each original variable in each principal component
self.components = self._VT[:npcs, :] # Eigenvectors
self.loadings = self.components * self._s[:npcs, None] # Scaled by variance contributed to data
# The new co-ordinates of each observation in PCA-space
self.scores = self._s[:npcs] * self._U[:, :npcs]
# Check that component self.scores are properly decorrelated
try:
C = np.corrcoef(self.scores.T)
assert(np.sum(np.abs(C - np.diag(np.diag(C)))) < 1e-10)
except AssertionError:
print('Observations are not properly de-correlated by PCA. Data demeaning might have gone wrong.')
raise
[docs] def project_score(self, scores):
"""
Compute a projection of a set of scores back to original data space
"""
return self.data_mean + np.dot(scores, self.components)
__all__.append('PCA')
register_class(PCA)