ROS2 Transport Module¶
Provides ROS2/DDS-based transport implementation with layered architecture.
Overview¶
The ROS2 transport module implements the AbstractProtocolProvider interface
for Robot Operating System 2 (ROS2) communication using DDS middleware.
Architecture Layers:
communication/: Core ROS2functionality (Services, Topics, Actions)
vyra_models/: VYRA abstractions (ROS2Callable, ROS2Speaker, ROS2Job)
node.py: ROS2 node management and lifecycle
provider.py: Protocol provider implementation
Features:
✅ Service-based request-response (ROS2Callable)
✅ Topic-based Pub/Sub (ROS2Speaker)
✅ Action-based long-running tasks (ROS2Job)
✅ Server/Client pattern with
is_callableandis_jobflags✅ Type conversion utilities
✅ Node lifecycle management
✅ QoS policy configuration
✅ SROS2 security support
✅ Dynamic interface loading
Usage¶
Basic Provider Setup¶
from vyra_base.com.transport.t_ros2 import ROS2Provider, ROS2_AVAILABLE
from vyra_base.com.core.types import ProtocolType
if ROS2_AVAILABLE:
# Create provider
provider = ROS2Provider(
module_name="my_module",
module_id="abc123",
protocol=ProtocolType.ROS2
)
# Initialize with config
await provider.initialize(config={
"node_name": "my_node",
"namespace": "/my_namespace"
})
Create Service Server¶
from example_interfaces.srv import AddTwoInts
# Server callback
async def handle_request(request, response):
response.sum = request.a + request.b
return response
# Create service server (is_callable=True)
server = await provider.create_callable(
name="add_service",
callback=handle_request,
service_type=AddTwoInts
)
Create Service Client¶
# Create service client (is_callable=False)
client = await provider.create_callable(
name="add_service",
callback=None, # No callback for client
service_type=AddTwoInts,
is_callable=False
)
# Call service
request = AddTwoInts.Request(a=5, b=3)
response = await client.call(request, timeout=5.0)
Create Publisher¶
from std_msgs.msg import String
# Create publisher (is_publisher=True)
publisher = await provider.create_speaker(
name="my_topic",
message_type=String,
is_publisher=True,
qos_profile=10
)
# Publish message
await publisher.shout(String(data="Hello"))
Create Subscriber¶
# Subscriber callback
def on_message(msg):
print(f"Received: {msg.data}")
# Create subscriber (is_publisher=False)
subscriber = await provider.create_speaker(
name="my_topic",
message_type=String,
is_publisher=False,
qos_profile=10
)
# Start listening
await subscriber.listen(on_message)
Create Action Server¶
from example_interfaces.action import Fibonacci
# Action server callback
async def execute_fibonacci(goal, feedback_callback=None):
sequence = [0, 1]
for i in range(1, goal.order):
sequence.append(sequence[-1] + sequence[-2])
if feedback_callback:
await feedback_callback({"sequence": sequence})
return {"sequence": sequence}
# Create action server (is_job=True)
action_server = await provider.create_job(
name="fibonacci",
callback=execute_fibonacci,
action_type=Fibonacci
)
Create Action Client¶
# Feedback callback
def on_feedback(feedback):
print(f"Progress: {feedback['sequence']}")
# Create action client (is_job=False)
action_client = await provider.create_job(
name="fibonacci",
callback=None, # No callback for client
action_type=Fibonacci,
is_job=False,
feedback_callback=on_feedback
)
# Send goal
goal = Fibonacci.Goal(order=10)
result = await action_client.execute(goal)
Configuration¶
Node Configuration¶
config = {
"node_name": "my_node",
"namespace": "/my_namespace",
"use_sim_time": False,
"enable_rosout": True,
"parameter_overrides": {
"param1": "value1",
"param2": 42
}
}
await provider.initialize(config=config)
QoS Profiles¶
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
# Custom QoS
qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
speaker = await provider.create_speaker(
name="reliable_topic",
message_type=String,
qos_profile=qos
)
Server/Client Flags¶
The provider automatically detects server/client mode based on callback presence, but you can explicitly set the flags:
Interface Type |
Flag |
Behavior |
|---|---|---|
Service Server |
|
Responds to service calls (callback required) |
Service Client |
|
Makes service calls (no callback) |
Publisher |
|
Publishes messages |
Subscriber |
|
Receives messages (callback required) |
Action Server |
|
Executes actions (callback required) |
Action Client |
|
Sends goals (no callback) |
API Reference¶
ROS2 Protocol Provider
Implements AbstractProtocolProvider for ROS2 transport. Provides distributed communication via DDS middleware.
- class vyra_base.com.transport.t_ros2.provider.ROS2Provider(module_name, module_id, protocol=ProtocolType.ROS2)[Quellcode]¶
Bases:
AbstractProtocolProviderProtocol provider for ROS2 transport.
Features: - Distributed communication via DDS - Quality of Service (QoS) policies - Discovery and introspection - SROS2 security support
Requirements: - ROS2 installation - rclpy package - Source /opt/ros/<distro>/setup.bash
Example
>>> # Initialize provider >>> provider = ROS2Provider(ProtocolType.ROS2) >>> >>> if await provider.check_availability(): ... await provider.initialize(config={ ... "node_name": "my_module", ... "namespace": "/vyra" ... }) ... ... # Create server (service server) ... async def handle_request(req): ... return {"result": req.value * 2} ... ... server = await provider.create_server( ... "calculate", ... handle_request, ... service_type="example_interfaces/srv/AddTwoInts" ... ) ... ... # Create publisher (topic publisher) ... publisher = await provider.create_publisher( ... "sensor_data", ... message_type="std_msgs/msg/String" ... )
- Parameter:
module_name (str)
module_id (str)
protocol (ProtocolType)
- __init__(module_name, module_id, protocol=ProtocolType.ROS2)[Quellcode]¶
Initialize ROS2 provider.
- Parameter:
protocol (
ProtocolType) – Protocol type (must be ROS2)node_name – Default node name
module_name (str)
module_id (str)
- async check_availability()[Quellcode]¶
Check if ROS2 is available.
- Rückgabe:
True if rclpy is installed and importable
- Rückgabetyp:
- async initialize(config=None)[Quellcode]¶
Initialize ROS2 provider.
- Parameter:
config (
Optional[Dict[str,Any]]) – Optional configuration - node_name: ROS2 node name - namespace: Node namespace - use_sim_time: Use simulation time - enable_rosout: Enable rosout logging - parameter_overrides: Node parameter overrides- Rückgabe:
True if initialization successful
- Rückgabetyp:
- async shutdown()[Quellcode]¶
Shutdown the provider and cleanup resources.
- Rückgabetyp:
- async create_publisher(name, **kwargs)[Quellcode]¶
Create ROS2 publisher.
- Rückgabetyp:
- Parameter:
name (str)
- async create_subscriber(name, subscriber_callback, **kwargs)[Quellcode]¶
Create ROS2 subscriber.
- Rückgabetyp:
- Parameter:
- async create_server(name, response_callback, **kwargs)[Quellcode]¶
Create ROS2 service server.
- Rückgabetyp:
- Parameter:
- async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[Quellcode]¶
Create ROS2 service client.
- Parameter:
name (
str) – Service nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance. When provided (e.g. injected by the factory for cross-module proxy calls), it is used for both interface-type discovery and topic-name building, so that the correct module-specific interface paths are consulted. Falls back to the provider’s own_topic_builderwhen omitted.service_type (
Optional[type]) – Explicit ROS2 service type class. Resolved viatopic_builder.load_interface_typewhen not supplied.request_callback (
Optional[Callable]) – Optional async callback for responses.**kwargs – Additional parameters forwarded to
VyraClientImpl(node, qos_profile, …).
- Rückgabetyp:
- async create_action_server(name, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, **kwargs)[Quellcode]¶
Create ROS2 action server.
- Rückgabetyp:
- Parameter:
- async create_action_client(name, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[Quellcode]¶
Create ROS2 action client.
- Rückgabetyp:
- Parameter:
- get_node()[Quellcode]¶
Get the ROS2 node.
- Rückgabetyp:
Node- Rückgabe:
rclpy Node instance
ROS2 VYRA Models Layer
VYRA abstractions for ROS2 communication. This layer wraps ROS2 functionality into VYRA’s unified interface.
- Components:
ROS2PublisherImpl: Publish-only pattern (wraps ROS2 topic publisher)
ROS2SubscriberImpl: Subscribe-only pattern (wraps ROS2 topic subscriber)
ROS2ServerImpl: Request-response server (wraps ROS2 service server)
ROS2ClientImpl: Request-response client (wraps ROS2 service client)
ROS2ActionServerImpl: Long-running task server (wraps ROS2 action server)
ROS2ActionClientImpl: Long-running task client (wraps ROS2 action client)
- Usage:
>>> from vyra_base.com.transport.t_ros2.vyra_models import ROS2PublisherImpl, ROS2SubscriberImpl >>> >>> # Publisher >>> publisher = ROS2PublisherImpl(name="sensor_data", node=node, message_type=Temperature) >>> await publisher.initialize() >>> await publisher.publish(temperature_msg) >>> >>> # Subscriber with callback >>> async def on_message(msg): ... print(f"Received: {msg}") >>> subscriber = ROS2SubscriberImpl(name="sensor_data", node=node, message_type=Temperature, subscriber_callback=on_message) >>> await subscriber.initialize()
- class vyra_base.com.transport.t_ros2.vyra_models.VyraPublisherImpl(name, topic_builder, node, message_type, qos_profile=None, **kwargs)[Quellcode]¶
Bases:
VyraPublisherVyra Publisher implementation.
Wraps Vyra topic publisher for one-way message publishing.
- Parameter:
name (str)
topic_builder (TopicBuilder)
node (Any)
message_type (type)
qos_profile (Any | None)
- __init__(name, topic_builder, node, message_type, qos_profile=None, **kwargs)[Quellcode]¶
- Parameter:
name (str)
topic_builder (TopicBuilder)
node (Any)
message_type (Any)
qos_profile (Any | None)
- async initialize()[Quellcode]¶
Initialize ROS2 publisher.
- Rückgabetyp:
- async shutdown()[Quellcode]¶
Shutdown ROS2 publisher.
- Rückgabetyp:
- async publish(message)[Quellcode]¶
Publish message (async).
- class vyra_base.com.transport.t_ros2.vyra_models.VyraSubscriberImpl(name, topic_builder, node, message_type, subscriber_callback=None, qos_profile=None, **kwargs)[Quellcode]¶
Bases:
VyraSubscriberVyra Subscriber implementation.
Wraps Vyra topic subscriber with async callback support.
- Parameter:
- __init__(name, topic_builder, node, message_type, subscriber_callback=None, qos_profile=None, **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize ROS2 subscriber.
- Rückgabetyp:
- async shutdown()[Quellcode]¶
Shutdown ROS2 subscriber.
- Rückgabetyp:
- async subscribe()[Quellcode]¶
Start subscribing (already active after initialize for ROS2).
Note: ROS2 subscribers are automatically active after creation. This method is provided for API consistency.
- Rückgabetyp:
- class vyra_base.com.transport.t_ros2.vyra_models.VyraServerImpl(name, topic_builder, node, service_type, response_callback=None, qos_profile=None, **kwargs)[Quellcode]¶
Bases:
VyraServerVyra Server implementation.
Wraps Vyra service server with async callback support.
- Parameter:
- __init__(name, topic_builder, node, service_type, response_callback=None, qos_profile=None, **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize ROS2 service server.
- Rückgabetyp:
- async shutdown()[Quellcode]¶
Shutdown ROS2 server.
- Rückgabetyp:
- async serve()[Quellcode]¶
Start serving (already active after initialize for ROS2).
Note: ROS2 servers are automatically active after creation.
- Rückgabetyp:
- class vyra_base.com.transport.t_ros2.vyra_models.VyraClientImpl(name, topic_builder, node, service_type, request_callback=None, qos_profile=None, **kwargs)[Quellcode]¶
Bases:
VyraClientVyra Client implementation.
Wraps Vyra service client for async request-response.
- Parameter:
- __init__(name, topic_builder, node, service_type, request_callback=None, qos_profile=None, **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize ROS2 service client.
- Rückgabetyp:
- async shutdown()[Quellcode]¶
Shutdown ROS2 client.
- Rückgabetyp:
- async call(request, timeout=5.0)[Quellcode]¶
Call service and await response (async).
Accepts either a pre-built ROS2 request message or a plain Python dict. When a dict is passed it is used to populate a freshly instantiated
<ServiceType>.Requestobject field-by-field (with case-insensitive key matching so callers can use eithert1orT1). The raw ROS2 response is automatically converted to an ordered dict before being returned so that higher layers (e.g. the proxy router) can treat all protocol responses uniformly.- Parameter:
- Rückgabe:
Response fields as a Python dict.
- Rückgabetyp:
- Verursacht:
InterfaceError – If not initialized or call fails.
TimeoutError – If the service is not reachable within timeout.
- class vyra_base.com.transport.t_ros2.vyra_models.VyraActionServerImpl(name, topic_builder, node, action_type, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, **kwargs)[Quellcode]¶
Bases:
VyraActionServerVyra Action Server implementation.
Wraps Vyra action server with async callback support for goal handling.
- Parameter:
- __init__(name, topic_builder, node, action_type, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize ROS2 action server.
- Rückgabetyp:
- async shutdown()[Quellcode]¶
Shutdown ROS2 action server.
- Rückgabetyp:
- async start()[Quellcode]¶
Start action server (already active after initialize for ROS2).
- Rückgabetyp:
- class vyra_base.com.transport.t_ros2.vyra_models.VyraActionClientImpl(name, topic_builder, node, action_type, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[Quellcode]¶
Bases:
VyraActionClientVyra Action Client implementation.
Wraps Vyra action client with async callback support.
- Parameter:
- __init__(name, topic_builder, node, action_type, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize ROS2 action client.
- Rückgabetyp:
- async shutdown()[Quellcode]¶
Shutdown ROS2 action client.
- Rückgabetyp:
- async send_goal(goal, **kwargs)[Quellcode]¶
Send goal to action server (async).
- Parameter:
goal (
Any) – Goal message**kwargs – Additional parameters (timeout, etc.)
- Rückgabetyp:
- Rückgabe:
Result message
- Verursacht:
InterfaceError – If not initialized or send fails
TimeoutError – If goal times out
- async cancel_goal(goal_handle)[Quellcode]¶
Cancel a running goal.
Dependencies¶
rclpy(ROS2 Python client library)rclpy.node(Node management)rclpy.qos(Quality of Service)rclpy.action(Action interfaces)
See Also¶
Interface Factory - InterfaceFactory for protocol-agnostic usage
Redis Transport Module - Redis transport provider
UDS Transport Module - Unix Domain Socket transport provider