Zenoh Transport

The Zenoh transport is the default and recommended protocol for VYRA modules.

Zenoh Transport Module

Provides a Zenoh-based transport implementation with a layered architecture:

Layers:
  • communication/: Core Zenoh functionality (Query/Reply, Pub/Sub, Tasks)

  • vyra_models/: VYRA abstractions (ZenohCallable, ZenohPublisher, ZenohJob)

  • session.py: Zenoh session management and lifecycle

  • provider.py: Interface layer for VYRA integration

Features:
  • Query/Reply pattern for request-response (ZenohCallable)

  • Pub/Sub for topic-based messaging (ZenohPublisher)

  • Task-based long-running operations (ZenohJob)

  • Zero-copy and efficient serialization

  • Router-based architecture for scalability

  • Built-in discovery and fault tolerance

Usage:

from vyra_base.com.transport.t_zenoh import ZenohProvider, ZENOH_AVAILABLE

if ZENOH_AVAILABLE:
    # module_name and module_id are required by the constructor;
    # the values used here are placeholders.
    provider = ZenohProvider(module_name="my_module", module_id="my_module_id")
    await provider.initialize(config={
        "mode": "client",
        "connect": ["tcp/zenoh-router:7447"],
    })
    callable = await provider.create_callable("/service", callback)
    publisher = await provider.create_publisher("/topic")

class vyra_base.com.transport.t_zenoh.ZenohSession(config=None)

Bases: object

Manages Zenoh session for VYRA modules.

Provides:
  • Session lifecycle management

  • Connection to Zenoh router

  • Query/Reply, Pub/Sub, and Task primitives

  • Automatic reconnection

Example

>>> config = SessionConfig(
...     mode=SessionMode.CLIENT,
...     connect=["tcp/zenoh-router:7447"]
... )
>>> session = ZenohSession(config)
>>> await session.open()
>>>
>>> # Use session for communication
>>> pub = session.declare_publisher("/topic")
>>> pub.put("Hello Zenoh")
>>>
>>> await session.close()

Parameters:

config (Optional[SessionConfig])

__init__(config=None)

Initialize Zenoh session manager.

Parameters:

config (Optional[SessionConfig]) – Session configuration

property is_open: bool

Check if session is open.

property session: Any

Get underlying Zenoh session.

async open()

Open Zenoh session and connect to router.

Returns:

True if session opened successfully

Return type:

bool

async close()

Close Zenoh session and clean up resources.

Return type:

None

declare_publisher(key_expr, **kwargs)

Declare a Zenoh publisher.

Parameters:
  • key_expr (str) – Key expression (topic path)

  • **kwargs – Additional publisher options

Return type:

Any

Returns:

Zenoh Publisher

declare_subscriber(key_expr, callback, **kwargs)

Declare a Zenoh subscriber.

Parameters:
  • key_expr (str) – Key expression (topic path)

  • callback – Callback for incoming messages

  • **kwargs – Additional subscriber options

Return type:

Any

Returns:

Zenoh Subscriber

declare_queryable(key_expr, callback, **kwargs)

Declare a Zenoh queryable (service server).

Parameters:
  • key_expr (str) – Key expression (service path)

  • callback – Callback for query handling

  • **kwargs – Additional queryable options

Return type:

Any

Returns:

Zenoh Queryable

async get(key_expr, timeout=None, **kwargs)

Send a Zenoh query (client request).

Parameters:
  • key_expr (str) – Key expression (service path)

  • timeout (Optional[float]) – Query timeout in seconds

  • **kwargs – Additional query parameters

Return type:

Any

Returns:

Query replies

class vyra_base.com.transport.t_zenoh.SessionConfig(mode=SessionMode.CLIENT, connect=<factory>, listen=<factory>, id=None, scouting_multicast=True, timeout_ms=5000)

Bases: object

Configuration for Zenoh session.

Parameters:
  • mode – Session mode (peer/client/router)

  • connect – List of endpoints to connect to (e.g., ["tcp/zenoh-router:7447"])

  • listen – List of endpoints to listen on

  • id – Optional session ID

  • scouting_multicast – Enable multicast scouting for peer discovery

  • timeout_ms – Timeout for operations in milliseconds

mode: SessionMode = 'client'
connect: List[str]
listen: List[str]
id: str | None = None
scouting_multicast: bool = True
timeout_ms: int = 5000

to_zenoh_config()

Convert to Zenoh configuration dictionary.

Return type:

Dict[str, Any]

__init__(mode=SessionMode.CLIENT, connect=<factory>, listen=<factory>, id=None, scouting_multicast=True, timeout_ms=5000)

Return type:

None
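
To make the field defaults and the role of to_zenoh_config() concrete, the following is a minimal sketch that mirrors the documented fields in a standalone dataclass. The class name SketchSessionConfig, the empty-list defaults for connect and listen, and the dictionary layout it produces are assumptions for illustration. The keys follow the general shape of a Zenoh configuration (mode, connect/endpoints, listen/endpoints, scouting/multicast/enabled), but the actual output of SessionConfig.to_zenoh_config() may differ.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class SessionMode(str, Enum):
    """Stand-in with the same members as the documented SessionMode."""
    PEER = "peer"
    CLIENT = "client"
    ROUTER = "router"


@dataclass
class SketchSessionConfig:
    """Hypothetical mirror of SessionConfig; not the VYRA implementation."""
    mode: SessionMode = SessionMode.CLIENT
    connect: List[str] = field(default_factory=list)  # assumed default
    listen: List[str] = field(default_factory=list)   # assumed default
    id: Optional[str] = None
    scouting_multicast: bool = True
    timeout_ms: int = 5000

    def to_zenoh_config(self) -> Dict[str, Any]:
        # Assumed layout, loosely modelled on Zenoh's JSON configuration.
        cfg: Dict[str, Any] = {
            "mode": self.mode.value,
            "scouting": {"multicast": {"enabled": self.scouting_multicast}},
        }
        # Only emit endpoint sections that were actually configured.
        if self.connect:
            cfg["connect"] = {"endpoints": list(self.connect)}
        if self.listen:
            cfg["listen"] = {"endpoints": list(self.listen)}
        if self.id is not None:
            cfg["id"] = self.id
        return cfg


config = SketchSessionConfig(connect=["tcp/zenoh-router:7447"])
zenoh_cfg = config.to_zenoh_config()
```

Keeping the conversion in one method means the rest of the transport can work with typed fields and only produce the nested dictionary at the point where the session is opened.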

class vyra_base.com.transport.t_zenoh.SessionMode(*values)

Bases: str, Enum

Zenoh session modes.

PEER = 'peer'
CLIENT = 'client'
ROUTER = 'router'
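
Because SessionMode derives from both str and Enum, plain strings such as the "mode" value in a provider config can be mapped onto enum members directly. The short sketch below uses a stand-in enum with the same three members (it does not import vyra_base) to show the lookup, the string comparison, and the error raised for an unknown mode.

```python
from enum import Enum


class SessionMode(str, Enum):
    """Stand-in with the same members as the documented enum."""
    PEER = "peer"
    CLIENT = "client"
    ROUTER = "router"


# Look up a member from a plain string, e.g. one read from a config file.
mode = SessionMode("peer")

# Because the enum also subclasses str, members compare equal to their values.
is_peer = mode == "peer"

# Unknown strings raise ValueError, which is a convenient validation point.
try:
    SessionMode("broker")
    invalid_rejected = False
except ValueError:
    invalid_rejected = True
```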

class vyra_base.com.transport.t_zenoh.VyraPublisherImpl(name, topic_builder, zenoh_session, message_type=None, **kwargs)

Bases: VyraPublisher

Vyra-based publisher implementation.

Wraps ZenohPublisher for a consistent async interface. Zenoh is async-native, so no callback adapter is needed.

__init__(name, topic_builder, zenoh_session, message_type=None, **kwargs)

async initialize()

Initialize Zenoh publisher.

Return type:

bool

async publish(message)

Publish message via Zenoh.

Parameters:

message (Any) – Message instance (msg_type) or dict

Return type:

bool

Returns:

True on success

async cleanup()

Clean up Zenoh resources.

async shutdown()

Shutdown publisher.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraSubscriberImpl(name, topic_builder, zenoh_session, subscriber_callback=None, message_type=None, **kwargs)

Bases: VyraSubscriber

Vyra-based subscriber implementation.

Wraps ZenohSubscriber for a consistent async interface. Zenoh callbacks are async-native.

__init__(name, topic_builder, zenoh_session, subscriber_callback=None, message_type=None, **kwargs)

async initialize()

Initialize Zenoh subscriber.

Return type:

bool

async subscribe()

Start subscribing to Zenoh topic.

Note: The subscriber is already active after initialize(). This method exists for API compatibility.

Return type:

bool

Returns:

True on success

async cleanup()

Clean up Zenoh resources.

async shutdown()

Shutdown subscriber.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraServerImpl(name, topic_builder, zenoh_session, service_type=None, response_callback=None, **kwargs)

Bases: VyraServer

Vyra-based server implementation using Queryable.

TODO: Implement using ZenohQueryable from the communication layer. Pattern: Queryable receives queries and responds with query.reply().

__init__(name, topic_builder, zenoh_session, service_type=None, response_callback=None, **kwargs)

async initialize()

Initialize Zenoh queryable (server).

Return type:

bool

async serve()

Start serving queries.

Return type:

bool

Returns:

True on success

async cleanup()

Clean up Zenoh resources.

async shutdown()

Shutdown server.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraClientImpl(name, topic_builder, request_callback=None, zenoh_session=None, service_type=None, **kwargs)

Bases: VyraClient

Vyra-based client implementation using Query (get).

TODO: Implement using session.get() for queries. Pattern: Send a query to a key, receive replies from queryables.

__init__(name, topic_builder, request_callback=None, zenoh_session=None, service_type=None, **kwargs)

async initialize()

Initialize Zenoh client.

Return type:

bool

async call(request, timeout=5.0)

Send query to Zenoh server and await response.

Parameters:
  • request (Any) – Request message instance

  • timeout (float) – Query timeout in seconds

Return type:

Optional[Any]

Returns:

Response object on success, None on failure
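
The contract of call() (a response on success, None on failure) can be illustrated without a running router. The sketch below is a stand-in under stated assumptions: call_with_timeout and the two fake server coroutines are hypothetical names, and asyncio.wait_for is used only to model the timeout. VyraClientImpl may enforce its timeout differently.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def call_with_timeout(
    send_query: Callable[[Any], Awaitable[Any]],
    request: Any,
    timeout: float = 5.0,
) -> Optional[Any]:
    """Await send_query(request); return None on timeout or error."""
    try:
        return await asyncio.wait_for(send_query(request), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # no reply arrived in time
    except Exception:
        return None  # the query itself failed


async def fast_server(request: dict) -> dict:
    # Hypothetical handler that answers immediately.
    return {"result": request["value"] * 2}


async def slow_server(request: dict) -> dict:
    # Hypothetical handler that takes longer than the caller is willing to wait.
    await asyncio.sleep(1.0)
    return {"result": None}


async def demo() -> tuple:
    ok = await call_with_timeout(fast_server, {"value": 21}, timeout=0.5)
    timed_out = await call_with_timeout(slow_server, {"value": 21}, timeout=0.05)
    return ok, timed_out


ok, timed_out = asyncio.run(demo())
```

Returning None instead of raising keeps the calling code simple, at the cost of hiding why a call failed; callers that need the reason would have to rely on logging.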

async cleanup()

Clean up resources (none for a stateless client).

async shutdown()

Shutdown client.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, zenoh_session=None, action_type=None, **kwargs)

Bases: VyraActionServer

Vyra-based action server implementation.

Architecture:
  • Queryable at {namespace}/{fn}/goal for goal requests

  • Queryable at {namespace}/{fn}/cancel for cancel requests

  • Publisher at {namespace}/{fn}/{goal_id}/feedback for feedback

  • Publisher at {namespace}/{fn}/{goal_id}/result for results

Channel keys are built by _action_channel() which stacks the protocol-level sub-channel on top of the config-level namespace / subsection following the VYRA topic convention.
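
As an illustration of the key layout described above, the sketch below builds the four channel keys for one goal. The helper names action_channel and action_keys, and the example values "robot_arm", "move_to", and "goal-001", are hypothetical. This is not the private _action_channel() implementation, which also takes the configured subsection into account.

```python
from typing import Dict, Optional


def action_channel(namespace: str, fn: str, sub_channel: str,
                   goal_id: Optional[str] = None) -> str:
    """Build a key such as 'ns/fn/goal' or 'ns/fn/<goal_id>/feedback'."""
    parts = [namespace.strip("/"), fn.strip("/")]
    if goal_id is not None:
        parts.append(goal_id)  # per-goal channels carry the goal id
    parts.append(sub_channel)
    return "/".join(parts)


def action_keys(namespace: str, fn: str, goal_id: str) -> Dict[str, str]:
    """Return the four keys listed in the architecture description."""
    return {
        "goal": action_channel(namespace, fn, "goal"),
        "cancel": action_channel(namespace, fn, "cancel"),
        "feedback": action_channel(namespace, fn, "feedback", goal_id),
        "result": action_channel(namespace, fn, "result", goal_id),
    }


keys = action_keys("robot_arm", "move_to", goal_id="goal-001")
```

The goal and cancel keys are shared by all goals of an action, while feedback and result keys are unique per goal, so a client only receives traffic for the goals it subscribed to.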

__init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, zenoh_session=None, action_type=None, **kwargs)

async initialize()

Initialize Zenoh action server.

Return type:

bool

async publish_feedback(goal_id, feedback)

Publish feedback for active goal.

Parameters:
  • goal_id (str)

  • feedback (Any)

async cleanup()

Clean up Zenoh resources.

async shutdown()

Shutdown action server.

Return type:

None

class vyra_base.com.transport.t_zenoh.VyraActionClientImpl(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, zenoh_session=None, action_type=None, **kwargs)

Bases: VyraActionClient

Vyra-based action client implementation.

Architecture:
  • Query to {namespace}/{fn}/goal for sending goals

  • Subscriber to {namespace}/{fn}/{goal_id}/feedback for feedback

  • Subscriber to {namespace}/{fn}/{goal_id}/result for result

  • Query to {namespace}/{fn}/cancel for canceling

Channel keys are built by _action_channel() following the VYRA topic naming convention.

__init__(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, zenoh_session=None, action_type=None, **kwargs)

async initialize()

Initialize Zenoh action client.

Return type:

bool

async send_goal(goal)

Send goal to action server.

Parameters:

goal (Any) – Goal message instance

Return type:

Optional[str]

Returns:

goal_id on success, None on failure

TODO:
  1. Serialize goal

  2. Query to self._key_goal (built via _action_channel)

  3. Parse response for goal_id

  4. Subscribe to feedback and result topics

  5. Call goal_response_callback if set
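
The five TODO steps can be sketched against an in-memory stand-in. Everything here is an assumption made for illustration: FakeSession is not the ZenohSession API (its get() takes a payload argument and returns raw bytes), JSON is used as the wire format, and the response fields "accepted" and "goal_id" are hypothetical.

```python
import asyncio
import json
from typing import Any, Callable, Dict, List, Optional


class FakeSession:
    """In-memory stand-in for a session; not the ZenohSession API."""

    def __init__(self) -> None:
        self.subscriptions: List[str] = []

    async def get(self, key_expr: str, payload: bytes) -> bytes:
        # Pretend a goal queryable accepted the goal and assigned an id.
        json.loads(payload)  # the fake server only checks that it can decode
        return json.dumps({"accepted": True, "goal_id": "goal-001"}).encode()

    def declare_subscriber(self, key_expr: str,
                           callback: Callable[[bytes], None]) -> None:
        self.subscriptions.append(key_expr)


async def send_goal_sketch(
    session: FakeSession,
    base_key: str,
    goal: Dict[str, Any],
    goal_response_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Optional[str]:
    payload = json.dumps(goal).encode()                   # 1. serialize goal
    raw = await session.get(f"{base_key}/goal", payload)  # 2. query the goal key
    response = json.loads(raw)                            # 3. parse response
    if not response.get("accepted"):
        return None
    goal_id = response["goal_id"]
    for channel in ("feedback", "result"):                # 4. subscribe per goal
        session.declare_subscriber(f"{base_key}/{goal_id}/{channel}",
                                   lambda data: None)
    if goal_response_callback is not None:                # 5. notify the caller
        goal_response_callback(response)
    return goal_id


session = FakeSession()
responses: List[Dict[str, Any]] = []
goal_id = asyncio.run(
    send_goal_sketch(session, "robot_arm/move_to", {"x": 1.0}, responses.append)
)
```

Subscribing only after the goal id is known matches the per-goal feedback and result keys in the architecture description; a real implementation would also need to handle feedback published before the subscription is in place.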

async cancel_goal(goal_id)

Cancel active goal.

Return type:

bool

Parameters:

goal_id (str)

async cleanup()

Clean up Zenoh resources.

async shutdown()

Shutdown action client.

Return type:

None

class vyra_base.com.transport.t_zenoh.ZenohProvider(module_name, module_id, protocol=ProtocolType.ZENOH)

Bases: AbstractProtocolProvider

Protocol provider for Zenoh transport.

Features:
  • Efficient Pub/Sub with zero-copy capabilities

  • Query/Reply for request-response patterns

  • Router-based scalability

  • Built-in discovery and fault tolerance

  • Multi-protocol support (TCP, UDP, shared memory)

Requirements:
  • eclipse-zenoh Python package

  • Zenoh router (typically Docker service)

Example

>>> # Initialize provider (module_name and module_id values are placeholders)
>>> provider = ZenohProvider("my_module", "my_module_id")
>>>
>>> if await provider.check_availability():
...     await provider.initialize(config={
...         "mode": "client",
...         "connect": ["tcp/zenoh-router:7447"]
...     })
...
...     # Create callable (query/reply)
...     async def handle_request(req):
...         return {"result": req["value"] * 2}
...
...     callable = await provider.create_callable(
...         "/calculate",
...         handle_request
...     )
...
...     # Create publisher (pub/sub)
...     publisher = await provider.create_publisher("/sensor_data")

__init__(module_name, module_id, protocol=ProtocolType.ZENOH)

Initialize Zenoh provider.

Parameters:
  • protocol (ProtocolType) – Protocol type (must be ZENOH)

  • module_name (str)

  • module_id (str)

async check_availability()

Check if Zenoh is available.

Returns:

True if the Zenoh Python package (eclipse-zenoh) is installed

Return type:

bool
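
An availability check of this kind can be approximated with the standard library. The sketch below assumes only that the eclipse-zenoh distribution installs a top-level module named zenoh; the names zenoh_installed and ZENOH_AVAILABLE_SKETCH are hypothetical, and the real check_availability() and ZENOH_AVAILABLE flag may be implemented differently.

```python
import importlib.util


def zenoh_installed() -> bool:
    """Return True if a top-level module named 'zenoh' can be located."""
    return importlib.util.find_spec("zenoh") is not None


# Evaluate once at import time, similar in spirit to a module-level flag.
ZENOH_AVAILABLE_SKETCH = zenoh_installed()

if ZENOH_AVAILABLE_SKETCH:
    print("zenoh found: the Zenoh transport can be used")
else:
    print("zenoh missing: install it with 'pip install eclipse-zenoh'")
```

Using find_spec avoids importing the package just to test for its presence, which keeps the check cheap and free of import side effects.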

async initialize(config=None)

Initialize Zenoh provider and open session.

Parameters:

config (Optional[Dict[str, Any]]) – Configuration dictionary:
  • mode: "peer", "client", or "router" (default: "client")

  • connect: List of endpoints (default: ["tcp/zenoh-router:7447"])

  • listen: List of listen endpoints

  • format: Serialization format (default: "json")

Returns:

True if initialization successful

Return type:

bool
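
To show how the documented defaults combine with a partial config, here is a small sketch that overlays user-supplied keys on a defaults dictionary. The function resolve_config, the empty default for listen, and the mode validation are assumptions; only the default values for mode, connect, and format come from the description above.

```python
from typing import Any, Dict, Optional

# Defaults taken from the documented description of initialize();
# the empty "listen" list is an assumption (no default is documented).
DEFAULT_CONFIG: Dict[str, Any] = {
    "mode": "client",
    "connect": ["tcp/zenoh-router:7447"],
    "listen": [],
    "format": "json",
}

VALID_MODES = {"peer", "client", "router"}


def resolve_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Overlay user-supplied keys on the defaults and validate the mode."""
    resolved = {**DEFAULT_CONFIG, **(config or {})}
    if resolved["mode"] not in VALID_MODES:
        raise ValueError(f"unsupported mode: {resolved['mode']!r}")
    return resolved


defaults = resolve_config()
peer = resolve_config({"mode": "peer", "listen": ["tcp/0.0.0.0:7447"]})
```

With this approach a caller only needs to pass the keys that differ from the defaults, for example switching to peer mode while keeping the default router endpoint.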

async shutdown()

Shutdown Zenoh provider and close session.

Return type:

None

async create_publisher(name, topic_builder=None, message_type=None, **kwargs)

Create Zenoh Publisher.

Parameters:
  • name (str) – Publisher name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional publisher options

Return type:

VyraPublisher

Returns:

ZenohPublisherImpl instance

async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)

Create Zenoh Subscriber.

Parameters:
  • name (str) – Subscriber name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • subscriber_callback (Optional[Callable]) – Async callback for received messages

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional subscriber options

Return type:

VyraSubscriber

Returns:

ZenohSubscriberImpl instance

async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)

Create Zenoh Server (Queryable).

Parameters:
  • name (str) – Server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • response_callback (Optional[Callable]) – Async callback for handling requests

  • service_type (Optional[type]) – Service type class

  • **kwargs – Additional server options

Return type:

VyraServer

Returns:

ZenohServerImpl instance

async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)

Create Zenoh Client (Query sender).

Parameters:
  • name (str) – Client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • service_type (Optional[type]) – Optional service type class (ignored in Zenoh – schema-less)

  • request_callback (Optional[Callable]) – Optional async callback for responses

  • **kwargs – Additional client options

Return type:

VyraClient

Returns:

ZenohClientImpl instance

async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)

Create Zenoh Action Server.

Parameters:
  • name (str) – Action server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • handle_goal_request (Optional[Callable]) – Async callback for goal requests

  • handle_cancel_request (Optional[Callable]) – Async callback for cancel requests

  • execution_callback (Optional[Callable]) – Async callback for goal execution

  • action_type (Optional[type]) – Action type class

  • **kwargs – Additional action server options

Return type:

VyraActionServer

Returns:

ZenohActionServerImpl instance

async create_action_client(name, topic_builder=None, action_type=None, direct_response=None, feedback_callback=None, goal_response_callback=None, direct_response_callback=None, goal_callback=None, **kwargs)

Create Zenoh Action Client.

Parameters:
  • name (str) – Action client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • action_type (Optional[type]) – Action type class

  • direct_response (Optional[Callable]) – Optional async callback for results

  • feedback_callback (Optional[Callable]) – Optional async callback for feedback

  • goal_response_callback (Optional[Callable]) – Optional async callback for goal responses

  • **kwargs – Additional action client options

  • direct_response_callback (Callable | None)

  • goal_callback (Callable | None)

Return type:

VyraActionClient

Returns:

ZenohActionClientImpl instance

require_initialization()

Ensure provider is initialized.

Raises:

ProviderError – If not initialized

Return type:

None
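
The guard pattern behind require_initialization() can be sketched in a few lines. SketchProvider, its synchronous methods, and the RuntimeError base class of the stand-in ProviderError are assumptions for illustration; the real provider is asynchronous and its exception hierarchy is not shown in this reference.

```python
class ProviderError(RuntimeError):
    """Stand-in for the ProviderError named above; base class is assumed."""


class SketchProvider:
    """Hypothetical provider showing only the initialization guard."""

    def __init__(self) -> None:
        self._initialized = False

    def initialize(self) -> bool:
        self._initialized = True
        return True

    def require_initialization(self) -> None:
        if not self._initialized:
            raise ProviderError(
                "provider is not initialized; call initialize() first"
            )

    def create_publisher(self, name: str) -> str:
        self.require_initialization()  # guard before any session use
        return f"publisher:{name}"


provider = SketchProvider()
try:
    provider.create_publisher("/topic")
    raised = False
except ProviderError:
    raised = True

provider.initialize()
publisher = provider.create_publisher("/topic")
```

Calling the guard at the top of every factory method gives a clear error at the point of misuse instead of a failure deeper in the session code.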

get_session()

Get the underlying Zenoh session.

Return type:

Optional[ZenohSession]

Overview

Zenoh is a high-performance publish/subscribe and query/reply protocol designed for distributed systems. It requires no central broker, works through NAT, and supports zero-copy for high-throughput scenarios.

Property        Value
------------    -----------------------------------------------------------------
Module          vyra_base.com.transport.t_zenoh
ProtocolType    ProtocolType.ZENOH
Default         Yes (first in fallback chain)
Install         pip install eclipse-zenoh
Pattern         Pub/Sub + Query/Reply (queryable = server, query_client = client)

Key Classes

Class            Description
-------------    --------------------------------------------------------
ZenohProvider    Protocol provider; registers Zenoh with ProviderRegistry
ZenohSession     Manages the Zenoh session lifecycle
SessionConfig    Configuration dataclass for the Zenoh session
SessionMode      Enum: CLIENT, PEER, ROUTER

Usage

Zenoh is used automatically via InterfaceFactory when available. To configure the session manually:

from vyra_base.com.transport.t_zenoh import ZenohProvider, ZenohSession, SessionConfig, SessionMode

config = SessionConfig(
    mode=SessionMode.PEER,
    connect=["tcp/localhost:7447"],
)
session = ZenohSession(config)
await session.open()

provider = ZenohProvider(session=session)
await provider.initialize()

For standard module use, Zenoh is initialized automatically at startup. See Quickstart — Building a VYRA Module for the full module initialization pattern.