Zenoh Transport

The Zenoh transport is the default and recommended protocol for VYRA modules.

Zenoh Transport Module

Provides Zenoh-based transport implementation with layered architecture:

Layers:
  • communication/: Core Zenoh functionality (Query/Reply, Pub/Sub, Tasks)

  • vyra_models/: VYRA abstractions (ZenohCallable, ZenohPublisher, ZenohJob)

  • session.py: Zenoh session management and lifecycle

  • provider.py: Interface layer for VYRA integration

Features:
  • Query/Reply pattern for request-response (ZenohCallable)

  • Pub/Sub for topic-based messaging (ZenohPublisher)

  • Task-based long-running operations (ZenohJob)

  • Zero-copy and efficient serialization

  • Router-based architecture for scalability

  • Built-in discovery and fault tolerance

Usage:

from vyra_base.com.transport.t_zenoh import ZenohProvider, ZENOH_AVAILABLE

if ZENOH_AVAILABLE:
    # module_name and module_id are required by ZenohProvider; placeholder values here
    provider = ZenohProvider(module_name="my_module", module_id="my_module_id")
    await provider.initialize(config={
        "mode": "client",
        "connect": ["tcp/zenoh-router:7447"],
    })
    callable = await provider.create_callable("/service", callback)
    publisher = await provider.create_publisher("/topic")

class vyra_base.com.transport.t_zenoh.ZenohSession(config=None)[source]

Bases: object

Manages Zenoh session for VYRA modules.

Provides:
  • Session lifecycle management

  • Connection to Zenoh router

  • Query/Reply, Pub/Sub, and Task primitives

  • Automatic reconnection

Example

>>> config = SessionConfig(
...     mode=SessionMode.CLIENT,
...     connect=["tcp/zenoh-router:7447"]
... )
>>> session = ZenohSession(config)
>>> await session.open()
>>>
>>> # Use session for communication
>>> pub = session.declare_publisher("/topic")
>>> pub.put("Hello Zenoh")
>>>
>>> await session.close()
Parameters:

config (Optional[SessionConfig])

__init__(config=None)[source]

Initialize Zenoh session manager.

Parameters:

config (Optional[SessionConfig]) – Session configuration

property is_open: bool

Check if session is open.

property session: Any

Get underlying Zenoh session.

async open()[source]

Open Zenoh session and connect to router.

Returns:

True if session opened successfully

Return type:

bool

async close()[source]

Close Zenoh session and cleanup resources.

Return type:

None

declare_publisher(key_expr, **kwargs)[source]

Declare a Zenoh publisher.

Parameters:
  • key_expr (str) – Key expression (topic path)

  • **kwargs – Additional publisher options

Return type:

Any

Returns:

Zenoh Publisher

declare_subscriber(key_expr, callback, **kwargs)[source]

Declare a Zenoh subscriber.

Parameters:
  • key_expr (str) – Key expression (topic path)

  • callback – Callback for incoming messages

  • **kwargs – Additional subscriber options

Return type:

Any

Returns:

Zenoh Subscriber

declare_queryable(key_expr, callback, **kwargs)[source]

Declare a Zenoh queryable (service server).

Parameters:
  • key_expr (str) – Key expression (service path)

  • callback – Callback for query handling

  • **kwargs – Additional queryable options

Return type:

Any

Returns:

Zenoh Queryable

async get(key_expr, timeout=None, **kwargs)[source]

Send a Zenoh query (client request).

Parameters:
  • key_expr (str) – Key expression (service path)

  • timeout (Optional[float]) – Query timeout in seconds

  • **kwargs – Additional query parameters

Return type:

Any

Returns:

Query replies

class vyra_base.com.transport.t_zenoh.SessionConfig(mode=SessionMode.CLIENT, connect=<factory>, listen=<factory>, id=None, scouting_multicast=True, timeout_ms=5000)[source]

Bases: object

Configuration for Zenoh session.

Parameters:
  • mode – Session mode (peer/client/router)

  • connect – List of endpoints to connect to (e.g., ["tcp/zenoh-router:7447"])

  • listen – List of endpoints to listen on

  • id – Optional session ID

  • scouting_multicast – Enable multicast scouting for peer discovery

  • timeout_ms – Timeout for operations in milliseconds

mode: SessionMode = 'client'
connect: List[str]
listen: List[str]
id: str | None = None
scouting_multicast: bool = True
timeout_ms: int = 5000
to_zenoh_config()[source]

Convert to Zenoh configuration dictionary.

Return type:

Dict[str, Any]
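
As an illustration of what such a conversion might produce, the sketch below mirrors the documented SessionConfig fields in a stand-in dataclass. The mapping onto nested keys (connect/endpoints, listen/endpoints, scouting/multicast/enabled) is an assumption based on Zenoh's configuration layout, not the VYRA implementation; SessionConfigSketch is a hypothetical name, and timeout_ms is left unmapped because its target key is not documented here.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class SessionMode(str, Enum):
    PEER = "peer"
    CLIENT = "client"
    ROUTER = "router"


@dataclass
class SessionConfigSketch:
    """Stand-in with the same fields as the documented SessionConfig."""
    mode: SessionMode = SessionMode.CLIENT
    connect: List[str] = field(default_factory=list)
    listen: List[str] = field(default_factory=list)
    id: Optional[str] = None
    scouting_multicast: bool = True
    timeout_ms: int = 5000

    def to_zenoh_config(self) -> Dict[str, Any]:
        # Assumed mapping onto Zenoh-style nested keys; not taken from VYRA.
        cfg: Dict[str, Any] = {
            "mode": self.mode.value,
            "scouting": {"multicast": {"enabled": self.scouting_multicast}},
        }
        # Only emit optional sections that were actually configured.
        if self.connect:
            cfg["connect"] = {"endpoints": list(self.connect)}
        if self.listen:
            cfg["listen"] = {"endpoints": list(self.listen)}
        if self.id is not None:
            cfg["id"] = self.id
        return cfg


cfg = SessionConfigSketch(connect=["tcp/zenoh-router:7447"]).to_zenoh_config()
```

Omitting unset sections keeps the resulting dictionary minimal, so anything not configured falls back to whatever defaults the underlying session applies.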

__init__(mode=SessionMode.CLIENT, connect=<factory>, listen=<factory>, id=None, scouting_multicast=True, timeout_ms=5000)
Return type:

None

class vyra_base.com.transport.t_zenoh.SessionMode(*values)[source]

Bases: str, Enum

Zenoh session modes.

PEER = 'peer'
CLIENT = 'client'
ROUTER = 'router'
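
Because SessionMode derives from both str and Enum, its members compare equal to their plain string values and can be looked up from configuration strings. The sketch below redefines the enum locally so it runs on its own; parse_mode is a hypothetical helper, not part of the module.

```python
from enum import Enum


class SessionMode(str, Enum):
    """Local copy of the documented enum, for illustration only."""
    PEER = "peer"
    CLIENT = "client"
    ROUTER = "router"


def parse_mode(raw: str) -> SessionMode:
    """Hypothetical helper: map a config string to a member, ignoring case."""
    return SessionMode(raw.strip().lower())


mode = parse_mode("Client")
is_plain_string = mode == "client"  # str mixin makes members equal to their values
```

Passing an unknown value such as "broker" raises ValueError, which is a convenient place to reject invalid configuration early.
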
class vyra_base.com.transport.t_zenoh.VyraPublisherImpl(name, topic_builder, zenoh_session, message_type=None, **kwargs)[source]

Bases: VyraPublisher

Vyra-based publisher implementation.

Wraps ZenohPublisher to provide a consistent async interface. Zenoh is async-native, so no callback adapter is needed.

__init__(name, topic_builder, zenoh_session, message_type=None, **kwargs)[source]

async initialize()[source]

Initialize Zenoh publisher.

Return type:

bool

async publish(message)[source]

Publish message via Zenoh.

Parameters:

message (Any) – Message instance (msg_type) or dict

Return type:

bool

Returns:

True on success
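
Since publish() accepts either a message instance or a dict, some normalization step has to happen before the payload goes on the wire. A minimal sketch of one way to do this, assuming JSON serialization and dataclass-based message types (both assumptions; to_json_payload and Temperature are hypothetical names):

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any


def to_json_payload(message: Any) -> bytes:
    """Normalize a dict or dataclass instance to UTF-8 encoded JSON bytes."""
    # Dataclass instances are converted to dicts first (classes are rejected).
    if is_dataclass(message) and not isinstance(message, type):
        message = asdict(message)
    if not isinstance(message, dict):
        raise TypeError(f"unsupported message type: {type(message).__name__}")
    # sort_keys gives a stable byte representation for equal messages.
    return json.dumps(message, sort_keys=True).encode("utf-8")


@dataclass
class Temperature:
    sensor: str
    celsius: float


payload_from_dict = to_json_payload({"sensor": "t1", "celsius": 21.5})
payload_from_msg = to_json_payload(Temperature(sensor="t1", celsius=21.5))
```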

async cleanup()[source]

Cleanup Zenoh resources.

async shutdown()[source]

Shutdown publisher.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraSubscriberImpl(name, topic_builder, zenoh_session, subscriber_callback=None, message_type=None, **kwargs)[source]

Bases: VyraSubscriber

Vyra-based subscriber implementation.

Wraps ZenohSubscriber for consistent async interface. Zenoh callbacks are async-native.

__init__(name, topic_builder, zenoh_session, subscriber_callback=None, message_type=None, **kwargs)[source]

async initialize()[source]

Initialize Zenoh subscriber.

Return type:

bool

async subscribe()[source]

Start subscribing to Zenoh topic.

Note: Subscriber is already active after initialize(). This method exists for API compatibility.

Return type:

bool

Returns:

True on success

async cleanup()[source]

Cleanup Zenoh resources.

async shutdown()[source]

Shutdown subscriber.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraServerImpl(name, topic_builder, zenoh_session, service_type=None, response_callback=None, **kwargs)[source]

Bases: VyraServer

Vyra-based server implementation using Queryable.

TODO: Implement using ZenohQueryable from communication layer. Pattern: Queryable receives queries and responds with query.reply()

__init__(name, topic_builder, zenoh_session, service_type=None, response_callback=None, **kwargs)[source]

async initialize()[source]

Initialize Zenoh queryable (server).

Return type:

bool

async serve()[source]

Start serving queries.

Return type:

bool

Returns:

True on success

async cleanup()[source]

Cleanup Zenoh resources.

async shutdown()[source]

Shutdown server.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraClientImpl(name, topic_builder, request_callback=None, zenoh_session=None, service_type=None, **kwargs)[source]

Bases: VyraClient

Vyra-based client implementation using Query (get).

TODO: Implement using session.get() for queries. Pattern: Send query to key, receive replies from queryables

__init__(name, topic_builder, request_callback=None, zenoh_session=None, service_type=None, **kwargs)[source]

async initialize()[source]

Initialize Zenoh client.

Return type:

bool

async call(request, timeout=5.0)[source]

Send query to Zenoh server and await response.

Parameters:
  • request (Any) – Request message instance

  • timeout (float) – Query timeout in seconds

Return type:

Optional[Any]

Returns:

Response object on success, None on failure
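
The "response on success, None on failure" contract with a timeout in seconds can be sketched with plain asyncio. The example below does not use Zenoh or VYRA; call_or_none and the two stand-in servers are hypothetical names used only to show the timeout behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def call_or_none(
    send_query: Callable[[Any], Awaitable[Any]],
    request: Any,
    timeout: float = 5.0,
) -> Optional[Any]:
    """Await a reply for at most `timeout` seconds; return None on timeout."""
    try:
        return await asyncio.wait_for(send_query(request), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def fast_server(request: dict) -> dict:
    # Stand-in for a responsive queryable.
    return {"result": request["value"] * 2}


async def slow_server(request: dict) -> dict:
    # Stand-in for a queryable that answers too late.
    await asyncio.sleep(1.0)
    return {"result": request["value"] * 2}


async def demo():
    ok = await call_or_none(fast_server, {"value": 21})
    timed_out = await call_or_none(slow_server, {"value": 21}, timeout=0.05)
    return ok, timed_out


ok, timed_out = asyncio.run(demo())
```

Returning None instead of raising keeps call sites simple, at the cost of hiding why the call failed; callers that need the reason would have to log it inside the helper.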

async cleanup()[source]

Cleanup resources (none for stateless client).

async shutdown()[source]

Shutdown client.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, zenoh_session=None, action_type=None, **kwargs)[source]

Bases: VyraActionServer

Vyra-based action server implementation.

Architecture:
  • Queryable at {namespace}/{fn}/goal for goal requests

  • Queryable at {namespace}/{fn}/cancel for cancel requests

  • Publisher at {namespace}/{fn}/{goal_id}/feedback for feedback

  • Publisher at {namespace}/{fn}/{goal_id}/result for results

Channel keys are built by _action_channel() which stacks the protocol-level sub-channel on top of the config-level namespace / subsection following the VYRA topic convention.
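
To make the key layout concrete, the sketch below builds the four channel keys from a namespace, a function name, and a goal ID. It is a hypothetical stand-in for illustration only; the actual _action_channel() helper and its signature are not shown in this reference, and the example values are invented.

```python
from typing import Dict


def action_keys(namespace: str, fn: str, goal_id: str) -> Dict[str, str]:
    """Build goal/cancel/feedback/result keys following the documented layout."""
    # Strip stray slashes so the joined key has exactly one separator per level.
    base = f"{namespace.strip('/')}/{fn.strip('/')}"
    return {
        "goal": f"{base}/goal",
        "cancel": f"{base}/cancel",
        "feedback": f"{base}/{goal_id}/feedback",
        "result": f"{base}/{goal_id}/result",
    }


keys = action_keys("robot1", "navigate", "g-42")
```

Goal and cancel keys are shared per action, while feedback and result keys include the goal ID, so each goal gets its own pair of topics.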

__init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, zenoh_session=None, action_type=None, **kwargs)[source]

async initialize()[source]

Initialize Zenoh action server.

Return type:

bool

async publish_feedback(goal_id, feedback)[source]

Publish feedback for active goal.

Parameters:
  • goal_id (str)

  • feedback (Any)

async cleanup()[source]

Cleanup Zenoh resources.

async shutdown()[source]

Shutdown action server.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraActionClientImpl(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, zenoh_session=None, action_type=None, **kwargs)[source]

Bases: VyraActionClient

Vyra-based action client implementation.

Architecture:
  • Query to {namespace}/{fn}/goal for sending goals

  • Subscriber to {namespace}/{fn}/{goal_id}/feedback for feedback

  • Subscriber to {namespace}/{fn}/{goal_id}/result for result

  • Query to {namespace}/{fn}/cancel for canceling

Channel keys are built by _action_channel() following the VYRA topic naming convention.

__init__(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, zenoh_session=None, action_type=None, **kwargs)[source]

async initialize()[source]

Initialize Zenoh action client.

Return type:

bool

async send_goal(goal)[source]

Send goal to action server.

Parameters:

goal (Any) – Goal message instance

Return type:

Optional[str]

Returns:

goal_id on success, None on failure

TODO:
  1. Serialize goal

  2. Query to self._key_goal (built via _action_channel)

  3. Parse response for goal_id

  4. Subscribe to feedback and result topics

  5. Call goal_response_callback if set
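
The five TODO steps can be sketched against an in-memory stand-in. Everything here is an assumption made for illustration: FakeSession is not the ZenohSession API (its get() takes a payload argument and returns canned bytes), JSON is assumed as the wire format, and the reply shape with "accepted" and "goal_id" fields is invented.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List, Optional


class FakeSession:
    """In-memory stand-in; records subscriptions and returns a canned reply."""

    def __init__(self) -> None:
        self.subscriptions: List[str] = []

    async def get(self, key_expr: str, payload: bytes) -> bytes:
        # Pretend a goal queryable accepted the goal and assigned an ID.
        return json.dumps({"accepted": True, "goal_id": "goal-1"}).encode("utf-8")

    def declare_subscriber(self, key_expr: str, callback: Callable[[Any], None]) -> None:
        self.subscriptions.append(key_expr)


async def send_goal(
    session: FakeSession,
    base_key: str,
    goal: Dict[str, Any],
    goal_response_callback: Optional[Callable[[dict], Awaitable[None]]] = None,
) -> Optional[str]:
    payload = json.dumps(goal).encode("utf-8")              # 1. serialize goal
    reply = await session.get(f"{base_key}/goal", payload)  # 2. query the goal key
    response = json.loads(reply)                            # 3. parse response
    if not response.get("accepted"):
        return None
    goal_id = response["goal_id"]
    for channel in ("feedback", "result"):                  # 4. subscribe per goal
        session.declare_subscriber(f"{base_key}/{goal_id}/{channel}", lambda sample: None)
    if goal_response_callback is not None:                  # 5. notify the caller
        await goal_response_callback(response)
    return goal_id


async def demo():
    session = FakeSession()
    responses: List[dict] = []

    async def on_goal_response(response: dict) -> None:
        responses.append(response)

    goal_id = await send_goal(session, "robot1/navigate", {"x": 1.0}, on_goal_response)
    return goal_id, session.subscriptions, responses


goal_id, subscriptions, responses = asyncio.run(demo())
```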

async cancel_goal(goal_id)[source]

Cancel active goal.

Return type:

bool

Parameters:

goal_id (str)

async cleanup()[source]

Cleanup Zenoh resources.

async shutdown()[source]

Shutdown action client.

Return type:

None

class vyra_base.com.transport.t_zenoh.ZenohProvider(module_name, module_id, protocol=ProtocolType.ZENOH)[source]

Bases: AbstractProtocolProvider

Protocol provider for Zenoh transport.

Features:
  • Efficient Pub/Sub with zero-copy capabilities

  • Query/Reply for request-response patterns

  • Router-based scalability

  • Built-in discovery and fault tolerance

  • Multi-protocol support (TCP, UDP, shared memory)

Requirements:
  • eclipse-zenoh Python package

  • Zenoh router (typically Docker service)

Example

>>> # Initialize provider
>>> provider = ZenohProvider("my_module", "my_module_id")  # placeholder module name and ID
>>>
>>> if await provider.check_availability():
...     await provider.initialize(config={
...         "mode": "client",
...         "connect": ["tcp/zenoh-router:7447"]
...     })
...
...     # Create callable (query/reply)
...     async def handle_request(req):
...         return {"result": req["value"] * 2}
...
...     callable = await provider.create_callable(
...         "/calculate",
...         handle_request
...     )
...
...     # Create publisher (pub/sub)
...     publisher = await provider.create_publisher("/sensor_data")
__init__(module_name, module_id, protocol=ProtocolType.ZENOH)[source]

Initialize Zenoh provider.

Parameters:
  • protocol (ProtocolType) – Protocol type (must be ZENOH)

  • module_name (str)

  • module_id (str)

async check_availability()[source]

Check if Zenoh is available.

Returns:

True if the Zenoh Python bindings (the eclipse-zenoh package) are installed

Return type:

bool
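
An availability check of this kind is commonly implemented by probing for the importable module without importing it. The sketch below assumes the eclipse-zenoh package is importable as zenoh; whether check_availability() or ZENOH_AVAILABLE work this way internally is not stated here, and zenoh_available is a hypothetical name.

```python
import importlib.util


def zenoh_available() -> bool:
    """Return True if a top-level 'zenoh' module can be located."""
    # find_spec only locates the module; it does not execute its import.
    return importlib.util.find_spec("zenoh") is not None


ZENOH_AVAILABLE_SKETCH = zenoh_available()
```

Probing with find_spec avoids paying the import cost (or triggering import-time errors) just to decide which transport to use.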

async initialize(config=None)[source]

Initialize Zenoh provider and open session.

Parameters:

config (Optional[Dict[str, Any]]) – Configuration dictionary:
  • mode: "peer", "client", or "router" (default: "client")

  • connect: List of endpoints (default: ["tcp/zenoh-router:7447"])

  • listen: List of listen endpoints

  • format: Serialization format (default: "json")

Returns:

True if initialization successful

Return type:

bool
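
The documented defaults suggest a merge of user-supplied keys over a default dictionary. A minimal sketch of that idea follows; resolve_config is a hypothetical helper, the empty default for listen is an assumption (no default is documented), and the mode validation is added for illustration.

```python
import copy
from typing import Any, Dict, Optional

# Defaults as documented for initialize(); the "listen" default is assumed.
DEFAULTS: Dict[str, Any] = {
    "mode": "client",
    "connect": ["tcp/zenoh-router:7447"],
    "listen": [],
    "format": "json",
}
VALID_MODES = {"peer", "client", "router"}


def resolve_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Overlay user-supplied keys on the defaults and validate the mode."""
    # Deep copy so callers can mutate the result without touching DEFAULTS.
    merged = copy.deepcopy(DEFAULTS)
    merged.update(config or {})
    if merged["mode"] not in VALID_MODES:
        raise ValueError(f"unknown mode: {merged['mode']!r}")
    return merged


default_cfg = resolve_config()
peer_cfg = resolve_config({"mode": "peer", "connect": ["tcp/localhost:7447"]})
```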

async shutdown()[source]

Shutdown Zenoh provider and close session.

Return type:

None

async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]

Create Zenoh Publisher.

Parameters:
  • name (str) – Publisher name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional publisher options

Return type:

VyraPublisher

Returns:

VyraPublisherImpl instance

async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]

Create Zenoh Subscriber.

Parameters:
  • name (str) – Subscriber name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • subscriber_callback (Optional[Callable]) – Async callback for received messages

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional subscriber options

Return type:

VyraSubscriber

Returns:

VyraSubscriberImpl instance

async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]

Create Zenoh Server (Queryable).

Parameters:
  • name (str) – Server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • response_callback (Optional[Callable]) – Async callback for handling requests

  • service_type (Optional[type]) – Service type class

  • **kwargs – Additional server options

Return type:

VyraServer

Returns:

VyraServerImpl instance

async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]

Create Zenoh Client (Query sender).

Parameters:
  • name (str) – Client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • service_type (Optional[type]) – Optional service type class (ignored in Zenoh – schema-less)

  • request_callback (Optional[Callable]) – Optional async callback for responses

  • **kwargs – Additional client options

Return type:

VyraClient

Returns:

VyraClientImpl instance

async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]

Create Zenoh Action Server.

Parameters:
  • name (str) – Action server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • handle_goal_request (Optional[Callable]) – Async callback for goal requests

  • handle_cancel_request (Optional[Callable]) – Async callback for cancel requests

  • execution_callback (Optional[Callable]) – Async callback for goal execution

  • action_type (Optional[type]) – Action type class

  • **kwargs – Additional action server options

Return type:

VyraActionServer

Returns:

VyraActionServerImpl instance

async create_action_client(name, topic_builder=None, action_type=None, direct_response=None, feedback_callback=None, goal_response_callback=None, direct_response_callback=None, goal_callback=None, **kwargs)[source]

Create Zenoh Action Client.

Parameters:
  • name (str) – Action client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • action_type (Optional[type]) – Action type class

  • direct_response (Optional[Callable]) – Optional async callback for results

  • feedback_callback (Optional[Callable]) – Optional async callback for feedback

  • goal_response_callback (Optional[Callable]) – Optional async callback for goal responses

  • **kwargs – Additional action client options

  • direct_response_callback (Callable | None)

  • goal_callback (Callable | None)

Return type:

VyraActionClient

Returns:

VyraActionClientImpl instance

require_initialization()[source]

Ensure provider is initialized.

Raises:

ProviderError – If not initialized

Return type:

None
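
A guard like this is typically called at the top of every factory method so that misuse fails early with a clear error. The sketch below shows the pattern with local stand-ins; ProviderError is redefined here only so the example runs on its own, and ProviderSketch is not the ZenohProvider implementation.

```python
class ProviderError(RuntimeError):
    """Local stand-in for the ProviderError named above."""


class ProviderSketch:
    """Hypothetical provider showing the initialization guard pattern."""

    def __init__(self) -> None:
        self._initialized = False

    def initialize(self) -> bool:
        self._initialized = True
        return True

    def require_initialization(self) -> None:
        if not self._initialized:
            raise ProviderError("provider not initialized; call initialize() first")

    def create_publisher(self, name: str) -> str:
        # Fail fast before touching any session state.
        self.require_initialization()
        return f"publisher:{name}"


provider = ProviderSketch()
try:
    provider.create_publisher("/topic")
    failed_before_init = False
except ProviderError:
    failed_before_init = True

provider.initialize()
publisher = provider.create_publisher("/topic")
```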

get_session()[source]

Get the underlying Zenoh session.

Return type:

Optional[ZenohSession]

Overview

Zenoh is a high-performance publish/subscribe and query/reply protocol designed for distributed systems. It requires no central broker, works through NAT, and supports zero-copy for high-throughput scenarios.

Property      Value
Module        vyra_base.com.transport.t_zenoh
ProtocolType  ProtocolType.ZENOH
Default       Yes (first in fallback chain)
Install       pip install eclipse-zenoh
Pattern       Pub/Sub + Query/Reply (queryable = server, query_client = client)

Key Classes

Class          Description
ZenohProvider  Protocol provider; registers Zenoh with ProviderRegistry
ZenohSession   Manages the Zenoh session lifecycle
SessionConfig  Configuration dataclass for the Zenoh session
SessionMode    Enum: CLIENT, PEER, ROUTER

Usage

Zenoh is used automatically via InterfaceFactory when available. To configure the session manually:

from vyra_base.com.transport.t_zenoh import ZenohProvider, ZenohSession, SessionConfig, SessionMode

config = SessionConfig(
    mode=SessionMode.PEER,
    connect=["tcp/localhost:7447"],
)
session = ZenohSession(config)
await session.open()

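# ZenohProvider takes a module name and ID (placeholder values below) and opens
# its own session from the config passed to initialize(); use
# provider.get_session() to access that session afterwards.
provider = ZenohProvider(module_name="my_module", module_id="my_module_id")
await provider.initialize(config={
    "mode": "peer",
    "connect": ["tcp/localhost:7447"],
})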

For standard module use, Zenoh is initialized automatically at startup. See Quickstart — Building a VYRA Module for the full module initialization pattern.