"""
ROS2 Protocol Provider
Implements AbstractProtocolProvider for ROS2 transport.
Provides distributed communication via DDS middleware.
"""
from __future__ import annotations
import asyncio
from ctypes import ArgumentError
import logging
from typing import Any, Callable, Optional, Dict, TYPE_CHECKING
from vyra_base.com.core.topic_builder import TopicBuilder
from vyra_base.com.core.types import (
ProtocolType,
# New unified types
VyraPublisher,
VyraSubscriber,
VyraServer,
VyraClient,
VyraActionServer,
VyraActionClient
)
from vyra_base.com.core.exceptions import (
ProtocolUnavailableError,
ProviderError,
)
from vyra_base.com.transport.t_ros2.node import NodeSettings, VyraNode
from vyra_base.com.transport.t_ros2.vyra_models import (
# New unified
VyraPublisherImpl,
VyraSubscriberImpl,
VyraServerImpl,
VyraClientImpl,
VyraActionServerImpl,
VyraActionClientImpl
)
from vyra_base.com.providers.protocol_provider import AbstractProtocolProvider
from vyra_base.com.core.callback_registry import CallbackRegistry
logger = logging.getLogger(__name__)
# Check if rclpy is available
try:
import rclpy
from rclpy.node import Node
ROS2_AVAILABLE = True
except ImportError:
ROS2_AVAILABLE = False
logger.warning(
"⚠️ rclpy not available. ROS2 transport disabled. "
"Install ROS2 and source setup.bash"
)
[Doku]
class ROS2Provider(AbstractProtocolProvider):
"""
Protocol provider for ROS2 transport.
Features:
- Distributed communication via DDS
- Quality of Service (QoS) policies
- Discovery and introspection
- SROS2 security support
Requirements:
- ROS2 installation
- rclpy package
- Source /opt/ros/<distro>/setup.bash
Example:
>>> # Initialize provider
>>> provider = ROS2Provider(ProtocolType.ROS2)
>>>
>>> if await provider.check_availability():
... await provider.initialize(config={
... "node_name": "my_module",
... "namespace": "/vyra"
... })
...
... # Create server (service server)
... async def handle_request(req):
... return {"result": req.value * 2}
...
... server = await provider.create_server(
... "calculate",
... handle_request,
... service_type="example_interfaces/srv/AddTwoInts"
... )
...
... # Create publisher (topic publisher)
... publisher = await provider.create_publisher(
... "sensor_data",
... message_type="std_msgs/msg/String"
... )
"""
[Doku]
def __init__(
self,
module_name: str,
module_id: str,
protocol: ProtocolType = ProtocolType.ROS2,
):
"""
Initialize ROS2 provider.
Args:
protocol: Protocol type (must be ROS2)
node_name: Default node name
"""
super().__init__(protocol)
self.node_name = f"{module_name}_{module_id}"
self._node: Optional[VyraNode] = None
self._available = ROS2_AVAILABLE # Set availability based on rclpy presence
# Default configuration
self._config = {
"node_name": self.node_name,
"namespace": "",
"use_sim_time": False,
"enable_rosout": True,
"parameter_overrides": None,
}
self._topic_builder = TopicBuilder(module_name, module_id)
[Doku]
async def check_availability(self) -> bool:
"""
Check if ROS2 is available.
Returns:
bool: True if rclpy is installed and importable
"""
self._available = ROS2_AVAILABLE
if not self._available:
logger.warning(
"⚠️ ROS2 transport not available. "
"Install ROS2 and source setup.bash"
)
else:
logger.info("✅ ROS2 transport available")
return self._available
[Doku]
async def initialize(self, config: Optional[Dict[str, Any]] = None) -> bool:
"""
Initialize ROS2 provider.
Args:
config: Optional configuration
- node_name: ROS2 node name
- namespace: Node namespace
- use_sim_time: Use simulation time
- enable_rosout: Enable rosout logging
- parameter_overrides: Node parameter overrides
Returns:
bool: True if initialization successful
"""
available = await self.check_availability()
if not available:
raise ProtocolUnavailableError(
"ROS2 transport not available. "
"Install ROS2 and source setup.bash"
)
if self._initialized:
logger.warning("⚠️ Provider already initialized")
return True
# Update configuration
if config:
self._config.update(config)
logger.info(
f"🚀 Initializing ROS2 provider: {self._config['node_name']}"
)
try:
# Initialize rclpy if not already done
if not rclpy.ok():
rclpy.init()
logger.debug("✅ rclpy initialized")
# Create node (we'll use the existing VyraNode or create a basic one)
# Note: In actual usage, this will be integrated with existing VyraNode
node_settings = NodeSettings(
name=self._config["node_name"],
parameters=self._config.get("parameter_overrides", {})
)
self._node = VyraNode(node_settings)
self._initialized = True
logger.info(
f"✅ ROS2 provider initialized: {self._config['node_name']}"
)
return True
except Exception as e:
logger.error(f"❌ Failed to initialize ROS2 provider: {e}")
return False
[Doku]
async def shutdown(self) -> None:
"""Shutdown the provider and cleanup resources."""
if not self._initialized:
return
logger.info(f"🛑 Shutting down ROS2 provider: {self.node_name}")
# Destroy node
if self._node:
self._node.destroy_node()
self._node = None
# Shutdown rclpy
if rclpy.ok():
rclpy.shutdown()
logger.debug("✅ rclpy shutdown")
self._initialized = False
logger.info("✅ ROS2 provider shutdown complete")
# ========================================================================
# UNIFIED INTERFACE CREATION METHODS
# ========================================================================
[Doku]
async def create_publisher(
self,
name: str,
**kwargs
) -> VyraPublisher:
"""Create ROS2 publisher."""
self.require_initialization()
topic_builder = kwargs.pop("topic_builder", None) or self._topic_builder
message_type = self._resolve_ros2_type(kwargs.pop("message_type", None))
# Only accept message_type if it's a valid ROS2 type (has _TYPE_SUPPORT).
# Generic Python types like dict are not valid for ROS2; fall through
# to topic_builder resolution in that case.
if not message_type or not hasattr(message_type, '_TYPE_SUPPORT'):
message_type = topic_builder.load_interface_type(name, self.protocol)
if not message_type:
raise ArgumentError("message_type is required for ROS2 publisher")
node = kwargs.pop("node", None)
qos_profile = kwargs.pop("qos_profile", None)
logger.info(f"🔧 Creating ROS2 publisher: {name} (type: {message_type})")
publisher = VyraPublisherImpl(
name=name,
topic_builder=topic_builder,
node=node or self._node,
message_type=message_type,
qos_profile=qos_profile,
**kwargs
)
await publisher.initialize()
logger.info(f"✅ ROS2 publisher created: {name}")
return publisher
[Doku]
async def create_subscriber(
self,
name: str,
subscriber_callback: Optional[Callable],
**kwargs
) -> VyraSubscriber:
"""Create ROS2 subscriber."""
self.require_initialization()
topic_builder = kwargs.pop("topic_builder", None) or self._topic_builder
message_type = self._resolve_ros2_type(kwargs.pop("message_type", None))
if not message_type or not hasattr(message_type, '_TYPE_SUPPORT'):
message_type = topic_builder.load_interface_type(name, self.protocol)
if not message_type:
raise ArgumentError("message_type is required for ROS2 subscriber")
if subscriber_callback is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp and _bp.is_bound():
subscriber_callback = _bp.callback
else:
raise ProviderError(
f"No subscriber_callback provided for '{name}' and no bound blueprint found in CallbackRegistry"
)
node = kwargs.pop("node", None)
qos_profile = kwargs.pop("qos_profile", None)
logger.info(f"🔧 Creating ROS2 subscriber: {name} (type: {message_type})")
subscriber = VyraSubscriberImpl(
name=name,
topic_builder=topic_builder,
node=node or self._node,
message_type=message_type,
subscriber_callback=subscriber_callback,
qos_profile=qos_profile,
**kwargs
)
await subscriber.initialize()
logger.info(f"✅ ROS2 subscriber created: {name}")
return subscriber
@staticmethod
def _resolve_ros2_type(service_type: Any) -> Optional[type]:
"""Extract the actual ROS2 class from a mixed list or return the type as-is.
``interfacetypes`` passed from the interface builder is a list that may
contain both string file names (e.g. ``"ServiceTest.proto"``) and the
resolved Python class for ROS2 (e.g.
``<class 'v2_modulemanager_interfaces.srv._service_test.ServiceTest'>``).
ROS2 needs the bare class; all non-class entries are ignored.
"""
if service_type is None:
return None
if isinstance(service_type, list):
for item in service_type:
if not isinstance(item, str):
return item
return None # list contained only strings (proto names) → no ROS2 type
return service_type # already a class or module
[Doku]
async def create_server(
self,
name: str,
response_callback: Optional[Callable],
**kwargs
) -> VyraServer:
"""Create ROS2 service server."""
self.require_initialization()
topic_builder = kwargs.pop("topic_builder", None) or self._topic_builder
service_type = self._resolve_ros2_type(kwargs.pop("service_type", None))
if not service_type:
service_type = topic_builder.load_interface_type(name, self.protocol)
if not service_type:
raise ArgumentError("service_type is required for ROS2 server")
if response_callback is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp and _bp.is_bound():
response_callback = _bp.callback
else:
raise ProviderError(
f"No response_callback provided for server '{name}' and no bound blueprint found in CallbackRegistry"
)
node = kwargs.pop("node", None)
qos_profile = kwargs.pop("qos_profile", None)
logger.info(f"🔧 Creating ROS2 server: {name} (type: {service_type})")
server = VyraServerImpl(
name=name,
topic_builder=topic_builder,
node=node or self._node,
service_type=service_type,
response_callback=response_callback,
qos_profile=qos_profile,
**kwargs
)
await server.initialize()
logger.info(f"✅ ROS2 server created: {name}")
return server
[Doku]
async def create_client(
self,
name: str,
topic_builder: Optional[TopicBuilder] = None,
service_type: Optional[type] = None,
request_callback: Optional[Callable] = None,
**kwargs
) -> VyraClient:
"""
Create ROS2 service client.
Args:
name: Service name
topic_builder: TopicBuilder instance. When provided (e.g. injected by
the factory for cross-module proxy calls), it is used for both
interface-type discovery and topic-name building, so that the
correct module-specific interface paths are consulted. Falls back
to the provider's own ``_topic_builder`` when omitted.
service_type: Explicit ROS2 service type class. Resolved via
``topic_builder.load_interface_type`` when not supplied.
request_callback: Optional async callback for responses.
**kwargs: Additional parameters forwarded to ``VyraClientImpl``
(node, qos_profile, …).
"""
self.require_initialization()
# Use injected topic_builder first so cross-module proxy calls resolve
# interface types from the target module's interface package rather than
# the provider's default (which only covers the local vyra_base interfaces).
effective_topic_builder = topic_builder or self._topic_builder
# service_type may be a mixed list from interface_builder; extract the class
service_type = self._resolve_ros2_type(service_type)
if not service_type:
service_type = effective_topic_builder.load_interface_type(name, self.protocol)
if not service_type:
raise ArgumentError("service_type is required for ROS2 client")
node = kwargs.pop("node", None)
qos_profile = kwargs.pop("qos_profile", None)
logger.info(f"🔧 Creating ROS2 client: {name} (type: {service_type})")
client = VyraClientImpl(
name=name,
topic_builder=effective_topic_builder,
node=node or self._node,
service_type=service_type,
request_callback=request_callback,
qos_profile=qos_profile,
**kwargs
)
await client.initialize()
logger.info(f"✅ ROS2 client created: {name}")
return client
[Doku]
async def create_action_server(
self,
name: str,
handle_goal_request: Optional[Callable] = None,
handle_cancel_request: Optional[Callable] = None,
execution_callback: Optional[Callable] = None,
**kwargs
) -> VyraActionServer:
"""Create ROS2 action server."""
self.require_initialization()
topic_builder = kwargs.pop("topic_builder", None) or self._topic_builder
action_type = self._resolve_ros2_type(kwargs.pop("action_type", None))
if not action_type:
action_type = topic_builder.load_interface_type(name, self.protocol)
if not action_type:
raise ArgumentError("action_type is required for ROS2 action server")
if execution_callback is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp and _bp.is_bound():
execution_callback = _bp.callback
else:
raise ProviderError(
f"No execution_callback provided for action server '{name}' and no bound blueprint found in CallbackRegistry"
)
if handle_goal_request is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp is not None:
handle_goal_request = getattr(_bp, 'get_callback', lambda x: None)('on_goal')
if handle_cancel_request is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp is not None:
handle_cancel_request = getattr(_bp, 'get_callback', lambda x: None)('on_cancel')
node = kwargs.pop("node", None)
logger.info(f"🔧 Creating ROS2 action server: {name} (type: {action_type})")
action_server = VyraActionServerImpl(
name=name,
topic_builder=topic_builder,
node=node or self._node,
action_type=action_type,
handle_goal_request=handle_goal_request,
handle_cancel_request=handle_cancel_request,
execution_callback=execution_callback,
**kwargs
)
await action_server.initialize()
logger.info(f"✅ ROS2 action server created: {name}")
return action_server
[Doku]
async def create_action_client(
self,
name: str,
direct_response_callback: Optional[Callable] = None,
feedback_callback: Optional[Callable] = None,
goal_callback: Optional[Callable] = None,
**kwargs
) -> VyraActionClient:
"""Create ROS2 action client."""
self.require_initialization()
topic_builder = kwargs.pop("topic_builder", None) or self._topic_builder
action_type = self._resolve_ros2_type(kwargs.pop("action_type", None))
if not action_type:
action_type = topic_builder.load_interface_type(name, self.protocol)
if not action_type:
raise ArgumentError("action_type is required for ROS2 action client")
node = kwargs.pop("node", None)
logger.info(f"🔧 Creating ROS2 action client: {name} (type: {action_type})")
action_client = VyraActionClientImpl(
name=name,
topic_builder=topic_builder,
node=node or self._node,
action_type=action_type,
direct_response_callback=direct_response_callback,
feedback_callback=feedback_callback,
goal_callback=goal_callback,
**kwargs
)
await action_client.initialize()
logger.info(f"✅ ROS2 action client created: {name}")
return action_client
[Doku]
def get_node(self) -> Node:
"""
Get the ROS2 node.
Returns:
rclpy Node instance
"""
if not self._initialized or not self._node:
raise ProviderError("Provider not initialized or node not available")
return self._node