"""Dataqueue synchronization data source.
This loader ingests the central ``*_dataqueue.csv`` log produced by the rig and
returns a stream describing every message with absolute and relative timing.
It is commonly used to align per-device clocks (treadmill, cameras, nidaq) by
extracting anchor pairs for downstream timeline fitting.
"""
from typing import Optional
import pandas as pd
import numpy as np
from pathlib import Path
from mesofield.datakit.sources.register import LoadContext, TimeseriesSource
[docs]
class DataqueueSource(TimeseriesSource):
"""Load the dataqueue CSV as a time-indexed table."""
tag = "dataqueue"
patterns = ("**/*_dataqueue.csv",)
camera_tag = None
time_column = "queue_elapsed"
device_id_column = "device_id"
device_timestamp_column = "device_ts"
payload_column = "payload"
master_device_priority: tuple[str, ...] = ("dhyana", "mesoscope")
device_alias_patterns: tuple[tuple[str, str], ...] = (
("dhyana", "meso"),
("mesoscope", "meso"),
("thorcam", "pupil"),
)
[docs]
def build_timeseries(
self,
path: Path,
*,
context: LoadContext | None = None,
) -> tuple[np.ndarray, pd.DataFrame, dict]:
"""Read ``*_dataqueue.csv`` and return a time-indexed table."""
raw = pd.read_csv(path, low_memory=False)
if self.time_column not in raw.columns:
raise ValueError(f"Dataqueue file missing required time column '{self.time_column}': {path}")
queue_numeric = pd.to_numeric(raw[self.time_column], errors="coerce")
queue_elapsed = queue_numeric.to_numpy(dtype=np.float64)
if queue_elapsed.size == 0:
raise ValueError("Dataqueue has no timing samples")
queue_start = float(queue_elapsed[0])
t = queue_elapsed - queue_start
master_id = self._select_master_device(raw)
master_elapsed = self._master_elapsed(raw, master_id, queue_start)
if master_elapsed.size == 0:
master_elapsed = t
device_elapsed = self._build_device_elapsed(raw, queue_start)
device_ts = self._build_device_ts(raw)
device_aliases = self._build_device_aliases(raw)
affine = self._fit_master_affine(raw, master_id, queue_start)
device_aligned = self._apply_affine(device_elapsed, affine)
frame = raw.copy()
frame["time_elapsed_s"] = t.astype(np.float64)
per_device_start_map: dict[str, float] = {}
if self.device_id_column in frame.columns:
per_device_start = (
frame.groupby(self.device_id_column)[self.time_column]
.transform("min")
.astype(np.float64)
)
per_device_start_map = {
str(device_id): float(value)
for device_id, value in (
frame.groupby(self.device_id_column)[self.time_column]
.min()
.items()
)
}
# `device_elapsed_s` is `queue_elapsed - per_device_start`; keep the
# offsets in meta and drop the redundant full-length column.
# Memory optimizations: parse timestamp strings to datetime64, low-cardinality
# string columns to categoricals. These columns are stored as full-length
# ndarrays per row in the materialized output, so per-value bytes matter.
#
# ``payload`` is mostly numeric (TTL levels, counts) with a sparse mix of
# semantic strings (e.g. ``EncoderData(...)``). The dataqueue itself
# treats payloads as opaque — sources that need the semantic content
# (such as ``TreadmillSource``) re-read the raw CSV via
# ``context.path_for("dataqueue")`` and parse it themselves. Here we
# just downcast the whole column to ``float32`` (non-numeric entries
# become NaN) to keep the materialized output small.
if self.device_timestamp_column in frame.columns:
parsed = pd.to_datetime(frame[self.device_timestamp_column], errors="coerce", utc=True)
if not parsed.isna().all():
frame[self.device_timestamp_column] = parsed
if "packet_ts" in frame.columns:
parsed = pd.to_datetime(frame["packet_ts"], errors="coerce", utc=True)
if not parsed.isna().all():
frame["packet_ts"] = parsed
if self.device_id_column in frame.columns:
frame[self.device_id_column] = frame[self.device_id_column].astype("category")
if self.payload_column in frame.columns:
frame[self.payload_column] = (
pd.to_numeric(frame[self.payload_column], errors="coerce")
.astype(np.float32)
)
meta = {
"source_file": str(path),
"n_entries": int(len(raw)),
"master_device_id": master_id,
"master_queue_start": float(queue_start),
"per_device_queue_start": per_device_start_map,
"device_sample_rate_hz": self._estimate_device_rates(device_elapsed),
"device_aliases": device_aliases,
"affine": affine,
"time_basis": self.time_column,
}
return t.astype(np.float64), frame, meta
def _select_master_device(self, df: pd.DataFrame) -> str:
"""Pick the device used as the master time basis."""
if self.device_id_column not in df.columns:
return "master"
ids = df[self.device_id_column].astype(str).fillna("")
for pattern in self.master_device_priority:
mask = ids.str.contains(pattern, case=False, regex=False)
if mask.any():
return str(ids[mask].iloc[0])
non_empty = ids[ids != ""]
if not non_empty.empty:
return str(non_empty.iloc[0])
return "master"
def _master_elapsed(self, df: pd.DataFrame, master_id: str, queue_start: float) -> np.ndarray:
"""Return elapsed seconds for the master device."""
device_series = df.get(self.device_id_column)
if device_series is None:
return np.array([], dtype=np.float64)
mask = device_series.astype(str) == master_id
master_rows = df.loc[mask]
if master_rows.empty:
return np.array([], dtype=np.float64)
master_queue = pd.to_numeric(master_rows[self.time_column], errors="coerce").dropna()
if master_queue.empty:
return np.array([], dtype=np.float64)
master_queue = master_queue - float(master_queue.iloc[0])
return master_queue.to_numpy(dtype=np.float64)
def _estimate_rate(self, timeline: np.ndarray) -> float:
if timeline.size < 2:
return 0.0
diffs = np.diff(timeline)
diffs = diffs[np.isfinite(diffs) & (diffs > 0)]
if diffs.size == 0:
return 0.0
return float(1.0 / np.median(diffs))
def _build_device_elapsed(self, df: pd.DataFrame, queue_start: float) -> dict[str, np.ndarray]:
"""Return per-device elapsed seconds from queue timestamps."""
device_elapsed: dict[str, np.ndarray] = {}
if self.device_id_column not in df.columns:
return device_elapsed
for device_id, group in df.groupby(self.device_id_column):
device_key = str(device_id)
queue_rel = pd.to_numeric(group[self.time_column], errors="coerce").to_numpy(dtype=np.float64)
queue_rel = queue_rel - float(queue_start)
order = np.argsort(queue_rel)
device_elapsed[device_key] = queue_rel[order]
return device_elapsed
def _build_device_ts(self, df: pd.DataFrame) -> dict[str, np.ndarray]:
"""Return per-device raw timestamps as ISO strings when possible."""
device_ts: dict[str, np.ndarray] = {}
if self.device_id_column not in df.columns or self.device_timestamp_column not in df.columns:
return device_ts
for device_id, group in df.groupby(self.device_id_column):
device_key = str(device_id)
parsed = pd.to_datetime(group[self.device_timestamp_column], errors="coerce", utc=True)
if parsed.isna().all():
device_ts[device_key] = group[self.device_timestamp_column].to_numpy()
else:
device_ts[device_key] = parsed.dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ").to_numpy()
return device_ts
def _build_device_aliases(self, df: pd.DataFrame) -> dict[str, str]:
"""Map device identifiers to configured aliases (e.g., Dhyana → meso)."""
aliases: dict[str, str] = {}
if self.device_id_column not in df.columns:
return aliases
ids = df[self.device_id_column].dropna().astype(str).unique().tolist()
for device_id in ids:
alias = self._alias_for_device(device_id)
if alias is not None:
aliases[device_id] = alias
return aliases
def _alias_for_device(self, device_id: str) -> Optional[str]:
lowered = device_id.lower()
for pattern, alias in self.device_alias_patterns:
if pattern.lower() in lowered:
return alias
return None
def _fit_master_affine(
self,
df: pd.DataFrame,
master_id: str,
queue_start: float,
) -> dict[str, float] | None:
"""Fit affine map from elapsed seconds to absolute time using master device."""
if self.device_id_column not in df.columns or self.device_timestamp_column not in df.columns:
return None
master_rows = df.loc[df[self.device_id_column].astype(str) == master_id]
if master_rows.empty:
return None
elapsed = pd.to_numeric(master_rows[self.time_column], errors="coerce").to_numpy(dtype=np.float64)
elapsed = elapsed - float(queue_start)
absolute = pd.to_datetime(master_rows[self.device_timestamp_column], errors="coerce", utc=True)
if elapsed.size < 2 or absolute.isna().all():
return None
valid_mask = np.isfinite(elapsed) & ~absolute.isna()
if valid_mask.sum() < 2:
return None
elapsed = elapsed[valid_mask]
absolute = absolute[valid_mask]
e0 = float(elapsed[0])
e1 = float(elapsed[-1])
a0 = float(absolute.iloc[0].value) / 1e9
a1 = float(absolute.iloc[-1].value) / 1e9
if e1 == e0:
return None
a = (a1 - a0) / (e1 - e0)
b = a0 - a * e0
return {"a": float(a), "b": float(b)}
def _apply_affine(
self,
device_elapsed: dict[str, np.ndarray],
affine: dict[str, float] | None,
) -> dict[str, np.ndarray]:
"""Project elapsed times into aligned absolute seconds (UTC)."""
if affine is None:
return {}
a = affine.get("a")
b = affine.get("b")
if a is None or b is None:
return {}
aligned: dict[str, np.ndarray] = {}
for device_id, elapsed in device_elapsed.items():
aligned[device_id] = (a * elapsed + b).astype(np.float64)
return aligned
def _estimate_device_rates(self, device_elapsed: dict[str, np.ndarray]) -> dict[str, float]:
return {device_id: self._estimate_rate(values) for device_id, values in device_elapsed.items()}