ROS2 Transport Module

Provides ROS2/DDS-based transport implementation with layered architecture.

Overview

The ROS2 transport module implements the AbstractProtocolProvider interface for Robot Operating System 2 (ROS2) communication using DDS middleware.

Architecture Layers:

  • communication/: Core ROS2functionality (Services, Topics, Actions)

  • vyra_models/: VYRA abstractions (ROS2Callable, ROS2Speaker, ROS2Job)

  • node.py: ROS2 node management and lifecycle

  • provider.py: Protocol provider implementation

Features:

  • ✅ Service-based request-response (ROS2Callable)

  • ✅ Topic-based Pub/Sub (ROS2Speaker)

  • ✅ Action-based long-running tasks (ROS2Job)

  • ✅ Server/Client pattern with is_callable and is_job flags

  • ✅ Type conversion utilities

  • ✅ Node lifecycle management

  • ✅ QoS policy configuration

  • ✅ SROS2 security support

  • ✅ Dynamic interface loading

Usage

Basic Provider Setup

from vyra_base.com.transport.t_ros2 import ROS2Provider, ROS2_AVAILABLE
from vyra_base.com.core.types import ProtocolType

if ROS2_AVAILABLE:
    # Create provider
    provider = ROS2Provider(
        module_name="my_module",
        module_id="abc123",
        protocol=ProtocolType.ROS2
    )

    # Initialize with config
    await provider.initialize(config={
        "node_name": "my_node",
        "namespace": "/my_namespace"
    })

Create Service Server

from example_interfaces.srv import AddTwoInts

# Server callback
async def handle_request(request, response):
    response.sum = request.a + request.b
    return response

# Create service server (is_callable=True)
server = await provider.create_callable(
    name="add_service",
    callback=handle_request,
    service_type=AddTwoInts
)

Create Service Client

# Create service client (is_callable=False)
client = await provider.create_callable(
    name="add_service",
    callback=None,  # No callback for client
    service_type=AddTwoInts,
    is_callable=False
)

# Call service
request = AddTwoInts.Request(a=5, b=3)
response = await client.call(request, timeout=5.0)

Create Publisher

from std_msgs.msg import String

# Create publisher (is_publisher=True)
publisher = await provider.create_speaker(
    name="my_topic",
    message_type=String,
    is_publisher=True,
    qos_profile=10
)

# Publish message
await publisher.shout(String(data="Hello"))

Create Subscriber

# Subscriber callback
def on_message(msg):
    print(f"Received: {msg.data}")

# Create subscriber (is_publisher=False)
subscriber = await provider.create_speaker(
    name="my_topic",
    message_type=String,
    is_publisher=False,
    qos_profile=10
)

# Start listening
await subscriber.listen(on_message)

Create Action Server

from example_interfaces.action import Fibonacci

# Action server callback
async def execute_fibonacci(goal, feedback_callback=None):
    sequence = [0, 1]
    for i in range(1, goal.order):
        sequence.append(sequence[-1] + sequence[-2])
        if feedback_callback:
            await feedback_callback({"sequence": sequence})
    return {"sequence": sequence}

# Create action server (is_job=True)
action_server = await provider.create_job(
    name="fibonacci",
    callback=execute_fibonacci,
    action_type=Fibonacci
)

Create Action Client

# Feedback callback
def on_feedback(feedback):
    print(f"Progress: {feedback['sequence']}")

# Create action client (is_job=False)
action_client = await provider.create_job(
    name="fibonacci",
    callback=None,  # No callback for client
    action_type=Fibonacci,
    is_job=False,
    feedback_callback=on_feedback
)

# Send goal
goal = Fibonacci.Goal(order=10)
result = await action_client.execute(goal)

Configuration

Node Configuration

config = {
    "node_name": "my_node",
    "namespace": "/my_namespace",
    "use_sim_time": False,
    "enable_rosout": True,
    "parameter_overrides": {
        "param1": "value1",
        "param2": 42
    }
}

await provider.initialize(config=config)

QoS Profiles

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

# Custom QoS
qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10
)

speaker = await provider.create_speaker(
    name="reliable_topic",
    message_type=String,
    qos_profile=qos
)

Server/Client Flags

The provider automatically detects server/client mode based on callback presence, but you can explicitly set the flags:

Interface Type

Flag

Behavior

Service Server

is_callable=True

Responds to service calls (callback required)

Service Client

is_callable=False

Makes service calls (no callback)

Publisher

is_publisher=True

Publishes messages

Subscriber

is_publisher=False

Receives messages (callback required)

Action Server

is_job=True

Executes actions (callback required)

Action Client

is_job=False

Sends goals (no callback)

API Reference

ROS2 Protocol Provider

Implements AbstractProtocolProvider for ROS2 transport. Provides distributed communication via DDS middleware.

class vyra_base.com.transport.t_ros2.provider.ROS2Provider(module_name, module_id, protocol=ProtocolType.ROS2)[Quellcode]

Bases: AbstractProtocolProvider

Protocol provider for ROS2 transport.

