Zenoh Transport¶
The Zenoh transport is the default and recommended protocol for VYRA modules.
Zenoh Transport Module
Provides Zenoh-based transport implementation with layered architecture:
- Layers:
communication/: Core Zenoh functionality (Query/Reply, Pub/Sub, Tasks)
vyra_models/: VYRA abstractions (ZenohCallable, ZenohPublisher, ZenohJob)
session.py: Zenoh session management and lifecycle
provider.py: Interface layer for VYRA integration
- Features:
Query/Reply pattern for request-response (ZenohCallable)
Pub/Sub for topic-based messaging (ZenohPublisher)
Task-based long-running operations (ZenohJob)
Zero-copy and efficient serialization
Router-based architecture for scalability
Built-in discovery and fault tolerance
- Usage:
from vyra_base.com.transport.t_zenoh import ZenohProvider, ZENOH_AVAILABLE
- if ZENOH_AVAILABLE:
provider = ZenohProvider() await provider.initialize(config={
“mode”: “client”, “connect”: [“tcp/zenoh-router:7447”]
}) callable = await provider.create_callable(“/service”, callback) publisher = await provider.create_publisher(“/topic”)
- class vyra_base.com.transport.t_zenoh.ZenohSession(config=None)[source]
Bases:
objectManages Zenoh session for VYRA modules.
Provides: - Session lifecycle management - Connection to Zenoh router - Query/Reply, Pub/Sub, and Task primitives - Automatic reconnection
Example
>>> config = SessionConfig( ... mode=SessionMode.CLIENT, ... connect=["tcp/zenoh-router:7447"] ... ) >>> session = ZenohSession(config) >>> await session.open() >>> >>> # Use session for communication >>> pub = session.declare_publisher("/topic") >>> pub.put("Hello Zenoh") >>> >>> await session.close()
- Parameters:
config (Optional[SessionConfig])
- __init__(config=None)[source]
Initialize Zenoh session manager.
- Parameters:
config (
Optional[SessionConfig]) – Session configuration
- property is_open: bool
Check if session is open.
- property session: Any
Get underlying Zenoh session.
- async open()[source]
Open Zenoh session and connect to router.
- Returns:
True if session opened successfully
- Return type:
- declare_publisher(key_expr, **kwargs)[source]
Declare a Zenoh publisher.
- declare_subscriber(key_expr, callback, **kwargs)[source]
Declare a Zenoh subscriber.
- declare_queryable(key_expr, callback, **kwargs)[source]
Declare a Zenoh queryable (service server).
- class vyra_base.com.transport.t_zenoh.SessionConfig(mode=SessionMode.CLIENT, connect=<factory>, listen=<factory>, id=None, scouting_multicast=True, timeout_ms=5000)[source]
Bases:
objectConfiguration for Zenoh session.
- Parameters:
- mode
Session mode (peer/client/router)
- connect
List of endpoints to connect to (e.g., [“tcp/zenoh-router:7447”])
- listen
List of endpoints to listen on
- id
Optional session ID
- scouting_multicast
Enable multicast scouting for peer discovery
- timeout_ms
Timeout for operations in milliseconds
- mode: SessionMode = 'client'
- scouting_multicast: bool = True
- timeout_ms: int = 5000
- class vyra_base.com.transport.t_zenoh.SessionMode(*values)[source]
-
Zenoh session modes.
- PEER = 'peer'
- CLIENT = 'client'
- ROUTER = 'router'
- class vyra_base.com.transport.t_zenoh.VyraPublisherImpl(name, topic_builder, zenoh_session, message_type=None, **kwargs)[source]
Bases:
VyraPublisherVyra-based publisher implementation.
Wraps ZenohPublisher for consistent async interface. Zenoh is async-native, no callback adapter needed.
- Parameters:
name (str)
topic_builder (TopicBuilder)
zenoh_session (Any)
message_type (type)
- __init__(name, topic_builder, zenoh_session, message_type=None, **kwargs)[source]
- Parameters:
name (str)
topic_builder (TopicBuilder)
zenoh_session (Any)
message_type (type | None)
- async publish(message)[source]
Publish message via Zenoh.
- async cleanup()[source]
Cleanup Zenoh resources.
- class vyra_base.com.transport.t_zenoh.VyraSubscriberImpl(name, topic_builder, zenoh_session, subscriber_callback=None, message_type=None, **kwargs)[source]
Bases:
VyraSubscriberVyra-based subscriber implementation.
Wraps ZenohSubscriber for consistent async interface. Zenoh callbacks are async-native.
- Parameters:
- __init__(name, topic_builder, zenoh_session, subscriber_callback=None, message_type=None, **kwargs)[source]
- async subscribe()[source]
Start subscribing to Zenoh topic.
Note: Subscriber is already active after initialize(). This method exists for API compatibility.
- Return type:
- Returns:
True on success
- async cleanup()[source]
Cleanup Zenoh resources.
- class vyra_base.com.transport.t_zenoh.VyraServerImpl(name, topic_builder, zenoh_session, service_type=None, response_callback=None, **kwargs)[source]
Bases:
VyraServerVyra-based server implementation using Queryable.
TODO: Implement using ZenohQueryable from communication layer. Pattern: Queryable receives queries and responds with query.reply()
- Parameters:
- __init__(name, topic_builder, zenoh_session, service_type=None, response_callback=None, **kwargs)[source]
- async cleanup()[source]
Cleanup Zenoh resources.
- class vyra_base.com.transport.t_zenoh.VyraClientImpl(name, topic_builder, request_callback=None, zenoh_session=None, service_type=None, **kwargs)[source]
Bases:
VyraClientVyra-based client implementation using Query (get).
TODO: Implement using session.get() for queries. Pattern: Send query to key, receive replies from queryables
- Parameters:
- __init__(name, topic_builder, request_callback=None, zenoh_session=None, service_type=None, **kwargs)[source]
- async call(request, timeout=5.0)[source]
Send query to Zenoh server and await response.
- async cleanup()[source]
Cleanup resources (none for stateless client).
- class vyra_base.com.transport.t_zenoh.VyraActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, zenoh_session=None, action_type=None, **kwargs)[source]
Bases:
VyraActionServerVyra-based action server implementation.
Architecture: - Queryable at
{namespace}/{fn}/goalfor goal requests - Queryable at{namespace}/{fn}/cancelfor cancel requests - Publisher at{namespace}/{fn}/{goal_id}/feedbackfor feedback - Publisher at{namespace}/{fn}/{goal_id}/resultfor resultsChannel keys are built by
_action_channel()which stacks the protocol-level sub-channel on top of the config-levelnamespace/subsectionfollowing the VYRA topic convention.- Parameters:
- __init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, zenoh_session=None, action_type=None, **kwargs)[source]
- async publish_feedback(goal_id, feedback)[source]
Publish feedback for active goal.
- async cleanup()[source]
Cleanup Zenoh resources.
- class vyra_base.com.transport.t_zenoh.VyraActionClientImpl(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, zenoh_session=None, action_type=None, **kwargs)[source]
Bases:
VyraActionClientVyra-based action client implementation.
Architecture: - Query to
{namespace}/{fn}/goalfor sending goals - Subscriber to{namespace}/{fn}/{goal_id}/feedbackfor feedback - Subscriber to{namespace}/{fn}/{goal_id}/resultfor result - Query to{namespace}/{fn}/cancelfor cancelingChannel keys are built by
_action_channel()following the VYRA topic naming convention.- Parameters:
- __init__(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, zenoh_session=None, action_type=None, **kwargs)[source]
- Parameters:
- async send_goal(goal)[source]
Send goal to action server.
- Parameters:
goal (
Any) – Goal message instance- Return type:
- Returns:
goal_id on success, None on failure
TODO: 1. Serialize goal 2. Query to
self._key_goal(built via _action_channel) 3. Parse response for goal_id 4. Subscribe to feedback and result topics 5. Call goal_response_callback if set
- async cleanup()[source]
Cleanup Zenoh resources.
- class vyra_base.com.transport.t_zenoh.ZenohProvider(module_name, module_id, protocol=ProtocolType.ZENOH)[source]
Bases:
AbstractProtocolProviderProtocol provider for Zenoh transport.
Features: - Efficient Pub/Sub with zero-copy capabilities - Query/Reply for request-response patterns - Router-based scalability - Built-in discovery and fault tolerance - Multi-protocol support (TCP, UDP, shared memory)
Requirements: - eclipse-zenoh Python package - Zenoh router (typically Docker service)
Example
>>> # Initialize provider >>> provider = ZenohProvider(ProtocolType.ZENOH) >>> >>> if await provider.check_availability(): ... await provider.initialize(config={ ... "mode": "client", ... "connect": ["tcp/zenoh-router:7447"] ... }) ... ... # Create callable (query/reply) ... async def handle_request(req): ... return {"result": req["value"] * 2} ... ... callable = await provider.create_callable( ... "/calculate", ... handle_request ... ) ... ... # Create publisher (pub/sub) ... publisher = await provider.create_publisher("/sensor_data")
- Parameters:
module_name (str)
module_id (str)
protocol (ProtocolType)
- __init__(module_name, module_id, protocol=ProtocolType.ZENOH)[source]
Initialize Zenoh provider.
- Parameters:
protocol (
ProtocolType) – Protocol type (must be ZENOH)module_name (str)
module_id (str)
- async check_availability()[source]
Check if Zenoh is available.
- Returns:
True if zenoh-python is installed
- Return type:
- async initialize(config=None)[source]
Initialize Zenoh provider and open session.
- Parameters:
config (
Optional[Dict[str,Any]]) – Configuration dictionary: - mode: “peer”, “client”, or “router” (default: “client”) - connect: List of endpoints (default: [“tcp/zenoh-router:7447”]) - listen: List of listen endpoints - format: Serialization format (default: “json”)- Returns:
True if initialization successful
- Return type:
- async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]
Create Zenoh Publisher.
- Parameters:
name (
str) – Publisher nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance**kwargs – Additional publisher options
- Return type:
- Returns:
ZenohPublisherImpl instance
- async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]
Create Zenoh Subscriber.
- Parameters:
- Return type:
- Returns:
ZenohSubscriberImpl instance
- async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]
Create Zenoh Server (Queryable).
- Parameters:
- Return type:
- Returns:
ZenohServerImpl instance
- async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]
Create Zenoh Client (Query sender).
- Parameters:
name (
str) – Client nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)service_type (
Optional[type]) – Optional service type class (ignored in Zenoh – schema-less)request_callback (
Optional[Callable]) – Optional async callback for responses**kwargs – Additional client options
- Return type:
- Returns:
ZenohClientImpl instance
- async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]
Create Zenoh Action Server.
- Parameters:
name (
str) – Action server nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instancehandle_goal_request (
Optional[Callable]) – Async callback for goal requestshandle_cancel_request (
Optional[Callable]) – Async callback for cancel requestsexecution_callback (
Optional[Callable]) – Async callback for goal execution**kwargs – Additional action server options
- Return type:
- Returns:
ZenohActionServerImpl instance
- async create_action_client(name, topic_builder=None, action_type=None, direct_response=None, feedback_callback=None, goal_response_callback=None, direct_response_callback=None, goal_callback=None, **kwargs)[source]
Create Zenoh Action Client.
- Parameters:
name (
str) – Action client nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instancedirect_response (
Optional[Callable]) – Optional async callback for resultsfeedback_callback (
Optional[Callable]) – Optional async callback for feedbackgoal_response_callback (
Optional[Callable]) – Optional async callback for goal responses**kwargs – Additional action client options
direct_response_callback (Callable | None)
goal_callback (Callable | None)
- Return type:
- Returns:
ZenohActionClientImpl instance
- require_initialization()[source]
Ensure provider is initialized.
- Raises:
ProviderError – If not initialized
- Return type:
- get_session()[source]
Get the underlying Zenoh session.
- Return type:
Overview¶
Zenoh is a high-performance publish/subscribe and query/reply protocol designed for distributed systems. It requires no central broker, works through NAT, and supports zero-copy for high-throughput scenarios.
Property |
Value |
|---|---|
Module |
|
|
|
Default |
Yes — first in fallback chain |
Install |
|
Pattern |
Pub/Sub + Query/Reply (queryable = server, query_client = client) |
Key Classes¶
Class |
Description |
|---|---|
|
Protocol provider; registers Zenoh with |
|
Manages the Zenoh session lifecycle |
|
Configuration dataclass for the Zenoh session |
|
Enum: |
Usage¶
Zenoh is used automatically via InterfaceFactory when available.
To configure the session manually:
from vyra_base.com.transport.t_zenoh import ZenohProvider, ZenohSession, SessionConfig, SessionMode
config = SessionConfig(
mode=SessionMode.PEER,
connect=["tcp/localhost:7447"],
)
session = ZenohSession(config)
await session.open()
provider = ZenohProvider(session=session)
await provider.initialize()
For standard module use, Zenoh is initialized automatically at startup. See Quickstart — Building a VYRA Module for the full module initialization pattern.