Source code for

from bouter import Experiment
from bouter import utilities, decorators
import numpy as np
import pandas as pd

[docs]class FreelySwimmingExperiment(Experiment): @property def n_tail_segments(self): return self["tracking+fish_tracking"]["n_segments"] - 1 @property def n_fish(self): return self["tracking+fish_tracking"]["n_fish_max"] @property def camera_px_in_mm(self): """ Return camera pixel size in millimeters :param exp: :return: """ cal_params = self["stimulus"]["calibration_params"] proj_mat = np.array(cal_params["cam_to_proj"]) return ( np.linalg.norm(np.array([1.0, 0.0]) @ proj_mat[:, :2]) * cal_params["mm_px"] ) @property def tail_columns(self): """Return a nested list of names of columns with tracking data from all tracked segments. One list for each fish tracked during the experiment. """ return [[f"f{i}_theta_{j:02}" for j in range(self.n_tail_segments)] for i in range(self.n_fish)] def _extract_bout(self, s, e, n_segments, i_fish=0, scale=1.0, dt=None): bout = self._rename_fish(self.behavior_log.iloc[s:e], i_fish, n_segments) # scale to physical coordinates if dt is None: dt = (bout.t.values[-1] - bout.t.values[0]) / bout.shape[0] # pixels are scaled to millimeters (columns x, vx, y and vy) bout.iloc[:, 1:5] *= scale # velocities are additionally divided by the time difference to get mm/s bout.iloc[:, 2:7:2] /= dt return bout def _fish_column_names(self, i_fish, n_segments): return [ "f{:d}_x".format(i_fish), "f{:d}_vx".format(i_fish), "f{:d}_y".format(i_fish), "f{:d}_vy".format(i_fish), "f{:d}_theta".format(i_fish), "f{:d}_vtheta".format(i_fish), ] + ["f{:d}_theta_{:02d}".format(i_fish, i) for i in range(n_segments)] def _fish_renames(self, i_fish, n_segments): return dict( { "f{:d}_x".format(i_fish): "x", "f{:d}_vx".format(i_fish): "vx", "f{:d}_y".format(i_fish): "y", "f{:d}_vy".format(i_fish): "vy", "f{:d}_theta".format(i_fish): "theta", "f{:d}_vtheta".format(i_fish): "vtheta", }, **{ "f{:d}_theta_{:02d}".format(i_fish, i): "theta_{:02d}".format(i) for i in range(n_segments) } ) def _rename_fish(self, df, i_fish, n_segments): return df.filter(["t"] + self._fish_column_names(i_fish, n_segments)).rename( columns=self._fish_renames(i_fish, n_segments) )
[docs] def compute_velocity( self, max_interpolate=2, recalculate_vel=False, scale=None, median_vel=False, window_size=7, ): """Compute the squared total swimming velocity for each fish. Add them as new columns to the dataframe log and return the complete dataframe. :param max_interpolate: number of points to interpolate if surrounded by NaNs in tracking :param recalculate_vel: :param scale: mm per pixel, recalculated by default :return: """ df = self.behavior_log scale = scale or self.camera_px_in_mm dt = np.mean(np.diff(df.t[100:200])) dfint = df.interpolate("linear", limit=max_interpolate, limit_area="inside") fish_velocities = pd.DataFrame(np.nan, index=self.behavior_log.index, columns=["vel2_f{}".format(i_fish) for i_fish in range(self.n_fish)]) for i_fish in range(self.n_fish): if recalculate_vel: for thing in ["x", "y", "theta"]: dfint["f{}_v{}".format(i_fish, thing)] = np.r_[ np.diff(dfint["f{}_{}".format(i_fish, thing)]), 0 ] vel2 = ( dfint["f{}_vx".format(i_fish)] ** 2 + dfint["f{}_vy".format(i_fish)] ** 2 ) * ((scale / dt) ** 2) if median_vel: vel2 = vel2.rolling(window=window_size, min_periods=1).median() fish_velocities["vel2_f{}".format(i_fish)] = vel2 return fish_velocities
[docs] @decorators.cache_results() def get_bouts( self, scale=None, threshold=1, **kwargs ): """ Extracts all bouts from a freely-swimming tracking experiment :param exp: the experiment object :param scale: mm per pixel, recalculated by default :param threshold: velocity threshold in mm/s :return: tuple: (list of single bout dataframes, list of boolean arrays marking if the bout i follows bout i-1) """ n_fish = self.n_fish n_segments = self.n_tail_segments scale = scale or self.camera_px_in_mm fish_velocities = self.compute_velocity() bouts = [] continuous = [] for i_fish in range(n_fish): vel2 = fish_velocities["vel2_f{}".format(i_fish)] bout_locations, continuity = utilities.extract_segments_above_threshold( vel2.values, threshold=threshold ** 2, **kwargs ) all_bouts_fish = [ self._extract_bout(s, e, n_segments, i_fish, scale) for s, e in bout_locations ] bouts.append(all_bouts_fish) continuous.append(np.array(continuity)) return bouts, continuous
[docs] @decorators.cache_results() def get_bout_properties(self, continuity=None): """ Makes a summary of all extracted bouts with basic kinematic parameters and timing. :param continuity: :return: a dataframe containing all bouts """ headers = [ "t_start", "x_start", "y_start", "theta_start", "t_end", "x_end", "y_end", "theta_end", ] #Extract experiment bouts bouts, _ = self.get_bouts() # an array is preallocated loop through the bouts bout_data = np.empty( (np.sum([len(bouts[i]) for i in range(len(bouts))]), len(headers)) ) n_summarized_bouts = 0 for i_fish in range(len(bouts)): for i_bout, bout in enumerate(bouts[i_fish]): # slices from 0 to 4 are the start parameters, from 4 to 8 the end parameters for sl, idx in zip([slice(0, 4), slice(4, 8)], [0, -1]): bout_data[n_summarized_bouts + i_bout, sl] = [ bout.t.iloc[idx], bout.x.iloc[idx], bout.y.iloc[idx], bout.theta.iloc[idx], ] n_summarized_bouts += len(bouts[i_fish]) bout_data_df = pd.DataFrame(bout_data, columns=headers) if continuity: bout_data_df["follows_previous"] = np.concatenate(continuity) # if there are multiple fish tracked in the same experiments, assign the # identities (there is no guarantee that the identity will be consistent if the # fish cross or go outside of the visible region) if len(bouts) > 1: origin_fish = np.concatenate( [np.full(len(bouts[i]), i, dtype=np.uint8) for i in range(len(bouts))] ) bout_data_df.insert(0, "i_fish", origin_fish) return bout_data_df
[docs] @decorators.cache_results(cache_filename="behavior_log") def reconstruct_missing_segments(self, continue_curvature=None): for i_fish in range(self.n_fish): segments = self.behavior_log.loc[:, self.tail_columns[i_fish]].values.copy() if "f{}_missing_n".format(i_fish) in self.behavior_log.columns: revert_pts = self.behavior_log["f{}_missing_n".format(i_fish)].values else: revert_pts = None # Revert if possible if continue_curvature is None: if continue_curvature is None: if revert_pts is not None: fixed_segments = utilities.revert_segment_filling( segments, revert_pts=revert_pts, ) self.behavior_log.loc[:, self.tail_columns[i_fish]] = fixed_segments # Otherwise, use the parameter to do the filling: else: fixed_segments, missing_n = utilities.fill_out_segments( segments, continue_curvature=continue_curvature, revert_pts=revert_pts, ) self.behavior_log.loc[:, self.tail_columns[i_fish]] = fixed_segments self.behavior_log["f{}_missing_n".format(i_fish)] = missing_n return self.behavior_log