Quellcode für vyra_base.com.handler.zenoh
"""
Zenoh transport handler for VYRA feeders.
Publishers are created via
:func:`~vyra_base.com.core.factory.InterfaceFactory.create_publisher` using
:attr:`~vyra_base.com.core.types.ProtocolType.ZENOH`. This keeps the
Zenoh session management inside the CAL layer (``t_zenoh`` provider) and
decoupled from the handler.
"""
from __future__ import annotations
import logging
from logging import LogRecord
from typing import Any
from vyra_base.com.handler.communication import CommunicationHandler
from vyra_base.com.core.types import VyraPublisher
from vyra_base.helper.error_handler import ErrorTraceback
logger = logging.getLogger(__name__)
[Doku]
class ZenohHandler(CommunicationHandler):
"""Feeder handler that publishes messages over the Zenoh protocol.
The internal ``VyraPublisher`` is created externally (by
:class:`~vyra_base.com.handler.factory.HandlerFactory`) using
:func:`~vyra_base.com.core.factory.InterfaceFactory.create_publisher`
with ``protocols=[ProtocolType.ZENOH]``. This handler only wraps the
publisher to bridge the feeder / logging interface.
:cvar __handlerName__: Identifies this handler as ``"ZenohHandler"``.
:param initiator: Name of the feeder that owns this handler.
:type initiator: str
:param publisher: Pre-created Zenoh ``VyraPublisher``.
:type publisher: VyraPublisher
:param type: Expected message type (used for logging / validation).
:type type: Any
"""
__handlerName__: str = 'ZenohHandler'
__doc__: str = 'Zenoh transport handler'
[Doku]
def __init__(self, initiator: str, publisher: VyraPublisher, type: Any):
self._initiator = initiator
self._publisher: VyraPublisher = publisher
self._type: Any = type
super().__init__()
# ------------------------------------------------------------------
# IFeederHandler implementation
# ------------------------------------------------------------------
[Doku]
def get_protocol(self) -> str:
"""Return ``"zenoh"``."""
return "zenoh"
[Doku]
def is_available(self) -> bool:
"""Return ``True`` if the publisher is set and connected."""
return self._publisher is not None
[Doku]
async def dispatch(self, message: Any) -> None:
"""Publish *message* via the Zenoh CAL publisher.
:param message: Domain object to publish. Must be compatible with
the message type configured for the topic.
:type message: Any
"""
try:
logger.debug(
"%s → %s publishing via Zenoh: %s",
self._initiator,
ZenohHandler.__handlerName__,
message,
)
await self._publisher.publish(message)
finally:
ErrorTraceback.check_error_exist()
# ------------------------------------------------------------------
# logging.Handler override — emit record.msg directly
# ------------------------------------------------------------------
[Doku]
async def emit(self, record: LogRecord) -> None: # type: ignore[override]
"""Publish the raw ``record.msg`` object (not the formatted string).
Zenoh topics carry typed domain objects, not plain log strings.
:param record: Python log record whose ``msg`` is a domain object.
:type record: LogRecord
"""
await self.dispatch(record.msg)