Source code for vyra_base.com.handler.uds

"""
Unix Domain Socket (UDS) transport handler for VYRA feeders.

Publishers are created via
:func:`~vyra_base.com.core.factory.InterfaceFactory.create_publisher` using
:attr:`~vyra_base.com.core.types.ProtocolType.UDS`.

UDS is ideal for high-throughput, low-latency intra-host communication
between VYRA modules running in the same Docker network namespace or on
the same physical host.
"""

from __future__ import annotations

import logging
from logging import LogRecord
from typing import Any

from vyra_base.com.handler.communication import CommunicationHandler
from vyra_base.com.core.types import VyraPublisher
from vyra_base.helper.error_handler import ErrorTraceback

logger = logging.getLogger(__name__)


class UDSHandler(CommunicationHandler):
    """Feeder handler that forwards messages to a Unix Domain Socket publisher.

    The handler does not open a socket itself; it is handed a ready-made
    ``VyraPublisher`` (backed by the ``t_uds`` CAL provider and created by
    :class:`~vyra_base.com.handler.factory.HandlerFactory`) and delegates
    every message to it.

    UDS suits safety-critical, latency-sensitive events that must stay
    inside the host boundary.

    :cvar __handlerName__: Identifies this handler as ``"UDSHandler"``.
    :param initiator: Name of the owning feeder.
    :type initiator: str
    :param publisher: Pre-created UDS ``VyraPublisher``.
    :type publisher: VyraPublisher
    :param type: Expected message type.
    :type type: Any
    """

    __handlerName__: str = 'UDSHandler'
    # NOTE: this explicit assignment replaces the class docstring above as
    # the runtime value of ``UDSHandler.__doc__``.
    __doc__: str = 'Unix Domain Socket transport handler'

    def __init__(self, initiator: str, publisher: VyraPublisher, type: Any):
        """Store the feeder name, publisher and message type, then init the base.

        :param initiator: Name of the owning feeder (used as a log prefix).
        :type initiator: str
        :param publisher: Pre-created UDS ``VyraPublisher``.
        :type publisher: VyraPublisher
        :param type: Expected message type; stored, not checked here.
        :type type: Any
        """
        # Attributes are assigned before the base initialiser runs, exactly
        # as in the original ordering.
        self._initiator = initiator
        self._publisher: VyraPublisher = publisher
        self._type: Any = type
        super().__init__()

    # ------------------------------------------------------------------
    # IFeederHandler implementation
    # ------------------------------------------------------------------

    def get_protocol(self) -> str:
        """Return the protocol identifier, ``"uds"``."""
        return "uds"

    def is_available(self) -> bool:
        """Return ``True`` when a publisher object is attached.

        Only the presence of the publisher is checked (``is not None``).
        """
        publisher_missing = self._publisher is None
        return not publisher_missing

    async def dispatch(self, message: Any) -> None:
        """Publish *message* through the UDS publisher.

        A debug line is logged first, then the publisher's ``publish``
        coroutine is awaited. ``ErrorTraceback.check_error_exist()`` is
        invoked afterwards in a ``finally`` clause, i.e. whether or not
        publishing raised; exceptions are not caught here.

        :param message: Domain object to publish.
        :type message: Any
        """
        try:
            handler_name = UDSHandler.__handlerName__
            logger.debug(
                "%s%s publishing via UDS: %s",
                self._initiator,
                handler_name,
                message,
            )
            await self._publisher.publish(message)
        finally:
            ErrorTraceback.check_error_exist()

    async def emit(self, record: LogRecord) -> None:  # type: ignore[override]
        """Forward the raw ``record.msg`` object to :meth:`dispatch`.

        :param record: Python log record whose ``msg`` is a domain object.
        :type record: LogRecord
        """
        payload = record.msg
        await self.dispatch(payload)