Interface Factory

Unified interface creation with automatic protocol selection and fallback.

Overview

The InterfaceFactory provides a high-level API for creating communication interfaces with automatic protocol selection and graceful fallback. It supports both server-side (responding to requests) and client-side (making requests) patterns.

Key Features:

  • Automatic protocol selection (Zenoh → ROS2 → Redis → UDS)

  • Server/Client pattern with explicit methods

  • Publisher/Subscriber pattern support

  • Action Server/Client for long-running tasks

  • Protocol availability checking

  • Graceful degradation

  • Customizable fallback chains

Factory Methods

Server-Side Methods

These methods create interfaces that respond to requests or execute tasks:

create_callable(name, callback, protocols=None, **kwargs)

Creates a service server (responds to service calls).

Parameters:
  • name – Service name

  • callback – Server callback function (request, response) -> response

  • protocols – Optional list of protocols to try (defaults to CALLABLE_FALLBACK)

  • **kwargs – Additional protocol-specific parameters

Returns:

VyraCallable instance

Raises:

InterfaceError – If no protocol is available

# Service Server
server = await InterfaceFactory.create_callable(
    "calculate",
    callback=lambda req, res: setattr(res, 'result', req.a + req.b) or res
)
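
The `(request, response) -> response` callback contract can be tried out without any transport. The following is a minimal sketch: the `SimpleNamespace` objects are stand-ins for the real request and response types, which depend on the selected protocol and are not shown here.

```python
from types import SimpleNamespace


def add_callback(req, res):
    """Named equivalent of the lambda above: fill in the response and return it."""
    res.result = req.a + req.b
    return res


# The lambda form relies on setattr() returning None, so `or res` evaluates to res.
lambda_callback = lambda req, res: setattr(res, "result", req.a + req.b) or res

# Stand-in request/response objects (hypothetical, for illustration only).
request = SimpleNamespace(a=5, b=3)
named_result = add_callback(request, SimpleNamespace(result=None)).result
lambda_result = lambda_callback(request, SimpleNamespace(result=None)).result
print(named_result, lambda_result)
```

A named function is usually easier to read and debug than the `setattr(...) or res` idiom, although both satisfy the same contract.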

create_speaker(name, callback=None, protocols=None, **kwargs)

Creates a publisher (publishes messages to topics).

Parameters:
  • name – Topic name

  • callback – Optional callback (unused for publisher)

  • protocols – Optional list of protocols to try (defaults to SPEAKER_FALLBACK)

  • **kwargs – Additional protocol-specific parameters

Returns:

VyraSpeaker instance

Raises:

InterfaceError – If no protocol is available

# Publisher
speaker = await InterfaceFactory.create_speaker("events")
await speaker.shout({"event": "update"})

create_job(name, callback, protocols=None, **kwargs)

Creates an action server (executes long-running tasks).

Parameters:
  • name – Action name

  • callback – Action execution callback

  • protocols – Optional list of protocols to try (defaults to JOB_FALLBACK)

  • **kwargs – Additional protocol-specific parameters

Returns:

VyraJob instance

Raises:

InterfaceError – If no protocol is available

# Action Server
async def execute_task(goal, feedback_callback=None):
    for i in range(100):
        if feedback_callback:
            await feedback_callback({"progress": i})
    return {"status": "completed"}

job = await InterfaceFactory.create_job("process", callback=execute_task)

Client-Side Methods

These methods create interfaces that make requests or send goals:

create_caller(name, protocols=None, **kwargs) (NEW!)

Creates a service client (calls remote services).

Parameters:
  • name – Service name

  • protocols – Optional list of protocols to try (defaults to CALLABLE_FALLBACK)

  • **kwargs – Additional protocol-specific parameters

Returns:

VyraCallable instance configured as client

Raises:

InterfaceError – If no protocol is available

# Service Client
caller = await InterfaceFactory.create_caller("calculate")
response = await caller.call(request, timeout=5.0)

create_listener(name, callback, protocols=None, **kwargs) (NEW!)

Creates a subscriber (receives messages from topics).

Parameters:
  • name – Topic name

  • callback – Message callback function (message) -> None

  • protocols – Optional list of protocols to try (defaults to SPEAKER_FALLBACK)

  • **kwargs – Additional protocol-specific parameters

Returns:

VyraSpeaker instance configured as subscriber

Raises:

InterfaceError – If no protocol is available

# Subscriber
listener = await InterfaceFactory.create_listener(
    "events",
    callback=lambda msg: print(f"Received: {msg}")
)

create_dispatcher(name, protocols=None, **kwargs) (NEW!)

Creates an action client (sends goals to action servers).

Parameters:
  • name – Action name

  • protocols – Optional list of protocols to try (defaults to JOB_FALLBACK)

  • **kwargs – Additional protocol-specific parameters (e.g., feedback_callback)

Returns:

VyraJob instance configured as client

Raises:

InterfaceError – If no protocol is available

# Action Client
dispatcher = await InterfaceFactory.create_dispatcher(
    "process",
    feedback_callback=lambda fb: print(f"Progress: {fb}")
)
result = await dispatcher.execute(goal)

Configuration Methods

register_provider(provider)

Register a protocol provider with the factory.

Parameters:
  • provider – AbstractProtocolProvider instance or list of providers

from vyra_base.com.transport.t_ros2 import ROS2Provider

provider = ROS2Provider("my_module", "id123")
await provider.initialize()
InterfaceFactory.register_provider(provider)

unregister_provider(protocol)

Unregister a protocol provider.

Parameters:
  • protocol – ProtocolType enum value

set_fallback_chain(interface_type, protocols)

Customize the protocol fallback order for a specific interface type.

Parameters:
  • interface_type – "callable", "speaker", or "job"

  • protocols – Ordered list of ProtocolType to try

# Prioritize UDS for callables
InterfaceFactory.set_fallback_chain(
    "callable",
    [ProtocolType.UDS, ProtocolType.REDIS, ProtocolType.ROS2]
)

get_available_protocols()

Returns a list of currently available protocols.

Returns:

List[ProtocolType]

protocols = InterfaceFactory.get_available_protocols()
if ProtocolType.ROS2 in protocols:
    print("ROS2 is available")
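
The configuration methods above can be pictured as bookkeeping over a protocol-to-provider mapping. The sketch below is a toy model under that assumption, not the InterfaceFactory implementation: `ToyRegistry`, `DummyProvider`, and the local `ProtocolType` enum are hypothetical stand-ins introduced only for illustration.

```python
from enum import Enum


class ProtocolType(Enum):
    """Stand-in for the library's ProtocolType enum (member values are made up)."""
    ZENOH = "zenoh"
    ROS2 = "ros2"
    REDIS = "redis"
    UDS = "uds"


class DummyProvider:
    """Hypothetical provider that only records which protocol it serves."""

    def __init__(self, protocol):
        self.protocol = protocol


class ToyRegistry:
    """Toy model of provider bookkeeping, keyed by protocol."""

    def __init__(self):
        self._providers = {}

    def register_provider(self, provider):
        # Accept one provider or a list of providers, as the parameter docs describe.
        providers = provider if isinstance(provider, list) else [provider]
        for item in providers:
            self._providers[item.protocol] = item

    def unregister_provider(self, protocol):
        # Removing an unknown protocol is treated as a no-op in this sketch.
        self._providers.pop(protocol, None)

    def get_available_protocols(self):
        return list(self._providers)


registry = ToyRegistry()
registry.register_provider(DummyProvider(ProtocolType.ROS2))
registry.register_provider(
    [DummyProvider(ProtocolType.REDIS), DummyProvider(ProtocolType.UDS)]
)
registry.unregister_provider(ProtocolType.REDIS)
available = registry.get_available_protocols()
print([p.name for p in available])
```

Whether the real factory silently ignores unregistering an unknown protocol is not stated in this document; the no-op behavior here is a choice made for the sketch.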

Usage Examples

Complete Server-Client Example

from vyra_base.com import InterfaceFactory
from vyra_base.com.core.types import ProtocolType

# Initialize provider
from vyra_base.com.transport.t_ros2 import ROS2Provider
provider = ROS2Provider("my_module", "id123")
await provider.initialize()
InterfaceFactory.register_provider(provider)

# SERVER: Create service server
async def handle_request(request, response):
    response.sum = request.a + request.b
    return response

server = await InterfaceFactory.create_callable(
    "add_service",
    callback=handle_request,
    service_type=AddTwoInts
)

# CLIENT: Call the service
caller = await InterfaceFactory.create_caller(
    "add_service",
    service_type=AddTwoInts
)

request = AddTwoInts.Request()
request.a = 5
request.b = 3
response = await caller.call(request)
print(f"Result: {response.sum}")  # 8

Publisher-Subscriber Example

# PUBLISHER
speaker = await InterfaceFactory.create_speaker(
    "temperature",
    message_type=Temperature
)
await speaker.shout(Temperature(value=23.5))

# SUBSCRIBER
def on_temperature(msg):
    print(f"Temperature: {msg.value}°C")

listener = await InterfaceFactory.create_listener(
    "temperature",
    callback=on_temperature,
    message_type=Temperature
)

Action Server-Client Example

# ACTION SERVER
async def execute_fibonacci(goal, feedback_callback=None):
    sequence = [0, 1]
    for i in range(1, goal.order):
        sequence.append(sequence[-1] + sequence[-2])
        if feedback_callback:
            await feedback_callback({"sequence": sequence})
    return {"sequence": sequence}

job = await InterfaceFactory.create_job(
    "fibonacci",
    callback=execute_fibonacci,
    action_type=Fibonacci
)

# ACTION CLIENT
def on_feedback(fb):
    print(f"Current sequence: {fb['sequence']}")

dispatcher = await InterfaceFactory.create_dispatcher(
    "fibonacci",
    feedback_callback=on_feedback,
    action_type=Fibonacci
)

goal = Fibonacci.Goal(order=10)
result = await dispatcher.execute(goal)
print(f"Final: {result.result.sequence}")
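
The execution callback can be checked locally before it is wired to a transport. This is a rough sketch under stated assumptions: `SimpleNamespace` stands in for `Fibonacci.Goal`, the feedback callback is a coroutine because the server code awaits it, and each feedback message carries a copy of the list so that earlier snapshots are not mutated by later iterations.

```python
import asyncio
from types import SimpleNamespace


async def execute_fibonacci(goal, feedback_callback=None):
    # Same loop as the action server callback above, but feedback gets a copy.
    sequence = [0, 1]
    for i in range(1, goal.order):
        sequence.append(sequence[-1] + sequence[-2])
        if feedback_callback:
            await feedback_callback({"sequence": list(sequence)})
    return {"sequence": sequence}


async def run_locally(order):
    feedback_log = []

    async def record_feedback(fb):
        # Awaited by execute_fibonacci, so it has to be a coroutine function.
        feedback_log.append(fb["sequence"])

    goal = SimpleNamespace(order=order)  # stand-in for Fibonacci.Goal(order=...)
    result = await execute_fibonacci(goal, record_feedback)
    return result, feedback_log


result, feedback_log = asyncio.run(run_locally(10))
print(result["sequence"])
print(len(feedback_log))
```

With `order=10` the loop runs nine times, so nine feedback messages are recorded and the final sequence has eleven entries.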

Explicit Protocol Selection

# Try specific protocols only
caller = await InterfaceFactory.create_caller(
    "fast_service",
    protocols=[ProtocolType.UDS, ProtocolType.SHARED_MEMORY]
)

Fallback Chain Customization

# Prioritize Zenoh for all speakers
InterfaceFactory.set_fallback_chain(
    "speaker",
    [ProtocolType.ZENOH, ProtocolType.REDIS, ProtocolType.MQTT]
)

Default Fallback Chains

CALLABLE_FALLBACK:
  • Zenoh → ROS2 → Redis → UDS

SPEAKER_FALLBACK:
  • Zenoh → ROS2 → Redis → UDS

JOB_FALLBACK:
  • Zenoh → ROS2 → Redis → UDS
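
One way to read a fallback chain is "walk the list in order and take the first protocol that is available". The sketch below illustrates that reading only. `select_protocol`, `DEFAULT_CHAIN`, and the local `ProtocolType` and `InterfaceError` definitions are hypothetical stand-ins, and the factory's actual selection logic may differ.

```python
from enum import Enum


class ProtocolType(Enum):
    """Stand-in for the library's ProtocolType enum (member values are made up)."""
    ZENOH = "zenoh"
    ROS2 = "ros2"
    REDIS = "redis"
    UDS = "uds"


class InterfaceError(Exception):
    """Stand-in for the library's InterfaceError."""


# Mirrors the documented default order: Zenoh, ROS2, Redis, UDS.
DEFAULT_CHAIN = [
    ProtocolType.ZENOH,
    ProtocolType.ROS2,
    ProtocolType.REDIS,
    ProtocolType.UDS,
]


def select_protocol(available, protocols=None):
    """Return the first protocol in the chain that is currently available."""
    chain = DEFAULT_CHAIN if protocols is None else protocols
    for protocol in chain:
        if protocol in available:
            return protocol
    raise InterfaceError(f"none of {[p.name for p in chain]} is available")


available = {ProtocolType.REDIS, ProtocolType.UDS}
default_choice = select_protocol(available)
custom_choice = select_protocol(available, [ProtocolType.UDS, ProtocolType.REDIS])
print(default_choice.name, custom_choice.name)
```

Passing an explicit `protocols` list changes only the order of preference; availability still decides which entry wins.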

Server vs Client Behavior

The factory automatically determines server/client behavior based on the method used:

Method               Role        Flag                Behavior
-------------------  ----------  ------------------  -------------------------
create_callable()    Server      is_callable=True    Responds to service calls
create_caller()      Client      is_callable=False   Makes service calls
create_speaker()     Publisher   is_publisher=True   Publishes messages
create_listener()    Subscriber  is_publisher=False  Receives messages
create_job()         Server      is_job=True         Executes actions
create_dispatcher()  Client      is_job=False        Sends goals

Error Handling

import logging

from vyra_base.com.core.exceptions import (
    ProtocolUnavailableError,
    InterfaceError
)

logger = logging.getLogger(__name__)

try:
    caller = await InterfaceFactory.create_caller(
        "service",
        protocols=[ProtocolType.ROS2]
    )
except ProtocolUnavailableError:
    # ROS2 not available, try fallback
    caller = await InterfaceFactory.create_caller(
        "service",
        protocols=[ProtocolType.UDS]
    )
except InterfaceError as e:
    logger.error(f"Failed to create interface: {e}")
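
When several protocol preferences should be tried one after another, the nested try/except can be folded into a loop. The following is a sketch under stated assumptions: the two exception classes are local stand-ins, the subclass relationship between them is assumed rather than confirmed by this document, and `try_in_order` and `fake_create` are hypothetical helpers.

```python
import asyncio


class InterfaceError(Exception):
    """Stand-in for vyra_base.com.core.exceptions.InterfaceError."""


class ProtocolUnavailableError(InterfaceError):
    """Stand-in; treating it as a subclass of InterfaceError is an assumption."""


async def try_in_order(create, preference_lists):
    """Call create(protocols=...) for each preference list until one succeeds."""
    last_error = None
    for protocols in preference_lists:
        try:
            return await create(protocols=protocols)
        except ProtocolUnavailableError as exc:
            last_error = exc  # remember the failure and try the next preference
    raise InterfaceError("no preference list could be satisfied") from last_error


async def fake_create(protocols):
    # Hypothetical factory call: only "uds" is available in this toy setup.
    if "uds" not in protocols:
        raise ProtocolUnavailableError(f"{protocols} unavailable")
    return f"caller via {protocols[0]}"


chosen = asyncio.run(try_in_order(fake_create, [["ros2"], ["uds"]]))
print(chosen)
```

If the more specific exception does derive from the general one, its `except` clause has to come first, as in the example above it; otherwise the general handler would catch both.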

Best Practices

  1. Use explicit server/client methods for clarity:

    # ✅ Good - Clear intent
    server = await InterfaceFactory.create_callable(...)
    client = await InterfaceFactory.create_caller(...)
    
    # ❌ Avoid - Ambiguous
    interface = await InterfaceFactory.create_callable(...)  # Server or client?
    
  2. Register providers once at startup:

    # ✅ Good - Register once
    provider = ROS2Provider(...)
    await provider.initialize()
    InterfaceFactory.register_provider(provider)
    
  3. Check availability before explicit protocol selection:

    # ✅ Good - Check first
    if ProtocolType.ROS2 in InterfaceFactory.get_available_protocols():
        interface = await InterfaceFactory.create_caller(
            "service",
            protocols=[ProtocolType.ROS2]
        )
    
  4. Use type hints and docstrings:

    # ✅ Good - Clear types
    async def handle_request(
        request: AddTwoInts.Request,
        response: AddTwoInts.Response
    ) -> AddTwoInts.Response:
        """Add two integers."""
        response.sum = request.a + request.b
        return response
    

See Also


class vyra_base.com.core.factory.InterfaceFactory

Bases: object

Factory for creating communication interfaces with protocol fallback.

Features:

  • Automatic protocol selection

  • Fallback chain (Zenoh → ROS2 → Redis → UDS)

  • Protocol availability checking

  • Graceful degradation

Example

>>> # Auto-select best available protocol
>>> server = await InterfaceFactory.create_server(
...     "my_service",
...     response_callback=handle_request
... )
>>>
>>> # Explicit protocol with fallback
>>> server = await InterfaceFactory.create_server(
...     "my_service",
...     protocols=[ProtocolType.ROS2, ProtocolType.SHARED_MEMORY],
...     response_callback=handle_request
... )
>>>
>>> # Publisher for pub/sub
>>> publisher = await InterfaceFactory.create_publisher(
...     "temperature",
...     protocols=[ProtocolType.REDIS, ProtocolType.MQTT]
... )

PUBLISHER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
SUBSCRIBER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
SERVER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
CLIENT_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
ACTION_SERVER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
ACTION_CLIENT_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]

static loop_check_pending()

Check pending interfaces and initialize if callbacks are now available. Should be called periodically (e.g. in main loop) to handle late callback registration.

static register_provider(provider)

Register one or more protocol providers.

Parameters:

provider (Union[AbstractProtocolProvider, list]) – Single provider instance or list of providers

Return type:

None

static unregister_provider(protocol)

Unregister a protocol provider.

Parameters:

protocol (ProtocolType) – Protocol type to unregister

Return type:

None

async static create_publisher(name, protocols=None, **kwargs)

Create Publisher with automatic protocol selection.

Only the module's own (default) provider is used.

Parameters:
  • name (str) – Topic/channel name

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)

  • **kwargs – Additional parameters (message_type, qos, etc.)

Returns:

Initialized publisher

Return type:

VyraPublisher

Raises:

InterfaceError – If no protocol available

async static create_subscriber(name, subscriber_callback, protocols=None, module_id=None, module_name=None, **kwargs)

Create Subscriber with automatic protocol selection.

Parameters:
  • name (str) – Topic/channel name

  • subscriber_callback (Callable) – Async callback for received messages

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)

  • module_id (Optional[str]) – Optional target module ID. When provided, the registry looks up (or creates) a provider scoped to that module.

  • module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).

  • **kwargs – Additional parameters (message_type, qos, etc.)

Returns:

Initialized subscriber or None if pending

Return type:

Optional[VyraSubscriber]

Raises:

InterfaceError – If no protocol available

async static create_server(name, response_callback, protocols=None, **kwargs)

Create Server with automatic protocol selection.

Only the module's own (default) provider is used. module_id / module_name are determined by the provider that was registered without a module_id.

Parameters:
  • name (str) – Service name

  • response_callback (Optional[Callable]) – Async callback for requests

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)

  • **kwargs – Additional parameters (service_type, qos, etc.)

Returns:

Initialized server or None if pending

Return type:

Optional[VyraServer]

Raises:

InterfaceError – If no protocol available

async static create_client(name, protocols=None, module_id=None, module_name=None, **kwargs)

Create Client with automatic protocol selection.

Parameters:
  • name (str) – Service name

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)

  • module_id (Optional[str]) – Optional target module ID. When provided, the registry looks up (or creates) a provider scoped to that module.

  • module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).

  • **kwargs – Additional parameters (service_type, qos, etc.)

Returns:

Initialized client

Return type:

VyraClient

Raises:

InterfaceError – If no protocol available

async static create_action_server(name, handle_goal_request, handle_cancel_request, execution_callback, protocols=None, **kwargs)

Create Action Server with automatic protocol selection.

Only the module's own (default) provider is used.

Parameters:
  • name (str) – Action name

  • handle_goal_request (Optional[Callable]) – Async callback to accept/reject goals

  • handle_cancel_request (Optional[Callable]) – Async callback for cancellations

  • execution_callback (Optional[Callable]) – Async callback for goal execution

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)

  • **kwargs – Additional parameters (action_type, qos, etc.)

Returns:

Initialized action server or None if pending

Return type:

Optional[VyraActionServer]

Raises:

InterfaceError – If no protocol available

async static create_action_client(name, direct_response_callback=None, feedback_callback=None, goal_callback=None, protocols=None, module_id=None, module_name=None, **kwargs)

Create Action Client with automatic protocol selection.

Parameters:
  • name (str) – Action name

  • direct_response_callback (Optional[Callable]) – Async callback for goal acceptance

  • feedback_callback (Optional[Callable]) – Async callback for feedback

  • goal_callback (Optional[Callable]) – Async callback for final result

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)

  • module_id (Optional[str]) – Optional target module ID. When provided, the registry looks up (or creates) a provider scoped to that module.

  • module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).

  • **kwargs – Additional parameters (action_type, qos, etc.)

Returns:

Initialized action client or None if pending

Return type:

Optional[VyraActionClient]

Raises:

InterfaceError – If no protocol available

static get_available_protocols()

Returns a list of currently available protocols.

Return type:

List[ProtocolType]

static set_fallback_chain(interface_type, protocols)

Customize fallback chain for interface type.

Parameters:
  • interface_type (str) – "server", "publisher", or "actionServer"

  • protocols (List[ProtocolType]) – Ordered list of protocols to try

Example

>>> # Prioritize SharedMemory over ROS2
>>> InterfaceFactory.set_fallback_chain(
...     "server",
...     [ProtocolType.SHARED_MEMORY, ProtocolType.ROS2, ProtocolType.UDS]
... )

async static create_from_blueprint(blueprint, **override_kwargs)

Create interface from a HandlerBlueprint.

This is the primary method for two-phase initialization:

  1. The blueprint is created during decoration/configuration.

  2. The interface is created once the blueprint is bound to a callback.

Parameters:
  • blueprint (HandlerBlueprint) – HandlerBlueprint instance

  • **override_kwargs – Override blueprint metadata

Return type:

Union[VyraServer, VyraPublisher, VyraSubscriber, VyraActionServer, None]

Returns:

Created interface or None if pending (callback not bound yet)

Example

>>> from vyra_base.com.core.blueprints import ServiceBlueprint
>>> blueprint = ServiceBlueprint(name="my_service")
>>> # ... later when callback available ...
>>> blueprint.bind_callback(my_callback)
>>> server = await InterfaceFactory.create_from_blueprint(blueprint)

async static bind_pending_callback(name, callback=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None)

Bind one or more callbacks to a pending interface and initialize it.

For servers/subscribers, use the callback parameter. For action servers, use the dedicated handle_goal_request, handle_cancel_request and execution_callback parameters. If a parameter is None, the previously stored value is kept.

Parameters:
  • name (str) – Interface name (must match pending registration)

  • callback (Optional[Callable]) – Callback for server (response_callback) or subscriber (subscriber_callback)

  • handle_goal_request (Optional[Callable]) – Action server goal-accept callback

  • handle_cancel_request (Optional[Callable]) – Action server cancel callback

  • execution_callback (Optional[Callable]) – Action server execution callback

Return type:

Union[VyraServer, VyraSubscriber, VyraActionServer, None]

Returns:

Initialized interface or None if not found in pending

Example

>>> # Phase 1: Create server without callback
>>> await InterfaceFactory.create_server("my_service", response_callback=None)
>>> # Phase 2: Bind callback later
>>> server = await InterfaceFactory.bind_pending_callback(
...     "my_service", callback=my_callback_function
... )
>>>
>>> # Action server – bind all three callbacks
>>> action_srv = await InterfaceFactory.bind_pending_callback(
...     "my_action",
...     handle_goal_request=on_goal,
...     handle_cancel_request=on_cancel,
...     execution_callback=execute,
... )
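
The two-phase flow described above (create without a callback, bind later) can be modeled with a small in-memory registry. This is a toy sketch and not the vyra_base implementation: `ToyPendingFactory` is hypothetical, it only handles the single-callback server case, and it returns the interface name where the real factory would return an interface object.

```python
import asyncio


class ToyPendingFactory:
    """Toy model of two-phase initialization with a pending registry."""

    def __init__(self):
        self._pending = {}  # name -> callback (None until one is bound)
        self.active = {}    # name -> callback of initialized interfaces

    async def create_server(self, name, response_callback=None):
        if response_callback is None:
            self._pending[name] = None
            return None  # pending: nothing to hand back yet
        self.active[name] = response_callback
        return name

    async def bind_pending_callback(self, name, callback=None):
        if name not in self._pending:
            return None  # not found in pending
        if callback is not None:
            self._pending[name] = callback  # None keeps the stored value
        if self._pending[name] is None:
            return None  # still waiting for a callback
        self.active[name] = self._pending.pop(name)
        return name

    def has_pending(self, name):
        return name in self._pending


async def demo():
    factory = ToyPendingFactory()
    created = await factory.create_server("my_service")  # phase 1
    pending_before = factory.has_pending("my_service")
    bound = await factory.bind_pending_callback(         # phase 2
        "my_service", callback=lambda req, res: res
    )
    return created, pending_before, bound, factory.has_pending("my_service")


outcome = asyncio.run(demo())
print(outcome)
```
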

async static process_pending_interfaces()

Process all pending interfaces, attempting to initialize those with callbacks.

This should be called periodically (e.g., from entity event loop) or after binding callbacks via CallbackRegistry.

Return type:

Dict[str, bool]

Returns:

Dictionary mapping interface names to initialization success status

Example

>>> # In entity event loop
>>> while running:
...     results = await InterfaceFactory.process_pending_interfaces()
...     await asyncio.sleep(1.0)

static get_pending_count()

Get count of pending interfaces awaiting callback binding.

Return type:

int

static list_pending()

Get list of pending interface names.

Return type:

List[str]

static has_pending(name)

Check if an interface is pending callback binding.

Parameters:

name (str)

Return type:

bool