vyra_base.com package

Subpackages

Submodules

vyra_base.com.handler module

Communication handlers for V.Y.R.A. modules.

Handler hierarchy

IFeederHandler  (interfaces.py)  — ABC: dispatch() + get_protocol() + logging.Handler
    └── CommunicationHandler  (communication.py)  — concrete base
            ├── ROS2Handler    (ros2.py)      — ROS2 CAL publisher (via InterfaceFactory)
            ├── ZenohHandler   (zenoh.py)     — Zenoh CAL publisher (via InterfaceFactory)
            ├── RedisHandler   (redis.py)     — Redis CAL publisher (via InterfaceFactory)
            ├── UDSHandler     (uds.py)       — UDS CAL publisher (via InterfaceFactory)
            └── DBCommunicationHandler (database.py) — async DB persistence

Use HandlerFactory to create handlers without importing protocol-specific modules directly.

class vyra_base.com.handler.IFeederHandler(*args, **kwargs)[source]

Bases: Handler, ABC

Abstract base class for all VYRA feeder handlers.

Every concrete handler must implement dispatch() (async direct transport) and get_protocol(). The emit() method bridges the Python logging pipeline to dispatch() so that handlers can be attached directly to a Python logging.Logger.

Subclasses must not change the emit() → dispatch() delegation unless there is a specific reason (e.g. DBCommunicationHandler logs the formatted string rather than the raw message object).

Variables:
  • __handlerName__ – Human-readable handler identifier. Set in each subclass.

  • __doc__ – One-line description of what this handler transports.

  • activated (bool) – Whether this handler is active. Inactive handlers are skipped during dispatch without raising errors.

Parameters:
  • args (Any)

  • kwargs (Any)

__init__(*args, **kwargs)[source]

Initialise the handler and set activated to True.

Return type:

None

activate()[source]

Enable this handler so it participates in message dispatch.

Return type:

None

deactivate()[source]

Disable this handler so it is silently skipped during dispatch.

Return type:

None

abstractmethod async dispatch(message)[source]

Transport message over the backing protocol.

This is the primary entry-point for the feeder. The feeder calls dispatch directly (bypassing the logging pipeline) when it already has a fully-formed domain object (e.g. StateEntry).

Parameters:

message (Any) – Domain object or raw value to transport.

Raises:

HandlerDispatchError – If the transport operation fails unrecoverably.

Return type:

None

abstractmethod get_protocol()[source]

Return the ProtocolType value this handler uses (e.g. "ros2", "zenoh", …).

Using str as return type keeps the module importable without the full vyra_base.com.core.types dependency tree.

Return type:

str

get_handler_name()[source]

Return the handler’s human-readable name.

Defaults to __handlerName__.

Return type:

str

is_available()[source]

Check whether the backing transport is currently reachable.

The default implementation always returns True. Override in transport handlers to probe the actual protocol connection.

Return type:

bool

emit(record)[source]

Bridge Python logging to dispatch().

Called by the Python logging framework. Schedules dispatch() via asyncio.get_event_loop() so that the synchronous logging call does not block the event loop.

If activated is False the record is silently discarded.

Subclasses that need a different bridging strategy (e.g. formatting the record as a plain string for a database) should override this method directly.

Parameters:

record (LogRecord) – Python LogRecord to emit.

Return type:

None
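
As a rough illustration of the bridge described above, the sketch below attaches a handler with an async dispatch() to a standard logging.Logger. SketchFeederHandler and CollectingHandler are hypothetical stand-ins, not the vyra_base classes. The sketch schedules dispatch() with asyncio.get_running_loop() and create_task() rather than asyncio.get_event_loop(), and it assumes records are dropped when no loop is running; the real implementation may differ on both points.

```python
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Any


class SketchFeederHandler(logging.Handler, ABC):
    """Hypothetical stand-in for IFeederHandler (not the vyra_base class)."""

    def __init__(self) -> None:
        super().__init__()
        self.activated = True
        self._tasks: set = set()  # keep references so pending tasks are not collected

    @abstractmethod
    async def dispatch(self, message: Any) -> None:
        """Transport one message over the backing protocol."""

    def emit(self, record: logging.LogRecord) -> None:
        if not self.activated:
            return  # inactive handlers discard the record silently
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return  # assumption: without a running loop the record is dropped
        # Forward the raw message object, not the formatted string.
        task = loop.create_task(self.dispatch(record.msg))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)


class CollectingHandler(SketchFeederHandler):
    """Toy handler that stores dispatched messages in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.received: list = []

    async def dispatch(self, message: Any) -> None:
        self.received.append(message)


async def demo() -> list:
    handler = CollectingHandler()
    logger = logging.getLogger("vyra.sketch.emit")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)

    logger.info({"state": "READY"})    # emit() schedules dispatch()
    await asyncio.sleep(0)             # let the scheduled task run

    handler.activated = False
    logger.info({"state": "IGNORED"})  # discarded because the handler is inactive
    await asyncio.sleep(0)

    logger.removeHandler(handler)
    return handler.received
```

Calling asyncio.run(demo()) returns only the message logged while the handler was active.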

class vyra_base.com.handler.CommunicationHandler(initiator='', publisher=None, type=None)[source]

Bases: IFeederHandler

Abstract base class for all communication handlers.

Parameters:
FILL_AT_LOADING: bool = True
__init__(initiator='', publisher=None, type=None)[source]

Initialise the CommunicationHandler.

Parameters:
  • initiator (str) – Optional initiator string (used in log messages).

  • publisher (VyraPublisher) – Optional pre-created publisher object.

  • type (type) – Optional message-type information.

async dispatch(message)[source]

Transport message over the backing protocol.

Subclasses must override this method.

Parameters:

message (Any) – The message to transport.

Raises:

NotImplementedError – Always — subclasses must implement.

Return type:

None

get_protocol()[source]

Return the protocol identifier string.

Subclasses must override this method.

Raises:

NotImplementedError – Always — subclasses must implement.

Return type:

str

class vyra_base.com.handler.ROS2Handler(initiator, publisher, type)[source]

Bases: CommunicationHandler

ROS2 communication handler

Parameters:
__init__(initiator, publisher, type)[source]

Initialize the ROS2Handler.

Parameters:
  • initiator (str) – The initiator of the handler.

  • publisher (VyraPublisher) – The publisher instance to use.

  • type (Any) – The ROS2 message type.

async emit(record)[source]

Emit a log record by publishing it as a ROS2 message.

Converts the Python log record to the ROS2 message format and publishes it to the configured topic, mapping fields from the log record onto the ROS2 message type.

Parameters:

record (LogRecord) – Python log record to publish.

class vyra_base.com.handler.ZenohHandler(initiator, publisher, type)[source]

Bases: CommunicationHandler

Zenoh transport handler

Parameters:
__init__(initiator, publisher, type)[source]

Initialise the CommunicationHandler.

Parameters:
  • initiator (str) – Optional initiator string (used in log messages).

  • publisher (VyraPublisher) – Optional pre-created publisher object.

  • type (Any) – Optional message-type information.

get_protocol()[source]

Return "zenoh".

Return type:

str

is_available()[source]

Return True if the publisher is set and connected.

Return type:

bool

async dispatch(message)[source]

Publish message via the Zenoh CAL publisher.

Parameters:

message (Any) – Domain object to publish. Must be compatible with the message type configured for the topic.

Return type:

None

async emit(record)[source]

Publish the raw record.msg object (not the formatted string).

Zenoh topics carry typed domain objects, not plain log strings.

Parameters:

record (LogRecord) – Python log record whose msg is a domain object.

Return type:

None
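
The transport handlers above all follow the same shape: hold a publisher, report availability, and forward domain objects in dispatch(). A minimal sketch of that shape follows. FakePublisher and SketchTransportHandler are hypothetical names; only the async publish(message) call mirrors the documented VyraPublisher interface, and the choice to skip silently when the handler is inactive or has no publisher is an assumption.

```python
import asyncio
from typing import Any, Optional


class FakePublisher:
    """Hypothetical stand-in for a VyraPublisher; only publish() is modelled."""

    def __init__(self) -> None:
        self.sent: list = []

    async def publish(self, message: Any) -> bool:
        self.sent.append(message)
        return True


class SketchTransportHandler:
    """Illustrative handler that forwards domain objects to a publisher."""

    def __init__(self, initiator: str, publisher: Optional[FakePublisher]) -> None:
        self.initiator = initiator
        self.publisher = publisher
        self.activated = True

    def get_protocol(self) -> str:
        return "zenoh"

    def is_available(self) -> bool:
        # Assumption: "available" only means a publisher object is present.
        return self.publisher is not None

    async def dispatch(self, message: Any) -> None:
        if not self.activated or not self.is_available():
            return  # assumption: skip silently instead of raising
        await self.publisher.publish(message)
```

For example, asyncio.run(handler.dispatch({"state": "RUNNING"})) appends the dict to the fake publisher's sent list.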

class vyra_base.com.handler.RedisHandler(initiator, publisher, type)[source]

Bases: CommunicationHandler

Redis pub/sub transport handler

Parameters:
__init__(initiator, publisher, type)[source]

Initialise the CommunicationHandler.

Parameters:
  • initiator (str) – Optional initiator string (used in log messages).

  • publisher (VyraPublisher) – Optional pre-created publisher object.

  • type (Any) – Optional message-type information.

get_protocol()[source]

Return "redis".

Return type:

str

is_available()[source]

Return True if the publisher is set.

Return type:

bool

async dispatch(message)[source]

Publish message via the Redis CAL publisher.

Parameters:

message (Any) – Domain object to publish.

Return type:

None

async emit(record)[source]

Publish the raw record.msg domain object.

Parameters:

record (LogRecord) – Python log record whose msg is a domain object.

Return type:

None

class vyra_base.com.handler.UDSHandler(initiator, publisher, type)[source]

Bases: CommunicationHandler

Unix Domain Socket transport handler

Parameters:
__init__(initiator, publisher, type)[source]

Initialise the CommunicationHandler.

Parameters:
  • initiator (str) – Optional initiator string (used in log messages).

  • publisher (VyraPublisher) – Optional pre-created publisher object.

  • type (Any) – Optional message-type information.

get_protocol()[source]

Return "uds".

Return type:

str

is_available()[source]

Return True if the publisher is set.

Return type:

bool

async dispatch(message)[source]

Publish message via the UDS CAL publisher.

Parameters:

message (Any) – Domain object to publish.

Return type:

None

async emit(record)[source]

Publish the raw record.msg domain object.

Parameters:

record (LogRecord) – Python log record whose msg is a domain object.

Return type:

None

class vyra_base.com.handler.DBCommunicationHandler(database, source='DBCommunicationHandler')[source]

Bases: CommunicationHandler

Database persistence handler

Parameters:
  • database (Any)

  • source (str)

__init__(database, source='DBCommunicationHandler')[source]

Initialise the CommunicationHandler.

Parameters:
  • initiator (str) – Optional initiator string (used in log messages).

  • publisher (VyraPublisher, optional) – Optional pre-created publisher object.

  • type (type, optional) – Optional message-type information.

  • database (Any)

  • source (str)

get_protocol()[source]

Return "database".

Return type:

str

is_available()[source]

Return True if a database writer is configured.

Return type:

bool

async dispatch(message)[source]

Persist message to the database with a structured schema.

Parameters:

message (Any) – Domain object or any value to persist. Converted to string for the "message" field. If the object has a __dict__ attribute the full dict is stored in "metadata".

Return type:

None
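
The mapping described for message can be pictured with the short sketch below. build_db_record and StateEntrySketch are hypothetical, and the "source" key is an assumption; only the "message" and "metadata" fields come from the description above.

```python
from dataclasses import dataclass
from typing import Any


def build_db_record(message: Any, source: str = "DBCommunicationHandler") -> dict:
    """Build a dict record: string form in "message", attributes in "metadata"."""
    record = {"source": source, "message": str(message)}
    if hasattr(message, "__dict__"):
        # Copy the attribute dict so later changes to the object do not leak in.
        record["metadata"] = dict(vars(message))
    return record


@dataclass
class StateEntrySketch:
    """Hypothetical domain object used only for this illustration."""

    previous: str
    current: str
```

A plain string produces a record without "metadata", while an object such as StateEntrySketch("idle", "running") also carries its attribute dict.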

emit(record)[source]

Persist the formatted log record string to the database.

Unlike transport handlers, the DB handler stores the formatted string so that plain Python log records are human-readable.

Parameters:

record (LogRecord) – Python log record to persist.

Return type:

None

class vyra_base.com.handler.DatabaseWriter(*args, **kwargs)[source]

Bases: Protocol

Structural protocol for database backends.

Any object that implements async write(record: dict) -> None qualifies — no inheritance required.

async write(record)[source]
Return type:

None

Parameters:

record (dict)

__init__(*args, **kwargs)
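
Because DatabaseWriter is a structural protocol, any object with a matching async write() method can be passed where a writer is expected. The sketch below shows the idea with typing.Protocol. DatabaseWriterSketch, InMemoryWriter, and persist are hypothetical names, and the use of runtime_checkable is an assumption made so the example can use isinstance().

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class DatabaseWriterSketch(Protocol):
    """Hypothetical mirror of the documented protocol shape."""

    async def write(self, record: dict) -> None: ...


class InMemoryWriter:
    """Qualifies structurally: it never inherits from the protocol."""

    def __init__(self) -> None:
        self.rows: list = []

    async def write(self, record: dict) -> None:
        self.rows.append(record)


async def persist(writer: DatabaseWriterSketch, record: dict) -> None:
    await writer.write(record)
```

Note that isinstance() against a runtime-checkable protocol only checks that the method exists, not that it is async or has the right signature.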
class vyra_base.com.handler.ErrorLogDatabaseHandler(database, model, field_definitions, *, max_rows=10000, activated=None, source='ErrorLogDatabaseHandler')[source]

Bases: CommunicationHandler

Generic database persistence handler for log records

Parameters:
  • database (Optional[DbAccess])

  • model (Type[Base])

  • field_definitions (dict[str, FieldSpec])

  • max_rows (int)

  • activated (Optional[bool])

  • source (str)

__init__(database, model, field_definitions, *, max_rows=10000, activated=None, source='ErrorLogDatabaseHandler')[source]

Initialise the CommunicationHandler.

Parameters:
  • initiator (str) – Optional initiator string (used in log messages).

  • publisher (VyraPublisher, optional) – Optional pre-created publisher object.

  • type (type, optional) – Optional message-type information.

  • database (DbAccess | None)

  • model (Type[Base])

  • field_definitions (dict[str, FieldSpec])

  • max_rows (int)

  • activated (bool | None)

  • source (str)

Return type:

None

configure(database)[source]

Wire a DbAccess and activate.

Meant to be called once by _activate_errorfeed_db_handler() after storage has been initialised.

Parameters:

database (DbAccess) – Ready DbAccess object.

Return type:

None

get_protocol()[source]

Return "database".

Return type:

str

is_available()[source]

Return True when a DbManipulator is configured.

Return type:

bool

async dispatch(message)[source]

Validate and persist message to the database.

The message is expected to be a dict (output of _prepare_entry_for_publish()) or an object with a __dict__. The method calls _build_record() to map the raw message to column values, validates them against _field_definitions, then writes with ID-rotation.

Parameters:

message (Any) – Raw message dict or domain object.

Return type:

None

classmethod default_error_log_fields()[source]

Return the standard field definitions for the error_logs table.

Returns:

Dict of {column_name: FieldSpec}.

Return type:

dict[str, FieldSpec]

class vyra_base.com.handler.FieldSpec(python_type=typing.Any, required=True, nullable=False)[source]

Bases: object

Describes a single column in the target log table.

Parameters:
  • python_type (type) – Expected Python type for the field value (used for validation). Use Any to skip type checking.

  • required (bool) – If True the field must be present in the record dict.

  • nullable (bool) – If True the field value may be None even when required.

python_type

alias of Any

required: bool = True
nullable: bool = False
__init__(python_type=typing.Any, required=True, nullable=False)
Return type:

None
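
To make the interplay of python_type, required, and nullable concrete, here is a small validation sketch. FieldSpecSketch and validate_record are hypothetical; the actual validation performed by ErrorLogDatabaseHandler is not shown in this reference and may differ, for example in how it reports errors.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FieldSpecSketch:
    """Hypothetical mirror of the documented FieldSpec attributes."""

    python_type: Any = Any
    required: bool = True
    nullable: bool = False


def validate_record(record: dict, fields: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []
    for name, spec in fields.items():
        if name not in record:
            if spec.required:
                errors.append(f"{name}: missing required field")
            continue
        value = record[name]
        if value is None:
            if not spec.nullable:
                errors.append(f"{name}: None is not allowed")
            continue
        # Any disables type checking, as described for python_type.
        if spec.python_type is not Any and not isinstance(value, spec.python_type):
            errors.append(f"{name}: expected {spec.python_type.__name__}")
    return errors
```

A field declared with required=False and nullable=True may be omitted or set to None without producing an error.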

class vyra_base.com.handler.HandlerFactory[source]

Bases: object

Factory that creates IFeederHandler instances for a given ProtocolType.

All transport handlers use create_publisher() to obtain their VyraPublisher — the factory does not touch provider internals directly.

async static create(protocol, initiator, feeder_name, message_type, *, node=None, qos_profile=None, database=None, extra_kwargs=None)[source]

Create a handler for the given protocol.

Transport handler creation flow:

HandlerFactory.create(ProtocolType.ZENOH, ...)
  └─► InterfaceFactory.create_publisher(protocols=[ZENOH], ...)
        └─► ZenohProvider.create_publisher(...)   ← t_zenoh/provider.py
              └─► ZenohPublisherImpl(...)         ← t_zenoh/vyra_models/
  └─► ZenohHandler(initiator, publisher, message_type)
Parameters:
  • protocol (Any) – Transport protocol to use. One of ProtocolType.

  • initiator (str) – Name of the feeder owning this handler (used in log messages).

  • feeder_name (str) – Topic / service name forwarded to InterfaceFactory.create_publisher.

  • message_type (Any) – Message type for the publisher (e.g. ROS2 msg class, protobuf class, or None for dict-based protocols).

  • node (Optional[Any]) – ROS2 node (required only for ProtocolType.ROS2).

  • qos_profile (Optional[Any]) – ROS2 QoS profile (optional, ROS2 only).

  • database (Optional[Any]) – Database connection/handler (required only for ProtocolType.DATABASE / DB handlers).

  • extra_kwargs (Optional[dict]) – Extra keyword arguments forwarded to InterfaceFactory.create_publisher.

Returns:

A fully initialised handler instance ready to be added to a feeder.

Return type:

IFeederHandler
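
The creation flow shown above (ask for a publisher first, then wrap it in a handler) can be mimicked with stand-ins as follows. Every name in this sketch (ProtoSketch, StubPublisher, create_publisher_stub, StubHandler, HandlerFactorySketch) is hypothetical and none of it touches vyra_base; it only illustrates the order of the two steps.

```python
import asyncio
from enum import Enum
from typing import Any


class ProtoSketch(str, Enum):
    ZENOH = "zenoh"
    REDIS = "redis"


class StubPublisher:
    def __init__(self, name: str, protocol: ProtoSketch, message_type: Any) -> None:
        self.name = name
        self.protocol = protocol
        self.message_type = message_type


async def create_publisher_stub(name: str, protocols: list, message_type: Any = None) -> StubPublisher:
    # Stands in for InterfaceFactory.create_publisher; picks the first protocol.
    return StubPublisher(name, protocols[0], message_type)


class StubHandler:
    def __init__(self, initiator: str, publisher: StubPublisher, type: Any) -> None:
        self.initiator = initiator
        self.publisher = publisher
        self.type = type

    def get_protocol(self) -> str:
        return self.publisher.protocol.value


class HandlerFactorySketch:
    @staticmethod
    async def create(protocol: ProtoSketch, initiator: str, feeder_name: str,
                     message_type: Any = None) -> StubHandler:
        # Step 1: obtain the publisher without touching provider internals.
        publisher = await create_publisher_stub(
            feeder_name, protocols=[protocol], message_type=message_type
        )
        # Step 2: wrap it in the protocol's handler.
        return StubHandler(initiator, publisher, message_type)
```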

class vyra_base.com.handler.VyraLogHandler(capacity=1000, max_message_length=10000)[source]

Bases: Handler

Lightweight in-memory ring-buffer that captures recent log records.

Each record is stored as a plain dict so it can be JSON-serialised without extra effort. The buffer holds at most capacity entries; older entries are dropped automatically (FIFO).

Usage:

handler = VyraLogHandler(capacity=1000)
handler.setLevel(logging.DEBUG)
logging.getLogger().addHandler(handler)

# Later, retrieve entries:
recent = handler.get_recent(limit=100)
Parameters:
  • capacity (int) – Maximum number of log records to keep in the ring-buffer.

  • max_message_length (int) – Maximum length of a single log message (default 10000 chars).

__init__(capacity=1000, max_message_length=10000)[source]

Initializes the instance - basically setting the formatter to None and the filter list to empty.

Parameters:
  • capacity (int)

  • max_message_length (int)

Return type:

None

emit(record)[source]

Append a formatted log record to the ring-buffer.

Return type:

None

Parameters:

record (LogRecord)

get_recent(limit=200, since_ts=None)[source]

Return up to limit most-recent log entries, ordered oldest-first.

Parameters:
  • limit (int) – Maximum number of entries to return. 0 or a negative value returns the entire buffer (subject to since_ts filter).

  • since_ts (Optional[float]) – Optional UNIX timestamp in seconds (float). When provided, only entries whose seq value (millisecond epoch) corresponds to a time >= since_ts are returned. Entries from before this point are excluded.

Returns:

List of dicts with keys level, message, logger_name, timestamp, seq.

Return type:

list
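
The ring-buffer behaviour and the get_recent() filters can be sketched with collections.deque. RingBufferHandler below is a hypothetical re-implementation, not the VyraLogHandler source; the dict keys follow the list above, while taking timestamp and seq from record.created is an assumption.

```python
import logging
from collections import deque
from typing import Optional


class RingBufferHandler(logging.Handler):
    """Hypothetical in-memory handler keeping the newest `capacity` records."""

    def __init__(self, capacity: int = 1000, max_message_length: int = 10000) -> None:
        super().__init__()
        self._buffer: deque = deque(maxlen=capacity)  # oldest entries fall out first
        self._max_len = max_message_length

    def emit(self, record: logging.LogRecord) -> None:
        self._buffer.append({
            "level": record.levelname,
            "message": record.getMessage()[: self._max_len],
            "logger_name": record.name,
            "timestamp": record.created,          # seconds since the epoch
            "seq": int(record.created * 1000),    # millisecond epoch
        })

    def get_recent(self, limit: int = 200, since_ts: Optional[float] = None) -> list:
        entries = list(self._buffer)  # already ordered oldest-first
        if since_ts is not None:
            threshold = int(since_ts * 1000)
            entries = [e for e in entries if e["seq"] >= threshold]
        # A non-positive limit returns everything that passed the filter.
        return entries[-limit:] if limit > 0 else entries
```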

Module contents

VYRA Base Communication Module (COM)

Multi-protocol communication system with automatic protocol selection and fallback.

Architecture

Transport Layer (in-process / low-latency)

ROS2 · Zenoh · Redis · UDS

External Layer (out-of-process / cross-service)

gRPC · MQTT · REST · WebSocket · Shared Memory

Industrial Layer

Modbus · OPC UA

Public API

Decorators (use on component methods):

  • @remote_service: expose a method as a callable service (request/response)

  • @remote_publisher: expose a method as a publisher (fire-and-forget)

  • @remote_subscriber: register a method as a message subscriber

  • @remote_actionServer: expose a method group as an action server (goal/feedback/result)

Factory:

InterfaceFactory — create server/client/publisher/subscriber/action objects

Example:

from vyra_base.com import remote_service, remote_publisher, InterfaceFactory, ProtocolType

class MyComponent:
    @remote_service(name="ping", protocols=[ProtocolType.ZENOH])
    async def ping(self, request, response=None):
        return {"pong": True}
exception vyra_base.com.CommunicationError(message, details=None, original_exception=None)[source]

Bases: Exception

Base exception for all communication errors.

Parameters:
message

Error description

details

Optional additional error context

original_exception

Original exception if wrapped
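
A common way to use the three constructor arguments is to wrap a low-level failure while keeping its context. The sketch below assumes the arguments are stored as attributes of the same name, which this reference does not confirm. CommunicationErrorSketch and publish_or_wrap are hypothetical.

```python
from typing import Any, Callable, Optional


class CommunicationErrorSketch(Exception):
    """Hypothetical mirror of the documented constructor signature."""

    def __init__(self, message: str, details: Optional[dict] = None,
                 original_exception: Optional[BaseException] = None) -> None:
        super().__init__(message)
        self.message = message
        self.details = details
        self.original_exception = original_exception


def publish_or_wrap(send: Callable[[Any], Any], payload: Any) -> Any:
    """Call `send` and convert OS-level failures into the sketch exception."""
    try:
        return send(payload)
    except OSError as exc:
        raise CommunicationErrorSketch(
            "publish failed",
            details={"payload_type": type(payload).__name__},
            original_exception=exc,
        ) from exc  # `from exc` also preserves the traceback chain
```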

__init__(message, details=None, original_exception=None)[source]
Parameters:
exception vyra_base.com.ProtocolUnavailableError(protocol, message=None)[source]

Bases: ProtocolError

Raised when a requested protocol is not available or not installed.

Parameters:
  • protocol (str)

  • message (str | None)

__init__(protocol, message=None)[source]
Parameters:
  • protocol (str)

  • message (str | None)

exception vyra_base.com.ProtocolNotInitializedError(protocol, component=None)[source]

Bases: ProtocolError

Raised when trying to use a protocol that hasn’t been initialized.

Parameters:
  • protocol (str)

  • component (str | None)

__init__(protocol, component=None)[source]
Parameters:
  • protocol (str)

  • component (str | None)

exception vyra_base.com.TransportError(message, details=None, original_exception=None)[source]

Bases: CommunicationError

Base exception for transport layer errors.

Parameters:
exception vyra_base.com.ProviderError(message, details=None, original_exception=None)[source]

Bases: CommunicationError

Base exception for provider-related errors.

Parameters:
exception vyra_base.com.ProviderNotFoundError(provider_name, protocol=None)[source]

Bases: ProviderError

Raised when requested provider is not registered.

Parameters:
  • provider_name (str)

  • protocol (str | None)

__init__(provider_name, protocol=None)[source]
Parameters:
  • provider_name (str)

  • protocol (str | None)

exception vyra_base.com.ProviderRegistrationError(provider_name, reason)[source]

Bases: ProviderError

Raised when provider registration fails.

Parameters:
  • provider_name (str)

  • reason (str)

__init__(provider_name, reason)[source]
Parameters:
  • provider_name (str)

  • reason (str)

exception vyra_base.com.InterfaceError(message, details=None, original_exception=None)[source]

Bases: CommunicationError

Base exception for interface creation/management errors.

Parameters:
exception vyra_base.com.TServerError(message, details=None, original_exception=None)[source]

Bases: InterfaceError

Raised when transport server creation or invocation fails.

Parameters:
exception vyra_base.com.TSubscriberError(message, details=None, original_exception=None)[source]

Bases: InterfaceError

Raised when transport subscriber creation or subscription fails.

Parameters:
exception vyra_base.com.ActionServerError(message, details=None, original_exception=None)[source]

Bases: InterfaceError

Raised when actionServer creation or execution fails.

Parameters:
class vyra_base.com.ProtocolType(*values)[source]

Bases: str, Enum

Supported communication protocols.

ROS2 = 'ros2'
ZENOH = 'zenoh'
REDIS = 'redis'
UDS = 'uds'
SHARED_MEMORY = 'sharedmemory'
MQTT = 'mqtt'
GRPC = 'grpc'
REST = 'rest'
WEBSOCKET = 'websocket'
MODBUS = 'modbus'
OPCUA = 'opcua'
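
Because ProtocolType derives from both str and Enum, members compare equal to their string values and can be looked up from configuration strings. The sketch below re-declares a subset of the members under the hypothetical name ProtocolTypeSketch so it runs without vyra_base; parse_protocol is likewise hypothetical.

```python
from enum import Enum


class ProtocolTypeSketch(str, Enum):
    """Subset of the documented members, re-declared for illustration."""

    ROS2 = "ros2"
    ZENOH = "zenoh"
    REDIS = "redis"
    UDS = "uds"


def parse_protocol(name: str) -> ProtocolTypeSketch:
    """Look up a member by value; raises ValueError for unknown names."""
    return ProtocolTypeSketch(name.strip().lower())
```

This is also why get_protocol() can return a plain str: the value of a member and the member itself compare equal.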
class vyra_base.com.InterfaceType(*values)[source]

Bases: str, Enum

VYRA communication interface types.

PUBLISHER = 'publisher'
SUBSCRIBER = 'subscriber'
SERVER = 'server'
CLIENT = 'client'
ACTION_SERVER = 'actionServer'
ACTION_CLIENT = 'actionClient'
class vyra_base.com.AccessLevel(*values)[source]

Bases: str, Enum

Access control levels for interfaces.

PUBLIC = 'public'
PROTECTED = 'protected'
PRIVATE = 'private'
INTERNAL = 'internal'
class vyra_base.com.ActionStatus(*values)[source]

Bases: Enum

Status values for action goals (ROS2 Action compatible).

Used to track the lifecycle state of action server goals.

UNKNOWN = 0
ACCEPTED = 1
EXECUTING = 2
CANCELING = 3
SUCCEEDED = 4
CANCELED = 5
ABORTED = 6
class vyra_base.com.VyraPublisher(name, topic_builder, protocol=ProtocolType.ROS2, **kwargs)[source]

Bases: VyraTransport

Publisher interface for one-way message publishing.

Represents publish-only communication (no callbacks):

  • ROS2: Topic Publisher

  • Zenoh: Publisher

  • Redis: Pub/Sub Publisher

  • UDS: Datagram Socket Sender

Parameters:
message_type: type
__init__(name, topic_builder, protocol=ProtocolType.ROS2, **kwargs)[source]
Parameters:
topic_builder: TopicBuilder
async initialize()[source]

Initialize publisher with transport layer.

Return type:

bool

async shutdown()[source]

Shutdown publisher.

Return type:

None

async publish(message)[source]

Publish a message (async).

Parameters:

message (Any) – Message data to publish

Returns:

True if published successfully

Return type:

bool

class vyra_base.com.VyraSubscriber(name, topic_builder, subscriber_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]

Bases: VyraTransport

Subscriber interface for receiving messages with callback.

Represents subscribe-only communication:

  • ROS2: Topic Subscriber

  • Zenoh: Subscriber

  • Redis: Pub/Sub Subscriber

  • UDS: Datagram Socket Receiver

Parameters:
__init__(name, topic_builder, subscriber_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]
Parameters:
topic_builder: TopicBuilder
async initialize()[source]

Initialize subscriber with transport layer.

Return type:

bool

async shutdown()[source]

Shutdown subscriber.

Return type:

None

async subscribe()[source]

Start subscribing to messages. Calls subscriber_callback for each received message.

Return type:

None

class vyra_base.com.VyraServer(name, topic_builder, response_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]

Bases: VyraTransport

Server interface for request-response communication.

Represents a service server:

  • ROS2: Service Server

  • Zenoh: Queryable

  • Redis: Request-Response Pattern

  • UDS: Stream Socket RPC Server

Parameters:
__init__(name, topic_builder, response_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]
Parameters:
topic_builder: TopicBuilder
async initialize()[source]

Initialize server with transport layer.

Return type:

bool

async shutdown()[source]

Shutdown server.

Return type:

None

async serve()[source]

Start serving requests. Calls response_callback for each request and returns response.

Return type:

None

class vyra_base.com.VyraClient(name, topic_builder, request_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]

Bases: VyraTransport

Client interface for request-response communication.

Represents a service client:

  • ROS2: Service Client

  • Zenoh: Query Client

  • Redis: Request-Response Pattern

  • UDS: Stream Socket RPC Client

Parameters:
__init__(name, topic_builder, request_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]
Parameters:
topic_builder: TopicBuilder
async initialize()[source]

Initialize client with transport layer.

Return type:

bool

async shutdown()[source]

Shutdown client.

Return type:

None

async call(request, timeout=5.0)[source]

Send request and await response (async).

Parameters:
  • request (Any) – Request data

  • timeout (float) – Timeout in seconds

Return type:

Any

Returns:

Response data

class vyra_base.com.VyraActionServer(name, topic_builder, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]

Bases: VyraTransport

Action Server interface for long-running tasks.

Represents an async task server with progress feedback:

  • ROS2: Action Server

  • Zenoh: Callable + Publishers (control/feedback/result)

  • Redis: State tracking with Pub/Sub

  • UDS: Stream-based state messages

Parameters:
__init__(name, topic_builder, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]
Parameters:
topic_builder: TopicBuilder
async initialize()[source]

Initialize action server with transport layer.

Return type:

bool

async shutdown()[source]

Shutdown action server.

Return type:

None

async start()[source]

Start action server. Handles goal requests, cancellations, and executions.

Return type:

None

class vyra_base.com.VyraActionClient(name, topic_builder, direct_response_callback=None, feedback_callback=None, goal_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]

Bases: VyraTransport

Action Client interface for long-running tasks.

Represents an async task client:

  • ROS2: Action Client

  • Zenoh: Query + Subscribers (control/feedback/result)

  • Redis: State tracking with Pub/Sub

  • UDS: Stream-based state messages

Parameters:
__init__(name, topic_builder, direct_response_callback=None, feedback_callback=None, goal_callback=None, protocol=ProtocolType.ROS2, **kwargs)[source]
Parameters:
topic_builder: TopicBuilder
async initialize()[source]

Initialize action client with transport layer.

Return type:

bool

async shutdown()[source]

Shutdown action client.

Return type:

None

async send_goal(goal, **kwargs)[source]

Send goal to action server.

Parameters:
  • goal (Any) – Goal data

  • **kwargs – Additional parameters (timeout, etc.)

Return type:

Any

Returns:

Goal handle or result

async cancel_goal(goal_handle)[source]

Cancel a running goal.

Parameters:

goal_handle (Any) – Handle returned from send_goal

Returns:

True if cancellation accepted

Return type:

bool

class vyra_base.com.IServiceHandler[source]

Bases: ABC

Abstract interface for service (request-response) handlers.

Implement this interface for services that handle single request-response interactions, such as ROS2 services, gRPC unary calls, or REST endpoints.

Example

class CalculateHandler(IServiceHandler):
    async def handle_request(self, request):
        result = request.x + request.y
        return {"result": result}

abstractmethod async handle_request(request)[source]

Handle incoming service request.

Parameters:

request (Any) – Request data/object

Return type:

Any

Returns:

Response data (dict or response object)

Example

async def handle_request(self, request):
    # Process request
    result = process(request.data)
    # Return response
    return {"result": result, "status": "success"}

class vyra_base.com.IActionHandler[source]

Bases: ABC

Abstract interface for action (long-running task) handlers.

REQUIRED for all ActionServer implementations. Defines the three-phase lifecycle of action goals:

  1. on_goal: Accept or reject incoming goals

  2. execute: Main execution logic with feedback

  3. on_cancel: Handle cancellation requests

Example

class BatchProcessorHandler(IActionHandler):
    async def on_goal(self, goal):
        return goal.count <= 100

    async def execute(self, handle):
        for i in range(handle.goal.count):
            if handle.is_cancel_requested():
                handle.canceled()
                return {"processed": i}

            process_item(i)
            # publish_feedback() is a coroutine and must be awaited
            await handle.publish_feedback({"progress": i + 1})

        handle.succeed()
        return {"processed": handle.goal.count}

    async def on_cancel(self):
        return True

abstractmethod async on_goal(goal)[source]

Called when a new goal is received.

Decide whether to accept or reject the goal based on its parameters, current system state, resource availability, etc.

Parameters:

goal (Any) – Goal request data

Returns:

True to accept, False to reject

Return type:

bool

Example

async def on_goal(self, goal):
    # Check if goal is valid
    if goal.count <= 0 or goal.count > 1000:
        logger.warning(f"Rejecting invalid goal count: {goal.count}")
        return False

    # Check system resources
    if self.is_busy():
        logger.info("Rejecting goal: system busy")
        return False

    return True

abstractmethod async execute(handle)[source]

Execute the goal and return result.

This is the main execution logic. Use the handle to:

  • Access goal parameters via handle.goal

  • Publish feedback via handle.publish_feedback()

  • Check cancellation via handle.is_cancel_requested()

  • Set final status via handle.succeed()/abort()/canceled()

Parameters:

handle (IGoalHandle) – Goal handle for accessing goal data and publishing feedback

Returns:

Result data to return to client

Return type:

Dict[str, Any]

Example

async def execute(self, handle):
    total = handle.goal.count

    for i in range(total):
        # Check for cancellation
        if handle.is_cancel_requested():
            handle.canceled()
            return {
                "processed": i,
                "status": "CANCELED",  # ActionStatus.CANCELED from vyra_base.com.core.types
            }

        # Do work
        await self.process_item(i)

        # Publish feedback
        await handle.publish_feedback({
            "progress": i + 1,
            "total": total,
            "percent": (i + 1) / total * 100,
        })

    # Success
    handle.succeed()
    return {
        "processed": total,
        "status": "SUCCEEDED",  # ActionStatus.SUCCEEDED from vyra_base.com.core.types
    }

abstractmethod async on_cancel()[source]

Called when cancellation is requested.

Decide whether to accept the cancellation request. Some goals may not be cancellable (e.g., already near completion, critical operations).

Returns:

True to accept cancellation, False to reject

Return type:

bool

Example

async def on_cancel(self):
    # Check if safe to cancel
    if self.in_critical_section():
        logger.warning("Cannot cancel: in critical section")
        return False

    logger.info("Accepting cancellation request")
    return True

class vyra_base.com.IGoalHandle[source]

Bases: ABC

Abstract interface for action goal handles.

Provides methods for:

  • Publishing feedback during execution

  • Checking cancellation requests

  • Setting goal status (succeed/abort/cancel)

abstractmethod async publish_feedback(feedback)[source]

Publish feedback about goal progress.

Parameters:

feedback (Dict[str, Any]) – Dictionary with progress information

Return type:

None

Example:

await goal_handle.publish_feedback({
    "progress": 50,
    "message": "Processing..."
})
abstractmethod is_cancel_requested()[source]

Check if cancellation has been requested.

Return type:

bool

abstractmethod succeed()[source]

Mark goal as succeeded.

Return type:

None

abstractmethod abort()[source]

Mark goal as aborted (error occurred).

Return type:

None

abstractmethod canceled()[source]

Mark goal as canceled (by request).

Return type:

None

abstract property goal: Any

Get the goal request data.

Returns:

Goal request object with parameters

class vyra_base.com.GoalHandle(goal_id, goal, feedback_fn)[source]

Bases: IGoalHandle

Transport-agnostic goal handle passed to execution callbacks.

This is the main communication object between the transport layer and the application. All action servers create a GoalHandle for each accepted goal and pass it to the user’s execution_callback.

Implements the IGoalHandle interface so user code typed against the abstract interface works seamlessly with the concrete implementation.

Example usage inside an execution_callback:

async def my_execute(goal_handle: GoalHandle):
    for i in range(10):
        await goal_handle.publish_feedback({'progress': i * 10})
        if goal_handle.is_cancel_requested():
            goal_handle.canceled()
            return {'status': 'canceled'}
    goal_handle.succeed()
    return {'done': True}
Parameters:
__init__(goal_id, goal, feedback_fn)[source]
Parameters:
  • goal_id (str) – Unique identifier for this goal execution (UUID string).

  • goal (Any) – The goal payload received from the client.

  • feedback_fn (Callable) – Async callable (goal_id: str, feedback: Any) -> None provided by the transport layer to publish feedback.

Return type:

None
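
The constructor contract above can be illustrated with a stand-in class. This is a minimal sketch, not the vyra_base implementation: `SketchGoalHandle`, `collect_feedback`, `run_goal` and the status strings are hypothetical. Only the `(goal_id, goal, feedback_fn)` arguments and the `feedback_fn(goal_id, feedback)` call shape are taken from the documentation above.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, List, Tuple


class SketchGoalHandle:
    """Hypothetical stand-in mirroring the documented constructor arguments."""

    def __init__(self, goal_id: str, goal: Any,
                 feedback_fn: Callable[[str, Any], Awaitable[None]]) -> None:
        self.goal_id = goal_id
        self.status = "executing"  # assumed initial status string
        self._goal = goal
        self._feedback_fn = feedback_fn

    @property
    def goal(self) -> Any:
        return self._goal

    async def publish_feedback(self, feedback: Any) -> None:
        # Forward to the transport-provided callable, tagged with the goal id.
        await self._feedback_fn(self.goal_id, feedback)

    def succeed(self) -> None:
        self.status = "succeeded"  # assumed status string


received: List[Tuple[str, Any]] = []


async def collect_feedback(goal_id: str, feedback: Any) -> None:
    # Stands in for the transport layer; it only records what was published.
    received.append((goal_id, feedback))


async def run_goal() -> SketchGoalHandle:
    handle = SketchGoalHandle(str(uuid.uuid4()), {"count": 3}, collect_feedback)
    for i in range(handle.goal["count"]):
        await handle.publish_feedback({"progress": i + 1})
    handle.succeed()
    return handle


finished = asyncio.run(run_goal())
```

In this sketch the handle never talks to a transport itself; it only delegates to the callable it was given, which is why the same execution callback can run unchanged on any protocol.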

goal_id: str
status: str
property goal: Any

The original goal payload sent by the client.

async publish_feedback(feedback)[source]

Send feedback to all connected clients.

Return type:

None

Parameters:

feedback (Any)

is_cancel_requested()[source]

Return True if the client requested cancellation.

Return type:

bool

succeed()[source]

Mark goal as succeeded.

Return type:

None

abort()[source]

Mark goal as aborted due to an error.

Return type:

None

canceled()[source]

Mark goal as canceled by client request.

Return type:

None

request_cancel()[source]

Called by the transport layer when a cancel request arrives.

Return type:

None
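
The interplay between request_cancel() (transport side) and is_cancel_requested() (application side) can be sketched as a shared flag. The class `CancelFlag`, the `worker` coroutine and the status strings below are hypothetical and only illustrate cooperative cancellation; how vyra_base stores the request internally is not stated in this reference.

```python
import asyncio
from typing import Tuple


class CancelFlag:
    """Hypothetical stand-in for the cancel-related part of a goal handle."""

    def __init__(self) -> None:
        self._cancel_requested = False
        self.status = "executing"  # assumed status string

    def request_cancel(self) -> None:
        # Transport side: record that a cancel request arrived.
        self._cancel_requested = True

    def is_cancel_requested(self) -> bool:
        return self._cancel_requested

    def canceled(self) -> None:
        self.status = "canceled"  # assumed status string


async def worker(handle: CancelFlag) -> int:
    steps = 0
    while not handle.is_cancel_requested():
        steps += 1
        await asyncio.sleep(0)  # yield so the cancel request can be delivered
    handle.canceled()
    return steps


async def main() -> Tuple[str, int]:
    handle = CancelFlag()
    task = asyncio.create_task(worker(handle))
    await asyncio.sleep(0)   # let the worker start
    handle.request_cancel()  # simulated cancel request from the transport
    steps = await task
    return handle.status, steps


final_status, steps_done = asyncio.run(main())
```

The worker only stops at points where it checks the flag, so long-running steps should poll is_cancel_requested() between units of work.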

set_succeeded(result=None)[source]

Alias for succeed().

Return type:

None

Parameters:

result (Any)

set_aborted(reason='')[source]

Alias for abort().

Return type:

None

Parameters:

reason (str)

set_canceled()[source]

Alias for canceled().

Return type:

None

class vyra_base.com.InterfaceFactory[source]

Bases: object

Factory for creating communication interfaces with protocol fallback.

Features:

  • Automatic protocol selection

  • Fallback chain (the default order in the *_FALLBACK constants below is Zenoh → ROS2 → Redis → UDS)

  • Protocol availability checking

  • Graceful degradation

Example

>>> # Auto-select best available protocol
>>> server = await InterfaceFactory.create_server(
...     "my_service",
...     response_callback=handle_request
... )
>>>
>>> # Explicit protocol with fallback
>>> server = await InterfaceFactory.create_server(
...     "my_service",
...     protocols=[ProtocolType.ROS2, ProtocolType.SHARED_MEMORY],
...     response_callback=handle_request
... )
>>>
>>> # Publisher for pub/sub
>>> publisher = await InterfaceFactory.create_publisher(
...     "temperature",
...     protocols=[ProtocolType.REDIS, ProtocolType.MQTT]
... )
PUBLISHER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
SUBSCRIBER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
SERVER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
CLIENT_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
ACTION_SERVER_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
ACTION_CLIENT_FALLBACK = [ProtocolType.ZENOH, ProtocolType.ROS2, ProtocolType.REDIS, ProtocolType.UDS]
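
The selection over a fallback chain can be sketched as "first available protocol wins". This is an illustrative sketch under stated assumptions, not the factory's actual code: `Proto`, `DEFAULT_CHAIN`, `NoProtocolAvailable` and `select_protocol` are hypothetical names, and availability is modelled as a plain dictionary instead of real provider checks.

```python
from enum import Enum
from typing import Dict, List, Optional


class Proto(Enum):
    """Hypothetical stand-in for ProtocolType, limited to the defaults above."""
    ZENOH = "zenoh"
    ROS2 = "ros2"
    REDIS = "redis"
    UDS = "uds"


DEFAULT_CHAIN: List[Proto] = [Proto.ZENOH, Proto.ROS2, Proto.REDIS, Proto.UDS]


class NoProtocolAvailable(Exception):
    """Stands in for InterfaceError in this sketch."""


def select_protocol(available: Dict[Proto, bool],
                    protocols: Optional[List[Proto]] = None) -> Proto:
    # Use the caller's preference list if given, otherwise the default chain.
    for proto in (protocols or DEFAULT_CHAIN):
        if available.get(proto, False):
            return proto
    raise NoProtocolAvailable("no protocol available")


availability = {Proto.ZENOH: False, Proto.ROS2: False,
                Proto.REDIS: True, Proto.UDS: True}
chosen_default = select_protocol(availability)
chosen_explicit = select_protocol(availability, [Proto.UDS, Proto.REDIS])
```

With the default chain the sketch picks Redis because Zenoh and ROS2 are marked unavailable; with an explicit list the caller's order takes precedence.
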
static loop_check_pending()[source]

Check pending interfaces and initialize if callbacks are now available. Should be called periodically (e.g. in main loop) to handle late callback registration.

static register_provider(provider)[source]

Register one or more protocol providers.

Parameters:

provider (Union[AbstractProtocolProvider, list]) – Single provider instance or list of providers

Return type:

None

static unregister_provider(protocol)[source]

Unregister a protocol provider.

Parameters:

protocol (ProtocolType) – Protocol type to unregister

Return type:

None

async static create_publisher(name, protocols=None, **kwargs)[source]

Create Publisher with automatic protocol selection.

Only the module's own (default) provider is used.

Parameters:
  • name (str) – Topic/channel name

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols in priority order; the default fallback chain is used if not specified

  • **kwargs – Additional parameters (message_type, qos, etc.)

Returns:

Initialized publisher

Return type:

VyraPublisher

Raises:

InterfaceError – If no protocol available

async static create_subscriber(name, subscriber_callback, protocols=None, module_id=None, module_name=None, **kwargs)[source]

Create Subscriber with automatic protocol selection.

Parameters:
  • name (str) – Topic/channel name

  • subscriber_callback (Callable) – Async callback for received messages

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols in priority order; the default fallback chain is used if not specified

  • module_id (Optional[str]) – Optional target module ID. When provided the registry looks up (or creates) a provider scoped to that module.

  • module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).

  • **kwargs – Additional parameters (message_type, qos, etc.)

Returns:

Initialized subscriber or None if pending

Return type:

Optional[VyraSubscriber]

Raises:

InterfaceError – If no protocol available

async static create_server(name, response_callback, protocols=None, **kwargs)[source]

Create Server with automatic protocol selection.

Only the module's own (default) provider is used. module_id and module_name are taken from the provider that was registered without a module_id.

Parameters:
  • name (str) – Service name

  • response_callback (Optional[Callable]) – Async callback for requests

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols in priority order; the default fallback chain is used if not specified

  • **kwargs – Additional parameters (service_type, qos, etc.)

Returns:

Initialized server or None if pending

Return type:

Optional[VyraServer]

Raises:

InterfaceError – If no protocol available

async static create_client(name, protocols=None, module_id=None, module_name=None, **kwargs)[source]

Create Client with automatic protocol selection.

Parameters:
  • name (str) – Service name

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols in priority order; the default fallback chain is used if not specified

  • module_id (Optional[str]) – Optional target module ID. When provided the registry looks up (or creates) a provider scoped to that module.

  • module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).

  • **kwargs – Additional parameters (service_type, qos, etc.)

Returns:

Initialized client

Return type:

VyraClient

Raises:

InterfaceError – If no protocol available

async static create_action_server(name, handle_goal_request, handle_cancel_request, execution_callback, protocols=None, **kwargs)[source]

Create Action Server with automatic protocol selection.

Only the module's own (default) provider is used.

Parameters:
  • name (str) – Action name

  • handle_goal_request (Optional[Callable]) – Async callback to accept/reject goals

  • handle_cancel_request (Optional[Callable]) – Async callback for cancellations

  • execution_callback (Optional[Callable]) – Async callback for goal execution

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols in priority order; the default fallback chain is used if not specified

  • **kwargs – Additional parameters (action_type, qos, etc.)

Returns:

Initialized action server or None if pending

Return type:

Optional[VyraActionServer]

Raises:

InterfaceError – If no protocol available

async static create_action_client(name, direct_response_callback=None, feedback_callback=None, goal_callback=None, protocols=None, module_id=None, module_name=None, **kwargs)[source]

Create Action Client with automatic protocol selection.

Parameters:
  • name (str) – Action name

  • direct_response_callback (Optional[Callable]) – Async callback for goal acceptance

  • feedback_callback (Optional[Callable]) – Async callback for feedback

  • goal_callback (Optional[Callable]) – Async callback for final result

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols in priority order; the default fallback chain is used if not specified

  • module_id (Optional[str]) – Optional target module ID. When provided the registry looks up (or creates) a provider scoped to that module.

  • module_name (Optional[str]) – Human-readable name of the target module (required when module_id is supplied).

  • **kwargs – Additional parameters (action_type, qos, etc.)

Returns:

Initialized action client or None if pending

Return type:

Optional[VyraActionClient]

Raises:

InterfaceError – If no protocol available

static get_available_protocols(interface_type, protocols)[source]

Customize fallback chain for interface type.

Parameters:
  • interface_type (str) – “server”, “publisher”, or “actionServer”

  • protocols (List[ProtocolType]) – Ordered list of protocols to try

Return type:

List[ProtocolType]

Example

>>> # Prioritize SharedMemory over ROS2
>>> InterfaceFactory.set_fallback_chain(
...     "server",
...     [ProtocolType.SHARED_MEMORY, ProtocolType.ROS2, ProtocolType.UDS]
... )
async static create_from_blueprint(blueprint, **override_kwargs)[source]

Create interface from a HandlerBlueprint.

This is the primary method for two-phase initialization:

  1. Blueprint created during decoration/configuration

  2. Interface created when blueprint is bound with callback

Parameters:
  • blueprint (HandlerBlueprint) – HandlerBlueprint instance

  • **override_kwargs – Override blueprint metadata

Return type:

Union[VyraServer, VyraPublisher, VyraSubscriber, VyraActionServer, None]

Returns:

Created interface or None if pending (callback not bound yet)

Example

>>> from vyra_base.com.core.blueprints import ServiceBlueprint
>>> blueprint = ServiceBlueprint(name="my_service")
>>> # ... later when callback available ...
>>> blueprint.bind_callback(my_callback)
>>> server = await InterfaceFactory.create_from_blueprint(blueprint)
async static bind_pending_callback(name, callback=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None)[source]

Bind one or more callbacks to a pending interface and initialize it.

For servers/subscribers, use the callback parameter. For action servers, use the dedicated handle_goal_request, handle_cancel_request and execution_callback parameters. If a parameter is None, the previously stored value is kept.

Parameters:
  • name (str) – Interface name (must match pending registration)

  • callback (Optional[Callable]) – Callback for server (response_callback) or subscriber (subscriber_callback)

  • handle_goal_request (Optional[Callable]) – Action server goal-accept callback

  • handle_cancel_request (Optional[Callable]) – Action server cancel callback

  • execution_callback (Optional[Callable]) – Action server execution callback

Return type:

Union[VyraServer, VyraSubscriber, VyraActionServer, None]

Returns:

Initialized interface or None if not found in pending

Example

>>> # Phase 1: Create server without callback
>>> await InterfaceFactory.create_server("my_service", response_callback=None)
>>> # Phase 2: Bind callback later
>>> server = await InterfaceFactory.bind_pending_callback(
...     "my_service", callback=my_callback_function
... )
>>>
>>> # Action server – bind all three callbacks
>>> action_srv = await InterfaceFactory.bind_pending_callback(
...     "my_action",
...     handle_goal_request=on_goal,
...     handle_cancel_request=on_cancel,
...     execution_callback=execute,
... )
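
The pending mechanism described above can be sketched with a small in-memory model. Everything here is hypothetical (`SketchServer`, the module-level `pending` set, and the simplified function signatures); it only illustrates the idea that creation without a callback parks the name, and a later bind turns it into an initialized interface.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Set

Callback = Callable[[dict], Awaitable[dict]]


class SketchServer:
    """Hypothetical stand-in for an initialized server interface."""

    def __init__(self, name: str, callback: Callback) -> None:
        self.name = name
        self.callback = callback


pending: Set[str] = set()  # names waiting for a callback


async def create_server(name: str,
                        response_callback: Optional[Callback]) -> Optional[SketchServer]:
    if response_callback is None:
        pending.add(name)  # phase 1: remember the name, create nothing yet
        return None
    return SketchServer(name, response_callback)


async def bind_pending_callback(name: str, callback: Callback) -> Optional[SketchServer]:
    if name not in pending:
        return None        # nothing pending under this name
    pending.discard(name)  # phase 2: initialize and drop the pending entry
    return SketchServer(name, callback)


async def echo(request: dict) -> dict:
    return {"echo": request}


async def demo():
    first = await create_server("my_service", None)
    waiting = sorted(pending)
    server = await bind_pending_callback("my_service", echo)
    reply = await server.callback({"x": 1})
    return first, waiting, sorted(pending), reply


first, waiting, remaining, reply = asyncio.run(demo())
```
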
async static process_pending_interfaces()[source]

Process all pending interfaces, attempting to initialize those with callbacks.

This should be called periodically (e.g., from entity event loop) or after binding callbacks via CallbackRegistry.

Return type:

Dict[str, bool]

Returns:

Dictionary mapping interface names to initialization success status

Example

>>> # In entity event loop
>>> while running:
...     results = await InterfaceFactory.process_pending_interfaces()
...     await asyncio.sleep(1.0)
static get_pending_count()[source]

Get count of pending interfaces awaiting callback binding.

Return type:

int

static list_pending()[source]

Get list of pending interface names.

Return type:

List[str]

static has_pending(name)[source]

Check if an interface is pending callback binding.

Return type:

bool

Parameters:

name (str)

vyra_base.com.remote_service(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]

Decorator for request/response communication (ROS2 Service, gRPC, etc.)

Creates a ServiceBlueprint and registers it in CallbackRegistry. The decorated method will be bound to the blueprint during component initialization.

Parameters:
  • name (Optional[str]) – Service name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint (default True)

  • namespace (Optional[str]) – Optional module namespace for blueprint registration

  • **kwargs – Additional metadata (service_type, qos, etc.)

Example

>>> class MyComponent(OperationalStateMachine):
...     @remote_service()
...     async def calculate(self, request, response=None):
...         result = request["x"] + request["y"]
...         return {"result": result}
...
...     @remote_service(protocols=[ProtocolType.ROS2, ProtocolType.ZENOH])
...     async def process_data(self, request, response=None):
...         # This method prioritizes ROS2, falls back to Zenoh
...         return {"status": "processed"}

Note

  • Methods must accept (request, response=None) signature

  • Response parameter maintained for ROS2 compatibility

  • Async methods recommended (sync methods wrapped automatically)

  • Blueprint created during decoration, callback bound during initialization
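
The split between decoration time and initialization time can be sketched with a plain decorator. This is an illustrative model, not the vyra_base decorator: `sketch_remote_service`, `BLUEPRINTS`, `bind_all` and the `_sketch_service_name` marker are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List, Optional

BLUEPRINTS: Dict[str, Dict[str, Any]] = {}  # hypothetical stand-in for the registry


def sketch_remote_service(name: Optional[str] = None,
                          protocols: Optional[List[str]] = None,
                          **metadata: Any) -> Callable:
    def decorator(func: Callable) -> Callable:
        service_name = name or func.__name__
        # Phase 1: record the interface definition; no callback is bound yet.
        BLUEPRINTS[service_name] = {
            "protocols": protocols, "metadata": metadata, "callback": None,
        }
        func._sketch_service_name = service_name  # marker for later discovery
        return func
    return decorator


class Calculator:
    @sketch_remote_service()
    async def calculate(self, request, response=None):
        return {"result": request["x"] + request["y"]}


def bind_all(component: Any) -> None:
    # Phase 2: bind each marked method of the instance to its blueprint.
    for _, method in inspect.getmembers(component, inspect.ismethod):
        service_name = getattr(method, "_sketch_service_name", None)
        if service_name is not None:
            BLUEPRINTS[service_name]["callback"] = method


unbound_at_decoration = BLUEPRINTS["calculate"]["callback"] is None
bind_all(Calculator())
answer = asyncio.run(BLUEPRINTS["calculate"]["callback"]({"x": 2, "y": 3}))
```

The blueprint exists as soon as the class body is evaluated, but it only becomes callable once an instance is available to supply the bound method.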

vyra_base.com.remote_publisher(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]

Decorator for pub/sub communication (publisher side).

Creates a PublisherBlueprint and registers it in CallbackRegistry. The decorated method becomes a publisher interface.

Parameters:
  • name (Optional[str]) – Topic/channel name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata (message_type, qos, retain, etc.)

Example

>>> class Component:
...     @remote_publisher(protocols=[ProtocolType.REDIS])
...     async def publish_status(self, message):
...         '''This method will become a publisher'''
...         pass
...
...     async def some_task(self):
...         # Publish status update
...         await self.publish_status({"state": "running"})

Note

  • Publisher creation happens via InterfaceFactory during binding

  • The decorated method is replaced with actual publishing logic

vyra_base.com.remote_subscriber(name=None, protocols=None, auto_register=True, namespace=None, **kwargs)[source]

Decorator for subscription callbacks (subscriber side of pub/sub).

Creates a SubscriberBlueprint and registers it in CallbackRegistry. The decorated method will be called when messages are received on the topic.

Parameters:
  • name (Optional[str]) – Topic/channel name (defaults to function name)

  • protocols (Optional[List[ProtocolType]]) – Preferred protocols (will use factory fallback if None)

  • auto_register (bool) – Whether to auto-register blueprint

  • namespace (Optional[str]) – Optional module namespace

  • **kwargs – Additional metadata (message_type, qos, etc.)

Example

>>> class Component:
...     @remote_subscriber(name="/status")
...     async def on_status_update(self, message):
...         logger.info(f"Received status: {message}")

Note

  • Subscriber is automatically created and bound during registration

  • The decorated method is called for each received message

vyra_base.com.get_decorated_methods(obj)[source]

Get all methods decorated with communication decorators.

Scans an object for decorated methods and returns them organized by type. Also retrieves associated blueprints from CallbackRegistry.

Parameters:

obj (Any) – Object to inspect (typically a component instance)

Returns:

{
    "servers": [...],      # @remote_service methods
    "publishers": [...],   # @remote_publisher methods
    "subscribers": [...],  # @remote_subscriber methods
    "actions": [...]       # @remote_actionServer methods
}

Each entry contains:

{
    "name": str,            # Interface name
    "method": Callable,     # Bound method
    "protocols": List,      # Protocol preferences
    "kwargs": dict,         # Additional metadata
    "blueprint": Blueprint  # Associated blueprint (if registered)
}

Return type:

dict

Example

>>> component = MyComponent()
>>> methods = get_decorated_methods(component)
>>> print(f"Found {len(methods['servers'])} services")
>>> for svc in methods['servers']:
...     if svc['blueprint'] and not svc['blueprint'].is_bound():
...         svc['blueprint'].bind_callback(svc['method'])
vyra_base.com.bind_decorated_callbacks(component, namespace=None, force_rebind=False)[source]

Bind all decorated methods from a component to their registered blueprints.

This is Phase 2 of the two-phase initialization, typically called during component initialization (e.g., in Component.__init__ or set_interfaces()).

Parameters:
  • component (Any) – Component instance with decorated methods

  • namespace (Optional[str]) – Optional namespace for blueprint lookup

  • force_rebind (bool) – If True, unbind existing callbacks before binding

Return type:

Dict[str, bool]

Returns:

Dictionary mapping interface names to binding success status

Example

>>> component = MyComponent()
>>> results = bind_decorated_callbacks(component, namespace="v2_modulemanager")
>>> print(f"Successfully bound {sum(results.values())} interfaces")
>>> failed = [name for name, success in results.items() if not success]
>>> if failed:
...     logger.warning(f"Failed to bind: {failed}")
class vyra_base.com.HandlerBlueprint(name, interface_type, protocols=<factory>, metadata=<factory>, _callback=None)[source]

Bases: ABC

Base blueprint for all communication interfaces.

A blueprint represents the definition of an interface (what it should be), separate from its implementation (the callback that handles requests).

Parameters:
name

Unique identifier for this interface

interface_type

Type of communication pattern

protocols

Preferred transport protocols (with fallback)

metadata

Additional configuration (from JSON or decorator)

_callback

Implementation function (bound during phase 2)

Example

>>> blueprint = ServiceBlueprint(
...     name="calculate",
...     protocols=[ProtocolType.ROS2],
...     metadata={"qos": 10}
... )
>>> blueprint.is_bound()
False
>>> blueprint.bind_callback(my_calculate_function)
>>> blueprint.is_bound()
True
name: str
interface_type: InterfaceType
protocols: List[ProtocolType]
metadata: Dict[str, Any]
property callback: Callable | None

Get the bound callback (may be None if not yet bound)

is_bound()[source]

Check if a callback has been bound to this blueprint

Return type:

bool

bind_callback(callback)[source]

Bind a callback implementation to this blueprint.

Parameters:

callback (Callable) – Function to handle requests/events

Return type:

None

unbind_callback()[source]

Remove the current callback binding.

Return type:

Optional[Callable]

Returns:

The previously bound callback, or None

get_metadata(key, default=None)[source]

Get metadata value by key

Return type:

Any

update_metadata(**kwargs)[source]

Update metadata fields

Return type:

None

__init__(name, interface_type, protocols=<factory>, metadata=<factory>, _callback=None)
Return type:

None
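
The `<factory>` defaults in the signature suggest a dataclass with default factories. The following is a minimal sketch under that assumption: `SketchBlueprint` is a hypothetical stand-in, the string `interface_type` replaces the real InterfaceType, and rejecting a second bind with RuntimeError is an assumption because the Raises entry above is empty.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class SketchBlueprint:
    """Hypothetical stand-in; field names follow the documented signature."""
    name: str
    interface_type: str
    protocols: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    _callback: Optional[Callable] = None

    def is_bound(self) -> bool:
        return self._callback is not None

    def bind_callback(self, callback: Callable) -> None:
        # Assumption: binding twice is rejected; the real behaviour may differ.
        if self._callback is not None:
            raise RuntimeError(f"{self.name} is already bound")
        self._callback = callback

    def unbind_callback(self) -> Optional[Callable]:
        # Return the previous callback and clear the binding in one step.
        previous, self._callback = self._callback, None
        return previous

    def get_metadata(self, key: str, default: Any = None) -> Any:
        return self.metadata.get(key, default)

    def update_metadata(self, **kwargs: Any) -> None:
        self.metadata.update(kwargs)


bp = SketchBlueprint("calculate", "service", metadata={"qos": 10})
states = [bp.is_bound()]
bp.bind_callback(len)
states.append(bp.is_bound())
previous = bp.unbind_callback()
states.append(bp.is_bound())
bp.update_metadata(qos=5)
```

Using `default_factory` gives every blueprint its own list and dictionary, so updating one blueprint's metadata cannot leak into another.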

class vyra_base.com.ServiceBlueprint(name, protocols=None, metadata=None, service_type=None)[source]

Bases: HandlerBlueprint

Blueprint for request/response communication (ROS2 Service, gRPC, etc.)

Expected callback signature:

async def handler(request, response=None) -> Union[dict, response_type]

or

def handler(request, response=None) -> Union[dict, response_type]

Example

>>> @remote_service()
... async def calculate(self, request, response=None):
...     result = request["x"] + request["y"]
...     return {"result": result}

__init__(name, protocols=None, metadata=None, service_type=None)[source]

class vyra_base.com.PublisherBlueprint(name, protocols=None, metadata=None, message_type=None)[source]

Bases: HandlerBlueprint

Blueprint for pub/sub communication (ROS2 Topic, Redis, MQTT, etc.)

Note: Publishers don’t typically have callbacks - they’re used to send data. The callback here is for the publish() method wrapper.

Expected callback signature:

async def publisher(self, message: dict) -> None

or

def publisher(self, message: dict) -> None

Example

>>> @remote_publisher()
... async def publish_status(self, message):
...     # Actual publishing handled by transport
...     pass

__init__(name, protocols=None, metadata=None, message_type=None)[source]

class vyra_base.com.SubscriberBlueprint(name, protocols=None, metadata=None, message_type=None)[source]

Bases: HandlerBlueprint

Blueprint for subscription callbacks (receives published data)

Expected callback signature:

async def handler(self, message) -> None

or

def handler(self, message) -> None

Example

>>> @remote_subscriber()
... async def on_status_update(self, message):
...     logger.info(f"Received: {message}")

__init__(name, protocols=None, metadata=None, message_type=None)[source]

class vyra_base.com.ActionBlueprint(name, protocols=None, metadata=None, action_type=None)[source]

Bases: HandlerBlueprint

Blueprint for ActionServer with multiple lifecycle callbacks.

Callbacks:

  • execute: Main execution (required)

  • on_goal: Accept/reject goal (optional, default: accept)

  • on_cancel: Accept/reject cancel (optional, default: accept)

The goal_handle provides:
  • goal_handle.goal: The goal request data

  • goal_handle.publish_feedback(feedback): Send progress updates

  • goal_handle.is_cancel_requested(): Check for cancellation

  • goal_handle.succeed()/abort()/canceled(): Set final status

Example

>>> @remote_actionServer.on_goal(name="process_batch")
... async def accept_goal(self, goal_request):
...     return goal_request.count <= 100
...
>>> @remote_actionServer.on_cancel(name="process_batch")
... async def cancel_batch(self, goal_handle):
...     return True
...
>>> @remote_actionServer.execute(name="process_batch")
... async def execute_batch(self, goal_handle):
...     for i in range(goal_handle.goal.count):
...         if goal_handle.is_cancel_requested():
...             goal_handle.canceled()
...             return {"processed": i}
...         process_item(i)
...         await goal_handle.publish_feedback({"progress": i+1})
...     goal_handle.succeed()
...     return {"processed": goal_handle.goal.count}

__init__(name, protocols=None, metadata=None, action_type=None)[source]

property callback: Callable | None

Returns ‘execute’ callback for backward compatibility.

bind_callback(callback, callback_type='execute')[source]

Bind a specific callback type.

Parameters:
  • callback (Callable) – Function to bind

  • callback_type (str) – One of ‘on_goal’, ‘on_cancel’, ‘execute’

Return type:

None

bind_callbacks(**callbacks)[source]

Bind multiple callbacks at once.

Return type:

None

Example

blueprint.bind_callbacks(
    on_goal=handle_goal,
    on_cancel=handle_cancel,
    execute=execute_task,
)

get_callback(callback_type)[source]

Get specific callback by type.

Return type:

Optional[Callable]

Parameters:

callback_type (str)

is_bound(callback_type=None)[source]

Check if callbacks are bound.

Parameters:

callback_type (Optional[str]) – Check specific callback, or None for “required bound”

Returns:

If callback_type given, checks that one.

If None, checks if ‘execute’ (required) is bound.

Return type:

bool

is_fully_bound()[source]

Check if ALL callbacks are bound.

Return type:

bool

unbind_callback(callback_type='execute')[source]

Remove specific callback binding.

Return type:

Optional[Callable]

Parameters:

callback_type (str)
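
The difference between is_bound() and is_fully_bound() can be sketched with a dictionary of callback slots. `SketchActionBlueprint` and `CALLBACK_TYPES` are hypothetical names, and raising ValueError for an unknown callback type is an assumption; the semantics of "execute is the required slot" follow the description above.

```python
from typing import Callable, Dict, Optional

CALLBACK_TYPES = ("on_goal", "on_cancel", "execute")


class SketchActionBlueprint:
    """Hypothetical stand-in holding one slot per lifecycle callback."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._callbacks: Dict[str, Optional[Callable]] = {
            kind: None for kind in CALLBACK_TYPES
        }

    def bind_callback(self, callback: Callable, callback_type: str = "execute") -> None:
        if callback_type not in self._callbacks:
            # Assumed error type for an unknown slot name.
            raise ValueError(f"unknown callback_type: {callback_type!r}")
        self._callbacks[callback_type] = callback

    def bind_callbacks(self, **callbacks: Callable) -> None:
        for kind, callback in callbacks.items():
            self.bind_callback(callback, kind)

    def is_bound(self, callback_type: Optional[str] = None) -> bool:
        # With no argument, only the required 'execute' slot is checked.
        return self._callbacks[callback_type or "execute"] is not None

    def is_fully_bound(self) -> bool:
        return all(cb is not None for cb in self._callbacks.values())


bp = SketchActionBlueprint("process_batch")
bp.bind_callback(lambda goal_handle: {"done": True})
after_execute = (bp.is_bound(), bp.is_bound("on_goal"), bp.is_fully_bound())
bp.bind_callbacks(on_goal=lambda goal: True, on_cancel=lambda handle: True)
after_all = (bp.is_bound(), bp.is_fully_bound())
```
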

class vyra_base.com.CallbackRegistry[source]

Bases: object

Thread-safe global registry for handler blueprints.

Features:

  • Per-module namespacing via path prefixes

  • Late binding support (register blueprint, bind callback later)

  • Thread-safe operations

  • Debugging tools (list unbound, statistics)

The registry uses a singleton pattern via class methods.

classmethod initialize()[source]

Initialize the registry (idempotent)

Return type:

None

classmethod clear()[source]

Clear all blueprints (primarily for testing)

Return type:

None

classmethod register_blueprint(blueprint, namespace=None, override=False)[source]

Register a blueprint in the registry.

Return type:

str

Returns:

Fully qualified name (with namespace if provided)

Raises:

ValueError – If blueprint with same name already exists and override=False

Example

>>> blueprint = ServiceBlueprint(name="initialize", ...)
>>> full_name = CallbackRegistry.register_blueprint(
...     blueprint, namespace="v2_modulemanager"
... )
>>> print(full_name)
'v2_modulemanager/initialize'
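
Namespacing by path prefix and the override check can be sketched with a lock-protected dictionary. `SketchRegistry` and its `_qualify` helper are hypothetical; the `namespace/name` format is taken from the example output above, and `SimpleNamespace` objects stand in for real blueprints.

```python
import threading
from types import SimpleNamespace
from typing import Any, Dict, List, Optional


class SketchRegistry:
    """Hypothetical class-level registry guarded by a re-entrant lock."""

    _lock = threading.RLock()
    _blueprints: Dict[str, Any] = {}

    @staticmethod
    def _qualify(name: str, namespace: Optional[str] = None) -> str:
        return f"{namespace}/{name}" if namespace else name

    @classmethod
    def register_blueprint(cls, blueprint: Any, namespace: Optional[str] = None,
                           override: bool = False) -> str:
        full_name = cls._qualify(blueprint.name, namespace)
        with cls._lock:
            if full_name in cls._blueprints and not override:
                raise ValueError(f"blueprint {full_name!r} already registered")
            cls._blueprints[full_name] = blueprint
        return full_name

    @classmethod
    def list_all(cls, namespace: Optional[str] = None) -> List[str]:
        with cls._lock:
            names = sorted(cls._blueprints)
        if namespace is None:
            return names
        return [n for n in names if n.startswith(f"{namespace}/")]


full_name = SketchRegistry.register_blueprint(
    SimpleNamespace(name="initialize"), namespace="v2_modulemanager"
)
SketchRegistry.register_blueprint(SimpleNamespace(name="status"))
try:
    SketchRegistry.register_blueprint(SimpleNamespace(name="status"))
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```
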
classmethod get_blueprint(name, namespace=None)[source]

Retrieve a blueprint by name.

Parameters:
  • name (str) – Blueprint name (can be short name or fully qualified)

  • namespace (Optional[str]) – Optional namespace to prepend

Return type:

Union[ServiceBlueprint, PublisherBlueprint, SubscriberBlueprint, ActionBlueprint, None]

Returns:

Blueprint if found, None otherwise

Example

>>> # With explicit namespace
>>> bp = CallbackRegistry.get_blueprint("initialize", namespace="v2_modulemanager")
>>> # With fully qualified name
>>> bp = CallbackRegistry.get_blueprint("v2_modulemanager/initialize")
classmethod bind_callback(name, callback, callback_type='default', namespace=None)[source]

Bind a callback to a registered blueprint.

For ActionBlueprint, callback_type specifies which lifecycle callback to bind (‘on_goal’, ‘on_cancel’, ‘execute’). For other blueprints, use ‘default’.

Parameters:
  • name (str) – Blueprint name to bind to

  • callback (Callable) – Implementation function

  • callback_type (str) – For ActionBlueprint: one of ‘on_goal’, ‘on_cancel’, ‘execute’. For all other blueprints: ‘default’ (ignored).

  • namespace (Optional[str]) – Optional namespace

Return type:

bool

Returns:

True if binding succeeded, False if blueprint not found

Raises:
  • RuntimeError – If blueprint already has a bound callback

  • ValueError – If callback signature is invalid or callback_type invalid

Example

>>> # Service binding
>>> async def my_service(request, response=None):
...     return {"status": "ok"}
>>> CallbackRegistry.bind_callback("my_service", my_service)
True
>>> # ActionServer binding (multi-callback)
>>> async def execute_action(goal_handle):
...     return {"result": "done"}
>>> CallbackRegistry.bind_callback(
...     "process", execute_action, callback_type='execute'
... )
True
classmethod unbind_callback(name, namespace=None)[source]

Remove callback binding from a blueprint.

Parameters:
  • name (str) – Blueprint name

  • namespace (Optional[str]) – Optional namespace

Return type:

bool

Returns:

True if unbinding succeeded, False if blueprint not found

classmethod list_all(namespace=None, interface_type=None)[source]

List all registered blueprint names.

Parameters:
  • namespace (Optional[str]) – Filter by namespace prefix

  • interface_type (Optional[InterfaceType]) – Filter by interface type

Return type:

List[str]

Returns:

List of blueprint names

classmethod list_unbound(namespace=None)[source]

List blueprints that don’t have callbacks bound yet.

Useful for debugging initialization issues.

Parameters:

namespace (Optional[str]) – Filter by namespace

Return type:

List[str]

Returns:

List of unbound blueprint names

Example

>>> unbound = CallbackRegistry.list_unbound()
>>> if unbound:
...     logger.warning(f"Unbound interfaces: {unbound}")
classmethod list_bound(namespace=None)[source]

List blueprints that have callbacks bound.

Parameters:

namespace (Optional[str]) – Filter by namespace

Return type:

List[str]

Returns:

List of bound blueprint names

classmethod get_statistics(namespace=None)[source]

Get registry statistics.

Parameters:

namespace (Optional[str]) – Filter by namespace

Return type:

Dict[str, int]

Returns:

Dictionary with counts by type and binding status

Example

>>> stats = CallbackRegistry.get_statistics()
>>> print(stats)
{'total': 15, 'bound': 12, 'unbound': 3, 'services': 8, ...}
classmethod remove_blueprint(name, namespace=None)[source]

Remove a blueprint from the registry.

Parameters:
  • name (str) – Blueprint name

  • namespace (Optional[str]) – Optional namespace

Return type:

bool

Returns:

True if removed, False if not found

classmethod exists(name, namespace=None)[source]

Check if a blueprint exists in the registry.

Parameters:
  • name (str) – Blueprint name

  • namespace (Optional[str]) – Optional namespace

Return type:

bool

Returns:

True if blueprint exists

classmethod debug_print(namespace=None)[source]

Print registry contents for debugging.

For ActionBlueprints, shows multi-callback status (on_goal/on_cancel/execute).

Parameters:

namespace (Optional[str]) – Filter by namespace

Return type:

None

class vyra_base.com.AbstractProtocolProvider(protocol)[source]

Bases: ABC

Abstract base class for all protocol providers.

Each protocol (ROS2, Shared Memory, MQTT, etc.) implements this interface to provide consistent publisher/subscriber/server/client/actionServer/actionClient creation across transports.

Parameters:

protocol (ProtocolType)

__init__(protocol)[source]
Parameters:

protocol (ProtocolType)

property protocol: str

Get protocol name.

abstractmethod async check_availability()[source]

Check if protocol is available (libraries installed, services running).

Returns:

True if protocol is available

Return type:

bool

abstractmethod async initialize(config=None)[source]

Initialize the protocol provider.

Parameters:

config (Optional[Dict[str, Any]]) – Optional configuration dictionary

Returns:

True if initialization successful

Return type:

bool

abstractmethod async shutdown()[source]

Shutdown the protocol provider and cleanup resources.

Return type:

None

abstractmethod async create_publisher(name, **kwargs)[source]

Create a publisher interface.

Parameters:
  • name (str) – Publisher name

  • **kwargs – Protocol-specific parameters (message_type, qos, etc.)

Returns:

Created publisher interface

Return type:

VyraPublisher

abstractmethod async create_subscriber(name, subscriber_callback, **kwargs)[source]

Create a subscriber interface.

Parameters:
  • name (str) – Subscriber name

  • subscriber_callback (Optional[Callable]) – Callback for received messages (async)

  • **kwargs – Protocol-specific parameters (message_type, qos, etc.)

Returns:

Created subscriber interface

Return type:

VyraSubscriber

abstractmethod async create_server(name, response_callback, **kwargs)[source]

Create a server interface.

Parameters:
  • name (str) – Server name

  • response_callback (Optional[Callable]) – Callback for requests (async)

  • **kwargs – Protocol-specific parameters (service_type, qos, etc.)

Returns:

Created server interface

Return type:

VyraServer

abstractmethod async create_client(name, **kwargs)[source]

Create a client interface.

Parameters:
  • name (str) – Client name

  • **kwargs – Protocol-specific parameters (service_type, qos, etc.)

Returns:

Created client interface

Return type:

VyraClient

abstractmethod async create_action_server(name, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, **kwargs)[source]

Create an action server interface.

Parameters:
  • name (str) – Action server name

  • handle_goal_request (Optional[Callable]) – Callback to accept/reject goals (async)

  • handle_cancel_request (Optional[Callable]) – Callback to handle cancellations (async)

  • execution_callback (Optional[Callable]) – Callback for goal execution (async)

  • **kwargs – Protocol-specific parameters (action_type, qos, etc.)

Returns:

Created action server interface

Return type:

VyraActionServer

abstractmethod async create_action_client(name, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[source]

Create an action client interface.

Parameters:
  • name (str) – Action client name

  • direct_response_callback (Optional[Callable]) – Callback for goal acceptance (async)

  • feedback_callback (Optional[Callable]) – Callback for feedback updates (async)

  • goal_callback (Optional[Callable]) – Callback for final result (async)

  • **kwargs – Protocol-specific parameters (action_type, qos, etc.)

Returns:

Created action client interface

Return type:

VyraActionClient

create_topic_builder(module_name, module_id)[source]

Create a TopicBuilder for consistent naming conventions.

Parameters:
  • module_name (str) – Name of the module (e.g., “v2_modulemanager”)

  • module_id (str) – Unique module ID (e.g., “abc123”)

Return type:

TopicBuilder

Returns:

TopicBuilder instance configured with module info

property is_available: bool

Check if protocol is available.

property is_initialized: bool

Check if provider is initialized.

get_config()[source]

Get current configuration.

Return type:

Dict[str, Any]

update_config(config)[source]

Update configuration.

Return type:

None

Parameters:

config (Dict[str, Any])

require_availability()[source]

Raise exception if protocol is not available.

Return type:

None

require_initialization()[source]

Raise exception if provider is not initialized.

Return type:

None
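The two guard methods above can be pictured with a small stand-in class. This is only a sketch: SketchProvider, its initialize() method and the use of RuntimeError are assumptions made for illustration, since the exception type raised by the real provider is not stated here.

```python
class SketchProvider:
    """Hypothetical stand-in for a protocol provider (not the vyra_base class)."""

    def __init__(self, available: bool) -> None:
        self._available = available
        self._initialized = False

    @property
    def is_available(self) -> bool:
        return self._available

    @property
    def is_initialized(self) -> bool:
        return self._initialized

    def initialize(self) -> None:
        # Initialization only makes sense when the protocol is available.
        self.require_availability()
        self._initialized = True

    def require_availability(self) -> None:
        # Assumption: RuntimeError stands in for the library's exception type.
        if not self.is_available:
            raise RuntimeError("protocol is not available")

    def require_initialization(self) -> None:
        if not self.is_initialized:
            raise RuntimeError("provider is not initialized")


def guarded_call(provider: SketchProvider) -> str:
    """Run both guards before doing any protocol work."""
    provider.require_availability()
    provider.require_initialization()
    return "ok"
```

Calling both guards at the top of a factory method keeps the failure close to its cause instead of surfacing later inside the transport layer.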

class vyra_base.com.ProviderRegistry[source]

Bases: object

Central registry for all protocol providers.

Manages provider lifecycle, discovery, and factory methods. Thread-safe singleton pattern.

Providers are keyed by (ProtocolType, module_id). “Own” / default providers are registered with module_id=None. Remote-module providers are registered with their target module_id.

__init__()[source]

Initialize registry (only once).

register_provider(provider, module_id=None, force=False)[source]

Register a protocol provider.

Parameters:
  • provider (AbstractProtocolProvider) – Provider instance to register

  • module_id (Optional[str]) – Target module-ID this provider is configured for. Use None (default) for the “own” provider that publishes / serves on behalf of the local module.

  • force (bool) – If True, replace existing provider for the same key

Return type:

None

get_provider(protocol, module_id=None, require_available=True)[source]

Get provider for a (protocol, module_id) pair.

Lookup order:

  1. Exact key (protocol, module_id).

  2. If module_id is not None and no exact match exists, fall back to (protocol, None).

Parameters:
  • protocol (ProtocolType) – Protocol type

  • module_id (Optional[str]) – Target module ID or None for the default provider

  • require_available (bool) – If True, only return if protocol is available

Return type:

Optional[AbstractProtocolProvider]

Returns:

Provider instance or None
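The lookup order described for get_provider() can be sketched with a plain dictionary keyed by (protocol, module_id). The Protocol enum and the lookup() helper are hypothetical stand-ins, and the availability check controlled by require_available is left out.

```python
from enum import Enum
from typing import Dict, Optional, Tuple


class Protocol(Enum):
    """Hypothetical stand-in for ProtocolType."""
    ZENOH = "zenoh"
    ROS2 = "ros2"


Key = Tuple[Protocol, Optional[str]]


def lookup(providers: Dict[Key, object], protocol: Protocol,
           module_id: Optional[str] = None) -> Optional[object]:
    """Try the exact key first, then the default (protocol, None) entry."""
    exact = providers.get((protocol, module_id))
    if exact is not None:
        return exact
    if module_id is not None:
        return providers.get((protocol, None))
    return None


default_zenoh = object()
remote_zenoh = object()
providers: Dict[Key, object] = {
    (Protocol.ZENOH, None): default_zenoh,
    (Protocol.ZENOH, "abc123"): remote_zenoh,
}
```

With this table, a request for an unknown module ID resolves to the default Zenoh provider, while a request for a protocol with no entries at all returns None.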

get_or_create_provider_for_module(protocol, module_name, module_id)[source]

Return the provider for (protocol, module_id).

If no provider is registered for that specific module_id yet, the default provider (module_id=None) is looked up and registered under the new key automatically – it will be used with an external TopicBuilder injected by the factory at creation time.

Parameters:
  • protocol (ProtocolType) – Protocol type

  • module_name (str) – Human-readable name of the target module

  • module_id (str) – Unique ID of the target module

Return type:

Optional[AbstractProtocolProvider]

Returns:

Provider instance or None if no provider is available at all
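A minimal sketch of the aliasing behaviour described above, assuming the registry is a dictionary keyed by (protocol, module_id). The get_or_alias() helper and the string protocol names are illustrative only; TopicBuilder injection by the factory is not modelled.

```python
from typing import Dict, Optional, Tuple

Key = Tuple[str, Optional[str]]


def get_or_alias(providers: Dict[Key, object], protocol: str,
                 module_id: str) -> Optional[object]:
    """Return the module-specific provider, aliasing the default if needed."""
    key = (protocol, module_id)
    if key in providers:
        return providers[key]
    default = providers.get((protocol, None))
    if default is not None:
        # Register the same default instance under the module-specific key.
        providers[key] = default
    return default


registry: Dict[Key, object] = {("zenoh", None): object()}
first = get_or_alias(registry, "zenoh", "abc123")
```

After the first call the key ("zenoh", "abc123") exists and points to the same object as the default entry, so later calls skip the fallback.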

list_registered()[source]

Get list of all registered (protocol, module_id) keys.

Return type:

List[Tuple[ProtocolType, Optional[str]]]

list_available()[source]

Get list of available (protocol, module_id) keys.

Return type:

List[Tuple[ProtocolType, Optional[str]]]

is_available(protocol, module_id=None)[source]

Check if a provider for (protocol, module_id) is available.

Return type:

bool

Parameters:
  • protocol (ProtocolType)

  • module_id (Optional[str])

unregister_provider(protocol, module_id=None)[source]

Unregister a provider.

Parameters:
  • protocol (ProtocolType) – Protocol to unregister

  • module_id (Optional[str]) – Module ID of the entry to remove (None = default provider)

Returns:

True if provider was unregistered

Return type:

bool

async initialize_all(config=None)[source]

Initialize all registered providers.

Note: aliased entries (multiple module_ids sharing the same provider instance) will attempt initialization only once per unique instance.

Parameters:

config (Optional[Dict[ProtocolType, Dict]]) – Optional configuration per protocol

Return type:

List[Tuple[ProtocolType, Optional[str]]]

Returns:

List of successfully initialized (protocol, module_id) keys
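The note about aliased entries can be illustrated as follows. This is a sketch under assumptions: CountingProvider and the free function initialize_all() are hypothetical, per-protocol configuration is omitted, and reporting every aliased key as initialized is a guess about the return value.

```python
import asyncio
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, Optional[str]]


class CountingProvider:
    """Hypothetical provider that records how often it is initialized."""

    def __init__(self) -> None:
        self.init_calls = 0

    async def initialize(self) -> bool:
        self.init_calls += 1
        return True


async def initialize_all(providers: Dict[Key, CountingProvider]) -> List[Key]:
    outcome: Dict[int, bool] = {}  # id(instance) -> initialization result
    initialized: List[Key] = []
    for key, provider in providers.items():
        if id(provider) not in outcome:
            # First time this instance is seen: initialize it exactly once.
            outcome[id(provider)] = await provider.initialize()
        if outcome[id(provider)]:
            initialized.append(key)
    return initialized


shared = CountingProvider()
registry: Dict[Key, CountingProvider] = {
    ("zenoh", None): shared,
    ("zenoh", "abc123"): shared,  # alias of the default provider
}
keys = asyncio.run(initialize_all(registry))
```

Tracking instances by identity rather than by key is what prevents a second initialize() call on a provider that is registered under several module IDs.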

async shutdown_all()[source]

Shutdown all unique provider instances.

Return type:

None

get_statistics()[source]

Get registry statistics.

Return type:

Dict[str, Any]

class vyra_base.com.TopicBuilder(module_name, module_id, namespace=None, interface_paths=None, enable_interface_loading=True)[source]

Bases: object

Builder for consistent topic/service names across transport protocols.

Features:
  • Validates module names and IDs

  • Ensures naming consistency

  • Supports optional namespace and subsection path levels

  • Protocol-agnostic design

Topic path structure:

{module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]

Examples

>>> builder = TopicBuilder("v2_modulemanager", "abc123")
>>> builder.build("get_modules")
'v2_modulemanager_abc123/get_modules'
>>> builder.build("NewsFeed", namespace="feeder")
'v2_modulemanager_abc123/feeder/NewsFeed'
>>> builder.build("run_task", namespace="action", subsection="cancel")
'v2_modulemanager_abc123/action/run_task/cancel'
>>> # Instance-level default namespace (applied to every build() call)
>>> feeder_builder = TopicBuilder("v2_modulemanager", "abc123", namespace="feeder")
>>> feeder_builder.build("NewsFeed")
'v2_modulemanager_abc123/feeder/NewsFeed'
Parameters:
  • module_name (str)

  • module_id (str)

  • namespace (Optional[str])

  • interface_paths (Optional[list[str | Path]])

  • enable_interface_loading (bool)

MODULE_NAME_PATTERN = re.compile('^[a-zA-Z0-9_]+$')
MODULE_ID_PATTERN = re.compile('^[a-zA-Z0-9_]+$')
FUNCTION_NAME_PATTERN = re.compile('^[a-zA-Z0-9_]+$')
SUBSECTION_PATTERN = re.compile('^[a-zA-Z0-9_/\\-]+$')
__init__(module_name, module_id, namespace=None, interface_paths=None, enable_interface_loading=True)[source]

Initialize topic builder with optional interface loading.

Parameters:
  • module_name (str) – Name of the module (e.g., “v2_modulemanager”)

  • module_id (str) – Unique module instance ID (e.g., “abc123” or full hash)

  • namespace (Optional[str]) – Optional default namespace segment applied to every build() call that does not supply an explicit namespace argument. For example, pass namespace="feeder" when creating a TopicBuilder for a feeder publisher so that all feeder topics are automatically scoped under feeder/.

  • interface_paths (Optional[list[str | Path]]) – Optional list of interface base paths. If None, uses InterfacePathRegistry defaults.

  • enable_interface_loading (bool) – Enable dynamic interface loading. Set to False for pure topic naming without interface loading.

Raises:

ValueError – If module_name or module_id contains invalid characters
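The validation that leads to this ValueError can be sketched with the two patterns listed as class attributes. The check_module_identity() helper is hypothetical and the error messages are illustrative.

```python
import re

# Patterns copied from the MODULE_NAME_PATTERN / MODULE_ID_PATTERN attributes.
MODULE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_]+$")
MODULE_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_]+$")


def check_module_identity(module_name: str, module_id: str) -> str:
    """Return the '<module_name>_<module_id>' prefix or raise ValueError."""
    if not MODULE_NAME_PATTERN.match(module_name):
        raise ValueError(f"invalid module_name: {module_name!r}")
    if not MODULE_ID_PATTERN.match(module_id):
        raise ValueError(f"invalid module_id: {module_id!r}")
    return f"{module_name}_{module_id}"
```

Because "/" is the path separator of the topic structure, rejecting it in module names and IDs keeps the first path segment unambiguous.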

property module_prefix: str

Get the module prefix (<module_name>_<module_id>).

build(function_name, namespace=None, subsection=None, interface_type=None)[source]

Build a topic/service name.

Path: {module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]

If namespace is None the instance-level default namespace (set via the constructor) is used. Pass namespace="" to explicitly suppress the instance default for a single call.

Parameters:
  • function_name (str) – Name of the function/interface

  • namespace (Optional[str]) – Optional namespace segment (overrides instance default). Omit or pass None to inherit the instance default. Pass "" to suppress the instance default for this call.

  • subsection (Optional[str]) – Optional trailing subsection segment

  • interface_type (Optional[InterfaceType]) – Type of interface (for logging/validation)

Return type:

str

Returns:

Complete topic/service name

Examples

>>> builder.build("get_modules")
'v2_modulemanager_abc123/get_modules'
>>> builder.build("NewsFeed", namespace="feeder")
'v2_modulemanager_abc123/feeder/NewsFeed'
>>> builder.build("run_task", namespace="action", subsection="cancel")
'v2_modulemanager_abc123/action/run_task/cancel'
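The namespace rules (None inherits the instance default, "" suppresses it) can be reproduced in a standalone sketch. The sketch_build() function and its default_namespace parameter are hypothetical; default_namespace plays the role of the namespace passed to the TopicBuilder constructor, and name validation is omitted.

```python
from typing import Optional


def sketch_build(module_name: str, module_id: str, function_name: str,
                 namespace: Optional[str] = None,
                 subsection: Optional[str] = None,
                 default_namespace: Optional[str] = None) -> str:
    # None inherits the instance default; "" suppresses it for this call.
    effective = default_namespace if namespace is None else namespace
    parts = [f"{module_name}_{module_id}"]
    if effective:
        parts.append(effective)
    parts.append(function_name)
    if subsection:
        parts.append(subsection)
    return "/".join(parts)
```

For example, sketch_build("v2_modulemanager", "abc123", "NewsFeed", namespace="", default_namespace="feeder") yields 'v2_modulemanager_abc123/NewsFeed', because the empty string overrides the default for that single call.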
parse(topic_name)[source]

Parse a topic name back into components.

Understands the 2-level path (module/function), the 3-level paths (module/namespace/function or module/function/subsection), and the 4-level path (module/namespace/function/subsection).

Disambiguation rule for 2 remaining segments: the second segment is treated as subsection (not namespace), preserving backward compatibility with the previous subaction field. To get a namespace-only path (without subsection), use 3 segments: build the topic with only namespace set.

Path: {module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]

Parameters:

topic_name (str) – Complete topic name to parse

Return type:

TopicComponents

Returns:

TopicComponents with parsed values

Raises:

ValueError – If topic name doesn’t match expected format

Examples

>>> builder.parse("v2_modulemanager_abc123/get_modules")
TopicComponents(module_name='v2_modulemanager', module_id='abc123',
                function_name='get_modules', namespace=None, subsection=None)
>>> builder.parse("v2_modulemanager_abc123/feeder/NewsFeed")
TopicComponents(module_name='v2_modulemanager', module_id='abc123',
                function_name='NewsFeed', namespace='feeder', subsection=None)
>>> builder.parse("v2_modulemanager_abc123/action/run_task/cancel")
TopicComponents(module_name='v2_modulemanager', module_id='abc123',
                function_name='run_task', namespace='action', subsection='cancel')
validate(topic_name)[source]

Validate if a topic name follows the naming convention.

Parameters:

topic_name (str) – Topic name to validate

Return type:

bool

Returns:

True if valid, False otherwise

build_topic_name(function_name, namespace=None, subsection=None, interface_type=None)[source]

Build only the topic/service name (without loading interface).

Pure naming function - works without ROS2 or interface loading. Useful for slim mode or when interface type is not needed.

Parameters:
  • function_name (str) – Name of the function/interface

  • namespace (Optional[str]) – Optional namespace segment (overrides instance default)

  • subsection (Optional[str]) – Optional trailing subsection segment

  • interface_type (Optional[InterfaceType]) – Type of interface (for logging only)

Return type:

str

Returns:

Complete topic/service name

Examples

>>> builder.build_topic_name("get_modules")
'v2_modulemanager_abc123/get_modules'
load_interface_type(function_name, protocol='ros2')[source]

Load interface type for a function (ROS2 or Protobuf).

Dynamically loads interface class/module without compile-time imports. Requires interface loader to be enabled.

Parameters:
  • function_name (str) – Name of function (from metadata)

  • protocol (str) – Protocol to use: “ros2”, “zenoh”, “redis”, or “uds”. “ros2” loads .srv/.msg/.action interfaces; all other protocols load .proto definitions via their generated *_pb2.py modules.

Return type:

Union[type, Any, None]

Returns:

Interface type/module, or None if not found or loader disabled

Examples

>>> # Load ROS2 service interface
>>> srv_type = builder.load_interface_type("get_interface_list", "ros2")
>>> # Load protobuf for Zenoh
>>> pb_module = builder.load_interface_type("get_interface_list", "zenoh")
build_with_interface(function_name, namespace=None, subsection=None, interface_type=None, protocol='ros2')[source]

Build topic name AND load interface type in one call.

Combines topic naming with interface loading for convenience. Returns both the topic name and the loaded interface.

Parameters:
  • function_name (str) – Name of the function/interface

  • namespace (str | None) – Optional namespace segment (overrides instance default)

  • subsection (str | None) – Optional trailing subsection segment (formerly subaction)

  • interface_type (Optional[InterfaceType]) – Type of interface (InterfaceType) for logging

  • protocol (str) – Protocol for interface loading

Return type:

tuple[str, Union[type, Any, None]]

Returns:

Tuple of (topic_name, interface_type_or_module). interface_type_or_module is None if loading failed or is disabled.

Examples

>>> # Build topic and load ROS2 service
>>> topic, service_type = builder.build_with_interface(
...     "get_interface_list",
...     interface_type=InterfaceType.SERVER,
...     protocol="ros2"
... )
>>> print(topic)  # 'v2_modulemanager_abc123/get_interface_list'
>>> print(service_type)  # <class '...VBASEGetInterfaceList'>
>>> # Build topic and load protobuf for Zenoh
>>> topic, pb_module = builder.build_with_interface(
...     "state_feed",
...     interface_type=InterfaceType.SPEAKER,
...     protocol="zenoh"
... )
get_interface_loader()[source]

Get the interface loader instance.

Return type:

Optional[InterfaceLoader]

Returns:

InterfaceLoader instance, or None if disabled

reload_interface_metadata()[source]

Force reload of interface metadata from config files.

Clears cache and reloads all JSON metadata. Only works if interface loader is enabled.

Return type:

None

get_loaded_interfaces_stats()[source]

Get statistics about loaded interfaces.

Return type:

dict[str, int]

Returns:

Dict with cache statistics, or empty dict if loader disabled

class vyra_base.com.TopicComponents(module_name, module_id, function_name, namespace=None, subsection=None)[source]

Bases: object

Components of a parsed topic name.

Represents the structured parts of a VYRA topic path: {module_name}_{module_id}[/{namespace}]/{function_name}[/{subsection}]

Parameters:
  • module_name (str)

  • module_id (str)

  • function_name (str)

  • namespace (str | None)

  • subsection (str | None)

module_name: str
module_id: str
function_name: str
namespace: str | None = None
subsection: str | None = None
__init__(module_name, module_id, function_name, namespace=None, subsection=None)
Parameters:
  • module_name (str)

  • module_id (str)

  • function_name (str)

  • namespace (str | None)

  • subsection (str | None)

Return type:

None

vyra_base.com.TopicInterfaceType

alias of InterfaceType

vyra_base.com.create_topic_builder(module_name, module_id, namespace=None)[source]

Factory function to create a TopicBuilder instance.

Parameters:
  • module_name (str) – Name of the module

  • module_id (str) – Unique module instance ID

  • namespace (Optional[str]) – Optional default namespace applied to every build() call (e.g. "feeder" for feeder publishers).

Return type:

TopicBuilder

Returns:

Configured TopicBuilder instance

vyra_base.com.build_topic(module_name, module_id, function_name, namespace=None, subsection=None)[source]

Build a topic name without creating a builder instance.

Convenience function for one-off topic generation.

Parameters:
  • module_name (str) – Name of the module

  • module_id (str) – Unique module instance ID

  • function_name (str) – Name of the function/interface

  • namespace (Optional[str]) – Optional namespace segment

  • subsection (Optional[str]) – Optional trailing subsection segment

Return type:

str

Returns:

Complete topic name

vyra_base.com.parse_topic(topic_name)[source]

Parse a topic name without a builder instance.

Parameters:

topic_name (str) – Topic name to parse

Return type:

TopicComponents

Returns:

TopicComponents with parsed values

class vyra_base.com.InterfacePathRegistry[source]

Bases: object

Thread-safe singleton registry for interface base paths.

Stores paths to interface directories containing:

  • /config/*.json – Interface metadata

  • /server/*.srv – ROS2 service definitions

  • /publisher/*.msg – ROS2 message definitions

  • /actionServer/*.action – ROS2 action definitions

  • /proto/*.proto – Protocol Buffer definitions

  • /proto/*_pb2.py – Generated Python protobuf modules

Default path points to vyra_base/interfaces (embedded package). Modules can register additional paths via set_interface_paths().

Examples

>>> registry = InterfacePathRegistry.get_instance()
>>> registry.set_interface_paths([
...     "/path/to/interfaces1",
...     "/path/to/interfaces2"
... ])
>>> paths = registry.get_interface_paths()
__init__()[source]

Private constructor - use get_instance() instead.

classmethod get_instance()[source]

Get singleton instance (thread-safe).

Return type:

InterfacePathRegistry

Returns:

InterfacePathRegistry singleton instance
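One common way to make get_instance() thread-safe is double-checked locking. The sketch below assumes that approach; PathRegistrySketch and collect_instances() are hypothetical and do not reflect how InterfacePathRegistry is actually implemented.

```python
import threading
from typing import List, Optional


class PathRegistrySketch:
    """Hypothetical thread-safe singleton (not the vyra_base implementation)."""

    _instance: Optional["PathRegistrySketch"] = None
    _lock = threading.Lock()

    def __init__(self) -> None:
        self._paths: List[str] = []

    @classmethod
    def get_instance(cls) -> "PathRegistrySketch":
        if cls._instance is None:          # fast path without the lock
            with cls._lock:
                if cls._instance is None:  # re-check while holding the lock
                    cls._instance = cls()
        return cls._instance


def collect_instances(n_threads: int = 8) -> List[PathRegistrySketch]:
    """Call get_instance() from several threads and collect the results."""
    seen: List[PathRegistrySketch] = []
    threads = [
        threading.Thread(target=lambda: seen.append(PathRegistrySketch.get_instance()))
        for _ in range(n_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return seen
```

The second check inside the lock is what prevents two threads that both passed the first check from each creating an instance.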

set_interface_paths(paths)[source]

Set interface base paths (replaces existing paths).

Parameters:

paths (list[str | Path]) – List of interface directory paths

Raises:

ValueError – If any path doesn’t exist or isn’t readable

Return type:

None

Examples

>>> registry.set_interface_paths([
...     "/path/to/interfaces1",
...     "/path/to/interfaces2"
... ])
add_interface_path(path)[source]

Add an interface path to existing paths (without replacing).

Parameters:

path (str | Path) – Interface directory path to add

Raises:

ValueError – If path doesn’t exist or isn’t readable

Return type:

None

get_interface_paths()[source]

Get list of registered interface paths.

Return type:

list[Path]

Returns:

Copy of interface paths list (thread-safe)

clear_interface_paths()[source]

Clear all interface paths and reset to default.

Return type:

None

get_config_paths()[source]

Get paths to config directories (/config subdirs of interface paths).

Return type:

list[Path]

Returns:

List of existing config directory paths

get_proto_paths()[source]

Get paths to proto directories (/proto subdirs of interface paths).

Return type:

list[Path]

Returns:

List of existing proto directory paths

vyra_base.com.get_interface_registry()[source]

Convenience function to get singleton instance.

Return type:

InterfacePathRegistry

Returns:

InterfacePathRegistry singleton

class vyra_base.com.InterfaceLoader(interface_paths=None, auto_update_paths=True)[source]

Bases: object

Dynamic loader for ROS2 and Protocol Buffer interfaces.

Provides runtime loading of interface type classes from string names, eliminating compile-time import dependencies.

Features:
  • ROS2 interface loading via rosidl_runtime_py

  • Protocol Buffer module loading (*_pb2.py)

  • Interface metadata loading from JSON configs

  • Caching for performance

  • Graceful fallback when ROS2 unavailable

Examples

>>> loader = InterfaceLoader()
>>> # Load ROS2 service interface
>>> srv_type = loader.load_ros2_interface("your_module_interfaces/srv/VBASEGetInterfaceList")
>>> print(srv_type)  # <class 'your_module_interfaces.srv.VBASEGetInterfaceList'>
>>> # Load Protocol Buffer interface
>>> pb_type = loader.load_protobuf_interface("VBASEGetInterfaceList")
>>> print(pb_type)  # <module 'VBASEGetInterfaceList_pb2'>
>>> # Load by function name from metadata
>>> interface = loader.get_interface_for_function("get_interface_list", protocol="ros2")

Parameters:
  • interface_paths (Optional[list[Path]])

  • auto_update_paths (bool)

__init__(interface_paths=None, auto_update_paths=True)[source]

Initialize interface loader.

Parameters:
  • interface_paths (Optional[list[Path]]) – List of base interface paths. If None, uses registry default.

  • auto_update_paths (bool) – If True, automatically updates environment paths for discovery

load_ros2_interface(interface_path)[source]

Load ROS2 interface type from string path.

Uses rosidl_runtime_py.utilities to dynamically load interface classes without compile-time imports.

Parameters:

interface_path (str) – Interface path in the format “package/type/Name”. Examples:

  • “std_msgs/msg/String”

  • “your_module_interfaces/srv/VBASEGetInterfaceList”

  • “action_msgs/action/MyAction”

Return type:

Optional[type]

Returns:

Interface type class, or None if unavailable

Examples

>>> loader.load_ros2_interface("std_msgs/msg/String")
<class 'std_msgs.msg._string.String'>
load_protobuf_interface(interface_name)[source]

Load Protocol Buffer interface module (*_pb2.py).

Searches proto/ directories in registered interface paths for generated Python protobuf modules.

Parameters:

interface_name (str) – Base name of the interface (without the _pb2 suffix). Examples:

  • “VBASEGetInterfaceList” → VBASEGetInterfaceList_pb2.py

  • “VBASEStateFeed” → VBASEStateFeed_pb2.py

Return type:

Optional[Any]

Returns:

Loaded protobuf module, or None if not found

Examples

>>> loader.load_protobuf_interface("VBASEGetInterfaceList")
<module 'VBASEGetInterfaceList_pb2'>
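Searching proto/ directories for a generated module and importing it by file path can be done with the standard importlib machinery. The sketch below assumes that approach; load_pb2_module() is a hypothetical helper, and the demo uses a placeholder file rather than real protoc output.

```python
import importlib.util
import tempfile
from pathlib import Path
from types import ModuleType
from typing import Iterable, Optional


def load_pb2_module(interface_name: str,
                    proto_dirs: Iterable[Path]) -> Optional[ModuleType]:
    """Import '<interface_name>_pb2.py' from the first directory that has it."""
    module_name = f"{interface_name}_pb2"
    for directory in proto_dirs:
        candidate = Path(directory) / f"{module_name}.py"
        if not candidate.is_file():
            continue
        spec = importlib.util.spec_from_file_location(module_name, candidate)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module
    return None


# Demo with a placeholder file instead of a real generated protobuf module.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "Demo_pb2.py").write_text("MARKER = 'demo'\n")
    found = load_pb2_module("Demo", [Path(tmp)])
    missing = load_pb2_module("Absent", [Path(tmp)])
```

Loading by file location avoids having to add every proto directory to sys.path, at the cost of not registering the module in sys.modules.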
load_interface_metadata(reload=False)[source]

Load interface metadata from JSON config files.

Scans config/ directories in registered interface paths and loads all *.json files containing interface metadata.

Parameters:

reload (bool) – Force reload even if cached

Return type:

dict[str, dict]

Returns:

Dict mapping function_name → metadata dict

Examples

>>> metadata = loader.load_interface_metadata()
>>> print(metadata.keys())
dict_keys(['get_interface_list', 'health_check', 'initialize', ...])
>>> meta = metadata['get_interface_list']
>>> print(meta['type'])  # 'callable'
>>> print(meta['filetype'])  # ['VBASEGetInterfaceList.srv', 'VBASEGetInterfaceList.proto']
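The directory scan can be pictured as below. This is a sketch under a loud assumption: each JSON file is taken to contain a list of entries that carry a "functionname" key, which matches the field names shown in this reference but not necessarily the real file layout. scan_metadata() is a hypothetical helper and caching is omitted.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, Iterable


def scan_metadata(config_dirs: Iterable[Path]) -> Dict[str, dict]:
    """Map function names to metadata entries found in *.json files."""
    metadata: Dict[str, dict] = {}
    for directory in config_dirs:
        for json_file in sorted(Path(directory).glob("*.json")):
            entries = json.loads(json_file.read_text(encoding="utf-8"))
            for entry in entries:
                # Assumption: every entry carries a "functionname" key.
                metadata[entry["functionname"]] = entry
    return metadata


with tempfile.TemporaryDirectory() as tmp:
    sample = [{
        "functionname": "get_interface_list",
        "type": "callable",
        "filetype": ["VBASEGetInterfaceList.srv", "VBASEGetInterfaceList.proto"],
    }]
    (Path(tmp) / "base.json").write_text(json.dumps(sample), encoding="utf-8")
    loaded = scan_metadata([Path(tmp)])
```

In this sketch a later file silently overrides an earlier entry with the same function name; whether the real loader does the same is not stated here.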
get_interface_for_function(function_name, protocol='ros2')[source]

Load interface type for a function by name.

Looks up function in metadata, extracts appropriate interface path based on protocol, and loads the interface.

Parameters:
  • function_name (str) – Name of function (from functionname in metadata)

  • protocol (str) – Protocol to use: “ros2”, “zenoh”, “redis”, “uds” (ros2 uses .srv/.msg/.action, others use .proto)

Return type:

Union[type, Any, None]

Returns:

Interface type/module, or None if not found

Examples

>>> # Load ROS2 service interface
>>> srv = loader.get_interface_for_function("get_interface_list", protocol="ros2")
>>> # Load protobuf for Zenoh
>>> pb = loader.get_interface_for_function("get_interface_list", protocol="zenoh")
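The protocol-dependent choice between ROS2 files and .proto files can be sketched against the filetype list shown in the metadata example. pick_interface_file() is a hypothetical helper; the actual loading step is left out.

```python
from typing import List, Optional

ROS2_SUFFIXES = (".srv", ".msg", ".action")


def pick_interface_file(filetypes: List[str], protocol: str) -> Optional[str]:
    """Select the definition file that matches the requested protocol."""
    wanted = ROS2_SUFFIXES if protocol == "ros2" else (".proto",)
    for name in filetypes:
        if name.endswith(wanted):
            return name
    return None


files = ["VBASEGetInterfaceList.srv", "VBASEGetInterfaceList.proto"]
```

With this list, "ros2" selects the .srv file while "zenoh", "redis" and "uds" all select the .proto file.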
clear_cache()[source]

Clear all cached interfaces and metadata.

Return type:

None

get_cache_stats()[source]

Get cache statistics.

Return type:

dict[str, int]

Returns:

Dict with cache sizes

class vyra_base.com.BaseFeeder[source]

Bases: IFeeder

Concrete base class for all VYRA feeders.

Implements IFeeder and provides:

  • Protocol auto-resolution via FeederConfigResolver: the transport protocol is read from the module’s interface config JSON (functionname matched against feeder_name).

  • Pre-start buffer: messages fed before start() are queued and flushed automatically.

  • Metrics: feed_count, error_count, last_feed_at.

  • Health check: is_alive() probes the backing publisher.

  • Retry policy: configurable max_retries and retry_delay.

Abstract class — subclasses set _feederName, _type, optionally _interface_paths.
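How a retry policy and the metrics counters might interact is sketched below. Everything here is an assumption for illustration: RetryingPublisherSketch is hypothetical, max_retries is taken to mean retries after the first attempt, and error_count is taken to count every failed attempt.

```python
import asyncio
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, Optional


class RetryingPublisherSketch:
    """Hypothetical publish loop with retry policy and simple metrics."""

    def __init__(self, publish: Callable[[Any], Awaitable[None]],
                 max_retries: int = 3, retry_delay: float = 0.0) -> None:
        self._publish = publish
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.feed_count = 0
        self.error_count = 0
        self.last_feed_at: Optional[datetime] = None

    async def feed(self, msg: Any) -> bool:
        for attempt in range(self.max_retries + 1):
            try:
                await self._publish(msg)
            except Exception:
                self.error_count += 1
                if attempt < self.max_retries:
                    await asyncio.sleep(self.retry_delay)
                continue
            # Success: update the metrics and stop retrying.
            self.feed_count += 1
            self.last_feed_at = datetime.now()
            return True
        return False


attempts: Dict[str, int] = {"n": 0}


async def flaky_publish(msg: Any) -> None:
    """Fail on the first two calls, then succeed."""
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise ConnectionError("transient failure")


sketch = RetryingPublisherSketch(flaky_publish, max_retries=3)
delivered = asyncio.run(sketch.feed("hello"))
```

In this run the message is delivered on the third attempt, so feed_count ends at 1 and error_count at 2.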

__init__()[source]

Initialize the BaseFeeder.

Return type:

None

get_feeder_name()[source]

Return the feeder name (= functionname in interface config).

Return type:

str

get_protocol()[source]

Return the resolved transport protocol, or None before start.

Return type:

Optional[str]

is_alive()[source]

Return True if the publisher is set and available.

Return type:

bool

is_ready()[source]

Return True after start() has completed successfully.

Return type:

bool

get_buffer()[source]

Return the pre-start message buffer.

Return type:

deque

property feed_count: int

Number of successfully published messages.

property error_count: int

Number of publish errors.

property last_feed_at: datetime | None

Timestamp of the last successful feed() call.

property debounced_duplicate_count: int

Number of duplicate messages suppressed by debouncing.

register_condition(condition_function, *, name=None, tag='news', execution_point='ALWAYS', success_message=None, failure_message=None)[source]

Register a synchronous bool-returning condition callback.

Return type:

str

Parameters:
  • condition_function (Callable[[dict[str, Any]], bool])

  • name (str | None)

  • tag (str)

  • execution_point (Literal['BEFORE', 'DURING', 'AFTER', 'ALWAYS'])

  • success_message (str | None)

  • failure_message (str | None)

unregister_condition(name)[source]

Remove a previously registered condition callback by name.

Return type:

bool

Parameters:

name (str)

evaluate_conditions(context, *, rule_names=None, tags=None, execution_point=None)[source]

Evaluate registered conditions and return resulting messages.

By default all registered conditions are evaluated. Use rule_names and/or tags to limit evaluation to a subset.

Return type:

list[tuple[str, str]]

set_interface_paths(paths)[source]

Override the interface paths used for protocol resolution.

Called by a VyraEntity after constructing the feeder to provide module-specific config paths.

Parameters:

paths (Union[Sequence[str], list[Path]]) – List of directory or JSON file paths.

Return type:

None

async create(loggingOn=False)[source]

Create the publisher using protocol resolved from interface config.

Resolution order:

  1. FeederConfigResolver.resolve(feeder_name, interface_paths) — reads the module’s JSON config, maps tags to a protocol.

  2. If no config found (or interface_paths empty): fall back to InterfaceFactory.create_publisher with the default chain [ZENOH, ROS2, REDIS, UDS].

Raises:

FeederException – If no protocol is available at all.

Parameters:

loggingOn (bool) – Emit feeder messages also to the base logger.

Return type:

None
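The resolution order of create() can be sketched as a small selection function. choose_protocol(), SketchFeederError and the availability table are hypothetical; the lowercase protocol names stand in for the ZENOH, ROS2, REDIS and UDS members of the default chain.

```python
from typing import Callable, Dict, Optional, Sequence

DEFAULT_CHAIN = ("zenoh", "ros2", "redis", "uds")


class SketchFeederError(Exception):
    """Hypothetical stand-in for FeederException."""


def choose_protocol(resolved: Optional[str],
                    is_available: Callable[[str], bool],
                    chain: Sequence[str] = DEFAULT_CHAIN) -> str:
    # Step 1: a protocol resolved from the interface config wins.
    if resolved is not None:
        return resolved
    # Step 2: otherwise walk the default chain in order.
    for protocol in chain:
        if is_available(protocol):
            return protocol
    raise SketchFeederError("no protocol is available")


availability: Dict[str, bool] = {
    "zenoh": False, "ros2": False, "redis": True, "uds": True,
}
```

With this table and no config entry, the chain skips Zenoh and ROS2 and settles on Redis.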

async start()[source]

Start the feeder (implements IFeeder).

Subclasses may override to add extra initialisation before calling await super().start().

Return type:

None

async feed(msg)[source]

Enqueue msg for publishing (implements IFeeder).

If the feeder is not yet ready the message is buffered. Otherwise the publish path is executed synchronously (event-loop aware).

Parameters:

msg (Any) – Message to publish.

Return type:

None
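The buffering behaviour of feed() before start() can be sketched with a deque. BufferingFeederSketch is hypothetical; its published list stands in for the real publisher, and conversion, retries and handlers are left out.

```python
import asyncio
from collections import deque
from typing import Any, Deque, List


class BufferingFeederSketch:
    """Hypothetical feeder showing only the pre-start buffer behaviour."""

    def __init__(self) -> None:
        self._ready = False
        self._buffer: Deque[Any] = deque()
        self.published: List[Any] = []  # stands in for the real publisher

    async def _publish(self, msg: Any) -> None:
        self.published.append(msg)

    async def feed(self, msg: Any) -> None:
        if not self._ready:
            self._buffer.append(msg)  # queued until start() completes
            return
        await self._publish(msg)

    async def start(self) -> None:
        self._ready = True
        while self._buffer:  # flush in arrival order
            await self._publish(self._buffer.popleft())


async def demo() -> BufferingFeederSketch:
    feeder = BufferingFeederSketch()
    await feeder.feed("early-1")
    await feeder.feed("early-2")
    await feeder.start()
    await feeder.feed("late")
    return feeder


feeder = asyncio.run(demo())
```

Messages fed before start() keep their order and are published ahead of anything fed afterwards.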

feed_sync(msg)[source]

Sync version of feed() for use in sync contexts.

Return type:

None

Parameters:

msg (Any)
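One way a synchronous wrapper can stay event-loop aware is to check for a running loop before deciding how to execute the coroutine. This sketch assumes that approach; feed_async_sketch() and feed_sync_sketch() are hypothetical and not the BaseFeeder implementation.

```python
import asyncio
from typing import Any, List

sent: List[Any] = []


async def feed_async_sketch(msg: Any) -> None:
    sent.append(msg)


def feed_sync_sketch(msg: Any) -> None:
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: drive the coroutine to completion.
        asyncio.run(feed_async_sketch(msg))
    else:
        # Called from inside a running loop: schedule without blocking it.
        loop.create_task(feed_async_sketch(msg))


async def inside_loop() -> None:
    feed_sync_sketch("from-loop")
    await asyncio.sleep(0)  # yield once so the scheduled task can run


feed_sync_sketch("from-sync")
asyncio.run(inside_loop())
```

Inside a running loop the call only schedules the work, so the message is published once control returns to the loop rather than before feed_sync_sketch() returns.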

add_handler(handler)[source]

Attach a CommunicationHandler.

Parameters:

handler (CommunicationHandler) – Handler instance to attach.

Returns:

True if added, False if already present.

Return type:

bool

add_handler_class(handler_class)[source]

Register a handler class to be instantiated during create().

Parameters:

handler_class (Type[CommunicationHandler]) – Handler class (must subclass CommunicationHandler).

Return type:

None

class vyra_base.com.StateFeeder(node, module_entity, loggingOn=False)[source]

Bases: BaseFeeder

Responsible for loading a VYRA Handler and feeding StateEntry elements to this handler.

Parameters:
  • node (Optional[Any]) – The VyraNode instance associated with this feeder.

  • module_entity (ModuleEntry) – Module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to False.

Raises:

FeederException – If the publisher cannot be created.

Variables:
  • _feederName – Name of the feeder.

  • _doc – Documentation string for the feeder.

  • _level – Logging level.

  • _node – VyraNode instance.

  • _module_entity – Module configuration.

__init__(node, module_entity, loggingOn=False)[source]

Initializes a StateFeeder instance for sending changes in state of a module.

Parameters:
  • node (Optional[Any]) – The VyraNode instance (optional, None if ROS2 unavailable).

  • module_entity (ModuleEntry) – Module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to False.

Raises:

FeederException – If the publisher cannot be created.

async start()[source]

Starts the feeder by initializing handlers.

Automatically resolves the transport protocol from the module’s interface config (reads StateFeed entry, picks tags).

Return type:

None

async feed(stateElement)[source]

Feed a state entry to the feeder.

Validates the input type, then delegates to BaseFeeder.feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.

Parameters:

stateElement (StateEntry) – The state entry to feed.

Raises:

FeederException – If the provided element is not of type StateEntry.

Return type:

None

feed_sync(msg)[source]

Sync version of feed() for use in sync contexts.

Return type:

None

Parameters:

msg (Any)

class vyra_base.com.NewsFeeder(node, module_entity, loggingOn=False)[source]

Bases: BaseFeeder

Collection of the news messages.

Parameters:
  • node (Optional[Any]) – The VyraNode instance associated with this feeder.

  • module_entity (ModuleEntry) – Module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to False.

Raises:

FeederException – If the publisher cannot be created.

__init__(node, module_entity, loggingOn=False)[source]

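Initialize the NewsFeeder.

Parameters:
  • node (Optional[Any]) – The VyraNode instance associated with this feeder.

  • module_entity (ModuleEntry) – Module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to False.
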
async start()[source]

Starts the feeder by initializing handlers.

Automatically resolves the transport protocol from the module’s interface config (reads NewsFeed entry, picks tags).

Return type:

None

async feed(newsElement)[source]

Feed a news entry to the feeder.

Normalises the input to a NewsEntry, then delegates to BaseFeeder.feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.

Parameters:

newsElement (Union[NewsEntry, str, list]) – The news entry to be fed. Can be a string or list of strings to be processed into a NewsEntry, or a NewsEntry object.

Raises:

FeederException – If the type of newsElement is not supported.

Return type:

None

feed_sync(msg)[source]

Sync version of feed() — normalises input to NewsEntry before delegating.

Return type:

None

Parameters:

msg (Any)

register_news_condition(condition_function, *, name=None, execution_point='ALWAYS', success_message=None, failure_message=None, tag='news')[source]

Register a synchronous bool condition for tracked feeder messages.

Return type:

str

Parameters:
  • condition_function (Callable[[dict[str, Any]], bool])

  • name (str | None)

  • execution_point (Literal['BEFORE', 'DURING', 'AFTER', 'ALWAYS'])

  • success_message (str | None)

  • failure_message (str | None)

  • tag (str)

monitor(*, tag='news', label=None, severity='INFO', entity=None, during_interval_seconds=0.05)[source]

Return a runtime-monitoring decorator bound to this feeder.

Return type:

Callable

Parameters:
  • tag (str)

  • label (str | None)

  • severity (str)

  • entity (Any)

  • during_interval_seconds (float)

async evaluate_tracked_conditions(context)[source]

Evaluate registered conditions and publish resulting tracker messages.

Return type:

None

Parameters:

context (dict[str, Any])

evaluate_tracked_conditions_sync(context)[source]

Sync variant of evaluate_tracked_conditions().

Return type:

None

Parameters:

context (dict[str, Any])

build_newsfeed(*args)[source]

Build a well-structured newsfeed entry from plain text and module information.

Parameters:

args (Any) – The arguments to be processed into a news entry. Can be a string or list of strings.

Returns:

A structured news entry containing the message, level, timestamp, UUID, module name, module ID, module template, and type.

Return type:

NewsEntry

Raises:

FeederException – If the type of the message level is not valid.

class vyra_base.com.ErrorFeeder(node, module_entity, loggingOn=True)[source]

Bases: BaseFeeder

Collection of the error messages.

Parameters:
  • node (Optional[Any]) – The VyraNode instance associated with this feeder.

  • module_entity (ModuleEntry) – The module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to True.

Raises:

FeederException – If the publisher cannot be created.

__init__(node, module_entity, loggingOn=True)[source]

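Initialize the ErrorFeeder.

Parameters:
  • node (Optional[Any]) – The VyraNode instance associated with this feeder.

  • module_entity (ModuleEntry) – The module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to True.
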
async start()[source]

Starts the feeder by initializing handlers.

Automatically resolves the transport protocol from the module’s interface config (reads ErrorFeed entry, picks tags).

Return type:

None

async feed(errorElement)[source]

Feed an error entry to the feeder.

Normalises the input to an ErrorEntry, then delegates to BaseFeeder.feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.

Parameters:

errorElement (Union[ErrorEntry, dict]) – The error entry to be fed. Can be a dictionary with error details or an ErrorEntry object.

Raises:

FeederException – If the type of errorElement is neither a dict nor an ErrorEntry.

Return type:

None

feed_sync(errorElement)[source]

Sync version of feed() with ErrorEntry normalization.

Return type:

None

Parameters:

errorElement (ErrorEntry | dict)
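One common way to offer a synchronous variant of an async method is to drive the coroutine to completion with `asyncio.run()`. The sketch below assumes that no event loop is already running in the calling thread. `StubFeeder` and its `fed` list are hypothetical, and the real `feed_sync()` may be implemented differently.

```python
import asyncio
from typing import Any


class StubFeeder:
    """Hypothetical feeder with an async method and a sync counterpart."""

    def __init__(self) -> None:
        self.fed: list[Any] = []

    async def feed(self, element: Any) -> None:
        await asyncio.sleep(0)  # stands in for real async transport work
        self.fed.append(element)

    def feed_sync(self, element: Any) -> None:
        # asyncio.run() creates a fresh event loop, runs the coroutine to
        # completion, and closes the loop. It raises RuntimeError when it is
        # called from a thread that already has a running loop.
        asyncio.run(self.feed(element))


feeder = StubFeeder()
feeder.feed_sync({"code": 1, "description": "sensor timeout"})
```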

monitor(*, tag='error', label=None, severity='WARNING', entity=None, during_interval_seconds=0.05)[source]

Return an exception-monitoring decorator bound to this feeder.

Return type:

Callable

Parameters:
  • tag (str)

  • label (str | None)

  • severity (str)

  • entity (Any)

  • during_interval_seconds (float)

build_errorfeed(errorDict)[source]

Build an error entry from the given dictionary.

Parameters:

errorDict (dict) –

A dictionary containing error details. Keys are:

  • code: int16 - Error code (default: 0x00000000)

  • uuid: UUID - Unique identifier for the error (default: a new UUID)

  • description: str - Description of the error (default: ‘’)

  • solution: str - Suggested solution for the error (default: ‘’)

  • miscellaneous: str - Additional information (default: ‘’)

  • level: ErrorEntry.ERROR_LEVEL - Level of the error (default: ErrorEntry.ERROR_LEVEL.MINOR_FAULT)

Returns:

An instance of vyra_base.defaults.entries.ErrorEntry populated with the provided details.

Return type:

ErrorEntry
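The default handling listed for build_errorfeed() can be illustrated with a small stand-in. This is a sketch under assumptions: `StubErrorEntry` and `ErrorLevel` are hypothetical replacements for ErrorEntry and ErrorEntry.ERROR_LEVEL, the enum values are invented, and silently ignoring unknown keys is a choice made only for this example.

```python
from dataclasses import dataclass, field, fields
from enum import IntEnum
from uuid import UUID, uuid4


class ErrorLevel(IntEnum):
    """Hypothetical stand-in for ErrorEntry.ERROR_LEVEL; values are invented."""
    MINOR_FAULT = 0
    MAJOR_FAULT = 1


@dataclass
class StubErrorEntry:
    """Hypothetical stand-in for ErrorEntry carrying the documented defaults."""
    code: int = 0
    uuid: UUID = field(default_factory=uuid4)  # a new UUID per entry
    description: str = ""
    solution: str = ""
    miscellaneous: str = ""
    level: ErrorLevel = ErrorLevel.MINOR_FAULT


def build_errorfeed_sketch(error_dict: dict) -> StubErrorEntry:
    """Build an entry from a dict; missing keys fall back to the defaults."""
    known = {f.name for f in fields(StubErrorEntry)}
    # Assumption for this sketch: keys that are not entry fields are dropped.
    return StubErrorEntry(**{k: v for k, v in error_dict.items() if k in known})


entry = build_errorfeed_sketch({"code": 42, "description": "Motor stalled"})
```

Only `code` and `description` are supplied here, so the remaining fields take their defaults, including a freshly generated UUID.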

class vyra_base.com.ROS2Publisher(publisherInfo, node)[source]

Bases: object

Base class for ROS2 publishers.

This class is intended to be factory-created to implement a specific publisher for a topic.

Parameters:
  • publisherInfo (PublisherInfo)

  • node (VyraNode)

__init__(publisherInfo, node)[source]

Initialize the ROS2Publisher.

Parameters:
  • publisherInfo (PublisherInfo) – PublisherInfo instance containing publisher configuration.

  • node (VyraNode) – The ROS2 node to which the publisher belongs.

Return type:

None

create_publisher()[source]

Create a publisher in the ROS2 node.

This method should be called to register the publisher within the ROS2 node.

Raises:

ValueError – If publisher type or name is not provided.

Return type:

None

publish(msg)[source]

Publish a message to the topic.

This method should be overridden in subclasses to provide specific functionality.

Parameters:

msg (Any) – The message to publish.

Raises:

ValueError – If publisher type, name, or publisher instance is not set.

Return type:

None

destroy()[source]

Destroy the publisher and clean up resources.

This method should be called to properly shut down the publisher.

Return type:

None

class vyra_base.com.ROS2Subscriber(subscriptionInfo, node)[source]

Bases: object

Base class for ROS2 subscriptions.

This class is intended to be factory-created to implement specific subscriptions for topics.

Parameters:
  • subscriptionInfo (SubscriberInfo)

  • node (VyraNode)

__init__(subscriptionInfo, node)[source]

Initialize the ROS2Subscriber.

Parameters:
  • subscriptionInfo (SubscriberInfo) – Information about the subscription.

  • node (VyraNode) – The ROS2 node to attach the subscription to.

Return type:

None

create_subscription()[source]

Create and register the subscription with the ROS2 node.

Raises:

ValueError – If the subscription type or name is not provided.

Return type:

None

remove_subscription()[source]

Remove the subscription from the ROS2 node.

This method will destroy the subscription if it exists.

Return type:

None

callback(msg)[source]

Callback method for the subscription.

This method should be overridden in subclasses to provide specific functionality.

Parameters:

msg (Any) – The message received from the subscription.

Return type:

None

class vyra_base.com.VyraNode(node_settings)[source]

Bases: Node

Vyra node class.

Parameters:

node_settings (NodeSettings) – NodeSettings object containing the node’s settings.

__init__(node_settings)[source]

Parameters:

node_settings (NodeSettings)

Return type:

None

set_reload()[source]

Set the reload event to notify that the node settings have changed.

Return type:

None

property node_settings: NodeSettings

Get the node settings.

Returns:

NodeSettings object containing the node’s settings.

Return type:

NodeSettings

class vyra_base.com.CheckerNode(enable_rosout=False)[source]

Bases: Node

Node to check the availability of other nodes.

Note: Uses enable_rosout=False to avoid SROS2 permission issues with the /rosout topic during node availability checks.

Parameters:

enable_rosout (bool) – Whether to enable the rosout logger.

__init__(enable_rosout=False)[source]

Parameters:

enable_rosout (bool)

Return type:

None

is_node_available(node_name)[source]

Check if a node with the given name is available.

Parameters:

node_name (str) – Name of the node to check.

Returns:

True if the node is available, False otherwise.

Return type:

bool

static check_node_name(node_name)[source]

Check if the node name is valid.

Parameters:

node_name (str) – Name of the node to check.

Returns:

True if the node name is valid, False otherwise.

Return type:

bool
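The rules applied by check_node_name() are not stated here. As a rough sketch, the check below assumes the usual ROS 2 node-name constraints (non-empty, only alphanumerics and underscores, not starting with a digit). The function name and the regular expression are illustrative, and the library may check more or less than this.

```python
import re

# Assumption: ROS 2 style node names. Letters, digits and underscores only,
# and the first character must not be a digit.
_NODE_NAME_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def looks_like_valid_node_name(node_name: str) -> bool:
    """Return True if node_name matches the assumed naming rules."""
    return bool(_NODE_NAME_RE.fullmatch(node_name))


checks = {
    name: looks_like_valid_node_name(name)
    for name in ("vyra_node", "9lives", "my-node", "")
}
```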

class vyra_base.com.NodeSettings(name='Vyra_node', parameters=<factory>)[source]

Bases: object

Settings for a Vyra node.

Variables:
  • name (str) – Name of the node.

  • parameters (dict) – Dictionary of parameters for the node.

Parameters:
  • name (str)

  • parameters (dict)

name: str = 'Vyra_node'

parameters: dict

__init__(name='Vyra_node', parameters=<factory>)

Parameters:
  • name (str)

  • parameters (dict)

Return type:

None
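The `parameters=<factory>` in the signature is how a dataclass field with a default factory is displayed, which suggests NodeSettings is a dataclass. The sketch below shows the pattern with a hypothetical `NodeSettingsSketch` class; it is not the library's definition.

```python
from dataclasses import dataclass, field


@dataclass
class NodeSettingsSketch:
    """Hypothetical mirror of the documented fields and defaults."""
    name: str = "Vyra_node"
    # default_factory gives every instance its own dict instead of sharing one.
    parameters: dict = field(default_factory=dict)


default_settings = NodeSettingsSketch()
camera_settings = NodeSettingsSketch(name="camera_node", parameters={"fps": 30})

# Mutating one instance's parameters does not leak into other instances.
default_settings.parameters["debug"] = True
```

Using a factory rather than a literal `{}` default avoids the shared-mutable-default pitfall, and dataclasses reject a plain mutable default for this reason.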

class vyra_base.com.ROS2ActionClient(actionInfo, node)[source]

Bases: object

Base class for ROS2 action clients.

This class will be factory created to call specific action functionality.

Parameters:
  • actionInfo (ActionClientInfo)

  • node (VyraNode)

__init__(actionInfo, node)[source]

Initialize the ROS2ActionClient.

Parameters:
  • actionInfo (ActionClientInfo) – Information about the action.

  • node (VyraNode) – The ROS2 node.

Return type:

None

create_action_client()[source]

Create an action client in the ROS2 node.

This method should be called to register the action client with the ROS2 node.

Return type:

None

send_goal(order)[source]

Send a goal to the action server.

Parameters:

order (Any) – The order to send as part of the goal.

Raises:

ValueError – If the action client has not been created.

Returns:

A future representing the goal request.

Return type:

Future

async send_goal_async(goal, feedback_callback=None)[source]

Send a goal asynchronously and wait for result.

Parameters:
  • goal (Any) – The goal message to send.

  • feedback_callback (Optional[Callable]) – Optional callback for feedback.

Raises:

ValueError – If the action client has not been created.

Returns:

The goal handle.

Return type:

Any

goal_response_callback(future)[source]

Callback for the goal response.

Parameters:

future (Future) – The future containing the goal handle.

Return type:

None

get_result_callback(future)[source]

Callback for receiving the result of the action.

Parameters:

future (Future) – The future containing the result.

Return type:

None

destroy()[source]

Destroy the action client and clean up resources.

Return type:

None

class vyra_base.com.ROS2ActionServer(actionInfo, node)[source]

Bases: object

Base class for ROS2 action servers.

This class is intended to be factory-created to implement specific action functionality.

Parameters:
  • actionInfo (ActionServerInfo)

  • node (VyraNode)

__init__(actionInfo, node)[source]

Initialize the ROS2ActionServer.

Parameters:
  • actionInfo (ActionServerInfo) – The action configuration.

  • node (VyraNode) – The ROS2 node to attach the action server to.

Return type:

None

create_action_server()[source]

Create and register the action server with the ROS2 node.

This method should be called to register the action server with the ROS2 node.

Return type:

None

execute_callback(goal_handle)[source]

Execute the action callback.

This method should be overridden in subclasses to provide specific functionality.

Parameters:

goal_handle (Any) – The goal handle for the action.

Return type:

None

goal_callback(goal_handle)[source]

Handle the goal callback.

This method should be overridden in subclasses to provide specific functionality.

Parameters:

goal_handle (Any) – The goal handle for the action.

Return type:

GoalResponse

cancel_callback(goal_handle)[source]

Handle the cancel callback.

Returns CancelResponse.ACCEPT if the cancel request is accepted, CancelResponse.REJECT otherwise.

Return type:

Any

destroy()[source]

Destroy the action server and clean up resources.

Return type:

None

class vyra_base.com.ROS2ServiceClient(serviceInfo, node)[source]

Bases: object

Base class for ROS2 service clients.

This class is intended to be factory-created to implement specific service functionality.

Parameters:
  • serviceInfo (ServiceClientInfo)

  • node (VyraNode)

__init__(serviceInfo, node)[source]

Initialize the ROS2ServiceClient.

Parameters:
  • serviceInfo (ServiceClientInfo) – Information about the service.

  • node (VyraNode) – ROS2 node instance.

Return type:

None

property service_info: ServiceClientInfo

Get the service information.

async create_service_caller()[source]

Create a service caller in the ROS2 node.

This method should be called to register the service caller with the ROS2 node.

Raises:
  • ValueError – If the service type or name is not provided.

  • TimeoutError – If the service is not available within the timeout period.

Return type:

None

async send(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

destroy_service_caller()[source]

Destroy the service caller.

Return type:

None

class vyra_base.com.ROS2ServiceServer(serviceInfo, node, async_loop=None)[source]

Bases: object

Base class for ROS2 service servers.

This class is intended to be factory-created to implement specific service functionality.

Parameters:
  • serviceInfo (ServiceServerInfo)

  • node (VyraNode)

__init__(serviceInfo, node, async_loop=None)[source]

Initialize the ROS2ServiceServer.

Parameters:
  • serviceInfo (ServiceServerInfo) – Information about the service.

  • node (VyraNode) – The ROS2 node to attach the service to.

  • async_loop (Any, Optional) – Optional asyncio event loop.

Return type:

None

property service_info: ServiceServerInfo

Get the service information.

create_service(callback, async_loop=None)[source]

Create and register a service in the ROS2 node.

Parameters:
  • callback (Callable) – The callback function to handle service requests.

  • async_loop (AbstractEventLoop) – Optional asyncio event loop.

Raises:
  • TypeError – If the callback is not callable.

  • ValueError – If the service type or name is not provided.

Return type:

None

destroy_service()[source]

Destroy the service in the ROS2 node.

This method will remove the service if it exists.

Return type:

None

run_async_in_thread(coro, timeout=None)[source]

Execute an async coroutine in a separate thread with its own event loop.

Creates a new thread with a dedicated asyncio event loop to run async code from synchronous ROS2 service callbacks.

Parameters:
  • coro – Coroutine to execute.

  • timeout – Optional timeout in seconds.

Returns:

Result from coroutine execution.
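The technique described for run_async_in_thread() can be sketched with the standard library alone. The helper name `run_coroutine_in_thread` and the way errors are handed back are assumptions made for this example; the method's actual implementation is not shown here.

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional


def run_coroutine_in_thread(coro: Coroutine, timeout: Optional[float] = None) -> Any:
    """Run coro on a dedicated event loop in a worker thread; return its result."""
    outcome: dict[str, Any] = {}

    def worker() -> None:
        try:
            # asyncio.run() gives this thread its own event loop;
            # wait_for() enforces the optional timeout in seconds.
            outcome["result"] = asyncio.run(asyncio.wait_for(coro, timeout))
        except Exception as exc:  # hand any failure back to the caller
            outcome["error"] = exc

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join()  # block the synchronous caller until the coroutine finishes
    if "error" in outcome:
        raise outcome["error"]
    return outcome["result"]


async def double(value: int) -> int:
    await asyncio.sleep(0.01)
    return value * 2


result = run_coroutine_in_thread(double(21), timeout=1.0)
```

Running the coroutine in its own thread keeps it off any event loop owned by the caller, which is what makes the pattern usable from a synchronous callback.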

class vyra_base.com.ZenohProvider(module_name, module_id, protocol=ProtocolType.ZENOH)[source]

Bases: AbstractProtocolProvider

Protocol provider for Zenoh transport.

Features:
  • Efficient Pub/Sub with zero-copy capabilities

  • Query/Reply for request-response patterns

  • Router-based scalability

  • Built-in discovery and fault tolerance

  • Multi-protocol support (TCP, UDP, shared memory)

Requirements:
  • eclipse-zenoh Python package

  • Zenoh router (typically Docker service)

Example

>>> # Initialize provider (module_name and module_id values are placeholders)
>>> provider = ZenohProvider(
...     module_name="my_module",
...     module_id="my_module_id",
...     protocol=ProtocolType.ZENOH,
... )
>>>
>>> if await provider.check_availability():
...     await provider.initialize(config={
...         "mode": "client",
...         "connect": ["tcp/zenoh-router:7447"]
...     })
...
...     # Create server (query/reply)
...     async def handle_request(req):
...         return {"result": req["value"] * 2}
...
...     server = await provider.create_server(
...         "/calculate",
...         response_callback=handle_request
...     )
...
...     # Create publisher (pub/sub)
...     publisher = await provider.create_publisher("/sensor_data")

Parameters:
  • module_name (str)

  • module_id (str)

  • protocol (ProtocolType)

__init__(module_name, module_id, protocol=ProtocolType.ZENOH)[source]

Initialize Zenoh provider.

Parameters:
  • protocol (ProtocolType) – Protocol type (must be ZENOH)

  • module_name (str)

  • module_id (str)

async check_availability()[source]

Check if Zenoh is available.

Returns:

True if the eclipse-zenoh Python package is installed

Return type:

bool

async initialize(config=None)[source]

Initialize Zenoh provider and open session.

Parameters:

config (Optional[Dict[str, Any]]) –

Configuration dictionary. Keys are:

  • mode: “peer”, “client”, or “router” (default: “client”)

  • connect: List of endpoints (default: [“tcp/zenoh-router:7447”])

  • listen: List of listen endpoints

  • format: Serialization format (default: “json”)

Returns:

True if initialization successful

Return type:

bool
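How caller-supplied keys might combine with the documented defaults can be sketched as a plain dictionary merge. `DEFAULT_CONFIG` and `resolve_config` are hypothetical names used only for illustration; whether initialize() merges its configuration this way is an assumption.

```python
from typing import Any, Dict, Optional

# Defaults taken from the parameter description of initialize().
DEFAULT_CONFIG: Dict[str, Any] = {
    "mode": "client",
    "connect": ["tcp/zenoh-router:7447"],
    "format": "json",
}


def resolve_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: overlay caller-supplied keys on the defaults."""
    merged = {**DEFAULT_CONFIG, **(config or {})}
    # Copy list values so callers cannot mutate the shared defaults.
    return {k: list(v) if isinstance(v, list) else v for k, v in merged.items()}


peer_config = resolve_config({"mode": "peer", "listen": ["tcp/0.0.0.0:7447"]})
default_config = resolve_config()
```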

async shutdown()[source]

Shutdown Zenoh provider and close session.

Return type:

None

async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]

Create Zenoh Publisher.

Parameters:
  • name (str) – Publisher name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional publisher options

Return type:

VyraPublisher

Returns:

ZenohPublisherImpl instance

async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]

Create Zenoh Subscriber.

Parameters:
  • name (str) – Subscriber name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • subscriber_callback (Optional[Callable]) – Async callback for received messages

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional subscriber options

Return type:

VyraSubscriber

Returns:

ZenohSubscriberImpl instance

async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]

Create Zenoh Server (Queryable).

Parameters:
  • name (str) – Server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • response_callback (Optional[Callable]) – Async callback for handling requests

  • service_type (Optional[type]) – Service type class

  • **kwargs – Additional server options

Return type:

VyraServer

Returns:

ZenohServerImpl instance

async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]

Create Zenoh Client (Query sender).

Parameters:
  • name (str) – Client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • service_type (Optional[type]) – Optional service type class (ignored in Zenoh – schema-less)

  • request_callback (Optional[Callable]) – Optional async callback for responses

  • **kwargs – Additional client options

Return type:

VyraClient

Returns:

ZenohClientImpl instance

async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]

Create Zenoh Action Server.

Parameters:
  • name (str) – Action server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • handle_goal_request (Optional[Callable]) – Async callback for goal requests

  • handle_cancel_request (Optional[Callable]) – Async callback for cancel requests

  • execution_callback (Optional[Callable]) – Async callback for goal execution

  • action_type (Optional[type]) – Action type class

  • **kwargs – Additional action server options

Return type:

VyraActionServer

Returns:

ZenohActionServerImpl instance

async create_action_client(name, topic_builder=None, action_type=None, direct_response=None, feedback_callback=None, goal_response_callback=None, direct_response_callback=None, goal_callback=None, **kwargs)[source]

Create Zenoh Action Client.

Parameters:
  • name (str) – Action client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • action_type (Optional[type]) – Action type class

  • direct_response (Optional[Callable]) – Optional async callback for results

  • feedback_callback (Optional[Callable]) – Optional async callback for feedback

  • goal_response_callback (Optional[Callable]) – Optional async callback for goal responses

  • direct_response_callback (Callable | None)

  • goal_callback (Callable | None)

  • **kwargs – Additional action client options

Return type:

VyraActionClient

Returns:

ZenohActionClientImpl instance

require_initialization()[source]

Ensure provider is initialized.

Raises:

ProviderError – If not initialized

Return type:

None
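A guard like require_initialization() is typically called at the top of every factory method so that misuse fails early with a clear error. The sketch below uses a hypothetical `StubProvider` and a stand-in `ProviderError`. It is synchronous for brevity, whereas the provider's factory methods are async.

```python
class ProviderError(RuntimeError):
    """Hypothetical stand-in for the library's ProviderError."""


class StubProvider:
    """Hypothetical provider that tracks only its initialization state."""

    def __init__(self) -> None:
        self._initialized = False

    def initialize(self) -> bool:
        self._initialized = True
        return True

    def require_initialization(self) -> None:
        if not self._initialized:
            raise ProviderError("Provider is not initialized; call initialize() first")

    def create_publisher(self, name: str) -> str:
        self.require_initialization()  # fail fast before touching any session
        return f"publisher:{name}"


provider = StubProvider()
try:
    provider.create_publisher("sensor_data")
    early_error = ""
except ProviderError as exc:
    early_error = str(exc)

provider.initialize()
publisher = provider.create_publisher("sensor_data")
```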

get_session()[source]

Get the underlying Zenoh session.

Return type:

Optional[ZenohSession]

class vyra_base.com.ZenohSession(config=None)[source]

Bases: object

Manages Zenoh session for VYRA modules.

Provides:
  • Session lifecycle management

  • Connection to Zenoh router

  • Query/Reply, Pub/Sub, and Task primitives

  • Automatic reconnection

Example

>>> config = SessionConfig(
...     mode=SessionMode.CLIENT,
...     connect=["tcp/zenoh-router:7447"]
... )
>>> session = ZenohSession(config)
>>> await session.open()
>>>
>>> # Use session for communication
>>> pub = session.declare_publisher("/topic")
>>> pub.put("Hello Zenoh")
>>>
>>> await session.close()

Parameters:

config (Optional[SessionConfig])

__init__(config=None)[source]

Initialize Zenoh session manager.

Parameters:

config (Optional[SessionConfig]) – Session configuration

property is_open: bool

Check if session is open.

property session: Any

Get underlying Zenoh session.

async open()[source]

Open Zenoh session and connect to router.

Returns:

True if session opened successfully

Return type:

bool

async close()[source]

Close Zenoh session and clean up resources.

Return type:

None

declare_publisher(key_expr, **kwargs)[source]

Declare a Zenoh publisher.

Parameters:
  • key_expr (str) – Key expression (topic path)

  • **kwargs – Additional publisher options

Return type:

Any

Returns:

Zenoh Publisher

declare_subscriber(key_expr, callback, **kwargs)[source]

Declare a Zenoh subscriber.

Parameters:
  • key_expr (str) – Key expression (topic path)

  • callback – Callback for incoming messages

  • **kwargs – Additional subscriber options

Return type:

Any

Returns:

Zenoh Subscriber

declare_queryable(key_expr, callback, **kwargs)[source]

Declare a Zenoh queryable (service server).

Parameters:
  • key_expr (str) – Key expression (service path)

  • callback – Callback for query handling

  • **kwargs – Additional queryable options

Return type:

Any

Returns:

Zenoh Queryable

async get(key_expr, timeout=None, **kwargs)[source]

Send a Zenoh query (client request).

Parameters:
  • key_expr (str) – Key expression (service path)

  • timeout (Optional[float]) – Query timeout in seconds

  • **kwargs – Additional query parameters

Return type:

Any

Returns:

Query replies

class vyra_base.com.SessionConfig(mode=SessionMode.CLIENT, connect=<factory>, listen=<factory>, id=None, scouting_multicast=True, timeout_ms=5000)[source]

Bases: object

Configuration for Zenoh session.

Variables:
  • mode (SessionMode) – Session mode (peer/client/router)

  • connect (List[str]) – List of endpoints to connect to (e.g., [“tcp/zenoh-router:7447”])

  • listen (List[str]) – List of endpoints to listen on

  • id (str | None) – Optional session ID

  • scouting_multicast (bool) – Enable multicast scouting for peer discovery

  • timeout_ms (int) – Timeout for operations in milliseconds

mode: SessionMode = 'client'
connect: List[str]
listen: List[str]
id: str | None = None
scouting_multicast: bool = True
timeout_ms: int = 5000

to_zenoh_config()[source]

Convert to Zenoh configuration dictionary.

Return type:

Dict[str, Any]

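__init__(mode=SessionMode.CLIENT, connect=<factory>, listen=<factory>, id=None, scouting_multicast=True, timeout_ms=5000)

Parameters:
  • mode (SessionMode)

  • connect (List[str])

  • listen (List[str])

  • id (str | None)

  • scouting_multicast (bool)

  • timeout_ms (int)

Return type:

None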
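The default `mode` is displayed as 'client' because SessionMode derives from both str and Enum, so its members behave like plain strings. The sketch below mirrors the documented fields with hypothetical `SessionModeSketch` and `SessionConfigSketch` classes. The empty-list factories for `connect` and `listen` are an assumption, since the real factory defaults are not shown.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional


class SessionModeSketch(str, Enum):
    """Hypothetical mirror of SessionMode (a str-based Enum)."""
    PEER = "peer"
    CLIENT = "client"
    ROUTER = "router"


@dataclass
class SessionConfigSketch:
    """Hypothetical mirror of the documented SessionConfig fields."""
    mode: SessionModeSketch = SessionModeSketch.CLIENT
    connect: List[str] = field(default_factory=list)  # assumed empty default
    listen: List[str] = field(default_factory=list)   # assumed empty default
    id: Optional[str] = None
    scouting_multicast: bool = True
    timeout_ms: int = 5000


config = SessionConfigSketch(connect=["tcp/zenoh-router:7447"])

# A str-based Enum member compares equal to its string value, and the
# string value can be converted back into the member.
mode_matches_string = config.mode == "client"
parsed_mode = SessionModeSketch("peer")
```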

class vyra_base.com.SessionMode(*values)[source]

Bases: str, Enum

Zenoh session modes.

PEER = 'peer'
CLIENT = 'client'
ROUTER = 'router'

class vyra_base.com.RedisProvider(module_name, module_id, protocol=ProtocolType.REDIS, **redis_kwargs)[source]

Bases: AbstractProtocolProvider

Protocol provider for Redis communication.

Features:
  • Pub/Sub messaging via Publisher interface

  • Key-Value storage via Callable interface (get/set)

  • TLS support with ACL authentication

  • Streaming support (Redis Streams)

Wraps the existing vyra_base.com.transport.redis.communication.RedisClient for integration with the established infrastructure.

Example

>>> # Initialize provider (module_name and module_id values are placeholders)
>>> provider = RedisProvider(
...     module_name="my_module",
...     module_id="my_module_id",
...     protocol=ProtocolType.REDIS
... )
>>>
>>> if await provider.check_availability():
...     await provider.initialize()
...
...     # Listen for messages (Pub/Sub)
...     async def on_message(data):
...         print(f"Received: {data}")
...     subscriber = await provider.create_subscriber(
...         "sensor_updates",
...         subscriber_callback=on_message
...     )
...
...     # Create publisher (Pub/Sub)
...     publisher = await provider.create_publisher(
...         "sensor_updates",
...         module_name="robot"
...     )
...     await publisher.publish({"temperature": 23.5})

Parameters:
  • module_name (str)

  • module_id (str)

  • protocol (ProtocolType)

  • **redis_kwargs

__init__(module_name, module_id, protocol=ProtocolType.REDIS, **redis_kwargs)[source]

Initialize Redis provider.

Parameters:
  • protocol (ProtocolType) – Protocol type (must be REDIS)

  • module_name (str) – Module name for Redis namespace

  • module_id (str) – Module ID for Redis namespace

  • **redis_kwargs – Redis connection options:

      • host: Redis host (default: from env REDIS_HOST)

      • port: Redis port (default: from env REDIS_PORT)

      • username: Redis ACL username

      • password: Redis ACL password

      • use_tls: Enable TLS (default: from env)

async check_availability()[source]

Check if Redis is available.

Returns:

True if Redis client can be imported

Return type:

bool

async initialize(config=None)[source]

Initialize Redis client.

Parameters:

config (Optional[Dict[str, Any]]) – Optional configuration overrides

Returns:

True if initialization successful

Return type:

bool

Raises:

ProviderError – If initialization fails

async shutdown()[source]

Shutdown Redis client.

Return type:

None

async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]

Create Redis Publisher (Pub/Sub).

Parameters:
  • name (str) – Publisher name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • message_type (type) – Message type class

  • **kwargs – Additional publisher options

Return type:

VyraPublisher

Returns:

RedisPublisherImpl instance

async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]

Create Redis Subscriber (Pub/Sub).

Parameters:
  • name (str) – Subscriber name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • subscriber_callback (Callable) – Async callback for received messages

  • message_type (type) – Message type class

  • **kwargs – Additional subscriber options

Return type:

VyraSubscriber

Returns:

RedisSubscriberImpl instance

async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]

Create Redis Server (request/response pattern).

Parameters:
  • name (str) – Server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • response_callback (Optional[Callable]) – Async callback for handling requests

  • service_type (Optional[type]) – Service type class

  • **kwargs – Additional server options

Return type:

VyraServer

Returns:

RedisServerImpl instance

async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]

Create Redis Client (request/response pattern).

Parameters:
  • name (str) – Client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • service_type (type) – Service type class

  • request_callback (Optional[Callable]) – Optional async callback for responses

  • **kwargs – Additional client options

Return type:

VyraClient

Returns:

RedisClientImpl instance

async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]

Create Redis Action Server (state tracking + pub/sub).

Parameters:
  • name (str) – Action server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • handle_goal_request (Callable) – Async callback for goal requests

  • handle_cancel_request (Callable) – Async callback for cancel requests

  • execution_callback (Callable) – Async callback for goal execution

  • action_type (type) – Action type class

  • **kwargs – Additional action server options

Return type:

VyraActionServer

Returns:

RedisActionServerImpl instance

async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[source]

Create Redis Action Client.

Parameters:
  • name (str) – Action client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • action_type (type) – Action type class

  • direct_response_callback (Optional[Callable]) – Optional async callback for results

  • feedback_callback (Optional[Callable]) – Optional async callback for feedback

  • goal_callback (Optional[Callable]) – Optional async callback for goal responses

  • **kwargs – Additional action client options

Return type:

VyraActionClient

Returns:

RedisActionClientImpl instance

get_client()[source]

Get underlying Redis client for advanced operations.

Returns:

Underlying vyra_base Redis client

Return type:

Any

Raises:

ProviderError – If provider not initialized

class vyra_base.com.RedisClient(**kwargs)[source]

Bases: object

Unified Redis client for vyra_base. Combines RedisAccess and RedisManipulator functionality with TLS support and streaming capabilities.

__init__(**kwargs)

Wrapper for synchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async connect(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async ping()[source]

Ping Redis server to check connectivity.

Return type:

bool

async configure_base_settings(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async close(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async set(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async delete(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async exists(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async clear(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get_all_keys(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get_keys_by_pattern(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get_type(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get_length(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async publish_message(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async subscribe_channel(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async subscribe_to_key(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async unsubscribe_from_key(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async health_check(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async subscribe_pattern(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async publish_with_metadata(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async create_pubsub_listener(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async parse_message(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get_active_listeners(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async remove_listener_channels(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async stop_pubsub_listener(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async publish_event(channel, data, add_metadata=True)[source]

Alias for publish_with_metadata (backward compatibility)

Return type:

int

async create_stream_listener(channels, callback_handler, callback_context=None)[source]

Alias for create_pubsub_listener (backward compatibility)

Return type:

None

Parameters:

channels (list[str])
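A backward-compatible alias usually just forwards its arguments to the newer method. The sketch below shows that pattern with a hypothetical `StubRedisClient` that records calls instead of talking to Redis. The parameter list mirrors publish_event() above, while the return value of 1 is invented for the example.

```python
import asyncio
from typing import Any


class StubRedisClient:
    """Hypothetical stand-in; it records publishes instead of using Redis."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, Any, bool]] = []

    async def publish_with_metadata(
        self, channel: str, data: Any, add_metadata: bool = True
    ) -> int:
        self.sent.append((channel, data, add_metadata))
        return 1  # invented value: pretend one subscriber received the message

    async def publish_event(
        self, channel: str, data: Any, add_metadata: bool = True
    ) -> int:
        """Backward-compatible alias that forwards unchanged to the newer name."""
        return await self.publish_with_metadata(channel, data, add_metadata)


client = StubRedisClient()
receivers = asyncio.run(client.publish_event("alerts", {"level": "warn"}))
```

Keeping the alias as a thin forwarding method means that old call sites continue to work while new code can use the newer name.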

async xadd(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async xread(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async xreadgroup(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async xack(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async xgroup_create(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async xgroup_destroy(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async xlen(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async xtrim(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async xpending(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

class vyra_base.com.UDSProvider(module_name, module_id, protocol=ProtocolType.UDS)[source]

Bases: AbstractProtocolProvider

Protocol provider for Unix Domain Socket transport.

Features:
  • Low-latency local IPC

  • Stream-based communication

  • Automatic connection management

  • No serialization overhead (JSON)

Requirements:
  • Unix-like OS (Linux, macOS)

  • File system access to /tmp/vyra_sockets

Limitations:
  • Local machine only

  • No pub/sub pattern (use Callable for request-response)
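To make the transport itself concrete, here is a minimal sketch of stream-based request-response over a Unix domain socket using only the standard library. It does not use UDSProvider; the newline-delimited JSON framing, the socket file name, and the helper names handle_client and call are assumptions chosen for illustration.

```python
import asyncio
import json
import os
import tempfile


async def handle_client(reader, writer):
    # Read one newline-delimited JSON request and send one JSON reply.
    request = json.loads(await reader.readline())
    reply = {"result": request["value"] * 2}
    writer.write(json.dumps(reply).encode() + b"\n")
    await writer.drain()
    writer.close()


async def call(socket_path, payload):
    # Connect to the socket file, send the request, and wait for the reply.
    reader, writer = await asyncio.open_unix_connection(socket_path)
    writer.write(json.dumps(payload).encode() + b"\n")
    await writer.drain()
    reply = json.loads(await reader.readline())
    writer.close()
    await writer.wait_closed()
    return reply


async def demo():
    # A temporary directory stands in for the socket directory.
    with tempfile.TemporaryDirectory() as socket_dir:
        socket_path = os.path.join(socket_dir, "calculate.sock")
        server = await asyncio.start_unix_server(handle_client, path=socket_path)
        async with server:
            return await call(socket_path, {"value": 21})


result = asyncio.run(demo())
```

Because the socket is a file system path, both ends must run on the same machine and need access to the socket directory, which matches the requirements and limitations listed above.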

Example

>>> # Initialize provider (the module_id value is a placeholder)
>>> provider = UDSProvider(module_name="math_service", module_id="<module-id>")
>>>
>>> if await provider.check_availability():
...     await provider.initialize()
...
...     # Create callable (server)
...     async def handle_request(req):
...         return {"result": req["value"] * 2}
...
...     server = await provider.create_callable(
...         "calculate",
...         handle_request,
...         module_name="math_service"
...     )
...
...     # Create client
...     client = await provider.create_callable(
...         "calculate",
...         None,  # No callback for client
...         module_name="math_service"
...     )
...     result = await client.call({"value": 21})

__init__(module_name, module_id, protocol=ProtocolType.UDS)[source]

Initialize UDS provider.

Parameters:
  • protocol (ProtocolType) – Protocol type (must be UDS)

  • module_name (str) – Default module name for interfaces

  • module_id (str)

async check_availability()[source]

Check if UDS transport is available.

Returns:

Always True on Unix-like systems

Return type:

bool

async initialize(config=None)[source]

Initialize UDS provider.

Parameters:

config (Optional[Dict[str, Any]]) – Optional configuration:
  • socket_dir – Socket directory (default: /tmp/vyra_sockets)

  • connect_timeout – Connection timeout

  • call_timeout – Default call timeout

Returns:

True if initialization successful

Return type:

bool
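One way to picture the optional config argument is as a set of overrides applied on top of defaults. The sketch below assumes that behaviour. Only the socket_dir default comes from the reference; the two timeout values and the resolve_uds_config helper are hypothetical.

```python
DEFAULT_UDS_CONFIG = {
    "socket_dir": "/tmp/vyra_sockets",  # default stated in the reference
    "connect_timeout": 5.0,             # assumed value, in seconds
    "call_timeout": 30.0,               # assumed value, in seconds
}


def resolve_uds_config(config=None):
    """Overlay caller-supplied keys on the defaults and reject unknown keys."""
    config = config or {}
    unknown = set(config) - set(DEFAULT_UDS_CONFIG)
    if unknown:
        raise KeyError(f"unknown UDS config keys: {sorted(unknown)}")
    return {**DEFAULT_UDS_CONFIG, **config}


resolved = resolve_uds_config({"call_timeout": 10.0})
```

A dictionary of this shape is what would be passed as config to initialize(); omitted keys fall back to their defaults.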

async shutdown()[source]

Shutdown the provider and cleanup resources.

Return type:

None

async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]

Create UDS Publisher (datagram sockets).

Parameters:
  • name (str) – Publisher name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional publisher options (module_name override)

Return type:

VyraPublisher

Returns:

UdsPublisherImpl instance
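The publisher is described as using datagram sockets. The practical difference from stream sockets is that each send arrives as one whole message, so no framing is needed. The standard-library sketch below illustrates this with a connected socket pair; it is not how UdsPublisherImpl is implemented, and the JSON payloads are invented for the example.

```python
import socket

# A connected pair of Unix datagram sockets; no socket file is needed.
sender, receiver = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
    sender.send(b'{"topic": "temp", "value": 23.5}')
    sender.send(b'{"topic": "temp", "value": 24.0}')

    # Each recv() returns exactly one datagram, never a partial or merged one.
    first = receiver.recv(4096)
    second = receiver.recv(4096)
finally:
    sender.close()
    receiver.close()
```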

async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]

Create UDS Subscriber (datagram sockets).

Return type:

VyraSubscriber

Returns:

UdsSubscriberImpl instance

async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]

Create UDS Server (stream sockets).

Return type:

VyraServer

Returns:

UdsServerImpl instance

async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]

Create UDS Client (stream sockets).

Return type:

VyraClient

Returns:

UdsClientImpl instance

async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]

Create UDS Action Server (stream sockets + state messages).

Return type:

VyraActionServer

Returns:

UdsActionServerImpl instance

async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[source]

Create UDS Action Client.

Return type:

VyraActionClient

Returns:

UdsActionClientImpl instance

class vyra_base.com.GrpcServer(socket_path, max_workers=10, options=None)[source]

Bases: object

gRPC Server over Unix Domain Socket.

Supports registering servicers for handling different RPC patterns.

Example:

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    async def UnaryMethod(self, request, context):
        return MyResponse(result="success")

    async def ServerStreamingMethod(self, request, context):
        for i in range(10):
            yield MyResponse(result=f"item_{i}")

server = GrpcServer("/tmp/my_service.sock")
server.add_service(
    my_service_pb2_grpc.add_MyServiceServicer_to_server,
    MyServiceServicer()
)
await server.start()

Parameters:
  • socket_path (str | Path)

  • max_workers (int)

  • options (Optional[list])

__init__(socket_path, max_workers=10, options=None)[source]

Initialize gRPC Server.

Parameters:
  • socket_path (str | Path) – Path to Unix Domain Socket

  • max_workers (int) – Maximum number of worker threads

  • options (Optional[list]) – gRPC server options (e.g., max message size)

add_service(add_servicer_to_server_fn, servicer)[source]

Add a servicer to the server.

Parameters:
  • add_servicer_to_server_fn (Callable) – Generated function from protobuf (e.g., add_MyServiceServicer_to_server)

  • servicer (Any) – Instance of the servicer class

async start()[source]

Start the gRPC server on Unix Domain Socket.

Raises:

RuntimeError – If no servicers are registered
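gRPC addresses a Unix domain socket with a target string of the form unix:///absolute/path. The helper below sketches how a socket_path could be turned into such a target. Whether GrpcServer builds its listening address this way is an assumption, and uds_target is a hypothetical name.

```python
from pathlib import Path


def uds_target(socket_path):
    """Return a gRPC address of the form unix:///absolute/path."""
    absolute = Path(socket_path).expanduser().absolute()
    return f"unix://{absolute}"


target = uds_target("/tmp/my_service.sock")
```

A client on the same machine would use the same string as its channel target.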

async wait_for_termination()[source]

Wait for server termination (blocking).

async stop(grace=5.0)[source]

Stop the gRPC server gracefully.

Parameters:

grace (Optional[float]) – Grace period in seconds for graceful shutdown

property is_running: bool

Check if server is running.

class vyra_base.com.GrpcClient(**kwargs)[source]

Bases: object

High-level gRPC Client wrapper.

Provides a simple interface for gRPC communication without requiring proto files or code generation.

Features:
  • Unary RPC (request-response)

  • Automatic connection management

  • SSL/TLS support

Parameters:
  • target – gRPC server target (e.g., “localhost:50051”)

  • credentials – Optional SSL credentials

  • options – Optional channel options

  • timeout – Default timeout for operations

Example

>>> client = GrpcClient(target="192.168.1.10:50051")
>>> await client.connect()
>>>
>>> # Unary call
>>> response = await client.call_method("/service/Method", request_bytes)
>>>
>>> await client.close()

__init__(**kwargs)

Wrapper for synchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

property is_connected: bool

Check if client is connected.

property channel: grpc.aio.Channel | None

Get the gRPC channel.

async connect(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async close(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async call_method(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

class vyra_base.com.MqttClient(**kwargs)[source]

Bases: object

High-level MQTT Client wrapper.

Features:
  • Publish/Subscribe messaging

  • QoS support (0, 1, 2)

  • TLS support

  • Automatic reconnection

  • JSON serialization

Parameters:
  • broker – MQTT broker hostname

  • port – MQTT broker port (default: 1883, TLS: 8883)

  • client_id – Optional client ID

  • username – Optional username for authentication

  • password – Optional password for authentication

  • use_tls – Enable TLS/SSL

  • ca_cert – Path to CA certificate for TLS

  • keepalive – Keepalive interval in seconds
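Two of the points above can be sketched without a broker: the choice between the plain and TLS default ports, and the JSON serialization of payloads. The helper names below are hypothetical, and the sketch only assumes that dict payloads are sent as UTF-8 JSON; it does not show MqttClient internals.

```python
import json


def default_port(use_tls):
    # 1883 is the plain default and 8883 the TLS default named in the reference.
    return 8883 if use_tls else 1883


def encode_payload(payload):
    # MQTT payloads are bytes, so a dict is serialised to UTF-8 JSON first.
    return json.dumps(payload).encode("utf-8")


def decode_payload(raw):
    # Inverse of encode_payload, as a subscriber callback might apply it.
    return json.loads(raw.decode("utf-8"))


raw = encode_payload({"value": 23.5})
```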

Example

>>> client = MqttClient(
...     broker="mqtt.example.com",
...     port=8883,
...     use_tls=True,
...     username="user",
...     password="pass"
... )
>>> await client.connect()
>>> await client.publish("sensors/temp", {"value": 23.5}, qos=1)

__init__(**kwargs)

Wrapper for synchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async connect(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async close(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async publish(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async subscribe(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async unsubscribe(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async health_check(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

class vyra_base.com.RestClient(**kwargs)[source]

Bases: object

High-level REST/HTTP Client wrapper.

Features:
  • GET, POST, PUT, DELETE, PATCH methods

  • JSON serialization

  • Headers management

  • Timeout support

  • TLS/SSL support

Parameters:
  • base_url – Base URL for API

  • timeout – Default timeout in seconds

  • headers – Default headers

  • verify_ssl – Verify SSL certificates
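The base_url and headers parameters suggest that each request combines client-wide defaults with per-call values. The sketch below shows one plausible way to do that. build_url and merge_headers are hypothetical helpers, and the example URL and token are placeholders, not part of RestClient.

```python
def build_url(base_url, path):
    # Join with exactly one slash, whatever the caller supplied.
    return base_url.rstrip("/") + "/" + path.lstrip("/")


def merge_headers(default_headers, request_headers=None):
    # Per-request headers override the client-wide defaults.
    return {**default_headers, **(request_headers or {})}


url = build_url("https://api.example.com/v1/", "/status")
headers = merge_headers(
    {"Accept": "application/json"},
    {"Authorization": "Bearer <token>"},
)
```

HTTP header names are case-insensitive, so a real client would normalise names before merging; this sketch compares them as written.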

__init__(**kwargs)

Wrapper for synchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async connect(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async close(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async post(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async put(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async delete(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async health_check(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

class vyra_base.com.WebSocketClient(**kwargs)[source]

Bases: object

High-level WebSocket Client wrapper.

Features:
  • Send/Receive messages

  • JSON serialization

  • Automatic reconnection

  • Callback-based message handling

Parameters:
  • url – WebSocket URL (ws:// or wss://)

  • timeout – Connection timeout in seconds

  • headers – Optional headers for connection
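Since url must use the ws:// or wss:// scheme, a caller may want to validate it before connecting. The sketch below does that with the standard library. validate_ws_url is a hypothetical helper, and whether WebSocketClient performs a similar check is not stated in the reference.

```python
from urllib.parse import urlparse


def validate_ws_url(url):
    """Return (scheme, host, uses_tls) or raise ValueError."""
    parsed = urlparse(url)
    if parsed.scheme not in ("ws", "wss"):
        raise ValueError(f"expected ws:// or wss://, got {url!r}")
    if not parsed.hostname:
        raise ValueError(f"missing host in {url!r}")
    # wss:// is the TLS-secured variant of ws://.
    return parsed.scheme, parsed.hostname, parsed.scheme == "wss"


info = validate_ws_url("wss://example.com/socket")
```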

__init__(**kwargs)

Wrapper for synchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async connect(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async close(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async send(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async receive(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async listen(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async health_check(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.