Quellcode für vyra_base.com.transport.t_ros2.node

import re
import asyncio

from dataclasses import dataclass, field
from rclpy.node import Node


@dataclass
class NodeSettings:
    """
    Configuration container for a Vyra node.

    :ivar name: Name of the node.
    :vartype name: str
    :ivar parameters: Dictionary of parameters for the node.
    :vartype parameters: dict
    """

    name: str = 'Vyra_node'
    # default_factory gives every instance its own dict rather than one
    # dictionary shared between all instances.
    parameters: dict = field(default_factory=dict)
class VyraNode(Node):
    """
    Vyra node built on top of ``rclpy.node.Node``.

    :param node_settings: Settings object the node is created from.
    :type node_settings: NodeSettings
    """

    def __init__(self, node_settings: NodeSettings) -> None:
        # The node is registered under the name carried by the settings.
        super().__init__(node_settings.name)
        # Event that is set by set_reload().
        self.reload_event = asyncio.Event()
        self._node_settings: NodeSettings = node_settings

    @property
    def node_settings(self) -> NodeSettings:
        """
        Settings this node was created with.

        :return: The NodeSettings object passed to the constructor.
        :rtype: NodeSettings
        """
        return self._node_settings

    def set_reload(self) -> None:
        """
        Signal that the node settings have changed by setting the reload event.
        """
        self.reload_event.set()
class CheckerNode(Node):
    """
    Node to check the availability of other nodes.

    Note: Uses enable_rosout=False to avoid SROS2 permission issues
    with the /rosout topic during node availability checks.

    :param enable_rosout: Whether to enable the rosout logger.
    :type enable_rosout: bool
    """

    def __init__(self, enable_rosout: bool = False) -> None:
        super().__init__(
            'checker_node',
            enable_rosout=enable_rosout,  # Disable rosout for SROS2 compatibility
        )

    def is_node_available(self, node_name: str) -> bool:
        """
        Check if a node with the given name is available.

        Only the node name is compared; the namespace reported alongside
        each name is ignored.

        :param node_name: Name of the node to check.
        :type node_name: str
        :return: True if the node is available, False otherwise.
        :rtype: bool
        """
        node_names_and_namespaces = self.get_node_names_and_namespaces()
        return any(name == node_name for name, _ in node_names_and_namespaces)

    @staticmethod
    def check_node_name(node_name: str) -> bool:
        """
        Check if the node name is valid.

        A name is accepted when it is non-empty and consists solely of
        ASCII letters, digits, underscores and forward slashes.

        :param node_name: Name of the node to check.
        :type node_name: str
        :return: True if the node name is valid, False otherwise.
        :rtype: bool
        """
        # fullmatch (rather than match with '$') rejects a trailing newline,
        # and '+' (rather than '*') rejects the empty string.
        return re.fullmatch(r'[a-zA-Z0-9_/]+', node_name) is not None