from __future__ import annotations
import logging
import uuid
from datetime import datetime
from typing import Any, Union, Optional, Callable
# Check ROS2 availability: rclpy is an optional dependency. When it cannot be
# imported, the ROS2-specific helpers below are not imported either and
# _ROS2_AVAILABLE stays False.
try:
    import rclpy
    _ROS2_AVAILABLE = True
except ImportError:
    _ROS2_AVAILABLE = False

if _ROS2_AVAILABLE:
    # Only import the ROS2 transport pieces when rclpy itself is importable.
    from vyra_base.com.transport.t_ros2 import Ros2TypeConverter
    from vyra_base.com.handler.ros2 import ROS2Handler
from vyra_base.defaults.entries import ErrorEntry, ModuleEntry
from vyra_base.defaults.exceptions import FeederException
from vyra_base.com.core.interface_path_registry import get_interface_registry
from vyra_base.com.feeder.tracking import FeedTracker, feed_tracker
from .feeder import BaseFeeder
logger = logging.getLogger(__name__)
[docs]
class ErrorFeeder(BaseFeeder):
    """
    Collection of the error messages.

    :param node: The VyraNode instance associated with this feeder.
    :type node: Optional[Any]
    :param module_entity: The module configuration entry.
    :type module_entity: ModuleEntry
    :param loggingOn: Flag to enable or disable logging next to feeding. Defaults to True.
    :type loggingOn: bool, Optional
    :raises FeederException: If the publisher cannot be created.
    """

    def __init__(
        self,
        node: Optional[Any],
        module_entity: ModuleEntry,
        loggingOn: bool = True
    ):
        super().__init__()
        # _feederName must match the "functionname" in the interface config JSON
        self._feederName: str = 'ErrorFeed'
        self._doc: str = 'Collect error messages of this module.'
        self._level: int = logging.ERROR
        self._node: Optional[Any] = node
        self._module_entity: ModuleEntry = module_entity
        # ROS2 is only usable when rclpy was importable AND a node was supplied.
        self._ros2_available: bool = _ROS2_AVAILABLE and node is not None
        self._loggingOn: bool = loggingOn
        # The short-circuit keeps ROS2Handler from being evaluated when the
        # ROS2 imports were skipped at module load time.
        if self._ros2_available and ROS2Handler is not None:
            self._handler_classes.append(ROS2Handler)

    async def start(self) -> None:
        """Start the feeder by initializing its handlers.

        Fetches the interface paths from the interface registry, applies them
        via :meth:`set_interface_paths` when the registry returned any, and
        then awaits :meth:`create` with the configured ``loggingOn`` flag.
        """
        paths = get_interface_registry().get_interface_paths()
        if paths:
            self.set_interface_paths(paths)
        await self.create(loggingOn=self._loggingOn)

    def _normalize_entry(self, errorElement: Union[ErrorEntry, dict]) -> ErrorEntry:
        """Turn *errorElement* into an :class:`ErrorEntry` with timestamp and uuid set.

        Shared by :meth:`feed` and :meth:`feed_sync` so both paths validate and
        complete entries identically.

        :param errorElement: A dict of error details (built via
            :meth:`build_errorfeed`) or a ready-made ``ErrorEntry``.
        :type errorElement: Union[ErrorEntry, dict]
        :return: The normalised entry. An ``ErrorEntry`` passed in is returned
            as the same object, with missing fields filled in place.
        :rtype: ErrorEntry
        :raises FeederException: If *errorElement* is neither a dict nor an
            ``ErrorEntry``.
        """
        if isinstance(errorElement, dict):
            errorfeed_entry = self.build_errorfeed(errorElement)
        elif isinstance(errorElement, ErrorEntry):
            errorfeed_entry = errorElement
        else:
            raise FeederException(f"Wrong Type. Expect: ErrorEntry, got {type(errorElement)}")

        # Ensure timestamp and uuid are set
        if errorfeed_entry.timestamp is None:
            errorfeed_entry.timestamp = datetime.now()
        if errorfeed_entry.uuid is None:
            errorfeed_entry.uuid = uuid.uuid4()
        return errorfeed_entry

    async def feed(self, errorElement: Union[ErrorEntry, dict]) -> None:
        """
        Feed an error entry to the feeder.

        Normalises the input to an :class:`~vyra_base.defaults.entries.ErrorEntry`
        (filling in a missing timestamp/uuid), then delegates to
        :meth:`~BaseFeeder.feed`.

        :param errorElement: The error entry to be fed. Can be a dictionary with error
            details or an :class:`~vyra_base.defaults.entries.ErrorEntry` object.
        :type errorElement: Union[ErrorEntry, dict]
        :raises FeederException: If the type of errorElement is neither a dict nor an
            :class:`~vyra_base.defaults.entries.ErrorEntry`.
        """
        await super().feed(self._normalize_entry(errorElement))

    def feed_sync(self, errorElement: Union[ErrorEntry, dict]) -> None:
        """Sync version of :meth:`feed` with ErrorEntry normalization.

        :param errorElement: A dict of error details or an ``ErrorEntry``.
        :type errorElement: Union[ErrorEntry, dict]
        :raises FeederException: If the type of errorElement is neither a dict nor an
            :class:`~vyra_base.defaults.entries.ErrorEntry`.
        """
        super().feed_sync(self._normalize_entry(errorElement))

    def monitor(
        self,
        *,
        tag: str = "error",
        label: Optional[str] = None,
        severity: str = "WARNING",
        entity: Any = None,
        during_interval_seconds: float = 0.05,
    ) -> Callable:
        """Return an exception-monitoring decorator bound to this feeder.

        All keyword arguments are forwarded unchanged to
        ``FeedTracker(self).monitor(...)``; their semantics are defined there.
        """
        return FeedTracker(self).monitor(
            tag=tag,
            label=label,
            severity=severity,
            entity=entity,
            during_interval_seconds=during_interval_seconds,
        )

    def _prepare_entry_for_publish(self, entry: ErrorEntry) -> dict:  # type: ignore[override]
        """
        Convert an :class:`~vyra_base.defaults.entries.ErrorEntry` to a
        wire-ready dict whose keys match the transport interface field names
        (``VBASEErrorFeed``).

        Falsy fields are replaced by neutral defaults (``''`` for text fields,
        ``0`` for the code, the current time for the timestamp).

        :param entry: The entry to convert.
        :type entry: ErrorEntry
        :return: Dict with the keys ``error_id``, ``error_code``, ``module_id``,
            ``description``, ``solution``, ``miscellaneous`` and ``timestamp``.
        :rtype: dict
        """
        return {
            'error_id': str(entry.uuid) if entry.uuid else '',
            'error_code': int(entry.code or 0),
            'module_id': str(entry.module_id) if entry.module_id else '',
            'description': str(entry.description or ''),
            'solution': str(entry.solution or ''),
            'miscellaneous': str(entry.miscellaneous or ''),
            'timestamp': entry.timestamp or datetime.now(),
        }

    def build_errorfeed(self, errorDict: dict) -> ErrorEntry:
        """
        Build an error entry from the given dictionary.

        The module id and module name are taken from this feeder's module
        entity, and the timestamp is always set to the current time.

        :param errorDict: A dictionary containing error details. Keys are:

            - ``error_code``: int - Error code (default: ``0``). The key ``code``
              is accepted as a fallback when ``error_code`` is absent.
            - ``uuid``: UUID - Unique identifier for the error (default: a new UUID,
              also used when the value is ``None``)
            - ``description``: str - Description of the error (default: '')
            - ``solution``: str - Suggested solution for the error (default: '')
            - ``miscellaneous``: str - Additional information (default: '')
            - ``level``: ErrorEntry.ERROR_LEVEL - Level of the error
              (default: ErrorEntry.ERROR_LEVEL.MINOR_FAULT). A raw level value
              without a ``.value`` attribute is passed through unchanged.
        :type errorDict: dict
        :return: An instance of :class:`vyra_base.defaults.entries.ErrorEntry` populated with the provided details.
        :rtype: ErrorEntry
        """
        # 'error_code' is the key this method has always read; 'code' is the
        # key the documentation used to advertise, so honour it as a fallback.
        code = errorDict.get('error_code', errorDict.get('code', 0x00000000))

        # Only generate a fresh UUID when the caller did not supply one.
        entry_uuid = errorDict.get('uuid') or uuid.uuid4()

        # Accept both enum members and already-unwrapped level values.
        level = errorDict.get('level', ErrorEntry.ERROR_LEVEL.MINOR_FAULT)
        level_value = getattr(level, 'value', level)

        return ErrorEntry(
            code=code,
            module_id=self._module_entity.uuid,
            module_name=self._module_entity.name,
            uuid=entry_uuid,
            timestamp=datetime.now(),
            description=errorDict.get('description', ''),
            solution=errorDict.get('solution', ''),
            miscellaneous=errorDict.get('miscellaneous', ''),
            level=level_value,
        )