Skip to content

alicatlib

Top-level re-exports. Most users only need names from this module.

alicatlib

alicatlib — Python library for the full Alicat instrument matrix.

Covers {flow, pressure} × {meter, controller} × {gas, liquid}, plus the CODA Coriolis line. Orthogonal :class:Medium gating (design §5.9a) lets every command declare which media it applies to; the session refuses cross-medium dispatch pre-I/O with :class:AlicatMediumMismatchError. Devices whose prefix doesn't uniquely determine the configured medium can be narrowed at open time via assume_media= on :func:~alicatlib.devices.factory.open_device.

Core API is async (built on anyio); a sync facade is available at :mod:alicatlib.sync for scripts, notebooks, and REPL use.

See docs/design.md for the architectural design.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
)

Per-run summary owned by the recorder.

Mutability contract (per the cross-lib spec §M):

  • The recorder is the only writer. It updates counters in place during the run so progress polling (TUIs, dashboards) works without a separate API.
  • Consumers treat the summary as read-only.
  • :attr:finished_at is None while the recording is in flight and is set on context-manager exit.

Attributes:

Name Type Description
started_at datetime

Wall-clock at recorder entry.

finished_at datetime | None

Wall-clock at producer shutdown — None while the recording is in flight.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under ErrorPolicy.RETURN) still count as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch).

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

AlicatCapabilityError

AlicatCapabilityError(message='', *, context=None)

Bases: AlicatError

The device cannot perform the requested command.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatCommandRejectedError

AlicatCommandRejectedError(message='', *, context=None)

Bases: AlicatProtocolError

The device replied with its error marker (? / similar).

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatConfig dataclass

AlicatConfig(
    default_timeout_s=0.5,
    multiline_timeout_s=1.0,
    write_timeout_s=0.5,
    default_baudrate=19200,
    drain_before_write=False,
    save_rate_warn_per_min=10,
    eager_tasks=False,
)

Process-wide default settings.

Individual sessions may override any of these at construction time.

Attributes:

Name Type Description
default_timeout_s float

Default per-command response timeout for single-line commands, in seconds.

multiline_timeout_s float

Default response timeout for multiline table commands (??M*, ??D*, gas list), in seconds. Larger than default_timeout_s because the table commands are paced at device speed across 5–20 lines.

write_timeout_s float

Upper bound on a single Transport.write call, in seconds. Writes can block on RS-485 hardware flow control, a hung device, or a TCP transport's send buffer; this bounds that.

default_baudrate int

Default serial baudrate when none is specified.

drain_before_write bool

Whether the protocol client should drain any stale input bytes before each command. Useful for re-syncing after a timeout; adds latency per command.

save_rate_warn_per_min int

EEPROM-wear warning threshold. Any command carrying a save=True flag (active gas, PID/PDF gains, deadband, batch, valve offset, totalizer save, setpoint source, …) logs at WARN when fired more than this many times per minute per device. See design §5.20.7.

eager_tasks bool

Opt-in to asyncio.eager_task_factory on the running event loop. Skips one event-loop round-trip when a newly-created task's first await doesn't suspend — a measurable win under tight command loops. Off by default because it is a scheduling semantic change (tasks that return before their first suspension never hit the loop). No-op on trio. See design §5.2 and the :func:alicatlib._runtime.install_eager_task_factory helper users call near app startup.

replace

replace(**updates)

Return a copy of this config with updates applied.

Source code in src/alicatlib/config.py
def replace(self, **updates: Any) -> Self:
    """Return a copy of this config with ``updates`` applied."""
    return replace(self, **updates)

AlicatConfigurationError

AlicatConfigurationError(message='', *, context=None)

Bases: AlicatError

User-supplied configuration was invalid.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatConnectionError

AlicatConnectionError(message='', *, context=None)

Bases: AlicatTransportError

Connection could not be established or was lost.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatDeviceSnapshot dataclass

AlicatDeviceSnapshot(
    name,
    model,
    firmware,
    serial,
    connected,
    last_error,
    recoverable_error_count,
    captured_at,
    unit_id,
    media,
    capabilities,
)

Bases: DeviceSnapshot

Alicat-specific :class:DeviceSnapshot extras.

Adds the Alicat-native bus address, media flags, and capability set. Inherits every base field from :class:DeviceSnapshot so consumers that want only the cross-lib surface can still pattern- match on the base type.

AlicatDiscoveryError

AlicatDiscoveryError(message='', *, context=None)

Bases: AlicatError

Device discovery failed.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatError

AlicatError(message='', *, context=None)

Bases: Exception

Base class for every exception raised by :mod:alicatlib.

Carries a typed :class:ErrorContext. The message is the human-readable summary; the context is the machine-readable detail.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

with_context

with_context(**updates)

Return a copy of this error with its context updated.

Useful when an inner layer raises and an outer layer wants to enrich the context (for instance adding port or elapsed_s).

Allocates a fresh instance via cls.__new__ and copies attribute state directly. Avoids re-invoking __init__ — many subclasses (AlicatMediumMismatchError, AlicatFirmwareError, UnknownGasError and friends) have bespoke keyword-only signatures that don't accept (message, *, context=), and copy.copy would silently dispatch through them via :meth:Exception.__reduce__.

Source code in src/alicatlib/errors.py
def with_context(self, **updates: Any) -> Self:
    """Return a copy of this error with its context updated.

    Useful when an inner layer raises and an outer layer wants to enrich
    the context (for instance adding ``port`` or ``elapsed_s``).

    Allocates a fresh instance via ``cls.__new__`` and copies attribute
    state directly. Avoids re-invoking ``__init__`` — many subclasses
    (``AlicatMediumMismatchError``, ``AlicatFirmwareError``,
    ``UnknownGasError`` and friends) have bespoke keyword-only
    signatures that don't accept ``(message, *, context=)``, and
    ``copy.copy`` would silently dispatch through them via
    :meth:`Exception.__reduce__`.
    """
    cls = type(self)
    new = cls.__new__(cls)
    # ``Exception`` slot state lives in ``self.args``; reuse it so
    # ``str(err)`` keeps the original message.
    new.args = self.args
    # Copy subclass-specific attributes (value, field_name, required_min, ...).
    # Use ``__dict__`` directly when present (most subclasses), and fall back
    # to slot iteration if a frozen variant ever appears.
    try:
        new.__dict__.update(self.__dict__)
    except AttributeError:  # pragma: no cover — no slotted subclass today
        for slot in getattr(cls, "__slots__", ()):
            if hasattr(self, slot):
                object.__setattr__(new, slot, getattr(self, slot))
    new.context = self.context.merged(**updates)
    new.__cause__ = self.__cause__
    new.__context__ = self.__context__
    new.__traceback__ = self.__traceback__
    return new

AlicatFirmwareError

AlicatFirmwareError(
    *,
    command,
    reason,
    actual=None,
    required_min=None,
    required_max=None,
    required_families=None,
    context=None,
)

Bases: AlicatCapabilityError

The device's firmware version is outside the command's supported range.

Source code in src/alicatlib/errors.py
def __init__(
    self,
    *,
    command: str,
    reason: str,
    actual: FirmwareVersion | None = None,
    required_min: FirmwareVersion | None = None,
    required_max: FirmwareVersion | None = None,
    required_families: frozenset[FirmwareFamily] | None = None,
    context: ErrorContext | None = None,
) -> None:
    self.command = command
    self.reason = reason
    self.actual = actual
    self.required_min = required_min
    self.required_max = required_max
    self.required_families = required_families
    required = ""
    if required_min is not None or required_max is not None:
        lo = str(required_min) if required_min is not None else "*"
        hi = str(required_max) if required_max is not None else "*"
        required = f" (requires {lo}..{hi}, have {actual})"
    elif required_families:
        fams = ", ".join(f.value for f in sorted(required_families, key=lambda x: x.value))
        required = f" (requires family in {{{fams}}}, have {actual})"
    super().__init__(
        f"Firmware check failed for {command}: {reason}{required}",
        context=context,
    )

AlicatManager

AlicatManager(*, error_policy=ErrorPolicy.RAISE)

Coordinator for many devices across one or more serial ports.

Operations run concurrently across different physical ports (via :func:anyio.create_task_group) and serialise on the same-port client lock. Per-device failures are surfaced per :attr:error_policy:

  • :attr:ErrorPolicy.RAISE: the manager still collects results from every device, then raises an :class:ExceptionGroup if any failed.
  • :attr:ErrorPolicy.RETURN: the mapping's values carry :class:DeviceResult containers with .value or .error.

Usage::

async with AlicatManager() as mgr:
    await mgr.add("fuel", "/dev/ttyUSB0")
    await mgr.add("air", "/dev/ttyUSB1")
    frames = await mgr.poll()
Source code in src/alicatlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    self._error_policy = error_policy
    self._devices: dict[str, _DeviceEntry] = {}
    self._ports: dict[str, _PortEntry] = {}
    # Guards state mutation on ``add`` / ``remove`` / ``close``.
    # The per-port client lock serialises I/O, so we only need
    # to serialise the manager's bookkeeping here.
    self._state_lock = anyio.Lock()
    self._closed = False

closed property

closed

True once :meth:close has been called.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed device names.

__aenter__ async

__aenter__()

