UDS Transport Module

Provides Unix Domain Socket-based transport implementation with layered architecture.

Overview

The UDS transport module implements the AbstractProtocolProvider interface for low-latency local inter-process communication using Unix Domain Sockets.

Architecture Layers:

  • communication/: Core UDS functionality (UnixSocket, socket management)

  • vyra_models/: VYRA abstractions (UDSCallable)

  • provider.py: Protocol provider implementation

Features:

  • ✅ Stream-based local IPC

  • ✅ Low-latency request-response

  • ✅ Server/Client pattern with is_callable flag

  • ✅ Automatic connection management

  • ✅ JSON message serialization

  • ✅ File-based socket addressing

  • ✅ Async socket operations

Usage

Basic Provider Setup

from vyra_base.com.transport.t_uds import UDSProvider, UDS_AVAILABLE
from vyra_base.com.core.types import ProtocolType
import os

if UDS_AVAILABLE:
    # Create provider
    provider = UDSProvider(
        module_name="my_module",
        module_id="abc123",
        protocol=ProtocolType.UDS
    )

    # Initialize with socket directory
    socket_dir = "/tmp/vyra_sockets"
    os.makedirs(socket_dir, exist_ok=True)

    await provider.initialize(config={
        "socket_dir": socket_dir
    })

Create Request-Response Server

# Server callback
async def handle_calculation(request_data):
    result = request_data["a"] + request_data["b"]
    return {"sum": result}

# Create server (is_callable=True)
server = await provider.create_callable(
    name="calculator",
    callback=handle_calculation
)
# Socket created at: /tmp/vyra_sockets/my_module_calculator.sock

Create Request-Response Client

# Create client (is_callable=False)
client = await provider.create_callable(
    name="calculator",
    callback=None,  # No callback for client
    is_callable=False
)

# Make request
request = {"a": 10, "b": 32}
response = await client.call(request, timeout=2.0)
print(f"Result: {response['sum']}")

Configuration

Socket Configuration

config = {
    "socket_dir": "/var/run/vyra",
    "socket_permissions": 0o660,  # rw-rw----
    "buffer_size": 8192,
    "timeout": 5.0,
    "max_connections": 10
}

await provider.initialize(config=config)

Socket Naming

Sockets are named using the pattern: {module_name}_{service_name}.sock

# Example socket paths
provider = UDSProvider(module_name="sensor_manager")

# Server: /tmp/vyra_sockets/sensor_manager_get_data.sock
await provider.create_callable("get_data", callback=handler)

# Client connects to same path
client = await provider.create_callable(
    "get_data",
    callback=None,
    is_callable=False
)

Server/Client Flags

Interface Type

Flag

Behavior

Request-Response Server

is_callable=True

Creates socket server, responds to requests (callback required)

Request-Response Client

is_callable=False

Connects to socket server, makes requests (no callback)

Advanced Usage

Custom Socket Path

# Override default socket naming
server = await provider.create_callable(
    name="custom",
    callback=handler,
    socket_path="/var/run/myapp/custom.sock"
)

Error Handling

from vyra_base.com.transport.t_uds.exceptions import (
    UDSConnectionError,
    UDSTimeoutError
)

try:
    response = await client.call(request, timeout=1.0)
except UDSTimeoutError:
    logger.error("Request timed out")
except UDSConnectionError as e:
    logger.error(f"Connection failed: {e}")

Socket Cleanup

# Cleanup on shutdown
async def cleanup():
    await provider.shutdown()
    # Removes socket files

# Auto-cleanup with context manager
async with UDSProvider(...) as provider:
    await provider.initialize()
    # ... use provider ...
# Sockets cleaned up automatically

Performance Considerations

UDS provides the lowest latency for local IPC:

Transport

Typical Latency

Use Case

UDS

< 100 μs

Local high-frequency IPC

Redis (localhost)

~ 200-500 μs

Local with pub/sub features

ROS2 (DDS)

~ 1-5 ms

Distributed systems, discovery

Best Practices:

  • Use UDS for high-frequency local communication (e.g., control loops)

  • Use Redis for pub/sub patterns or remote access

  • Use ROS2 for distributed systems with discovery

API Reference

UDS Protocol Provider

Implements AbstractProtocolProvider for Unix Domain Socket transport. Provides low-latency local IPC via stream sockets.

class vyra_base.com.transport.t_uds.provider.UDSProvider(module_name, module_id, protocol=ProtocolType.UDS)[Quellcode]

Bases: AbstractProtocolProvider

Protocol provider for Unix Domain Socket transport.

Features: - Low-latency local IPC - Stream-based communication - Automatic connection management - No serialization overhead (JSON)

Requirements: - Unix-like OS (Linux, macOS) - File system access to /tmp/vyra_sockets

Limitations: - Local machine only - No pub/sub pattern (use Callable for request-response)

Example

