Quellcode für vyra_base.com.feeder.custom_feeder

"""
Custom feeder base class for VYRA application developers.

``CustomBaseFeeder`` is the recommended starting point when you need to
publish proprietary domain data (e.g. sensor readings, machine parameters,
alarm setpoints) over the VYRA transport layer.

Quick start::

    from vyra_base.com.feeder.custom_feeder import CustomBaseFeeder
    from vyra_base.com.feeder.registry import register_feeder
    from vyra_base.defaults.entries import ModuleEntry

    @register_feeder("TemperatureFeed")
    class TemperatureFeeder(CustomBaseFeeder):
        \"\"\"Publishes temperature readings from a PLC.\"\"\"

        def _build_message(self, raw: float) -> dict:
            return {"value": raw, "unit": "°C", "sensor": self._sensor_id}

        def _validate(self, raw: float) -> bool:
            return -50.0 <= raw <= 300.0

    # Usage in your module application:
    feeder = TemperatureFeeder(
        feeder_name="TemperatureFeed",
        module_entity=my_module_entity,
    )
    await feeder.start()
    await feeder.feed(87.3)     # publishes {"value": 87.3, "unit": "°C", "sensor": "PT-01"}

The protocol is resolved automatically from the interface config JSON — add
a ``"type": "publisher", "functionname": "TemperatureFeed"`` entry with the
appropriate ``tags`` (e.g. ``["zenoh"]``).
"""

from __future__ import annotations

import logging
from typing import Any, Optional, Callable

from vyra_base.com.feeder.feeder import BaseFeeder
from vyra_base.com.feeder.tracking import FeedTracker
from vyra_base.defaults.entries import ModuleEntry
from vyra_base.defaults.exceptions import FeederException
from vyra_base.com.core.interface_path_registry import InterfacePathRegistry, get_interface_registry

logger = logging.getLogger(__name__)


[Doku] class CustomBaseFeeder(BaseFeeder): """Base class for user-defined VYRA feeders. Extends :class:`~vyra_base.com.feeder.feeder.BaseFeeder` with two hooks that subclasses can override to implement custom message mapping and validation without touching the transport layer. :param feeder_name: The feeder's name. **Must** match the ``functionname`` in the module's interface config JSON so that the protocol resolver can find the correct transport entry. :type feeder_name: str :param module_entity: Module configuration (name, uuid, …). :type module_entity: ModuleEntry :param message_type: Optional message class forwarded to the publisher (e.g. a protobuf class). Pass ``None`` for dict-based protocols. :type message_type: Any, optional :param node: ROS2 node (required only when using the ROS2 transport). :type node: Any, optional :param loggingOn: If ``True``, every :meth:`feed` call is also logged via the standard Python logger. :type loggingOn: bool, optional """
[Doku] def __init__( self, feeder_name: str, module_entity: ModuleEntry, message_type: Any = None, node: Optional[Any] = None, loggingOn: bool = False, ): super().__init__() self._feederName: str = feeder_name self._module_entity: ModuleEntry = module_entity self._type: Any = message_type self._node: Optional[Any] = node self._loggingOn: bool = loggingOn
# ------------------------------------------------------------------ # Hooks — override in subclasses # ------------------------------------------------------------------ def _build_message(self, raw: Any) -> Any: """Transform *raw* into the domain object expected by the publisher. The default implementation returns *raw* unchanged. Override to map your input type to e.g. a protobuf message, a dataclass, or a dict. :param raw: The raw value passed to :meth:`feed`. :type raw: Any :return: The domain object to publish. :rtype: Any """ return raw def _validate(self, raw: Any) -> bool: """Validate *raw* before publishing. The default implementation always returns ``True``. Override to add range checks, type checks, or business-rule validation. If this method returns ``False`` the message is **not** published and a warning is logged. :param raw: The raw value passed to :meth:`feed`. :type raw: Any :return: ``True`` if the value should be published. :rtype: bool """ return True # ------------------------------------------------------------------ # IFeeder.feed override — applies hooks before publishing # ------------------------------------------------------------------
[Doku] async def feed(self, raw: Any) -> None: # type: ignore[override] """Validate, transform, and publish *raw*. 1. Call :meth:`_validate` — skip on ``False``. 2. Call :meth:`_build_message` — transform to domain object. 3. Delegate to :meth:`~vyra_base.com.feeder.feeder.BaseFeeder.feed`. :param raw: The raw value to validate, transform, and publish. :type raw: Any """ if not self._validate(raw): logger.warning( "⚠️ %s: validation failed for message '%s', skipping.", self._feederName, raw ) return try: message = self._build_message(raw) except Exception as exc: logger.error( "❌ %s: _build_message failed: %s", self._feederName, exc ) self._error_count += 1 return await super().feed(message)
# ------------------------------------------------------------------ # start() — resolve interface paths from registry # ------------------------------------------------------------------
[Doku] async def start(self) -> None: """Resolve protocol from interface config and start the feeder.""" paths = get_interface_registry().get_interface_paths() if paths: self.set_interface_paths(paths) await self.create(loggingOn=self._loggingOn)
[Doku] def monitor( self, *, tag: Optional[str] = None, label: Optional[str] = None, severity: str = "INFO", entity: Any = None, during_interval_seconds: float = 0.05, ) -> Callable: """Return a runtime-monitoring decorator bound to this feeder.""" resolved_tag = tag or self._feederName.replace("Feed", "").lower() return FeedTracker(self).monitor( tag=resolved_tag, label=label, severity=severity, entity=entity, during_interval_seconds=during_interval_seconds, )