Features: - Distributed communication via DDS - Quality of Service (QoS) policies - Discovery and introspection - SROS2 security support

Requirements: - ROS2 installation - rclpy package - Source /opt/ros/<distro>/setup.bash

Example

>>> # Initialize provider
>>> provider = ROS2Provider(ProtocolType.ROS2)
>>>
>>> if await provider.check_availability():
...     await provider.initialize(config={
...         "node_name": "my_module",
...         "namespace": "/vyra"
...     })
...
...     # Create server (service server)
...     async def handle_request(req):
...         return {"result": req.value * 2}
...
...     server = await provider.create_server(
...         "calculate",
...         handle_request,
...         service_type="example_interfaces/srv/AddTwoInts"
...     )
...
...     # Create publisher (topic publisher)
...     publisher = await provider.create_publisher(
...         "sensor_data",
...         message_type="std_msgs/msg/String"
...     )
Parameter:
__init__(module_name, module_id, protocol=ProtocolType.ROS2)[Quellcode]

Initialize ROS2 provider.

Parameter:
  • protocol (ProtocolType) – Protocol type (must be ROS2)

  • node_name – Default node name

  • module_name (str)

  • module_id (str)

async check_availability()[Quellcode]

Check if ROS2 is available.

Rückgabe:

True if rclpy is installed and importable

Rückgabetyp:

bool

async initialize(config=None)[Quellcode]

Initialize ROS2 provider.

Parameter:

config (Optional[Dict[str, Any]]) – Optional configuration - node_name: ROS2 node name - namespace: Node namespace - use_sim_time: Use simulation time - enable_rosout: Enable rosout logging - parameter_overrides: Node parameter overrides

Rückgabe:

True if initialization successful

Rückgabetyp:

bool

async shutdown()[Quellcode]

Shutdown the provider and cleanup resources.

Rückgabetyp:

None

async create_publisher(name, **kwargs)[Quellcode]

Create ROS2 publisher.

Rückgabetyp:

VyraPublisher

Parameter:

name (str)

async create_subscriber(name, subscriber_callback, **kwargs)[Quellcode]

Create ROS2 subscriber.

Rückgabetyp:

VyraSubscriber

Parameter:
async create_server(name, response_callback, **kwargs)[Quellcode]

Create ROS2 service server.

Rückgabetyp:

VyraServer

Parameter:
async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[Quellcode]

Create ROS2 service client.

Parameter:
  • name (str) – Service name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance. When provided (e.g. injected by the factory for cross-module proxy calls), it is used for both interface-type discovery and topic-name building, so that the correct module-specific interface paths are consulted. Falls back to the provider’s own _topic_builder when omitted.

  • service_type (Optional[type]) – Explicit ROS2 service type class. Resolved via topic_builder.load_interface_type when not supplied.

  • request_callback (Optional[Callable]) – Optional async callback for responses.

  • **kwargs – Additional parameters forwarded to VyraClientImpl (node, qos_profile, …).

Rückgabetyp:

VyraClient

async create_action_server(name, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, **kwargs)[Quellcode]

Create ROS2 action server.

Rückgabetyp:

VyraActionServer