Enter the async context — returns self for chaining.

Source code in src/alicatlib/manager.py
async def __aenter__(self) -> Self:
    """Enter the async context — returns ``self`` for chaining."""
    return self

__aexit__ async

__aexit__(exc_type, exc, tb)

Close every managed device + port on exit.

Source code in src/alicatlib/manager.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close every managed device + port on exit."""
    del exc_type, exc, tb
    await self.close()

add async

add(name, source, *, unit_id='A', serial=None, timeout=0.5)

Register and open a device under name.

The source discriminates lifecycle ownership:

  • Device — pre-built (via :func:open_device outside the manager). The manager only tracks the name mapping; it does not take lifecycle ownership.
  • str — serial port path ("/dev/ttyUSB0", "COM3"). The manager creates a :class:~alicatlib.transport.serial.SerialTransport and :class:AlicatProtocolClient, canonicalises the port key, and reuses them across multi-device buses (RS-485).
  • :class:Transport — duck-typed transport. The manager wraps it in a new client but does not take transport ownership (the caller keeps open/close responsibility).
  • :class:AlicatProtocolClient — use as-is; the manager does not close it.

Parameters:

Name Type Description Default
name str

Unique manager-level identifier. Must not already exist on this manager.

required
source Device | str | Transport | AlicatProtocolClient

One of the four lifecycle shapes above.

required
unit_id str

Bus-level letter for the device. "A" is the polling default; multiple devices on the same port get distinct unit ids.

'A'
serial SerialSettings | None

:class:SerialSettings override. Only honoured when source is a port-string — ignored otherwise (pre-built transports carry their own settings).

None
timeout float

Default command timeout passed through to :func:open_device.

0.5

Returns:

Type Description
Device

The identified :class:Device (a :class:FlowMeter,

Device

class:FlowController, etc. subclass).

Raises:

Type Description
AlicatValidationError

name already exists or serial was supplied with a non-string source.

AlicatConnectionError

The manager is closed.

Source code in src/alicatlib/manager.py
async def add(
    self,
    name: str,
    source: Device | str | Transport | AlicatProtocolClient,
    *,
    unit_id: str = "A",
    serial: SerialSettings | None = None,
    timeout: float = 0.5,
) -> Device:
    """Register and open a device under ``name``.

    The ``source`` discriminates lifecycle ownership:

    - ``Device`` — pre-built (via :func:`open_device` outside the
      manager). The manager only tracks the name mapping; it does
      *not* take lifecycle ownership.
    - ``str`` — serial port path (``"/dev/ttyUSB0"``, ``"COM3"``).
      The manager creates a
      :class:`~alicatlib.transport.serial.SerialTransport` and
      :class:`AlicatProtocolClient`, canonicalises the port key,
      and reuses them across multi-device buses (RS-485).
    - :class:`Transport` — duck-typed transport. The manager wraps
      it in a new client but does *not* take transport ownership
      (the caller keeps open/close responsibility).
    - :class:`AlicatProtocolClient` — use as-is; the manager does
      not close it.

    Args:
        name: Unique manager-level identifier. Must not already
            exist on this manager.
        source: One of the four lifecycle shapes above.
        unit_id: Bus-level letter for the device. ``"A"`` is the
            polling default; multiple devices on the same port
            get distinct unit ids.
        serial: :class:`SerialSettings` override. Only honoured
            when ``source`` is a port-string — ignored otherwise
            (pre-built transports carry their own settings).
        timeout: Default command timeout passed through to
            :func:`open_device`.

    Returns:
        The identified :class:`Device` (a :class:`FlowMeter`,
        :class:`FlowController`, etc. subclass).

    Raises:
        AlicatValidationError: ``name`` already exists or
            ``serial`` was supplied with a non-string source.
        AlicatConnectionError: The manager is closed.
    """
    async with self._state_lock:
        self._check_open()
        if name in self._devices:
            raise AlicatValidationError(
                f"manager: name {name!r} already in use",
                context=ErrorContext(extra={"name": name}),
            )
        if serial is not None and not isinstance(source, str):
            raise AlicatValidationError(
                "manager.add(serial=...) only applies to string port sources; "
                "pre-built Transport / AlicatProtocolClient carry their own settings",
                context=ErrorContext(extra={"name": name}),
            )

        port_key, port_entry, owns_device = await self._resolve_source(
            source,
            serial=serial,
            timeout=timeout,
        )

        # ``open_device`` runs identification + probes. If it raises,
        # we must not leave the port's ref count dangling.
        try:
            if owns_device:
                assert port_entry is not None  # noqa: S101 — narrow for mypy
                device = await open_device(port_entry.client, unit_id=unit_id, timeout=timeout)
            else:
                # ``source`` was a pre-built Device.
                assert isinstance(source, Device)  # noqa: S101 — narrow for mypy
                device = source
        except BaseException:
            if port_entry is not None and port_key is not None and name not in port_entry.refs:
                # We created a brand-new port just for this add —
                # unwind it rather than leaking the transport.
                await self._maybe_teardown_port(port_key, port_entry)
            raise

        self._devices[name] = _DeviceEntry(
            name=name,
            device=device,
            port_key=port_key,
            owns_device=owns_device,
        )
        if port_entry is not None:
            port_entry.refs.add(name)

        _logger.info(
            "manager.add",
            extra={
                "device_name": name,
                "port_key": port_key,
                "unit_id": unit_id,
                "model": device.info.model,
                "firmware": str(device.info.firmware),
            },
        )
        return device

close async

close()

Tear down every managed device and port (LIFO).

Idempotent: safe to call from both :meth:__aexit__ and explicit user code. Individual close failures are caught and logged so one device's shutdown error doesn't strand the others.

Source code in src/alicatlib/manager.py
async def close(self) -> None:
    """Tear down every managed device and port (LIFO).

    Idempotent: safe to call from both :meth:`__aexit__` and
    explicit user code. Individual close failures are caught and
    logged so one device's shutdown error doesn't strand the
    others.
    """
    async with self._state_lock:
        if self._closed:
            return
        # Unwind in reverse insertion order — LIFO per design §5.13.
        for name in reversed(list(self._devices.keys())):
            entry = self._devices.pop(name)
            try:
                await self._teardown_device(entry)
            except Exception as err:  # noqa: BLE001 — best-effort manager teardown: keep going so other devices still close
                _logger.warning(
                    "manager.close_device_failed",
                    extra={"device_name": name, "error": repr(err)},
                )
        # Any port entries that survived (e.g. because a pre-built
        # client source never got refs torn down) are left alone —
        # the caller owns them.
        self._closed = True

execute async

execute(command, requests_by_name)

Dispatch a per-device Command across the requested names.

requests_by_name chooses both which devices participate and what arguments each gets — supporting the common case of "same command, different setpoint per device".

Source code in src/alicatlib/manager.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    requests_by_name: Mapping[str, Req],
) -> Mapping[str, DeviceResult[Resp]]:
    """Dispatch a per-device ``Command`` across the requested names.

    ``requests_by_name`` chooses both which devices participate and
    what arguments each gets — supporting the common case of
    "same command, different setpoint per device".
    """
    for name in requests_by_name:
        if name not in self._devices:
            raise AlicatValidationError(
                f"manager.execute: no device named {name!r}",
                context=ErrorContext(command_name=command.name, extra={"name": name}),
            )
    targets = tuple(requests_by_name.keys())
    name_by_device_id = {id(entry.device): entry.name for entry in self._devices.values()}

    async def _execute(device: Device) -> Resp:
        return await device.session.execute(
            command,
            requests_by_name[name_by_device_id[id(device)]],
        )

    return await self._dispatch(command.name, targets, _execute)

get

get(name)

Return the device registered under name (raises if unknown).

Source code in src/alicatlib/manager.py
def get(self, name: str) -> Device:
    """Return the device registered under ``name`` (raises if unknown)."""
    try:
        return self._devices[name].device
    except KeyError:
        raise AlicatValidationError(
            f"manager: no device named {name!r}",
            context=ErrorContext(extra={"name": name}),
        ) from None

poll async

poll(names=None)

Poll every (or named) device concurrently across ports.

Returns a mapping from device name to :class:DeviceResult even under :attr:ErrorPolicy.RAISE — but under that policy, any failed device's error is re-raised as an :class:ExceptionGroup after all devices have completed.

Source code in src/alicatlib/manager.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll every (or named) device concurrently across ports.

    Returns a mapping from device name to :class:`DeviceResult`
    even under :attr:`ErrorPolicy.RAISE` — but under that policy,
    any failed device's error is re-raised as an
    :class:`ExceptionGroup` after all devices have completed.
    """
    targets = self._resolve_names(names)

    async def _poll(device: Device) -> Reading:
        return await device.poll()

    return await self._dispatch("poll", targets, _poll)

remove async

remove(name)

Unregister and close the device named name.

If name was the last device on a shared port, the transport and client for that port are closed too. A pre-built :class:Device source is only dropped from the manager's registry — the caller retains lifecycle ownership.

