ROS2 Communication¶
VYRA uses ROS2 for inter-module communication. The framework abstracts the ROS2 API and provides simple interfaces for Services, Topics, and Actions.
Concepts¶
ROS2 has three main communication patterns:
Services (Request/Response):
Job = Service Client (calls service)
Callable = Service Server (provides service)
Synchronous request-response communication
Topics (Publish/Subscribe):
Speaker = Publisher (publishes)
Listener = Subscriber (receives)
Asynchronous broadcast communication
Actions (Long-running operations):
Action Client (starts action)
Action Server (executes action)
With feedback and cancel capability
Services: Job & Callable¶
Job - Service Client¶
A Job calls services on other modules:
from vyra_base.com.datalayer.interface_factoy import create_vyra_job
from example_interfaces.srv import AddTwoInts
# Create job
job = create_vyra_job(
node=entity.node,
service_name="/calculator/add_two_ints",
service_type=AddTwoInts
)
# Prepare request
request = AddTwoInts.Request()
request.a = 5
request.b = 7
# Call service asynchronously
response = await job.call_async(request)
print(f"Result: {response.sum}")
Parameters:
node: The ROS2 node (typicallyentity.node)service_name: Service name (with namespace)service_type: ROS2 service type (imported from*_interfaces.srv)
Callable - Service Server¶
A Callable provides a service for other modules.
Use the @remote_service decorato:
from vyra_base.com.datalayer.interface_factoy import remote_service
from vyra_base.state import OperationalStateMachine
class Calculato(OperationalStateMachine):
def __init__(self, unified_state_machine):
super().__init__(unified_state_machine)
@remote_service()
async def add_two_ints(self, request, response):
"""Adds two numbers"""
response.sum = request.a + request.b
return response
@remote_service()
async def get_status(self, request, response):
"""Returns the current status"""
response.status = "operational"
return response
Automatic Regisration:
Methods with @remote_service are automatically registered as ROS2 services,
when the JSON metadata is correctly defined (see ROS2 Interfaces).
Parameters:
request: Incoming service request (defined by service type)response: Outgoing service response (returned)
Topics: Speaker & Listener¶
Speaker - Publisher¶
A Speaker publishes messages on a topic:
from vyra_base.com.datalayer.interface_factoy import create_vyra_speaker
from std_msgs.msg import String
# Create speaker
speaker = create_vyra_speaker(
node=entity.node,
topic_name="/status_updates",
topic_type=String,
qos_profile=10 # Queue-Size
)
# Publish message
msg = String()
msg.data = "System running"
speaker.shout(msg)
Parameters:
node: The ROS2 nodetopic_name: Topic nametopic_type: ROS2 message type (imported from*_interfaces.msg)qos_profile: Quality of Service (queue size or QoS object)
Listener - Subscriber¶
A Listener receives messages from a topic:
from vyra_base.com.transport.ros2.subscriber import Subscriber
from std_msgs.msg import String
# Define callback function
def on_message_received(msg):
print(f"Received: {msg.data}")
# Create listener
listener = VyraSubscriber(
node=entity.node,
topic_name="/status_updates",
topic_type=String,
callback=on_message_received,
qos_profile=10
)
# Subscription create
listener.create_subscription()
Parameters:
callback: Function called for each messageOther parameters same as Speaker
Practical Examples¶
Inter-Module Service Call¶
Module A (Server):
class RobotController(OperationalStateMachine):
@remote_service()
async def move_to_position(self, request, response):
"""Moves robot to position"""
x, y = request.target_x, request.target_y
# Execute movement
success = await self.robot.move_to(x, y)
response.success = success
response.message = "Movement complete"
return response
Module B (Client):
# Create job
move_job = create_vyra_job(
node=entity.node,
service_name="/robot_controller/move_to_position",
service_type=MoveToPosition
)
# Request movement
request = MoveToPosition.Request()
request.target_x = 10.5
request.target_y = 5.3
response = await move_job.call_async(request)
if response.success:
print("Robot moved successfuly")
Topic-based Status Messages¶
Publisher (Module A):
# Speaker for status updates
status_speaker = create_vyra_speaker(
node=entity.node,
topic_name="/system_status",
topic_type=SystemStatus
)
# Publish status regularly
async def publish_status_loop():
while True:
status = SystemStatus()
status.cpu_usage = get_cpu_usage()
status.memory_usage = get_memory_usage()
status.timestamp = get_current_time()
status_speaker.shout(status)
await asyncio.sleep(1.0) # Every second
Subscriber (Module B):
# Callback for status reception
def on_status_received(msg):
if msg.cpu_usage > 80:
logger.warning(f"High CPU load: {msg.cpu_usage}%")
# Create listener
status_listener = VyraSubscriber(
node=entity.node,
topic_name="/system_status",
topic_type=SystemStatus,
callback=on_status_received
)
status_listener.create_subscription()
Interface Definition¶
ROS2 interfaces are defined via JSON metadata.
Example (config/service_interfaces.json):
{
"services": [
{
"name": "add_two_ints",
"type": "example_interfaces/srv/AddTwoInts",
"description": "Adds two integers"
},
{
"name": "get_status",
"type": "std_srvs/srv/Trigger",
"description": "Returns current status"
}
]
}
Thise metadata are loaded at entity start and automatically registered:
# In _base_.py
interfaces = await entity.load_interfaces_from_config()
await entity.set_interfaces(interfaces)
For more Details see ROS2 Interfaces.
Quality of Service (QoS)¶
ROS2 uses QoS profiles to configure communication:
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
# Custom QoS profile
qos = QoSProfile(
depth=10, # Queue size
reliability=ReliabilityPolicy.RELIABLE, # or BEST_EFFORT
durability=DurabilityPolicy.VOLATILE # or TRANSIENT_LOCAL
)
# Use with Speaker
speaker = create_vyra_speaker(
node=entity.node,
topic_name="/important_data",
topic_type=Data,
qos_profile=qos
)
Typical Profiles:
Sensor Data:
BEST_EFFORT(fast, loss acceptable)Commands:
RELIABLE(guaranteed delivery)Persistent Data:
TRANSIENT_LOCAL(new subscribers receive last message)
Best Practices¶
✅ Recommended:
Use meaningful service/topic names with namespace
Define interfaces in JSON metadata
Use
@remote_servicefor automatic regisrationImplement timeouts for service calls
Use QoS profiles appropriate for the use case
❌ Avoid:
Manual ROS2 node creation (use
entity.node)Very large messages (> 1 MB) via topics
High-frequency service calls (> 100 Hz, use topics)
Blocking calls without timeout
Error Handling¶
from rclpy.task import Future
try:
# Service call with timeout
response = await asyncio.wait_for(
job.call_async(request),
timeout=5.0 # 5 seconds
)
except asyncio.TimeoutError:
logger.error("Service call timeout")
except Exception as e:
logger.error(f"Service error: {e}")
Further Information¶
IPC - Inter Process Communication - IPC within a module
Feeders - Automatic Data Publication - Automatic feeders
ROS2 Interfaces - Interface definitions
../vyra_base.com.datalayer - API Reference
ROS2 Documentation: https://docs.ros.org/en/kilted/