Parameter:
async create_action_client(name, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[Quellcode]

Create ROS2 action client.

Rückgabetyp:

VyraActionClient

Parameter:
get_node()[Quellcode]

Get the ROS2 node.

Rückgabetyp:

Node

Rückgabe:

rclpy Node instance

ROS2 VYRA Models Layer

VYRA abstractions for ROS2 communication. This layer wraps ROS2 functionality into VYRA’s unified interface.

Components:
  • ROS2PublisherImpl: Publish-only pattern (wraps ROS2 topic publisher)

  • ROS2SubscriberImpl: Subscribe-only pattern (wraps ROS2 topic subscriber)

  • ROS2ServerImpl: Request-response server (wraps ROS2 service server)

  • ROS2ClientImpl: Request-response client (wraps ROS2 service client)

  • ROS2ActionServerImpl: Long-running task server (wraps ROS2 action server)

  • ROS2ActionClientImpl: Long-running task client (wraps ROS2 action client)

Usage:
>>> from vyra_base.com.transport.t_ros2.vyra_models import ROS2PublisherImpl, ROS2SubscriberImpl
>>>
>>> # Publisher
>>> publisher = ROS2PublisherImpl(name="sensor_data", node=node, message_type=Temperature)
>>> await publisher.initialize()
>>> await publisher.publish(temperature_msg)
>>>
>>> # Subscriber with callback
>>> async def on_message(msg):
...     print(f"Received: {msg}")
>>> subscriber = ROS2SubscriberImpl(name="sensor_data", node=node, message_type=Temperature, subscriber_callback=on_message)
>>> await subscriber.initialize()
class vyra_base.com.transport.t_ros2.vyra_models.VyraPublisherImpl(name, topic_builder, node, message_type, qos_profile=None, **kwargs)[Quellcode]

Bases: VyraPublisher

Vyra Publisher implementation.

Wraps Vyra topic publisher for one-way message publishing.

Parameter:
__init__(name, topic_builder, node, message_type, qos_profile=None, **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize ROS2 publisher.

Rückgabetyp:

bool

async shutdown()[Quellcode]

Shutdown ROS2 publisher.

Rückgabetyp:

None

async publish(message)[Quellcode]

Publish message (async).

Parameter:

message (Any) – Message to publish (ROS2 message instance)

Rückgabe:

True if published successfully

Rückgabetyp:

bool

class vyra_base.com.transport.t_ros2.vyra_models.VyraSubscriberImpl(name, topic_builder, node, message_type, subscriber_callback=None, qos_profile=None, **kwargs)[Quellcode]

Bases: VyraSubscriber

Vyra Subscriber implementation.

Wraps Vyra topic subscriber with async callback support.

Parameter:
__init__(name, topic_builder, node, message_type, subscriber_callback=None, qos_profile=None, **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize ROS2 subscriber.

Rückgabetyp:

bool

async shutdown()[Quellcode]

Shutdown ROS2 subscriber.

Rückgabetyp:

None

async subscribe()[Quellcode]

Start subscribing (already active after initialize for ROS2).

Note: ROS2 subscribers are automatically active after creation. This method is provided for API consistency.

Rückgabetyp:

None

class vyra_base.com.transport.t_ros2.vyra_models.VyraServerImpl(name, topic_builder, node, service_type, response_callback=None, qos_profile=None, **kwargs)[Quellcode]

Bases: VyraServer

Vyra Server implementation.

Wraps Vyra service server with async callback support.

Parameter:
__init__(name, topic_builder, node, service_type, response_callback=None, qos_profile=None, **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize ROS2 service server.

Rückgabetyp:

bool

async shutdown()[Quellcode]

Shutdown ROS2 server.

Rückgabetyp:

None

async serve()[Quellcode]

Start serving (already active after initialize for ROS2).

Note: ROS2 servers are automatically active after creation.

Rückgabetyp:

None

class vyra_base.com.transport.t_ros2.vyra_models.VyraClientImpl(name, topic_builder, node, service_type, request_callback=None, qos_profile=None, **kwargs)[Quellcode]

Bases: VyraClient

Vyra Client implementation.

Wraps Vyra service client for async request-response.

Parameter:
__init__(name, topic_builder, node, service_type, request_callback=None, qos_profile=None, **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize ROS2 service client.

Rückgabetyp:

bool

async shutdown()[Quellcode]

Shutdown ROS2 client.

Rückgabetyp:

None

async call(request, timeout=5.0)[Quellcode]

Call service and await response (async).

Accepts either a pre-built ROS2 request message or a plain Python dict. When a dict is passed it is used to populate a freshly instantiated <ServiceType>.Request object field-by-field (with case-insensitive key matching so callers can use either t1 or T1). The raw ROS2 response is automatically converted to an ordered dict before being returned so that higher layers (e.g. the proxy router) can treat all protocol responses uniformly.

Parameter:
  • request (Any) – ROS2 request message or dict of request fields.

  • timeout (float) – Timeout in seconds.

Rückgabe:

Response fields as a Python dict.

Rückgabetyp:

Any

Verursacht:
class vyra_base.com.transport.t_ros2.vyra_models.VyraActionServerImpl(name, topic_builder, node, action_type, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, **kwargs)[Quellcode]

Bases: VyraActionServer

Vyra Action Server implementation.

Wraps Vyra action server with async callback support for goal handling.

Parameter:
__init__(name, topic_builder, node, action_type, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize ROS2 action server.

Rückgabetyp:

bool

async shutdown()[Quellcode]

Shutdown ROS2 action server.

Rückgabetyp:

None

async start()[Quellcode]

Start action server (already active after initialize for ROS2).

Rückgabetyp:

None

class vyra_base.com.transport.t_ros2.vyra_models.VyraActionClientImpl(name, topic_builder, node, action_type, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[Quellcode]

Bases: VyraActionClient

Vyra Action Client implementation.

Wraps Vyra action client with async callback support.

Parameter:
__init__(name, topic_builder, node, action_type, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize ROS2 action client.

Rückgabetyp:

bool

async shutdown()[Quellcode]

Shutdown ROS2 action client.

Rückgabetyp:

None

async send_goal(goal, **kwargs)[Quellcode]

Send goal to action server (async).

Parameter:
  • goal (Any) – Goal message

  • **kwargs – Additional parameters (timeout, etc.)

Rückgabetyp:

Any

Rückgabe:

Result message

Verursacht:
async cancel_goal(goal_handle)[Quellcode]

Cancel a running goal.

Parameter:

goal_handle (Any) – Handle returned from send_goal

Rückgabe:

True if cancellation accepted

Rückgabetyp:

bool

Dependencies

  • rclpy (ROS2 Python client library)

  • rclpy.node (Node management)

  • rclpy.qos (Quality of Service)

  • rclpy.action (Action interfaces)

See Also