Source code in src/alicatlib/manager.py
async def remove(self, name: str) -> None:
    """Unregister and close the device named ``name``.

    If ``name`` was the last device on a shared port, the
    transport and client for that port are closed too. A
    pre-built :class:`Device` source is only dropped from the
    manager's registry — the caller retains lifecycle ownership.
    """
    async with self._state_lock:
        self._check_open()
        if name not in self._devices:
            raise AlicatValidationError(
                f"manager: no device named {name!r}",
                context=ErrorContext(extra={"name": name}),
            )
        entry = self._devices.pop(name)
        await self._teardown_device(entry)
        _logger.info("manager.remove", extra={"device_name": name})

request async

request(statistics, names=None, *, averaging_ms=1)

Run :meth:Device.request across devices concurrently.

Every targeted device receives the same statistic list and averaging window — mirroring the primer's DV semantics.

Source code in src/alicatlib/manager.py
async def request(
    self,
    statistics: Sequence[Statistic | str],
    names: Sequence[str] | None = None,
    *,
    averaging_ms: int = 1,
) -> Mapping[str, DeviceResult[MeasurementSet]]:
    """Run :meth:`Device.request` across devices concurrently.

    Every targeted device receives the same statistic list and
    averaging window — mirroring the primer's ``DV`` semantics.
    """
    targets = self._resolve_names(names)

    async def _request(device: Device) -> MeasurementSet:
        return await device.request(statistics, averaging_ms=averaging_ms)

    return await self._dispatch("request", targets, _request)

AlicatMediumMismatchError

AlicatMediumMismatchError(
    *,
    command,
    device_media,
    command_media,
    hint=None,
    context=None,
)

Bases: AlicatConfigurationError

A command's declared medium doesn't intersect the device's configured medium.

Raised pre-I/O from :class:alicatlib.devices.session.Session at the media gate (design §5.4, §5.9a). The typical shape: calling :meth:Device.gas on a liquid-only device, or :meth:Device.fluid on a gas-only device. The error carries the mismatch in :attr:ErrorContext.device_media and :attr:ErrorContext.command_media and points at the remediation API in its message.

Source code in src/alicatlib/errors.py
def __init__(
    self,
    *,
    command: str,
    device_media: Medium,
    command_media: Medium,
    hint: str | None = None,
    context: ErrorContext | None = None,
) -> None:
    self.command = command
    self.device_media = device_media
    self.command_media = command_media
    suffix = f" — {hint}" if hint else ""
    super().__init__(
        (
            f"{command} requires medium {command_media.name or command_media!r} but "
            f"device is configured as {device_media.name or device_media!r}{suffix}"
        ),
        context=context,
    )

AlicatMissingHardwareError

AlicatMissingHardwareError(message='', *, context=None)

Bases: AlicatCapabilityError

The device lacks hardware the command requires.

Raised from :class:alicatlib.devices.session.Session before any I/O, using the :class:alicatlib.commands.base.Capability bits declared on the :class:alicatlib.commands.base.Command spec. More useful than letting the device silently respond ? — tells the caller exactly which capability is missing (BAROMETER, MULTI_VALVE, ANALOG_INPUT, ...). See design §5.17.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatParseError

AlicatParseError(
    message,
    *,
    field_name=None,
    expected=None,
    actual=None,
    context=None,
)

Bases: AlicatProtocolError

A response could not be parsed into its typed model.

Source code in src/alicatlib/errors.py
def __init__(
    self,
    message: str,
    *,
    field_name: str | None = None,
    expected: object = None,
    actual: object = None,
    context: ErrorContext | None = None,
) -> None:
    self.field_name = field_name
    self.expected = expected
    self.actual = actual
    super().__init__(message, context=context)

AlicatProtocolError

AlicatProtocolError(message='', *, context=None)

Bases: AlicatError

The bytes arrived but did not parse as a valid Alicat response.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatSinkDependencyError

AlicatSinkDependencyError(message='', *, context=None)

Bases: AlicatSinkError, AlicatConfigurationError

A sink's optional backing library is not installed.

Raised when the user instantiates (or calls open() on) a sink whose extras have not been installed — e.g. ParquetSink without alicatlib[parquet] or PostgresSink without alicatlib[postgres]. The message always names the exact extra to install so the remediation is copy-pasteable.

Multi-inherits :class:AlicatConfigurationError because callers that already branch on configuration errors (missing extras being a configuration problem from their perspective) keep working without changes.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatSinkError

AlicatSinkError(message='', *, context=None)

Bases: AlicatError

Base class for errors raised by sinks (CSV, JSONL, SQLite, Parquet, Postgres).

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatSinkSchemaError

AlicatSinkSchemaError(message='', *, context=None)

Bases: AlicatSinkError

A batch's shape is incompatible with the sink's locked schema.

Raised when a sink has locked its schema on the first batch (or validated against an existing table) and a subsequent batch carries rows whose shape can't be reconciled — for example, a Postgres target table that's missing a required column, or a Parquet writer that would need a type change mid-file.

Dropping unknown optional columns is handled by a per-sink WARN log and does not raise.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatSinkWriteError

AlicatSinkWriteError(message='', *, context=None)

Bases: AlicatSinkError

The backing store rejected a write.

Wraps the underlying driver exception (sqlite3, asyncpg, pyarrow) so downstream error handlers don't need to import optional dependencies. The original exception is preserved via raise ... from original so tracebacks remain intact.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatStreamingModeError

AlicatStreamingModeError(message='', *, context=None)

Bases: AlicatProtocolError

A request/response command was attempted while the client was in streaming mode.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatTimeoutError

AlicatTimeoutError(message='', *, context=None)

Bases: AlicatTransportError

An I/O timeout expired.

A timeout is never represented as an empty successful response.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatTransportError

AlicatTransportError(message='', *, context=None)

Bases: AlicatError

Serial/TCP transport failed to move bytes.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatUnitIdMismatchError

AlicatUnitIdMismatchError(message='', *, context=None)

Bases: AlicatProtocolError

The response's unit ID did not match the request's.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatUnsupportedCommandError

AlicatUnsupportedCommandError(message='', *, context=None)

Bases: AlicatCapabilityError

The command is not supported on this device kind.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

AlicatValidationError

AlicatValidationError(message='', *, context=None)

Bases: AlicatConfigurationError

Arguments failed validation before any I/O (range checks, missing confirm).

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

DeviceKind

Bases: StrEnum

What kind of Alicat device we're talking to.

Coarser than :class:alicatlib.commands.base.Capability — a flow meter might or might not have a barometer; a flow controller might have one, two, or three valves. Per-feature gating is via Capability; this enum just says "mass-flow meter vs mass-flow controller vs pressure meter ..." so commands can declare a short list of compatible kinds.

UNKNOWN class-attribute instance-attribute

UNKNOWN = 'unknown'

Catch-all for models the factory's MODEL_RULES table doesn't match.

A device with this kind still gets a generic :class:Device facade (poll() and execute() work); only commands whose device_kinds explicitly list UNKNOWN will dispatch — the session's kind-gating (§5.7) rejects the rest. This is the "loud silence" path: we'd rather tell users "unknown model, try model_hint" than silently classify a new MFC as a pressure controller.

DeviceResult dataclass

DeviceResult(value, error)

Per-device result container — value or error, never both.

The union is encoded as two optional fields (rather than an Either / Result ADT) so mypy's narrowing on ok reads cleanly at call sites without pattern matching.

Construct via the :meth:success / :meth:failure factories for cleaner call-site reads; direct keyword construction stays valid for backwards-compatible test fixtures.

Attributes:

Name Type Description
value T | None

The successful result, or None if the call failed.

error AlicatError | None

The captured :class:~alicatlib.errors.AlicatError, or None if the call succeeded.

ok property

ok

True when the device produced a value (error is None).

failure classmethod

failure(error)

Build a failed result wrapping error.

Source code in src/alicatlib/manager.py
@classmethod
def failure(cls, error: AlicatError) -> Self:
    """Build a failed result wrapping ``error``."""
    return cls(value=None, error=error)

success classmethod

success(value)

Build a successful result wrapping value.

Source code in src/alicatlib/manager.py
@classmethod
def success[ValueT](cls, value: ValueT) -> DeviceResult[ValueT]:
    """Build a successful result wrapping ``value``."""
    del cls
    return DeviceResult(value=value, error=None)

DeviceSnapshot dataclass

DeviceSnapshot(
    name,
    model,
    firmware,
    serial,
    connected,
    last_error,
    recoverable_error_count,
    captured_at,
)

Cross-lib status snapshot of a device.

Built by :meth:alicatlib.devices.base.Device.snapshot from cached identity + session counters — no I/O. Useful for status CLIs, dashboards, and healthchecks where polling the wire would be overkill.

Per the cross-lib spec §H every sibling lib ships this base shape; alicat adds :class:AlicatDeviceSnapshot for media / capabilities.

DiscoveryResult dataclass

DiscoveryResult(
    port,
    address,
    baudrate,
    protocol,
    device_info,
    error,
    elapsed_s,
)

Outcome of a single :func:probe attempt.

Exactly one of :attr:device_info / :attr:error is populated — ok results carry a fully-identified :class:DeviceInfo, failed ones carry the typed :class:AlicatError from the identification pipeline. The :attr:ok convenience lets callers filter without hasattr.

Shape conforms to the cross-lib spec §B: :attr:address is the bus address (Alicat unit_id, "A".."Z"), :attr:protocol names the wire dialect (always :attr:ProtocolKind.ASCII for alicat), and :attr:elapsed_s measures how long the probe took.

