Interface Factory
Unified interface creation with automatic protocol selection and fallback.
Overview
The InterfaceFactory provides a high-level API for creating communication interfaces
with automatic protocol selection and graceful fallback. It supports both server-side
(responding to requests) and client-side (making requests) patterns.
Key Features:
Automatic protocol selection (Zenoh → ROS2 → Redis → UDS)
Server/Client pattern with explicit methods
Publisher/Subscriber pattern support
Action Server/Client for long-running tasks
Protocol availability checking
Graceful degradation
Customizable fallback chains
Factory Methods
Server-Side Methods
These methods create interfaces that respond to requests or execute tasks:
create_callable(name, callback, protocols=None, **kwargs)
Creates a service server (responds to service calls).
- param name:
Service name
- param callback:
Server callback function
(request, response) -> response
- param protocols:
Optional list of protocols to try (defaults to CALLABLE_FALLBACK)
- param kwargs:
Additional protocol-specific parameters
- returns:
VyraCallable instance
- raises InterfaceError:
If no protocol is available
# Service Server
server = await InterfaceFactory.create_callable(
    "calculate",
    callback=lambda req, res: setattr(res, 'result', req.a + req.b) or res
)
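The one-line lambda above works because `setattr` returns `None`, so `setattr(...) or res` evaluates to `res`. The following is a minimal stand-alone sketch of that `(request, response) -> response` contract. It does not use the factory: `types.SimpleNamespace` objects are stand-ins for the real request and response messages, whose types depend on the protocol and are an assumption here.

```python
from types import SimpleNamespace

# Same callback shape as in the example: (request, response) -> response.
# setattr() returns None, so "None or res" evaluates to res.
handle = lambda req, res: setattr(res, "result", req.a + req.b) or res


def handle_explicit(req, res):
    """Equivalent callback written as a regular function."""
    res.result = req.a + req.b
    return res


# SimpleNamespace objects stand in for real request/response messages.
request = SimpleNamespace(a=2, b=3)
response = handle(request, SimpleNamespace())
print(response.result)
```

The named function is usually easier to read and to annotate with types than the lambda form.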
create_speaker(name, callback=None, protocols=None, **kwargs)
Creates a publisher (publishes messages to topics).
- param name:
Topic name
- param callback:
Optional callback (unused for publisher)
- param protocols:
Optional list of protocols to try (defaults to SPEAKER_FALLBACK)
- param kwargs:
Additional protocol-specific parameters
- returns:
VyraSpeaker instance
- raises InterfaceError:
If no protocol is available
# Publisher
speaker = await InterfaceFactory.create_speaker("events")
await speaker.shout({"event": "update"})
create_job(name, callback, protocols=None, **kwargs)
Creates an action server (executes long-running tasks).
- param name:
Action name
- param callback:
Action execution callback
- param protocols:
Optional list of protocols to try (defaults to JOB_FALLBACK)
- param kwargs:
Additional protocol-specific parameters
- returns:
VyraJob instance
- raises InterfaceError:
If no protocol is available
# Action Server
async def execute_task(goal, feedback_callback=None):
    for i in range(100):
        if feedback_callback:
            await feedback_callback({"progress": i})
    return {"status": "completed"}
job = await InterfaceFactory.create_job("process", callback=execute_task)
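Because `execute_task` awaits `feedback_callback`, whatever is passed in must be a coroutine function. A plain function returning `None` would fail at the `await`. The sketch below exercises the same callback locally, without the factory or any transport. The `collect` helper and the goal value are hypothetical and only serve the demonstration.

```python
import asyncio


async def execute_task(goal, feedback_callback=None):
    # Same callback as in the example above.
    for i in range(100):
        if feedback_callback:
            await feedback_callback({"progress": i})
    return {"status": "completed"}


async def run_locally():
    received = []

    async def collect(feedback):
        # Must be async, because execute_task awaits the callback.
        received.append(feedback["progress"])

    result = await execute_task({"job": "demo"}, feedback_callback=collect)
    return result, received


result, received = asyncio.run(run_locally())
print(result["status"], len(received))
```

Running the callback in isolation like this is a cheap way to test the task logic before any provider is registered.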
Client-Side Methods
These methods create interfaces that make requests or send goals:
create_caller(name, protocols=None, **kwargs) (NEW!)
Creates a service client (calls remote services).
- param name:
Service name
- param protocols:
Optional list of protocols to try (defaults to CALLABLE_FALLBACK)
- param kwargs:
Additional protocol-specific parameters
- returns:
VyraCallable instance configured as client
- raises InterfaceError:
If no protocol is available
# Service Client
caller = await InterfaceFactory.create_caller("calculate")
response = await caller.call(request, timeout=5.0)
create_listener(name, callback, protocols=None, **kwargs) (NEW!)
Creates a subscriber (receives messages from topics).
- param name:
Topic name
- param callback:
Message callback function
(message) -> None
- param protocols:
Optional list of protocols to try (defaults to SPEAKER_FALLBACK)
- param kwargs:
Additional protocol-specific parameters
- returns:
VyraSpeaker instance configured as subscriber
- raises InterfaceError:
If no protocol is available
# Subscriber
listener = await InterfaceFactory.create_listener(
    "events",
    callback=lambda msg: print(f"Received: {msg}")
)
create_dispatcher(name, protocols=None, **kwargs) (NEW!)
Creates an action client (sends goals to action servers).
- param name:
Action name
- param protocols:
Optional list of protocols to try (defaults to JOB_FALLBACK)
- param kwargs:
Additional protocol-specific parameters (e.g., feedback_callback)
- returns:
VyraJob instance configured as client
- raises InterfaceError:
If no protocol is available
# Action Client
dispatcher = await InterfaceFactory.create_dispatcher(
    "process",
    feedback_callback=lambda fb: print(f"Progress: {fb}")
)
result = await dispatcher.execute(goal)
Configuration Methods
register_provider(provider)
Register a protocol provider with the factory.
- param provider:
AbstractProtocolProvider instance or list of providers
from vyra_base.com.transport.t_ros2 import ROS2Provider
provider = ROS2Provider("my_module", "id123")
await provider.initialize()
InterfaceFactory.register_provider(provider)
unregister_provider(protocol)
Unregister a protocol provider.
- param protocol:
ProtocolType enum value
set_fallback_chain(interface_type, protocols)
Customize the protocol fallback order for a specific interface type.
- param interface_type:
"callable", "speaker", or "job"
- param protocols:
Ordered list of ProtocolType values to try
# Prioritize UDS for callables
InterfaceFactory.set_fallback_chain(
    "callable",
    [ProtocolType.UDS, ProtocolType.REDIS, ProtocolType.ROS2]
)
get_available_protocols()
Returns a list of currently available protocols.
- returns:
List[ProtocolType]
protocols = InterfaceFactory.get_available_protocols()
if ProtocolType.ROS2 in protocols:
    print("ROS2 is available")
Usage Examples
Complete Server-Client Example
from vyra_base.com import InterfaceFactory
from vyra_base.com.core.types import ProtocolType
from vyra_base.com.transport.t_ros2 import ROS2Provider
# Assumption: AddTwoInts is the standard ROS2 example service (fields a, b, sum)
from example_interfaces.srv import AddTwoInts
# Initialize provider
provider = ROS2Provider("my_module", "id123")
await provider.initialize()
InterfaceFactory.register_provider(provider)
# SERVER: Create service server
async def handle_request(request, response):
    # The response field must match the one read by the client below
    response.sum = request.a + request.b
    return response
server = await InterfaceFactory.create_callable(
    "add_service",
    callback=handle_request,
    service_type=AddTwoInts
)
# CLIENT: Call the service
caller = await InterfaceFactory.create_caller(
    "add_service",
    service_type=AddTwoInts
)
request = AddTwoInts.Request()
request.a = 5
request.b = 3
response = await caller.call(request)
print(f"Result: {response.sum}")  # 8
Publisher-Subscriber Example
# PUBLISHER
speaker = await InterfaceFactory.create_speaker(
    "temperature",
    message_type=Temperature
)
await speaker.shout(Temperature(value=23.5))
# SUBSCRIBER
def on_temperature(msg):
    print(f"Temperature: {msg.value}°C")
listener = await InterfaceFactory.create_listener(
    "temperature",
    callback=on_temperature,
    message_type=Temperature
)
Action Server-Client Example
# ACTION SERVER
async def execute_fibonacci(goal, feedback_callback=None):
    sequence = [0, 1]
    for i in range(1, goal.order):
        sequence.append(sequence[-1] + sequence[-2])
        if feedback_callback:
            await feedback_callback({"sequence": sequence})
    return {"sequence": sequence}
job = await InterfaceFactory.create_job(
    "fibonacci",
    callback=execute_fibonacci,
    action_type=Fibonacci
)
# ACTION CLIENT
def on_feedback(fb):
    print(f"Current sequence: {fb['sequence']}")
dispatcher = await InterfaceFactory.create_dispatcher(
    "fibonacci",
    feedback_callback=on_feedback,
    action_type=Fibonacci
)
goal = Fibonacci.Goal(order=10)
result = await dispatcher.execute(goal)
print(f"Final: {result.result.sequence}")
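One detail of the action server callback is worth checking in isolation: each feedback message carries a reference to the same `sequence` list that the callback keeps appending to. If feedback is delivered in-process without serialization (an assumption, since a transport that serializes messages would copy implicitly), a consumer that stores feedback should take a snapshot. The sketch below runs the callback directly, with a `SimpleNamespace` standing in for the goal message. The two collector lists are hypothetical.

```python
import asyncio
from types import SimpleNamespace


async def execute_fibonacci(goal, feedback_callback=None):
    # Same callback as in the example above.
    sequence = [0, 1]
    for i in range(1, goal.order):
        sequence.append(sequence[-1] + sequence[-2])
        if feedback_callback:
            await feedback_callback({"sequence": sequence})
    return {"sequence": sequence}


async def main():
    refs, copies = [], []

    async def on_feedback(fb):
        refs.append(fb["sequence"])          # stores a reference to the live list
        copies.append(list(fb["sequence"]))  # stores a snapshot

    goal = SimpleNamespace(order=5)  # stand-in for a real goal message
    result = await execute_fibonacci(goal, feedback_callback=on_feedback)
    return result, refs, copies


result, refs, copies = asyncio.run(main())
print(result["sequence"])
print(copies[0], refs[0])
```

After the run, every entry in `refs` is the final list, while `copies` preserves the intermediate states.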
Explicit Protocol Selection
# Try specific protocols only
caller = await InterfaceFactory.create_caller(
    "fast_service",
    protocols=[ProtocolType.UDS, ProtocolType.SHARED_MEMORY]
)
Fallback Chain Customization
# Prioritize Zenoh for all speakers
InterfaceFactory.set_fallback_chain(
    "speaker",
    [ProtocolType.ZENOH, ProtocolType.REDIS, ProtocolType.MQTT]
)
Default Fallback Chains
- CALLABLE_FALLBACK:
Zenoh → ROS2 → Redis → UDS
- SPEAKER_FALLBACK:
Zenoh → ROS2 → Redis → UDS
- JOB_FALLBACK:
Zenoh → ROS2 → Redis → UDS
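The idea behind a fallback chain can be sketched in a few lines: walk the ordered list and take the first protocol that is currently available. This is an illustrative model only, not the factory's implementation. The `Protocol` enum, `DEFAULT_CHAIN`, and `select_protocol` are hypothetical names standing in for `ProtocolType`, the default chains listed above, and the factory's internal selection.

```python
from enum import Enum


class Protocol(Enum):
    """Hypothetical stand-in for ProtocolType."""
    ZENOH = "zenoh"
    ROS2 = "ros2"
    REDIS = "redis"
    UDS = "uds"


# Mirrors the default order documented above.
DEFAULT_CHAIN = [Protocol.ZENOH, Protocol.ROS2, Protocol.REDIS, Protocol.UDS]


def select_protocol(available, chain=None):
    """Return the first protocol in the chain that is available."""
    for protocol in (chain if chain is not None else DEFAULT_CHAIN):
        if protocol in available:
            return protocol
    raise LookupError("no protocol in the chain is available")


available = {Protocol.REDIS, Protocol.UDS}
print(select_protocol(available))                                  # default order
print(select_protocol(available, [Protocol.UDS, Protocol.REDIS]))  # custom order
```

With only Redis and UDS available, the default order picks Redis, while a custom chain that lists UDS first picks UDS.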
Server vs Client Behavior
The factory automatically determines server/client behavior based on the method used:
| Method | Role | Flag | Behavior |
|---|---|---|---|
| create_callable | Server | | Responds to service calls |
| create_caller | Client | | Makes service calls |
| create_speaker | Publisher | | Publishes messages |
| create_listener | Subscriber | | Receives messages |
| create_job | Server | | Executes actions |
| create_dispatcher | Client | | Sends goals |
Error Handling
import logging
from vyra_base.com.core.exceptions import (
    ProtocolUnavailableError,
    InterfaceError
)
logger = logging.getLogger(__name__)
try:
    caller = await InterfaceFactory.create_caller(
        "service",
        protocols=[ProtocolType.ROS2]
    )
except ProtocolUnavailableError:
    # ROS2 not available, try fallback
    caller = await InterfaceFactory.create_caller(
        "service",
        protocols=[ProtocolType.UDS]
    )
except InterfaceError as e:
    logger.error(f"Failed to create interface: {e}")
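The retry pattern above can be tried without any transport installed. In the sketch below, both exception classes and the `create` function are hypothetical stand-ins, and the subclass relationship between `ProtocolUnavailableError` and `InterfaceError` is an assumption that this page does not confirm. If it holds, the narrower handler has to come before the broader one, as in the example above.

```python
class InterfaceError(Exception):
    """Hypothetical stand-in for vyra_base's InterfaceError."""


class ProtocolUnavailableError(InterfaceError):
    """Hypothetical stand-in; the subclass relationship is an assumption."""


def create(protocols, available):
    """Toy factory: fail if none of the requested protocols is available."""
    for protocol in protocols:
        if protocol in available:
            return f"interface via {protocol}"
    raise ProtocolUnavailableError(f"none of {protocols} available")


def create_with_retry(preferred, backup, available):
    """Try the preferred protocols first, then the backup list."""
    try:
        return create(preferred, available)
    except ProtocolUnavailableError:
        # A failure of the backup attempt propagates to the caller.
        return create(backup, available)


print(create_with_retry(["ros2"], ["uds"], available={"uds"}))
```

An error raised by the second attempt is not caught by the sibling handlers of the same `try` statement, so callers still need their own handling for the case where both attempts fail.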
Best Practices
Use explicit server/client methods for clarity:
# ✅ Good - Clear intent
server = await InterfaceFactory.create_callable(...)
client = await InterfaceFactory.create_caller(...)
# ❌ Avoid - Ambiguous
interface = await InterfaceFactory.create_callable(...)  # Server or client?
Register providers once at startup:
# ✅ Good - Register once
provider = ROS2Provider(...)
await provider.initialize()
InterfaceFactory.register_provider(provider)
Check availability before explicit protocol selection:
# ✅ Good - Check first
if ProtocolType.ROS2 in InterfaceFactory.get_available_protocols():
    interface = await InterfaceFactory.create_caller(
        "service",
        protocols=[ProtocolType.ROS2]
    )
Use type hints and docstrings:
# ✅ Good - Clear types
async def handle_request(
    request: AddTwoInts.Request,
    response: AddTwoInts.Response
) -> AddTwoInts.Response:
    """Add two integers."""
    response.sum = request.a + request.b
    return response
See Also
Core Communication Types - Interface types and enums
Communication Layer Exceptions - Communication exceptions
Communication Decorators - Protocol-agnostic decorators
ROS2 Transport Module - ROS2 transport provider
Redis Transport Module - Redis transport provider
- class vyra_base.com.core.factory.InterfaceFactory
Bases: object
Factory for creating communication interfaces with protocol fallback.
Features:
- Automatic protocol selection
- Fallback chain (Zenoh → ROS2 → Redis → UDS)
- Protocol availability checking
- Graceful degradation
Example
>>> # Auto-select best available protocol
>>> server = await InterfaceFactory.create_server(
...     "my_service",
...     response_callback=handle_request
... )
>>>
>>> # Explicit protocol with fallback
>>> server = await InterfaceFactory.create_server(
...     "my_service",
...     protocols=[ProtocolType.ROS2, ProtocolType.SHARED_MEMORY],
...     response_callback=handle_request
... )
>>>
>>> # Publisher for pub/sub
>>> publisher = await InterfaceFactory.create_publisher(
...     "temperature",
...     protocols=[ProtocolType.REDIS, ProtocolType.MQTT]
... )
- PUBLISHER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
- SUBSCRIBER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
- SERVER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
- CLIENT_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
- ACTION_SERVER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
- ACTION_CLIENT_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
- static loop_check_pending()
Check pending interfaces and initialize them if their callbacks are now available. Should be called periodically (e.g., in the main loop) to handle late callback registration.
- static register_provider(provider)
Register one or more protocol providers.
- Parameters:
provider (Union[AbstractProtocolProvider, list]) – Single provider instance or list of providers
- Return type:
- static unregister_provider(protocol)
Unregister a protocol provider.
- Parameters:
protocol (ProtocolType) – Protocol type to unregister
- Return type:
- async static create_publisher(name, protocols=None, **kwargs)
Create Publisher with automatic protocol selection.
Only the module's own (default) provider is used.
- Parameters:
name (str) – Topic/channel name
protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)
**kwargs – Additional parameters (message_type, qos, etc.)
- Returns:
Initialized publisher
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_subscriber(name, subscriber_callback, protocols=None, module_id=None, module_name=None, **kwargs)
Create Subscriber with automatic protocol selection.
- Parameters:
name (str) – Topic/channel name
subscriber_callback (Callable) – Async callback for received messages
protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)
module_id (Optional[str]) – Optional target module ID. When provided, the registry looks up (or creates) a provider scoped to that module.
module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).
**kwargs – Additional parameters (message_type, qos, etc.)
- Returns:
Initialized subscriber or None if pending
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_server(name, response_callback, protocols=None, **kwargs)
Create Server with automatic protocol selection.
Only the module's own (default) provider is used. module_id / module_name are determined by the provider that was registered without a module_id.
- Parameters:
- Returns:
Initialized server or None if pending
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_client(name, protocols=None, module_id=None, module_name=None, **kwargs)
Create Client with automatic protocol selection.
- Parameters:
name (str) – Service name
protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)
module_id (Optional[str]) – Optional target module ID. When provided, the registry looks up (or creates) a provider scoped to that module.
module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).
**kwargs – Additional parameters (service_type, qos, etc.)
- Returns:
Initialized client
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_action_server(name, handle_goal_request, handle_cancel_request, execution_callback, protocols=None, **kwargs)
Create Action Server with automatic protocol selection.
Only the module's own (default) provider is used.
- Parameters:
name (str) – Action name
handle_goal_request (Optional[Callable]) – Async callback to accept/reject goals
handle_cancel_request (Optional[Callable]) – Async callback for cancellations
execution_callback (Optional[Callable]) – Async callback for goal execution
protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)
**kwargs – Additional parameters (action_type, qos, etc.)
- Returns:
Initialized action server or None if pending
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_action_client(name, direct_response_callback=None, feedback_callback=None, goal_callback=None, protocols=None, module_id=None, module_name=None, **kwargs)
Create Action Client with automatic protocol selection.
- Parameters:
name (str) – Action name
direct_response_callback (Optional[Callable]) – Async callback for goal acceptance
feedback_callback (Optional[Callable]) – Async callback for feedback
goal_callback (Optional[Callable]) – Async callback for final result
protocols (Optional[List[ProtocolType]]) – Preferred protocols (the default fallback chain is used if not specified)
module_id (Optional[str]) – Optional target module ID. When provided, the registry looks up (or creates) a provider scoped to that module.
module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).
**kwargs – Additional parameters (action_type, qos, etc.)
- Returns:
Initialized action client or None if pending
- Return type:
- Raises:
InterfaceError – If no protocol available
- static set_fallback_chain(interface_type, protocols)
Customize fallback chain for interface type.
- Parameters:
interface_type (str) – "server", "publisher", or "actionServer"
protocols (List[ProtocolType]) – Ordered list of protocols to try
- Return type:
Example
>>> # Prioritize SharedMemory over ROS2
>>> InterfaceFactory.set_fallback_chain(
...     "server",
...     [ProtocolType.SHARED_MEMORY, ProtocolType.ROS2, ProtocolType.UDS]
... )
- async static create_from_blueprint(blueprint, **override_kwargs)
Create interface from a HandlerBlueprint.
This is the primary method for two-phase initialization:
1. Blueprint created during decoration/configuration
2. Interface created when blueprint is bound with callback
- Parameters:
blueprint (HandlerBlueprint) – HandlerBlueprint instance
**override_kwargs – Override blueprint metadata
- Return type:
Union[VyraServer, VyraPublisher, VyraSubscriber, VyraActionServer, None]
- Returns:
Created interface or None if pending (callback not bound yet)
Example
>>> from vyra_base.com.core.blueprints import ServiceBlueprint
>>> blueprint = ServiceBlueprint(name="my_service")
>>> # ... later when callback available ...
>>> blueprint.bind_callback(my_callback)
>>> server = await InterfaceFactory.create_from_blueprint(blueprint)
- async static bind_pending_callback(name, callback=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None)
Bind one or more callbacks to a pending interface and initialize it.
For servers/subscribers, use the callback parameter. For action servers, use the dedicated handle_goal_request, handle_cancel_request and execution_callback parameters. If a parameter is None, the previously stored value is kept.
- Parameters:
name (str) – Interface name (must match pending registration)
callback (Optional[Callable]) – Callback for server (response_callback) or subscriber (subscriber_callback)
handle_goal_request (Optional[Callable]) – Action server goal-accept callback
handle_cancel_request (Optional[Callable]) – Action server cancel callback
execution_callback (Optional[Callable]) – Action server execution callback
- Return type:
Union[VyraServer, VyraSubscriber, VyraActionServer, None]
- Returns:
Initialized interface or None if not found in pending
Example
>>> # Phase 1: Create server without callback
>>> await InterfaceFactory.create_server("my_service", response_callback=None)
>>> # Phase 2: Bind callback later
>>> server = await InterfaceFactory.bind_pending_callback(
...     "my_service", callback=my_callback_function
... )
>>>
>>> # Action server – bind all three callbacks
>>> action_srv = await InterfaceFactory.bind_pending_callback(
...     "my_action",
...     handle_goal_request=on_goal,
...     handle_cancel_request=on_cancel,
...     execution_callback=execute,
... )
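The two-phase flow described above (register without a callback, bind later) can be modeled with a small in-memory registry. This is a toy sketch for building intuition, not the factory's code. `PendingRegistry` and its internals are hypothetical, and only the method names mirror the ones documented here.

```python
import asyncio


class PendingRegistry:
    """Toy model of two-phase initialization (not the library's code)."""

    def __init__(self):
        self._pending = {}  # name -> stored callback (or None)
        self._active = {}   # name -> bound callback

    async def create_server(self, name, response_callback=None):
        if response_callback is None:
            self._pending[name] = None  # phase 1: park until a callback arrives
            return None
        self._active[name] = response_callback
        return name

    async def bind_pending_callback(self, name, callback=None):
        if name not in self._pending:
            return None  # nothing pending under this name
        # A None argument keeps the previously stored value.
        callback = callback if callback is not None else self._pending[name]
        if callback is None:
            return None  # still nothing to bind
        del self._pending[name]
        self._active[name] = callback  # phase 2: interface becomes active
        return name

    def has_pending(self, name):
        return name in self._pending

    def get_pending_count(self):
        return len(self._pending)


async def demo():
    registry = PendingRegistry()
    first = await registry.create_server("my_service")  # phase 1
    pending_before = registry.get_pending_count()
    bound = await registry.bind_pending_callback(
        "my_service", callback=lambda req, res: res
    )  # phase 2
    return first, pending_before, bound, registry.has_pending("my_service")


print(asyncio.run(demo()))
```

The sketch returns the interface name where the real factory would return an interface object.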
- async static process_pending_interfaces()
Process all pending interfaces, attempting to initialize those with callbacks.
This should be called periodically (e.g., from the entity event loop) or after binding callbacks via CallbackRegistry.
- Return type:
- Returns:
Dictionary mapping interface names to initialization success status
Example
>>> # In entity event loop
>>> while running:
...     results = await InterfaceFactory.process_pending_interfaces()
...     await asyncio.sleep(1.0)
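A polling loop like the one in the example is easier to shut down cleanly when it waits on an `asyncio.Event` instead of sleeping unconditionally. The sketch below shows that variant under stated assumptions: `poll_pending` and `fake_process` are hypothetical, and `fake_process` merely stands in for `InterfaceFactory.process_pending_interfaces()`.

```python
import asyncio


async def poll_pending(process, stop, interval=0.01):
    """Call `process()` until `stop` is set; return the number of passes."""
    passes = 0
    while not stop.is_set():
        await process()
        passes += 1
        try:
            # Sleep for `interval`, but wake up early if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    return passes


async def demo():
    stop = asyncio.Event()
    calls = []

    async def fake_process():
        # Hypothetical stand-in for process_pending_interfaces().
        calls.append(len(calls))
        if len(calls) == 3:
            stop.set()  # request shutdown after the third pass
        return {}

    passes = await poll_pending(fake_process, stop)
    return passes, calls


passes, calls = asyncio.run(demo())
print(passes, calls)
```

Setting the event ends the loop without waiting out the remaining interval, which keeps shutdown latency low even with a long polling period.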
- static get_pending_count()
Get count of pending interfaces awaiting callback binding.
- Return type:
- static list_pending()
Get list of pending interface names.
- static has_pending(name)
Check if an interface is pending callback binding.