Quellcode für vyra_base.com.feeder.news_feeder

import asyncio
import logging
import re
import uuid
from datetime import datetime
from typing import Any, Union, Optional, Callable

# Check ROS2 availability
try:
    import rclpy
    _ROS2_AVAILABLE = True
except ImportError:
    _ROS2_AVAILABLE = False

if _ROS2_AVAILABLE:
    from vyra_base.com.transport.t_ros2 import Ros2TypeConverter
    from vyra_base.com.handler.ros2 import ROS2Handler
else:
    Ros2TypeConverter = None
    ROS2Handler = None

from .feeder import BaseFeeder
from .tracking import FeedTracker, ExecutionPoint
from vyra_base.defaults.entries import ModuleEntry, NewsEntry
from vyra_base.defaults.exceptions import FeederException
from vyra_base.com.core.interface_path_registry import get_interface_registry

logger = logging.getLogger(__name__)


[Doku] class NewsFeeder(BaseFeeder): """ Collection of the news messages. :param node: The VyraNode instance associated with this feeder. :type node: Optional[Any] :param module_entity: Module configuration entry. :type module_entity: ModuleEntry :param loggingOn: Flag to enable or disable logging next to feeding. Defaults to False. :type loggingOn: bool, Optional :raises FeederException: If the publisher cannot be created. """
[Doku] def __init__( self, node: Optional[Any], module_entity: ModuleEntry, loggingOn: bool = False ): super().__init__() # _feederName must match the "functionname" in the interface config JSON self._feederName: str = 'NewsFeed' self._doc: str = 'Collect news messages of this module.' self._level: int = logging.INFO self._node: Optional[Any] = node self._module_entity: ModuleEntry = module_entity self._ros2_available: bool = _ROS2_AVAILABLE and node is not None self._loggingOn: bool = loggingOn if self._ros2_available and ROS2Handler: self._handler_classes.append(ROS2Handler)
[Doku] async def start(self) -> None: """Starts the feeder by initializing handlers. Automatically resolves the transport protocol from the module's interface config (reads ``NewsFeed`` entry, picks ``tags``). """ paths = get_interface_registry().get_interface_paths() if paths: self.set_interface_paths(paths) await self.create(loggingOn=self._loggingOn)
[Doku] async def feed(self, newsElement: Union[NewsEntry, str, list]) -> None: """ Feed a news entry to the feeder. Normalises the input to a :class:`~vyra_base.defaults.entries.NewsEntry`, then delegates to :meth:`~BaseFeeder.feed` which calls :meth:`_prepare_entry_for_publish` followed by :class:`~vyra_base.com.feeder.message_mapper.MessageMapper` for protocol-aware conversion. :param newsElement: The news entry to be fed. Can be a string or list of strings to be processed into a NewsEntry, or a NewsEntry object. :type newsElement: Union[NewsEntry, str, list] :raises FeederException: If the type of newsElement is not supported. """ if isinstance(newsElement, str) or isinstance(newsElement, list): newsfeed_entry = self.build_newsfeed(newsElement) elif isinstance(newsElement, NewsEntry): newsfeed_entry = newsElement else: raise FeederException(f"Wrong Type. Expect: NewsEntry, got {type(newsElement)}") await super().feed(newsfeed_entry)
[Doku] def feed_sync(self, msg: Any) -> None: """Sync version of feed() — normalises input to NewsEntry before delegating.""" if isinstance(msg, str) or isinstance(msg, list): msg = self.build_newsfeed(msg) elif not isinstance(msg, NewsEntry): raise FeederException(f"Wrong Type. Expect: NewsEntry or str, got {type(msg)}") return super().feed_sync(msg)
[Doku] def register_news_condition( self, condition_function: Callable[[dict[str, Any]], bool], *, name: Optional[str] = None, execution_point: ExecutionPoint = "ALWAYS", success_message: Optional[str] = None, failure_message: Optional[str] = None, tag: str = "news", ) -> str: """Register a synchronous bool condition for tracked feeder messages.""" return self.register_condition( condition_function, name=name, tag=tag, execution_point=execution_point, success_message=success_message, failure_message=failure_message, )
[Doku] def monitor( self, *, tag: str = "news", label: Optional[str] = None, severity: str = "INFO", entity: Any = None, during_interval_seconds: float = 0.05, ) -> Callable: """Return a runtime-monitoring decorator bound to this feeder.""" return FeedTracker(self).monitor( tag=tag, label=label, severity=severity, entity=entity, during_interval_seconds=during_interval_seconds, )
[Doku] async def evaluate_tracked_conditions(self, context: dict[str, Any]) -> None: """Evaluate registered conditions and publish resulting tracker messages.""" outputs = self.evaluate_conditions(context) entity = context.get("entity") for tag, message in outputs: if tag == "news": await self.feed(message) continue feeder = getattr(entity, f"{tag}_feeder", None) if entity is not None else None if feeder is not None and hasattr(feeder, "feed"): await feeder.feed(message) else: logger.warning("No feeder found for tracked condition tag '%s'", tag)
[Doku] def evaluate_tracked_conditions_sync(self, context: dict[str, Any]) -> None: """Sync variant of :meth:`evaluate_tracked_conditions`.""" outputs = self.evaluate_conditions(context) entity = context.get("entity") for tag, message in outputs: if tag == "news": self.feed_sync(message) continue feeder = getattr(entity, f"{tag}_feeder", None) if entity is not None else None if feeder is None: logger.warning("No feeder found for tracked condition tag '%s'", tag) continue if hasattr(feeder, "feed_sync"): feeder.feed_sync(message) continue if hasattr(feeder, "feed"): loop = asyncio.get_event_loop() if loop.is_running(): loop.create_task(feeder.feed(message)) else: loop.run_until_complete(feeder.feed(message))
def _prepare_entry_for_publish(self, entry: NewsEntry) -> dict: # type: ignore[override] """ Convert a :class:`~vyra_base.defaults.entries.NewsEntry` to a wire-ready dict whose keys match the transport interface field names (``VBASENewsFeed``). """ try: level_name = ( NewsEntry.MESSAGE_LEVEL(entry.level).name if isinstance(entry.level, int) else str(entry.level) ) except (ValueError, KeyError): level_name = str(entry.level) return { 'type': level_name, 'message': str(entry.message) if entry.message else '', 'timestamp': entry.timestamp if entry.timestamp else datetime.now(), 'module_id': str(entry.module_id) if entry.module_id else '', 'module_name': str(entry.module_name) if entry.module_name else '', }
[Doku] def build_newsfeed(self, *args: Any) -> NewsEntry: """ Build a well structured newsfeed entry from plain text and module information. :param args: The arguments to be processed into a news entry. Can be a string or list of strings. :type args: Any :return: A structured news entry containing the message, level, timestamp, UUID, module name, module ID, module template, and type. :rtype: NewsEntry :raises FeederException: If the type of the message level is not valid. """ message: str = (''.join(map(str, args))).split('|')[-1] ansi_escape = re.compile( r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') message = ansi_escape.sub('', message) message = message.replace('__call__', 'Call') message = message.replace('__init__', 'Initialize') message_level = extract_level_from_msg(message) if message_level is None: message_level = NewsEntry.MESSAGE_LEVEL.INFO if message_level not in NewsEntry.MESSAGE_LEVEL: raise FeederException( f"Invalid type for newsfeed entry: {type}. " f"Expected one of {NewsEntry.MESSAGE_LEVEL.__members__}." ) newsfeed_entry: NewsEntry = NewsEntry( level= message_level.value, message= message, timestamp= datetime.now(), uuid= uuid.uuid4(), module_name= self._module_entity.name, module_id= self._module_entity.uuid, ) return newsfeed_entry
[Doku] def extract_level_from_msg(message: str) -> Union[NewsEntry.MESSAGE_LEVEL, None]: """ Extract the message level from a given message string. :param message: The message string from which to extract the level. :type message: str :return: The extracted message level if found, otherwise None. :rtype: Union[NewsEntry.MESSAGE_LEVEL, None] """ match = re.search(r"<([^<>]+)>", message) if match: value = match.group(1) try: return NewsEntry.MESSAGE_LEVEL(value) except ValueError: return None return None