ok property

ok

Whether identification completed successfully.

ErrorContext dataclass

ErrorContext(
    command_name=None,
    command_bytes=None,
    raw_response=None,
    unit_id=None,
    port=None,
    protocol=None,
    firmware=None,
    device_kind=None,
    device_media=None,
    command_media=None,
    elapsed_s=None,
    extra=_empty_extra(),
)

Structured context attached to every :class:AlicatError.

Every field is optional so callers can build a context progressively as a command flows through layers (transport → protocol → session → command).

extra accepts any Mapping and is always frozen into a read-only :class:types.MappingProxyType at construction. The shared empty sentinel can therefore never be mutated through error.context.extra[k] = v.

address property

address

Uniform cross-library accessor for the device's bus address.

Alicat's semantically-meaningful native field is :attr:unit_id (the single-letter A..Z polling id), and that stays the source of truth inside the library. :attr:address is the cross-lib spelling consumers can read uniformly across alicatlib / sartoriuslib / watlowlib / nidaqlib.

merged

merged(**updates)

Return a new context with updates overlaid. Unknown keys go to extra.

Source code in src/alicatlib/errors.py
def merged(self, **updates: Any) -> Self:
    """Return a new context with ``updates`` overlaid. Unknown keys go to ``extra``."""
    known: dict[str, Any] = {}
    extra_updates: dict[str, Any] = {}
    for key, value in updates.items():
        if key in _CONTEXT_KNOWN_FIELDS:
            known[key] = value
        else:
            extra_updates[key] = value

    new_extra: Mapping[str, Any] = (
        MappingProxyType({**self.extra, **extra_updates}) if extra_updates else self.extra
    )
    return replace(self, **known, extra=new_extra)

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every device's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each device produces a :class:DeviceResult and the caller inspects .error per entry.

Design reference: docs/design.md §5.13.

FirmwareVersion dataclass

FirmwareVersion(family, major, minor, raw)

Family-scoped firmware version.

Warning — ordering is intentionally family-gated. __lt__ / __le__ / __gt__ / __ge__ raise :class:TypeError when the operands have different families. __eq__ returns False on family mismatch rather than raising (so sets and dict lookups stay well-behaved). This asymmetry is deliberate: silent cross-family comparison is the worse failure mode.

Canonical gating pattern (see :class:alicatlib.devices.session.Session)::

if cmd.firmware_families and fw.family not in cmd.firmware_families:
    raise AlicatFirmwareError(reason="family_not_supported", ...)
if cmd.min_firmware and fw < cmd.min_firmware:       # safe: same family
    raise AlicatFirmwareError(reason="firmware_too_old", ...)

Attributes:

Name Type Description
family FirmwareFamily

The firmware family (GP / V1_V7 / V8_V9 / V10).

major int

Numeric major; 0 for GP.

minor int

Numeric minor; 0 for GP.

raw str

The original string as reported by the device, preserved for diagnostics (e.g. "GP", "GP-10v05", "10v05").

parse classmethod

parse(software)

Parse software into a :class:FirmwareVersion.

Accepts any of the historical shapes: "GP", "GP-10v05", "1v00", "7v99", "10v05", "10v5", "10.05", or those substrings embedded in a longer response.

GP detection: if the string contains a standalone GP token, the family is :attr:FirmwareFamily.GP, regardless of any trailing Nv<major>v<minor> suffix. major / minor are 0 for GP (the Nv suffix, when present, is purely cosmetic on GP hardware).

Parameters:

Name Type Description Default
software str

Firmware string as reported by the device.

required

Returns:

Type Description
Self

The parsed version.

Raises:

Type Description
AlicatParseError

If software contains neither a GP token nor a recognisable <major>v<minor> / <major>.<minor> pair.

Source code in src/alicatlib/firmware.py
@classmethod
def parse(cls, software: str) -> Self:
    """Parse ``software`` into a :class:`FirmwareVersion`.

    Accepts any of the historical shapes: ``"GP"``, ``"GP-10v05"``,
    ``"1v00"``, ``"7v99"``, ``"10v05"``, ``"10v5"``, ``"10.05"``, or those
    substrings embedded in a longer response.

    GP detection: if the string contains a standalone ``GP`` token, the
    family is :attr:`FirmwareFamily.GP`, regardless of any trailing
    ``Nv<major>v<minor>`` suffix. ``major`` / ``minor`` are ``0`` for GP
    (the Nv suffix, when present, is purely cosmetic on GP hardware).

    Args:
        software: Firmware string as reported by the device.

    Returns:
        The parsed version.

    Raises:
        AlicatParseError: If ``software`` contains neither a ``GP`` token
            nor a recognisable ``<major>v<minor>`` / ``<major>.<minor>`` pair.
    """
    is_gp = _GP_PREFIX_RE.search(software) is not None
    numeric_match = _NUMERIC_RE.search(software)

    if is_gp:
        return cls(family=FirmwareFamily.GP, major=0, minor=0, raw=software)

    if numeric_match is None:
        raise AlicatParseError(
            f"Could not parse firmware from {software!r}",
            field_name="software",
            expected="GP or <major>v<minor>",
            actual=software,
        )

    major = int(numeric_match.group("major"))
    minor = int(numeric_match.group("minor"))
    family = _family_for_major(major)
    return cls(family=family, major=major, minor=minor, raw=software)

Gas

Bases: StrEnum

Gas / gas mixture (Primer Appendix C).

code property

code

Numeric Alicat code (see Appendix A/B/C).

display_name property

display_name

Human-readable name from the primer.

InvalidUnitIdError

InvalidUnitIdError(message='', *, context=None)

Bases: AlicatConfigurationError

A unit ID was not a single letter AZ.

Source code in src/alicatlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

LoopControlVariable

Bases: IntEnum

Statistics a controller's feedback loop can track.

Values are the primer's statistic codes, so LV <value> over the wire is a direct str(member.value). The members mirror the :class:Statistic names they correspond to — LoopControlVariable.MASS_FLOW_SETPT matches :data:Statistic.MASS_FLOW_SETPT (code 37).

statistic property

statistic

The :class:Statistic member that shares this wire code.

Medium

Bases: Flag

What kind of fluid a device moves.

Orthogonal to :class:~alicatlib.devices.kind.DeviceKind (function × form). A :class:Flag rather than a plain :class:Enum so the model can represent devices whose media is ambiguous at the prefix level — either because the hardware truly supports both (some Coriolis lines are reported this way) or because the prefix covers multiple order-time configurations. Gating via bitwise intersection keeps a single code path for every configuration:

.. code:: python

if not (device.info.media & command.media):
    raise AlicatMediumMismatchError(...)

See design §5.9a for the full rationale on modelling medium as a flag (not an enum), why the class tree stays kind-shaped rather than medium-shaped, and why assume_media on the factory replaces rather than unions.

GAS class-attribute instance-attribute

GAS = auto()

Device is configured for gas. Gas-specific commands (GS, ??G*, gas-mix edits) pass the media gate; liquid-specific commands fail pre-I/O.

LIQUID class-attribute instance-attribute

LIQUID = auto()

Device is configured for liquid. Liquid-specific commands (fluid select / list, per-fluid reference density) pass the media gate; gas commands fail pre-I/O.

NONE class-attribute instance-attribute

NONE = 0

No medium resolved. Only valid as an intermediate during identification; a live :class:~alicatlib.devices.models.DeviceInfo always carries at least one of :attr:GAS / :attr:LIQUID.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late.

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

:class:~alicatlib.manager.AlicatManager satisfies this: its poll(names) returns a Mapping[str, DeviceResult[Reading]]. Using a Protocol keeps :func:record testable against a lightweight stub without pulling in the whole manager + transport stack.

poll async

poll(names=None)

Poll every named device (or all under management) concurrently.

Must return a mapping keyed by the manager-assigned device name. Successful polls carry the :class:Reading as .value; failed ones carry the :class:~alicatlib.errors.AlicatError as .error (per :class:~alicatlib.manager.ErrorPolicy.RETURN).

Source code in src/alicatlib/streaming/recorder.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll every named device (or all under management) concurrently.

    Must return a mapping keyed by the manager-assigned device name.
    Successful polls carry the :class:`Reading` as ``.value``;
    failed ones carry the :class:`~alicatlib.errors.AlicatError` as
    ``.error`` (per :class:`~alicatlib.manager.ErrorPolicy.RETURN`).
    """
    ...

PollSourceAdapter

PollSourceAdapter(name, device)

Wrap one :class:Device as a :class:PollSource for :func:record.

Capa's old _SingleDevicePollSource shim reinvented this; the adapter lives here so the wiring is one line at the call site::

adapter = PollSourceAdapter("fuel", device)
async with record(adapter, rate_hz=10) as recording:
    ...

The names filter is honoured per the cross-lib spec §E: when the caller passes a name set that does not include this device's name, poll() returns an empty mapping rather than polling anyway. The recorder always passes a complete name set in single-device mode so filtering is harmless; the empty-mapping behaviour is the correct cross-lib semantic.

Source code in src/alicatlib/streaming/__init__.py
def __init__(self, name: str, device: Device) -> None:
    self._name = name
    self._device = device

device property

device

The wrapped async :class:Device.

name property

name

The manager-style name this adapter publishes the device under.

poll async

poll(names=None)

Poll the wrapped device and return a single-entry mapping.

Source code in src/alicatlib/streaming/__init__.py
async def poll(
    self,
    names: Iterable[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll the wrapped device and return a single-entry mapping."""
    if names is not None and self._name not in set(names):
        return {}
    try:
        reading = await self._device.poll()
    except AlicatError as err:
        failure: DeviceResult[Reading] = DeviceResult(value=None, error=err)
        return {self._name: failure}
    return {self._name: DeviceResult.success(reading)}

