vyra_base.com package¶
Subpackages¶
- vyra_base.com.feeder package
- Submodules
- vyra_base.com.feeder.interfaces module
- vyra_base.com.feeder.feeder module
- vyra_base.com.feeder.custom_feeder module
- vyra_base.com.feeder.config_resolver module
- vyra_base.com.feeder.registry module
- vyra_base.com.feeder.state_feeder module
- vyra_base.com.feeder.news_feeder module
- vyra_base.com.feeder.error_feeder module
- Module contents
Submodules¶
vyra_base.com.handler module¶
Communication handlers for V.Y.R.A. modules.
Handler hierarchy¶
IFeederHandler (interfaces.py): ABC with dispatch() + get_protocol() + logging.Handler
└── CommunicationHandler (communication.py): concrete base
    ├── ROS2Handler (ros2.py): ROS2 CAL publisher (via InterfaceFactory)
    ├── ZenohHandler (zenoh.py): Zenoh CAL publisher (via InterfaceFactory)
    ├── RedisHandler (redis.py): Redis CAL publisher (via InterfaceFactory)
    ├── UDSHandler (uds.py): UDS CAL publisher (via InterfaceFactory)
    └── DBCommunicationHandler (database.py): async DB persistence
Use HandlerFactory to create handlers
without importing protocol-specific modules directly.
- class vyra_base.com.handler.IFeederHandler(*args, **kwargs)[source]¶
Abstract base class for all VYRA feeder handlers.
Every concrete handler must implement dispatch() (async direct transport) and get_protocol(). The emit() method bridges the Python logging pipeline to dispatch() so that handlers can be attached directly to a Python logging.Logger.
Subclasses must not change the emit → dispatch delegation unless there is a specific reason (e.g. DBCommunicationHandler logs the formatted string rather than the raw message object).
- Variables:
__handlerName__ – Human-readable handler identifier. Set in each subclass.
__doc__ – One-line description of what this handler transports.
activated (bool) – Whether this handler is active. Inactive handlers are skipped during dispatch without raising errors.
- Parameters:
args (Any)
kwargs (Any)
- abstractmethod async dispatch(message)[source]¶
Transport message over the backing protocol.
This is the primary entry-point for the feeder. The feeder calls dispatch directly (bypassing the logging pipeline) when it already has a fully-formed domain object (e.g. StateEntry).
- abstractmethod get_protocol()[source]¶
Return the ProtocolType value this handler uses (e.g. "ros2", "zenoh", …).
Using str as return type keeps the module importable without the full vyra_base.com.core.types dependency tree.
- Return type: str
- get_handler_name()[source]¶
Return the handler’s human-readable name.
Defaults to __handlerName__.
- Return type:
- is_available()[source]¶
Check whether the backing transport is currently reachable.
The default implementation always returns True. Override in transport handlers to probe the actual protocol connection.
- Return type:
- emit(record)[source]¶
Bridge Python logging to dispatch().
Called by the Python logging framework. Schedules dispatch() via asyncio.get_event_loop() so that the synchronous logging call does not block the event loop.
If activated is False the record is silently discarded.
Subclasses that need a different bridging strategy (e.g. formatting the record as a plain string for a database) should override this method directly.
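The emit-to-dispatch bridge described for IFeederHandler can be sketched with the standard library alone. This is a minimal illustration, not the vyra_base implementation: SketchHandler, its sent list and the "sketch" protocol string are hypothetical, and the sketch schedules the coroutine with asyncio.get_running_loop() rather than asyncio.get_event_loop().

```python
import asyncio
import logging


class SketchHandler(logging.Handler):
    """Hypothetical stand-in for a feeder handler (not part of vyra_base)."""

    def __init__(self):
        super().__init__()
        self.activated = True
        self.sent = []  # records what dispatch() would have transported

    async def dispatch(self, message):
        # A real handler would publish over its transport here.
        self.sent.append(message)

    def get_protocol(self):
        return "sketch"

    def emit(self, record):
        # Inactive handlers drop the record without raising.
        if not self.activated:
            return
        # Schedule the coroutine so the synchronous logging call returns at once.
        loop = asyncio.get_running_loop()
        loop.create_task(self.dispatch(record.getMessage()))


async def main():
    handler = SketchHandler()
    logger = logging.getLogger("sketch.feeder")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)

    logger.info("state changed")
    await asyncio.sleep(0)  # yield once so the scheduled dispatch runs

    handler.activated = False
    logger.info("ignored")
    await asyncio.sleep(0)

    logger.removeHandler(handler)
    return handler.sent


sent_messages = asyncio.run(main())
```

Because emit() only schedules work, a record logged outside a running event loop would raise in this sketch; a production handler needs a policy for that case.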
- class vyra_base.com.handler.CommunicationHandler(initiator='', publisher=None, type=None)[source]¶
Bases: IFeederHandler
Abstract base class for all communication handlers.
- Parameters:
initiator (str)
publisher (VyraPublisher)
type (type)
- __init__(initiator='', publisher=None, type=None)[source]¶
Initialise the CommunicationHandler.
- Parameters:
initiator (str) – Optional initiator string (used in log messages).
publisher (VyraPublisher) – Optional pre-created publisher object.
type (type) – Optional message-type information.
- async dispatch(message)[source]¶
Transport message over the backing protocol.
Subclasses must override this method.
- Parameters:
message (Any) – The message to transport.
- Raises:
NotImplementedError – Always — subclasses must implement.
- Return type:
- get_protocol()[source]¶
Return the protocol identifier string.
Subclasses must override this method.
- Raises:
NotImplementedError – Always — subclasses must implement.
- Return type:
- class vyra_base.com.handler.ROS2Handler(initiator, publisher, type)[source]¶
Bases: CommunicationHandler
ROS2 communication handler
- Parameters:
initiator (str)
publisher (VyraPublisher)
type (Any)
- __init__(initiator, publisher, type)[source]¶
Initialize the ROS2Handler.
- Parameters:
initiator (str) – The initiator of the handler.
publisher (VyraPublisher) – The publisher instance to use.
type (Any) – The ROS2 message type.
- class vyra_base.com.handler.ZenohHandler(initiator, publisher, type)[source]¶
Bases: CommunicationHandler
Zenoh transport handler
- Parameters:
initiator (str)
publisher (VyraPublisher)
type (Any)
- __init__(initiator, publisher, type)[source]¶
Initialise the CommunicationHandler.
- Parameters:
initiator (str) – Optional initiator string (used in log messages).
publisher (VyraPublisher) – Optional pre-created publisher object.
type (Any) – Optional message-type information.
- class vyra_base.com.handler.RedisHandler(initiator, publisher, type)[source]¶
Bases: CommunicationHandler
Redis pub/sub transport handler
- Parameters:
initiator (str)
publisher (VyraPublisher)
type (Any)
- __init__(initiator, publisher, type)[source]¶
Initialise the CommunicationHandler.
- Parameters:
initiator (str) – Optional initiator string (used in log messages).
publisher (VyraPublisher) – Optional pre-created publisher object.
type (Any) – Optional message-type information.
- class vyra_base.com.handler.UDSHandler(initiator, publisher, type)[source]¶
Bases: CommunicationHandler
Unix Domain Socket transport handler
- Parameters:
initiator (str)
publisher (VyraPublisher)
type (Any)
- __init__(initiator, publisher, type)[source]¶
Initialise the CommunicationHandler.
- Parameters:
initiator (str) – Optional initiator string (used in log messages).
publisher (VyraPublisher) – Optional pre-created publisher object.
type (Any) – Optional message-type information.
- class vyra_base.com.handler.DBCommunicationHandler(database, source='DBCommunicationHandler')[source]¶
Bases: CommunicationHandler
Database persistence handler
- Parameters:
database (Any)
source (str)
- __init__(database, source='DBCommunicationHandler')[source]¶
Initialise the CommunicationHandler.
- Parameters:
initiator (str) – Optional initiator string (used in log messages).
publisher (VyraPublisher, optional) – Optional pre-created publisher object.
type (type, optional) – Optional message-type information.
database (Any)
source (str)
- class vyra_base.com.handler.DatabaseWriter(*args, **kwargs)[source]¶
Bases: Protocol
Structural protocol for database backends.
Any object that implements async write(record: dict) -> None qualifies; no inheritance required.
- __init__(*args, **kwargs)¶
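Structural typing of this kind can be illustrated with typing.Protocol. The sketch below is an assumption-laden example: WriterLike and InMemoryWriter are hypothetical names that mirror only the async write(record: dict) -> None shape quoted above, not the real DatabaseWriter class.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class WriterLike(Protocol):
    """Hypothetical protocol with the same shape as the documented write()."""

    async def write(self, record: dict) -> None: ...


class InMemoryWriter:
    """Does not inherit from WriterLike; it only provides a matching method."""

    def __init__(self):
        self.rows = []

    async def write(self, record: dict) -> None:
        self.rows.append(dict(record))


writer = InMemoryWriter()
asyncio.run(writer.write({"level": "ERROR", "message": "boom"}))

# runtime_checkable only verifies that a "write" attribute exists,
# not that its signature matches.
is_writer = isinstance(writer, WriterLike)
```

A static type checker performs the stricter signature comparison; the isinstance check is a coarse runtime guard.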
- class vyra_base.com.handler.ErrorLogDatabaseHandler(database, model, field_definitions, *, max_rows=10000, activated=None, source='ErrorLogDatabaseHandler')[source]¶
Bases: CommunicationHandler
Generic database persistence handler for log records
- Parameters:
- __init__(database, model, field_definitions, *, max_rows=10000, activated=None, source='ErrorLogDatabaseHandler')[source]¶
Initialise the CommunicationHandler.
- Parameters:
initiator (str) – Optional initiator string (used in log messages).
publisher (VyraPublisher, optional) – Optional pre-created publisher object.
type (type, optional) – Optional message-type information.
database (DbAccess | None)
model (Type[Base])
max_rows (int)
activated (bool | None)
source (str)
- Return type:
None
- configure(database)[source]¶
Wire a DbAccess and activate.
Meant to be called once by _activate_errorfeed_db_handler() after storage has been initialised.
- Parameters:
database (DbAccess) – Ready DbAccess object.
- Return type:
- async dispatch(message)[source]¶
Validate and persist message to the database.
The message is expected to be a dict (output of _prepare_entry_for_publish()) or an object with a __dict__. The method calls _build_record() to map the raw message to column values, validates them against _field_definitions, then writes with ID-rotation.
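The validate-then-rotate flow can be sketched as follows. Everything here is hypothetical: SketchFieldSpec copies only the three fields visible in the FieldSpec signature, the validation rules are one plausible reading of required and nullable, and next_row_id assumes that ID-rotation means wrapping the row ID back to 1 once max_rows is reached. The real handler may behave differently.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchFieldSpec:
    """Hypothetical mirror of FieldSpec(python_type, required, nullable)."""

    python_type: Any = Any
    required: bool = True
    nullable: bool = False


def validate(record, field_definitions):
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []
    for name, spec in field_definitions.items():
        if name not in record:
            if spec.required:
                errors.append(f"missing required field: {name}")
            continue
        value = record[name]
        if value is None:
            if not spec.nullable:
                errors.append(f"field may not be None: {name}")
            continue
        # typing.Any cannot be used with isinstance, so it is skipped.
        if spec.python_type is not Any and not isinstance(value, spec.python_type):
            errors.append(f"wrong type for field: {name}")
    return errors


def next_row_id(last_id, max_rows):
    """Assumed rotation scheme: IDs run 1..max_rows and then wrap around."""
    return last_id % max_rows + 1


definitions = {
    "message": SketchFieldSpec(str),
    "code": SketchFieldSpec(int, required=False),
    "details": SketchFieldSpec(required=False, nullable=True),
}

ok_errors = validate({"message": "disk full", "code": 5, "details": None}, definitions)
bad_errors = validate({"code": "5"}, definitions)
```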
- class vyra_base.com.handler.FieldSpec(python_type=typing.Any, required=True, nullable=False)[source]¶
Bases: object
Describes a single column in the target log table.
- Parameters:
- class vyra_base.com.handler.HandlerFactory[source]¶
Bases: object
Factory that creates IFeederHandler instances for a given ProtocolType.
All transport handlers use create_publisher() to obtain their VyraPublisher; the factory does not touch provider internals directly.
- async static create(protocol, initiator, feeder_name, message_type, *, node=None, qos_profile=None, database=None, extra_kwargs=None)[source]¶
Create a handler for the given protocol.
Transport handler creation flow:
    HandlerFactory.create(ProtocolType.ZENOH, ...)
    └─► InterfaceFactory.create_publisher(protocols=[ZENOH], ...)
        └─► ZenohProvider.create_publisher(...)   ← t_zenoh/provider.py
            └─► ZenohPublisherImpl(...)           ← t_zenoh/vyra_models/
    └─► ZenohHandler(initiator, publisher, message_type)
- Parameters:
protocol (Any) – Transport protocol to use. One of ProtocolType.
initiator (str) – Name of the feeder owning this handler (used in log messages).
feeder_name (str) – Topic / service name forwarded to InterfaceFactory.create_publisher.
message_type (Any) – Message type for the publisher (e.g. ROS2 msg class, protobuf class, or None for dict-based protocols).
node (Optional[Any]) – ROS2 node (required only for ProtocolType.ROS2).
qos_profile (Optional[Any]) – ROS2 QoS profile (optional, ROS2 only).
database (Optional[Any]) – Database connection/handler (required only for ProtocolType.DATABASE / DB handlers).
extra_kwargs (Optional[dict]) – Extra keyword arguments forwarded to InterfaceFactory.create_publisher.
- Returns:
A fully initialised handler instance ready to be added to a feeder.
- Return type:
- Raises:
ValueError – If an unsupported protocol is given.
RuntimeError – If the publisher cannot be created.
- class vyra_base.com.handler.VyraLogHandler(capacity=1000, max_message_length=10000)[source]¶
Bases: Handler
Lightweight in-memory ring-buffer that captures recent log records.
Each record is stored as a plain dict so it can be JSON-serialised without extra effort. The buffer holds at most capacity entries; older entries are dropped automatically (FIFO).
Usage:
    handler = VyraLogHandler(capacity=1000)
    handler.setLevel(logging.DEBUG)
    logging.getLogger().addHandler(handler)
    # Later, retrieve entries:
    recent = handler.get_recent(limit=100)
- Parameters:
- __init__(capacity=1000, max_message_length=10000)[source]¶
Initializes the instance - basically setting the formatter to None and the filter list to empty.
- get_recent(limit=200, since_ts=None)[source]¶
Return up to limit most-recent log entries, ordered oldest-first.
- Parameters:
limit (int) – Maximum number of entries to return. 0 or a negative value returns the entire buffer (subject to since_ts filter).
since_ts (Optional[float]) – Optional UNIX timestamp in seconds (float). When provided, only entries whose seq value (millisecond epoch) corresponds to a time >= since_ts are returned. Entries from before this point are excluded.
- Returns:
List of dicts with keys level, message, logger_name, timestamp, seq.
- Return type:
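A ring-buffer log handler with this interface can be approximated with collections.deque. The class below is a hypothetical sketch, not VyraLogHandler itself: it reuses the documented parameter and key names, and it assumes that seq is simply the record creation time in milliseconds.

```python
import logging
from collections import deque


class RingBufferHandler(logging.Handler):
    """Hypothetical sketch of an in-memory ring buffer for log records."""

    def __init__(self, capacity=1000, max_message_length=10000):
        super().__init__()
        # deque(maxlen=...) discards the oldest entry once capacity is reached.
        self._buffer = deque(maxlen=capacity)
        self._max_len = max_message_length

    def emit(self, record):
        self._buffer.append({
            "level": record.levelname,
            "message": record.getMessage()[: self._max_len],
            "logger_name": record.name,
            "timestamp": record.created,
            "seq": int(record.created * 1000),  # assumed: millisecond epoch
        })

    def get_recent(self, limit=200, since_ts=None):
        entries = list(self._buffer)  # oldest first
        if since_ts is not None:
            entries = [e for e in entries if e["seq"] >= since_ts * 1000]
        if limit > 0:
            entries = entries[-limit:]
        return entries


handler = RingBufferHandler(capacity=3)
logger = logging.getLogger("sketch.ring")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(handler)
for i in range(5):
    logger.info("m%d", i)
logger.removeHandler(handler)

kept = [e["message"] for e in handler.get_recent(limit=0)]
latest = [e["message"] for e in handler.get_recent(limit=2)]
```

With a capacity of 3 and five records logged, only the last three survive, and limit=2 narrows that to the two newest while keeping oldest-first order.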
Module contents¶
VYRA Base Communication Module (COM)
Multi-protocol communication system with automatic protocol selection and fallback.
Architecture¶
- Transport Layer (in-process / low-latency)
ROS2 · Zenoh · Redis · UDS
- External Layer (out-of-process / cross-service)
gRPC · MQTT · REST · WebSocket · Shared Memory
- Industrial Layer
Modbus · OPC UA
Public API¶
- Decorators (use on component methods):
@remote_service: expose a method as a callable service (request/response)
@remote_publisher: expose a method as a publisher (fire-and-forget)
@remote_subscriber: register a method as a message subscriber
@remote_actionServer: expose a method group as an action server (goal/feedback/result)
- Factory:
InterfaceFactory — create server/client/publisher/subscriber/action objects
Example:
    from vyra_base.com import remote_service, remote_publisher, InterfaceFactory, ProtocolType

    class MyComponent:
        @remote_service(name="ping", protocols=[ProtocolType.ZENOH])
        async def ping(self, request, response=None):
            return {"pong": True}
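How a decorator of this kind can record metadata at definition time and bind the method later is sketched below. This is not the vyra_base implementation: sketch_remote_service, REGISTRY, bind_component and SketchProtocol are hypothetical stand-ins that only imitate the register-then-bind pattern.

```python
import asyncio
from enum import Enum


class SketchProtocol(str, Enum):
    """Hypothetical stand-in for a protocol enum."""

    ZENOH = "zenoh"
    ROS2 = "ros2"


REGISTRY = {}  # service name -> {"protocols": [...], "callback": bound method or None}


def sketch_remote_service(name=None, protocols=None):
    """Record service metadata at decoration time; the callback is bound later."""

    def decorator(func):
        service_name = name or func.__name__
        REGISTRY[service_name] = {"protocols": list(protocols or []), "callback": None}
        func._sketch_service_name = service_name  # marker used when binding
        return func

    return decorator


class MyComponent:
    @sketch_remote_service(name="ping", protocols=[SketchProtocol.ZENOH])
    async def ping(self, request, response=None):
        return {"pong": True}


def bind_component(component):
    """Attach bound methods of an instance to their registered entries."""
    for attr in dir(component):
        method = getattr(component, attr)
        service_name = getattr(method, "_sketch_service_name", None)
        if service_name is not None:
            REGISTRY[service_name]["callback"] = method


component = MyComponent()
bind_component(component)
reply = asyncio.run(REGISTRY["ping"]["callback"]({"id": 1}))
```

Splitting registration from binding lets the decorator run at class-definition time, before any instance (and therefore any bound method) exists.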
- exception vyra_base.com.CommunicationError(message, details=None, original_exception=None)[source]¶
Bases: Exception
Base exception for all communication errors.
- message¶
Error description
- details¶
Optional additional error context
- original_exception¶
Original exception if wrapped
Bases: ProtocolError
Raised when a requested protocol is not available or not installed.
- exception vyra_base.com.ProtocolNotInitializedError(protocol, component=None)[source]¶
Bases: ProtocolError
Raised when trying to use a protocol that hasn't been initialized.
- exception vyra_base.com.TransportError(message, details=None, original_exception=None)[source]¶
Bases: CommunicationError
Base exception for transport layer errors.
- exception vyra_base.com.ProviderError(message, details=None, original_exception=None)[source]¶
Bases: CommunicationError
Base exception for provider-related errors.
- exception vyra_base.com.ProviderNotFoundError(provider_name, protocol=None)[source]¶
Bases: ProviderError
Raised when requested provider is not registered.
- exception vyra_base.com.ProviderRegistrationError(provider_name, reason)[source]¶
Bases: ProviderError
Raised when provider registration fails.
- exception vyra_base.com.InterfaceError(message, details=None, original_exception=None)[source]¶
Bases: CommunicationError
Base exception for interface creation/management errors.
- exception vyra_base.com.TServerError(message, details=None, original_exception=None)[source]¶
Bases: InterfaceError
Raised when transport server creation or invocation fails.
- exception vyra_base.com.TSubscriberError(message, details=None, original_exception=None)[source]¶
Bases: InterfaceError
Raised when transport subscriber creation or subscription fails.
- exception vyra_base.com.ActionServerError(message, details=None, original_exception=None)[source]¶
Bases: InterfaceError
Raised when actionServer creation or execution fails.
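The value of a shared base exception with message, details and original_exception is that callers can catch the whole family in one place and still inspect the cause. The sketch below uses local stand-in classes (SketchCommunicationError, SketchInterfaceError) and a hypothetical open_socket helper; it does not import or reproduce the vyra_base classes.

```python
class SketchCommunicationError(Exception):
    """Local stand-in with the documented (message, details, original_exception) shape."""

    def __init__(self, message, details=None, original_exception=None):
        super().__init__(message)
        self.message = message
        self.details = details
        self.original_exception = original_exception


class SketchInterfaceError(SketchCommunicationError):
    """Local stand-in for an interface-level error."""


def open_socket(path):
    """Hypothetical helper that wraps a low-level failure in a domain error."""
    try:
        raise FileNotFoundError(path)
    except OSError as exc:
        raise SketchInterfaceError(
            "cannot create server",
            details={"path": path},
            original_exception=exc,
        ) from exc


try:
    open_socket("/tmp/missing.sock")
except SketchCommunicationError as err:  # the base class catches every subclass
    caught = err
```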
- class vyra_base.com.ProtocolType(*values)[source]¶
-
Supported communication protocols.
- ROS2 = 'ros2'¶
- ZENOH = 'zenoh'¶
- REDIS = 'redis'¶
- UDS = 'uds'¶
- SHARED_MEMORY = 'sharedmemory'¶
- MQTT = 'mqtt'¶
- GRPC = 'grpc'¶
- REST = 'rest'¶
- WEBSOCKET = 'websocket'¶
- MODBUS = 'modbus'¶
- OPCUA = 'opcua'¶
- class vyra_base.com.InterfaceType(*values)[source]¶
-
VYRA communication interface types.
- PUBLISHER = 'publisher'¶
- SUBSCRIBER = 'subscriber'¶
- SERVER = 'server'¶
- CLIENT = 'client'¶
- ACTION_SERVER = 'actionServer'¶
- ACTION_CLIENT = 'actionClient'¶
- class vyra_base.com.AccessLevel(*values)[source]¶
-
Access control levels for interfaces.
- PUBLIC = 'public'¶
- PROTECTED = 'protected'¶
- PRIVATE = 'private'¶
- INTERNAL = 'internal'¶
- class vyra_base.com.ActionStatus(*values)[source]¶
Bases: Enum
Status values for action goals (ROS2 Action compatible).
Used to track the lifecycle state of action server goals.
- UNKNOWN = 0¶
- ACCEPTED = 1¶
- EXECUTING = 2¶
- CANCELING = 3¶
- SUCCEEDED = 4¶
- CANCELED = 5¶
- ABORTED = 6¶
- class vyra_base.com.VyraPublisher(name, topic_builder, protocol=ProtocolType.ROS2, **kwargs)[source]¶
Bases: VyraTransport
Publisher interface for one-way message publishing.
Represents publish-only communication (no callbacks):
- ROS2: Topic Publisher
- Zenoh: Publisher
- Redis: Pub/Sub Publisher
- UDS: Datagram Socket Sender
- Parameters:
name (str)
topic_builder (TopicBuilder)
protocol (ProtocolType)
- __init__(name, topic_builder, protocol=ProtocolType.ROS2, **kwargs)[source]¶
- Parameters:
name (str)
topic_builder (TopicBuilder)
protocol (ProtocolType)
- topic_builder: TopicBuilder¶
- class vyra_base.com.VyraSubscriber(name, topic_builder, subscriber_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
Bases: VyraTransport
Subscriber interface for receiving messages with callback.
Represents subscribe-only communication:
- ROS2: Topic Subscriber
- Zenoh: Subscriber
- Redis: Pub/Sub Subscriber
- UDS: Datagram Socket Receiver
- Parameters:
name (str)
topic_builder (TopicBuilder)
subscriber_callback (Callable | None)
protocol (ProtocolType)
- __init__(name, topic_builder, subscriber_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
- Parameters:
name (str)
topic_builder (TopicBuilder)
subscriber_callback (Callable | None)
protocol (ProtocolType)
- topic_builder: TopicBuilder¶
- class vyra_base.com.VyraServer(name, topic_builder, response_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
Bases: VyraTransport
Server interface for request-response communication.
Represents service server:
- ROS2: Service Server
- Zenoh: Queryable
- Redis: Request-Response Pattern
- UDS: Stream Socket RPC Server
- Parameters:
name (str)
topic_builder (TopicBuilder)
protocol (ProtocolType)
- __init__(name, topic_builder, response_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
- Parameters:
name (str)
topic_builder (TopicBuilder)
protocol (ProtocolType)
- topic_builder: TopicBuilder¶
- class vyra_base.com.VyraClient(name, topic_builder, request_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
Bases: VyraTransport
Client interface for request-response communication.
Represents service client:
- ROS2: Service Client
- Zenoh: Query Client
- Redis: Request-Response Pattern
- UDS: Stream Socket RPC Client
- Parameters:
name (str)
topic_builder (TopicBuilder)
protocol (ProtocolType)
- __init__(name, topic_builder, request_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
- Parameters:
name (str)
topic_builder (TopicBuilder)
protocol (ProtocolType)
- topic_builder: TopicBuilder¶
- class vyra_base.com.VyraActionServer(name, topic_builder, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
Bases: VyraTransport
Action Server interface for long-running tasks.
Represents async task server with progress feedback:
- ROS2: Action Server
- Zenoh: Callable + Publishers (control/feedback/result)
- Redis: State tracking with Pub/Sub
- UDS: Stream-based state messages
- Parameters:
- __init__(name, topic_builder, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
- topic_builder: TopicBuilder¶
- class vyra_base.com.VyraActionClient(name, topic_builder, direct_response_callback=None, feedback_callback=None, goal_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
Bases: VyraTransport
Action Client interface for long-running tasks.
Represents async task client:
- ROS2: Action Client
- Zenoh: Query + Subscribers (control/feedback/result)
- Redis: State tracking with Pub/Sub
- UDS: Stream-based state messages
- Parameters:
- __init__(name, topic_builder, direct_response_callback=None, feedback_callback=None, goal_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]¶
- topic_builder: TopicBuilder¶
- class vyra_base.com.IServiceHandler[source]¶
Bases: ABC
Abstract interface for service (request-response) handlers.
Implement this interface for services that handle single request-response interactions like ROS2 Services, gRPC unary calls, REST endpoints.
Example
    class CalculateHandler(IServiceHandler):
        async def handle_request(self, request):
            result = request.x + request.y
            return {"result": result}
- abstractmethod async handle_request(request)[source]¶
Handle incoming service request.
- Parameters:
request (Any) – Request data/object
- Return type:
- Returns:
Response data (dict or response object)
Example
    async def handle_request(self, request):
        # Process request
        result = process(request.data)
        # Return response
        return {"result": result, "status": "success"}
- class vyra_base.com.IActionHandler[source]¶
Bases: ABC
Abstract interface for action (long-running task) handlers.
REQUIRED for all ActionServer implementations. Defines the three-phase lifecycle of action goals:
- on_goal: Accept or reject incoming goals
- execute: Main execution logic with feedback
- on_cancel: Handle cancellation requests
Example
    class BatchProcessorHandler(IActionHandler):
        async def on_goal(self, goal):
            return goal.count <= 100

        async def execute(self, handle):
            for i in range(handle.goal.count):
                if handle.is_cancel_requested():
                    handle.canceled()
                    return {"processed": i}
                process_item(i)
                # publish_feedback() is a coroutine and must be awaited
                await handle.publish_feedback({"progress": i + 1})
            handle.succeed()
            return {"processed": handle.goal.count}

        async def on_cancel(self):
            return True
- abstractmethod async on_goal(goal)[source]¶
Called when a new goal is received.
Decide whether to accept or reject the goal based on its parameters, current system state, resource availability, etc.
- Parameters:
goal (Any) – Goal request data
- Returns:
True to accept, False to reject
- Return type:
Example
    async def on_goal(self, goal):
        # Check if goal is valid
        if goal.count <= 0 or goal.count > 1000:
            logger.warning(f"Rejecting invalid goal count: {goal.count}")
            return False
        # Check system resources
        if self.is_busy():
            logger.info("Rejecting goal: system busy")
            return False
        return True
- abstractmethod async execute(handle)[source]¶
Execute the goal and return result.
This is the main execution logic. Use the goal_handle to:
- Access goal parameters via handle.goal
- Publish feedback via handle.publish_feedback()
- Check cancellation via handle.is_cancel_requested()
- Set final status via handle.succeed()/abort()/canceled()
- Parameters:
handle (IGoalHandle) – Goal handle for accessing goal data and publishing feedback
- Returns:
Result data to return to client
- Return type:
Example
    async def execute(self, handle):
        total = handle.goal.count
        for i in range(total):
            # Check for cancellation
            if handle.is_cancel_requested():
                handle.canceled()
                return {
                    "processed": i,
                    "status": "CANCELED"  # ActionStatus.CANCELED from vyra_base.com.core.types
                }
            # Do work
            await self.process_item(i)
            # Publish feedback
            await handle.publish_feedback({
                "progress": i + 1,
                "total": total,
                "percent": (i + 1) / total * 100
            })
        # Success
        handle.succeed()
        return {
            "processed": total,
            "status": "SUCCEEDED"  # ActionStatus.SUCCEEDED from vyra_base.com.core.types
        }
- abstractmethod async on_cancel()[source]¶
Called when cancellation is requested.
Decide whether to accept the cancellation request. Some goals may not be cancellable (e.g., already near completion, critical operations).
- Returns:
True to accept cancellation, False to reject
- Return type:
Example
    async def on_cancel(self):
        # Check if safe to cancel
        if self.in_critical_section():
            logger.warning("Cannot cancel: in critical section")
            return False
        logger.info("Accepting cancellation request")
        return True
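The three-phase lifecycle (on_goal, execute, on_cancel) can be exercised end to end without any transport. The sketch below is self-contained and hypothetical: SketchGoalHandle, BatchHandler and run_goal are illustration names, the handle keeps its status as a plain string, and cancellation is simulated by a cancel_after counter instead of a real client request.

```python
import asyncio
from types import SimpleNamespace


class SketchGoalHandle:
    """Hypothetical in-memory goal handle used only for this illustration."""

    def __init__(self, goal, cancel_after=None):
        self.goal = goal
        self.status = "EXECUTING"
        self.feedback = []
        self._cancel_after = cancel_after

    def is_cancel_requested(self):
        # Simulate a cancel request arriving after N feedback messages.
        return self._cancel_after is not None and len(self.feedback) >= self._cancel_after

    async def publish_feedback(self, feedback):
        self.feedback.append(feedback)

    def succeed(self):
        self.status = "SUCCEEDED"

    def canceled(self):
        self.status = "CANCELED"


class BatchHandler:
    async def on_goal(self, goal):
        return 0 < goal.count <= 100

    async def execute(self, handle):
        for i in range(handle.goal.count):
            if handle.is_cancel_requested():
                handle.canceled()
                return {"processed": i}
            await handle.publish_feedback({"progress": i + 1})
        handle.succeed()
        return {"processed": handle.goal.count}

    async def on_cancel(self):
        return True


async def run_goal(handler, count, cancel_after=None):
    """Drive one goal through accept/reject and execution."""
    goal = SimpleNamespace(count=count)
    if not await handler.on_goal(goal):
        return "REJECTED", None
    handle = SketchGoalHandle(goal, cancel_after)
    result = await handler.execute(handle)
    return handle.status, result


completed = asyncio.run(run_goal(BatchHandler(), 3))
cancelled = asyncio.run(run_goal(BatchHandler(), 5, cancel_after=2))
rejected = asyncio.run(run_goal(BatchHandler(), 500))
```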
- class vyra_base.com.IGoalHandle[source]¶
Bases: ABC
Abstract interface for action goal handles.
Provides methods for:
- Publishing feedback during execution
- Checking cancellation requests
- Setting goal status (succeed/abort/cancel)
- abstractmethod async publish_feedback(feedback)[source]¶
Publish feedback about goal progress.
Example:
    await goal_handle.publish_feedback({
        "progress": 50,
        "message": "Processing..."
    })
- class vyra_base.com.GoalHandle(goal_id, goal, feedback_fn)[source]¶
Bases: IGoalHandle
Transport-agnostic goal handle passed to execution callbacks.
This is the main communication object between the transport layer and the application. All action servers create a GoalHandle for each accepted goal and pass it to the user's execution_callback.
Implements the IGoalHandle interface so user code typed against the abstract interface works seamlessly with the concrete implementation.
Example usage inside an execution_callback:
    async def my_execute(goal_handle: GoalHandle):
        for i in range(10):
            await goal_handle.publish_feedback({'progress': i * 10})
            if goal_handle.is_cancel_requested():
                goal_handle.canceled()
                return {'status': 'canceled'}
        goal_handle.succeed()
        return {'done': True}
- set_canceled()[source]¶
Alias for canceled().
- Return type:
- class vyra_base.com.InterfaceFactory[source]¶
Bases: object
Factory for creating communication interfaces with protocol fallback.
Features:
- Automatic protocol selection
- Fallback chain (the defaults per interface type are the *_FALLBACK attributes: Zenoh → ROS2 → Redis → UDS)
- Protocol availability checking
- Graceful degradation
Example
    >>> # Auto-select best available protocol
    >>> server = await InterfaceFactory.create_server(
    ...     "my_service",
    ...     response_callback=handle_request
    ... )
    >>>
    >>> # Explicit protocol with fallback
    >>> server = await InterfaceFactory.create_server(
    ...     "my_service",
    ...     protocols=[ProtocolType.ROS2, ProtocolType.SHARED_MEMORY],
    ...     response_callback=handle_request
    ... )
    >>>
    >>> # Publisher for pub/sub
    >>> publisher = await InterfaceFactory.create_publisher(
    ...     "temperature",
    ...     protocols=[ProtocolType.REDIS, ProtocolType.MQTT]
    ... )
- PUBLISHER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]¶
- SUBSCRIBER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]¶
- SERVER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]¶
- CLIENT_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]¶
- ACTION_SERVER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]¶
- ACTION_CLIENT_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]¶
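Ordered fallback over lists like these amounts to picking the first entry that is currently available. The sketch below is a simplified assumption of that selection step: select_protocol, DEFAULT_FALLBACK and SketchInterfaceError are hypothetical, protocols are plain strings, and availability is passed in as a set instead of being probed from registered providers.

```python
class SketchInterfaceError(Exception):
    """Local stand-in raised when no protocol in the chain is available."""


# Same order as the documented *_FALLBACK attributes, as plain strings.
DEFAULT_FALLBACK = ["zenoh", "ros2", "redis", "uds"]


def select_protocol(available, protocols=None):
    """Return the first protocol of the chain that is in `available`."""
    for protocol in protocols or DEFAULT_FALLBACK:
        if protocol in available:
            return protocol
    raise SketchInterfaceError("no protocol available")


chosen_default = select_protocol({"redis", "uds"})
chosen_explicit = select_protocol({"redis", "uds"}, ["uds", "redis"])

try:
    select_protocol({"mqtt"})
    failed = False
except SketchInterfaceError:
    failed = True
```

An explicit protocols list replaces the default chain entirely in this sketch, so a caller who passes one gets no implicit fallback beyond the listed entries.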
- static loop_check_pending()[source]¶
Check pending interfaces and initialize if callbacks are now available. Should be called periodically (e.g. in main loop) to handle late callback registration.
- static register_provider(provider)[source]¶
Register one or more protocol providers.
- Parameters:
provider (Union[AbstractProtocolProvider, list]) – Single provider instance or list of providers
- Return type:
- static unregister_provider(protocol)[source]¶
Unregister a protocol provider.
- Parameters:
protocol (ProtocolType) – Protocol type to unregister
- Return type:
- async static create_publisher(name, protocols=None, **kwargs)[source]¶
Create Publisher with automatic protocol selection.
Only the own (default) provider is used.
- Parameters:
name (str) – Topic/channel name
protocols (Optional[List[ProtocolType]]) – Preferred protocols (fallback if not specified)
**kwargs – Additional parameters (message_type, qos, etc.)
- Returns:
Initialized publisher
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_subscriber(name, subscriber_callback, protocols=None, module_id=None, module_name=None, **kwargs)[source]¶
Create Subscriber with automatic protocol selection.
- Parameters:
name (str) – Topic/channel name
subscriber_callback (Callable) – Async callback for received messages
protocols (Optional[List[ProtocolType]]) – Preferred protocols (fallback if not specified)
module_id (Optional[str]) – Optional target module ID. When provided the registry looks up (or creates) a provider scoped to that module.
module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).
**kwargs – Additional parameters (message_type, qos, etc.)
- Returns:
Initialized subscriber or None if pending
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_server(name, response_callback, protocols=None, **kwargs)[source]¶
Create Server with automatic protocol selection.
Only the own (default) provider is used. module_id / module_name are determined by the provider that was registered without a module_id.
- Parameters:
- Returns:
Initialized server or None if pending
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_client(name, protocols=None, module_id=None, module_name=None, **kwargs)[source]¶
Create Client with automatic protocol selection.
- Parameters:
name (str) – Service name
protocols (Optional[List[ProtocolType]]) – Preferred protocols (fallback if not specified)
module_id (Optional[str]) – Optional target module ID. When provided the registry looks up (or creates) a provider scoped to that module.
module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).
**kwargs – Additional parameters (service_type, qos, etc.)
- Returns:
Initialized client
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_action_server(name, handle_goal_request, handle_cancel_request, execution_callback, protocols=None, **kwargs)[source]¶
Create Action Server with automatic protocol selection.
Only the own (default) provider is used.
- Parameters:
name (str) – Action name
handle_goal_request (Optional[Callable]) – Async callback to accept/reject goals
handle_cancel_request (Optional[Callable]) – Async callback for cancellations
execution_callback (Optional[Callable]) – Async callback for goal execution
protocols (Optional[List[ProtocolType]]) – Preferred protocols (fallback if not specified)
**kwargs – Additional parameters (action_type, qos, etc.)
- Returns:
Initialized action server or None if pending
- Return type:
- Raises:
InterfaceError – If no protocol available
- async static create_action_client(name, direct_response_callback=None, feedback_callback=None, goal_callback=None, protocols=None, module_id=None, module_name=None, **kwargs)[source]¶
Create Action Client with automatic protocol selection.
- Parameters:
name (str) – Action name
direct_response_callback (Optional[Callable]) – Async callback for goal acceptance
feedback_callback (Optional[Callable]) – Async callback for feedback
goal_callback (Optional[Callable]) – Async callback for final result
protocols (Optional[List[ProtocolType]]) – Preferred protocols (fallback if not specified)
module_id (Optional[str]) – Optional target module ID. When provided the registry looks up (or creates) a provider scoped to that module.
module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).
**kwargs – Additional parameters (action_type, qos, etc.)
- Returns:
Initialized action client or None if pending
- Return type:
- Raises:
InterfaceError – If no protocol available
- static get_available_protocols(interface_type, protocols)[source]¶
Customize fallback chain for interface type.
- Parameters:
interface_type (str) – "server", "publisher", or "actionServer"
protocols (List[ProtocolType]) – Ordered list of protocols to try
- Return type:
Example
    >>> # Prioritize SharedMemory over ROS2
    >>> InterfaceFactory.set_fallback_chain(
    ...     "server",
    ...     [ProtocolType.SHARED_MEMORY, ProtocolType.ROS2, ProtocolType.UDS]
    ... )
- async static create_from_blueprint(blueprint, **override_kwargs)[source]¶
Create interface from a HandlerBlueprint.
This is the primary method for two-phase initialization:
1. Blueprint created during decoration/configuration
2. Interface created when blueprint is bound with callback
- Parameters:
blueprint (HandlerBlueprint) – HandlerBlueprint instance
**override_kwargs – Override blueprint metadata
- Return type:
Union[VyraServer, VyraPublisher, VyraSubscriber, VyraActionServer, None]
- Returns:
Created interface or None if pending (callback not bound yet)
Example
    >>> from vyra_base.com.core.blueprints import ServiceBlueprint
    >>> blueprint = ServiceBlueprint(name="my_service")
    >>> # ... later when callback available ...
    >>> blueprint.bind_callback(my_callback)
    >>> server = await InterfaceFactory.create_from_blueprint(blueprint)
- async static bind_pending_callback(name, callback=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None)[source]¶
Bind one or more callbacks to a pending interface and initialize it.
For servers/subscribers, use the callback parameter. For action servers, use the dedicated handle_goal_request, handle_cancel_request and execution_callback parameters. If a parameter is None, the previously stored value is kept.
- Parameters:
name (str) – Interface name (must match pending registration)
callback (Optional[Callable]) – Callback for server (response_callback) or subscriber (subscriber_callback)
handle_goal_request (Optional[Callable]) – Action server goal-accept callback
handle_cancel_request (Optional[Callable]) – Action server cancel callback
execution_callback (Optional[Callable]) – Action server execution callback
- Return type:
Union[VyraServer, VyraSubscriber, VyraActionServer, None]
- Returns:
Initialized interface or None if not found in pending
Example
    >>> # Phase 1: Create server without callback
    >>> await InterfaceFactory.create_server("my_service", response_callback=None)
    >>> # Phase 2: Bind callback later
    >>> server = await InterfaceFactory.bind_pending_callback(
    ...     "my_service", callback=my_callback_function
    ... )
    >>>
    >>> # Action server: bind all three callbacks
    >>> action_srv = await InterfaceFactory.bind_pending_callback(
    ...     "my_action",
    ...     handle_goal_request=on_goal,
    ...     handle_cancel_request=on_cancel,
    ...     execution_callback=execute,
    ... )
- async static process_pending_interfaces()[source]¶
Process all pending interfaces, attempting to initialize those with callbacks.
This should be called periodically (e.g., from entity event loop) or after binding callbacks via CallbackRegistry.
- Return type:
- Returns:
Dictionary mapping interface names to initialization success status
Example
>>> # In entity event loop
>>> while running:
...     results = await InterfaceFactory.process_pending_interfaces()
...     await asyncio.sleep(1.0)
- vyra_base.com.remote_service(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]¶
Decorator for request/response communication (ROS2 Service, gRPC, etc.)
Creates a ServiceBlueprint and registers it in CallbackRegistry. The decorated method will be bound to the blueprint during component initialization.
- Parameters:
name (Optional[str]) – Service name (defaults to function name)
protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)
auto_register (bool) – Whether to auto-register blueprint (default True)
namespace (Optional[str]) – Optional module namespace for blueprint registration
**kwargs – Additional metadata (service_type, qos, etc.)
Example
>>> class MyComponent(OperationalStateMachine):
...     @remote_service()
...     async def calculate(self, request, response=None):
...         result = request["x"] + request["y"]
...         return {"result": result}
...
...     @remote_service(protocols=[ProtocolType.ROS2, ProtocolType.ZENOH])
...     async def process_data(self, request, response=None):
...         # This method prioritizes ROS2, falls back to Zenoh
...         return {"status": "processed"}
Note
Methods must accept (request, response=None) signature
Response parameter maintained for ROS2 compatibility
Async methods recommended (sync methods wrapped automatically)
Blueprint created during decoration, callback bound during initialization
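The split between decoration time and initialization time can be sketched in isolation. This is an illustrative toy under stated assumptions, not the vyra_base implementation: `Blueprint`, `REGISTRY`, `remote_service_sketch` and `bind_all` are hypothetical names that only mimic the documented behaviour (blueprint registered when the method is decorated, callback bound later from a component instance).

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Blueprint:
    """Hypothetical, simplified blueprint: a name plus an optional callback."""
    name: str
    callback: Optional[Callable] = None

    def is_bound(self) -> bool:
        return self.callback is not None


# Hypothetical stand-in for the global callback registry.
REGISTRY: Dict[str, Blueprint] = {}


def remote_service_sketch(name: Optional[str] = None):
    """Phase 1: register an unbound blueprint when the method is decorated."""
    def decorator(func):
        blueprint_name = name or func.__name__
        REGISTRY[blueprint_name] = Blueprint(blueprint_name)
        func._blueprint_name = blueprint_name  # marker looked up in phase 2
        return func
    return decorator


def bind_all(component) -> Dict[str, bool]:
    """Phase 2: bind the instance's decorated methods to their blueprints."""
    results: Dict[str, bool] = {}
    for attr in dir(component):
        method = getattr(component, attr)
        blueprint_name = getattr(method, "_blueprint_name", None)
        if blueprint_name is not None:
            REGISTRY[blueprint_name].callback = method
            results[blueprint_name] = True
    return results


class Calculator:
    @remote_service_sketch()
    async def calculate(self, request, response=None):
        return {"result": request["x"] + request["y"]}
```

Defining `Calculator` already registers the `calculate` blueprint; it stays unbound until `bind_all(Calculator())` runs.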
- vyra_base.com.remote_publisher(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]¶
Decorator for pub/sub communication (publisher side).
Creates a PublisherBlueprint and registers it in CallbackRegistry. The decorated method becomes a publisher interface.
- Parameters:
Example
>>> class Component:
...     @remote_publisher(protocols=[ProtocolType.REDIS])
...     async def publish_status(self, message):
...         '''This method will become a publisher'''
...         pass
...
...     async def some_task(self):
...         # Publish status update
...         await self.publish_status({"state": "running"})
Note
Publisher creation happens via InterfaceFactory during binding
The decorated method is replaced with actual publishing logic
- vyra_base.com.remote_subscriber(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]¶
Decorator for subscription callbacks (subscriber side of pub/sub).
Creates a SubscriberBlueprint and registers it in CallbackRegistry. The decorated method will be called when messages are received on the topic.
- Parameters:
Example
>>> class Component:
...     @remote_subscriber(topic="/status")
...     async def on_status_update(self, message):
...         logger.info(f"Received status: {message}")
Note
Subscriber is automatically created and bound during registration
The decorated method is called for each received message
- vyra_base.com.get_decorated_methods(obj)[source]¶
Get all methods decorated with communication decorators.
Scans an object for decorated methods and returns them organized by type. Also retrieves associated blueprints from CallbackRegistry.
- Parameters:
obj (Any) – Object to inspect (typically a component instance)
- Returns:
{
    "servers": [...],      # @remote_service methods
    "publishers": [...],   # @remote_publisher methods
    "subscribers": [...],  # @remote_subscriber methods
    "actions": [...]       # @remote_actionServer methods
}
Each entry contains:
{
    "name": str,            # Interface name
    "method": Callable,     # Bound method
    "protocols": List,      # Protocol preferences
    "kwargs": dict,         # Additional metadata
    "blueprint": Blueprint  # Associated blueprint (if registered)
}
- Return type:
Example
>>> component = MyComponent()
>>> methods = get_decorated_methods(component)
>>> print(f"Found {len(methods['servers'])} services")
>>> for svc in methods['servers']:
...     if svc['blueprint'] and not svc['blueprint'].is_bound():
...         svc['blueprint'].bind_callback(svc['method'])
- vyra_base.com.bind_decorated_callbacks(component, namespace=None, force_rebind=False)[source]¶
Bind all decorated methods from a component to their registered blueprints.
This is Phase 2 of the two-phase initialization, typically called during component initialization (e.g., in Component.__init__ or set_interfaces()).
- Parameters:
- Return type:
- Returns:
Dictionary mapping interface names to binding success status
Example
>>> component = MyComponent()
>>> results = bind_decorated_callbacks(component, namespace="v2_modulemanager")
>>> print(f"Successfully bound {sum(results.values())} interfaces")
>>> failed = [name for name, success in results.items() if not success]
>>> if failed:
...     logger.warning(f"Failed to bind: {failed}")
- class vyra_base.com.HandlerBlueprint(name, interface_type, protocols=<factory>, metadata=<factory>, _callback=None)[source]¶
Bases: ABC
Base blueprint for all communication interfaces.
A blueprint represents the definition of an interface (what it should be), separate from its implementation (the callback that handles requests).
- Parameters:
- name¶
Unique identifier for this interface
- interface_type¶
Type of communication pattern
- protocols¶
Preferred transport protocols (with fallback)
- metadata¶
Additional configuration (from JSON or decorator)
- _callback¶
Implementation function (bound during phase 2)
Example
>>> blueprint = ServiceBlueprint(
...     name="calculate",
...     protocols=[ProtocolType.ROS2],
...     metadata={"qos": 10}
... )
>>> blueprint.is_bound()
False
>>> blueprint.bind_callback(my_calculate_function)
>>> blueprint.is_bound()
True
- interface_type: InterfaceType¶
- protocols: List[ProtocolType]¶
- bind_callback(callback)[source]¶
Bind a callback implementation to this blueprint.
- Parameters:
callback (Callable) – Function to handle requests/events
- Raises:
ValueError – If callback signature is invalid
RuntimeError – If already bound
- Return type:
- class vyra_base.com.ServiceBlueprint(name, protocols=None, metadata=None, service_type=None)[source]¶
Bases: HandlerBlueprint
Blueprint for request/response communication (ROS2 Service, gRPC, etc.)
- Expected callback signature:
async def handler(request, response=None) -> Union[dict, response_type]
or
def handler(request, response=None) -> Union[dict, response_type]
Example
>>> @remote_service()
... async def calculate(self, request, response=None):
...     result = request["x"] + request["y"]
...     return {"result": result}
- Parameters:
- class vyra_base.com.PublisherBlueprint(name, protocols=None, metadata=None, message_type=None)[source]¶
Bases: HandlerBlueprint
Blueprint for pub/sub communication (ROS2 Topic, Redis, MQTT, etc.)
Note: Publishers don’t typically have callbacks - they’re used to send data. The callback here is for the publish() method wrapper.
- Expected callback signature:
async def publisher(self, message: dict) -> None
or
def publisher(self, message: dict) -> None
Example
>>> @remote_publisher
... async def publish_status(self, message):
...     # Actual publishing handled by transport
...     pass
- Parameters:
- class vyra_base.com.SubscriberBlueprint(name, protocols=None, metadata=None, message_type=None)[source]¶
Bases: HandlerBlueprint
Blueprint for subscription callbacks (receives published data)
- Expected callback signature:
async def handler(self, message) -> None
or
def handler(self, message) -> None
Example
>>> @remote_subscriber
... async def on_status_update(self, message):
...     logger.info(f"Received: {message}")
- Parameters:
- class vyra_base.com.ActionBlueprint(name, protocols=None, metadata=None, action_type=None)[source]¶
Bases: HandlerBlueprint
Blueprint for ActionServer with multiple lifecycle callbacks.
Callbacks:
- execute: Main execution (required)
- on_goal: Accept/reject goal (optional, default: accept)
- on_cancel: Accept/reject cancel (optional, default: accept)
- The goal_handle provides:
goal_handle.goal: The goal request data
goal_handle.publish_feedback(feedback): Send progress updates
goal_handle.is_cancel_requested(): Check for cancellation
goal_handle.succeed()/abort()/canceled(): Set final status
Example
>>> @remote_actionServer.on_goal(name="process_batch")
... async def accept_goal(self, goal_request):
...     return goal_request.count <= 100
...
>>> @remote_actionServer.on_cancel(name="process_batch")
... async def cancel_batch(self, goal_handle):
...     return True
...
>>> @remote_actionServer.execute(name="process_batch")
... async def execute_batch(self, goal_handle):
...     for i in range(goal_handle.goal.count):
...         if goal_handle.is_cancel_requested():
...             goal_handle.canceled()
...             return {"processed": i}
...         process_item(i)
...         goal_handle.publish_feedback({"progress": i+1})
...     goal_handle.succeed()
...     return {"processed": goal_handle.goal.count}
- Parameters:
- bind_callback(callback, callback_type='execute')[source]¶
Bind a specific callback type.
- Parameters:
- Raises:
ValueError – If callback_type is invalid
RuntimeError – If callback already bound for this type
- Return type:
- bind_callbacks(**callbacks)[source]¶
Bind multiple callbacks at once.
- Return type:
Example
>>> blueprint.bind_callbacks(
...     on_goal=handle_goal,
...     on_cancel=handle_cancel,
...     execute=execute_task
... )
- class vyra_base.com.CallbackRegistry[source]¶
Bases: object
Thread-safe global registry for handler blueprints.
Features:
- Per-module namespacing via path prefixes
- Late binding support (register blueprint, bind callback later)
- Thread-safe operations
- Debugging tools (list unbound, statistics)
The registry uses a singleton pattern via class methods.
- classmethod register_blueprint(blueprint, namespace=None, override=False)[source]¶
Register a blueprint in the registry.
- Parameters:
blueprint (Union[ServiceBlueprint, PublisherBlueprint, SubscriberBlueprint, ActionBlueprint]) – Blueprint to register
namespace (Optional[str]) – Optional module namespace (e.g., “v2_modulemanager”)
override (bool) – If True, replace existing blueprint with same name
- Return type:
- Returns:
Fully qualified name (with namespace if provided)
- Raises:
ValueError – If blueprint with same name already exists and override=False
Example
>>> blueprint = ServiceBlueprint(name="initialize", ...)
>>> full_name = CallbackRegistry.register_blueprint(
...     blueprint, namespace="v2_modulemanager"
... )
>>> print(full_name)
v2_modulemanager/initialize
- classmethod get_blueprint(name, namespace=None)[source]¶
Retrieve a blueprint by name.
- Parameters:
- Return type:
Union[ServiceBlueprint, PublisherBlueprint, SubscriberBlueprint, ActionBlueprint, None]
- Returns:
Blueprint if found, None otherwise
Example
>>> # With explicit namespace
>>> bp = CallbackRegistry.get_blueprint("initialize", namespace="v2_modulemanager")
>>> # With fully qualified name
>>> bp = CallbackRegistry.get_blueprint("v2_modulemanager/initialize")
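Both lookup forms resolve to the same key when the namespace is stored as a path prefix. The sketch below illustrates that idea with a lock-guarded dictionary; `RegistrySketch` and its methods are hypothetical and simplified, and the real CallbackRegistry may qualify names differently.

```python
import threading
from typing import Any, Dict, Optional


class RegistrySketch:
    """Hypothetical namespaced registry guarded by a lock (class-level state)."""
    _lock = threading.Lock()
    _items: Dict[str, Any] = {}

    @staticmethod
    def _qualify(name: str, namespace: Optional[str]) -> str:
        # "namespace/name" when a namespace is given, otherwise the bare name
        return f"{namespace}/{name}" if namespace else name

    @classmethod
    def register(cls, name: str, item: Any,
                 namespace: Optional[str] = None, override: bool = False) -> str:
        full_name = cls._qualify(name, namespace)
        with cls._lock:
            if full_name in cls._items and not override:
                raise ValueError(f"'{full_name}' is already registered")
            cls._items[full_name] = item
        return full_name

    @classmethod
    def get(cls, name: str, namespace: Optional[str] = None) -> Optional[Any]:
        with cls._lock:
            return cls._items.get(cls._qualify(name, namespace))
```

With this layout, `get("initialize", namespace="v2_modulemanager")` and `get("v2_modulemanager/initialize")` return the same object.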
- classmethod bind_callback(name, callback, callback_type='default', namespace=None)[source]¶
Bind a callback to a registered blueprint.
For ActionBlueprint, callback_type specifies which lifecycle callback to bind (‘on_goal’, ‘on_cancel’, ‘execute’). For other blueprints, use ‘default’.
- Parameters:
- Return type:
- Returns:
True if binding succeeded, False if blueprint not found
- Raises:
RuntimeError – If blueprint already has a bound callback
ValueError – If callback signature is invalid or callback_type invalid
Example
>>> # Service binding
>>> async def my_service(request, response=None):
...     return {"status": "ok"}
>>> CallbackRegistry.bind_callback("my_service", my_service)
True
>>> # ActionServer binding (multi-callback)
>>> async def execute_action(goal_handle):
...     return {"result": "done"}
>>> CallbackRegistry.bind_callback(
...     "process", execute_action, callback_type='execute'
... )
True
- classmethod unbind_callback(name, namespace=None)[source]¶
Remove callback binding from a blueprint.
- classmethod list_all(namespace=None, interface_type=None)[source]¶
List all registered blueprint names.
- classmethod list_unbound(namespace=None)[source]¶
List blueprints that don’t have callbacks bound yet.
Useful for debugging initialization issues.
- Parameters:
- Return type:
- Returns:
List of unbound blueprint names
Example
>>> unbound = CallbackRegistry.list_unbound()
>>> if unbound:
...     logger.warning(f"Unbound interfaces: {unbound}")
- classmethod get_statistics(namespace=None)[source]¶
Get registry statistics.
- Parameters:
- Return type:
- Returns:
Dictionary with counts by type and binding status
Example
>>> stats = CallbackRegistry.get_statistics()
>>> print(stats)
{'total': 15, 'bound': 12, 'unbound': 3, 'services': 8, ...}
- class vyra_base.com.AbstractProtocolProvider(protocol)[source]¶
Bases: ABC
Abstract base class for all protocol providers.
Each protocol (ROS2, Shared Memory, MQTT, etc.) implements this interface to provide consistent publisher/subscriber/server/client/actionServer/actionClient creation across transports.
- Parameters:
protocol (ProtocolType)
- __init__(protocol)[source]¶
- Parameters:
protocol (ProtocolType)
- abstractmethod async check_availability()[source]¶
Check if protocol is available (libraries installed, services running).
- Returns:
True if protocol is available
- Return type:
- abstractmethod async shutdown()[source]¶
Shutdown the protocol provider and cleanup resources.
- Return type:
- abstractmethod async create_publisher(name, **kwargs)[source]¶
Create a publisher interface.
- Parameters:
name (str) – Publisher name
**kwargs – Protocol-specific parameters (message_type, qos, etc.)
- Returns:
Created publisher interface
- Return type:
- abstractmethod async create_subscriber(name, subscriber_callback, **kwargs)[source]¶
Create a subscriber interface.
- Parameters:
- Returns:
Created subscriber interface
- Return type:
- abstractmethod async create_server(name, response_callback, **kwargs)[source]¶
Create a server interface.
- Parameters:
- Returns:
Created server interface
- Return type:
- abstractmethod async create_client(name, **kwargs)[source]¶
Create a client interface.
- Parameters:
name (str) – Client name
**kwargs – Protocol-specific parameters (service_type, qos, etc.)
- Returns:
Created client interface
- Return type:
- abstractmethod async create_action_server(name, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, **kwargs)[source]¶
Create an action server interface.
- Parameters:
name (str) – Action server name
handle_goal_request (Optional[Callable]) – Callback to accept/reject goals (async)
handle_cancel_request (Optional[Callable]) – Callback to handle cancellations (async)
execution_callback (Optional[Callable]) – Callback for goal execution (async)
**kwargs – Protocol-specific parameters (action_type, qos, etc.)
- Returns:
Created action server interface
- Return type:
- abstractmethod async create_action_client(name, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[source]¶
Create an action client interface.
- Parameters:
name (str) – Action client name
direct_response_callback (Optional[Callable]) – Callback for goal acceptance (async)
feedback_callback (Optional[Callable]) – Callback for feedback updates (async)
goal_callback (Optional[Callable]) – Callback for final result (async)
**kwargs – Protocol-specific parameters (action_type, qos, etc.)
- Returns:
Created action client interface
- Return type:
- create_topic_builder(module_name, module_id)[source]¶
Create a TopicBuilder for consistent naming conventions.
- Parameters:
- Return type:
- Returns:
TopicBuilder instance configured with module info
- class vyra_base.com.ProviderRegistry[source]¶
Bases: object
Central registry for all protocol providers.
Manages provider lifecycle, discovery, and factory methods. Thread-safe singleton pattern.
Providers are keyed by (ProtocolType, module_id). “Own” / default providers are registered with module_id=None. Remote-module providers are registered with their target module_id.
- register_provider(provider, module_id=None, force=False)[source]¶
Register a protocol provider.
- Parameters:
provider (AbstractProtocolProvider) – Provider instance to register
module_id (Optional[str]) – Target module-ID this provider is configured for. Use None (default) for the “own” provider that publishes / serves on behalf of the local module.
force (bool) – If True, replace existing provider for the same key
- Return type:
- get_provider(protocol, module_id=None, require_available=True)[source]¶
Get provider for a (protocol, module_id) pair.
Lookup order:
1. Exact key (protocol, module_id)
2. If module_id is not None and no exact match is found: fall back to (protocol, None)
- Parameters:
protocol (ProtocolType) – Protocol type
module_id (Optional[str]) – Target module ID or None for the default provider
require_available (bool) – If True, only return if protocol is available
- Return type:
- Returns:
Provider instance or None
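The two-step lookup can be written out as a small function. This is a sketch under assumptions rather than the registry's code: providers live in a plain dict keyed by `(protocol, module_id)`, protocols are plain strings instead of ProtocolType members, and the `require_available` check is left out.

```python
from typing import Dict, Optional, Tuple

# (protocol, module_id); the protocol is a plain string in this sketch
Key = Tuple[str, Optional[str]]


def get_provider_sketch(providers: Dict[Key, object], protocol: str,
                        module_id: Optional[str] = None) -> Optional[object]:
    """Return the exact (protocol, module_id) entry, else the default entry."""
    provider = providers.get((protocol, module_id))
    if provider is None and module_id is not None:
        # No module-specific provider: fall back to the "own" provider
        provider = providers.get((protocol, None))
    return provider


# Usage: one default provider and one registered for a remote module
default_zenoh = object()
remote_zenoh = object()
table: Dict[Key, object] = {
    ("zenoh", None): default_zenoh,
    ("zenoh", "abc123"): remote_zenoh,
}
```

A lookup for an unknown module ID such as `"other"` therefore yields the default provider, while an unknown protocol yields None.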
- get_or_create_provider_for_module(protocol, module_name, module_id)[source]¶
Return the provider for (protocol, module_id).
If no provider is registered for that specific module_id yet, the default provider (module_id=None) is looked up and registered under the new key automatically – it will be used with an external TopicBuilder injected by the factory at creation time.
- Parameters:
protocol (ProtocolType) – Protocol type
module_name (str) – Human-readable name of the target module
module_id (str) – Unique ID of the target module
- Return type:
- Returns:
Provider instance or None if no provider is available at all
- list_registered()[source]¶
Get list of all registered (protocol, module_id) keys.
- Return type:
List[Tuple[ProtocolType,Optional[str]]]
- list_available()[source]¶
Get list of available (protocol, module_id) keys.
- Return type:
List[Tuple[ProtocolType,Optional[str]]]
- is_available(protocol, module_id=None)[source]¶
Check if a provider for (protocol, module_id) is available.
- Return type:
- Parameters:
protocol (ProtocolType)
module_id (str | None)
- unregister_provider(protocol, module_id=None)[source]¶
Unregister a provider.
- Parameters:
protocol (ProtocolType) – Protocol to unregister
module_id (Optional[str]) – Module ID of the entry to remove (None = default provider)
- Returns:
True if provider was unregistered
- Return type:
- async initialize_all(config=None)[source]¶
Initialize all registered providers.
Note: aliased entries (multiple module_ids sharing the same provider instance) will attempt initialization only once per unique instance.
- Parameters:
config (Optional[Dict[ProtocolType, Dict]]) – Optional configuration per protocol
- Return type:
List[Tuple[ProtocolType, Optional[str]]]
- Returns:
List of successfully initialized (protocol, module_id) keys
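Initializing aliased entries only once amounts to de-duplicating by object identity. The following sketch shows one way to do that; `ProviderSketch` and its `initialize()` method are hypothetical (this section does not document the provider-side initialization hook), and protocols are plain strings.

```python
import asyncio
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, Optional[str]]  # (protocol, module_id)


class ProviderSketch:
    """Hypothetical provider that counts how often it is initialized."""

    def __init__(self) -> None:
        self.init_calls = 0

    async def initialize(self) -> bool:
        self.init_calls += 1
        return True


async def initialize_all_sketch(providers: Dict[Key, ProviderSketch]) -> List[Key]:
    """Initialize each unique instance once; return every key that succeeded."""
    outcome: Dict[int, bool] = {}  # id(instance) -> initialization result
    initialized: List[Key] = []
    for key, provider in providers.items():
        if id(provider) not in outcome:
            outcome[id(provider)] = await provider.initialize()
        if outcome[id(provider)]:
            initialized.append(key)
    return initialized
```

Two keys that share one instance both appear in the result, but `initialize()` runs a single time.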
- class vyra_base.com.TopicBuilder(module_name, module_id, namespace=None, interface_paths=None, enable_interface_loading=True)[source]¶
Bases: object
Builder for consistent topic/service names across transport protocols.
- Features:
Validates module names and IDs
Ensures naming consistency
Supports optional namespace and subsection path levels
Protocol-agnostic design
Topic path structure:
{module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]
Examples
>>> builder = TopicBuilder("v2_modulemanager", "abc123")
>>> builder.build("get_modules")
'v2_modulemanager_abc123/get_modules'
>>> builder.build("NewsFeed", namespace="feeder")
'v2_modulemanager_abc123/feeder/NewsFeed'
>>> builder.build("run_task", namespace="action", subsection="cancel")
'v2_modulemanager_abc123/action/run_task/cancel'
>>> # Instance-level default namespace (applied to every build() call)
>>> feeder_builder = TopicBuilder("v2_modulemanager", "abc123", namespace="feeder")
>>> feeder_builder.build("NewsFeed")
'v2_modulemanager_abc123/feeder/NewsFeed'
- Parameters:
- MODULE_NAME_PATTERN = re.compile('^[a-zA-Z0-9_]+$')¶
- MODULE_ID_PATTERN = re.compile('^[a-zA-Z0-9_]+$')¶
- FUNCTION_NAME_PATTERN = re.compile('^[a-zA-Z0-9_]+$')¶
- SUBSECTION_PATTERN = re.compile('^[a-zA-Z0-9_/\\-]+$')¶
- __init__(module_name, module_id, namespace=None, interface_paths=None, enable_interface_loading=True)[source]¶
Initialize topic builder with optional interface loading.
- Parameters:
module_name (str) – Name of the module (e.g., “v2_modulemanager”)
module_id (str) – Unique module instance ID (e.g., “abc123” or full hash)
namespace (Optional[str]) – Optional default namespace segment applied to every build() call that does not supply an explicit namespace argument. For example, pass namespace="feeder" when creating a TopicBuilder for a feeder publisher so that all feeder topics are automatically scoped under feeder/.
interface_paths (Optional[list[str | Path]]) – Optional list of interface base paths. If None, uses InterfacePathRegistry defaults.
enable_interface_loading (bool) – Enable dynamic interface loading. Set to False for pure topic naming without interface loading.
- Raises:
ValueError – If module_name or module_id contains invalid characters
- build(function_name, namespace=None, subsection=None, interface_type=None)[source]¶
Build a topic/service name.
Path: {module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]
If namespace is None, the instance-level default namespace (set via the constructor) is used. Pass namespace="" to explicitly suppress the instance default for a single call.
- Parameters:
function_name (str) – Name of the function/interface
namespace (Optional[str]) – Optional namespace segment (overrides instance default). Omit or pass None to inherit the instance default. Pass "" to suppress the instance default for this call.
subsection (Optional[str]) – Optional trailing subsection segment
interface_type (Optional[InterfaceType]) – Type of interface (for logging/validation)
- Return type:
- Returns:
Complete topic/service name
Examples
>>> builder.build("get_modules")
'v2_modulemanager_abc123/get_modules'
>>> builder.build("NewsFeed", namespace="feeder")
'v2_modulemanager_abc123/feeder/NewsFeed'
>>> builder.build("run_task", namespace="action", subsection="cancel")
'v2_modulemanager_abc123/action/run_task/cancel'
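The path rule and the namespace defaulting can be reproduced in a few lines. This is a standalone sketch, not TopicBuilder itself: `build_topic_sketch` and its `default_namespace` argument are hypothetical, the regular expressions are copied from the class attributes listed above, and namespace validation is omitted because its pattern is not documented in this section.

```python
import re
from typing import Optional

# Patterns as listed for MODULE_NAME/MODULE_ID/FUNCTION_NAME and SUBSECTION
NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_]+$")
SUBSECTION_PATTERN = re.compile(r"^[a-zA-Z0-9_/\-]+$")


def build_topic_sketch(module_name: str, module_id: str, function_name: str,
                       namespace: Optional[str] = None,
                       subsection: Optional[str] = None,
                       default_namespace: Optional[str] = None) -> str:
    """Build {module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]."""
    for value in (module_name, module_id, function_name):
        if not NAME_PATTERN.match(value):
            raise ValueError(f"Invalid name segment: {value!r}")
    if subsection is not None and not SUBSECTION_PATTERN.match(subsection):
        raise ValueError(f"Invalid subsection: {subsection!r}")

    # None inherits the default namespace; "" suppresses it for this call
    effective_ns = default_namespace if namespace is None else namespace

    parts = [f"{module_name}_{module_id}"]
    if effective_ns:
        parts.append(effective_ns)
    parts.append(function_name)
    if subsection:
        parts.append(subsection)
    return "/".join(parts)
```

Distinguishing `None` from `""` is what lets a caller opt out of the default namespace without needing a second builder.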
- parse(topic_name)[source]¶
Parse a topic name back into components.
Understands the 2-level path (module/function), the 3-level path (module/namespace/function or module/function/subsection) and the 4-level path (module/namespace/function/subsection).
Disambiguation rule for 2 remaining segments: the second segment is treated as subsection (not namespace), preserving backward compatibility with the previous subaction field. To get a namespace-only path (without subsection), use 3 segments: build the topic with only namespace set.
Path: {module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]
- Parameters:
topic_name (str) – Complete topic name to parse
- Return type:
- Returns:
TopicComponents with parsed values
- Raises:
ValueError – If topic name doesn’t match expected format
Examples
>>> builder.parse("v2_modulemanager_abc123/get_modules")
TopicComponents(module_name='v2_modulemanager', module_id='abc123', function_name='get_modules', namespace=None, subsection=None)
>>> builder.parse("v2_modulemanager_abc123/feeder/NewsFeed")
TopicComponents(module_name='v2_modulemanager', module_id='abc123', function_name='NewsFeed', namespace='feeder', subsection=None)
>>> builder.parse("v2_modulemanager_abc123/action/run_task/cancel")
TopicComponents(module_name='v2_modulemanager', module_id='abc123', function_name='run_task', namespace='action', subsection='cancel')
- build_topic_name(function_name, namespace=None, subsection=None, interface_type=None)[source]¶
Build only the topic/service name (without loading interface).
Pure naming function - works without ROS2 or interface loading. Useful for slim mode or when interface type is not needed.
- Parameters:
- Return type:
- Returns:
Complete topic/service name
Examples
>>> builder.build_topic_name("get_modules")
'v2_modulemanager_abc123/get_modules'
- load_interface_type(function_name, protocol='ros2')[source]¶
Load interface type for a function (ROS2 or Protobuf).
Dynamically loads interface class/module without compile-time imports. Requires interface loader to be enabled.
- Parameters:
- Return type:
- Returns:
Interface type/module, or None if not found or loader disabled
Examples
>>> # Load ROS2 service interface
>>> srv_type = builder.load_interface_type("get_interface_list", "ros2")
>>> # Load protobuf for Zenoh
>>> pb_module = builder.load_interface_type("get_interface_list", "zenoh")
- build_with_interface(function_name, namespace=None, subsection=None, interface_type=None, protocol='ros2')[source]¶
Build topic name AND load interface type in one call.
Combines topic naming with interface loading for convenience. Returns both the topic name and the loaded interface.
- Parameters:
- Return type:
- Returns:
Tuple of (topic_name, interface_type_or_module) interface_type_or_module is None if loading failed/disabled
Examples
>>> # Build topic and load ROS2 service
>>> topic, service_type = builder.build_with_interface(
...     "get_interface_list",
...     interface_type=InterfaceType.SERVER,
...     protocol="ros2"
... )
>>> print(topic)  # 'v2_modulemanager_abc123/get_interface_list'
>>> print(service_type)  # <class '...VBASEGetInterfaceList'>
>>> # Build topic and load protobuf for Zenoh
>>> topic, pb_module = builder.build_with_interface(
...     "state_feed",
...     interface_type=InterfaceType.SPEAKER,
...     protocol="zenoh"
... )
- get_interface_loader()[source]¶
Get the interface loader instance.
- Return type:
- Returns:
InterfaceLoader instance, or None if disabled
- class vyra_base.com.TopicComponents(module_name, module_id, function_name, namespace=None, subsection=None)[source]¶
Bases: object
Components of a parsed topic name.
Represents the structured parts of a VYRA topic path:
{module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]
- Parameters:
- vyra_base.com.TopicInterfaceType¶
alias of InterfaceType
- vyra_base.com.create_topic_builder(module_name, module_id, namespace=None)[source]¶
Factory function to create a TopicBuilder instance.
- Parameters:
- Return type:
- Returns:
Configured TopicBuilder instance
- vyra_base.com.build_topic(module_name, module_id, function_name, namespace=None, subsection=None)[source]¶
Build a topic name without creating a builder instance.
Convenience function for one-off topic generation.
- Parameters:
- Return type:
- Returns:
Complete topic name
- vyra_base.com.parse_topic(topic_name)[source]¶
Parse a topic name without a builder instance.
- Parameters:
topic_name (str) – Topic name to parse
- Return type:
- Returns:
TopicComponents with parsed values
- class vyra_base.com.InterfacePathRegistry[source]¶
Bases: object
Thread-safe singleton registry for interface base paths.
Stores paths to interface directories containing:
- /config/*.json - Interface metadata
- /server/*.srv - ROS2 service definitions
- /publisher/*.msg - ROS2 message definitions
- /actionServer/*.action - ROS2 action definitions
- /proto/*.proto - Protocol Buffer definitions
- /proto/*_pb2.py - Generated Python protobuf modules
Default path points to vyra_base/interfaces (embedded package). Modules can register additional paths via set_interface_paths().
Examples
>>> registry = InterfacePathRegistry.get_instance()
>>> registry.set_interface_paths([
...     "/path/to/interfaces1",
...     "/path/to/interfaces2"
... ])
>>> paths = registry.get_interface_paths()
- classmethod get_instance()[source]¶
Get singleton instance (thread-safe).
- Return type:
- Returns:
InterfacePathRegistry singleton instance
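A thread-safe `get_instance()` is commonly built with double-checked locking. The sketch below shows that general pattern; it is an assumption about how such a singleton can be written, not a copy of InterfacePathRegistry, and `SingletonSketch` is a hypothetical name.

```python
import threading
from typing import Optional


class SingletonSketch:
    """Hypothetical singleton using double-checked locking."""
    _instance: Optional["SingletonSketch"] = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls) -> "SingletonSketch":
        if cls._instance is None:          # fast path, no lock taken
            with cls._lock:
                if cls._instance is None:  # re-check while holding the lock
                    cls._instance = cls()
        return cls._instance
```

The second check matters: two threads can both pass the first test, but only the one that acquires the lock first creates the instance.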
- set_interface_paths(paths)[source]¶
Set interface base paths (replaces existing paths).
- Parameters:
paths (list[str | Path]) – List of interface directory paths
- Raises:
ValueError – If any path doesn’t exist or isn’t readable
- Return type:
Examples
>>> registry.set_interface_paths([
...     "/path/to/interfaces1",
...     "/path/to/interfaces2"
... ])
- add_interface_path(path)[source]¶
Add an interface path to existing paths (without replacing).
- Parameters:
- Raises:
ValueError – If path doesn’t exist or isn’t readable
- Return type:
- vyra_base.com.get_interface_registry()[source]¶
Convenience function to get singleton instance.
- Return type:
- Returns:
InterfacePathRegistry singleton
- class vyra_base.com.InterfaceLoader(interface_paths=None, auto_update_paths=True)[source]¶
Bases: object
Dynamic loader for ROS2 and Protocol Buffer interfaces.
Provides runtime loading of interface type classes from string names, eliminating compile-time import dependencies.
- Features:
ROS2 interface loading via rosidl_runtime_py
Protocol Buffer module loading (*_pb2.py)
Interface metadata loading from JSON configs
Caching for performance
Graceful fallback when ROS2 unavailable
Examples
>>> loader = InterfaceLoader()
>>> # Load ROS2 service interface
>>> srv_type = loader.load_ros2_interface("your_module_interfaces/srv/VBASEGetInterfaceList")
>>> print(srv_type)  # <class 'your_module_interfaces.srv.VBASEGetInterfaceList'>
>>> # Load Protocol Buffer interface
>>> pb_type = loader.load_protobuf_interface("VBASEGetInterfaceList")
>>> print(pb_type)  # <module 'VBASEGetInterfaceList_pb2'>
>>> # Load by function name from metadata
>>> interface = loader.get_interface_for_function("get_interface_list", protocol="ros2")
- load_ros2_interface(interface_path)[source]¶
Load ROS2 interface type from string path.
Uses rosidl_runtime_py.utilities to dynamically load interface classes without compile-time imports.
- Parameters:
interface_path (str) – Interface path in format “package/type/Name”. Examples:
- “std_msgs/msg/String”
- “your_module_interfaces/srv/VBASEGetInterfaceList”
- “action_msgs/action/MyAction”
- Return type:
- Returns:
Interface type class, or None if unavailable
Examples
>>> loader.load_ros2_interface("std_msgs/msg/String")
<class 'std_msgs.msg._string.String'>
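Runtime loading with a graceful fallback might look roughly like the sketch below. It is an illustration under assumptions, not the loader's code: `split_interface_path` and `load_ros2_interface_sketch` are hypothetical helpers, and the dispatch uses `get_message`, `get_service` and `get_action` from `rosidl_runtime_py.utilities`. Without a ROS2 installation the function simply returns None.

```python
from typing import Any, Optional, Tuple


def split_interface_path(interface_path: str) -> Tuple[str, str, str]:
    """Split "package/type/Name" and validate the middle segment."""
    parts = interface_path.split("/")
    if len(parts) != 3 or parts[1] not in ("msg", "srv", "action"):
        raise ValueError(f"Expected 'package/type/Name', got {interface_path!r}")
    return parts[0], parts[1], parts[2]


def load_ros2_interface_sketch(interface_path: str) -> Optional[Any]:
    """Return the interface class, or None if ROS2 or the interface is missing."""
    _, kind, _ = split_interface_path(interface_path)
    try:
        # Imported lazily so the module still works without ROS2
        from rosidl_runtime_py import utilities
    except ImportError:
        return None
    loaders = {
        "msg": utilities.get_message,
        "srv": utilities.get_service,
        "action": utilities.get_action,
    }
    try:
        return loaders[kind](interface_path)
    except (AttributeError, ModuleNotFoundError, ValueError):
        return None  # unknown package or interface name
```

Keeping the import inside the function is what avoids a hard dependency on ROS2 at module import time.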
- load_protobuf_interface(interface_name)[source]¶
Load Protocol Buffer interface module (*_pb2.py).
Searches proto/ directories in registered interface paths for generated Python protobuf modules.
- Parameters:
interface_name (str) – Base name of interface (without _pb2 suffix). Examples:
- “VBASEGetInterfaceList” → VBASEGetInterfaceList_pb2.py
- “VBASEStateFeed” → VBASEStateFeed_pb2.py
- Return type:
- Returns:
Loaded protobuf module, or None if not found
Examples
>>> loader.load_protobuf_interface("VBASEGetInterfaceList")
<module 'VBASEGetInterfaceList_pb2'>
- load_interface_metadata(reload=False)[source]¶
Load interface metadata from JSON config files.
Scans config/ directories in registered interface paths and loads all *.json files containing interface metadata.
- Parameters:
reload (bool) – Force reload even if cached
- Return type:
- Returns:
Dict mapping function_name → metadata dict
Examples
>>> metadata = loader.load_interface_metadata()
>>> print(metadata.keys())
dict_keys(['get_interface_list', 'health_check', 'initialize', ...])
>>> meta = metadata['get_interface_list']
>>> print(meta['type'])  # 'callable'
>>> print(meta['filetype'])  # ['VBASEGetInterfaceList.srv', 'VBASEGetInterfaceList.proto']
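A small sketch of the scan-and-cache behaviour described above. The JSON layout is an assumption: each file is taken to hold a list of objects keyed by `functionname`. `MetadataCache` and `demo` are hypothetical names used only for illustration.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional


class MetadataCache:
    """Collects entries from config/*.json and caches the merged mapping."""

    def __init__(self, interface_paths: Iterable[str]) -> None:
        self._paths = [Path(p) for p in interface_paths]
        self._cache: Optional[Dict[str, Dict[str, Any]]] = None

    def load(self, reload: bool = False) -> Dict[str, Dict[str, Any]]:
        if self._cache is not None and not reload:
            return self._cache  # cached result, no disk access
        metadata: Dict[str, Dict[str, Any]] = {}
        for base in self._paths:
            for json_file in sorted((base / "config").glob("*.json")):
                entries = json.loads(json_file.read_text(encoding="utf-8"))
                for entry in entries:
                    metadata[entry["functionname"]] = entry
        self._cache = metadata
        return metadata


def demo() -> List[List[str]]:
    """Show that a new file is only picked up after reload=True."""
    with tempfile.TemporaryDirectory() as root:
        config_dir = Path(root) / "config"
        config_dir.mkdir()
        first_file = [{"functionname": "health_check", "type": "callable"}]
        (config_dir / "base.json").write_text(json.dumps(first_file), encoding="utf-8")
        cache = MetadataCache([root])
        first = sorted(cache.load())
        second_file = [{"functionname": "initialize", "type": "callable"}]
        (config_dir / "extra.json").write_text(json.dumps(second_file), encoding="utf-8")
        cached = sorted(cache.load())
        reloaded = sorted(cache.load(reload=True))
        return [first, cached, reloaded]
```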
- get_interface_for_function(function_name, protocol='ros2')[source]¶
Load interface type for a function by name.
Looks up function in metadata, extracts appropriate interface path based on protocol, and loads the interface.
- Parameters:
- Return type:
- Returns:
Interface type/module, or None if not found
Examples
>>> # Load ROS2 service interface
>>> srv = loader.get_interface_for_function("get_interface_list", protocol="ros2")
>>> # Load protobuf for Zenoh
>>> pb = loader.get_interface_for_function("get_interface_list", protocol="zenoh")
- class vyra_base.com.BaseFeeder[source]¶
Bases: IFeeder
Concrete base class for all VYRA feeders.
Implements IFeeder and provides:
- Protocol auto-resolution via FeederConfigResolver: the transport protocol is read from the module’s interface config JSON (functionname matched against feeder_name).
- Pre-start buffer: messages fed before start() are queued and flushed automatically.
- Metrics: feed_count, error_count, last_feed_at.
- Health check: is_alive() probes the backing publisher.
- Retry policy: configurable max_retries and retry_delay.
Abstract class: subclasses set _feederName, _type, and optionally _interface_paths.
- get_feeder_name()[source]¶
Return the feeder name (= functionname in the interface config).
- Return type:
- register_condition(condition_function, *, name=None, tag='news', execution_point='ALWAYS', success_message=None, failure_message=None)[source]¶
Register a synchronous bool-returning condition callback.
- evaluate_conditions(context, *, rule_names=None, tags=None, execution_point=None)[source]¶
Evaluate registered conditions and return resulting messages.
By default all registered conditions are evaluated. Use rule_names and/or tags to limit evaluation to a subset.
- set_interface_paths(paths)[source]¶
Override the interface paths used for protocol resolution.
Called by a VyraEntity after constructing the feeder to provide module-specific config paths.
- async create(loggingOn=False)[source]¶
Create the publisher using protocol resolved from interface config.
Resolution order:
1. FeederConfigResolver.resolve(feeder_name, interface_paths): reads the module’s JSON config and maps tags to a protocol.
2. If no config is found (or interface_paths is empty): fall back to InterfaceFactory.create_publisher with the default chain [ZENOH, ROS2, REDIS, UDS].
- Raises:
FeederException – If no protocol is available at all.
- Parameters:
loggingOn (bool) – Also emit feeder messages to the base logger.
- Return type:
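The resolution order of create() can be sketched as follows. This is a stand-alone illustration: `FeederException` is redefined locally, the `factories` mapping and the `create_publisher` helper are hypothetical, and treating a `RuntimeError` as "transport unavailable" is an assumption rather than documented behaviour.

```python
from typing import Callable, Dict, Optional, Sequence, Tuple

# Default fallback chain named in the documentation above.
DEFAULT_CHAIN: Tuple[str, ...] = ("zenoh", "ros2", "redis", "uds")


class FeederException(Exception):
    """Local stand-in for the library's exception type."""


def create_publisher(
    feeder_name: str,
    factories: Dict[str, Callable[[str], object]],
    resolved_protocol: Optional[str] = None,
    chain: Sequence[str] = DEFAULT_CHAIN,
) -> Tuple[str, object]:
    """Use the config-resolved protocol if present, else walk the chain."""
    order = (resolved_protocol,) if resolved_protocol else tuple(chain)
    for protocol in order:
        factory = factories.get(protocol)
        if factory is None:
            continue  # transport not registered in this sketch
        try:
            return protocol, factory(feeder_name)
        except RuntimeError:
            continue  # assumed signal for "transport unavailable"
    raise FeederException(f"no protocol available for feeder {feeder_name!r}")
```

In this sketch a protocol resolved from config is not followed by a fallback; whether the real implementation falls back in that case is not stated in the source.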
- async start()[source]¶
Start the feeder (implements IFeeder).
Subclasses may override to add extra initialisation before calling await super().start().
- Return type:
- async feed(msg)[source]¶
Enqueue msg for publishing (implements IFeeder).
If the feeder is not yet ready the message is buffered. Otherwise the publish path is executed synchronously (event-loop aware).
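The pre-start buffer and the metrics can be illustrated with a small stand-in class. `BufferingFeeder` is hypothetical and keeps published messages in a list instead of using a real publisher; only the buffering and counting pattern is meant to carry over.

```python
import asyncio
from datetime import datetime, timezone
from typing import Any, List, Optional


class BufferingFeeder:
    """Queues messages until start() is called, then flushes them in order."""

    def __init__(self) -> None:
        self._ready = False
        self._buffer: List[Any] = []
        self.published: List[Any] = []  # stands in for the real publisher
        self.feed_count = 0
        self.last_feed_at: Optional[datetime] = None

    async def start(self) -> None:
        self._ready = True
        pending, self._buffer = self._buffer, []
        for msg in pending:  # flush in arrival order
            await self._publish(msg)

    async def feed(self, msg: Any) -> None:
        if not self._ready:
            self._buffer.append(msg)  # not ready yet: keep for later
            return
        await self._publish(msg)

    async def _publish(self, msg: Any) -> None:
        self.published.append(msg)
        self.feed_count += 1
        self.last_feed_at = datetime.now(timezone.utc)


async def demo() -> BufferingFeeder:
    feeder = BufferingFeeder()
    await feeder.feed("early")  # buffered, nothing published yet
    await feeder.start()        # flushes "early"
    await feeder.feed("late")   # published directly
    return feeder
```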
- add_handler(handler)[source]¶
Attach a CommunicationHandler.
- Parameters:
handler (CommunicationHandler) – Handler instance to attach.
- Returns:
True if added, False if already present.
- Return type:
- class vyra_base.com.StateFeeder(node, module_entity, loggingOn=False)[source]¶
Bases: BaseFeeder
Responsible for loading a VYRA Handler and feeding StateEntry elements to this handler.
- Parameters:
node (Optional[Any]) – The VyraNode instance associated with this feeder.
module_entity (ModuleEntry) – Module configuration entry.
loggingOn (bool) – Flag to enable or disable logging alongside feeding. Defaults to False.
- Raises:
FeederException – If the publisher cannot be created.
- Variables:
_feederName – Name of the feeder.
_doc – Documentation string for the feeder.
_level – Logging level.
_node – VyraNode instance.
_module_entity – Module configuration.
- __init__(node, module_entity, loggingOn=False)[source]¶
Initializes a StateFeeder instance for sending changes in state of a module.
- Parameters:
node (Optional[Any]) – The VyraNode instance (optional, None if ROS2 unavailable).
module_entity (ModuleEntry) – Module configuration entry.
loggingOn (bool) – Flag to enable or disable logging alongside feeding. Defaults to False.
- Raises:
FeederException – If the publisher cannot be created.
- async start()[source]¶
Starts the feeder by initializing handlers.
Automatically resolves the transport protocol from the module’s interface config (reads the StateFeed entry, picks tags).
- Return type:
- async feed(stateElement)[source]¶
Feed a state entry to the feeder.
Validates the input type, then delegates to the base-class feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.
- Parameters:
stateElement (StateEntry) – The state entry to feed.
- Raises:
FeederException – If the provided element is not of type StateEntry.
- Return type:
- class vyra_base.com.NewsFeeder(node, module_entity, loggingOn=False)[source]¶
Bases: BaseFeeder
Collection of the news messages.
- Parameters:
node (Optional[Any]) – The VyraNode instance associated with this feeder.
module_entity (ModuleEntry) – Module configuration entry.
loggingOn (bool) – Flag to enable or disable logging alongside feeding. Defaults to False.
- Raises:
FeederException – If the publisher cannot be created.
- __init__(node, module_entity, loggingOn=False)[source]¶
Initialize the BaseFeeder.
- Parameters:
node (Any | None)
module_entity (ModuleEntry)
loggingOn (bool)
- async start()[source]¶
Starts the feeder by initializing handlers.
Automatically resolves the transport protocol from the module’s interface config (reads the NewsFeed entry, picks tags).
- Return type:
- async feed(newsElement)[source]¶
Feed a news entry to the feeder.
Normalises the input to a NewsEntry, then delegates to the base-class feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.
- Parameters:
newsElement (Union[NewsEntry, str, list]) – The news entry to be fed. Can be a string or list of strings to be processed into a NewsEntry, or a NewsEntry object.
- Raises:
FeederException – If the type of newsElement is not supported.
- Return type:
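The normalisation step can be sketched as below. `NewsEntry` and `FeederException` are simplified local stand-ins, and joining list items with a space is an assumption made for illustration; the source does not say how a list of strings is combined.

```python
from dataclasses import dataclass
from typing import List, Union


class FeederException(Exception):
    """Local stand-in for the library's exception type."""


@dataclass
class NewsEntry:
    """Simplified stand-in with only the fields this sketch needs."""
    message: str
    level: str = "INFO"


def normalize_news(news_element: Union[NewsEntry, str, List[str]]) -> NewsEntry:
    """Accept a NewsEntry, a string, or a list of strings."""
    if isinstance(news_element, NewsEntry):
        return news_element  # already normalised, pass through unchanged
    if isinstance(news_element, str):
        return NewsEntry(message=news_element)
    if isinstance(news_element, list) and all(isinstance(p, str) for p in news_element):
        return NewsEntry(message=" ".join(news_element))
    raise FeederException(f"unsupported news element type: {type(news_element).__name__}")
```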
- register_news_condition(condition_function, *, name=None, execution_point='ALWAYS', success_message=None, failure_message=None, tag='news')[source]¶
Register a synchronous bool condition for tracked feeder messages.
- monitor(*, tag='news', label=None, severity='INFO', entity=None, during_interval_seconds=0.05)[source]¶
Return a runtime-monitoring decorator bound to this feeder.
- async evaluate_tracked_conditions(context)[source]¶
Evaluate registered conditions and publish resulting tracker messages.
- evaluate_tracked_conditions_sync(context)[source]¶
Sync variant of evaluate_tracked_conditions().
- build_newsfeed(*args)[source]¶
Build a well structured newsfeed entry from plain text and module information.
- Parameters:
args (Any) – The arguments to be processed into a news entry. Can be a string or list of strings.
- Returns:
A structured news entry containing the message, level, timestamp, UUID, module name, module ID, module template, and type.
- Return type:
- Raises:
FeederException – If the type of the message level is not valid.
- class vyra_base.com.ErrorFeeder(node, module_entity, loggingOn=True)[source]¶
Bases: BaseFeeder
Collection of the error messages.
- Parameters:
node (Optional[Any]) – The VyraNode instance associated with this feeder.
module_entity (ModuleEntry) – The module configuration entry.
loggingOn (bool) – Flag to enable or disable logging alongside feeding. Defaults to True.
- Raises:
FeederException – If the publisher cannot be created.
- __init__(node, module_entity, loggingOn=True)[source]¶
Initialize the BaseFeeder.
- Parameters:
node (Any | None)
module_entity (ModuleEntry)
loggingOn (bool)
- async start()[source]¶
Starts the feeder by initializing handlers.
Automatically resolves the transport protocol from the module’s interface config (reads the ErrorFeed entry, picks tags).
- Return type:
- async feed(errorElement)[source]¶
Feed an error entry to the feeder.
Normalises the input to an ErrorEntry, then delegates to the base-class feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.
- Parameters:
errorElement (Union[ErrorEntry, dict]) – The error entry to be fed. Can be a dictionary with error details or an ErrorEntry object.
- Raises:
FeederException – If the type of errorElement is neither a dict nor an ErrorEntry.
- Return type:
- feed_sync(errorElement)[source]¶
Sync version of feed() with ErrorEntry normalization.
- Return type:
- Parameters:
errorElement (ErrorEntry | dict)
- monitor(*, tag='error', label=None, severity='WARNING', entity=None, during_interval_seconds=0.05)[source]¶
Return an exception-monitoring decorator bound to this feeder.
- build_errorfeed(errorDict)[source]¶
Build an error entry from the given dictionary.
- Parameters:
errorDict (dict) – A dictionary containing error details. Keys are:
- code: int16 - Error code (default: 0x00000000)
- uuid: UUID - Unique identifier for the error (default: a new UUID)
- description: str - Description of the error (default: '')
- solution: str - Suggested solution for the error (default: '')
- miscellaneous: str - Additional information (default: '')
- level: ErrorEntry.ERROR_LEVEL - Level of the error (default: ErrorEntry.ERROR_LEVEL.MINOR_FAULT)
- Returns:
An instance of vyra_base.defaults.entries.ErrorEntry populated with the provided details.
- Return type:
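How the documented defaults combine with a partial dictionary can be sketched like this. `ErrorEntry` and `ErrorLevel` are simplified local stand-ins (the real enum has members not listed here), and silently ignoring unknown keys is an assumption of this sketch.

```python
from dataclasses import dataclass, field, fields
from enum import Enum
from typing import Any, Dict
from uuid import UUID, uuid4


class ErrorLevel(Enum):
    """Stand-in enum; only the documented default member is modelled."""
    MINOR_FAULT = "minor_fault"


@dataclass
class ErrorEntry:
    """Stand-in entry carrying the documented keys and defaults."""
    code: int = 0x00000000
    uuid: UUID = field(default_factory=uuid4)  # a new UUID per entry
    description: str = ""
    solution: str = ""
    miscellaneous: str = ""
    level: ErrorLevel = ErrorLevel.MINOR_FAULT


def build_errorfeed(error_dict: Dict[str, Any]) -> ErrorEntry:
    """Fill an ErrorEntry from a dict, using defaults for missing keys."""
    known = {f.name for f in fields(ErrorEntry)}
    return ErrorEntry(**{k: v for k, v in error_dict.items() if k in known})
```

Using `default_factory` for the UUID matters: a plain default would be evaluated once and shared by every entry.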
- class vyra_base.com.ROS2Publisher(publisherInfo, node)[source]¶
Bases: object
Base class for ROS2 publishers.
This class is intended to be factory-created to implement a specific publisher for a topic.
- Parameters:
publisherInfo (PublisherInfo)
node (VyraNode)
- __init__(publisherInfo, node)[source]¶
Initialize the ROS2Publisher.
- Parameters:
publisherInfo (PublisherInfo) – PublisherInfo instance containing publisher configuration.
node (VyraNode) – The ROS2 node to which the publisher belongs.
- Return type:
None
- create_publisher()[source]¶
Create a publisher in the ROS2 node.
This method should be called to register the publisher within the ROS2 node.
- Raises:
ValueError – If publisher type or name is not provided.
- Return type:
- publish(msg)[source]¶
Publish a message to the topic.
This method should be overridden in subclasses to provide specific functionality.
- Parameters:
msg (Any) – The message to publish.
- Raises:
ValueError – If publisher type, name, or publisher instance is not set.
- Return type:
- class vyra_base.com.ROS2Subscriber(subscriptionInfo, node)[source]¶
Bases: object
Base class for ROS2 subscriptions.
This class is intended to be factory-created to implement specific subscriptions for topics.
- Parameters:
subscriptionInfo (SubscriberInfo)
node (VyraNode)
- __init__(subscriptionInfo, node)[source]¶
Initialize the ROS2Subscriber.
- Parameters:
subscriptionInfo (SubscriberInfo) – Information about the subscription.
node (VyraNode) – The ROS2 node to attach the subscription to.
- Return type:
None
- create_subscription()[source]¶
Create and register the subscription with the ROS2 node.
- Raises:
ValueError – If the subscription type or name is not provided.
- Return type:
- class vyra_base.com.VyraNode(node_settings)[source]¶
Bases: Node
Vyra node class.
- Parameters:
node_settings (NodeSettings) – NodeSettings object containing the node’s settings.
- __init__(node_settings)[source]¶
- Parameters:
node_settings (NodeSettings)
- Return type:
None
- set_reload()[source]¶
Set the reload event to notify that the node settings have changed.
- Return type:
- property node_settings: NodeSettings¶
Get the node settings.
- Returns:
NodeSettings object containing the node’s settings.
- Return type:
- class vyra_base.com.CheckerNode(enable_rosout=False)[source]¶
Bases: Node
Node to check the availability of other nodes.
Note: Uses enable_rosout=False to avoid SROS2 permission issues with the /rosout topic during node availability checks.
- Parameters:
enable_rosout (bool) – Whether to enable the rosout logger.
- class vyra_base.com.NodeSettings(name='Vyra_node', parameters=<factory>)[source]¶
Bases: object
Settings for a Vyra node.
- Variables:
- Parameters:
- class vyra_base.com.ROS2ActionClient(actionInfo, node)[source]¶
Bases: object
Base class for ROS2 action clients.
This class is intended to be factory-created to call specific action functionality.
- Parameters:
actionInfo (ActionClientInfo)
node (VyraNode)
- __init__(actionInfo, node)[source]¶
Initialize the ROS2ActionClient.
- Parameters:
actionInfo (ActionClientInfo) – Information about the action.
node (VyraNode) – The ROS2 node.
- Return type:
None
- create_action_client()[source]¶
Create an action client in the ROS2 node.
This method should be called to register the action client with the ROS2 node.
- Return type:
- send_goal(order)[source]¶
Send a goal to the action server.
- Parameters:
order (Any) – The order to send as part of the goal.
- Raises:
ValueError – If the action client has not been created.
- Returns:
A future representing the goal request.
- Return type:
Future
- async send_goal_async(goal, feedback_callback=None)[source]¶
Send a goal asynchronously and wait for result.
- Parameters:
- Raises:
ValueError – If the action client has not been created.
- Returns:
The goal handle.
- Return type:
- goal_response_callback(future)[source]¶
Callback for the goal response.
- Parameters:
future (Future) – The future containing the goal handle.
- Return type:
- class vyra_base.com.ROS2ActionServer(actionInfo, node)[source]¶
Bases: object
Base class for ROS2 actions.
This class is intended to be factory-created to implement specific action functionality.
- Parameters:
actionInfo (ActionServerInfo)
node (VyraNode)
- __init__(actionInfo, node)[source]¶
Initialize the ROS2ActionServer.
- Parameters:
actionInfo (ActionServerInfo) – The action configuration.
node (VyraNode) – The ROS2 node to attach the action server to.
- Return type:
None
- create_action_server()[source]¶
Create and register the action server with the ROS2 node.
This method should be called to register the action server with the ROS2 node.
- Return type:
- execute_callback(goal_handle)[source]¶
Execute the action callback.
This method should be overridden in subclasses to provide specific functionality.
- Parameters:
goal_handle (Any) – The goal handle for the action.
- Return type:
- goal_callback(goal_handle)[source]¶
Handle the goal callback.
This method should be overridden in subclasses to provide specific functionality.
- Parameters:
goal_handle (Any) – The goal handle for the action.
- Return type:
GoalResponse
- class vyra_base.com.ROS2ServiceClient(serviceInfo, node)[source]¶
Bases: object
Base class for ROS2 services.
This class is intended to be factory-created to implement specific service functionality.
- Parameters:
serviceInfo (ServiceClientInfo)
node (VyraNode)
- __init__(serviceInfo, node)[source]¶
Initialize the ROS2ServiceClient.
- Parameters:
serviceInfo (ServiceClientInfo) – Information about the service.
node (VyraNode) – ROS2 node instance.
- Return type:
None
- property service_info: ServiceClientInfo¶
Get the service information.
- async create_service_caller()[source]¶
Create a service caller in the ROS2 node.
This method should be called to register the service caller with the ROS2 node.
- Raises:
ValueError – If the service type or name is not provided.
TimeoutError – If the service is not available within the timeout period.
- Return type:
- async send(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- class vyra_base.com.ROS2ServiceServer(serviceInfo, node, async_loop=None)[source]¶
Bases: object
Base class for ROS2 services.
This class is intended to be factory-created to implement specific service functionality.
- Parameters:
serviceInfo (ServiceServerInfo)
node (VyraNode)
- __init__(serviceInfo, node, async_loop=None)[source]¶
Initialize the ROS2ServiceServer.
- Parameters:
serviceInfo (ServiceServerInfo) – Information about the service.
node (VyraNode) – The ROS2 node to attach the service to.
async_loop (Any, Optional) – Optional asyncio event loop.
- Return type:
None
- property service_info: ServiceServerInfo¶
Get the service information.
- create_service(callback, async_loop=None)[source]¶
Create and register a service in the ROS2 node.
- Parameters:
callback (Callable) – The callback function to handle service requests.
async_loop (AbstractEventLoop) – Optional asyncio event loop.
- Raises:
TypeError – If the callback is not callable.
ValueError – If the service type or name is not provided.
- Return type:
- destroy_service()[source]¶
Destroy the service in the ROS2 node.
This method will remove the service if it exists.
- Return type:
- run_async_in_thread(coro, timeout=None)[source]¶
Execute an async coroutine in a separate thread with its own event loop.
Creates a new thread with a dedicated asyncio event loop to run async code from synchronous ROS2 service callbacks.
- Parameters:
coro – Coroutine to execute.
timeout – Optional timeout in seconds.
- Returns:
Result from coroutine execution.
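The thread-plus-event-loop pattern described above can be sketched with the standard library. This is a free-standing illustration, not the method's actual body; in particular, re-raising the coroutine's exception in the calling thread is an assumption.

```python
import asyncio
import threading
from typing import Any, Coroutine, Dict, Optional


def run_async_in_thread(coro: Coroutine[Any, Any, Any], timeout: Optional[float] = None) -> Any:
    """Run coro to completion on a fresh event loop in a helper thread."""
    outcome: Dict[str, Any] = {}

    def runner() -> None:
        loop = asyncio.new_event_loop()  # dedicated loop for this thread
        try:
            # wait_for enforces the timeout inside the loop (None = no limit).
            outcome["result"] = loop.run_until_complete(asyncio.wait_for(coro, timeout))
        except Exception as exc:
            outcome["error"] = exc
        finally:
            loop.close()

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    thread.join()  # block the synchronous caller until the coroutine finishes
    if "error" in outcome:
        raise outcome["error"]
    return outcome["result"]


async def double(value: int) -> int:
    await asyncio.sleep(0)
    return value * 2


async def slow() -> None:
    await asyncio.sleep(1)
```

A call such as `run_async_in_thread(double(21))` returns the coroutine's result to the synchronous caller, while `run_async_in_thread(slow(), timeout=0.01)` raises `asyncio.TimeoutError`.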
- class vyra_base.com.ZenohProvider(module_name, module_id, protocol=ProtocolType.ZENOH)[source]¶
Bases: AbstractProtocolProvider
Protocol provider for Zenoh transport.
Features:
- Efficient Pub/Sub with zero-copy capabilities
- Query/Reply for request-response patterns
- Router-based scalability
- Built-in discovery and fault tolerance
- Multi-protocol support (TCP, UDP, shared memory)
Requirements:
- eclipse-zenoh Python package
- Zenoh router (typically Docker service)
Example
>>> # Initialize provider (module_name and module_id are placeholder values)
>>> provider = ZenohProvider(module_name="my_module", module_id="my_module_id")
>>>
>>> if await provider.check_availability():
...     await provider.initialize(config={
...         "mode": "client",
...         "connect": ["tcp/zenoh-router:7447"]
...     })
...
...     # Create server (query/reply)
...     async def handle_request(req):
...         return {"result": req["value"] * 2}
...
...     server = await provider.create_server(
...         "/calculate",
...         response_callback=handle_request
...     )
...
...     # Create publisher (pub/sub)
...     publisher = await provider.create_publisher("/sensor_data")
- Parameters:
module_name (str)
module_id (str)
protocol (ProtocolType)
- __init__(module_name, module_id, protocol=ProtocolType.ZENOH)[source]¶
Initialize Zenoh provider.
- Parameters:
protocol (ProtocolType) – Protocol type (must be ZENOH)
module_name (str)
module_id (str)
- async check_availability()[source]¶
Check if Zenoh is available.
- Returns:
True if zenoh-python is installed
- Return type:
- async initialize(config=None)[source]¶
Initialize Zenoh provider and open session.
- Parameters:
config (Optional[Dict[str, Any]]) – Configuration dictionary:
- mode: "peer", "client", or "router" (default: "client")
- connect: List of endpoints (default: ["tcp/zenoh-router:7447"])
- listen: List of listen endpoints
- format: Serialization format (default: "json")
- Returns:
True if initialization successful
- Return type:
- async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]¶
Create Zenoh Publisher.
- Parameters:
name (str) – Publisher name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance
**kwargs – Additional publisher options
- Return type:
- Returns:
ZenohPublisherImpl instance
- async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]¶
Create Zenoh Subscriber.
- Parameters:
- Return type:
- Returns:
ZenohSubscriberImpl instance
- async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]¶
Create Zenoh Server (Queryable).
- Parameters:
- Return type:
- Returns:
ZenohServerImpl instance
- async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]¶
Create Zenoh Client (Query sender).
- Parameters:
name (str) – Client name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)
service_type (Optional[type]) – Optional service type class (ignored in Zenoh, which is schema-less)
request_callback (Optional[Callable]) – Optional async callback for responses
**kwargs – Additional client options
- Return type:
- Returns:
ZenohClientImpl instance
- async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]¶
Create Zenoh Action Server.
- Parameters:
name (str) – Action server name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance
handle_goal_request (Optional[Callable]) – Async callback for goal requests
handle_cancel_request (Optional[Callable]) – Async callback for cancel requests
execution_callback (Optional[Callable]) – Async callback for goal execution
**kwargs – Additional action server options
- Return type:
- Returns:
ZenohActionServerImpl instance
- async create_action_client(name, topic_builder=None, action_type=None, direct_response=None, feedback_callback=None, goal_response_callback=None, direct_response_callback=None, goal_callback=None, **kwargs)[source]¶
Create Zenoh Action Client.
- Parameters:
name (str) – Action client name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance
direct_response (Optional[Callable]) – Optional async callback for results
feedback_callback (Optional[Callable]) – Optional async callback for feedback
goal_response_callback (Optional[Callable]) – Optional async callback for goal responses
**kwargs – Additional action client options
direct_response_callback (Callable | None)
goal_callback (Callable | None)
- Return type:
- Returns:
ZenohActionClientImpl instance
- require_initialization()[source]¶
Ensure provider is initialized.
- Raises:
ProviderError – If not initialized
- Return type:
- class vyra_base.com.ZenohSession(config=None)[source]¶
Bases: object
Manages Zenoh session for VYRA modules.
Provides:
- Session lifecycle management
- Connection to Zenoh router
- Query/Reply, Pub/Sub, and Task primitives
- Automatic reconnection
Example
>>> config = SessionConfig(
...     mode=SessionMode.CLIENT,
...     connect=["tcp/zenoh-router:7447"]
... )
>>> session = ZenohSession(config)
>>> await session.open()
>>>
>>> # Use session for communication
>>> pub = session.declare_publisher("/topic")
>>> pub.put("Hello Zenoh")
>>>
>>> await session.close()
- Parameters:
config (Optional[SessionConfig])
- __init__(config=None)[source]¶
Initialize Zenoh session manager.
- Parameters:
config (Optional[SessionConfig]) – Session configuration
- async open()[source]¶
Open Zenoh session and connect to router.
- Returns:
True if session opened successfully
- Return type:
- class vyra_base.com.SessionConfig(mode=SessionMode.CLIENT, connect=<factory>, listen=<factory>, id=None, scouting_multicast=True, timeout_ms=5000)[source]¶
Bases: object
Configuration for Zenoh session.
- Parameters:
- mode¶
Session mode (peer/client/router)
- connect¶
List of endpoints to connect to (e.g., [“tcp/zenoh-router:7447”])
- listen¶
List of endpoints to listen on
- id¶
Optional session ID
- scouting_multicast¶
Enable multicast scouting for peer discovery
- timeout_ms¶
Timeout for operations in milliseconds
- mode: SessionMode = 'client'¶
- class vyra_base.com.SessionMode(*values)[source]¶
-
Zenoh session modes.
- PEER = 'peer'¶
- CLIENT = 'client'¶
- ROUTER = 'router'¶
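The `<factory>` defaults in the SessionConfig signature suggest a dataclass with per-instance list defaults. The sketch below mirrors the documented fields under assumptions: the classes are local stand-ins, the string-valued enum base is inferred from the `'client'` default shown above, and empty lists are assumed as the factory results.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional


class SessionMode(str, Enum):
    """Stand-in for the documented session modes."""
    PEER = "peer"
    CLIENT = "client"
    ROUTER = "router"


@dataclass
class SessionConfig:
    """Stand-in mirroring the documented fields and scalar defaults."""
    mode: SessionMode = SessionMode.CLIENT
    # default_factory gives every instance its own list (assumed empty here).
    connect: List[str] = field(default_factory=list)
    listen: List[str] = field(default_factory=list)
    id: Optional[str] = None
    scouting_multicast: bool = True
    timeout_ms: int = 5000


first = SessionConfig(connect=["tcp/zenoh-router:7447"])
second = SessionConfig()
second.listen.append("tcp/0.0.0.0:7447")  # does not affect other instances
```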
- class vyra_base.com.RedisProvider(module_name, module_id, protocol=ProtocolType.REDIS, **redis_kwargs)[source]¶
Bases: AbstractProtocolProvider
Protocol provider for Redis communication.
Features:
- Pub/Sub messaging via Publisher interface
- Key-Value storage via Callable interface (get/set)
- TLS support with ACL authentication
- Streaming support (Redis Streams)
Wraps the existing vyra_base.com.transport.redis.communication.RedisClient so that it integrates with the established infrastructure.
Example
>>> # Initialize provider (module_id is a placeholder value)
>>> provider = RedisProvider(
...     protocol=ProtocolType.REDIS,
...     module_name="my_module",
...     module_id="my_module_id"
... )
>>>
>>> if await provider.check_availability():
...     await provider.initialize()
...
...     # Create publisher (Pub/Sub)
...     publisher = await provider.create_publisher(
...         "sensor_updates",
...         module_name="robot"
...     )
...     await publisher.publish({"temperature": 23.5})
...
...     # Listen for messages
...     async def on_message(data):
...         print(f"Received: {data}")
...     await publisher.listen(on_message)
- Parameters:
module_name (str)
module_id (str)
protocol (ProtocolType)
- __init__(module_name, module_id, protocol=ProtocolType.REDIS, **redis_kwargs)[source]¶
Initialize Redis provider.
- Parameters:
protocol (ProtocolType) – Protocol type (must be REDIS)
module_name (str) – Module name for Redis namespace
module_id (str) – Module ID for Redis namespace
**redis_kwargs – Additional Redis connection options:
- host: Redis host (default: from env REDIS_HOST)
- port: Redis port (default: from env REDIS_PORT)
- username: Redis ACL username
- password: Redis ACL password
- use_tls: Enable TLS (default: from env)
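The precedence between explicit keyword arguments and the REDIS_HOST / REDIS_PORT environment variables can be sketched as below. `resolve_redis_settings` is a hypothetical helper, and the final fallbacks (`localhost`, `6379`) are assumptions; the source names only the two environment variables. `use_tls` is left out because the source does not name its environment variable.

```python
import os
from typing import Any, Dict, Mapping, Optional


def resolve_redis_settings(
    overrides: Optional[Mapping[str, Any]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Any]:
    """Explicit overrides win, then environment variables, then fallbacks."""
    env = os.environ if env is None else env
    overrides = overrides or {}
    return {
        # "localhost" and "6379" are assumed fallbacks, not documented defaults.
        "host": overrides.get("host", env.get("REDIS_HOST", "localhost")),
        "port": int(overrides.get("port", env.get("REDIS_PORT", "6379"))),
        "username": overrides.get("username"),
        "password": overrides.get("password"),
    }
```

Passing `env` explicitly keeps the helper easy to test without modifying the process environment.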
- async check_availability()[source]¶
Check if Redis is available.
- Returns:
True if Redis client can be imported
- Return type:
- async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]¶
Create Redis Publisher (Pub/Sub).
- Parameters:
name (str) – Publisher name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)
message_type (type) – Message type class
**kwargs – Additional publisher options
- Return type:
- Returns:
RedisPublisherImpl instance
- async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]¶
Create Redis Subscriber (Pub/Sub).
- Parameters:
name (str) – Subscriber name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)
subscriber_callback (Callable) – Async callback for received messages
message_type (type) – Message type class
**kwargs – Additional subscriber options
- Return type:
- Returns:
RedisSubscriberImpl instance
- async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]¶
Create Redis Server (request/response pattern).
- Parameters:
- Return type:
- Returns:
RedisServerImpl instance
- async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]¶
Create Redis Client (request/response pattern).
- Parameters:
- Return type:
- Returns:
RedisClientImpl instance
- async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]¶
Create Redis Action Server (state tracking + pub/sub).
- Parameters:
name (str) – Action server name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)
handle_goal_request (Callable) – Async callback for goal requests
handle_cancel_request (Callable) – Async callback for cancel requests
execution_callback (Callable) – Async callback for goal execution
action_type (type) – Action type class
**kwargs – Additional action server options
- Return type:
- Returns:
RedisActionServerImpl instance
- async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[source]¶
Create Redis Action Client.
- Parameters:
name (str) – Action client name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)
action_type (type) – Action type class
direct_response_callback (Optional[Callable]) – Optional async callback for results
feedback_callback (Optional[Callable]) – Optional async callback for feedback
goal_callback (Optional[Callable]) – Optional async callback for goal responses
**kwargs – Additional action client options
- Return type:
- Returns:
RedisActionClientImpl instance
- get_client()[source]¶
Get underlying Redis client for advanced operations.
- Returns:
Underlying vyra_base Redis client
- Return type:
- Raises:
ProviderError – If provider not initialized
- class vyra_base.com.RedisClient(**kwargs)[source]¶
Bases: object
Unified Redis client for vyra_base. Combines RedisAccess and RedisManipulator functionality with TLS support and streaming capabilities.
- __init__(**kwargs)¶
Wrapper for synchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async connect(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async configure_base_settings(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async close(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async set(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async delete(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async exists(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async clear(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get_all_keys(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get_keys_by_pattern(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get_type(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get_length(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async publish_message(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async subscribe_channel(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async subscribe_to_key(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async unsubscribe_from_key(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async health_check(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async subscribe_pattern(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async publish_with_metadata(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async create_pubsub_listener(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async parse_message(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get_active_listeners(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async remove_listener_channels(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async stop_pubsub_listener(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async publish_event(channel, data, add_metadata=True)[source]¶
Alias for publish_with_metadata (backward compatibility)
- async create_stream_listener(channels, callback_handler, callback_context=None)[source]¶
Alias for create_pubsub_listener (backward compatibility)
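A backward-compatibility alias of this kind is usually a thin method that forwards its arguments unchanged. The following is a minimal sketch of that pattern only. `PubSubSketch` is a hypothetical stand-in, and the parameters of `publish_with_metadata` are assumed to mirror those documented for `publish_event`.

```python
import asyncio
from typing import Any


class PubSubSketch:
    """Hypothetical stand-in used only to illustrate the alias pattern."""

    def __init__(self) -> None:
        # Records every publish call so the forwarding can be observed.
        self.sent = []

    async def publish_with_metadata(self, channel: str, data: Any,
                                    add_metadata: bool = True) -> int:
        """Current name (parameters assumed); returns the number of recorded calls."""
        self.sent.append((channel, data, add_metadata))
        return len(self.sent)

    async def publish_event(self, channel: str, data: Any,
                            add_metadata: bool = True) -> int:
        """Old name kept for existing callers; forwards the arguments unchanged."""
        return await self.publish_with_metadata(channel, data, add_metadata)


async def demo() -> list:
    client = PubSubSketch()
    await client.publish_event("news", {"msg": "hello"})
    return client.sent
```

Callers that still use the old name reach the same code path as callers of the new name, so both can coexist during a migration.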
- async xadd(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async xread(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async xreadgroup(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async xack(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async xgroup_create(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async xgroup_destroy(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async xlen(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async xtrim(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async xpending(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- class vyra_base.com.UDSProvider(module_name, module_id, protocol=ProtocolType.UDS)[source]¶
Bases: AbstractProtocolProvider
Protocol provider for Unix Domain Socket transport.
Features:
- Low-latency local IPC
- Stream-based communication
- Automatic connection management
- No serialization overhead (JSON)
Requirements:
- Unix-like OS (Linux, macOS)
- File system access to /tmp/vyra_sockets
Limitations:
- Local machine only
- No pub/sub pattern (use Callable for request-response)
Example
>>> # Initialize provider
>>> provider = UDSProvider(ProtocolType.UDS)
>>>
>>> if await provider.check_availability():
...     await provider.initialize()
...
...     # Create callable (server)
...     async def handle_request(req):
...         return {"result": req["value"] * 2}
...
...     callable = await provider.create_callable(
...         "calculate",
...         handle_request,
...         module_name="math_service"
...     )
...
...     # Create client
...     client = await provider.create_callable(
...         "calculate",
...         None,  # No callback for client
...         module_name="math_service"
...     )
...     result = await client.call({"value": 21})
- Parameters:
module_name (str)
module_id (str)
protocol (ProtocolType)
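To make the transport concrete, here is a minimal sketch of request-response over a Unix domain stream socket using only the standard library `asyncio` API. It does not use `UDSProvider` and is not its implementation. The newline-delimited JSON framing, the socket file name, and the helper names `handle_request`, `call`, and `demo` are assumptions made for illustration.

```python
import asyncio
import json
import os
import tempfile


async def handle_request(reader: asyncio.StreamReader,
                         writer: asyncio.StreamWriter) -> None:
    """Read one newline-delimited JSON request and reply with double the value."""
    request = json.loads(await reader.readline())
    reply = {"result": request["value"] * 2}
    writer.write(json.dumps(reply).encode() + b"\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def call(socket_path: str, payload: dict) -> dict:
    """Open a stream connection, send one request, and wait for the reply."""
    reader, writer = await asyncio.open_unix_connection(socket_path)
    writer.write(json.dumps(payload).encode() + b"\n")
    await writer.drain()
    reply = json.loads(await reader.readline())
    writer.close()
    await writer.wait_closed()
    return reply


async def demo() -> dict:
    # Hypothetical socket location inside a temporary directory.
    with tempfile.TemporaryDirectory() as tmp:
        socket_path = os.path.join(tmp, "calculate.sock")
        server = await asyncio.start_unix_server(handle_request, path=socket_path)
        async with server:
            return await call(socket_path, {"value": 21})


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the socket is a file system path, both ends must run on the same machine, which matches the "local machine only" limitation listed above.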
- __init__(module_name, module_id, protocol=ProtocolType.UDS)[source]¶
Initialize UDS provider.
- Parameters:
protocol (ProtocolType) – Protocol type (must be UDS)
module_name (str) – Default module name for interfaces
module_id (str)
- async check_availability()[source]¶
Check if UDS transport is available.
- Returns:
Always True on Unix-like systems
- Return type:
bool
- async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]¶
Create UDS Publisher (datagram sockets).
- Parameters:
name (str) – Publisher name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance
**kwargs – Additional publisher options (module_name override)
- Return type:
- Returns:
UdsPublisherImpl instance
- async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]¶
Create UDS Subscriber (datagram sockets).
- Parameters:
- Return type:
- Returns:
UdsSubscriberImpl instance
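The publisher and subscriber above are described as using datagram sockets. As a rough illustration of that socket type only, the sketch below sends one JSON datagram over `AF_UNIX`/`SOCK_DGRAM` with the standard library. The socket file name, the JSON encoding, and the helpers `publish` and `demo` are assumptions and do not reflect how `UdsPublisherImpl` or `UdsSubscriberImpl` are actually written.

```python
import json
import os
import socket
import tempfile


def publish(socket_path: str, message: dict) -> None:
    """Send one JSON-encoded datagram to the subscriber's socket path."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        sock.sendto(json.dumps(message).encode(), socket_path)


def demo() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical per-topic socket path, chosen for this example.
        socket_path = os.path.join(tmp, "sensor_temp.sock")
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as subscriber:
            subscriber.bind(socket_path)   # the receiving side owns the socket file
            subscriber.settimeout(1.0)     # avoid blocking forever if nothing arrives
            publish(socket_path, {"value": 23.5})
            data, _ = subscriber.recvfrom(65536)
            return json.loads(data)


if __name__ == "__main__":
    print(demo())
```

Unlike the stream case, no connection is established: each `sendto` call delivers one self-contained message, which is why datagram sockets suit fire-and-forget publishing.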
- async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]¶
Create UDS Server (stream sockets).
- Parameters:
- Return type:
- Returns:
UdsServerImpl instance
- async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]¶
Create UDS Client (stream sockets).
- Parameters:
- Return type:
- Returns:
UdsClientImpl instance
- async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]¶
Create UDS Action Server (stream sockets + state messages).
- Parameters:
name (str) – Action server name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance
handle_goal_request (Optional[Callable[[Any], Awaitable[bool]]]) – Async callback for goal requests
handle_cancel_request (Optional[Callable[[Any], Awaitable[bool]]]) – Async callback for cancel requests
execution_callback (Optional[Callable[[Any], Awaitable[bool]]]) – Async callback for goal execution
**kwargs – Additional action server options (module_name override)
- Return type:
- Returns:
UdsActionServerImpl instance
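All three callbacks share the shape `Callable[[Any], Awaitable[bool]]`. The sketch below shows what such callbacks might look like. The goal format (a dict with a `steps` key), the acceptance rule, and the `run_goal` driver are hypothetical, and the order in which the real server invokes the callbacks is an assumption.

```python
import asyncio
from typing import Any


async def handle_goal_request(goal: Any) -> bool:
    """Accept only goals that carry a positive 'steps' count (illustrative rule)."""
    return isinstance(goal, dict) and goal.get("steps", 0) > 0


async def handle_cancel_request(goal: Any) -> bool:
    """Always allow cancellation in this sketch."""
    return True


async def execution_callback(goal: Any) -> bool:
    """Pretend to work through the goal; return True on success."""
    for _ in range(goal["steps"]):
        await asyncio.sleep(0)  # yield to the event loop between steps
    return True


async def run_goal(goal: Any) -> str:
    """Hypothetical driver: check the goal first, then execute it."""
    if not await handle_goal_request(goal):
        return "rejected"
    succeeded = await execution_callback(goal)
    return "succeeded" if succeeded else "aborted"
```

Functions with these signatures could then be passed as `handle_goal_request`, `handle_cancel_request`, and `execution_callback` to `create_action_server`.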
- async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[source]¶
Create UDS Action Client.
- Parameters:
name (str) – Action client name
topic_builder (Optional[TopicBuilder]) – TopicBuilder instance
direct_response_callback (Optional[Callable[[Any], Awaitable[None]]]) – Optional async callback for results
feedback_callback (Optional[Callable[[Any], Awaitable[None]]]) – Optional async callback for feedback
goal_callback (Optional[Callable[[Any], Awaitable[None]]]) – Optional async callback for goal responses
**kwargs – Additional action client options (module_name override)
- Return type:
- Returns:
UdsActionClientImpl instance
- class vyra_base.com.GrpcServer(socket_path, max_workers=10, options=None)[source]¶
Bases: object
gRPC Server over Unix Domain Socket.
Supports registering servicers for handling different RPC patterns.
Example:
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    async def UnaryMethod(self, request, context):
        return MyResponse(result="success")

    async def ServerStreamingMethod(self, request, context):
        for i in range(10):
            yield MyResponse(result=f"item_{i}")

server = GrpcServer("/tmp/my_service.sock")
server.add_service(
    my_service_pb2_grpc.add_MyServiceServicer_to_server,
    MyServiceServicer()
)
await server.start()
- async start()[source]¶
Start the gRPC server on Unix Domain Socket.
- Raises:
RuntimeError – If no servicers are registered
- class vyra_base.com.GrpcClient(**kwargs)[source]¶
Bases: object
High-level gRPC Client wrapper.
Provides a simple interface for gRPC communication without requiring proto files or code generation.
Features:
- Unary RPC (request-response)
- Automatic connection management
- SSL/TLS support
- Parameters:
target – gRPC server target (e.g., “localhost:50051”)
credentials – Optional SSL credentials
options – Optional channel options
timeout – Default timeout for operations
Example
>>> client = GrpcClient(target="192.168.1.10:50051")
>>> await client.connect()
>>>
>>> # Unary call
>>> response = await client.call_method("/service/Method", request_bytes)
>>>
>>> await client.close()
- __init__(**kwargs)¶
Wrapper for synchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async connect(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async close(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async call_method(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- class vyra_base.com.MqttClient(**kwargs)[source]¶
Bases: object
High-level MQTT Client wrapper.
Features:
- Publish/Subscribe messaging
- QoS support (0, 1, 2)
- TLS support
- Automatic reconnection
- JSON serialization
- Parameters:
broker – MQTT broker hostname
port – MQTT broker port (default: 1883, TLS: 8883)
client_id – Optional client ID
username – Optional username for authentication
password – Optional password for authentication
use_tls – Enable TLS/SSL
ca_cert – Path to CA certificate for TLS
keepalive – Keepalive interval in seconds
Example
>>> client = MqttClient(
...     broker="mqtt.example.com",
...     port=8883,
...     use_tls=True,
...     username="user",
...     password="pass"
... )
>>> await client.connect()
>>> await client.publish("sensors/temp", {"value": 23.5}, qos=1)
- __init__(**kwargs)¶
Wrapper for synchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async connect(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async close(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async publish(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async subscribe(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async unsubscribe(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async health_check(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- class vyra_base.com.RestClient(**kwargs)[source]¶
Bases: object
High-level REST/HTTP Client wrapper.
Features:
- GET, POST, PUT, DELETE, PATCH methods
- JSON serialization
- Headers management
- Timeout support
- TLS/SSL support
- Parameters:
base_url – Base URL for API
timeout – Default timeout in seconds
headers – Default headers
verify_ssl – Verify SSL certificates
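The `base_url` and `headers` parameters suggest that each request combines a relative path with the base URL and layers per-request headers over the defaults. The sketch below illustrates one common way to do that with the standard library, without sending anything over the network. It is not `RestClient`'s implementation. The helper `build_request` and the example URL are hypothetical.

```python
from typing import Optional
from urllib.parse import urljoin
from urllib.request import Request


def build_request(base_url: str, path: str, default_headers: dict,
                  headers: Optional[dict] = None, method: str = "GET") -> Request:
    """Join the path onto the base URL and merge headers (per-request values win)."""
    # Normalize slashes so "v1" + "/items" and "v1/" + "items" give the same URL.
    url = urljoin(base_url.rstrip("/") + "/", path.lstrip("/"))
    merged = {**default_headers, **(headers or {})}
    return Request(url, headers=merged, method=method)


request = build_request(
    "https://api.example.com/v1",
    "/items",
    default_headers={"Accept": "application/json"},
    headers={"Accept": "text/plain"},
    method="POST",
)
```

Here the per-request `Accept` header overrides the default, and the resulting object only describes the request; nothing is sent until it is passed to an opener.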
- __init__(**kwargs)¶
Wrapper for synchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async connect(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async close(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async post(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async put(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async delete(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async health_check(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- class vyra_base.com.WebSocketClient(**kwargs)[source]¶
Bases: object
High-level WebSocket Client wrapper.
Features:
- Send/Receive messages
- JSON serialization
- Automatic reconnection
- Callback-based message handling
- Parameters:
url – WebSocket URL (ws:// or wss://)
timeout – Connection timeout in seconds
headers – Optional headers for connection
- __init__(**kwargs)¶
Wrapper for synchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async connect(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async close(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async send(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async receive(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async listen(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async health_check(**kwargs)¶
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.