Communication Decorators¶
Protocol-agnostic decorators that transform component methods into communication interfaces.
Communication Decorators
Modern decorators for multi-protocol communication with late-binding support.
The decorator system uses a two-phase initialization pattern: 1. Phase 1 (Decoration): Create HandlerBlueprint, register in CallbackRegistry 2. Phase 2 (Binding): Bind decorated method to blueprint during component initialization
This decouples interface definition from implementation, enabling: - Dynamic interface registration - Late binding of callbacks - Better testing (mock callbacks easily) - Clear separation of concerns
Example
>>> class MyComponent:
... @remote_service()
... async def calculate(self, request, response=None):
... return {"result": request["x"] + request["y"]}
The decorator creates a ServiceBlueprint, but binding happens when the component is registered with an entity.
- vyra_base.com.core.decorators.remote_publisher(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]¶
Decorator for pub/sub communication (publisher side).
Creates a PublisherBlueprint and registers it in CallbackRegistry. The decorated method becomes a publisher interface.
- Parameters:
Example
>>> class Component: ... @remote_publisher(protocols=[ProtocolType.REDIS]) ... async def publish_status(self, message): ... '''This method will become a publisher''' ... pass ... ... async def some_task(self): ... # Publish status update ... await self.publish_status({"state": "running"})
Note
Publisher creation happens via InterfaceFactory during binding
The decorated method is replaced with actual publishing logic
- class vyra_base.com.core.decorators.ActionServerDecorator[source]¶
Bases:
objectClass-based decorator for ActionServer with multi-callback support.
Provides sub-decorators for separate goal acceptance, cancellation, and execution callbacks. This enables clean separation of concerns for action server implementations.
- Usage:
>>> class Component: ... @remote_actionServer.on_goal(name="process_batch") ... async def accept_goal(self, goal_request): ... '''Validate and accept/reject the goal''' ... return True # Accept goal ... ... @remote_actionServer.on_cancel(name="process_batch") ... async def handle_cancel(self, goal_handle): ... '''Handle cancellation request''' ... return True # Accept cancellation ... ... @remote_actionServer.execute(name="process_batch") ... async def execute_batch(self, goal_handle): ... '''Execute the action with feedback''' ... for i in range(10): ... if goal_handle.is_cancel_requested(): ... goal_handle.canceled() ... return {"cancelled": True} ... goal_handle.publish_feedback({"progress": (i+1)*10}) ... goal_handle.succeed() ... return {"result": "completed"}
- static on_goal(name, protocols=None, auto_register=True, namespace=None, **kwargs)[source]¶
Decorator for goal acceptance callback.
- Parameters:
The decorated method should: - Accept (self, goal_request) parameters - Return bool (True=accept, False=reject)
- static on_cancel(name, protocols=None, auto_register=False, namespace=None, **kwargs)[source]¶
Decorator for cancellation callback.
- Parameters:
The decorated method should: - Accept (self, goal_handle) parameters - Return bool (True=accept cancel, False=reject cancel)
- static execute(name, protocols=None, auto_register=False, namespace=None, **kwargs)[source]¶
Decorator for action execution callback.
- Parameters:
The decorated method should: - Accept (self, goal_handle) parameters - Return Dict with result data - Use goal_handle.publish_feedback() for progress updates - Use goal_handle.succeed()/abort()/canceled() to complete
- vyra_base.com.core.decorators.remote_subscriber(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]¶
Decorator for subscription callbacks (subscriber side of pub/sub).
Creates a SubscriberBlueprint and registers it in CallbackRegistry. The decorated method will be called when messages are received on the topic.
- Parameters:
Example
>>> class Component: ... @remote_subscriber(topic="/status") ... async def on_status_update(self, message): ... logger.info(f"Received status: {message}")
Note
Subscriber is automatically created and bound during registration
The decorated method is called for each received message
- vyra_base.com.core.decorators.get_decorated_methods(obj)[source]¶
Get all methods decorated with communication decorators.
Scans an object for decorated methods and returns them organized by type. Also retrieves associated blueprints from CallbackRegistry.
- Parameters:
obj (
Any) – Object to inspect (typically a component instance)- Returns:
- {
“servers”: […], # @remote_service methods “publishers”: […], # @remote_publisher methods “subscribers”: […], # @remote_subscriber methods “actions”: […] # @remote_actionServer methods
}
- Each entry contains: {
“name”: str, # Interface name “method”: Callable, # Bound method “protocols”: List, # Protocol preferences “kwargs”: dict, # Additional metadata “blueprint”: Blueprint # Associated blueprint (if registered)
}
- Return type:
Example
>>> component = MyComponent() >>> methods = get_decorated_methods(component) >>> print(f"Found {len(methods['callables'])} services") >>> for svc in methods['callables']: ... if svc['blueprint'] and not svc['blueprint'].is_bound(): ... svc['blueprint'].bind_callback(svc['method'])
- vyra_base.com.core.decorators.bind_decorated_callbacks(component, namespace=None, force_rebind=False)[source]¶
Bind all decorated methods from a component to their registered blueprints.
This is the Phase 2 of the two-phase initialization, typically called during component initialization (e.g., in Component.__init__ or set_interfaces()).
- Parameters:
- Return type:
- Returns:
Dictionary mapping interface names to binding success status
Example
>>> component = MyComponent() >>> results = bind_decorated_callbacks(component, namespace="v2_modulemanager") >>> print(f"Successfully bound {sum(results.values())} interfaces") >>> failed = [name for name, success in results.items() if not success] >>> if failed: ... logger.warning(f"Failed to bind: {failed}")
- vyra_base.com.core.decorators.remote_service(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]¶
Decorator for request/response communication (ROS2 Service, gRPC, etc.)
Creates a ServiceBlueprint and registers it in CallbackRegistry. The decorated method will be bound to the blueprint during component initialization.
- Parameters:
name (
Optional[str]) – Service name (defaults to function name)protocols (
Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)auto_register (
bool) – Whether to auto-register blueprint (default True)namespace (
Optional[str]) – Optional module namespace for blueprint registration**kwargs – Additional metadata (service_type, qos, etc.)
Example
>>> class MyComponent(OperationalStateMachine): ... @remote_service() ... async def calculate(self, request, response=None): ... result = request["x"] + request["y"] ... return {"result": result} ... ... @remote_service(protocols=[ProtocolType.ROS2, ProtocolType.ZENOH]) ... async def process_data(self, request, response=None): ... # This method prioritizes ROS2, falls back to Zenoh ... return {"status": "processed"}
Note
Methods must accept (request, response=None) signature
Response parameter maintained for ROS2 compatibility
Async methods recommended (sync methods wrapped automatically)
Blueprint created during decoration, callback bound during initialization
- vyra_base.com.core.decorators.remote_service_ros2(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)¶
Decorator for request/response communication (ROS2 Service, gRPC, etc.)
Creates a ServiceBlueprint and registers it in CallbackRegistry. The decorated method will be bound to the blueprint during component initialization.
- Parameters:
name (
Optional[str]) – Service name (defaults to function name)protocols (
Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)auto_register (
bool) – Whether to auto-register blueprint (default True)namespace (
Optional[str]) – Optional module namespace for blueprint registration**kwargs – Additional metadata (service_type, qos, etc.)
Example
>>> class MyComponent(OperationalStateMachine): ... @remote_service() ... async def calculate(self, request, response=None): ... result = request["x"] + request["y"] ... return {"result": result} ... ... @remote_service(protocols=[ProtocolType.ROS2, ProtocolType.ZENOH]) ... async def process_data(self, request, response=None): ... # This method prioritizes ROS2, falls back to Zenoh ... return {"status": "processed"}
Note
Methods must accept (request, response=None) signature
Response parameter maintained for ROS2 compatibility
Async methods recommended (sync methods wrapped automatically)
Blueprint created during decoration, callback bound during initialization
- vyra_base.com.core.decorators.remote_speaker(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)¶
Decorator for pub/sub communication (publisher side).
Creates a PublisherBlueprint and registers it in CallbackRegistry. The decorated method becomes a publisher interface.
- Parameters:
Example
>>> class Component: ... @remote_publisher(protocols=[ProtocolType.REDIS]) ... async def publish_status(self, message): ... '''This method will become a publisher''' ... pass ... ... async def some_task(self): ... # Publish status update ... await self.publish_status({"state": "running"})
Note
Publisher creation happens via InterfaceFactory during binding
The decorated method is replaced with actual publishing logic
- vyra_base.com.core.decorators.remote_listener(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)¶
Decorator for subscription callbacks (subscriber side of pub/sub).
Creates a SubscriberBlueprint and registers it in CallbackRegistry. The decorated method will be called when messages are received on the topic.
- Parameters:
Example
>>> class Component: ... @remote_subscriber(topic="/status") ... async def on_status_update(self, message): ... logger.info(f"Received status: {message}")
Note
Subscriber is automatically created and bound during registration
The decorated method is called for each received message
Overview¶
All decorators follow the two-phase initialization pattern:
Phase 1 — The decorator runs at class-definition time and creates a
Blueprint. No network connection is made yet.Phase 2 — Calling
bind_decorated_callbacks()attaches your instance methods to the blueprints. After that,InterfaceFactorycan create live interfaces.
Decorator Reference¶
@remote_service()¶
Expose a method as a request/response service.
from vyra_base.com import remote_service, ProtocolType, AccessLevel
class MyComponent:
@remote_service(
name="calculate",
protocols=[ProtocolType.ZENOH, ProtocolType.REDIS],
namespace="my_module",
access_level=AccessLevel.PUBLIC,
)
async def calculate(self, request, response=None):
return {"result": request["x"] + request["y"]}
Parameters:
Parameter |
Type |
Description |
|---|---|---|
|
|
Service name (defaults to function name) |
|
|
Preferred protocols in fallback order |
|
|
Topic namespace prefix (usually your module name) |
|
|
Who can call this service (default: |
@remote_publisher¶
Expose a method as a message publisher.
from vyra_base.com import remote_publisher, ProtocolType
class SensorComponent:
@remote_publisher(
name="temperature",
protocols=[ProtocolType.ZENOH],
namespace="sensors",
)
async def publish_temperature(self, value: float):
return {"value": value, "unit": "°C"}
The return value becomes the message payload published to subscribers.
@remote_subscriber¶
Register a method as a subscriber callback.
from vyra_base.com import remote_subscriber, ProtocolType
class DashboardComponent:
@remote_subscriber(
name="temperature",
protocols=[ProtocolType.ZENOH],
namespace="sensors",
)
async def on_temperature(self, message: dict):
print(f"Temperature: {message['value']} {message['unit']}")
@remote_actionServer¶
Register a group of methods as an action server.
Your class must inherit from IActionHandler.
from vyra_base.com import remote_actionServer, IActionHandler, IGoalHandle, ProtocolType
class ProcessingComponent(IActionHandler):
@remote_actionServer.on_goal(name="run", protocols=[ProtocolType.ZENOH])
async def on_goal(self, goal_request: dict) -> bool:
"""Accept (True) or reject (False) the incoming goal."""
return goal_request.get("dataset") is not None
@remote_actionServer.execute(name="run")
async def execute(self, goal_handle: IGoalHandle):
"""Run the action; publish feedback; set result."""
for i in range(100):
await goal_handle.publish_feedback({"progress": i})
goal_handle.succeed()
return {"status": "done"}
@remote_actionServer.on_cancel(name="run")
async def on_cancel(self, goal_handle: IGoalHandle) -> bool:
"""Return True to accept cancellation."""
return True
Helper Functions¶
from vyra_base.com import get_decorated_methods, bind_decorated_callbacks
# Inspect all decorated methods on a component instance
methods = get_decorated_methods(component)
# Phase 2: attach instance callbacks to blueprints
bind_decorated_callbacks(component, namespace="my_module")
See also
Quickstart — Building a VYRA Module — Step 2 shows full usage in a module context