ProtocolKind

Bases: Enum

Wire protocol an Alicat library instance speaks.

Alicat devices use a single line-oriented ASCII protocol on every transport (RS-232 / RS-485 / USB-CDC). The enum exists so the cross-lib :class:DiscoveryResult / :class:ErrorContext base fields can carry a typed protocol marker; for alicat the value is always :attr:ASCII (or None when not applicable).

Reading dataclass

Reading(
    unit_id,
    reading_format,
    values,
    values_by_statistic,
    status,
    received_at,
    t_mono_ns,
)

Timing-wrapped :class:ParsedFrame — the public polling result.

Built by :meth:from_parsed. t_mono_ns is for drift analysis and scheduling (never wall-clock); received_at is for data provenance in sinks.

as_dict

as_dict()

Flatten to a JSON/CSV-friendly dict.

Produces {field_name: value, "status": "HLD,OPL", "received_at": iso8601} — status codes collapse into a single comma-joined sorted string (empty when no codes are active) so downstream schema is stable across rows. Callers that need per-code boolean columns should wrap this themselves; the library picks the schema-stable form.

Source code in src/alicatlib/devices/reading.py
def as_dict(self) -> dict[str, float | str | None]:
    """Flatten to a JSON/CSV-friendly dict.

    Produces ``{field_name: value, "status": "HLD,OPL", "received_at": iso8601}``
    — status codes collapse into a single comma-joined sorted string
    (empty when no codes are active) so downstream schema is stable
    across rows. Callers that need per-code boolean columns should
    wrap this themselves; the library picks the schema-stable form.
    """
    result: dict[str, float | str | None] = dict(self.values)
    result["status"] = ",".join(sorted(code.value for code in self.status))
    result["received_at"] = self.received_at.isoformat()
    return result

from_parsed classmethod

from_parsed(
    parsed, *, reading_format, received_at, t_mono_ns
)

Wrap a :class:ParsedFrame with timing captured at read time.

Source code in src/alicatlib/devices/reading.py
@classmethod
def from_parsed(
    cls,
    parsed: ParsedFrame,
    *,
    reading_format: DataFrameFormat,
    received_at: datetime,
    t_mono_ns: int,
) -> Reading:
    """Wrap a :class:`ParsedFrame` with timing captured at read time."""
    return cls(
        unit_id=parsed.unit_id,
        reading_format=reading_format,
        values=parsed.values,
        values_by_statistic=parsed.values_by_statistic,
        status=parsed.status,
        received_at=received_at,
        t_mono_ns=t_mono_ns,
    )

get_float

get_float(name)

Return the float value at name, or None if absent or non-numeric.

This is the "forgiving" accessor used when a downstream consumer wants a numeric value and accepts absence. Text-valued fields and the -- sentinel both yield None; exceptions are never raised. Callers that need strict behaviour should index :attr:values directly.

Source code in src/alicatlib/devices/reading.py
def get_float(self, name: str) -> float | None:
    """Return the float value at ``name``, or ``None`` if absent or non-numeric.

    This is the "forgiving" accessor used when a downstream consumer
    wants a numeric value and accepts absence. Text-valued fields and
    the ``--`` sentinel both yield ``None``; exceptions are never
    raised. Callers that need strict behaviour should index
    :attr:`values` directly.
    """
    value = self.values.get(name)
    return value if isinstance(value, float) else None

get_statistic

get_statistic(stat)

Return the value keyed by :class:Statistic, or None if absent.

Prefer this over :meth:get_float when the caller has a typed :class:Statistic — it's IDE-completable and robust to wire-name renames across firmware versions.

Source code in src/alicatlib/devices/reading.py
def get_statistic(self, stat: Statistic) -> float | str | None:
    """Return the value keyed by :class:`Statistic`, or ``None`` if absent.

    Prefer this over :meth:`get_float` when the caller has a typed
    :class:`Statistic` — it's IDE-completable and robust to wire-name
    renames across firmware versions.
    """
    return self.values_by_statistic.get(stat)

Recording dataclass

Recording(stream, summary, rate_hz)

The object yielded by :func:record's async context manager.

Wraps the live receive stream, the (mutable) :class:AcquisitionSummary the recorder is updating in place, and the rate the recorder is running at. Consumers iterate via async for batch in recording (the instance delegates to :attr:stream), observe progress via :attr:summary, and read :attr:rate_hz for queue-sizing decisions.

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of per-tick batches (or whatever record type the lib emits). Typed via T so consumer code can stay strict.

summary AcquisitionSummary

Live :class:AcquisitionSummary — counters update in place during the run; consumers must not mutate it.

rate_hz float

The rate the recorder is running at, captured at entry. Useful for back-pressure sizing in wrappers.

__aiter__

__aiter__()

Delegate iteration to :attr:stream.

Lets async for batch in recording work without forcing callers to dereference recording.stream themselves — ergonomic for the common case, while keeping the typed attribute around for consumers that want to interleave reads with reading :attr:summary or :attr:rate_hz.

Source code in src/alicatlib/streaming/recorder.py
def __aiter__(self) -> AsyncIterator[T]:
    """Delegate iteration to :attr:`stream`.

    Lets ``async for batch in recording`` work without forcing
    callers to dereference ``recording.stream`` themselves —
    ergonomic for the common case, while keeping the typed
    attribute around for consumers that want to interleave reads
    with reading :attr:`summary` or :attr:`rate_hz`.
    """
    return self.stream.__aiter__()

Sample dataclass

Sample(
    device,
    unit_id,
    t_mono_ns,
    t_utc,
    requested_at,
    received_at,
    latency_s,
    reading,
    t_midpoint_mono_ns=None,
)

One device poll with full timing provenance.

Attributes:

Name Type Description
device str

The manager-assigned name (from AlicatManager.add). Stable downstream identifier that follows the value into sinks.

unit_id str

Bus-level single-letter unit id of the polled device. Kept separate from device so a user renaming the manager key doesn't lose the physical addressing context.

t_mono_ns int

:func:time.monotonic_ns at the acquisition midpoint. Canonical join key per the cross-lib §C contract; never displayed, since the absolute value has no calendar meaning.

t_utc datetime

Wall-clock datetime (UTC, tz-aware) for the acquisition midpoint — (requested_at + received_at) / 2. Use this when aligning Alicat samples against other sensor streams.

t_midpoint_mono_ns int | None

Optional monotonic-ns midpoint of an integration window. None for single polled samples (the common case); populated only when a sample summarises a multi-sample window.

requested_at datetime

Wall-clock datetime (UTC) captured just before the poll bytes leave the host. I/O-boundary provenance — keep alongside t_utc so callers can see the dispatch instant separately from the acquisition midpoint.

received_at datetime

Wall-clock datetime (UTC) captured just after the reply line is read. I/O-boundary provenance.

latency_s float

(received_at - requested_at).total_seconds() — precomputed for convenience; equivalent to received_at - requested_at but avoids the subtraction at every downstream call site.

reading Reading

The :class:Reading returned by the device's poll.

Statistic

Bases: StrEnum

Device statistic code (Primer Appendix A).

code property

code

Numeric Alicat code (see Appendix A/B/C).

display_name property

display_name

Human-readable name from the primer.

Unit

Bases: StrEnum

Engineering unit (Primer Appendix B).

categories property

categories

Primer Appendix B categories this unit belongs to.

code property

code

Numeric Alicat code (see Appendix A/B/C).

display_name property

display_name

Human-readable name from the primer.

UnknownFluidError

UnknownFluidError(value, *, suggestions=(), context=None)

Bases: AlicatConfigurationError

A fluid (working-liquid) name or code did not resolve against the registry.

Source code in src/alicatlib/errors.py
def __init__(
    self,
    value: str | int,
    *,
    suggestions: tuple[str, ...] = (),
    context: ErrorContext | None = None,
) -> None:
    self.value = value
    self.suggestions = suggestions
    hint = f" (did you mean: {', '.join(suggestions)}?)" if suggestions else ""
    super().__init__(f"Unknown fluid: {value!r}{hint}", context=context)

UnknownGasError

UnknownGasError(value, *, suggestions=(), context=None)

Bases: AlicatConfigurationError

A gas name or code did not resolve against the registry.

Source code in src/alicatlib/errors.py
def __init__(
    self,
    value: str | int,
    *,
    suggestions: tuple[str, ...] = (),
    context: ErrorContext | None = None,
) -> None:
    self.value = value
    self.suggestions = suggestions
    hint = f" (did you mean: {', '.join(suggestions)}?)" if suggestions else ""
    super().__init__(f"Unknown gas: {value!r}{hint}", context=context)

UnknownStatisticError

UnknownStatisticError(
    value, *, suggestions=(), context=None
)