>>> # Initialize provider
>>> provider = UDSProvider(ProtocolType.UDS)
>>>
>>> if await provider.check_availability():
...     await provider.initialize()
...
...     # Create callable (server)
...     async def handle_request(req):
...         return {"result": req["value"] * 2}
...
...     callable = await provider.create_callable(
...         "calculate",
...         handle_request,
...         module_name="math_service"
...     )
...
...     # Create client
...     client = await provider.create_callable(
...         "calculate",
...         None,  # No callback for client
...         module_name="math_service"
...     )
...     result = await client.call({"value": 21})
Parameter:
__init__(module_name, module_id, protocol=ProtocolType.UDS)[Quellcode]

Initialize UDS provider.

Parameter:
  • protocol (ProtocolType) – Protocol type (must be UDS)

  • module_name (str) – Default module name for interfaces

  • module_id (str)

async check_availability()[Quellcode]

Check if UDS transport is available.

Rückgabe:

Always True on Unix-like systems

Rückgabetyp:

bool

async initialize(config=None)[Quellcode]

Initialize UDS provider.

Parameter:

config (Optional[Dict[str, Any]]) – Optional configuration - socket_dir: Socket directory (default: /tmp/vyra_sockets) - connect_timeout: Connection timeout - call_timeout: Default call timeout

Rückgabe:

True if initialization successful

Rückgabetyp:

bool

async shutdown()[Quellcode]

Shutdown the provider and cleanup resources.

Rückgabetyp:

None

async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[Quellcode]

Create UDS Publisher (datagram sockets).

Parameter:
  • name (str) – Publisher name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional publisher options (module_name override)

Rückgabetyp:

VyraPublisher

Rückgabe:

UdsPublisherImpl instance

async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[Quellcode]

Create UDS Subscriber (datagram sockets).

Parameter:
Rückgabetyp:

VyraSubscriber

Rückgabe:

UdsSubscriberImpl instance

async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[Quellcode]

Create UDS Server (stream sockets).

Parameter:
Rückgabetyp:

VyraServer

Rückgabe:

UdsServerImpl instance

async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[Quellcode]

Create UDS Client (stream sockets).

Parameter:
Rückgabetyp:

VyraClient

Rückgabe:

UdsClientImpl instance

async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[Quellcode]

Create UDS Action Server (stream sockets + state messages).

Parameter:
Rückgabetyp:

VyraActionServer

Rückgabe:

UdsActionServerImpl instance

async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[Quellcode]

Create UDS Action Client.

Parameter:
Rückgabetyp:

VyraActionClient

Rückgabe:

UdsActionClientImpl instance

UDS VYRA Models Layer

VYRA abstractions for Unix Domain Socket communication.

Legacy patterns (deprecated): - UDSCallable: Request-response pattern via UDS - UDSSpeaker: Publish-subscribe pattern via UDS - UDSJob: Long-running task pattern via UDS

Unified transport layer: - Publisher/Subscriber (datagram sockets) - Server/Client (stream sockets) - ActionServer/ActionClient (stream sockets + state messages)

Usage:
>>> from vyra_base.com.transport.t_uds.vyra_models import UdsServerImpl
>>>
>>> # Server side
>>> async def handle_request(request):
...     return {"result": request["value"] * 2}
>>>
>>> server = UdsServerImpl(
...     name="calculate",
...     topic_builder=builder,
...     response_callback=handle_request,
...     service_type=CalcService,
...     module_name="math_service"
... )
>>> await server.initialize()
class vyra_base.com.transport.t_uds.vyra_models.VyraPublisherImpl(name, topic_builder, message_type, module_name='vyra', **kwargs)[Quellcode]

Bases: VyraPublisher

Unix Domain Socket publisher implementation using datagram sockets.

Pattern: Sends messages to subscribers via datagram socket. Each subscriber listens on their own socket path. Publisher maintains list of subscriber paths (broadcast pattern).

Socket path: /tmp/vyra_sockets/pub_{module}_{topic}.sock

