"""
InterfaceBuilder — Static helper for creating transport interfaces.
Centralises protocol-dispatch logic that was previously spread throughout
``VyraEntity.set_interfaces``. Each ``create_*`` classmethod accepts a
:class:`~vyra_base.defaults.entries.FunctionConfigEntry` and creates the
appropriate transport handles for **all** protocols the entry's *tags*
request (or for all available protocols when *tags* is empty).
Tag semantics
-------------
- **Empty tags** → register on every available / applicable protocol
(backward-compatible default).
- **Non-empty tags** → register *only* on the explicitly listed protocols.
Example::
# tags=["ros2"] → ROS2 only
# tags=["zenoh"] → Zenoh only
# tags=["ros2","zenoh"] → both
# tags=[] → all applicable (backward compat)
"""
from __future__ import annotations
import logging
from typing import Any
from vyra_base.com import InterfaceFactory, ProtocolType
from vyra_base.com.providers.provider_registry import ProviderRegistry
from vyra_base.defaults.entries import (
FunctionConfigEntry,
FunctionConfigTags,
)
logger = logging.getLogger(__name__)
[docs]
class InterfaceBuilder:
"""
Static factory that creates transport interfaces for a
:class:`~vyra_base.defaults.entries.FunctionConfigEntry`.
All methods are classmethods — no instance state is held.
"""
# ── Tag helpers ───────────────────────────────────────────────────────────
@classmethod
def _wants(cls, tags: list[FunctionConfigTags], protocol: FunctionConfigTags) -> bool:
"""Return True if *tags* is empty (default to all) or contains *protocol*."""
return not tags or protocol in tags
[docs]
@classmethod
def wants_ros2(cls, tags: list[FunctionConfigTags]) -> bool:
return cls._wants(tags, FunctionConfigTags.ROS2)
[docs]
@classmethod
def wants_zenoh(cls, tags: list[FunctionConfigTags]) -> bool:
return cls._wants(tags, FunctionConfigTags.ZENOH)
[docs]
@classmethod
def wants_redis(cls, tags: list[FunctionConfigTags]) -> bool:
return cls._wants(tags, FunctionConfigTags.REDIS)
[docs]
@classmethod
def wants_uds(cls, tags: list[FunctionConfigTags]) -> bool:
return cls._wants(tags, FunctionConfigTags.UDS)
[docs]
@classmethod
def has_ros2_type(cls, interfacetypes: Any) -> bool:
"""Return True if *interfacetypes* contains at least one actual Python class (ROS2 type).
When a proto-only interface is configured, all entries in the list are strings.
Only if at least one entry is a resolved class should we attempt ROS2 creation.
"""
if interfacetypes is None:
return False
if isinstance(interfacetypes, list):
return any(not isinstance(t, str) for t in interfacetypes)
return not isinstance(interfacetypes, str)
# ── Service (request / response) ─────────────────────────────────────────
[docs]
@classmethod
async def create_service(
cls,
setting: FunctionConfigEntry,
callbacks: dict[str, Any] | None,
node: Any,
ros2_available: bool,
registry: ProviderRegistry | None = None,
) -> None:
"""
Create service servers for all protocols the entry's tags request.
:param setting: Interface configuration entry.
:param callbacks: Dict with key ``'response'`` holding the callback.
:param node: ROS2 node (may be ``None`` when ROS2 is unavailable).
:param ros2_available: Whether ROS2 is up and a node is present.
:param registry: Optional pre-created :class:`ProviderRegistry` instance.
A new one is created when *None* (avoids repeated look-ups
when called for many interfaces in one loop).
"""
if registry is None:
registry = ProviderRegistry()
name = setting.functionname
response_cb = callbacks.get("response") if callbacks else None
tags = setting.tags
# ROS2
if ros2_available and cls.wants_ros2(tags) and cls.has_ros2_type(setting.interfacetypes):
try:
await InterfaceFactory.create_server(
name=name,
response_callback=response_cb,
protocols=[ProtocolType.ROS2],
service_type=setting.interfacetypes,
node=node,
)
logger.info(f"✅ ROS2 service created: {name}")
except Exception as exc:
logger.warning(f"⚠️ ROS2 service '{name}' failed: {exc}")
# Zenoh
if registry.is_available(ProtocolType.ZENOH) and cls.wants_zenoh(tags):
try:
result = await InterfaceFactory.create_server(
name=name,
response_callback=response_cb,
protocols=[ProtocolType.ZENOH],
service_type=setting.interfacetypes,
)
if result is not None:
logger.info(f"✅ Zenoh service created: {name}")
else:
logger.debug(
f"⏳ Zenoh service '{name}' pending "
f"— no callback bound yet (bind callback first)"
)
except Exception as exc:
logger.warning(f"⚠️ Zenoh service '{name}' failed: {exc}")
# Redis
if registry.is_available(ProtocolType.REDIS) and cls.wants_redis(tags):
try:
result = await InterfaceFactory.create_server(
name=name,
response_callback=response_cb,
protocols=[ProtocolType.REDIS],
service_type=setting.interfacetypes,
)
if result is not None:
logger.info(f"✅ Redis service created: {name}")
else:
logger.debug(
f"⏳ Redis service '{name}' pending "
f"— no callback bound yet (bind callback first)"
)
except Exception as exc:
logger.warning(f"⚠️ Redis service '{name}' failed: {exc}")
# UDS
if registry.is_available(ProtocolType.UDS) and cls.wants_uds(tags):
try:
result = await InterfaceFactory.create_server(
name=name,
response_callback=response_cb,
protocols=[ProtocolType.UDS],
service_type=setting.interfacetypes,
)
if result is not None:
logger.info(f"✅ UDS service created: {name}")
else:
logger.debug(
f"⏳ UDS service '{name}' pending "
f"— no callback bound yet (bind callback first)"
)
except Exception as exc:
logger.warning(f"⚠️ UDS service '{name}' failed: {exc}")
# ── Action server ─────────────────────────────────────────────────────────
[docs]
@classmethod
async def create_action(
cls,
setting: FunctionConfigEntry,
callbacks: dict[str, Any] | None,
node: Any,
ros2_available: bool,
registry: ProviderRegistry | None = None,
) -> None:
"""
Create action servers for all protocols the entry's tags request.
Tries each tagged protocol in order (ROS2, Zenoh, Redis, UDS).
A failure on one protocol is logged as a warning and the next is attempted.
:raises ValueError: When callbacks are missing (required for action servers).
"""
if registry is None:
registry = ProviderRegistry()
name = setting.functionname
if not callbacks:
msg = (
f"❌ ActionServer '{name}' has no callbacks defined. "
"Use @remote_actionServer.on_goal/on_cancel/execute decorators."
)
logger.error(msg)
raise ValueError(msg)
tags = setting.tags
on_goal = callbacks.get("on_goal")
on_cancel = callbacks.get("on_cancel")
execute = callbacks.get("execute")
logger.debug(
f"ActionServer '{name}' callbacks — "
f"on_goal={bool(on_goal)}, "
f"on_cancel={bool(on_cancel)}, "
f"execute={bool(execute)}"
)
# ROS2
if ros2_available and cls.wants_ros2(tags) and cls.has_ros2_type(setting.interfacetypes):
try:
await InterfaceFactory.create_action_server(
name=name,
handle_goal_request=on_goal,
handle_cancel_request=on_cancel,
execution_callback=execute,
protocols=[ProtocolType.ROS2],
action_type=setting.interfacetypes,
node=node,
namespace=setting.namespace,
)
logger.info(f"✅ ROS2 action server created: {name}")
except Exception as exc:
logger.warning(f"⚠️ ROS2 action server '{name}' failed: {exc}")
# Zenoh
if registry.is_available(ProtocolType.ZENOH) and cls.wants_zenoh(tags):
try:
result = await InterfaceFactory.create_action_server(
name=name,
handle_goal_request=on_goal,
handle_cancel_request=on_cancel,
execution_callback=execute,
protocols=[ProtocolType.ZENOH],
action_type=setting.interfacetypes,
namespace=setting.namespace,
)
if result is not None:
logger.info(f"✅ Zenoh action server created: {name}")
else:
logger.debug(
f"⏳ Zenoh action server '{name}' pending "
f"— no callback bound yet"
)
except Exception as exc:
logger.warning(f"⚠️ Zenoh action server '{name}' failed: {exc}")
# Redis
if registry.is_available(ProtocolType.REDIS) and cls.wants_redis(tags):
try:
result = await InterfaceFactory.create_action_server(
name=name,
handle_goal_request=on_goal,
handle_cancel_request=on_cancel,
execution_callback=execute,
protocols=[ProtocolType.REDIS],
action_type=setting.interfacetypes,
namespace=setting.namespace,
)
if result is not None:
logger.info(f"✅ Redis action server created: {name}")
else:
logger.debug(
f"⏳ Redis action server '{name}' pending "
f"— no callback bound yet"
)
except Exception as exc:
logger.warning(f"⚠️ Redis action server '{name}' failed: {exc}")
# UDS
if registry.is_available(ProtocolType.UDS) and cls.wants_uds(tags):
try:
result = await InterfaceFactory.create_action_server(
name=name,
handle_goal_request=on_goal,
handle_cancel_request=on_cancel,
execution_callback=execute,
protocols=[ProtocolType.UDS],
action_type=setting.interfacetypes,
namespace=setting.namespace,
)
if result is not None:
logger.info(f"✅ UDS action server created: {name}")
else:
logger.debug(
f"⏳ UDS action server '{name}' pending "
f"— no callback bound yet"
)
except Exception as exc:
logger.warning(f"⚠️ UDS action server '{name}' failed: {exc}")
# ── Publisher (one-way message) ───────────────────────────────────────────
[docs]
@classmethod
async def create_publisher(
cls,
setting: FunctionConfigEntry,
node: Any,
ros2_available: bool,
) -> None:
"""
Create publishers for all protocols the entry's tags request.
"""
name = setting.functionname
tags = setting.tags
# Periodic config
periodic = setting.periodic is not None
if setting.periodic is None:
periodic_caller = None
else:
periodic_caller = setting.periodic.caller
if setting.periodic is None:
periodic_interval = None
else:
periodic_interval = setting.periodic.interval
# ROS2
if ros2_available and cls.wants_ros2(tags) and cls.has_ros2_type(setting.interfacetypes):
try:
await InterfaceFactory.create_publisher(
name=name,
protocols=[ProtocolType.ROS2],
message_type=setting.interfacetypes,
node=node,
is_publisher=True,
qos_profile=setting.qosprofile,
description=setting.description,
periodic=periodic,
interval_time=periodic_interval,
periodic_caller=periodic_caller,
)
logger.info(f"✅ ROS2 publisher created: {name}")
except Exception as exc:
logger.warning(
f"⚠️ ROS2 publisher '{name}' could not be created (non-fatal): {exc}"
)
elif ros2_available and cls.wants_ros2(tags):
logger.debug(f"⏭️ Skipping ROS2 publisher '{name}': no resolved ROS2 type in interfacetypes")
else:
logger.debug(f"⏭️ Skipping ROS2 publisher '{name}': tags={tags}")
# ── Late-binding upgrade (callback added after initial registration) ───────
[docs]
@classmethod
async def upgrade_service(
cls,
setting: FunctionConfigEntry,
new_callbacks: dict[str, Any],
registry: ProviderRegistry | None = None,
) -> None:
"""
Upgrade a previously registered service that had no callback with a new one.
Called when a decorated callback is bound after ``set_interfaces`` ran
(two-phase / blueprint initialisation pattern).
"""
if registry is None:
registry = ProviderRegistry()
name = setting.functionname
response_cb = new_callbacks.get("response")
if registry.is_available(ProtocolType.ZENOH):
try:
result = await InterfaceFactory.create_server(
name=name,
response_callback=response_cb,
protocols=[ProtocolType.ZENOH],
service_type=setting.interfacetypes,
)
if result is not None:
logger.info(f"✅ Zenoh server upgraded: {name}")
else:
logger.warning(f"⚠️ Zenoh upgrade returned None for '{name}'")
except Exception as exc:
logger.warning(f"⚠️ Failed to upgrade Zenoh server '{name}': {exc}")