Bases: AlicatConfigurationError

A statistic name or code did not resolve against the registry.

Source code in src/alicatlib/errors.py
def __init__(
    self,
    value: str | int,
    *,
    suggestions: tuple[str, ...] = (),
    context: ErrorContext | None = None,
) -> None:
    self.value = value
    self.suggestions = suggestions
    hint = f" (did you mean: {', '.join(suggestions)}?)" if suggestions else ""
    super().__init__(f"Unknown statistic: {value!r}{hint}", context=context)

UnknownUnitError

UnknownUnitError(value, *, suggestions=(), context=None)

Bases: AlicatConfigurationError

A unit name or code did not resolve against the registry.

Source code in src/alicatlib/errors.py
def __init__(
    self,
    value: str | int,
    *,
    suggestions: tuple[str, ...] = (),
    context: ErrorContext | None = None,
) -> None:
    self.value = value
    self.suggestions = suggestions
    hint = f" (did you mean: {', '.join(suggestions)}?)" if suggestions else ""
    super().__init__(f"Unknown unit: {value!r}{hint}", context=context)

config_from_env

config_from_env(prefix=DEFAULT_ENV_PREFIX)

Best-effort env loader.

Only reads well-known keys; unknown keys are ignored. Missing or unparseable values fall back to :class:AlicatConfig's defaults — this function never raises. Use explicit dataclass construction when you need strict validation.

Recognised keys (with prefix="ALICATLIB_"):

  • ALICATLIB_DEFAULT_TIMEOUT_S — float seconds
  • ALICATLIB_MULTILINE_TIMEOUT_S — float seconds
  • ALICATLIB_WRITE_TIMEOUT_S — float seconds
  • ALICATLIB_DEFAULT_BAUDRATE — int
  • ALICATLIB_DRAIN_BEFORE_WRITE"1" / "true" / "yes"
  • ALICATLIB_SAVE_RATE_WARN_PER_MIN — int
  • ALICATLIB_EAGER_TASKS"1" / "true" / "yes"

Parameters:

Name Type Description Default
prefix str

Prefix to prepend to each env key. Defaults to "ALICATLIB_".

DEFAULT_ENV_PREFIX

Returns:

Name Type Description
An AlicatConfig

class:AlicatConfig, falling back to defaults for any missing or

AlicatConfig

unparseable env var.

Source code in src/alicatlib/config.py
def config_from_env(prefix: str = DEFAULT_ENV_PREFIX) -> AlicatConfig:
    """Best-effort env loader.

    Only reads well-known keys; unknown keys are ignored. Missing or
    unparseable values fall back to :class:`AlicatConfig`'s defaults — this
    function never raises. Use explicit dataclass construction when you need
    strict validation.

    Recognised keys (with ``prefix="ALICATLIB_"``):

    - ``ALICATLIB_DEFAULT_TIMEOUT_S`` — float seconds
    - ``ALICATLIB_MULTILINE_TIMEOUT_S`` — float seconds
    - ``ALICATLIB_WRITE_TIMEOUT_S`` — float seconds
    - ``ALICATLIB_DEFAULT_BAUDRATE`` — int
    - ``ALICATLIB_DRAIN_BEFORE_WRITE`` — ``"1"`` / ``"true"`` / ``"yes"``
    - ``ALICATLIB_SAVE_RATE_WARN_PER_MIN`` — int
    - ``ALICATLIB_EAGER_TASKS`` — ``"1"`` / ``"true"`` / ``"yes"``

    Args:
        prefix: Prefix to prepend to each env key. Defaults to ``"ALICATLIB_"``.

    Returns:
        An :class:`AlicatConfig`, falling back to defaults for any missing or
        unparseable env var.
    """
    base = AlicatConfig()

    timeout = _float_env(f"{prefix}DEFAULT_TIMEOUT_S", base.default_timeout_s)
    multiline_timeout = _float_env(f"{prefix}MULTILINE_TIMEOUT_S", base.multiline_timeout_s)
    write_timeout = _float_env(f"{prefix}WRITE_TIMEOUT_S", base.write_timeout_s)
    baudrate = _int_env(f"{prefix}DEFAULT_BAUDRATE", base.default_baudrate)
    drain = _bool_env(f"{prefix}DRAIN_BEFORE_WRITE", base.drain_before_write)
    save_rate = _int_env(f"{prefix}SAVE_RATE_WARN_PER_MIN", base.save_rate_warn_per_min)
    eager = _bool_env(f"{prefix}EAGER_TASKS", base.eager_tasks)

    return AlicatConfig(
        default_timeout_s=timeout,
        multiline_timeout_s=multiline_timeout,
        write_timeout_s=write_timeout,
        default_baudrate=baudrate,
        drain_before_write=drain,
        save_rate_warn_per_min=save_rate,
        eager_tasks=eager,
    )

find_devices async

find_devices(
    ports=None,
    *,
    unit_ids=("A",),
    baudrates=DEFAULT_DISCOVERY_BAUDRATES,
    timeout=_DEFAULT_PROBE_TIMEOUT_S,
    max_concurrency=_DEFAULT_MAX_CONCURRENCY,
    stop_on_first_hit=False,
)

Probe the cross-product ports × unit_ids × baudrates concurrently.

When ports is None the sweep enumerates every port visible via :func:list_serial_ports — convenient for "what's plugged in?" but note that a large fleet plus multiple baudrates multiplies out quickly (10 ports × 2 baud × 5 unit ids = 100 probes).

Concurrency is bounded two ways:

  • max_concurrency via :class:anyio.CapacityLimiter — at most that many serial handles are ever open simultaneously.
  • A per-port :class:anyio.Lock — combinations targeting the same physical port serialise, because a serial port can only be held by one transport at a time. Without this, a sweep that tries two baud rates on one port would see the second probe fail with PortBusyError (or an unrelated transport error) even when the device is present at the correct baud — the two probes simply raced for the same handle.

Lock order is port-first, limiter-second: a probe waiting on its port lock does not consume a limiter slot, which keeps the overall concurrency ceiling meaningful.

When stop_on_first_hit is True, a successful probe at (port, _, baud) records baud as that port's confirmed rate and any pending same-port probe at a different baud is skipped. Same-baud probes at other unit ids still run (important for RS-485 multi-drop buses where several devices share a port at a single baud). Skipped combinations are simply omitted from the result tuple, so the caller can expect len(result) ≤ len(combinations). Default is False — every combination produces a result, in a stable row-major order (ports × unit_ids × baudrates).

The function never raises — every probe's result lands in the returned tuple, ok or not.

Source code in src/alicatlib/devices/discovery.py
async def find_devices(
    ports: Iterable[str] | None = None,
    *,
    unit_ids: Sequence[str] = ("A",),
    baudrates: Sequence[int] = DEFAULT_DISCOVERY_BAUDRATES,
    timeout: float = _DEFAULT_PROBE_TIMEOUT_S,
    max_concurrency: int = _DEFAULT_MAX_CONCURRENCY,
    stop_on_first_hit: bool = False,
) -> tuple[DiscoveryResult, ...]:
    """Probe the cross-product ``ports × unit_ids × baudrates`` concurrently.

    When ``ports`` is ``None`` the sweep enumerates every port visible
    via :func:`list_serial_ports` — convenient for "what's plugged in?"
    but note that a large fleet plus multiple baudrates multiplies out
    quickly (10 ports × 2 baud × 5 unit ids = 100 probes).

    Concurrency is bounded two ways:

    - ``max_concurrency`` via :class:`anyio.CapacityLimiter` — at most
      that many serial handles are ever open simultaneously.
    - A per-port :class:`anyio.Lock` — combinations targeting the same
      physical port serialise, because a serial port can only be held
      by one transport at a time. Without this, a sweep that tries two
      baud rates on one port would see the second probe fail with
      ``PortBusyError`` (or an unrelated transport error) even when the
      device is present at the correct baud — the two probes simply
      raced for the same handle.

    Lock order is port-first, limiter-second: a probe waiting on its
    port lock does not consume a limiter slot, which keeps the overall
    concurrency ceiling meaningful.

    When ``stop_on_first_hit`` is ``True``, a successful probe at
    ``(port, _, baud)`` records ``baud`` as that port's confirmed rate
    and any pending same-port probe at a different baud is skipped.
    Same-baud probes at other unit ids still run (important for RS-485
    multi-drop buses where several devices share a port at a single
    baud). Skipped combinations are simply omitted from the result
    tuple, so the caller can expect ``len(result) ≤ len(combinations)``.
    Default is ``False`` — every combination produces a result, in a
    stable row-major order (``ports`` × ``unit_ids`` × ``baudrates``).

    The function never raises — every probe's result lands in the
    returned tuple, ``ok`` or not.
    """
    if ports is None:
        ports = await list_serial_ports()
    port_list = list(ports)

    combinations = list(product(port_list, unit_ids, baudrates))
    results: list[DiscoveryResult | None] = [None] * len(combinations)
    limiter = anyio.CapacityLimiter(max_concurrency)
    port_locks: dict[str, anyio.Lock] = {port: anyio.Lock() for port in port_list}
    # Per-port confirmed baud — populated on first ok result under
    # ``stop_on_first_hit``. Keyed by port because baud is a bus
    # property, not a per-device one: if one unit id responded at
    # 19200, the bus is at 19200 and other bauds are pointless.
    confirmed_baud: dict[str, int] = {}

    async def _run(index: int, port: str, unit_id: str, baudrate: int) -> None:
        async with port_locks[port]:
            if stop_on_first_hit:
                hit = confirmed_baud.get(port)
                if hit is not None and hit != baudrate:
                    return
            async with limiter:
                result = await probe(
                    port,
                    unit_id=unit_id,
                    baudrate=baudrate,
                    timeout=timeout,
                )
            results[index] = result
            if stop_on_first_hit and result.ok:
                confirmed_baud[port] = baudrate

    async with anyio.create_task_group() as tg:
        for index, (port, unit_id, baudrate) in enumerate(combinations):
            _ = tg.start_soon(_run, index, port, unit_id, baudrate)

    # ``None`` entries are skipped-by-design under ``stop_on_first_hit``;
    # otherwise every slot is populated because the task group only
    # exits after every spawned task returns.
    return tuple(r for r in results if r is not None)

