# Source code for vyra_base.com.handler.ros2
import logging
from logging import LogRecord
from typing import Any
from rclpy.node import Node
from vyra_base.com.handler.communication import CommunicationHandler
from vyra_base.com.core.types import VyraPublisher
from vyra_base.helper.error_handler import ErrorTraceback
logger = logging.getLogger(__name__)
# [docs] (documentation-link marker left over from the rendered source page)
class ROS2Handler(CommunicationHandler):
    """
    Communication handler that publishes log records as ROS2 messages.

    The ``msg`` object of each incoming log record is copied field by field
    into a new instance of the configured ROS2 message type, which is then
    handed to the configured publisher.

    :cvar __handlerName__: Name of the handler.
    :type __handlerName__: str
    :cvar __doc__: Documentation string for the handler.
    :type __doc__: str
    :param initiator: The initiator of the handler.
    :type initiator: str
    :param publisher: The publisher instance to use.
    :type publisher: VyraPublisher
    :param type: The ROS2 message type.
    :type type: Any
    """

    __handlerName__: str = 'ROS2Handler'
    # NOTE: assigning ``__doc__`` in the class body replaces the class
    # docstring above at runtime.
    __doc__: str = 'ROS2 communication handler'

    def __init__(self, initiator: str, publisher: VyraPublisher, type: Any):
        """
        Initialize the ROS2Handler.

        :param initiator: The initiator of the handler; only used as a
            prefix in this handler's own debug output.
        :type initiator: str
        :param publisher: The publisher instance to use.
        :type publisher: VyraPublisher
        :param type: The ROS2 message type. It is called without arguments
            to create each outgoing message.
        :type type: Any
        """
        # Attributes are stored before the base initializer runs, exactly
        # as in the original ordering.
        self._initiator = initiator
        self._publisher: VyraPublisher = publisher
        self._type: Any = type
        super().__init__()

    async def emit(self, record: LogRecord) -> None:
        """
        Emit a log record by publishing it as a ROS2 message.

        Every field declared by the configured ROS2 message type is read
        from ``record.msg`` and copied onto a fresh message instance. If
        ``record.msg`` lacks one of those fields, a warning is logged and
        nothing is published.

        :param record: Python log record to publish. Its ``msg`` attribute
            must expose every field of the ROS2 message type as an
            attribute.
        :type record: LogRecord
        :return: Always ``None``.
        :rtype: None
        """
        # Unique sentinel: a field whose value merely equals some marker
        # string must not be mistaken for a missing field, and an identity
        # check never invokes the field value's own ``__eq__`` (array-like
        # values may not yield a plain bool from ``==``).
        missing = object()
        try:
            logger.debug(
                f"{self._initiator} instruct {ROS2Handler.__handlerName__} "
                f"publish -> {record.msg}"
            )
            record_msg: Any = record.msg
            ros_msg: Any = self._type()
            # Field names are the keys of the mapping returned by the
            # message type's get_fields_and_field_types().
            type_fields = list(self._type.get_fields_and_field_types())
            for field in type_fields:
                value = getattr(record_msg, field, missing)
                if value is missing:
                    # Only needed for the diagnostic text, so it is built
                    # here; payloads without ``__slots__`` (e.g. a plain
                    # string) are reported as having no fields.
                    record_fields = [
                        f.lstrip('_')
                        for f in getattr(record_msg, '__slots__', ())
                    ]
                    logger.warning(
                        f"Ros2-Field '{field}' not found in msg-type "
                        f"{record_msg.__class__.__name__}. "
                        f"Needed fields are: {type_fields}. "
                        f"Provided message fields are: {record_fields}. "
                        "Abort publishing this message."
                    )
                    return
                setattr(ros_msg, field, value)
            await self._publisher.publish(ros_msg)
        finally:
            # Runs on every exit path (success, early return, exception).
            # NOTE(review): presumably reports a pending exception, if any;
            # confirm against ErrorTraceback.
            ErrorTraceback.check_error_exist()