UDS Transport Module¶
Provides Unix Domain Socket-based transport implementation with layered architecture.
Overview¶
The UDS transport module implements the AbstractProtocolProvider interface
for low-latency local inter-process communication using Unix Domain Sockets.
Architecture Layers:
communication/: Core UDS functionality (UnixSocket, socket management)
vyra_models/: VYRA abstractions (UDSCallable)
provider.py: Protocol provider implementation
Features:
✅ Stream-based local IPC
✅ Low-latency request-response
✅ Server/Client pattern with
is_callableflag✅ Automatic connection management
✅ JSON message serialization
✅ File-based socket addressing
✅ Async socket operations
Usage¶
Basic Provider Setup¶
from vyra_base.com.transport.t_uds import UDSProvider, UDS_AVAILABLE
from vyra_base.com.core.types import ProtocolType
import os
if UDS_AVAILABLE:
# Create provider
provider = UDSProvider(
module_name="my_module",
module_id="abc123",
protocol=ProtocolType.UDS
)
# Initialize with socket directory
socket_dir = "/tmp/vyra_sockets"
os.makedirs(socket_dir, exist_ok=True)
await provider.initialize(config={
"socket_dir": socket_dir
})
Create Request-Response Server¶
# Server callback
async def handle_calculation(request_data):
result = request_data["a"] + request_data["b"]
return {"sum": result}
# Create server (is_callable=True)
server = await provider.create_callable(
name="calculator",
callback=handle_calculation
)
# Socket created at: /tmp/vyra_sockets/my_module_calculator.sock
Create Request-Response Client¶
# Create client (is_callable=False)
client = await provider.create_callable(
name="calculator",
callback=None, # No callback for client
is_callable=False
)
# Make request
request = {"a": 10, "b": 32}
response = await client.call(request, timeout=2.0)
print(f"Result: {response['sum']}")
Configuration¶
Socket Configuration¶
config = {
"socket_dir": "/var/run/vyra",
"socket_permissions": 0o660, # rw-rw----
"buffer_size": 8192,
"timeout": 5.0,
"max_connections": 10
}
await provider.initialize(config=config)
Socket Naming¶
Sockets are named using the pattern: {module_name}_{service_name}.sock
# Example socket paths
provider = UDSProvider(module_name="sensor_manager")
# Server: /tmp/vyra_sockets/sensor_manager_get_data.sock
await provider.create_callable("get_data", callback=handler)
# Client connects to same path
client = await provider.create_callable(
"get_data",
callback=None,
is_callable=False
)
Server/Client Flags¶
Interface Type |
Flag |
Behavior |
|---|---|---|
Request-Response Server |
|
Creates socket server, responds to requests (callback required) |
Request-Response Client |
|
Connects to socket server, makes requests (no callback) |
Advanced Usage¶
Custom Socket Path¶
# Override default socket naming
server = await provider.create_callable(
name="custom",
callback=handler,
socket_path="/var/run/myapp/custom.sock"
)
Error Handling¶
from vyra_base.com.transport.t_uds.exceptions import (
UDSConnectionError,
UDSTimeoutError
)
try:
response = await client.call(request, timeout=1.0)
except UDSTimeoutError:
logger.error("Request timed out")
except UDSConnectionError as e:
logger.error(f"Connection failed: {e}")
Socket Cleanup¶
# Cleanup on shutdown
async def cleanup():
await provider.shutdown()
# Removes socket files
# Auto-cleanup with context manager
async with UDSProvider(...) as provider:
await provider.initialize()
# ... use provider ...
# Sockets cleaned up automatically
Performance Considerations¶
UDS provides the lowest latency for local IPC:
Transport |
Typical Latency |
Use Case |
|---|---|---|
UDS |
< 100 μs |
Local high-frequency IPC |
Redis (localhost) |
~ 200-500 μs |
Local with pub/sub features |
ROS2 (DDS) |
~ 1-5 ms |
Distributed systems, discovery |
Best Practices:
Use UDS for high-frequency local communication (e.g., control loops)
Use Redis for pub/sub patterns or remote access
Use ROS2 for distributed systems with discovery
API Reference¶
UDS Protocol Provider
Implements AbstractProtocolProvider for Unix Domain Socket transport. Provides low-latency local IPC via stream sockets.
- class vyra_base.com.transport.t_uds.provider.UDSProvider(module_name, module_id, protocol=ProtocolType.UDS)[Quellcode]¶
Bases:
AbstractProtocolProviderProtocol provider for Unix Domain Socket transport.
Features: - Low-latency local IPC - Stream-based communication - Automatic connection management - No serialization overhead (JSON)
Requirements: - Unix-like OS (Linux, macOS) - File system access to /tmp/vyra_sockets
Limitations: - Local machine only - No pub/sub pattern (use Callable for request-response)
Example
>>> # Initialize provider >>> provider = UDSProvider(ProtocolType.UDS) >>> >>> if await provider.check_availability(): ... await provider.initialize() ... ... # Create callable (server) ... async def handle_request(req): ... return {"result": req["value"] * 2} ... ... callable = await provider.create_callable( ... "calculate", ... handle_request, ... module_name="math_service" ... ) ... ... # Create client ... client = await provider.create_callable( ... "calculate", ... None, # No callback for client ... module_name="math_service" ... ) ... result = await client.call({"value": 21})
- Parameter:
module_name (str)
module_id (str)
protocol (ProtocolType)
- __init__(module_name, module_id, protocol=ProtocolType.UDS)[Quellcode]¶
Initialize UDS provider.
- Parameter:
protocol (
ProtocolType) – Protocol type (must be UDS)module_name (
str) – Default module name for interfacesmodule_id (str)
- async check_availability()[Quellcode]¶
Check if UDS transport is available.
- Rückgabe:
Always True on Unix-like systems
- Rückgabetyp:
- async initialize(config=None)[Quellcode]¶
Initialize UDS provider.
- async shutdown()[Quellcode]¶
Shutdown the provider and cleanup resources.
- Rückgabetyp:
- async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[Quellcode]¶
Create UDS Publisher (datagram sockets).
- Parameter:
name (
str) – Publisher nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance**kwargs – Additional publisher options (module_name override)
- Rückgabetyp:
- Rückgabe:
UdsPublisherImpl instance
- async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[Quellcode]¶
Create UDS Subscriber (datagram sockets).
- Parameter:
- Rückgabetyp:
- Rückgabe:
UdsSubscriberImpl instance
- async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[Quellcode]¶
Create UDS Server (stream sockets).
- Parameter:
- Rückgabetyp:
- Rückgabe:
UdsServerImpl instance
- async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[Quellcode]¶
Create UDS Client (stream sockets).
- Parameter:
- Rückgabetyp:
- Rückgabe:
UdsClientImpl instance
- async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[Quellcode]¶
Create UDS Action Server (stream sockets + state messages).
- Parameter:
name (
str) – Action server nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instancehandle_goal_request (
Optional[Callable[[Any],Awaitable[bool]]]) – Async callback for goal requestshandle_cancel_request (
Optional[Callable[[Any],Awaitable[bool]]]) – Async callback for cancel requestsexecution_callback (
Optional[Callable[[Any],Awaitable[bool]]]) – Async callback for goal execution**kwargs – Additional action server options (module_name override)
- Rückgabetyp:
- Rückgabe:
UdsActionServerImpl instance
- async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[Quellcode]¶
Create UDS Action Client.
- Parameter:
name (
str) – Action client nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instancedirect_response_callback (
Optional[Callable[[Any],Awaitable[None]]]) – Optional async callback for resultsfeedback_callback (
Optional[Callable[[Any],Awaitable[None]]]) – Optional async callback for feedbackgoal_callback (
Optional[Callable[[Any],Awaitable[None]]]) – Optional async callback for goal responses**kwargs – Additional action client options (module_name override)
- Rückgabetyp:
- Rückgabe:
UdsActionClientImpl instance
UDS VYRA Models Layer
VYRA abstractions for Unix Domain Socket communication.
Legacy patterns (deprecated): - UDSCallable: Request-response pattern via UDS - UDSSpeaker: Publish-subscribe pattern via UDS - UDSJob: Long-running task pattern via UDS
Unified transport layer: - Publisher/Subscriber (datagram sockets) - Server/Client (stream sockets) - ActionServer/ActionClient (stream sockets + state messages)
- Usage:
>>> from vyra_base.com.transport.t_uds.vyra_models import UdsServerImpl >>> >>> # Server side >>> async def handle_request(request): ... return {"result": request["value"] * 2} >>> >>> server = UdsServerImpl( ... name="calculate", ... topic_builder=builder, ... response_callback=handle_request, ... service_type=CalcService, ... module_name="math_service" ... ) >>> await server.initialize()
- class vyra_base.com.transport.t_uds.vyra_models.VyraPublisherImpl(name, topic_builder, message_type, module_name='vyra', **kwargs)[Quellcode]¶
Bases:
VyraPublisherUnix Domain Socket publisher implementation using datagram sockets.
Pattern: Sends messages to subscribers via datagram socket. Each subscriber listens on their own socket path. Publisher maintains list of subscriber paths (broadcast pattern).
Socket path: /tmp/vyra_sockets/pub_{module}_{topic}.sock
- Parameter:
name (str)
topic_builder (TopicBuilder)
message_type (type)
module_name (str)
- __init__(name, topic_builder, message_type, module_name='vyra', **kwargs)[Quellcode]¶
- Parameter:
name (str)
topic_builder (TopicBuilder)
message_type (type)
module_name (str)
- async initialize()[Quellcode]¶
Initialize UDS publisher.
- Rückgabetyp:
- async cleanup()[Quellcode]¶
Cleanup UDS publisher resources.
- Rückgabetyp:
- async shutdown()[Quellcode]¶
Shutdown UDS publisher.
- Rückgabetyp:
- async publish(message)[Quellcode]¶
Publish message via UDS datagram broadcast to all matching subscribers.
Discovers subscribers by scanning for .topic meta files in the socket directory. Sends the serialized message to each subscriber whose topic matches.
- class vyra_base.com.transport.t_uds.vyra_models.VyraSubscriberImpl(name, topic_builder, subscriber_callback, message_type, module_name='vyra', **kwargs)[Quellcode]¶
Bases:
VyraSubscriberUnix Domain Socket subscriber implementation using datagram sockets.
Pattern: Listens on own socket for messages from publishers. Socket path: /tmp/vyra_sockets/sub_{module}_{topic}_{instance_id}.sock
- Parameter:
- __init__(name, topic_builder, subscriber_callback, message_type, module_name='vyra', **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize UDS subscriber.
- Rückgabetyp:
- async subscribe()[Quellcode]¶
Start listening for messages.
- Rückgabetyp:
- Rückgabe:
True on success
- async cleanup()[Quellcode]¶
Cleanup UDS resources.
- property interface_type: InterfaceType¶
Get interface type.
- class vyra_base.com.transport.t_uds.vyra_models.VyraServerImpl(name, topic_builder, response_callback, service_type, module_name='vyra', **kwargs)[Quellcode]¶
Bases:
VyraServerUnix Domain Socket server implementation using stream sockets.
Pattern: Listens for connections, receives request, sends response. Socket path: /tmp/vyra_sockets/srv_{module}_{service}.sock
- Parameter:
- __init__(name, topic_builder, response_callback, service_type, module_name='vyra', **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize UDS server.
- Rückgabetyp:
- async serve()[Quellcode]¶
Start serving requests.
- Rückgabetyp:
- Rückgabe:
True on success
- async cleanup()[Quellcode]¶
Cleanup UDS resources.
- property interface_type: InterfaceType¶
Get interface type.
- class vyra_base.com.transport.t_uds.vyra_models.VyraClientImpl(name, topic_builder, request_callback, service_type=None, module_name='vyra', **kwargs)[Quellcode]¶
Bases:
VyraClientUnix Domain Socket client implementation using stream sockets.
Pattern: Connects to server socket, sends request, receives response. Server socket path: /tmp/vyra_sockets/srv_{module}_{service}.sock
- Parameter:
- __init__(name, topic_builder, request_callback, service_type=None, module_name='vyra', **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize UDS client.
- Rückgabetyp:
- async call(request, timeout=5.0)[Quellcode]¶
Send request and await response.
- async cleanup()[Quellcode]¶
Cleanup resources (stateless client, nothing to cleanup).
- property interface_type: InterfaceType¶
Get interface type.
- class vyra_base.com.transport.t_uds.vyra_models.VyraActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, action_type, module_name='vyra', **kwargs)[Quellcode]¶
Bases:
VyraActionServerUnix Domain Socket action server using stream sockets.
Pattern: - Main socket: {VYRA_SOCKET_DIR}/{action}.sock - Receives: goal_request | cancel_request | feedback_subscribe - Per Goal: Creates dedicated feedback socket at {module}_{action}_{goal_id}_feedback.sock
- Parameter:
- __init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, action_type, module_name='vyra', **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize UDS action server.
- Rückgabetyp:
- async serve()[Quellcode]¶
Start serving action requests.
- Rückgabetyp:
- async publish_feedback(goal_id, feedback)[Quellcode]¶
Publish feedback for active goal.
TODO: Broadcast to subscribers via dedicated feedback socket.
- async cleanup()[Quellcode]¶
Cleanup UDS resources.
- property interface_type: InterfaceType¶
Get interface type.
- class vyra_base.com.transport.t_uds.vyra_models.VyraActionClientImpl(name, topic_builder, direct_response, feedback_callback, goal_response_callback, action_type=None, module_name='vyra', **kwargs)[Quellcode]¶
Bases:
VyraActionClientUnix Domain Socket action client using stream sockets.
Pattern: - Connects to server at {VYRA_SOCKET_DIR}/{module}_{action}.sock - Sends goal_request, receives goal_id and feedback_socket_path - Optionally connects to feedback_socket for streaming updates
- Parameter:
- __init__(name, topic_builder, direct_response, feedback_callback, goal_response_callback, action_type=None, module_name='vyra', **kwargs)[Quellcode]¶
- async initialize()[Quellcode]¶
Initialize UDS action client.
- Rückgabetyp:
- async send_goal(goal, timeout=5.0)[Quellcode]¶
Send goal to action server.
- async cancel_goal(goal_id, timeout=5.0)[Quellcode]¶
Cancel active goal.
- async cleanup()[Quellcode]¶
Cleanup UDS resources.
- property interface_type: InterfaceType¶
Get interface type.
Dependencies¶
asyncio(Async I/O, built-in)socket(Unix sockets, built-in)json(Serialization, built-in)
See Also¶
Interface Factory - InterfaceFactory for protocol-agnostic usage
ROS2 Transport Module - ROS2 transport provider
Redis Transport Module - Redis transport provider