# Source code for vyra_base.com.handler.ros2

import logging
from logging import LogRecord
from typing import Any

from rclpy.node import Node

from vyra_base.com.handler.communication import CommunicationHandler
from vyra_base.com.core.types import VyraPublisher
from vyra_base.helper.error_handler import ErrorTraceback

logger = logging.getLogger(__name__)


class ROS2Handler(CommunicationHandler):
    """
    Communication handler that publishes log record payloads as ROS2 messages.

    For every record, a new instance of the configured message type is
    created, filled field by field from ``record.msg`` and handed to the
    configured publisher.

    :cvar __handlerName__: Name of the handler.
    :type __handlerName__: str
    :cvar __doc__: Documentation string for the handler.
    :type __doc__: str
    :param initiator: The initiator of the handler.
    :type initiator: str
    :param publisher: The publisher instance to use.
    :type publisher: VyraPublisher
    :param type: The ROS2 message type.
    :type type: Any
    """

    __handlerName__: str = 'ROS2Handler'
    # This explicit assignment replaces the class docstring above as the
    # runtime value of ``ROS2Handler.__doc__``.
    __doc__: str = 'ROS2 communication handler'

    def __init__(self, initiator: str, publisher: VyraPublisher, type: Any):
        """
        Initialize the ROS2Handler.

        :param initiator: The initiator of the handler; only used as a
            prefix in this handler's own debug output.
        :type initiator: str
        :param publisher: The publisher instance to use.
        :type publisher: VyraPublisher
        :param type: The ROS2 message type. It is called without arguments
            to create each outgoing message and must provide
            ``get_fields_and_field_types()``.
        :type type: Any
        """
        # The parameter name ``type`` shadows the builtin but is part of the
        # public signature (callers may pass it by keyword), so it is kept.
        self._initiator = initiator
        self._publisher: VyraPublisher = publisher
        self._type: Any = type
        super().__init__()

    async def emit(self, record: LogRecord) -> None:
        """
        Emit a log record by publishing its payload as a ROS2 message.

        Every field declared by the configured message type is read from
        ``record.msg`` and copied onto a fresh message instance, which is
        then published. If ``record.msg`` lacks one of the fields, a warning
        is logged and nothing is published.

        NOTE(review): this is a coroutine. If ``CommunicationHandler``
        derives from ``logging.Handler``, the logging framework would call
        ``emit`` without awaiting it -- confirm that callers await it.

        :param record: Python log record whose ``msg`` attribute carries the
            object to publish.
        :type record: LogRecord
        :return: None
        """
        try:
            # Lazy %-style arguments: the text is only rendered when DEBUG
            # is enabled. The resulting message is unchanged.
            logger.debug(
                "%s instruct %s publish -> %s",
                self._initiator,
                ROS2Handler.__handlerName__,
                record.msg,
            )

            record_msg: Any = record.msg
            ros_msg: Any = self._type()
            # Field names come from the message type's own field table
            # rather than from ``__slots__``, whose entries need their
            # leading underscores stripped (see the warning branch below).
            type_fields = list(self._type.get_fields_and_field_types().keys())

            # A unique sentinel compared by identity. An equality check
            # against a marker string would misfire on a field that really
            # holds that string and fails for values whose ``==`` does not
            # return a plain bool (e.g. NumPy arrays).
            missing = object()

            for field in type_fields:
                value = getattr(record_msg, field, missing)
                if value is missing:
                    # Only needed for the diagnostic text, so it is built
                    # here; payloads without ``__slots__`` report an empty
                    # list instead of raising AttributeError.
                    record_fields = [
                        f.lstrip('_')
                        for f in getattr(record_msg, '__slots__', ())
                    ]
                    logger.warning(
                        f"Ros2-Field '{field}' not found in msg-type "
                        f"{record_msg.__class__.__name__}. "
                        f"Needed fields are: {type_fields}. "
                        f"Provided message fields are: {record_fields}. "
                        "Abort publishing this message."
                    )
                    return None
                setattr(ros_msg, field, value)

            await self._publisher.publish(ros_msg)
        finally:
            # Runs on every exit path: success, early abort and exceptions.
            ErrorTraceback.check_error_exist()