# Source code for vyra_base.com.feeder.feeder

from __future__ import annotations

import asyncio
import datetime
import logging
from collections import deque
from typing import Any, Type, Optional, Sequence, Callable
from pathlib import Path

# Check ROS2 availability
try:
    import rclpy
    _ROS2_AVAILABLE = True
except ImportError:
    _ROS2_AVAILABLE = False

if _ROS2_AVAILABLE:
    from rclpy.qos import (
        QoSProfile,
        QoSHistoryPolicy,
        QoSReliabilityPolicy,
        QoSDurabilityPolicy,
    )
    from vyra_base.com import InterfaceFactory, ProtocolType

from vyra_base.com.feeder.interfaces import IFeeder
from vyra_base.com.handler.communication import CommunicationHandler
from vyra_base.defaults.exceptions import FeederException
from vyra_base.helper.logging_config import VyraLoggingConfig
from vyra_base.com.feeder.config_resolver import FeederConfigResolver
from vyra_base.com.core.types import ProtocolType as PT
from vyra_base.com.core.types import VyraPublisher
from vyra_base.com.core.interface_loader import InterfaceLoader
from vyra_base.com.feeder.message_mapper import MessageMapper
from vyra_base.com.feeder.tracking import (
    FeedConditionRegistry,
    FeedDebouncer,
    build_message_signature,
    ExecutionPoint,
)

logger = logging.getLogger(__name__)


