import numpy as np
from glmdenoise.utils.make_design_matrix import make_design
[docs]def constructStimulusMatrices(m, prenumlag=0, postnumlag=0, wantwrap=0):
"""construc stimulus matrices from design matrix m
Args:
m ([2d matrix]): is a 2D matrix, each row of which is a stimulus
sequence (i.e. a vector that is all zeros except for ones
indicating the onset of a given stimulus (fractional values
are also okay))
prenumlag (bool or int, optional): Defaults to False. number of
stimulus points in the past
postnumlag (bool or int, optional): Defaults to False. number of
stimulus points in the future
wantwrap (bool, optional): Defaults to False. whether to wrap
around
Returns:
[2d matrix]: a stimulus matrix of dimensions
size(m,2) x ((prenumlag+postnumlag+1)*size(m,1)).
this is a horizontal concatenation of the stimulus
matrix for the first stimulus sequence, the stimulus
matrix for the second stimulus sequence, and so on.
this function is useful for fitting finite impulse
response (FIR) models.
"""
# make sure m is numpy
m = np.asarray(m)
# get out early
if not prenumlag and not postnumlag:
f = m.T
return f
else:
nconds, nvols = m.shape
# do it
num = prenumlag + postnumlag + 1
f = np.zeros((nvols, num*nconds))
for p in range(nconds):
i = p + 1
thiscol = (i - 1) * num + np.array(range(num))
f[:, thiscol] = constructStimulusMatrix(
m[p, :], prenumlag, postnumlag, wantwrap
)
return f
[docs]def constructStimulusMatrix(v, prenumlag, postnumlag, wantwrap=0):
"""Construct stimulus matrix from design vector
Args:
v ([1d vector]): v is the stimulus sequence represented as a vector
prenumlag ([int]): this is the number of stimulus points in the past
postnumlag ([int]): this is the number of stimulus points in the future
wantwrap (int, optional): Defaults to 0. whether to wrap around
Returns:
[2d array]: return a stimulus matrix of dimensions
length(v) x (prenumlag+postnumlag+1)
where each column represents the stimulus at
a particular time lag.
"""
v = np.asarray(v)
total = prenumlag + postnumlag + 1
f = np.zeros((len(v), total))
for p in range(total):
i = p + 1
if False:
pass
# shift = [0 - prenumlag + (p-1)]
# f[:, p] = np.roll(v, shift, axis=(0, 1)).T
else:
temp = -prenumlag + (i - 1)
if temp < 0:
pass
# vindx = range(len(v), 1 - temp)
# findx = range(len(v)+temp)
# f[findx, p] = v[vindx]
else:
f[temp:, p] = v[: len(v) - temp]
return f
[docs]def calccod(x, y, wantgain=0, wantmeansub=1):
"""Calculate the coefficient of determination
Args:
x ([type]): matrix with the same dimensions as y
y ([type]): matrix with the same dimensions as x
dim ([type]): is the dimension of interest
wantgain (int, optional): Defaults to 0. 0 means normal
1 means allow a gain to be applied to each case of <x>
to minimize the squared error with respect to <y>.
in this case, there cannot be any NaNs in <x> or <y>.
2 is like 1 except that gains are restricted to be non-negative.
so, if the gain that minimizes the squared error is negative,
we simply set the gain to be applied to be 0.
default: 0.
wantmeansub (int, optional): Defaults to 1.
0 means do not subtract any mean. this makes it such that
the variance quantification is relative to 0.
1 means subtract the mean of each case of <y> from both
<x> and <y> before performing the calculation. this makes
it such that the variance quantification
is relative to the mean of each case of <y>.
note that <wantgain> occurs before <wantmeansub>.
default: 1.
calculate the coefficient of determination (R^2) indicating
the percent variance in <y> that is explained by <x>. this is achieved
by calculating 100*(1 - sum((y-x).^2) / sum(y.^2)). note that
by default, we subtract the mean of each case of <y> from both <x>
and <y> before proceeding with the calculation.
the quantity is at most 100 but can be 0 or negative (unbounded).
note that this metric is sensitive to DC and scale and is not symmetric
(i.e. if you swap <x> and <y>, you may obtain different results).
it is therefore fundamentally different than Pearson's correlation
coefficient (see calccorrelation.m).
NaNs are handled gracefully (a NaN causes that data point to be ignored).
if there are no valid data points (i.e. all data points are
ignored because of NaNs), we return NaN for that case.
note some weird cases:
calccod([],[]) is []
history:
2013/08/18 - fix pernicious case where <x> is all zeros and <wantgain>
is 1 or 2.
2010/11/28 - add <wantgain>==2 case
2010/11/23 - changed the output range to percentages. thus, the range
is (-Inf,100]. also, we removed the <wantr> input since
it was dumb.
example:
x = np.random.random(100)
calccod(x,x+0.1*np.random.random(100))
"""
# input
dim = np.argmax(x.shape)
# handle gain
if wantgain:
# to get the residuals, we want to do something like y-x*inv(x'*x)*x'*y
temp = 1/np.dot(x, x) * np.dot(x, y)
if wantgain == 2:
# if the gain was going to be negative, rectify it to 0.
temp[temp < 0] = 0
x = x * temp
# propagate NaNs (i.e. ignore invalid data points)
x[np.isnan(y)] = np.nan
y[np.isnan(x)] = np.nan
# handle mean subtraction
if wantmeansub:
mn = np.nanmean(y, axis=dim)
y = y - mn
x = x - mn
# finally, compute it
with np.errstate(divide='ignore', invalid='ignore'):
nom = np.nansum((y-x) ** 2, axis=dim)
denom = np.nansum((y**2), axis=dim)
f = np.nan_to_num(1 - (nom / denom))
return f
[docs]def calccodStack(y, yhat):
"""
[summary]
Args:
x ([type]): [description]
y ([type]): [description]
"""
numruns = len(y)
nom = []
denom = []
for run in range(numruns):
with np.errstate(divide="ignore", invalid="ignore"):
yrun = np.array(y[run])
yhatrun = np.array(yhat[run])
nom.append(np.sum((yrun - yhatrun) ** 2, axis=0))
denom.append(np.sum(yrun ** 2, axis=0)) # Kendricks denominator
nom = np.array(nom).sum(0)
denom = np.array(denom).sum(0)
with np.errstate(divide='ignore', invalid='ignore'):
f = np.nan_to_num(1 - nom / denom)
return f
[docs]def mtimesStack(m1, m2):
"""function f = mtimesStack(m1,m2)
simply return <m1>*np.vstack(m2) but do so in a way that doesn't cause
too much memory usage.
Args:
m1 ([A x B]): is A x B
m2 ([B x C]): is a stack of matrices such that np.vstack(m2) is B x C
"""
nruns = len(m2)
f = 0
cnt = 0
for q in range(nruns):
nvols = m2[q].shape[0]
thiscol = cnt + np.asarray(list(range(nvols)))
colrow = m1[:, thiscol] @ m2[q]
f = f + colrow
cnt = cnt + m2[q].shape[0]
return f
[docs]def olsmatrix(X):
"""OLS regression
what we want to do is to perform OLS regression using <X>
and obtain the parameter estimates. this is accomplished
by inv(X'\*X)\*X'\*y = f\*y where y is the data (samples x cases).
what this function does is to return <f> which has dimensions
parameters x samples.
we check for a special case, namely, when one or more regressors
are all zeros. if we find that this is the case, we issue a warning
and simply ignore these regressors when fitting. thus, the weights
associated with these regressors will be zeros.
if any warning messages are produced by the inversion process, then we die.
this is a conservative strategy that ensures that the regression is
well-behaved (i.e. has a unique, finite solution). (note that this does
not cover the case of zero regressors, which is gracefully handled as
described above.)
note that no scale normalization of the regressor columns is performed.
Args:
X (ndarray): Samples by parameters
Returns:
(f): 2D parameters by Samples
"""
bad = np.all(X == 0, axis=0)
good = np.invert(bad)
# report warning
if not np.any(good) == 1:
print(
"regressors are all zeros; we will estimate a 0 weight for those regressors."
)
f = np.zeros((X.shape[1], X.shape[0]))
return f
# do it
if np.any(bad):
print(
"One or more regressors are all zeros; we will estimate a 0 weight for those regressors."
)
f = np.zeros((X.shape[1], X.shape[0]))
X = np.mat(X)
f[good, :] = np.linalg.inv(
X[:, good].T @ X[:, good]) @ X[:, good].T
else:
X = np.mat(X)
f = np.linalg.inv(X.T @ X) @ X.T
return f
[docs]def convolveDesign(X, hrf):
"""convolve each column of a 2d design matrix with hrf
Args:
X ([2D design matrix]): time by cond
hrf ([1D hrf function]): hrf
Returns:
[convdes]: 2D: Samples by cond
"""
ntime, ncond = X.shape
convdes = np.asarray([np.convolve(X[:, x], hrf) for x in range(ncond)]).T
return convdes[0:ntime, :]
[docs]def optimiseHRF(
design,
data,
tr,
hrfknobs,
combinedmatrix,
numforhrf=50,
hrfthresh=0.5,
hrffitmask=1,
):
"""Optimise hrf from a selection of voxels.
This uses an iterative fitting optimisation procedure,
where we fit for betas and then fit for hrf using a fir
like approach.
Args:
design (pandas dataframe): this is a pandas data frame with keys:
['trial_type']: stimulus condition index
['onset']: onsets in s for each event
['duration']: duration for each event
data (2d array): data (time x vox). this data should already
have polynomials projected out.
tr (float): the sampling rate in seconds
hrfknobs (1d array): should be time x 1 with the initial seed
for the HRF. The length of this vector indicates the
number of time points that we will attempt to estimate
in the HRF. Note on normalization: after fitting the HRF, we
normalize the HRF to peak at 1 (and adjust amplitudes
accordingly).
combinedmatrix (stack of 2d arrays): projection matrix of the
polynomials and extra regressors (if passed by user).
This is used to whiten the design matrix.
numforhrf (int, optional): Defaults to 50.
is a positive integer indicating the number of voxels
(with the best R^2 values) to consider in fitting the
global HRF. (If there are fewer than that number of
voxels available, we just use the voxels that are
available.)
hrfthresh (float, optional): Defaults to .5.
If the R^2 between the estimated HRF and the initial HRF
is less than <hrfthresh>, we decide to just use the initial
HRF. Set <hrfthresh> to -Inf if you never want to reject
the estimated HRF.
Returns:
(Dict): we return a dictionary with kers:
["hrf"]: the optimised hrf (but see note above on hrfthresh)
["hrffitvoxels"]: the indices of the voxels used to fit.
["convdesign"]: the design convolved with the optimised hrf
and polynomials projected out.
["seedhrf"]: we return the seed hrf for book keeping.
"""
minR2 = 0.99
# calc
numinhrf = len(hrfknobs)
numruns = len(design)
postnumlag = numinhrf - 1
# collect ntimes per run
ntimes = []
for p in range(numruns):
ntimes.append(data[p].shape[0])
# loop until convergence
currenthrf = hrfknobs # initialize
cnt = 1
while True:
print('\t optimising hrf :{}\n'.format(cnt))
# fix the HRF, estimate the amplitudes
if cnt % 2 == 1:
# prepare design matrix
convdesign = []
for p in range(numruns):
# get design matrix with HRF
# number of time points
convdes = make_design(design[p], tr, ntimes[p], currenthrf)
# project the polynomials out
convdes = np.dot(combinedmatrix[p], convdes)
# time x conditions
convdesign.append(convdes)
# stack design across runs
stackdesign = np.vstack(convdesign)
# estimate the amplitudes (output: conditions x voxels)
currentbeta = mtimesStack(olsmatrix(stackdesign), data)
# calculate R^2
modelfit = [np.dot(convdesign[p], currentbeta).astype(np.float32)
for p in range(numruns)]
R2 = calccodStack(data, modelfit)
# figure out indices of good voxels
if hrffitmask == 1:
temp = R2
else: # if people provided a mask for hrf fitting
temp = np.zeros((R2.shape))
temp[np.invert(hrffitmask.ravel())] = -np.inf
# shove -Inf in where invalid
temp[np.isnan(temp)] = -np.inf
ii = np.argsort(temp)
nii = len(ii)
iichosen = ii[np.max((1, nii - numforhrf)):nii]
iichosen = np.setdiff1d(
iichosen, iichosen[temp[iichosen] == -np.inf]
).tolist()
hrffitvoxels = iichosen
# fix the amplitudes, estimate the HRF
else:
nhrfvox = len(hrffitvoxels)
# prepare design matrix
convdesign = []
for p in range(numruns):
X = make_design(design[p], tr, ntimes[p])
# expand design matrix using delta functions
numcond = X.shape[1]
# time x L*conditions
stimmat = constructStimulusMatrices(
X.T, prenumlag=0, postnumlag=postnumlag
).reshape(-1, numcond, order='F').astype(np.float32)
# calc
# weight and sum based on the current amplitude estimates.
# only include the good voxels.
# return shape time*L x voxels
convdes = np.dot(
stimmat, currentbeta[:, hrffitvoxels]).astype(np.float32)
# remove polynomials and extra regressors
# time x L*voxels
convdes = convdes.reshape(
(ntimes[p], -1), order='F')
# time x L*voxels
convdes = np.array(np.dot(combinedmatrix[p], convdes))
# time x voxels x L
convdes = convdes.reshape((ntimes[p], numinhrf, -1), order='F')
convdesign.append(
np.transpose(convdes, (0, 2, 1))
)
# estimate the HRF
previoushrf = currenthrf
datasubset = np.array(np.vstack(
[data[x][:, hrffitvoxels] for x in range(numruns)]
))
stackdesign = np.vstack(convdesign)
ntime = stackdesign.shape[0]
stackdesign = stackdesign.reshape(
(ntime * nhrfvox, numinhrf), order='F')
stackdesign = olsmatrix(stackdesign)
currenthrf = np.asarray(stackdesign.dot(
datasubset.reshape((-1), order='F')))[0]
# if HRF is all zeros (this can happen when the data are all zeros)
# get out prematurely
if np.all(currenthrf == 0):
print('current hrf went all to 0 after {} attempts\n'.format(cnt))
break
# check for convergence
# how much variance of the previous estimate does
# the current one explain?
hrfR2 = calccod(previoushrf, currenthrf, wantmeansub=0)
if (hrfR2 >= minR2 and cnt > 2):
break
cnt += 1
# sanity check
# we want to see that we're not converging in a weird place
# so we compute the coefficient of determination between the
# current estimate and the seed hrf
hrfR2 = calccod(hrfknobs, previoushrf, wantmeansub=0)
# sanity check to make sure that we are not doing worse.
if hrfR2 < hrfthresh:
print(
"Global HRF estimate is far from the initial seed,"
"probably indicating low SNR. We are just going to"
"use the initial seed as the HRF estimate.\n"
)
# prepare design matrix
convdesign = []
whitedesign = []
for p in range(numruns):
# get design matrix with HRF
# number of time points
convdes = make_design(design[p], tr, ntimes[p], hrfknobs)
# project the polynomials out
whitedesign.append(np.dot(combinedmatrix[p], convdes))
# time x conditions
convdesign.append(convdes)
f = dict()
f["hrf"] = hrfknobs
f["hrffitvoxels"] = None
f["convdesign"] = convdesign
f["whitedesign"] = whitedesign
f["seedhrf"] = hrfknobs
return f
# normalize results
mx = np.max(previoushrf)
previoushrf = previoushrf / mx
currentbeta = currentbeta * mx
# prepare design matrix
whitedesign = []
convdesign = []
for p in range(numruns):
# get design matrix with HRF
# number of time points
convdes = make_design(design[p], tr, ntimes[p], previoushrf)
# project the polynomials out
whitedesign.append(np.dot(combinedmatrix[p], convdes))
# time x conditions
convdesign.append(convdes)
# return
f = dict()
f["hrf"] = previoushrf
f["hrffitvoxels"] = hrffitvoxels
f["convdesign"] = convdesign
f["whitedesign"] = whitedesign
f["seedhrf"] = hrfknobs
f["hrffitmask"] = hrffitmask
return f