"""
Zenoh Protocol Provider
Implements AbstractProtocolProvider for Zenoh transport.
Provides efficient, scalable communication via Zenoh router.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Callable, Optional, Dict
from vyra_base.com.core.topic_builder import TopicBuilder
from vyra_base.com.core.types import (
ProtocolType,
VyraPublisher,
VyraSubscriber,
VyraServer,
VyraClient,
VyraActionServer,
VyraActionClient
)
from vyra_base.com.core.exceptions import (
ProtocolUnavailableError,
ProviderError,
)
from vyra_base.com.transport.t_zenoh.communication.session import ZenohSession, SessionConfig, SessionMode
from vyra_base.com.transport.t_zenoh.vyra_models import (
VyraPublisherImpl,
VyraSubscriberImpl,
VyraServerImpl,
VyraClientImpl,
VyraActionServerImpl,
VyraActionClientImpl
)
from vyra_base.com.transport.t_zenoh.communication.serializer import SerializationFormat
from vyra_base.com.providers.protocol_provider import AbstractProtocolProvider
from vyra_base.com.core.callback_registry import CallbackRegistry
logger = logging.getLogger(__name__)
# Check if zenoh is available
try:
import zenoh
ZENOH_AVAILABLE = True
except ImportError:
ZENOH_AVAILABLE = False
logger.warning(
"⚠️ zenoh-python not available. Zenoh transport disabled. "
"Install via: pip install eclipse-zenoh"
)
[docs]
class ZenohProvider(AbstractProtocolProvider):
"""
Protocol provider for Zenoh transport.
Features:
- Efficient Pub/Sub with zero-copy capabilities
- Query/Reply for request-response patterns
- Router-based scalability
- Built-in discovery and fault tolerance
- Multi-protocol support (TCP, UDP, shared memory)
Requirements:
- eclipse-zenoh Python package
- Zenoh router (typically Docker service)
Example:
>>> # Initialize provider
>>> provider = ZenohProvider(ProtocolType.ZENOH)
>>>
>>> if await provider.check_availability():
... await provider.initialize(config={
... "mode": "client",
... "connect": ["tcp/zenoh-router:7447"]
... })
...
... # Create callable (query/reply)
... async def handle_request(req):
... return {"result": req["value"] * 2}
...
... callable = await provider.create_callable(
... "/calculate",
... handle_request
... )
...
... # Create publisher (pub/sub)
... publisher = await provider.create_publisher("/sensor_data")
"""
[docs]
def __init__(
self,
module_name: str,
module_id: str,
protocol: ProtocolType = ProtocolType.ZENOH
):
"""
Initialize Zenoh provider.
Args:
protocol: Protocol type (must be ZENOH)
"""
super().__init__(protocol)
self._session: Optional[ZenohSession] = None
self._format = SerializationFormat.JSON
# Topic builder for consistent naming
self._topic_builder = TopicBuilder(module_name, module_id)
[docs]
async def check_availability(self) -> bool:
"""
Check if Zenoh is available.
Returns:
bool: True if zenoh-python is installed
"""
self._available = ZENOH_AVAILABLE
if not self._available:
logger.warning("Zenoh not available")
else:
logger.debug("✅ Zenoh is available")
return self._available
[docs]
async def initialize(self, config: Optional[Dict[str, Any]] = None) -> bool:
"""
Initialize Zenoh provider and open session.
Args:
config: Configuration dictionary:
- mode: "peer", "client", or "router" (default: "client")
- connect: List of endpoints (default: ["tcp/zenoh-router:7447"])
- listen: List of listen endpoints
- format: Serialization format (default: "json")
Returns:
bool: True if initialization successful
"""
if self._initialized:
logger.warning("Zenoh provider already initialized")
return True
if not self._available:
raise ProtocolUnavailableError("Zenoh is not available")
try:
logger.info("🔧 Initializing Zenoh provider...")
# Parse configuration
config = config or {}
self._config.update(config)
mode_str = config.get("mode", "client")
mode = SessionMode(mode_str)
connect = config.get("connect", ["tcp/zenoh-router:7447"])
listen = config.get("listen", [])
format_str = config.get("format", "json")
self._format = SerializationFormat(format_str)
# Create session config
session_config = SessionConfig(
mode=mode,
connect=connect,
listen=listen
)
# Create and open session
self._session = ZenohSession(session_config)
await self._session.open()
self._initialized = True
logger.info("✅ Zenoh provider initialized")
try:
session_info = self._session.session.info()
# zenoh 1.x: zid is a property, not callable
zid = session_info.zid() if callable(session_info.zid) else session_info.zid
logger.info(f"Session ID: {zid}")
except Exception:
logger.info("✅ Zenoh provider initialized (session ID unavailable)")
return True
except Exception as e:
logger.error(f"❌ Failed to initialize Zenoh provider: {e}")
raise ProviderError(f"Zenoh initialization failed: {e}")
[docs]
async def shutdown(self) -> None:
"""Shutdown Zenoh provider and close session."""
if not self._initialized:
logger.debug("Zenoh provider not initialized, nothing to shutdown")
return
try:
logger.info("🛑 Shutting down Zenoh provider...")
if self._session:
await self._session.close()
self._session = None
self._initialized = False
logger.info("✅ Zenoh provider shutdown complete")
except Exception as e:
logger.error(f"❌ Error shutting down Zenoh provider: {e}")
raise
# ============================================================================
# UNIFIED TRANSPORT LAYER METHODS
# ============================================================================
[docs]
async def create_publisher(
self,
name: str,
topic_builder: Optional[TopicBuilder] = None,
message_type: Optional[type] = None,
**kwargs
) -> VyraPublisher:
"""
Create Zenoh Publisher.
Args:
name: Publisher name
topic_builder: TopicBuilder instance
message_type: Message type class
**kwargs: Additional publisher options
Returns:
ZenohPublisherImpl instance
"""
self.require_initialization()
try:
if not self._session or not self._session.is_open:
raise ProviderError("Zenoh session not open")
effective_topic_builder = topic_builder or self._topic_builder
if message_type is None:
message_type = effective_topic_builder.load_interface_type(
name, self.protocol
)
if message_type is None:
raise ProviderError(f"Message type for publisher '{name}' not found in topic builder")
publisher = VyraPublisherImpl(
name=name,
topic_builder=effective_topic_builder,
zenoh_session=self._session.session,
message_type=message_type,
**kwargs
)
await publisher.initialize()
logger.info(f"✅ Zenoh Publisher created: {name}")
return publisher
except Exception as e:
logger.error(f"❌ Failed to create Zenoh Publisher '{name}': {e}")
raise ProviderError(f"Failed to create publisher: {e}")
[docs]
async def create_subscriber(
self,
name: str,
topic_builder: Optional[TopicBuilder] = None,
subscriber_callback: Optional[Callable] = None,
message_type: Optional[type] = None,
**kwargs
) -> VyraSubscriber:
"""
Create Zenoh Subscriber.
Args:
name: Subscriber name
topic_builder: TopicBuilder instance
subscriber_callback: Async callback for received messages
message_type: Message type class
**kwargs: Additional subscriber options
Returns:
ZenohSubscriberImpl instance
"""
self.require_initialization()
try:
if not self._session or not self._session.is_open:
raise ProviderError("Zenoh session not open")
effective_topic_builder = topic_builder or self._topic_builder
if message_type is None:
message_type = effective_topic_builder.load_interface_type(
name, self.protocol
)
if message_type is None:
raise ProviderError(f"Message type for subscriber '{name}' not found in topic builder")
if subscriber_callback is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp and _bp.is_bound():
subscriber_callback = _bp.callback
else:
raise ProviderError(
f"No subscriber_callback provided for '{name}' and no bound blueprint found in CallbackRegistry"
)
subscriber = VyraSubscriberImpl(
name=name,
topic_builder=effective_topic_builder,
subscriber_callback=subscriber_callback,
zenoh_session=self._session.session,
message_type=message_type,
**kwargs
)
await subscriber.initialize()
await subscriber.subscribe()
logger.info(f"✅ Zenoh Subscriber created: {name}")
return subscriber
except Exception as e:
logger.error(f"❌ Failed to create Zenoh Subscriber '{name}': {e}")
raise ProviderError(f"Failed to create subscriber: {e}")
[docs]
async def create_server(
self,
name: str,
topic_builder: Optional[TopicBuilder] = None,
response_callback: Optional[Callable] = None,
service_type: Optional[type] = None,
**kwargs
) -> VyraServer:
"""
Create Zenoh Server (Queryable).
Args:
name: Server name
topic_builder: TopicBuilder instance
response_callback: Async callback for handling requests
service_type: Service type class
**kwargs: Additional server options
Returns:
ZenohServerImpl instance
"""
self.require_initialization()
try:
if not self._session or not self._session.is_open:
raise ProviderError("Zenoh session not open")
effective_topic_builder = topic_builder or self._topic_builder
if service_type is None:
service_type = effective_topic_builder.load_interface_type(
name, self.protocol
)
if service_type is None:
raise ProviderError(f"Service type for server '{name}' not found in topic builder")
if response_callback is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp and _bp.is_bound():
response_callback = _bp.callback
else:
raise ProviderError(
f"No response_callback provided for server '{name}' and no bound blueprint found in CallbackRegistry"
)
server = VyraServerImpl(
name=name,
topic_builder=effective_topic_builder,
response_callback=response_callback,
zenoh_session=self._session.session,
service_type=service_type,
**kwargs
)
await server.initialize()
await server.serve()
logger.info(f"✅ Zenoh Server created: {name}")
return server
except Exception as e:
logger.error(f"❌ Failed to create Zenoh Server '{name}': {e}")
raise ProviderError(f"Failed to create server: {e}")
[docs]
async def create_client(
self,
name: str,
topic_builder: Optional[TopicBuilder] = None,
service_type: Optional[type] = None,
request_callback: Optional[Callable] = None,
**kwargs
) -> VyraClient:
"""
Create Zenoh Client (Query sender).
Args:
name: Client name
topic_builder: TopicBuilder instance (uses provider's default if omitted)
service_type: Optional service type class (ignored in Zenoh – schema-less)
request_callback: Optional async callback for responses
**kwargs: Additional client options
Returns:
ZenohClientImpl instance
"""
self.require_initialization()
try:
if not self._session or not self._session.is_open:
raise ProviderError("Zenoh session not open")
# Use provider's topic_builder if none provided
effective_topic_builder = topic_builder or self._topic_builder
if service_type is None:
service_type = effective_topic_builder.load_interface_type(
name, self.protocol
)
client = VyraClientImpl(
name=name,
topic_builder=effective_topic_builder,
request_callback=request_callback,
zenoh_session=self._session.session,
service_type=service_type,
**kwargs
)
await client.initialize()
logger.info(f"✅ Zenoh Client created: {name}")
return client
except Exception as e:
logger.error(f"❌ Failed to create Zenoh Client '{name}': {e}")
raise ProviderError(f"Failed to create client: {e}")
[docs]
async def create_action_server(
self,
name: str,
topic_builder: Optional[TopicBuilder] = None,
handle_goal_request: Optional[Callable] = None,
handle_cancel_request: Optional[Callable] = None,
execution_callback: Optional[Callable] = None,
action_type: Optional[type] = None,
**kwargs
) -> VyraActionServer:
"""
Create Zenoh Action Server.
Args:
name: Action server name
topic_builder: TopicBuilder instance
handle_goal_request: Async callback for goal requests
handle_cancel_request: Async callback for cancel requests
execution_callback: Async callback for goal execution
action_type: Action type class
**kwargs: Additional action server options
Returns:
ZenohActionServerImpl instance
"""
self.require_initialization()
try:
if not self._session or not self._session.is_open:
raise ProviderError("Zenoh session not open")
effective_topic_builder = topic_builder or self._topic_builder
if action_type is None:
action_type = effective_topic_builder.load_interface_type(
name, self.protocol
)
if action_type is None:
raise ProviderError(f"Action type for action server '{name}' not found in topic builder")
if execution_callback is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp and _bp.is_bound():
execution_callback = _bp.callback
else:
raise ProviderError(
f"No execution_callback provided for action server '{name}' and no bound blueprint found in CallbackRegistry"
)
if handle_goal_request is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp is not None:
handle_goal_request = getattr(_bp, 'get_callback', lambda x: None)('on_goal')
if handle_cancel_request is None:
_bp = CallbackRegistry.get_blueprint(name)
if _bp is not None:
handle_cancel_request = getattr(_bp, 'get_callback', lambda x: None)('on_cancel')
action_server = VyraActionServerImpl(
name=name,
topic_builder=effective_topic_builder,
handle_goal_request=handle_goal_request,
handle_cancel_request=handle_cancel_request,
execution_callback=execution_callback,
zenoh_session=self._session.session,
action_type=action_type,
**kwargs
)
await action_server.initialize()
logger.info(f"✅ Zenoh Action Server created: {name}")
return action_server
except Exception as e:
logger.error(f"❌ Failed to create Zenoh Action Server '{name}': {e}")
raise ProviderError(f"Failed to create action server: {e}")
[docs]
async def create_action_client(
self,
name: str,
topic_builder: Optional[TopicBuilder] = None,
action_type: Optional[type] = None,
direct_response: Optional[Callable] = None,
feedback_callback: Optional[Callable] = None,
goal_response_callback: Optional[Callable] = None,
# Factory-style aliases (InterfaceFactory.create_action_client uses these names)
direct_response_callback: Optional[Callable] = None,
goal_callback: Optional[Callable] = None,
**kwargs
) -> VyraActionClient:
"""
Create Zenoh Action Client.
Args:
name: Action client name
topic_builder: TopicBuilder instance
action_type: Action type class
direct_response: Optional async callback for results
feedback_callback: Optional async callback for feedback
goal_response_callback: Optional async callback for goal responses
**kwargs: Additional action client options
Returns:
ZenohActionClientImpl instance
"""
# Resolve factory-style aliases
direct_response = direct_response or direct_response_callback
goal_response_callback = goal_response_callback or goal_callback
self.require_initialization()
try:
if not self._session or not self._session.is_open:
raise ProviderError("Zenoh session not open")
effective_topic_builder = topic_builder or self._topic_builder
if action_type is None:
action_type = effective_topic_builder.load_interface_type(
name, self.protocol
)
action_client = VyraActionClientImpl(
name=name,
topic_builder=effective_topic_builder,
direct_response=direct_response,
feedback_callback=feedback_callback,
goal_response_callback=goal_response_callback,
zenoh_session=self._session.session,
action_type=action_type,
**kwargs
)
await action_client.initialize()
logger.info(f"✅ Zenoh Action Client created: {name}")
return action_client
except Exception as e:
logger.error(f"❌ Failed to create Zenoh Action Client '{name}': {e}")
raise ProviderError(f"Failed to create action client: {e}")
[docs]
def require_initialization(self) -> None:
"""
Ensure provider is initialized.
Raises:
ProviderError: If not initialized
"""
if not self._initialized:
raise ProviderError("Zenoh provider not initialized. Call initialize() first.")
if not self._session or not self._session.is_open:
raise ProviderError("Zenoh session not open")
[docs]
def get_session(self) -> Optional[ZenohSession]:
"""Get the underlying Zenoh session."""
return self._session