import numpy as np
import nept
class Epoch:
    """An array of epochs, where each epoch has a start and stop time.

    Parameters
    ----------
    time : np.array
        If shape (n_epochs, 1) or (n_epochs,), the start time for each epoch.
        If shape (n_epochs, 2), the start and stop times for each epoch.
    duration : np.array or None, optional
        The length of the epoch.

    Attributes
    ----------
    time : np.array
        The start and stop times for each epoch. With shape (n_epochs, 2).
        Rows are sorted by start time.

    Raises
    ------
    ValueError
        If the input cannot be converted to a float array, if ``duration``
        is combined with start/stop pairs, if ``time`` and ``duration``
        differ in length, if ``time`` has more than 2 dimensions, or if
        any start is not strictly less than its stop.

    """

    def __init__(self, time, duration=None):
        try:
            time = np.squeeze(time).astype(float)
        except ValueError as err:
            raise ValueError(
                "must have the same number of start and stop times") from err

        # A scalar start time becomes a length-1 vector.
        if time.ndim == 0:
            time = time[..., np.newaxis]

        if duration is not None:
            duration = np.squeeze(duration).astype(float)
            if duration.ndim == 0:
                duration = duration[..., np.newaxis]

            if time.ndim == 2 and duration.ndim == 1:
                raise ValueError(
                    "duration not allowed when using start and stop times")

            if time.ndim == 1 and time.shape[0] != duration.shape[0]:
                raise ValueError(
                    "must have same number of time and duration samples")

            if time.ndim == 1 and duration.ndim == 1:
                stop_epoch = time + duration
                time = np.hstack((time[..., np.newaxis],
                                  stop_epoch[..., np.newaxis]))

        # np.squeeze turns a single (1, 2) start/stop pair into shape (2,);
        # the next two steps turn it back into one row of shape (1, 2).
        if time.ndim == 1 and duration is None:
            time = time[..., np.newaxis]

        # Input whose second axis is not of length 2 is read as
        # "row 0 = starts, row 1 = stops" and transposed to (n_epochs, 2).
        # NOTE(review): rows beyond the first two are silently ignored here,
        # so a 1-D `time` with more than two values and no `duration` keeps
        # only its first two values.
        if time.ndim == 2 and time.shape[1] != 2:
            time = np.hstack((time[0][..., np.newaxis],
                              time[1][..., np.newaxis]))

        if time.ndim > 2:
            raise ValueError("time cannot have more than 2 dimensions")

        if time.ndim == 2 and np.any(time[:, 1] - time[:, 0] <= 0):
            raise ValueError("start must be less than stop")

        # Keep epochs ordered by start time.
        self.time = time[np.argsort(time[:, 0])]

    def __getitem__(self, idx):
        """Return a new Epoch holding the epoch(s) selected by ``idx``."""
        # atleast_1d lets an integer index (scalar result) share the same
        # code path as slices and index arrays.
        starts = np.atleast_1d(self.starts[idx])
        stops = np.atleast_1d(self.stops[idx])
        return Epoch(np.column_stack((starts, stops)))

    @property
    def centers(self):
        """(np.array) The center of each epoch."""
        return np.mean(self.time, axis=1)

    @property
    def durations(self):
        """(np.array) The duration of each epoch."""
        return self.time[:, 1] - self.time[:, 0]

    @property
    def isempty(self):
        """(boolean) Whether the epoch array is empty."""
        return self.time.size == 0

    @property
    def starts(self):
        """(np.array) The start of each epoch."""
        return self.time[:, 0]

    @property
    def start(self):
        """(float) The start of the first epoch."""
        return self.time[:, 0][0]

    @property
    def stops(self):
        """(np.array) The stop of each epoch."""
        return self.time[:, 1]

    @property
    def stop(self):
        """(float) The stop of the last epoch (last by start time)."""
        return self.time[:, 1][-1]

    @property
    def n_epochs(self):
        """(int) The number of epochs."""
        return self.time.shape[0]

    def copy(self):
        """Returns a new Epoch with an independent copy of the times.

        Returns
        -------
        copied_epochs : nept.Epoch

        """
        # Copy the (n_epochs, 2) array directly: rebuilding the stops as
        # start + (stop - start) is not exact in floating point.
        return Epoch(self.time.copy())

    def intersect(self, epoch):
        """Finds intersection between two sets of epochs.

        Both sets are merged first, so the result has no overlapping
        epochs. Epochs that only touch at an endpoint do not intersect.

        Parameters
        ----------
        epoch : nept.Epoch

        Returns
        -------
        intersect_epochs : nept.Epoch

        """
        if self.isempty or epoch.isempty:
            return Epoch([], [])

        new_starts = []
        new_stops = []
        for a_start, a_stop in self.merge().time:
            for b_start, b_stop in epoch.merge().time:
                # The shared span runs from the later start to the earlier
                # stop; it exists only if that span has positive length.
                start = max(a_start, b_start)
                stop = min(a_stop, b_stop)
                if start < stop:
                    new_starts.append(start)
                    new_stops.append(stop)

        return Epoch(np.column_stack((new_starts, new_stops)))

    def overlaps(self, epoch):
        """Finds overlap between template epochs and epoch of interest.

        Returns the (merged) epochs of ``epoch``, in full, that overlap
        with at least one (merged) epoch of this template. Epochs that
        only touch at an endpoint do not overlap.

        Parameters
        ----------
        epoch : nept.Epoch

        Returns
        -------
        overlaps_epochs : nept.Epoch

        """
        if self.isempty or epoch.isempty:
            return Epoch([], [])

        template = self.merge()
        epoch_interest = epoch.merge()

        # hits[i, j] is True when epoch_interest[i] and template[j] share a
        # span of positive length.
        hits = ((epoch_interest.starts[:, np.newaxis]
                 < template.stops[np.newaxis, :])
                & (template.starts[np.newaxis, :]
                   < epoch_interest.stops[:, np.newaxis]))
        keep = hits.any(axis=1)

        return Epoch(epoch_interest.time[keep])

    def merge(self, gap=0.0):
        """Merges epochs that are close or overlapping.

        Parameters
        ----------
        gap : float, optional
            Amount (in time) to consider epochs close enough to merge.
            Defaults to 0.0 (no gap).

        Returns
        -------
        merged_epochs : nept.Epoch

        Raises
        ------
        ValueError
            If ``gap`` is negative.

        """
        if gap < 0:
            raise ValueError("gap cannot be negative")

        if self.isempty:
            return Epoch([], [])

        new_starts = []
        new_stops = []

        # Epochs are sorted by start, so one pass suffices. The running stop
        # is the latest stop seen in the current group, which keeps an epoch
        # nested inside an earlier, longer one from splitting the group or
        # shortening its stop.
        current_start = self.starts[0]
        current_stop = self.stops[0]
        for start, stop in zip(self.starts[1:], self.stops[1:]):
            if start <= current_stop + gap:
                current_stop = max(current_stop, stop)
            else:
                new_starts.append(current_start)
                new_stops.append(current_stop)
                current_start = start
                current_stop = stop
        new_starts.append(current_start)
        new_stops.append(current_stop)

        return Epoch(np.column_stack((new_starts, new_stops)))

    def expand(self, amount, direction='both'):
        """Expands epoch by the given amount.

        Parameters
        ----------
        amount : float
            Amount (in time) to expand each epoch.
        direction : str
            Can be 'both', 'start', or 'stop'. This specifies
            which direction to resize epoch.

        Returns
        -------
        expanded_epochs : nept.Epoch

        Raises
        ------
        ValueError
            If ``direction`` is not 'both', 'start', or 'stop'.

        """
        if direction not in ('both', 'start', 'stop'):
            raise ValueError("direction must be 'both', 'start', or 'stop'")

        resize_starts = self.starts
        resize_stops = self.stops
        if direction in ('both', 'start'):
            resize_starts = resize_starts - amount
        if direction in ('both', 'stop'):
            resize_stops = resize_stops + amount

        return Epoch(np.column_stack((resize_starts, resize_stops)))

    def shrink(self, amount, direction='both'):
        """Shrinks epoch by the given amount.

        Parameters
        ----------
        amount : float
            Amount (in time) to shrink each epoch.
        direction : str
            Can be 'both', 'start', or 'stop'. This specifies
            which direction to resize epoch.

        Returns
        -------
        shrinked_epochs : nept.Epoch

        Raises
        ------
        ValueError
            If ``amount`` exceeds the shortest duration (or half of it when
            ``direction`` is 'both'), or if ``direction`` is invalid.

        """
        if self.isempty:
            # Nothing to shrink; expand() still validates `direction`.
            return self.expand(-amount, direction)

        shortest = self.durations.min()
        # Shrinking both ends removes `amount` twice from each epoch.
        limit = shortest / 2 if direction == 'both' else shortest
        if amount > limit:
            raise ValueError("shrink amount too large")

        return self.expand(-amount, direction)

    def join(self, epoch):
        """Combines two sets of epochs.

        The epochs are concatenated and re-sorted by start time; they are
        not merged.

        Parameters
        ----------
        epoch : nept.Epoch

        Returns
        -------
        joined_epochs : nept.Epoch

        """
        # Stack the start/stop rows directly so stop times stay exact.
        return Epoch(np.vstack((self.time, epoch.time)))

    def contains(self, value):
        """Checks whether value is in any epoch.

        Both the start and the stop of an epoch count as inside it.

        Parameters
        ----------
        value: float or int

        Returns
        -------
        boolean

        """
        return any(start <= value <= stop
                   for start, stop in zip(self.starts, self.stops))