UDS Transport Module¶
Provides Unix Domain Socket-based transport implementation with layered architecture.
Overview¶
The UDS transport module implements the AbstractProtocolProvider interface
for low-latency local inter-process communication using Unix Domain Sockets.
Architecture Layers:
communication/: Core UDS functionality (UnixSocket, socket management)
vyra_models/: VYRA abstractions (UDSCallable)
provider.py: Protocol provider implementation
Features:
✅ Stream-based local IPC
✅ Low-latency request-response
✅ Server/Client pattern with
is_callableflag✅ Automatic connection management
✅ JSON message serialization
✅ File-based socket addressing
✅ Async socket operations
Usage¶
Basic Provider Setup¶
from vyra_base.com.transport.t_uds import UDSProvider, UDS_AVAILABLE
from vyra_base.com.core.types import ProtocolType
import os
if UDS_AVAILABLE:
# Create provider
provider = UDSProvider(
module_name="my_module",
module_id="abc123",
protocol=ProtocolType.UDS
)
# Initialize with socket directory
socket_dir = "/tmp/vyra_sockets"
os.makedirs(socket_dir, exist_ok=True)
await provider.initialize(config={
"socket_dir": socket_dir
})
Create Request-Response Server¶
# Server callback
async def handle_calculation(request_data):
result = request_data["a"] + request_data["b"]
return {"sum": result}
# Create server (is_callable=True)
server = await provider.create_callable(
name="calculator",
callback=handle_calculation
)
# Socket created at: /tmp/vyra_sockets/my_module_calculator.sock
Create Request-Response Client¶
# Create client (is_callable=False)
client = await provider.create_callable(
name="calculator",
callback=None, # No callback for client
is_callable=False
)
# Make request
request = {"a": 10, "b": 32}
response = await client.call(request, timeout=2.0)
print(f"Result: {response['sum']}")
Configuration¶
Socket Configuration¶
config = {
"socket_dir": "/var/run/vyra",
"socket_permissions": 0o660, # rw-rw----
"buffer_size": 8192,
"timeout": 5.0,
"max_connections": 10
}
await provider.initialize(config=config)
Socket Naming¶
Sockets are named using the pattern: {module_name}_{service_name}.sock
# Example socket paths
provider = UDSProvider(module_name="sensor_manager")
# Server: /tmp/vyra_sockets/sensor_manager_get_data.sock
await provider.create_callable("get_data", callback=handler)
# Client connects to same path
client = await provider.create_callable(
"get_data",
callback=None,
is_callable=False
)
Server/Client Flags¶
Interface Type |
Flag |
Behavior |
|---|---|---|
Request-Response Server |
|
Creates socket server, responds to requests (callback required) |
Request-Response Client |
|
Connects to socket server, makes requests (no callback) |
Advanced Usage¶
Custom Socket Path¶
# Override default socket naming
server = await provider.create_callable(
name="custom",
callback=handler,
socket_path="/var/run/myapp/custom.sock"
)
Error Handling¶
from vyra_base.com.transport.t_uds.exceptions import (
UDSConnectionError,
UDSTimeoutError
)
try:
response = await client.call(request, timeout=1.0)
except UDSTimeoutError:
logger.error("Request timed out")
except UDSConnectionError as e:
logger.error(f"Connection failed: {e}")
Socket Cleanup¶
# Cleanup on shutdown
async def cleanup():
await provider.shutdown()
# Removes socket files
# Auto-cleanup with context manager
async with UDSProvider(...) as provider:
await provider.initialize()
# ... use provider ...
# Sockets cleaned up automatically
Performance Considerations¶
UDS provides the lowest latency for local IPC:
Transport |
Typical Latency |
Use Case |
|---|---|---|
UDS |
< 100 μs |
Local high-frequency IPC |
Redis (localhost) |
~ 200-500 μs |
Local with pub/sub features |
ROS2 (DDS) |
~ 1-5 ms |
Distributed systems, discovery |
Best Practices:
Use UDS for high-frequency local communication (e.g., control loops)
Use Redis for pub/sub patterns or remote access
Use ROS2 for distributed systems with discovery
API Reference¶
UDS Protocol Provider
Implements AbstractProtocolProvider for Unix Domain Socket transport. Provides low-latency local IPC via stream sockets.
- class vyra_base.com.transport.t_uds.provider.UDSProvider(module_name, module_id, protocol=ProtocolType.UDS)[source]¶
Bases:
AbstractProtocolProviderProtocol provider for Unix Domain Socket transport.
Features: - Low-latency local IPC - Stream-based communication - Automatic connection management - No serialization overhead (JSON)
Requirements: - Unix-like OS (Linux, macOS) - File system access to /tmp/vyra_sockets
Limitations: - Local machine only - No pub/sub pattern (use Callable for request-response)
Example
>>> # Initialize provider >>> provider = UDSProvider(ProtocolType.UDS) >>> >>> if await provider.check_availability(): ... await provider.initialize() ... ... # Create callable (server) ... async def handle_request(req): ... return {"result": req["value"] * 2} ... ... callable = await provider.create_callable( ... "calculate", ... handle_request, ... module_name="math_service" ... ) ... ... # Create client ... client = await provider.create_callable( ... "calculate", ... None, # No callback for client ... module_name="math_service" ... ) ... result = await client.call({"value": 21})
- Parameters:
module_name (str)
module_id (str)
protocol (ProtocolType)
- __init__(module_name, module_id, protocol=ProtocolType.UDS)[source]¶
Initialize UDS provider.
- Parameters:
protocol (
ProtocolType) – Protocol type (must be UDS)module_name (
str) – Default module name for interfacesmodule_id (str)
- async check_availability()[source]¶
Check if UDS transport is available.
- Returns:
Always True on Unix-like systems
- Return type:
- async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]¶
Create UDS Publisher (datagram sockets).
- Parameters:
name (
str) – Publisher nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance**kwargs – Additional publisher options (module_name override)
- Return type:
- Returns:
UdsPublisherImpl instance
- async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]¶
Create UDS Subscriber (datagram sockets).
- Parameters:
- Return type:
- Returns:
UdsSubscriberImpl instance
- async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]¶
Create UDS Server (stream sockets).
- Parameters:
- Return type:
- Returns:
UdsServerImpl instance
- async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]¶
Create UDS Client (stream sockets).
- Parameters:
- Return type:
- Returns:
UdsClientImpl instance
- async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]¶
Create UDS Action Server (stream sockets + state messages).
- Parameters:
name (
str) – Action server nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instancehandle_goal_request (
Optional[Callable[[Any],Awaitable[bool]]]) – Async callback for goal requestshandle_cancel_request (
Optional[Callable[[Any],Awaitable[bool]]]) – Async callback for cancel requestsexecution_callback (
Optional[Callable[[Any],Awaitable[bool]]]) – Async callback for goal execution**kwargs – Additional action server options (module_name override)
- Return type:
- Returns:
UdsActionServerImpl instance
- async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[source]¶
Create UDS Action Client.
- Parameters:
name (
str) – Action client nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instancedirect_response_callback (
Optional[Callable[[Any],Awaitable[None]]]) – Optional async callback for resultsfeedback_callback (
Optional[Callable[[Any],Awaitable[None]]]) – Optional async callback for feedbackgoal_callback (
Optional[Callable[[Any],Awaitable[None]]]) – Optional async callback for goal responses**kwargs – Additional action client options (module_name override)
- Return type:
- Returns:
UdsActionClientImpl instance
UDS VYRA Models Layer
VYRA abstractions for Unix Domain Socket communication.
Legacy patterns (deprecated): - UDSCallable: Request-response pattern via UDS - UDSSpeaker: Publish-subscribe pattern via UDS - UDSJob: Long-running task pattern via UDS
Unified transport layer: - Publisher/Subscriber (datagram sockets) - Server/Client (stream sockets) - ActionServer/ActionClient (stream sockets + state messages)
- Usage:
>>> from vyra_base.com.transport.t_uds.vyra_models import UdsServerImpl >>> >>> # Server side >>> async def handle_request(request): ... return {"result": request["value"] * 2} >>> >>> server = UdsServerImpl( ... name="calculate", ... topic_builder=builder, ... response_callback=handle_request, ... service_type=CalcService, ... module_name="math_service" ... ) >>> await server.initialize()
- class vyra_base.com.transport.t_uds.vyra_models.VyraPublisherImpl(name, topic_builder, message_type, module_name='vyra', **kwargs)[source]¶
Bases:
VyraPublisherUnix Domain Socket publisher implementation using datagram sockets.
Pattern: Sends messages to subscribers via datagram socket. Each subscriber listens on their own socket path. Publisher maintains list of subscriber paths (broadcast pattern).
Socket path: /tmp/vyra_sockets/pub_{module}_{topic}.sock
- Parameters:
name (str)
topic_builder (TopicBuilder)
message_type (type)
module_name (str)
- class vyra_base.com.transport.t_uds.vyra_models.VyraSubscriberImpl(name, topic_builder, subscriber_callback, message_type, module_name='vyra', **kwargs)[source]¶
Bases:
VyraSubscriberUnix Domain Socket subscriber implementation using datagram sockets.
Pattern: Listens on own socket for messages from publishers. Socket path: /tmp/vyra_sockets/sub_{module}_{topic}_{instance_id}.sock
- Parameters:
- __init__(name, topic_builder, subscriber_callback, message_type, module_name='vyra', **kwargs)[source]¶
- property interface_type: InterfaceType¶
Get interface type.
- class vyra_base.com.transport.t_uds.vyra_models.VyraServerImpl(name, topic_builder, response_callback, service_type, module_name='vyra', **kwargs)[source]¶
Bases:
VyraServerUnix Domain Socket server implementation using stream sockets.
Pattern: Listens for connections, receives request, sends response. Socket path: /tmp/vyra_sockets/srv_{module}_{service}.sock
- Parameters:
- __init__(name, topic_builder, response_callback, service_type, module_name='vyra', **kwargs)[source]¶
- property interface_type: InterfaceType¶
Get interface type.
- class vyra_base.com.transport.t_uds.vyra_models.VyraClientImpl(name, topic_builder, request_callback, service_type=None, module_name='vyra', **kwargs)[source]¶
Bases:
VyraClientUnix Domain Socket client implementation using stream sockets.
Pattern: Connects to server socket, sends request, receives response. Server socket path: /tmp/vyra_sockets/srv_{module}_{service}.sock
- Parameters:
- __init__(name, topic_builder, request_callback, service_type=None, module_name='vyra', **kwargs)[source]¶
- property interface_type: InterfaceType¶
Get interface type.
- class vyra_base.com.transport.t_uds.vyra_models.VyraActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, action_type, module_name='vyra', **kwargs)[source]¶
Bases:
VyraActionServerUnix Domain Socket action server using stream sockets.
Pattern: - Main socket: {VYRA_SOCKET_DIR}/{action}.sock - Receives: goal_request | cancel_request | feedback_subscribe - Per Goal: Creates dedicated feedback socket at {module}_{action}_{goal_id}_feedback.sock
- Parameters:
- __init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, action_type, module_name='vyra', **kwargs)[source]¶
- async publish_feedback(goal_id, feedback)[source]¶
Publish feedback for active goal.
TODO: Broadcast to subscribers via dedicated feedback socket.
- property interface_type: InterfaceType¶
Get interface type.
- class vyra_base.com.transport.t_uds.vyra_models.VyraActionClientImpl(name, topic_builder, direct_response, feedback_callback, goal_response_callback, action_type=None, module_name='vyra', **kwargs)[source]¶
Bases:
VyraActionClientUnix Domain Socket action client using stream sockets.
Pattern: - Connects to server at {VYRA_SOCKET_DIR}/{module}_{action}.sock - Sends goal_request, receives goal_id and feedback_socket_path - Optionally connects to feedback_socket for streaming updates
- Parameters:
- __init__(name, topic_builder, direct_response, feedback_callback, goal_response_callback, action_type=None, module_name='vyra', **kwargs)[source]¶
- property interface_type: InterfaceType¶
Get interface type.
Dependencies¶
asyncio(Async I/O, built-in)socket(Unix sockets, built-in)json(Serialization, built-in)
See Also¶
Interface Factory - InterfaceFactory for protocol-agnostic usage
ROS2 Transport Module - ROS2 transport provider
Redis Transport Module - Redis transport provider