import asyncio
import logging
import re
import uuid
from datetime import datetime
from typing import Any, Union, Optional, Callable
# Check ROS2 availability
try:
import rclpy
_ROS2_AVAILABLE = True
except ImportError:
_ROS2_AVAILABLE = False
if _ROS2_AVAILABLE:
from vyra_base.com.transport.t_ros2 import Ros2TypeConverter
from vyra_base.com.handler.ros2 import ROS2Handler
else:
Ros2TypeConverter = None
ROS2Handler = None
from .feeder import BaseFeeder
from .tracking import FeedTracker, ExecutionPoint
from vyra_base.defaults.entries import ModuleEntry, NewsEntry
from vyra_base.defaults.exceptions import FeederException
from vyra_base.com.core.interface_path_registry import get_interface_registry
logger = logging.getLogger(__name__)
[docs]
class NewsFeeder(BaseFeeder):
    """
    Collection of the news messages.

    Accepts plain strings, lists or ready-made
    :class:`~vyra_base.defaults.entries.NewsEntry` objects, normalises them to
    a ``NewsEntry`` and passes them on to :class:`BaseFeeder`.

    :param node: The VyraNode instance associated with this feeder.
    :type node: Optional[Any]
    :param module_entity: Module configuration entry.
    :type module_entity: ModuleEntry
    :param loggingOn: Flag to enable or disable logging next to feeding. Defaults to False.
    :type loggingOn: bool, Optional
    :raises FeederException: If the publisher cannot be created.
    """

    # ANSI/VT100 escape sequences (e.g. terminal colour codes). Compiled once
    # at class level instead of on every build_newsfeed() call.
    _ANSI_ESCAPE_RE = re.compile(
        r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

    def __init__(
        self,
        node: Optional[Any],
        module_entity: ModuleEntry,
        loggingOn: bool = False
    ):
        super().__init__()

        # _feederName must match the "functionname" in the interface config JSON
        self._feederName: str = 'NewsFeed'
        self._doc: str = 'Collect news messages of this module.'
        self._level: int = logging.INFO
        self._node: Optional[Any] = node
        self._module_entity: ModuleEntry = module_entity
        # ROS2 is only usable when rclpy imported successfully AND a node was given.
        self._ros2_available: bool = _ROS2_AVAILABLE and node is not None
        self._loggingOn: bool = loggingOn
        # Strong references to tasks scheduled from the sync evaluation path;
        # the event loop itself only keeps weak references to tasks.
        self._pending_tasks: set[asyncio.Task] = set()

        if self._ros2_available and ROS2Handler:
            self._handler_classes.append(ROS2Handler)

    async def start(self) -> None:
        """Start the feeder by initializing handlers.

        Fetches the interface paths from the interface registry, applies them
        via :meth:`set_interface_paths` when any are returned, and then awaits
        :meth:`create` with the configured ``loggingOn`` flag.

        NOTE(review): the actual transport protocol resolution (reading the
        ``NewsFeed`` interface entry) presumably happens inside
        ``BaseFeeder.create`` — it is not visible in this class.
        """
        paths = get_interface_registry().get_interface_paths()
        if paths:
            self.set_interface_paths(paths)
        await self.create(loggingOn=self._loggingOn)

    def _normalize_entry(self, element: Any) -> NewsEntry:
        """Return *element* as a ``NewsEntry``.

        ``NewsEntry`` instances are passed through unchanged; ``str`` and
        ``list`` inputs are converted with :meth:`build_newsfeed`.

        :param element: The raw input handed to :meth:`feed` / :meth:`feed_sync`.
        :type element: Any
        :return: The normalised news entry.
        :rtype: NewsEntry
        :raises FeederException: If *element* is none of the supported types.
        """
        if isinstance(element, NewsEntry):
            return element
        if isinstance(element, (str, list)):
            return self.build_newsfeed(element)
        raise FeederException(
            f"Wrong Type. Expect: NewsEntry, str or list, got {type(element)}")

    async def feed(self, newsElement: Union[NewsEntry, str, list]) -> None:
        """
        Feed a news entry to the feeder.

        Normalises the input to a :class:`~vyra_base.defaults.entries.NewsEntry`,
        then delegates to :meth:`~BaseFeeder.feed`.

        :param newsElement: The news entry to be fed. Can be a string or list
            to be processed into a NewsEntry, or a NewsEntry object.
        :type newsElement: Union[NewsEntry, str, list]
        :raises FeederException: If the type of newsElement is not supported.
        """
        await super().feed(self._normalize_entry(newsElement))

    def feed_sync(self, msg: Any) -> None:
        """Sync version of feed() — normalises input to NewsEntry before delegating.

        :param msg: A NewsEntry, a string or a list.
        :type msg: Any
        :raises FeederException: If the type of msg is not supported.
        """
        return super().feed_sync(self._normalize_entry(msg))

    def register_news_condition(
        self,
        condition_function: Callable[[dict[str, Any]], bool],
        *,
        name: Optional[str] = None,
        execution_point: ExecutionPoint = "ALWAYS",
        success_message: Optional[str] = None,
        failure_message: Optional[str] = None,
        tag: str = "news",
    ) -> str:
        """Register a synchronous bool condition for tracked feeder messages.

        Thin wrapper around :meth:`register_condition` whose ``tag`` defaults
        to ``"news"``.

        :param condition_function: Callable receiving the context dict and
            returning a bool.
        :param name: Optional name forwarded to ``register_condition``.
        :param execution_point: Forwarded to ``register_condition``.
        :param success_message: Forwarded to ``register_condition``.
        :param failure_message: Forwarded to ``register_condition``.
        :param tag: Tag forwarded to ``register_condition``.
        :return: The value returned by ``register_condition``.
        :rtype: str
        """
        return self.register_condition(
            condition_function,
            name=name,
            tag=tag,
            execution_point=execution_point,
            success_message=success_message,
            failure_message=failure_message,
        )

    def monitor(
        self,
        *,
        tag: str = "news",
        label: Optional[str] = None,
        severity: str = "INFO",
        entity: Any = None,
        during_interval_seconds: float = 0.05,
    ) -> Callable:
        """Return a runtime-monitoring decorator bound to this feeder.

        A new :class:`FeedTracker` wrapping this feeder is created on every
        call and all keyword arguments are forwarded to its ``monitor`` method.
        """
        return FeedTracker(self).monitor(
            tag=tag,
            label=label,
            severity=severity,
            entity=entity,
            during_interval_seconds=during_interval_seconds,
        )

    async def evaluate_tracked_conditions(self, context: dict[str, Any]) -> None:
        """Evaluate registered conditions and publish resulting tracker messages.

        Messages tagged ``"news"`` are fed to this feeder. Any other tag is
        routed to the attribute ``<tag>_feeder`` of ``context["entity"]`` when
        that attribute exists and exposes ``feed``; otherwise a warning is logged.

        :param context: Context handed to :meth:`evaluate_conditions`; an
            optional ``"entity"`` key supplies the object holding other feeders.
        :type context: dict[str, Any]
        """
        outputs = self.evaluate_conditions(context)
        entity = context.get("entity")
        for tag, message in outputs:
            if tag == "news":
                await self.feed(message)
                continue
            feeder = getattr(entity, f"{tag}_feeder", None) if entity is not None else None
            if feeder is not None and hasattr(feeder, "feed"):
                await feeder.feed(message)
            else:
                logger.warning("No feeder found for tracked condition tag '%s'", tag)

    def evaluate_tracked_conditions_sync(self, context: dict[str, Any]) -> None:
        """Sync variant of :meth:`evaluate_tracked_conditions`.

        For non-``"news"`` tags the target feeder's ``feed_sync`` is preferred.
        If only an async ``feed`` exists, the coroutine is scheduled on the
        running event loop, or run to completion when no loop is running.

        :param context: Context handed to :meth:`evaluate_conditions`; an
            optional ``"entity"`` key supplies the object holding other feeders.
        :type context: dict[str, Any]
        """
        outputs = self.evaluate_conditions(context)
        entity = context.get("entity")
        for tag, message in outputs:
            if tag == "news":
                self.feed_sync(message)
                continue
            feeder = getattr(entity, f"{tag}_feeder", None) if entity is not None else None
            if feeder is None:
                logger.warning("No feeder found for tracked condition tag '%s'", tag)
                continue
            if hasattr(feeder, "feed_sync"):
                feeder.feed_sync(message)
                continue
            if not hasattr(feeder, "feed"):
                # Mirror the async variant: never drop a message silently.
                logger.warning("No feeder found for tracked condition tag '%s'", tag)
                continue
            self._run_coroutine_from_sync(feeder.feed(message))

    def _run_coroutine_from_sync(self, coro: Any) -> None:
        """Execute *coro* from synchronous code.

        Inside a running event loop the coroutine is scheduled as a task (and
        a strong reference is kept until it finishes); without a running loop
        it is executed to completion via :func:`asyncio.run`.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: block until the coroutine is done.
            asyncio.run(coro)
            return
        task = loop.create_task(coro)
        # Keep the task alive; the loop only holds a weak reference to it.
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    def _prepare_entry_for_publish(self, entry: NewsEntry) -> dict:  # type: ignore[override]
        """
        Convert a :class:`~vyra_base.defaults.entries.NewsEntry` to a
        wire-ready dict whose keys match the transport interface field names
        (``VBASENewsFeed``).

        :param entry: The entry to convert.
        :type entry: NewsEntry
        :return: Dict with the keys ``type``, ``message``, ``timestamp``,
            ``module_id`` and ``module_name``.
        :rtype: dict
        """
        try:
            # Integer levels are translated to their enum member name;
            # anything else is used in its string form.
            level_name = (
                NewsEntry.MESSAGE_LEVEL(entry.level).name
                if isinstance(entry.level, int)
                else str(entry.level)
            )
        except (ValueError, KeyError):
            # Unknown integer level: fall back to the plain string form.
            level_name = str(entry.level)
        return {
            'type': level_name,
            'message': str(entry.message) if entry.message else '',
            'timestamp': entry.timestamp if entry.timestamp else datetime.now(),
            'module_id': str(entry.module_id) if entry.module_id else '',
            'module_name': str(entry.module_name) if entry.module_name else '',
        }

    def build_newsfeed(self, *args: Any) -> NewsEntry:
        """
        Build a well structured newsfeed entry from plain text and module information.

        All arguments are converted with ``str`` and concatenated; only the
        part after the last ``|`` is kept. ANSI escape sequences are removed
        and ``__call__`` / ``__init__`` are replaced by ``Call`` / ``Initialize``.

        NOTE(review): a list argument is converted with ``str(list)``, so the
        message contains the list's repr — confirm this is intended.

        :param args: The arguments to be processed into a news entry.
        :type args: Any
        :return: A news entry containing the level, message, timestamp, UUID,
            module name and module ID.
        :rtype: NewsEntry
        :raises FeederException: If the extracted message level is not valid.
        """
        message: str = ''.join(map(str, args)).split('|')[-1]
        message = self._ANSI_ESCAPE_RE.sub('', message)
        message = message.replace('__call__', 'Call')
        message = message.replace('__init__', 'Initialize')

        # NOTE(review): extract_level_from_msg is not imported in the visible
        # part of this file — presumably defined elsewhere in the module; verify.
        message_level = extract_level_from_msg(message)
        if message_level is None:
            message_level = NewsEntry.MESSAGE_LEVEL.INFO

        if message_level not in NewsEntry.MESSAGE_LEVEL:
            # Report the offending level (previously the builtin ``type`` was
            # interpolated by mistake).
            raise FeederException(
                f"Invalid type for newsfeed entry: {message_level!r}. "
                f"Expected one of {NewsEntry.MESSAGE_LEVEL.__members__}."
            )

        newsfeed_entry: NewsEntry = NewsEntry(
            level=message_level.value,
            message=message,
            timestamp=datetime.now(),
            uuid=uuid.uuid4(),
            module_name=self._module_entity.name,
            module_id=self._module_entity.uuid,
        )
        return newsfeed_entry