manager#

class mesofield.data.manager.DataPacket[source]#

Bases: object

Entry in DataQueue.

__init__(device_id, timestamp, payload, device_ts=None, meta=<factory>)#
Parameters:
Return type:

None

class mesofield.data.manager.DataQueue[source]#

Bases: object

Thread-safe queue for data streaming between devices and consumers.

Registered DataProducer (the DataManager) devices can push data packets to this queue, which can then be consumed by other parts of the system.

__init__(maxsize=0)[source]#
Parameters:

maxsize (int)

Return type:

None

push(device_id, payload, *, timestamp=None, device_ts=None, **meta)[source]#

Add a new data packet to the queue.

Parameters:
Return type:

None

pop(block=True, timeout=None)[source]#

Return the next DataPacket from the queue.

Parameters:
Return type:

DataPacket

class mesofield.data.manager.DataPaths[source]#

Bases: object

Structured storage for all output paths used by the DataSaver.

Relies on the ExperimentConfig to generate paths based on the experiment’s BIDS directory using the make_path method, which relies on HardwareDevices implementing a file_type, bids_type.

These paths can then be passed to HardwareDevice.save_data(path)

__init__(configuration, notes, timestamps, hardware=<factory>, writers=<factory>, queue='')#
Parameters:
Return type:

None

class mesofield.data.manager.DataSaver[source]#

Bases: object

Helper class for saving experiment data to disk.

This class handles saving configuration, hardware data, notes, timestamps, and camera data to disk.

Takes paths generated from DataPaths instance and calls Device.save_data(path) on all hardware devices that implement this method.

(NOTE: If a device does not implement save_data, it will be skipped.)

Takes an ExperimentConfig to save configuration.

save_queue(rows, path=None)[source]#

Save queued data rows to CSV file specified in DataPaths or override path.

Rows are produced by DataManager._queue_writer_loop as [queue_elapsed, packet_ts, device_ts, device_id, payload]. If any payload is a dict, its keys are fanned out into dedicated columns; the payload column then holds the non-dict payloads only (blank when a row has dict columns).

Parameters:
Return type:

None

__init__(cfg, logger=<factory>)#
Parameters:
Return type:

None

class mesofield.data.manager.DataManager[source]#

Bases: object

Wrapper providing DataSaver and DataQueue.

__init__()[source]#
Return type:

None

setup(config, devices=None)[source]#

Attach configuration and optionally register devices.

Parameters:
Return type:

None

register_devices(devices)[source]#

Register a list of hardware devices with the manager.

Any device exposing a mesofield.signals.DeviceSignals bundle on device.signals is registered. Devices that do not emit on signals.data (e.g. stimulus devices) are still tracked but contribute nothing to the queue.

Parameters:

devices (Iterable[Any])

Return type:

None

start_queue_logger(path=None)[source]#

Begin capturing DataQueue contents.

Parameters:

path (str | None)

Return type:

None

stop_queue_logger()[source]#

Stop the queue logging thread and flush data to disk.

Return type:

None

register_hardware_device(device)[source]#

Track a hardware device and connect its data stream to the queue.

Devices expose a uniform mesofield.signals.DeviceSignals bundle on device.signals. We simply connect signals.data to push (payload, device_ts) packets onto the queue. Devices that never emit on signals.data (e.g. StimulusDevice) are registered but never produce queue entries.

Parameters:

device (Any)

Return type:

None

get_device_outputs(subject, session)[source]#

Return a DataFrame of output file paths for registered devices.

Parameters:
Return type:

DataFrame