Parameter:
__init__(name, topic_builder, message_type, module_name='vyra', **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize UDS publisher.

Rückgabetyp:

bool

async cleanup()[Quellcode]

Cleanup UDS publisher resources.

Rückgabetyp:

None

async shutdown()[Quellcode]

Shutdown UDS publisher.

Rückgabetyp:

None

async publish(message)[Quellcode]

Publish message via UDS datagram broadcast to all matching subscribers.

Discovers subscribers by scanning for .topic meta files in the socket directory. Sends the serialized message to each subscriber whose topic matches.

Rückgabetyp:

bool

Parameter:

message (Any)

class vyra_base.com.transport.t_uds.vyra_models.VyraSubscriberImpl(name, topic_builder, subscriber_callback, message_type, module_name='vyra', **kwargs)[Quellcode]

Bases: VyraSubscriber

Unix Domain Socket subscriber implementation using datagram sockets.

Pattern: Listens on own socket for messages from publishers. Socket path: /tmp/vyra_sockets/sub_{module}_{topic}_{instance_id}.sock

Parameter:
__init__(name, topic_builder, subscriber_callback, message_type, module_name='vyra', **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize UDS subscriber.

Rückgabetyp:

bool

async subscribe()[Quellcode]

Start listening for messages.

Rückgabetyp:

bool

Rückgabe:

True on success

async cleanup()[Quellcode]

Cleanup UDS resources.

property interface_type: InterfaceType

Get interface type.

class vyra_base.com.transport.t_uds.vyra_models.VyraServerImpl(name, topic_builder, response_callback, service_type, module_name='vyra', **kwargs)[Quellcode]

Bases: VyraServer

Unix Domain Socket server implementation using stream sockets.

Pattern: Listens for connections, receives request, sends response. Socket path: /tmp/vyra_sockets/srv_{module}_{service}.sock

Parameter:
__init__(name, topic_builder, response_callback, service_type, module_name='vyra', **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize UDS server.

Rückgabetyp:

bool

async serve()[Quellcode]

Start serving requests.

Rückgabetyp:

bool

Rückgabe:

True on success

async cleanup()[Quellcode]

Cleanup UDS resources.

property interface_type: InterfaceType

Get interface type.

class vyra_base.com.transport.t_uds.vyra_models.VyraClientImpl(name, topic_builder, request_callback, service_type=None, module_name='vyra', **kwargs)[Quellcode]

Bases: VyraClient

Unix Domain Socket client implementation using stream sockets.

Pattern: Connects to server socket, sends request, receives response. Server socket path: /tmp/vyra_sockets/srv_{module}_{service}.sock

Parameter:
__init__(name, topic_builder, request_callback, service_type=None, module_name='vyra', **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize UDS client.

Rückgabetyp:

bool

async call(request, timeout=5.0)[Quellcode]

Send request and await response.

Parameter:
  • request (Any) – Request message instance or dict

  • timeout (float) – Call timeout in seconds

Rückgabetyp:

Optional[Any]

Rückgabe:

Response object on success, None on failure

async cleanup()[Quellcode]

Cleanup resources (stateless client, nothing to cleanup).

property interface_type: InterfaceType

Get interface type.

class vyra_base.com.transport.t_uds.vyra_models.VyraActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, action_type, module_name='vyra', **kwargs)[Quellcode]

Bases: VyraActionServer

Unix Domain Socket action server using stream sockets.

Pattern: - Main socket: {VYRA_SOCKET_DIR}/{action}.sock - Receives: goal_request | cancel_request | feedback_subscribe - Per Goal: Creates dedicated feedback socket at {module}_{action}_{goal_id}_feedback.sock

Parameter:
__init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, action_type, module_name='vyra', **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize UDS action server.

Rückgabetyp:

bool

async serve()[Quellcode]

Start serving action requests.

Rückgabetyp:

bool

async publish_feedback(goal_id, feedback)[Quellcode]

Publish feedback for active goal.

TODO: Broadcast to subscribers via dedicated feedback socket.

Parameter:
  • goal_id (str)

  • feedback (Any)

async cleanup()[Quellcode]

Cleanup UDS resources.

property interface_type: InterfaceType

Get interface type.

class vyra_base.com.transport.t_uds.vyra_models.VyraActionClientImpl(name, topic_builder, direct_response, feedback_callback, goal_response_callback, action_type=None, module_name='vyra', **kwargs)[Quellcode]

Bases: VyraActionClient

Unix Domain Socket action client using stream sockets.

Pattern: - Connects to server at {VYRA_SOCKET_DIR}/{module}_{action}.sock - Sends goal_request, receives goal_id and feedback_socket_path - Optionally connects to feedback_socket for streaming updates

Parameter:
__init__(name, topic_builder, direct_response, feedback_callback, goal_response_callback, action_type=None, module_name='vyra', **kwargs)[Quellcode]
Parameter:
async initialize()[Quellcode]

Initialize UDS action client.

Rückgabetyp:

bool

async send_goal(goal, timeout=5.0)[Quellcode]

Send goal to action server.

Parameter:
  • goal (Any) – Goal message instance or dict

  • timeout (float) – Goal acceptance timeout

Rückgabetyp:

Optional[str]

Rückgabe:

goal_id on success, None on failure

async cancel_goal(goal_id, timeout=5.0)[Quellcode]

Cancel active goal.

Parameter:
  • goal_id (str) – ID of goal to cancel

  • timeout (float) – Cancel timeout

Rückgabetyp:

bool

Rückgabe:

True on successful cancellation

async cleanup()[Quellcode]

Cleanup UDS resources.

property interface_type: InterfaceType

Get interface type.

Dependencies

  • asyncio (Async I/O, built-in)

  • socket (Unix sockets, built-in)

  • json (Serialization, built-in)

See Also