list_serial_ports async

list_serial_ports()

Enumerate serial-port device paths visible to the OS.

Thin wrapper over :func:anyserial.list_serial_ports. Returns device-path strings (/dev/ttyUSB0, COM3 …) in whatever order the backend reports.

The native backend does not require the anyserial[discovery-pyserial] extra; platforms where it misses devices can install that extra and switch by setting the backend="pyserial" kwarg on :func:anyserial.list_serial_ports directly.

Source code in src/alicatlib/devices/discovery.py
async def list_serial_ports() -> list[str]:
    """Enumerate serial-port device paths visible to the OS.

    Thin wrapper over :func:`anyserial.list_serial_ports`. Returns
    device-path strings (``/dev/ttyUSB0``, ``COM3`` …) in whatever order
    the backend reports.

    The native backend does not require the ``anyserial[discovery-pyserial]``
    extra; platforms where it misses devices can install that extra and
    switch by setting the ``backend="pyserial"`` kwarg on
    :func:`anyserial.list_serial_ports` directly.
    """
    return [port.device for port in await anyserial.list_serial_ports()]

open_device async

open_device(
    port,
    *,
    unit_id="A",
    serial=None,
    timeout=0.5,
    recover_from_stream=True,
    model_hint=None,
    assume_capabilities=Capability.NONE,
    assume_media=None,
)

Open and return a fully-identified :class:Device.

Usage forms::

async with await open_device("/dev/ttyUSB0") as device:
    ...

device = await open_device("/dev/ttyUSB0")
try:
    ...
finally:
    await device.close()

