protocols#

Protocol definitions for hardware instruments and data management.

This module defines the core interfaces that standardize behavior across the mesofield project, allowing for interoperability between different hardware instruments, data producers, and data consumers.

Protocol Implementation Notes#

When implementing these protocols, there are two approaches:

  1. Direct inheritance (for regular classes without metaclass conflicts):

    class MySensor(DataAcquisitionDevice):
        def __init__(self):
            self._init_logger()
            # Implement required methods and attributes
    
  2. Duck typing (for classes with existing inheritance or metaclass conflicts, e.g. QThread):

    class MyQThreadSensor(QThread):
        def __init__(self):
            super().__init__()
            self._init_logger()
            self.device_type = "sensor"
            self.device_id = "my_sensor"
    

The second approach is necessary for Qt classes (QObject, QThread, QWidget) or any class that already uses a metaclass. Protocol checking uses duck typing internally, so both approaches work with our system.

class mesofield.protocols.Procedure[source]#

Bases: Protocol

Protocol defining the standard interface for experiment procedures.

initialize_hardware()[source]#

Setup the experiment procedure.

Return type:

None

run()[source]#

Run the experiment procedure.

Return type:

None

save_data()[source]#

Save data from the experiment.

Return type:

None

cleanup()[source]#

Clean up after the experiment procedure.

Return type:

None

class mesofield.protocols.HardwareDevice[source]#

Bases: Protocol

Protocol defining the standard interface for all hardware devices.

Lifecycle: initialize -> arm -> start -> stop -> shutdown. Every device exposes self.signals (a mesofield.signals.DeviceSignals) carrying started, finished, and data emitters.

initialize()[source]#

One-time setup (open ports, load configs). Idempotent.

Return type:

bool

arm(config)[source]#

Per-run preparation (writers, output paths, sequence build).

Called by HardwareManager.arm_all immediately before start_all. Devices without per-run prep may no-op.

Parameters:

config (ExperimentConfig)

Return type:

None

stop()[source]#

Stop the device after a run.

shutdown()[source]#

Close and clean up resources.

Return type:

None

status()[source]#

Get the current status of the device.

Return type:

Dict[str, Any]

property metadata: Dict[str, Any]#

Return metadata about the hardware.

class mesofield.protocols.DataProducer[source]#

Bases: HardwareDevice, Protocol

Protocol for hardware that produces data streamed to the DataQueue.

start()[source]#

Start data acquisition. Should emit signals.started.

Return type:

bool

stop()[source]#

Stop data acquisition. Should emit signals.finished.

Return type:

bool

save_data(path=None)[source]#

Persist the captured data.

Parameters:

path (str | None)

get_data()[source]#

Return the latest data.

Return type:

Any | None

class mesofield.protocols.FrameProcessor[source]#

Bases: Protocol

Protocol for optional real-time per-frame consumers.

A FrameProcessor subscribes to a DataProducer camera’s signals.frame (carrying (img, idx, device_ts)) and emits a scalar result on its own signals.data and on a Qt-compatible valueUpdated(time, value) signal. See mesofield.processors for the threaded reference base class.

attach(camera)[source]#

Subscribe to camera.signals.frame and start processing.

Parameters:

camera (DataProducer)

Return type:

None

detach()[source]#

Disconnect and stop the worker.

Return type:

None

compute(img, idx, ts)[source]#

Return a scalar for this frame, or None to skip.

Parameters:
Return type:

float | None

class mesofield.protocols.StimulusDevice[source]#

Bases: HardwareDevice, Protocol

Protocol for stimulus-presentation devices (e.g. PsychoPy).

Like HardwareDevice but explicitly not a data producer: consumers should not expect data signal emissions and should not call save_data/get_data.

start()[source]#

Begin stimulus presentation.

Return type:

bool

class mesofield.protocols.DataConsumer[source]#

Bases: Protocol

Protocol defining the interface for data-consuming components.

property name: str#

Return the name of the data consumer.

property get_supported_data_types: List[str]#

Return the types of data this consumer can process.

process_data(data, metadata)[source]#

Process data with metadata.

Parameters:
  • data (Any) – The data to process.

  • metadata (Dict[str, Any]) – Metadata about the data, including source, timestamp, etc.

Returns:

True if data was processed successfully, False otherwise.

Return type:

bool

class mesofield.protocols.ThreadedHardwareDevice[source]#

Bases: object

Mixin for implementing the HardwareDevice protocol with Python’s threading.

This mixin provides the basic structure for a hardware device that uses Python’s threading module. It handles the thread creation, starting, and stopping.

Example

class MySensor(ThreadedHardwareDevice):
    device_type = "sensor"
    device_id = "my_sensor"

    def __init__(self, config=None):
        super().__init__()
        self.config = config or {}

    def initialize(self):
        pass

    def _run(self):
        while not self._stop_event.is_set():
            # Do work
            pass

    def get_status(self):
        return {"active": not self._stop_event.is_set()}
__init__()[source]#
start()[source]#

Start the device thread.

Return type:

bool

stop()[source]#

Stop the device thread.

Return type:

bool

close()[source]#

Close the device and clean up resources.

Return type:

None

class mesofield.protocols.AsyncioHardwareDevice[source]#

Bases: object

Mixin for implementing the HardwareDevice protocol with asyncio.

This mixin provides the basic structure for a hardware device that uses Python’s asyncio module. It handles the task creation, starting, and cancellation.

Example

class MySensor(AsyncioHardwareDevice):
    device_type = "sensor"
    device_id = "my_sensor"

    def __init__(self, loop=None, config=None):
        super().__init__(loop)
        self.config = config or {}

    def initialize(self):
        pass

    async def _run(self):
        while True:
            if self._should_stop():
                break
            await asyncio.sleep(0.01)

    def get_status(self):
        return {"active": self._task is not None and not self._task.done()}
__init__(loop=None)[source]#
start()[source]#

Start the device task.

Return type:

bool

stop()[source]#

Stop the device task.

Return type:

bool

close()[source]#

Close the device and clean up resources.

Return type:

None