Source code for vyra_base.com.transport.t_ros2.node
import re
import asyncio
from dataclasses import dataclass, field
from rclpy.node import Node
[docs]
@dataclass
class NodeSettings:
"""
Settings for a Vyra node.
:ivar name: Name of the node.
:vartype name: str
:ivar parameters: Dictionary of parameters for the node.
:vartype parameters: dict
"""
name: str = 'Vyra_node'
parameters: dict = field(default_factory=dict)
[docs]
class VyraNode(Node):
"""
Vyra node class.
:param node_settings: NodeSettings object containing the node's settings.
:type node_settings: NodeSettings
"""
[docs]
def __init__(self, node_settings: NodeSettings) -> None:
super().__init__(node_settings.name)
self._node_settings: NodeSettings = node_settings
self.reload_event = asyncio.Event()
[docs]
def set_reload(self) -> None:
"""
Set the reload event to notify that the node settings have changed.
"""
self.reload_event.set()
@property
def node_settings(self) -> NodeSettings:
"""
Get the node settings.
:return: NodeSettings object containing the node's settings.
:rtype: NodeSettings
"""
return self._node_settings
[docs]
class CheckerNode(Node):
"""
Node to check the availability of other nodes.
Note: Uses enable_rosout=False to avoid SROS2 permission issues
with the /rosout topic during node availability checks.
:param enable_rosout: Whether to enable the rosout logger.
:type enable_rosout: bool
"""
[docs]
def __init__(self, enable_rosout: bool = False) -> None:
super().__init__(
'checker_node',
enable_rosout=enable_rosout # Disable rosout for SROS2 compatibility
)
[docs]
def is_node_available(self, node_name: str) -> bool:
"""
Check if a node with the given name is available.
:param node_name: Name of the node to check.
:type node_name: str
:return: True if the node is available, False otherwise.
:rtype: bool
"""
node_names_and_namespaces = self.get_node_names_and_namespaces()
return any(name == node_name for name, _ in node_names_and_namespaces)
[docs]
@staticmethod
def check_node_name(node_name: str) -> bool:
"""
Check if the node name is valid.
:param node_name: Name of the node to check.
:type node_name: str
:return: True if the node name is valid, False otherwise.
:rtype: bool
"""
return bool(re.match(r'^[a-zA-Z0-9_/]*$', node_name))