The caller's port determines the lifecycle the device takes ownership of:

  • str ("/dev/ttyUSB0" etc.) — build a :class:SerialTransport from serial (or defaults), open it, wrap in an :class:AlicatProtocolClient. The device closes both on :meth:Device.close.
  • :class:Transport — wrap in a new :class:AlicatProtocolClient; the transport's open/close is the caller's responsibility (we never close a transport we didn't open).
  • :class:AlicatProtocolClient — use as-is; neither transport nor client is closed by the device. Stream recovery is skipped because the factory doesn't have access to the underlying transport.

The assume_capabilities override is union'd onto the probed set per design §5.9 — the factory never subtracts flags, because silently masking hardware the device reports as present is exactly the failure mode capability probing exists to avoid.

The assume_media override replaces the prefix-derived media (design §5.9a). Medium answers "how is this specific unit configured," not "what can the hardware do" — the common correction is to narrow from a permissive prefix default to the single medium the unit was actually ordered locked to. The K-family CODA prefixes default to Medium.GAS | Medium.LIQUID because the part-number decoder encodes kind but not medium; other future order-configurable prefixes can adopt the same pattern. A replace policy also future-proofs the model: any new ambiguous prefix drops into :data:MODEL_RULES with the widest default, and users narrow at open time.

Source code in src/alicatlib/devices/factory.py
async def open_device(
    port: str | Transport | AlicatProtocolClient,
    *,
    unit_id: str = "A",
    serial: SerialSettings | None = None,
    timeout: float = 0.5,
    recover_from_stream: bool = True,
    model_hint: str | None = None,
    assume_capabilities: Capability = Capability.NONE,
    assume_media: Medium | None = None,
) -> Device:
    """Open and return a fully-identified :class:`Device`.

    Usage forms::

        async with await open_device("/dev/ttyUSB0") as device:
            ...

        device = await open_device("/dev/ttyUSB0")
        try:
            ...
        finally:
            await device.close()

    The caller's ``port`` determines the lifecycle the device takes
    ownership of:

    - ``str`` (``"/dev/ttyUSB0"`` etc.) — build a
      :class:`SerialTransport` from ``serial`` (or defaults), open it,
      wrap in an :class:`AlicatProtocolClient`. The device closes both
      on :meth:`Device.close`.
    - :class:`Transport` — wrap in a new
      :class:`AlicatProtocolClient`; the transport's open/close is the
      caller's responsibility (we never close a transport we didn't
      open).
    - :class:`AlicatProtocolClient` — use as-is; neither transport nor
      client is closed by the device. Stream recovery is skipped
      because the factory doesn't have access to the underlying
      transport.

    The ``assume_capabilities`` override is union'd onto the probed set
    per design §5.9 — the factory never *subtracts* flags, because
    silently masking hardware the device reports as present is exactly
    the failure mode capability probing exists to avoid.

    The ``assume_media`` override **replaces** the prefix-derived media
    (design §5.9a). Medium answers "how is this specific unit
    configured," not "what can the hardware do" — the common correction
    is to narrow from a permissive prefix default to the single medium
    the unit was actually ordered locked to. The K-family CODA prefixes
    default to ``Medium.GAS | Medium.LIQUID`` because the part-number
    decoder encodes kind but not medium; other future order-configurable
    prefixes can adopt the same pattern. A replace policy also
    future-proofs the model: any new ambiguous prefix drops into
    :data:`MODEL_RULES` with the widest default, and users narrow at
    open time.
    """
    owns_transport = False
    transport: Transport | None = None

    if isinstance(port, AlicatProtocolClient):
        client = port
    elif isinstance(port, str):
        settings = serial if serial is not None else SerialSettings(port=port)
        transport = SerialTransport(settings)
        client = AlicatProtocolClient(transport, default_timeout=timeout)
        owns_transport = True
    else:
        # Duck-typed Transport (Protocol isn't runtime-checkable).
        transport = port
        client = AlicatProtocolClient(transport, default_timeout=timeout)

    try:
        if transport is not None and not transport.is_open:
            await transport.open()
        if recover_from_stream and transport is not None:
            await _recover_from_stream(transport, unit_id)

        info = await identify_device(client, unit_id, model_hint=model_hint)
        probed_caps, probe_report = await probe_capabilities(client, unit_id, info)
        merged_caps = probed_caps | assume_capabilities
        # Medium resolution: prefix-derived by default; ``assume_media``
        # **replaces** (not unions — design §5.9a). Rationale: the common
        # correction is narrowing a permissive prefix default
        # (``Medium.GAS | Medium.LIQUID`` for K-family CODA prefixes and similar
        # whose medium varies by order-time configuration) to the
        # single medium the unit was actually ordered locked to.
        resolved_media = info.media if assume_media is None else assume_media
        info = dataclasses.replace(
            info,
            media=resolved_media,
            capabilities=merged_caps,
            probe_report=probe_report,
        )

        data_frame_format = await _probe_data_frame_format(client, info, unit_id)

        # Per design §10.1: bind per-field engineering units
        # from ``DCU`` where ``??D*`` didn't surface a recognisable label,
        # then populate ``DeviceInfo.full_scale`` from ``FPF`` so
        # setpoint and similar facades can range-check pre-I/O (design
        # §5.20.2). Both probes iterate the data-frame fields, are
        # best-effort per statistic, and never fail the open — a device
        # that rejects one probe just leaves that slot unresolved.
        data_frame_format = await _bind_field_units(
            client,
            info,
            unit_id,
            data_frame_format,
        )
        info = await _probe_full_scales(
            client,
            info,
            unit_id,
            data_frame_format,
        )

        port_label = _resolve_port_label(port, transport)
        session = Session(
            client,
            unit_id=unit_id,
            info=info,
            data_frame_format=data_frame_format,
            port_label=port_label,
        )
        # Pre-cache the loop-control variable for controllers so the
        # first setpoint call can already range-check. Best-effort:
        # firmware without ``LV`` (V1_V7, pre-9v00 V8_V9) leaves the
        # cache ``None`` and setpoint simply skips the range check.
        await _prefetch_loop_control_variable(session, info)

        device_cls = device_class_for(info)
        device = device_cls(
            session,
            owned_transport=transport if owns_transport else None,
        )
    except BaseException:
        # Open failed mid-pipeline; drop any transport we opened so the
        # caller isn't left with a leaked file descriptor.
        if owns_transport and transport is not None and transport.is_open:
            await transport.close()
        raise

    return device

probe async

probe(
    port,
    *,
    unit_id="A",
    baudrate=19200,
    timeout=_DEFAULT_PROBE_TIMEOUT_S,
)

Probe one port at one baudrate for one unit id.

Never raises — every failure becomes :attr:DiscoveryResult.error so that a bulk :func:find_devices call collects a uniform result set. Opening errors (permission denied, port busy, no such device) are caught here the same as identification errors; the caller sees one shape whether the device is offline, misconfigured, or silent.

Source code in src/alicatlib/devices/discovery.py
async def probe(
    port: str,
    *,
    unit_id: str = "A",
    baudrate: int = 19200,
    timeout: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> DiscoveryResult:
    """Probe one port at one baudrate for one unit id.

    Never raises — every failure becomes :attr:`DiscoveryResult.error`
    so that a bulk :func:`find_devices` call collects a uniform result
    set. Opening errors (permission denied, port busy, no such device)
    are caught here the same as identification errors; the caller sees
    one shape whether the device is offline, misconfigured, or silent.
    """
    from alicatlib.protocol import ProtocolKind  # noqa: PLC0415 — avoid import cycle at module load

    started_mono = monotonic()
    settings = SerialSettings(port=port, baudrate=baudrate)
    transport = SerialTransport(settings)
    try:
        await transport.open()
    except AlicatError as err:
        return DiscoveryResult(
            port=port,
            address=unit_id,
            baudrate=baudrate,
            protocol=ProtocolKind.ASCII,
            device_info=None,
            error=err,
            elapsed_s=monotonic() - started_mono,
        )
    try:
        client = AlicatProtocolClient(
            transport,
            default_timeout=timeout,
            # Multiline (``??M*``) deserves a bit more headroom — the
            # factory-default ratio of 2x matches the protocol client
            # itself.
            multiline_timeout=timeout * 2,
        )
        return await _probe_with_client(
            client,
            port=port,
            unit_id=unit_id,
            baudrate=baudrate,
            started_mono=started_mono,
        )
    finally:
        # Best-effort teardown — a close failure here shouldn't hide
        # the identification result the caller came for.
        with contextlib.suppress(AlicatError):
            await transport.close()

record async

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

async with record(mgr, rate_hz=10, duration=60) as recording:
    async for batch in recording.stream:
        process(batch)
    print(recording.summary.samples_emitted)

The CM yields a :class:Recording carrying the async iterator, the live :class:AcquisitionSummary, and the rate. Each batch on the stream is a Mapping[name, Sample] — one entry per device that polled successfully on that tick. Devices whose :class:DeviceResult carries an error are omitted from that batch and logged at WARN.

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (typically an :class:~alicatlib.manager.AlicatManager).

required
rate_hz float

Target cadence. Absolute targets are computed target[n] = start + n * (1 / rate_hz). Must be > 0.

required
duration float | None

Total acquisition duration in seconds. None means "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll per tick. None polls everything the source manages.

None
overflow OverflowPolicy

Backpressure policy when the receive-stream buffer is full. See :class:OverflowPolicy.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches. 64 mirrors the design default.

64

Yields:

Name Type Description
A AsyncGenerator[Recording[Mapping[str, Sample]]]

class:Recording of per-tick Mapping[device_name, Sample].

Raises:

Type Description
ValueError

If rate_hz <= 0 or duration <= 0 or buffer_size < 1.

Source code in src/alicatlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[Recording[Mapping[str, Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(mgr, rate_hz=10, duration=60) as recording:
            async for batch in recording.stream:
                process(batch)
            print(recording.summary.samples_emitted)

    The CM yields a :class:`Recording` carrying the async iterator, the
    live :class:`AcquisitionSummary`, and the rate. Each batch on the
    stream is a ``Mapping[name, Sample]`` — one entry per device that
    polled successfully on that tick. Devices whose :class:`DeviceResult`
    carries an error are omitted from that batch and logged at WARN.

    Args:
        source: Any :class:`PollSource` (typically an
            :class:`~alicatlib.manager.AlicatManager`).
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. ``None``
            means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.
            ``64`` mirrors the design default.

    Yields:
        A :class:`Recording` of per-tick ``Mapping[device_name, Sample]``.

    Raises:
        ValueError: If ``rate_hz <= 0`` or ``duration <= 0`` or
            ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if overflow is OverflowPolicy.DROP_OLDEST:
        # Fail at call site (not deep inside the producer task) so the
        # exception type doesn't come back wrapped in an ExceptionGroup.
        raise NotImplementedError(
            "OverflowPolicy.DROP_OLDEST is not yet implemented; use BLOCK "
            "or DROP_NEWEST for now (design §5.14).",
        )

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Mapping[str, Sample]](
        max_buffer_size=buffer_size,
    )
    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(started_at=started_at)
    recording: Recording[Mapping[str, Sample]] = Recording(
        stream=receive_stream,
        summary=summary,
        rate_hz=rate_hz,
    )

    _logger.info(
        "recorder.start",
        extra={
            "rate_hz": rate_hz,
            "duration_s": duration,
            "overflow": overflow.value,
            "buffer_size": buffer_size,
            "names": list(names) if names is not None else None,
        },
    )

    async with anyio.create_task_group() as tg, receive_stream:
        _ = tg.start_soon(
            _run_producer,
            source,
            send_stream,
            period,
            total_ticks,
            names,
            overflow,
            summary,
        )
        try:
            yield recording
        finally:
            # Cancel + drain before the CM returns — producer lifetime
            # is strictly nested inside the ``async with`` per §5.14.
            tg.cancel_scope.cancel()

    summary.finished_at = datetime.now(UTC)
    _logger.info(
        "recorder.stop",
        extra={
            "samples_emitted": summary.samples_emitted,
            "samples_late": summary.samples_late,
            "max_drift_ms": summary.max_drift_ms,
            "duration_s": (summary.finished_at - started_at).total_seconds(),
        },
    )

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Schema layout (stable across samples):

  • device — manager-assigned name.
  • unit_id — bus-level single-letter id.
  • t_utc / requested_at / received_at — ISO 8601.
  • t_mono_ns — monotonic acquisition midpoint, ns since boot.
  • latency_s — poll round-trip, seconds.
  • reading fields — everything from :meth:Reading.as_dict except the reading's own received_at (superseded by the sample-level value so all rows have the same received_at semantics).
  • status — comma-joined sorted status codes (empty string when no flags active), from :meth:Reading.as_dict.

The reading's own received_at is dropped so the row's received_at consistently means "recorder-observed reply time" across rows — otherwise multi-device rows would mix reading-level and sample-level timings.

Source code in src/alicatlib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | str | int | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Schema layout (stable across samples):

    - ``device`` — manager-assigned name.
    - ``unit_id`` — bus-level single-letter id.
    - ``t_utc`` / ``requested_at`` / ``received_at`` — ISO 8601.
    - ``t_mono_ns`` — monotonic acquisition midpoint, ns since boot.
    - ``latency_s`` — poll round-trip, seconds.
    - *reading fields* — everything from :meth:`Reading.as_dict` *except*
      the reading's own ``received_at`` (superseded by the sample-level
      value so all rows have the same ``received_at`` semantics).
    - ``status`` — comma-joined sorted status codes (empty string when
      no flags active), from :meth:`Reading.as_dict`.

    The reading's own ``received_at`` is dropped so the row's
    ``received_at`` consistently means "recorder-observed reply time"
    across rows — otherwise multi-device rows would mix reading-level
    and sample-level timings.
    """
    row: dict[str, float | str | int | None] = {
        "device": sample.device,
        "unit_id": sample.unit_id,
        "t_utc": sample.t_utc.isoformat(),
        "t_mono_ns": sample.t_mono_ns,
        "requested_at": sample.requested_at.isoformat(),
        "received_at": sample.received_at.isoformat(),
        "latency_s": sample.latency_s,
    }
    reading_dict = sample.reading.as_dict()
    reading_dict.pop("received_at", None)
    # The first ??D* field is the unit-id echo (design §5.6). It
    # duplicates ``sample.unit_id`` verbatim and collides case-
    # insensitively with the ``unit_id`` column in strict backends like
    # SQLite (hardware-validation finding, 2026-04-17: captured parser
    # names the field ``Unit_ID`` while the sample-level column is
    # ``unit_id`` — SQLite treats them as a duplicate column).
    for key in ("Unit_ID", "unit_id"):
        reading_dict.pop(key, None)
    row.update(reading_dict)
    return row

to_pint

to_pint(unit)

Return a pint-compatible unit string, or None if unmapped.

Accepts a :class:Unit member, the raw Alicat label string (case-insensitive fallback), or None. PSIA / PSIG / PSID collapse to "psi" — lossy by design (spec §K).

Source code in src/alicatlib/units.py
def to_pint(unit: Unit | str | None) -> str | None:
    """Return a pint-compatible unit string, or ``None`` if unmapped.

    Accepts a :class:`Unit` member, the raw Alicat label string
    (case-insensitive fallback), or ``None``. PSIA / PSIG / PSID
    collapse to ``"psi"`` — lossy by design (spec §K).
    """
    if unit is None:
        return None
    if isinstance(unit, Unit):
        return _ALICAT_UNIT_TO_PINT.get(unit)
    stripped = unit.strip()
    try:
        return _ALICAT_UNIT_TO_PINT[Unit(stripped)]
    except (ValueError, KeyError):
        pass
    target = stripped.casefold()
    for member, pint_str in _ALICAT_UNIT_TO_PINT.items():
        if member.value.casefold() == target:
            return pint_str
    return None