Communication Decorators

Protocol-agnostic decorators that transform component methods into communication interfaces.

Communication Decorators

Modern decorators for multi-protocol communication with late-binding support.

The decorator system uses a two-phase initialization pattern: 1. Phase 1 (Decoration): Create HandlerBlueprint, register in CallbackRegistry 2. Phase 2 (Binding): Bind decorated method to blueprint during component initialization

This decouples interface definition from implementation, enabling: - Dynamic interface registration - Late binding of callbacks - Better testing (mock callbacks easily) - Clear separation of concerns

Example

>>> class MyComponent:
...     @remote_service()
...     async def calculate(self, request, response=None):
...         return {"result": request["x"] + request["y"]}

The decorator creates a ServiceBlueprint, but binding happens when the component is registered with an entity.

vyra_base.com.core.decorators.remote_publisher(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]

Decorator for pub/sub communication (publisher side).

Creates a PublisherBlueprint and registers it in CallbackRegistry. The decorated method becomes a publisher interface.

Parameters:
  • name (Optional[str]) – Topic/channel name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata (message_type, qos, retain, etc.)

Example

>>> class Component:
...     @remote_publisher(protocols=[ProtocolType.REDIS])
...     async def publish_status(self, message):
...         '''This method will become a publisher'''
...         pass
...
...     async def some_task(self):
...         # Publish status update
...         await self.publish_status({"state": "running"})

Note

  • Publisher creation happens via InterfaceFactory during binding

  • The decorated method is replaced with actual publishing logic

class vyra_base.com.core.decorators.ActionServerDecorator[source]

Bases: object

Class-based decorator for ActionServer with multi-callback support.

Provides sub-decorators for separate goal acceptance, cancellation, and execution callbacks. This enables clean separation of concerns for action server implementations.

Usage:
>>> class Component:
...     @remote_actionServer.on_goal(name="process_batch")
...     async def accept_goal(self, goal_request):
...         '''Validate and accept/reject the goal'''
...         return True  # Accept goal
...
...     @remote_actionServer.on_cancel(name="process_batch")
...     async def handle_cancel(self, goal_handle):
...         '''Handle cancellation request'''
...         return True  # Accept cancellation
...
...     @remote_actionServer.execute(name="process_batch")
...     async def execute_batch(self, goal_handle):
...         '''Execute the action with feedback'''
...         for i in range(10):
...             if goal_handle.is_cancel_requested():
...                 goal_handle.canceled()
...                 return {"cancelled": True}
...             goal_handle.publish_feedback({"progress": (i+1)*10})
...         goal_handle.succeed()
...         return {"result": "completed"}
static on_goal(name, protocols=None, auto_register=True, namespace=None, **kwargs)[source]

Decorator for goal acceptance callback.

Parameters:
  • name (str) – Action name (REQUIRED for multi-callback pattern)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols

  • auto_register (bool) – Whether to auto-register blueprint

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata

The decorated method should: - Accept (self, goal_request) parameters - Return bool (True=accept, False=reject)

static on_cancel(name, protocols=None, auto_register=False, namespace=None, **kwargs)[source]

Decorator for cancellation callback.

Parameters:
  • name (str) – Action name (must match on_goal/execute)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols

  • auto_register (bool) – Usually False (blueprint already registered by on_goal)

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata

The decorated method should: - Accept (self, goal_handle) parameters - Return bool (True=accept cancel, False=reject cancel)

static execute(name, protocols=None, auto_register=False, namespace=None, **kwargs)[source]

Decorator for action execution callback.

Parameters:
  • name (str) – Action name (must match on_goal/on_cancel)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols

  • auto_register (bool) – Usually False (blueprint already registered by on_goal)

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata

The decorated method should: - Accept (self, goal_handle) parameters - Return Dict with result data - Use goal_handle.publish_feedback() for progress updates - Use goal_handle.succeed()/abort()/canceled() to complete

vyra_base.com.core.decorators.remote_subscriber(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]

Decorator for subscription callbacks (subscriber side of pub/sub).

Creates a SubscriberBlueprint and registers it in CallbackRegistry. The decorated method will be called when messages are received on the topic.

Parameters:
  • name (Optional[str]) – Topic/channel name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata (message_type, qos, etc.)

Example

>>> class Component:
...     @remote_subscriber(topic="/status")
...     async def on_status_update(self, message):
...         logger.info(f"Received status: {message}")

