ROS2 Communication

VYRA uses ROS2 for inter-module communication. The framework abstracts the ROS2 API and provides simple interfaces for Services, Topics, and Actions.

Concepts

ROS2 has three main communication patterns:

  1. Services (Request/Response):

    • Job = Service Client (calls service)

    • Callable = Service Server (provides service)

    • Synchronous request-response communication

  2. Topics (Publish/Subscribe):

    • Speaker = Publisher (publishes)

    • Listener = Subscriber (receives)

    • Asynchronous broadcast communication

  3. Actions (Long-running operations):

    • Action Client (starts action)

    • Action Server (executes action)

    • With feedback and cancel capability

Services: Job & Callable

Job - Service Client

A Job calls services on other modules:

from vyra_base.com.datalayer.interface_factoy import create_vyra_job
from example_interfaces.srv import AddTwoInts

# Create job
job = create_vyra_job(
    node=entity.node,
    service_name="/calculator/add_two_ints",
    service_type=AddTwoInts
)

# Prepare request
request = AddTwoInts.Request()
request.a = 5
request.b = 7

# Call service asynchronously
response = await job.call_async(request)
print(f"Result: {response.sum}")

Parameters:

  • node: The ROS2 node (typically entity.node)

  • service_name: Service name (with namespace)

  • service_type: ROS2 service type (imported from *_interfaces.srv)

Callable - Service Server

A Callable provides a service for other modules. Use the @remote_service decorato:

from vyra_base.com.datalayer.interface_factoy import remote_service
from vyra_base.state import OperationalStateMachine

class Calculato(OperationalStateMachine):
    def __init__(self, unified_state_machine):
        super().__init__(unified_state_machine)

    @remote_service()
    async def add_two_ints(self, request, response):
        """Adds two numbers"""
        response.sum = request.a + request.b
        return response

    @remote_service()
    async def get_status(self, request, response):
        """Returns the current status"""
        response.status = "operational"
        return response

Automatic Regisration:

Methods with @remote_service are automatically registered as ROS2 services, when the JSON metadata is correctly defined (see ROS2 Interfaces).

Parameters:

  • request: Incoming service request (defined by service type)

  • response: Outgoing service response (returned)

Topics: Speaker & Listener

Speaker - Publisher

A Speaker publishes messages on a topic:

from vyra_base.com.datalayer.interface_factoy import create_vyra_speaker
from std_msgs.msg import String

# Create speaker
speaker = create_vyra_speaker(
    node=entity.node,
    topic_name="/status_updates",
    topic_type=String,
    qos_profile=10  # Queue-Size
)

# Publish message
msg = String()
msg.data = "System running"
speaker.shout(msg)

Parameters:

  • node: The ROS2 node

  • topic_name: Topic name

  • topic_type: ROS2 message type (imported from *_interfaces.msg)

  • qos_profile: Quality of Service (queue size or QoS object)

Listener - Subscriber

A Listener receives messages from a topic:

from vyra_base.com.transport.ros2.subscriber import Subscriber
from std_msgs.msg import String

# Define callback function
def on_message_received(msg):
    print(f"Received: {msg.data}")

# Create listener
listener = VyraSubscriber(
    node=entity.node,
    topic_name="/status_updates",
    topic_type=String,
    callback=on_message_received,
    qos_profile=10
)

# Subscription create
listener.create_subscription()

Parameters:

  • callback: Function called for each message

  • Other parameters same as Speaker

Practical Examples

Inter-Module Service Call

Module A (Server):

class RobotController(OperationalStateMachine):
    @remote_service()
    async def move_to_position(self, request, response):
        """Moves robot to position"""
        x, y = request.target_x, request.target_y

        # Execute movement
        success = await self.robot.move_to(x, y)

        response.success = success
        response.message = "Movement complete"
        return response

Module B (Client):

# Create job
move_job = create_vyra_job(
    node=entity.node,
    service_name="/robot_controller/move_to_position",
    service_type=MoveToPosition
)

# Request movement
request = MoveToPosition.Request()
request.target_x = 10.5
request.target_y = 5.3

response = await move_job.call_async(request)
if response.success:
    print("Robot moved successfuly")

Topic-based Status Messages

Publisher (Module A):

# Speaker for status updates
status_speaker = create_vyra_speaker(
    node=entity.node,
    topic_name="/system_status",
    topic_type=SystemStatus
)

# Publish status regularly
async def publish_status_loop():
    while True:
        status = SystemStatus()
        status.cpu_usage = get_cpu_usage()
        status.memory_usage = get_memory_usage()
        status.timestamp = get_current_time()

        status_speaker.shout(status)
        await asyncio.sleep(1.0)  # Every second

Subscriber (Module B):

# Callback for status reception
def on_status_received(msg):
    if msg.cpu_usage > 80:
        logger.warning(f"High CPU load: {msg.cpu_usage}%")

# Create listener
status_listener = VyraSubscriber(
    node=entity.node,
    topic_name="/system_status",
    topic_type=SystemStatus,
    callback=on_status_received
)
status_listener.create_subscription()

Interface Definition

ROS2 interfaces are defined via JSON metadata.

Example (config/service_interfaces.json):

{
    "services": [
        {
            "name": "add_two_ints",
            "type": "example_interfaces/srv/AddTwoInts",
            "description": "Adds two integers"
        },
        {
            "name": "get_status",
            "type": "std_srvs/srv/Trigger",
            "description": "Returns current status"
        }
    ]
}

Thise metadata are loaded at entity start and automatically registered:

# In _base_.py
interfaces = await entity.load_interfaces_from_config()
await entity.set_interfaces(interfaces)

For more Details see ROS2 Interfaces.

Quality of Service (QoS)

ROS2 uses QoS profiles to configure communication:

from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy

# Custom QoS profile
qos = QoSProfile(
    depth=10,  # Queue size
    reliability=ReliabilityPolicy.RELIABLE,  # or BEST_EFFORT
    durability=DurabilityPolicy.VOLATILE  # or TRANSIENT_LOCAL
)

# Use with Speaker
speaker = create_vyra_speaker(
    node=entity.node,
    topic_name="/important_data",
    topic_type=Data,
    qos_profile=qos
)

Typical Profiles:

  • Sensor Data: BEST_EFFORT (fast, loss acceptable)

  • Commands: RELIABLE (guaranteed delivery)

  • Persistent Data: TRANSIENT_LOCAL (new subscribers receive last message)

Best Practices

Recommended:

  • Use meaningful service/topic names with namespace

  • Define interfaces in JSON metadata

  • Use @remote_service for automatic regisration

  • Implement timeouts for service calls

  • Use QoS profiles appropriate for the use case

Avoid:

  • Manual ROS2 node creation (use entity.node)

  • Very large messages (> 1 MB) via topics

  • High-frequency service calls (> 100 Hz, use topics)

  • Blocking calls without timeout

Error Handling

from rclpy.task import Future

try:
    # Service call with timeout
    response = await asyncio.wait_for(
        job.call_async(request),
        timeout=5.0  # 5 seconds
    )
except asyncio.TimeoutError:
    logger.error("Service call timeout")
except Exception as e:
    logger.error(f"Service error: {e}")

Further Information