class BaseFeeder(IFeeder):
    """
    Concrete base class for all VYRA feeders.

    Implements :class:`~vyra_base.com.feeder.interfaces.IFeeder` and provides:

    * **Protocol auto-resolution** via
      :class:`~vyra_base.com.feeder.config_resolver.FeederConfigResolver` —
      the transport protocol is read from the module's interface config JSON
      (``functionname`` matched against ``feeder_name``).
    * **Pre-start buffer** — messages fed before :meth:`start` are queued
      (bounded ``deque`` with ``maxlen=20``; the oldest entries are dropped
      on overflow) and flushed automatically.
    * **Metrics** — ``feed_count``, ``error_count``, ``last_feed_at``.
    * **Health check** — :meth:`is_alive` probes the backing publisher.
    * **Retry policy** — configurable ``max_retries`` and ``retry_delay``.

    Abstract class — subclasses set ``_feederName``, ``_type``, optionally
    ``_interface_paths``.
    """

    def __init__(self) -> None:
        """Initialize the BaseFeeder with default configuration and metrics."""
        # QoS configuration (only if ROS2 available)
        if _ROS2_AVAILABLE:
            self._qos = QoSProfile(
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=10,
                reliability=QoSReliabilityPolicy.RELIABLE,
                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
            )
        else:
            self._qos = None

        self._feedBaseName: str = 'vyraFeeder'
        self._feederName: str = 'AbstractFeeder'
        self._doc: str = 'Abstract class for all feeder classes.'
        self._level: int = logging.INFO
        self._ros2_available: bool = _ROS2_AVAILABLE

        # Interface paths for protocol resolution (set by subclasses or entity)
        self._interface_paths: list[str] = []

        self._handler_classes: list[Type[CommunicationHandler]] = []
        self._handler: list[Type[CommunicationHandler] | CommunicationHandler] = []
        self._loggingOn: bool = False
        self._node: Optional[Any] = None
        self._type: Any = None
        # Resolved message/interface type (loaded from InterfaceLoader or
        # taken from self._type)
        self._msg_type: Optional[Any] = None
        self._publisher: Optional[VyraPublisher] = None
        # Bounded pre-start buffer: only the 20 most recent messages are kept.
        self._feedbuffer: deque = deque(maxlen=20)

        # Resolved protocol (set during create()/start())
        self._resolved_protocol: Optional[str] = None
        self._is_ready: bool = False

        # Metrics
        self._feed_count: int = 0
        self._error_count: int = 0
        self._last_feed_at: Optional[datetime.datetime] = None

        # Retry policy
        self._max_retries: int = 3
        self._retry_delay: float = 1.0

        # Feed monitoring helpers
        self._debouncer = FeedDebouncer(window_seconds=5.0)
        self._debounced_duplicate_count: int = 0
        self._conditions = FeedConditionRegistry()

        # Strong references to fire-and-forget tasks. The event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # IFeeder implementation
    # ------------------------------------------------------------------

    def get_feeder_name(self) -> str:
        """Return the feeder name (= ``functionname`` in interface config)."""
        return self._feederName

    def get_protocol(self) -> Optional[str]:
        """Return the resolved transport protocol, or ``None`` before start."""
        return self._resolved_protocol

    def is_alive(self) -> bool:
        """Return ``True`` if the publisher is set and available.

        If the publisher exposes ``is_connected()``, its result decides;
        otherwise the mere presence of a publisher counts as alive.
        """
        if not self._publisher:
            return False
        if hasattr(self._publisher, 'is_connected'):
            return bool(self._publisher.is_connected())
        return True

    def is_ready(self) -> bool:
        """Return ``True`` after :meth:`start` has completed successfully."""
        return self._is_ready

    def get_buffer(self) -> deque:
        """Return the pre-start message buffer."""
        return self._feedbuffer

    @property
    def feed_count(self) -> int:
        """Number of successfully published messages."""
        return self._feed_count

    @property
    def error_count(self) -> int:
        """Number of publish errors."""
        return self._error_count

    @property
    def last_feed_at(self) -> Optional[datetime.datetime]:
        """Naive UTC timestamp of the last successful publish."""
        return self._last_feed_at

    @property
    def debounced_duplicate_count(self) -> int:
        """Number of duplicate messages suppressed by debouncing."""
        return self._debounced_duplicate_count

    def register_condition(
        self,
        condition_function: Callable[[dict[str, Any]], bool],
        *,
        name: Optional[str] = None,
        tag: str = "news",
        execution_point: ExecutionPoint = "ALWAYS",
        success_message: Optional[str] = None,
        failure_message: Optional[str] = None,
    ) -> str:
        """Register a synchronous bool-returning condition callback.

        All arguments are forwarded unchanged to the feeder's
        :class:`FeedConditionRegistry`; its return value is returned.
        """
        return self._conditions.register(
            condition_function,
            name=name,
            tag=tag,
            execution_point=execution_point,
            success_message=success_message,
            failure_message=failure_message,
        )

    def unregister_condition(self, name: str) -> bool:
        """Remove a previously registered condition callback by name."""
        return self._conditions.unregister(name)

    def evaluate_conditions(
        self,
        context: dict[str, Any],
        *,
        rule_names: Optional[list[str]] = None,
        tags: Optional[list[str]] = None,
        execution_point: Optional[ExecutionPoint] = None,
    ) -> list[tuple[str, str]]:
        """Evaluate registered conditions and return resulting messages.

        By default all registered conditions are evaluated. Use
        ``rule_names`` and/or ``tags`` to limit evaluation to a subset.
        """
        return self._conditions.evaluate(
            context,
            rule_names=rule_names,
            tags=tags,
            execution_point=execution_point,
        )

    # ------------------------------------------------------------------
    # Initialisation helpers
    # ------------------------------------------------------------------

    def set_interface_paths(self, paths: Sequence[str]|list[Path]) -> None:
        """Override the interface paths used for protocol resolution.

        Called by a :class:`~vyra_base.core.entity.VyraEntity` after
        constructing the feeder to provide module-specific config paths.

        A single ``str`` or ``Path`` is accepted as a one-element list.
        ``str`` must be checked first because it is itself a ``Sequence``
        and would otherwise be split into single characters.

        :param paths: List of directory or JSON file paths.
        :type paths: Sequence[str] | list[Path]
        """
        if isinstance(paths, (str, Path)):
            self._interface_paths = [str(paths)]
        elif isinstance(paths, Sequence):
            self._interface_paths = [str(p) for p in paths]
        else:
            self._interface_paths = [str(paths)]

    def _resolve_protocols_from_config(self) -> Optional[list]:
        """Resolve the protocol list from the interface config, if possible.

        Sets ``self._resolved_protocol`` only after the configured value has
        been validated as a ``ProtocolType``; an invalid value must not stick,
        otherwise the publisher's real protocol would never be adopted.

        :return: Single-element protocol list, or ``None`` to use the
            default fallback chain.
        """
        if not self._interface_paths:
            return None
        try:
            result = FeederConfigResolver.resolve(
                feeder_name=self._feederName,
                interface_paths=self._interface_paths,
            )
            if result is None:
                return None
            try:
                protocols = [PT(result.protocol)]
            except ValueError:
                logger.warning(
                    "⚠️ Protocol '%s' from config not a valid ProtocolType, "
                    "using fallback chain.", result.protocol
                )
                return None
            self._resolved_protocol = result.protocol
            return protocols
        except Exception as exc:
            logger.warning("⚠️ FeederConfigResolver error for '%s': %s — using fallback.",
                           self._feederName, exc)
            return None

    @staticmethod
    async def _create_publisher(**kwargs: Any) -> Any:
        """Call ``InterfaceFactory.create_publisher`` with *kwargs*.

        The module only imports ``InterfaceFactory`` when ROS2 is installed.
        Prefer the module-level name (so it stays patchable) and import it
        lazily otherwise, instead of failing with ``NameError``.
        """
        factory = globals().get("InterfaceFactory")
        if factory is None:
            from vyra_base.com import InterfaceFactory as factory
        return await factory.create_publisher(**kwargs)

    def _adopt_publisher_protocol(self, *, override: bool) -> None:
        """Copy the publisher's ``protocol`` into ``_resolved_protocol``.

        :param override: When ``False`` an already resolved protocol is kept.
            When ``True`` (legacy retry path) the publisher's actual protocol
            wins, because the previously resolved one evidently failed.
        """
        if self._resolved_protocol is not None and not override:
            return
        if not hasattr(self._publisher, 'protocol'):
            return
        proto = self._publisher.protocol
        if proto is None and override:
            # Nothing better known; keep whatever was resolved before.
            return
        self._resolved_protocol = getattr(proto, 'value', str(proto))

    @staticmethod
    def _validate_handler_class(handler_class: Any) -> None:
        """Raise ``TypeError`` unless *handler_class* subclasses CommunicationHandler."""
        if not isinstance(handler_class, type) or not issubclass(
                handler_class, CommunicationHandler):
            raise TypeError("Handler class must be a subclass of CommunicationHandler")

    async def create(self, loggingOn: bool = False) -> None:
        """Create the publisher using protocol resolved from interface config.

        Resolution order:

        1. ``FeederConfigResolver.resolve(feeder_name, interface_paths)`` —
           reads the module's JSON config and yields a protocol.
        2. If no config is found (or ``interface_paths`` is empty): fall back
           to ``InterfaceFactory.create_publisher`` with the default chain
           ``[ROS2, ZENOH, REDIS, UDS]`` when ROS2 is available, otherwise
           ``[ZENOH, REDIS, UDS]``.
        3. If that call fails, one legacy retry with ``[ROS2, REDIS]``
           (or ``[REDIS]`` without ROS2) is attempted.

        Afterwards the message type is loaded, handler classes are
        instantiated for ROS2-backed publishers, the feeder is marked ready
        and the pre-start buffer is flushed.

        :raises FeederException: If no protocol is available at all.
        :raises TypeError: If a registered handler class is not a
            ``CommunicationHandler`` subclass.
        :param loggingOn: Emit feeder messages also to the base logger.
        :type loggingOn: bool
        """
        self._loggingOn = loggingOn

        # ── Protocol resolution via interface config ──────────────────
        resolved_protocols = self._resolve_protocols_from_config()

        # ── Publisher creation ────────────────────────────────────────
        try:
            pub_kwargs: dict[str, Any] = {
                "name": self._feederName,
                "namespace": "feeder",
            }
            if getattr(self, '_type', None) is not None:
                pub_kwargs["message_type"] = self._type

            if resolved_protocols:
                pub_kwargs["protocols"] = resolved_protocols
            elif self._ros2_available:
                # Default fallback chain
                pub_kwargs["protocols"] = [PT.ROS2, PT.ZENOH, PT.REDIS, PT.UDS]
            else:
                pub_kwargs["protocols"] = [PT.ZENOH, PT.REDIS, PT.UDS]

            if self._ros2_available and self._node:
                pub_kwargs["node"] = self._node
                if self._qos:
                    pub_kwargs["qos_profile"] = self._qos

            self._publisher = await self._create_publisher(**pub_kwargs)
            self._adopt_publisher_protocol(override=False)
            logger.info("✅ %s using protocol '%s'",
                        self._feederName, self._resolved_protocol)

        except Exception as exc_primary:
            # Last-resort legacy retry
            logger.warning("⚠️ create_publisher failed (%s), retrying with legacy kwargs.",
                           exc_primary)
            try:
                fallback = [PT.ROS2, PT.REDIS] if self._ros2_available else [PT.REDIS]
                fallback_kwargs: dict[str, Any] = {
                    "name": self._feederName,
                    "protocols": fallback,
                    "namespace": "feeder",
                    "is_publisher": True,
                }
                if getattr(self, '_type', None) is not None:
                    fallback_kwargs["message_type"] = self._type
                if self._ros2_available and self._node:
                    fallback_kwargs["node"] = self._node
                    fallback_kwargs["qos_profile"] = self._qos
                self._publisher = await self._create_publisher(**fallback_kwargs)
                # The protocol chosen above did not work; record the one
                # the fallback publisher actually uses.
                self._adopt_publisher_protocol(override=True)
            except Exception as exc_fallback:
                logger.error("❌ Failed to create feeder publisher for '%s': %s",
                             self._feederName, exc_fallback)
                raise FeederException(
                    f"Could not create publisher for {self._feederName}. "
                    f"Install ROS2 or configure Zenoh/Redis."
                ) from exc_fallback

        if self._publisher is None:
            raise FeederException(
                f"No communication protocol available for {self._feederName}. "
                f"Install ROS2, Zenoh, or Redis."
            )

        # Auto-load interface type via InterfaceLoader (after protocol is known)
        self._msg_type = await self._load_msg_type()

        # NOTE: the legacy per-feeder Python logger (create_feeder) is
        # deprecated and no longer set up here.

        # Attach ROS2 handlers only for ROS2-backed publishers (legacy support)
        if self._ros2_available and hasattr(self._publisher, 'publisher_server'):
            for handler_class in self._handler_classes:
                self._validate_handler_class(handler_class)
                handler = handler_class(
                    initiator=self._feederName,
                    publisher=self._publisher,
                    type=self._publisher.message_type
                )
                self.add_handler(handler)
        else:
            logger.debug("⚠️ Skipping ROS2 handlers for %s (non-ROS2 publisher)",
                         self._feederName)

        # Flush buffered messages
        self._is_ready = True
        await self._flush_buffer()

    # ------------------------------------------------------------------
    # Interface type loading
    # ------------------------------------------------------------------

    async def _load_msg_type(self) -> Optional[Any]:
        """
        Resolve the transport message type for this feeder.

        Priority:

        1. Use :class:`~vyra_base.com.core.interface_loader.InterfaceLoader`
           to look up the interface for ``self._feederName`` with the
           resolved protocol. Loader errors are logged at debug level and
           treated as "not found".
        2. Fall back to ``self._type``.

        :return: The loaded interface object, ``self._type``, or ``None``
            when neither is available.
        """
        protocol = self._resolved_protocol or ""

        # Try InterfaceLoader first
        if self._interface_paths or self._feederName:
            try:
                loader = InterfaceLoader(
                    interface_paths=[Path(p) for p in self._interface_paths]
                    if self._interface_paths else None
                )
                loaded = loader.get_interface_for_function(
                    self._feederName, protocol=protocol
                )
                if loaded is not None:
                    logger.info(
                        "✅ InterfaceLoader resolved msg type for '%s' "
                        "via protocol '%s': %s",
                        self._feederName, protocol, loaded
                    )
                    return loaded
            except Exception as exc:
                logger.debug(
                    "⚠️ InterfaceLoader failed for '%s': %s",
                    self._feederName, exc
                )

        # Fallback: use the explicitly configured type
        if self._type is not None:
            logger.debug(
                "Using explicit _type for '%s': %s",
                self._feederName, self._type
            )
        return self._type

    # ------------------------------------------------------------------
    # Conversion hook (overridden by concrete feeders)
    # ------------------------------------------------------------------

    def _prepare_entry_for_publish(self, entry: Any) -> Any:
        """
        Convert a Python entry object to a wire-ready dict (or pass through).

        Concrete feeders override this to apply feeder-specific field name
        mappings and value transformations. The returned value is then
        further converted by
        :class:`~vyra_base.com.feeder.message_mapper.MessageMapper` into the
        protocol-native message type inside :meth:`_publish`.

        The default implementation returns *entry* unchanged.
        """
        return entry

    # ------------------------------------------------------------------
    # Buffer flush
    # ------------------------------------------------------------------

    async def _flush_buffer(self) -> None:
        """Publish all messages that arrived before :meth:`start`.

        Messages are sent through :meth:`_publish` directly, i.e. they are
        not debounced a second time. A failing message is logged and
        skipped so the remaining buffer is still drained.
        """
        flushed = 0
        while self._feedbuffer:
            msg = self._feedbuffer.popleft()
            try:
                await self._publish(msg)
                flushed += 1
            except Exception as exc:
                logger.warning("⚠️ Buffer flush failed for message in %s: %s",
                               self._feederName, exc)
        if flushed:
            logger.debug("🔁 %s flushed %d buffered message(s).",
                         self._feederName, flushed)

    async def start(self) -> None:
        """Start the feeder (implements :class:`IFeeder`).

        Subclasses may override to add extra initialisation before calling
        ``await super().start()``.
        """
        await self.create(loggingOn=self._loggingOn)

    # ------------------------------------------------------------------
    # Feed / publish path
    # ------------------------------------------------------------------

    def _accept_for_publish(self, msg: Any) -> bool:
        """Run the shared pre-publish checks of :meth:`feed` / :meth:`feed_sync`.

        In order: debounce duplicates, buffer while not ready, optionally log
        the message, and verify a publisher exists.

        :return: ``True`` if the caller should publish *msg* now; ``False``
            if it was suppressed, buffered, or rejected (already accounted
            for in the metrics).
        """
        debounce_hit = self._debouncer.evaluate(build_message_signature(msg))
        if not debounce_hit.should_publish:
            self._debounced_duplicate_count += 1
            logger.debug(
                "🔁 Debounced duplicate in %s (count=%d)",
                self._feederName,
                debounce_hit.duplicate_count,
            )
            return False

        if not self._is_ready:
            self._feedbuffer.append(msg)
            logger.debug("⏳ %s buffering message (not started yet).", self._feederName)
            return False

        if self._loggingOn:
            logger.info("🗞 Feeder %s fed: %s", self._feederName, msg)

        if not self._publisher:
            logger.error("❌ No publisher for feeder %s", self._feederName)
            self._error_count += 1
            return False
        return True

    def _spawn_background(self, loop: asyncio.AbstractEventLoop, coro: Any) -> None:
        """Schedule *coro* on the running *loop* as a tracked fire-and-forget task."""
        task = loop.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)

    def _on_background_task_done(self, task: asyncio.Task) -> None:
        """Drop the task reference and log an exception it may have raised."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("❌ Background task failed in %s: %s", self._feederName, exc)

    @staticmethod
    def _running_loop() -> Optional[asyncio.AbstractEventLoop]:
        """Return the loop running in this thread, or ``None`` if there is none."""
        try:
            return asyncio.get_running_loop()
        except RuntimeError:
            return None

    async def feed(self, msg: Any) -> None:
        """Publish *msg* (implements :class:`IFeeder`).

        Duplicates inside the debounce window are dropped. If the feeder is
        not yet ready the message is buffered. Otherwise the publish path is
        awaited; errors are logged and counted, not raised.

        :param msg: Message to publish.
        :type msg: Any
        """
        if not self._accept_for_publish(msg):
            return
        try:
            await self._publish(msg)
        except Exception as exc:
            logger.error("❌ Feed failed in %s: %s", self._feederName, exc)
            self._error_count += 1

    def feed_sync(self, msg: Any) -> None:
        """Sync version of :meth:`feed` for use in sync contexts.

        When called from inside a running event loop the publish is scheduled
        as a background task and this method returns immediately. Otherwise
        the publish is run to completion on the thread's event loop (one is
        created if none exists or the current one is closed); exceptions from
        the publish path propagate to the caller in that case.

        :param msg: Message to publish.
        :type msg: Any
        """
        if not self._accept_for_publish(msg):
            return

        running = self._running_loop()
        if running is not None:
            self._spawn_background(running, self._publish(msg))
            return

        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        loop.run_until_complete(self._publish(msg))

    async def _publish(self, msg: Any) -> None:
        """
        Internal coroutine — convert entry, then publish with retry.

        Conversion order:

        1. ``_prepare_entry_for_publish(msg)`` — feeder-specific field mapping
        2. :meth:`~vyra_base.com.feeder.message_mapper.MessageMapper.to_transport_msg`
           — conversion based on resolved protocol; if it raises, the
           pre-processed entry is published as-is.

        Up to ``_max_retries`` attempts are made, sleeping ``_retry_delay``
        seconds between them. A final failure is logged and counted in
        ``error_count`` but not raised. Handlers are dispatched once, on
        success as well as on final failure.

        :raises FeederException: If no publisher is set.
        """
        last_exc: Optional[Exception] = None
        if not self._publisher:
            raise FeederException(f"No publisher available for {self._feederName}")

        # Step 1: feeder-specific pre-processing
        wire_data = self._prepare_entry_for_publish(msg)

        # Step 2: convert to transport-native message
        try:
            transport_msg = MessageMapper.to_transport_msg(
                wire_data,
                protocol=self._resolved_protocol or "",
                msg_type=self._msg_type,
            )
        except Exception as exc:
            logger.debug(
                "⚠️ MessageMapper conversion skipped for '%s': %s",
                self._feederName, exc
            )
            transport_msg = wire_data

        if not hasattr(self._publisher, 'publish'):
            # Not retryable: sleeping between attempts cannot add the method.
            last_exc = FeederException(
                f"Publisher has no publish() method: {type(self._publisher)}"
            )
        else:
            for attempt in range(1, self._max_retries + 1):
                try:
                    await self._publisher.publish(transport_msg)
                    self._feed_count += 1
                    # Naive UTC timestamp, same value as the deprecated
                    # datetime.utcnow().
                    self._last_feed_at = datetime.datetime.now(
                        datetime.timezone.utc
                    ).replace(tzinfo=None)
                    # Dispatch to all attached handlers (fire-and-forget)
                    self._dispatch_to_handlers(wire_data)
                    return
                except Exception as exc:
                    last_exc = exc
                    if attempt < self._max_retries:
                        await asyncio.sleep(self._retry_delay)

        self._error_count += 1
        logger.error("❌ %s publish failed after %d attempt(s): %s",
                     self._feederName, self._max_retries, last_exc)
        # Dispatch to handlers even on publish failure so DB/logging handlers
        # record the event regardless of transport availability.
        self._dispatch_to_handlers(wire_data)

    # ------------------------------------------------------------------
    # Handler management
    # ------------------------------------------------------------------

    def add_handler(self, handler: CommunicationHandler) -> bool:
        """Attach a :class:`~vyra_base.com.handler.communication.CommunicationHandler`.

        :param handler: Handler instance to attach.
        :type handler: CommunicationHandler
        :raises TypeError: If *handler* is not a ``CommunicationHandler``.
        :return: ``True`` if added, ``False`` if already present.
        :rtype: bool
        """
        if not isinstance(handler, CommunicationHandler):
            raise TypeError(f"Expected CommunicationHandler, got {type(handler)}")
        if handler in self._handler:
            return False
        self._handler.append(handler)
        return True

    def _dispatch_to_handlers(self, wire_data: Any) -> None:
        """Schedule :meth:`~CommunicationHandler.dispatch` on every active handler.

        Inside a running event loop each active handler is dispatched as a
        tracked fire-and-forget :class:`asyncio.Task`, so slow handlers do
        not stall the publish path and failures are logged. Without a running
        loop the handlers are run to completion one after another on the
        thread's event loop. Handlers whose ``activated`` attribute is falsy
        are skipped (a missing attribute counts as active).

        Called from :meth:`_publish` both on successful publish *and* on
        publish failure.

        :param wire_data: The pre-processed message (output of
            :meth:`_prepare_entry_for_publish`) passed verbatim to every
            handler.
        :type wire_data: Any
        """
        if not self._handler:
            return

        running = self._running_loop()
        blocking_loop = None
        if running is None:
            try:
                blocking_loop = asyncio.get_event_loop()
            except RuntimeError:
                logger.warning("⚠️ _dispatch_to_handlers: no event loop available")
                return

        for handler in self._handler:
            if not getattr(handler, 'activated', True):
                continue
            if running is not None:
                self._spawn_background(running, handler.dispatch(wire_data))
            else:
                blocking_loop.run_until_complete(handler.dispatch(wire_data))

    def add_handler_class(self, handler_class: Type[CommunicationHandler]) -> None:
        """Register a handler class to be instantiated during :meth:`create`.

        Registering the same class twice has no effect.

        :param handler_class: Handler class (must subclass CommunicationHandler).
        :type handler_class: Type[CommunicationHandler]
        :raises TypeError: If *handler_class* is not a ``CommunicationHandler``
            subclass.
        """
        self._validate_handler_class(handler_class)
        if handler_class not in self._handler_classes:
            self._handler_classes.append(handler_class)