Note

  • Subscriber is automatically created and bound during registration

  • The decorated method is called for each received message

vyra_base.com.core.decorators.get_decorated_methods(obj)[source]

Get all methods decorated with communication decorators.

Scans an object for decorated methods and returns them organized by type. Also retrieves associated blueprints from CallbackRegistry.

Parameters:

obj (Any) – Object to inspect (typically a component instance)

Returns:

{

“servers”: […], # @remote_service methods “publishers”: […], # @remote_publisher methods “subscribers”: […], # @remote_subscriber methods “actions”: […] # @remote_actionServer methods

}

Each entry contains: {

“name”: str, # Interface name “method”: Callable, # Bound method “protocols”: List, # Protocol preferences “kwargs”: dict, # Additional metadata “blueprint”: Blueprint # Associated blueprint (if registered)

}

Return type:

dict

Example

>>> component = MyComponent()
>>> methods = get_decorated_methods(component)
>>> print(f"Found {len(methods['callables'])} services")
>>> for svc in methods['callables']:
...     if svc['blueprint'] and not svc['blueprint'].is_bound():
...         svc['blueprint'].bind_callback(svc['method'])
vyra_base.com.core.decorators.bind_decorated_callbacks(component, namespace=None, force_rebind=False)[source]

Bind all decorated methods from a component to their registered blueprints.

This is the Phase 2 of the two-phase initialization, typically called during component initialization (e.g., in Component.__init__ or set_interfaces()).

Parameters:
  • component (Any) – Component instance with decorated methods

  • namespace (Optional[str]) – Optional namespace for blueprint lookup

  • force_rebind (bool) – If True, unbind existing callbacks before binding

Return type:

Dict[str, bool]

Returns:

Dictionary mapping interface names to binding success status

Example

>>> component = MyComponent()
>>> results = bind_decorated_callbacks(component, namespace="v2_modulemanager")
>>> print(f"Successfully bound {sum(results.values())} interfaces")
>>> failed = [name for name, success in results.items() if not success]
>>> if failed:
...     logger.warning(f"Failed to bind: {failed}")
vyra_base.com.core.decorators.remote_service(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]

Decorator for request/response communication (ROS2 Service, gRPC, etc.)

Creates a ServiceBlueprint and registers it in CallbackRegistry. The decorated method will be bound to the blueprint during component initialization.

Parameters:
  • name (Optional[str]) – Service name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint (default True)

  • namespace (Optional[str]) – Optional module namespace for blueprint registration

  • **kwargs – Additional metadata (service_type, qos, etc.)

Example

>>> class MyComponent(OperationalStateMachine):
...     @remote_service()
...     async def calculate(self, request, response=None):
...         result = request["x"] + request["y"]
...         return {"result": result}
...
...     @remote_service(protocols=[ProtocolType.ROS2, ProtocolType.ZENOH])
...     async def process_data(self, request, response=None):
...         # This method prioritizes ROS2, falls back to Zenoh
...         return {"status": "processed"}

Note

  • Methods must accept (request, response=None) signature

  • Response parameter maintained for ROS2 compatibility

  • Async methods recommended (sync methods wrapped automatically)

  • Blueprint created during decoration, callback bound during initialization

vyra_base.com.core.decorators.remote_service_ros2(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)

Decorator for request/response communication (ROS2 Service, gRPC, etc.)

Creates a ServiceBlueprint and registers it in CallbackRegistry. The decorated method will be bound to the blueprint during component initialization.

Parameters:
  • name (Optional[str]) – Service name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint (default True)

  • namespace (Optional[str]) – Optional module namespace for blueprint registration

  • **kwargs – Additional metadata (service_type, qos, etc.)

Example

>>> class MyComponent(OperationalStateMachine):
...     @remote_service()
...     async def calculate(self, request, response=None):
...         result = request["x"] + request["y"]
...         return {"result": result}
...
...     @remote_service(protocols=[ProtocolType.ROS2, ProtocolType.ZENOH])
...     async def process_data(self, request, response=None):
...         # This method prioritizes ROS2, falls back to Zenoh
...         return {"status": "processed"}

Note

  • Methods must accept (request, response=None) signature

  • Response parameter maintained for ROS2 compatibility

  • Async methods recommended (sync methods wrapped automatically)

  • Blueprint created during decoration, callback bound during initialization

vyra_base.com.core.decorators.remote_speaker(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)

