import numpy as np
from scipy import signal
import warnings
import nept
[docs]def find_nearest_idx(array, val):
"""Finds nearest index in array to value.
Parameters
----------
array : np.array
val : float
Returns
-------
Index into array that is closest to val
"""
return (np.abs(array-val)).argmin()
[docs]def find_nearest_indices(array, vals):
"""Finds nearest index in array to value.
Parameters
----------
array : np.array
This is the array you wish to index into.
vals : np.array
This is the array that you are getting your indices from.
Returns
-------
Indices into array that is closest to vals.
Notes
-----
Wrapper around find_nearest_idx().
"""
return np.array([find_nearest_idx(array, val) for val in vals], dtype=int)
[docs]def find_multi_in_epochs(spikes, epochs, min_involved):
"""Finds epochs with minimum number of participating neurons.
Parameters
----------
spikes: np.array
Of nept.SpikeTrain objects
epochs: nept.Epoch
min_involved: int
Returns
-------
multi_epochs: nept.Epoch
"""
multi_starts = []
multi_stops = []
n_neurons = len(spikes)
for start, stop in zip(epochs.starts, epochs.stops):
involved = 0
for this_neuron in spikes:
if ((start <= this_neuron.time) & (this_neuron.time <= stop)).sum() >= 1:
involved += 1
if involved >= min_involved:
multi_starts.append(start)
multi_stops.append(stop)
multi_starts = np.array(multi_starts)
multi_stops = np.array(multi_stops)
multi_epochs = nept.Epoch(np.hstack([np.array(multi_starts)[..., np.newaxis],
np.array(multi_stops)[..., np.newaxis]]))
return multi_epochs
[docs]def get_sort_idx(tuning_curves):
"""Finds indices to sort neurons by max firing in tuning curve.
Parameters
----------
tuning_curves : list of lists
Where each inner list is the tuning curves for an individual
neuron.
Returns
-------
sorted_idx : list
List of integers that correspond to the neuron in sorted order.
"""
tc_max_loc = []
for i, neuron_tc in enumerate(tuning_curves):
tc_max_loc.append((i, np.where(neuron_tc == np.max(neuron_tc))[0][0]))
sorted_by_tc = sorted(tc_max_loc, key=lambda x: x[1])
sorted_idx = []
for idx in sorted_by_tc:
sorted_idx.append(idx[0])
return sorted_idx
[docs]def get_edges(position, binsize, lastbin=True):
"""Finds edges based on linear time
Parameters
----------
position : nept.AnalogSignal
binsize : float
This is the desired size of bin.
Typically set around 0.020 to 0.040 seconds.
lastbin : boolean
Determines whether to include the last bin. This last bin may
not have the same binsize as the other bins.
Returns
-------
edges : np.array
"""
edges = np.arange(position.time[0], position.time[-1], binsize)
if lastbin:
if edges[-1] != position.time[-1]:
edges = np.hstack((edges, position.time[-1]))
return edges
[docs]def bin_spikes(spikes, position, window_size, window_advance,
gaussian_std=None, n_gaussian_std=5, normalized=True):
"""Bins spikes using a sliding window.
Parameters
----------
spikes: list
Of nept.SpikeTrain
position: nept.AnalogSignal
window_size: float
window_advance: float
gaussian_std: float or None
n_gaussian_std: int
normalized: boolean
Returns
-------
binned_spikes: nept.AnalogSignal
"""
bin_edges = get_edges(position, window_advance, lastbin=True)
given_n_bins = window_size / window_advance
n_bins = int(round(given_n_bins))
if abs(n_bins - given_n_bins) > 0.01:
warnings.warn("window advance does not divide the window size evenly. "
"Using window size %g instead." % (n_bins*window_advance))
if normalized:
square_filter = np.ones(n_bins) * (1 / n_bins)
else:
square_filter = np.ones(n_bins)
counts = np.zeros((len(spikes), len(bin_edges)))
for idx, spiketrain in enumerate(spikes):
counts[idx] = np.convolve(np.hstack([np.histogram(spiketrain.time, bins=bin_edges)[0],
np.array([0])]),
square_filter, mode='same')
if gaussian_std is not None and gaussian_std > window_advance:
n_points = n_gaussian_std * gaussian_std * 2 / window_advance
n_points = max(n_points, 1.0)
if n_points % 2 == 0:
n_points += 1
n_points = int(round(n_points))
gaussian_filter = signal.gaussian(n_points, gaussian_std / window_advance)
gaussian_filter /= np.sum(gaussian_filter)
if len(gaussian_filter) < counts.shape[1]:
for idx, spiketrain in enumerate(spikes):
counts[idx] = np.convolve(counts[idx], gaussian_filter, mode='same')
else:
raise ValueError("gaussian filter too long for this length of time")
return nept.AnalogSignal(counts.T, bin_edges)
[docs]def cartesian(xcenters, ycenters):
"""Finds every combination of elements in two arrays.
Parameters
----------
xcenters : np.array
ycenters : np.array
Returns
-------
cartesian : np.array
With shape(n_sample, 2).
"""
return np.transpose([np.tile(xcenters, len(ycenters)), np.repeat(ycenters, len(xcenters))])
[docs]def get_xyedges(position, binsize=3):
"""Gets edges based on position min and max.
Parameters
----------
position: 2D nept.Position
binsize: int
Returns
-------
xedges: np.array
yedges: np.array
"""
xedges = np.arange(position.x.min(), position.x.max() + binsize, binsize)
yedges = np.arange(position.y.min(), position.y.max() + binsize, binsize)
return xedges, yedges
[docs]def expand_line(start_pt, stop_pt, line, expand_by=6):
""" Creates buffer zone around a line.
Parameters
----------
start_pt : Shapely's Point object
stop_pt : Shapely's Point object
line : Shapely's LineString object
expand_by : int
This sets by how much you wish to expand the line.
Defaults to 6.
Returns
-------
zone : Shapely's Polygon object
"""
line_expanded = line.buffer(expand_by)
zone = start_pt.union(line_expanded).union(stop_pt)
return zone
[docs]def perievent_slice(analogsignal, events, t_before, t_after, dt=None):
"""Slices the analogsignal data into perievent chunks.
Unlike time_slice, the resulting AnalogSignal will be multidimensional.
Only works for 1D signals.
Parameters
----------
analogsignal : nept.AnalogSignal
events : np.array
t_before : float
t_after : float
dt : float
Returns
-------
nept.AnalogSignal
"""
if analogsignal.dimensions != 1:
raise ValueError("AnalogSignal must be 1D.")
if dt is None:
dt = np.median(np.diff(analogsignal.time))
time = np.arange(-t_before, t_after+dt, dt)
data = np.zeros((len(time), len(events)))
for i, event in enumerate(events):
sliced = analogsignal.time_slice(event-t_before, event+t_after)
data[:,i] = np.interp(time+event, sliced.time, np.squeeze(sliced.data))
return nept.AnalogSignal(data, time)
[docs]def speed_threshold(position, t_smooth=0.5, speed_limit=0.4):
"""Finds positions above a certain speed threshold
Parameters
----------
position : nept.Position
t_smooth : float
speed_limit : float
Returns
-------
position_run : nept.Position
"""
speed = position.speed(t_smooth)
run_idx = np.squeeze(speed.data) >= speed_limit
return position[run_idx]