# Source code for vyra_base.com.handler.interfaces
"""
Abstract interfaces for VYRA feeder handlers.

This module defines the ``IFeederHandler`` abstract base class that all
communication handlers must implement. Transport handlers (ROS2, Zenoh,
Redis, UDS) create their internal publisher via
:func:`~vyra_base.com.core.factory.InterfaceFactory.create_publisher`,
keeping the CAL layer fully decoupled from handler logic.

Design decisions
----------------

* **Hybrid logging.Handler + async dispatch**: Handlers extend Python's
  :class:`logging.Handler` so they can still be attached to Python loggers
  (feeder logging pipeline) *and* offer a proper ``async dispatch()`` for
  direct message transport — no breaking change for existing code.
* **Protocol identification**: Every handler declares its
  :class:`~vyra_base.com.core.types.ProtocolType`, enabling
  :class:`~vyra_base.com.handler.factory.HandlerFactory` to select the
  correct implementation at runtime.
* **Availability check**: ``is_available()`` lets the feeder probe whether a
  handler's transport backend is reachable before flushing buffered messages.
"""
from __future__ import annotations
import abc
import logging
from typing import Any, Optional
# [docs]
class IFeederHandler(logging.Handler, abc.ABC):
    """Abstract base class for all VYRA feeder handlers.

    Every concrete handler must implement :meth:`dispatch` (async direct
    transport) and :meth:`get_protocol`. The :meth:`emit` method bridges the
    Python logging pipeline to :meth:`dispatch` so that handlers can be
    attached directly to a Python :class:`logging.Logger`.

    Subclasses **must not** change the ``emit`` → ``dispatch`` delegation
    unless there is a specific reason (e.g. ``DBCommunicationHandler`` logs
    the formatted string rather than the raw message object).

    :cvar __handlerName__: Human-readable handler identifier. Set in each
        subclass.
    :cvar __doc__: One-line description of what this handler transports.
    :ivar activated: Whether this handler is active. Inactive handlers are
        skipped during dispatch without raising errors.
    :vartype activated: bool
    """

    __handlerName__: str = "AbstractHandler"

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the handler and set :attr:`activated` to ``True``.

        All positional and keyword arguments are forwarded unchanged to
        :class:`logging.Handler`.
        """
        super().__init__(*args, **kwargs)
        self.activated: bool = True
        # Strong references to dispatch tasks scheduled by emit(). The event
        # loop only keeps weak references to tasks, so without this set an
        # in-flight dispatch could be garbage-collected before it finishes.
        self._pending_dispatch_tasks: set[Any] = set()

    def activate(self) -> None:
        """Enable this handler so it participates in message dispatch."""
        self.activated = True

    def deactivate(self) -> None:
        """Disable this handler so it is silently skipped during dispatch."""
        self.activated = False

    # ------------------------------------------------------------------
    # Abstract interface — every handler must implement these
    # ------------------------------------------------------------------

    @abc.abstractmethod
    async def dispatch(self, message: Any) -> None:
        """Transport *message* over the backing protocol.

        This is the primary entry-point for the feeder. The feeder calls
        ``dispatch`` directly (bypassing the logging pipeline) when it already
        has a fully-formed domain object (e.g. ``StateEntry``).

        :param message: Domain object or raw value to transport.
        :type message: Any
        :raises HandlerDispatchError: If the transport operation fails
            unrecoverably.
        """

    @abc.abstractmethod
    def get_protocol(self) -> str:
        """Return the :class:`~vyra_base.com.core.types.ProtocolType` value
        this handler uses (e.g. ``"ros2"``, ``"zenoh"``, …).

        Using ``str`` as return type keeps the module importable without
        the full ``vyra_base.com.core.types`` dependency tree.

        :rtype: str
        """

    # ------------------------------------------------------------------
    # Optional overrides — sensible defaults provided
    # ------------------------------------------------------------------

    def get_handler_name(self) -> str:
        """Return the handler's human-readable name.

        Defaults to :attr:`__handlerName__`.

        :rtype: str
        """
        return self.__handlerName__

    def is_available(self) -> bool:
        """Check whether the backing transport is currently reachable.

        The default implementation always returns ``True``. Override in
        transport handlers to probe the actual protocol connection.

        :rtype: bool
        """
        return True

    # ------------------------------------------------------------------
    # logging.Handler bridge — delegates emit → dispatch
    # ------------------------------------------------------------------

    def emit(self, record: logging.LogRecord) -> None:
        """Bridge Python logging to :meth:`dispatch`.

        Called by the Python logging framework with ``record.msg`` as the
        message to transport.

        * If an event loop is running in the calling thread, :meth:`dispatch`
          is scheduled on it as a task so the synchronous logging call does
          not block the loop. A reference to the task is kept until it
          finishes, and an exception raised by the task is reported through
          :meth:`logging.Handler.handleError`.
        * Otherwise :meth:`dispatch` is driven to completion synchronously:
          on the thread's current (idle) event loop when one is set, or on a
          temporary loop via :func:`asyncio.run` when the thread has none.

        If :attr:`activated` is ``False`` the record is silently discarded.

        Subclasses that need a different bridging strategy (e.g. formatting
        the record as a plain string for a database) should override this
        method directly.

        :param record: Python :class:`~logging.LogRecord` to emit.
        :type record: logging.LogRecord
        """
        if not self.activated:
            return

        # Kept as a function-local import, as in the original implementation.
        import asyncio

        try:
            try:
                running_loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop is running in this thread.
                running_loop = None

            if running_loop is not None:
                task = running_loop.create_task(self.dispatch(record.msg))
                self._pending_dispatch_tasks.add(task)

                def _on_done(finished: Any) -> None:
                    # Release the strong reference, then surface failures the
                    # same way synchronous emit errors are surfaced.
                    self._pending_dispatch_tasks.discard(finished)
                    try:
                        finished.result()
                    except asyncio.CancelledError:
                        pass
                    except Exception:
                        self.handleError(record)

                task.add_done_callback(_on_done)
                return

            try:
                idle_loop = asyncio.get_event_loop()
            except RuntimeError:
                # The thread has no current loop (for instance after an
                # earlier asyncio.run() has completed); use a temporary one
                # instead of dropping the record.
                asyncio.run(self.dispatch(record.msg))
            else:
                idle_loop.run_until_complete(self.dispatch(record.msg))
        except Exception:
            self.handleError(record)