Decorator for pub/sub communication (publisher side).

Creates a PublisherBlueprint and registers it in CallbackRegistry. The decorated method becomes a publisher interface.

Parameters:
  • name (Optional[str]) – Topic/channel name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata (message_type, qos, retain, etc.)

Example

>>> class Component:
...     @remote_publisher(protocols=[ProtocolType.REDIS])
...     async def publish_status(self, message):
...         '''This method will become a publisher'''
...         pass
...
...     async def some_task(self):
...         # Publish status update
...         await self.publish_status({"state": "running"})

Note

  • Publisher creation happens via InterfaceFactory during binding

  • The decorated method is replaced with actual publishing logic

vyra_base.com.core.decorators.remote_listener(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)

Decorator for subscription callbacks (subscriber side of pub/sub).

Creates a SubscriberBlueprint and registers it in CallbackRegistry. The decorated method will be called when messages are received on the topic.

Parameters:
  • name (Optional[str]) – Topic/channel name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata (message_type, qos, etc.)

Example

>>> class Component:
...     @remote_subscriber(topic="/status")
...     async def on_status_update(self, message):
...         logger.info(f"Received status: {message}")

Note

  • Subscriber is automatically created and bound during registration

  • The decorated method is called for each received message

Overview

All decorators follow the two-phase initialization pattern:

  1. Phase 1 — The decorator runs at class-definition time and creates a Blueprint. No network connection is made yet.

  2. Phase 2 — Calling bind_decorated_callbacks() attaches your instance methods to the blueprints. After that, InterfaceFactory can create live interfaces.

Decorator Reference

@remote_service()

Expose a method as a request/response service.

from vyra_base.com import remote_service, ProtocolType, AccessLevel

class MyComponent:
    @remote_service(
        name="calculate",
        protocols=[ProtocolType.ZENOH, ProtocolType.REDIS],
        namespace="my_module",
        access_level=AccessLevel.PUBLIC,
    )
    async def calculate(self, request, response=None):
        return {"result": request["x"] + request["y"]}

Parameters:

Parameter

Type

Description

name

str

Service name (defaults to function name)

protocols

list[ProtocolType]

Preferred protocols in fallback order

namespace

str

Topic namespace prefix (usually your module name)

access_level

AccessLevel

Who can call this service (default: PUBLIC)

@remote_publisher

Expose a method as a message publisher.

from vyra_base.com import remote_publisher, ProtocolType

class SensorComponent:
    @remote_publisher(
        name="temperature",
        protocols=[ProtocolType.ZENOH],
        namespace="sensors",
    )
    async def publish_temperature(self, value: float):
        return {"value": value, "unit": "°C"}

The return value becomes the message payload published to subscribers.

@remote_subscriber

Register a method as a subscriber callback.

from vyra_base.com import remote_subscriber, ProtocolType

class DashboardComponent:
    @remote_subscriber(
        name="temperature",
        protocols=[ProtocolType.ZENOH],
        namespace="sensors",
    )
    async def on_temperature(self, message: dict):
        print(f"Temperature: {message['value']} {message['unit']}")

@remote_actionServer

Register a group of methods as an action server. Your class must inherit from IActionHandler.

from vyra_base.com import remote_actionServer, IActionHandler, IGoalHandle, ProtocolType

class ProcessingComponent(IActionHandler):

    @remote_actionServer.on_goal(name="run", protocols=[ProtocolType.ZENOH])
    async def on_goal(self, goal_request: dict) -> bool:
        """Accept (True) or reject (False) the incoming goal."""
        return goal_request.get("dataset") is not None

    @remote_actionServer.execute(name="run")
    async def execute(self, goal_handle: IGoalHandle):
        """Run the action; publish feedback; set result."""
        for i in range(100):
            await goal_handle.publish_feedback({"progress": i})
        goal_handle.succeed()
        return {"status": "done"}

    @remote_actionServer.on_cancel(name="run")
    async def on_cancel(self, goal_handle: IGoalHandle) -> bool:
        """Return True to accept cancellation."""
        return True

Helper Functions

from vyra_base.com import get_decorated_methods, bind_decorated_callbacks

# Inspect all decorated methods on a component instance
methods = get_decorated_methods(component)

# Phase 2: attach instance callbacks to blueprints
bind_decorated_callbacks(component, namespace="my_module")

See also

Quickstart — Building a VYRA Module — Step 2 shows full usage in a module context