UDS Transport Module

Provides a Unix Domain Socket (UDS) transport implementation with a layered architecture.

Overview

The UDS transport module implements the AbstractProtocolProvider interface for low-latency local inter-process communication using Unix Domain Sockets.

Architecture Layers:

  • communication/: Core UDS functionality (UnixSocket, socket management)

  • vyra_models/: VYRA abstractions (UDSCallable)

  • provider.py: Protocol provider implementation

Features:

  • ✅ Stream-based local IPC

  • ✅ Low-latency request-response

  • ✅ Server/Client pattern with is_callable flag

  • ✅ Automatic connection management

  • ✅ JSON message serialization

  • ✅ File-based socket addressing

  • ✅ Async socket operations
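To make the stream-based, JSON-serialized request-response idea concrete, here is a minimal standard-library sketch. It is not the module's implementation: the newline-delimited framing, the function names handle_connection, call and demo, and the socket file name are all assumptions made for illustration.

```python
import asyncio
import json
import os
import tempfile


async def handle_connection(reader, writer):
    # Assumed framing: one JSON object per newline-terminated line.
    line = await reader.readline()
    request = json.loads(line)
    response = {"sum": request["a"] + request["b"]}
    writer.write(json.dumps(response).encode() + b"\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def call(path, request, timeout=2.0):
    # Connect, send one request, and wait up to `timeout` seconds for the reply.
    reader, writer = await asyncio.open_unix_connection(path)
    try:
        writer.write(json.dumps(request).encode() + b"\n")
        await writer.drain()
        line = await asyncio.wait_for(reader.readline(), timeout)
        return json.loads(line)
    finally:
        writer.close()
        await writer.wait_closed()


async def demo():
    # Serve and call within one process, using a throwaway socket directory.
    with tempfile.TemporaryDirectory() as socket_dir:
        path = os.path.join(socket_dir, "demo_calculator.sock")
        server = await asyncio.start_unix_server(handle_connection, path=path)
        async with server:
            return await call(path, {"a": 10, "b": 32})


if __name__ == "__main__":
    print(asyncio.run(demo()))  # {'sum': 42}
```

Newline framing is only safe because json.dumps never emits raw newlines by default; a length prefix would be the usual alternative for binary payloads.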

Usage

Basic Provider Setup

from vyra_base.com.transport.t_uds import UDSProvider, UDS_AVAILABLE
from vyra_base.com.core.types import ProtocolType
import os

if UDS_AVAILABLE:
    # Create provider
    provider = UDSProvider(
        module_name="my_module",
        module_id="abc123",
        protocol=ProtocolType.UDS
    )

    # Initialize with socket directory
    socket_dir = "/tmp/vyra_sockets"
    os.makedirs(socket_dir, exist_ok=True)

    await provider.initialize(config={
        "socket_dir": socket_dir
    })

Create Request-Response Server

# Server callback
async def handle_calculation(request_data):
    result = request_data["a"] + request_data["b"]
    return {"sum": result}

# Create server (is_callable=True)
server = await provider.create_callable(
    name="calculator",
    callback=handle_calculation
)
# Socket created at: /tmp/vyra_sockets/my_module_calculator.sock

Create Request-Response Client

# Create client (is_callable=False)
client = await provider.create_callable(
    name="calculator",
    callback=None,  # No callback for client
    is_callable=False
)

# Make request
request = {"a": 10, "b": 32}
response = await client.call(request, timeout=2.0)
print(f"Result: {response['sum']}")

Configuration

Socket Configuration

config = {
    "socket_dir": "/var/run/vyra",
    "socket_permissions": 0o660,  # rw-rw----
    "buffer_size": 8192,
    "timeout": 5.0,
    "max_connections": 10
}

await provider.initialize(config=config)
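How a setting like socket_permissions could take effect can be sketched with the standard library alone. The helper bind_with_permissions and the demo function below are hypothetical and only illustrate the general technique of restricting a socket file's mode after binding; how the provider applies the setting internally is not documented here.

```python
import os
import socket
import stat
import tempfile


def bind_with_permissions(path, permissions=0o660):
    """Bind a stream socket at `path` and restrict the socket file's mode."""
    if os.path.exists(path):
        os.unlink(path)  # remove a stale socket file left by an earlier run
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.bind(path)              # creates the socket file
    os.chmod(path, permissions)  # e.g. 0o660 = rw-rw----
    sock.listen()
    return sock


def demo():
    # Bind in a throwaway directory and report the resulting permission bits.
    with tempfile.TemporaryDirectory() as socket_dir:
        path = os.path.join(socket_dir, "demo.sock")
        sock = bind_with_permissions(path, 0o660)
        try:
            return stat.S_IMODE(os.stat(path).st_mode)
        finally:
            sock.close()


if __name__ == "__main__":
    print(oct(demo()))
```

Between bind() and chmod() the file briefly carries the umask-derived mode, so a directory with restrictive permissions is a sensible complement to this approach.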

Socket Naming

Sockets are named using the pattern: {module_name}_{service_name}.sock

# Example socket paths
provider = UDSProvider(module_name="sensor_manager", module_id="abc123")

# Server: /tmp/vyra_sockets/sensor_manager_get_data.sock
await provider.create_callable("get_data", callback=handler)

# Client connects to same path
client = await provider.create_callable(
    "get_data",
    callback=None,
    is_callable=False
)
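The naming pattern above can be expressed as a small helper. This is an illustrative sketch: socket_path and MAX_SOCKET_PATH_BYTES are hypothetical names, and the length check is an addition motivated by the operating-system limit on AF_UNIX paths rather than something this module is known to enforce.

```python
import os

# Conservative limit: sun_path holds 108 bytes on Linux and 104 on macOS,
# so 103 bytes leaves room for a terminating NUL on both.
MAX_SOCKET_PATH_BYTES = 103


def socket_path(socket_dir, module_name, service_name):
    """Build {socket_dir}/{module_name}_{service_name}.sock and check its length."""
    path = os.path.join(socket_dir, f"{module_name}_{service_name}.sock")
    if len(os.fsencode(path)) > MAX_SOCKET_PATH_BYTES:
        raise ValueError(f"socket path too long for AF_UNIX: {path}")
    return path


if __name__ == "__main__":
    print(socket_path("/tmp/vyra_sockets", "sensor_manager", "get_data"))
    # /tmp/vyra_sockets/sensor_manager_get_data.sock
```

Long module or service names combined with a deep socket directory are the usual way to hit this limit, which otherwise surfaces as an opaque OSError at bind time.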

Server/Client Flags

Interface Type           | Flag              | Behavior
Request-Response Server  | is_callable=True  | Creates socket server, responds to requests (callback required)
Request-Response Client  | is_callable=False | Connects to socket server, makes requests (no callback)

Advanced Usage

Custom Socket Path

# Override default socket naming
server = await provider.create_callable(
    name="custom",
    callback=handler,
    socket_path="/var/run/myapp/custom.sock"
)

Error Handling

import logging

from vyra_base.com.transport.t_uds.exceptions import (
    UDSConnectionError,
    UDSTimeoutError
)

logger = logging.getLogger(__name__)

try:
    response = await client.call(request, timeout=1.0)
except UDSTimeoutError:
    logger.error("Request timed out")
except UDSConnectionError as e:
    logger.error(f"Connection failed: {e}")
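For readers curious how a per-call timeout can be turned into a dedicated exception, the following standard-library sketch shows one common approach. CallTimeout and call_with_timeout are hypothetical stand-ins and do not describe how UDSTimeoutError is actually raised.

```python
import asyncio


class CallTimeout(Exception):
    """Hypothetical stand-in for a transport-specific timeout error."""


async def call_with_timeout(awaitable, timeout):
    """Await `awaitable`, converting an asyncio timeout into CallTimeout."""
    try:
        return await asyncio.wait_for(awaitable, timeout)
    except asyncio.TimeoutError as exc:
        raise CallTimeout(f"no response within {timeout} s") from exc


async def demo():
    async def slow_reply():
        # Simulates a server that answers too late.
        await asyncio.sleep(1.0)
        return {"sum": 42}

    try:
        await call_with_timeout(slow_reply(), timeout=0.05)
    except CallTimeout:
        return "timed out"
    return "completed"


if __name__ == "__main__":
    print(asyncio.run(demo()))  # timed out
```

asyncio.wait_for cancels the pending operation when the timeout expires, so the caller does not leak a half-finished request.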

Socket Cleanup

# Cleanup on shutdown
async def cleanup():
    await provider.shutdown()
    # Removes socket files

# Auto-cleanup with context manager
async with UDSProvider(...) as provider:
    await provider.initialize()
    # ... use provider ...
# Sockets cleaned up automatically
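The cleanup behaviour described above, where socket files disappear on shutdown, can be sketched with an async context manager from the standard library. The name unix_server is hypothetical and the code is an illustration of the pattern, not the provider's shutdown logic.

```python
import asyncio
import contextlib
import os
import tempfile


@contextlib.asynccontextmanager
async def unix_server(handler, path):
    """Serve on `path` and remove the socket file when the block exits."""
    server = await asyncio.start_unix_server(handler, path=path)
    try:
        yield server
    finally:
        server.close()
        await server.wait_closed()
        # Newer Python versions may already have removed the file on close.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(path)


async def demo():
    async def handler(reader, writer):
        writer.close()

    with tempfile.TemporaryDirectory() as socket_dir:
        path = os.path.join(socket_dir, "demo.sock")
        async with unix_server(handler, path):
            existed_inside = os.path.exists(path)
        return existed_inside, os.path.exists(path)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, False)
```

Removing the file in a finally block matters because a stale socket file makes the next bind on the same path fail with "Address already in use".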

Performance Considerations

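Of the transports compared here, UDS typically offers the lowest latency for local IPC. The figures are indicative and vary with hardware, payload size, and system load: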

Transport          | Typical Latency | Use Case
UDS                | < 100 μs        | Local high-frequency IPC
Redis (localhost)  | ~ 200-500 μs    | Local with pub/sub features
ROS2 (DDS)         | ~ 1-5 ms        | Distributed systems, discovery

Best Practices:

  • Use UDS for high-frequency local communication (e.g., control loops)

  • Use Redis for pub/sub patterns or remote access

  • Use ROS2 for distributed systems with discovery
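Latency figures depend heavily on the machine, so it can be worth measuring locally. The sketch below times raw round trips over a connected pair of Unix stream sockets using only the standard library. The names recv_exact and measure_round_trip_us are hypothetical. The measurement runs in a single thread and excludes JSON encoding and process switches, so treat it as a rough lower bound rather than a benchmark of this module.

```python
import socket
import statistics
import time


def recv_exact(sock, size):
    """Read exactly `size` bytes from a stream socket."""
    chunks = []
    remaining = size
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed before the full payload arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def measure_round_trip_us(iterations=1000, payload=b"x" * 64):
    """Return the median round-trip time in microseconds."""
    a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    samples = []
    try:
        for _ in range(iterations):
            start = time.perf_counter()
            a.sendall(payload)             # request
            recv_exact(b, len(payload))
            b.sendall(payload)             # response
            recv_exact(a, len(payload))
            samples.append(time.perf_counter() - start)
    finally:
        a.close()
        b.close()
    return statistics.median(samples) * 1e6


if __name__ == "__main__":
    print(f"median round trip: {measure_round_trip_us():.1f} us")
```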

API Reference

UDS Protocol Provider

Implements AbstractProtocolProvider for Unix Domain Socket transport. Provides low-latency local IPC via stream sockets.

class vyra_base.com.transport.t_uds.provider.UDSProvider(module_name, module_id, protocol=ProtocolType.UDS)

Bases: AbstractProtocolProvider

Protocol provider for Unix Domain Socket transport.

Features:

  • Low-latency local IPC

  • Stream-based communication

  • Automatic connection management

  • Low serialization overhead (JSON)

Requirements:

  • Unix-like OS (Linux, macOS)

  • File system access to /tmp/vyra_sockets

Limitations:

  • Local machine only

  • No pub/sub pattern (use Callable for request-response)

Example

>>> # Initialize provider
>>> provider = UDSProvider(module_name="math_service", module_id="abc123")
>>>
>>> if await provider.check_availability():
...     await provider.initialize()
...
...     # Create callable (server)
...     async def handle_request(req):
...         return {"result": req["value"] * 2}
...
...     server = await provider.create_callable(
...         "calculate",
...         handle_request,
...         module_name="math_service"
...     )
...
...     # Create client
...     client = await provider.create_callable(
...         "calculate",
...         None,  # No callback for client
...         is_callable=False,
...         module_name="math_service"
...     )
...     result = await client.call({"value": 21})
__init__(module_name, module_id, protocol=ProtocolType.UDS)

Initialize UDS provider.

Parameters:
  • protocol (ProtocolType) – Protocol type (must be UDS)

  • module_name (str) – Default module name for interfaces

  • module_id (str)

async check_availability()

Check if UDS transport is available.

Returns:

Always True on Unix-like systems

Return type:

bool
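A platform check of this kind can be sketched by asking the interpreter whether it exposes the AF_UNIX address family. The function name uds_available is hypothetical, and whether check_availability() uses this exact test is an assumption.

```python
import socket


def uds_available():
    """Report whether this Python build exposes Unix domain sockets."""
    return hasattr(socket, "AF_UNIX")


if __name__ == "__main__":
    print(uds_available())
```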

async initialize(config=None)

Initialize UDS provider.

Parameters:

config (Optional[Dict[str, Any]]) – Optional configuration:

  • socket_dir: Socket directory (default: /tmp/vyra_sockets)

  • connect_timeout: Connection timeout

  • call_timeout: Default call timeout

Returns:

True if initialization successful

Return type:

bool

async shutdown()

Shutdown the provider and cleanup resources.

Return type:

None

async create_publisher(name, topic_builder=None, message_type=None, **kwargs)

Create UDS Publisher (datagram sockets).

Parameters:
  • name (str) – Publisher name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance

  • message_type (Optional[type]) – Message type class

  • **kwargs – Additional publisher options (module_name override)

Return type:

VyraPublisher

Returns:

UdsPublisherImpl instance

async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)

Create UDS Subscriber (datagram sockets).

Return type:

VyraSubscriber

Returns:

UdsSubscriberImpl instance

async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)

Create UDS Server (stream sockets).

Return type:

VyraServer

Returns:

UdsServerImpl instance

async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)

Create UDS Client (stream sockets).

Return type:

VyraClient

Returns:

UdsClientImpl instance

async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)

Create UDS Action Server (stream sockets + state messages).

Return type:

VyraActionServer

Returns:

UdsActionServerImpl instance

async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)

Create UDS Action Client.

Return type:

VyraActionClient

Returns:

UdsActionClientImpl instance

UDS VYRA Models Layer

VYRA abstractions for Unix Domain Socket communication.

Legacy patterns (deprecated):

  • UDSCallable: Request-response pattern via UDS

  • UDSSpeaker: Publish-subscribe pattern via UDS

  • UDSJob: Long-running task pattern via UDS

Unified transport layer:

  • Publisher/Subscriber (datagram sockets)

  • Server/Client (stream sockets)

  • ActionServer/ActionClient (stream sockets + state messages)

Usage:
>>> from vyra_base.com.transport.t_uds.vyra_models import UdsServerImpl
>>>
>>> # Server side
>>> async def handle_request(request):
...     return {"result": request["value"] * 2}
>>>
>>> server = UdsServerImpl(
...     name="calculate",
...     topic_builder=builder,
...     response_callback=handle_request,
...     service_type=CalcService,
...     module_name="math_service"
... )
>>> await server.initialize()
class vyra_base.com.transport.t_uds.vyra_models.VyraPublisherImpl(name, topic_builder, message_type, module_name='vyra', **kwargs)

Bases: VyraPublisher

Unix Domain Socket publisher implementation using datagram sockets.

Pattern: Sends messages to subscribers via datagram socket. Each subscriber listens on their own socket path. Publisher maintains list of subscriber paths (broadcast pattern).

Socket path: /tmp/vyra_sockets/pub_{module}_{topic}.sock

__init__(name, topic_builder, message_type, module_name='vyra', **kwargs)

async initialize()

Initialize UDS publisher.

Return type:

bool

async cleanup()

Cleanup UDS publisher resources.

Return type:

None

async shutdown()

Shutdown UDS publisher.

Return type:

None

async publish(message)

Publish message via UDS datagram broadcast to all matching subscribers.

Discovers subscribers by scanning for .topic meta files in the socket directory. Sends the serialized message to each subscriber whose topic matches.

Return type:

bool

Parameters:

message (Any)
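The discovery-and-broadcast step described for publish() can be sketched with datagram sockets from the standard library. The on-disk layout is an assumption made for this example: each subscriber binds {name}.sock and writes a sibling {name}.topic file containing the topic as plain text. The real meta file format is not documented here, and register_subscriber and publish are hypothetical helpers.

```python
import glob
import json
import os
import socket
import tempfile


def register_subscriber(socket_dir, name, topic):
    """Bind a datagram socket and write a sibling .topic meta file."""
    path = os.path.join(socket_dir, f"{name}.sock")
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    sock.bind(path)
    with open(os.path.join(socket_dir, f"{name}.topic"), "w") as meta:
        meta.write(topic)
    return sock


def publish(socket_dir, topic, message):
    """Send `message` to every subscriber whose meta file names `topic`."""
    payload = json.dumps(message).encode()
    delivered = 0
    sender = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        for meta_path in glob.glob(os.path.join(socket_dir, "*.topic")):
            with open(meta_path) as meta:
                if meta.read().strip() != topic:
                    continue
            target = meta_path[: -len(".topic")] + ".sock"
            try:
                sender.sendto(payload, target)
                delivered += 1
            except OSError:
                pass  # subscriber disappeared; skip it
    finally:
        sender.close()
    return delivered


def demo():
    with tempfile.TemporaryDirectory() as socket_dir:
        temperature = register_subscriber(socket_dir, "sub_a", "temperature")
        pressure = register_subscriber(socket_dir, "sub_b", "pressure")
        try:
            count = publish(socket_dir, "temperature", {"value": 21.5})
            return count, json.loads(temperature.recv(4096))
        finally:
            temperature.close()
            pressure.close()


if __name__ == "__main__":
    print(demo())  # (1, {'value': 21.5})
```

Scanning the directory on every publish keeps the publisher stateless at the cost of a directory listing per message; caching the subscriber list would trade freshness for speed.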

class vyra_base.com.transport.t_uds.vyra_models.VyraSubscriberImpl(name, topic_builder, subscriber_callback, message_type, module_name='vyra', **kwargs)

Bases: VyraSubscriber

Unix Domain Socket subscriber implementation using datagram sockets.

Pattern: Listens on its own socket for messages from publishers.

Socket path: /tmp/vyra_sockets/sub_{module}_{topic}_{instance_id}.sock

__init__(name, topic_builder, subscriber_callback, message_type, module_name='vyra', **kwargs)

async initialize()

Initialize UDS subscriber.

Return type:

bool

async subscribe()

Start listening for messages.

Return type:

bool

Returns:

True on success

async cleanup()

Cleanup UDS resources.

property interface_type: InterfaceType

Get interface type.

class vyra_base.com.transport.t_uds.vyra_models.VyraServerImpl(name, topic_builder, response_callback, service_type, module_name='vyra', **kwargs)

Bases: VyraServer

Unix Domain Socket server implementation using stream sockets.

Pattern: Listens for connections, receives a request, sends a response.

Socket path: /tmp/vyra_sockets/srv_{module}_{service}.sock

__init__(name, topic_builder, response_callback, service_type, module_name='vyra', **kwargs)

async initialize()

Initialize UDS server.

Return type:

bool

async serve()

Start serving requests.

Return type:

bool

Returns:

True on success

async cleanup()

Cleanup UDS resources.

property interface_type: InterfaceType

Get interface type.

class vyra_base.com.transport.t_uds.vyra_models.VyraClientImpl(name, topic_builder, request_callback, service_type=None, module_name='vyra', **kwargs)

Bases: VyraClient

Unix Domain Socket client implementation using stream sockets.

Pattern: Connects to the server socket, sends a request, receives a response.

Server socket path: /tmp/vyra_sockets/srv_{module}_{service}.sock

__init__(name, topic_builder, request_callback, service_type=None, module_name='vyra', **kwargs)

async initialize()

Initialize UDS client.

Return type:

bool

async call(request, timeout=5.0)

Send request and await response.

Parameters:
  • request (Any) – Request message instance or dict

  • timeout (float) – Call timeout in seconds

Return type:

Optional[Any]

Returns:

Response object on success, None on failure

async cleanup()

Cleanup resources (stateless client, nothing to cleanup).

property interface_type: InterfaceType

Get interface type.

class vyra_base.com.transport.t_uds.vyra_models.VyraActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, action_type, module_name='vyra', **kwargs)

Bases: VyraActionServer

Unix Domain Socket action server using stream sockets.

Pattern:

  • Main socket: {VYRA_SOCKET_DIR}/{action}.sock

  • Receives: goal_request | cancel_request | feedback_subscribe

  • Per goal: Creates a dedicated feedback socket at {module}_{action}_{goal_id}_feedback.sock
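The three message kinds handled on the main socket suggest a simple dispatch step. The sketch below models only that step, with no sockets involved. The message layout (a dict with a "type" field and an optional "goal_id"), the response fields, and the names ActionDispatcher and feedback_socket_path are assumptions for illustration, not the wire format used by this class.

```python
import uuid


def feedback_socket_path(socket_dir, module, action, goal_id):
    """Per-goal feedback socket path following the documented pattern."""
    return f"{socket_dir}/{module}_{action}_{goal_id}_feedback.sock"


class ActionDispatcher:
    """Routes goal_request, cancel_request and feedback_subscribe messages."""

    def __init__(self, socket_dir, module, action):
        self.socket_dir = socket_dir
        self.module = module
        self.action = action
        self.active_goals = {}  # goal_id -> feedback socket path

    def handle(self, message):
        kind = message.get("type")
        if kind == "goal_request":
            goal_id = uuid.uuid4().hex
            path = feedback_socket_path(
                self.socket_dir, self.module, self.action, goal_id
            )
            self.active_goals[goal_id] = path
            return {"goal_id": goal_id, "feedback_socket_path": path}
        if kind == "cancel_request":
            removed = self.active_goals.pop(message.get("goal_id"), None)
            return {"cancelled": removed is not None}
        if kind == "feedback_subscribe":
            path = self.active_goals.get(message.get("goal_id"))
            return {"feedback_socket_path": path}
        return {"error": f"unknown message type: {kind!r}"}


if __name__ == "__main__":
    dispatcher = ActionDispatcher("/tmp/vyra_sockets", "arm", "move")
    accepted = dispatcher.handle({"type": "goal_request"})
    print(accepted["feedback_socket_path"])
    print(dispatcher.handle({"type": "cancel_request", "goal_id": accepted["goal_id"]}))
```

Keeping dispatch separate from socket I/O makes the goal bookkeeping easy to unit test without opening any sockets.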

__init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, action_type, module_name='vyra', **kwargs)

async initialize()

Initialize UDS action server.

Return type:

bool

async serve()

Start serving action requests.

Return type:

bool

async publish_feedback(goal_id, feedback)

Publish feedback for active goal.

TODO: Broadcast to subscribers via dedicated feedback socket.

Parameters:
  • goal_id (str)

  • feedback (Any)

async cleanup()

Cleanup UDS resources.

property interface_type: InterfaceType

Get interface type.

class vyra_base.com.transport.t_uds.vyra_models.VyraActionClientImpl(name, topic_builder, direct_response, feedback_callback, goal_response_callback, action_type=None, module_name='vyra', **kwargs)

Bases: VyraActionClient

Unix Domain Socket action client using stream sockets.

Pattern:

  • Connects to the server at {VYRA_SOCKET_DIR}/{module}_{action}.sock

  • Sends goal_request, receives goal_id and feedback_socket_path

  • Optionally connects to feedback_socket for streaming updates

__init__(name, topic_builder, direct_response, feedback_callback, goal_response_callback, action_type=None, module_name='vyra', **kwargs)

async initialize()

Initialize UDS action client.

Return type:

bool

async send_goal(goal, timeout=5.0)

Send goal to action server.

Parameters:
  • goal (Any) – Goal message instance or dict

  • timeout (float) – Goal acceptance timeout

Return type:

Optional[str]

Returns:

goal_id on success, None on failure

async cancel_goal(goal_id, timeout=5.0)

Cancel active goal.

Parameters:
  • goal_id (str) – ID of goal to cancel

  • timeout (float) – Cancel timeout

Return type:

bool

Returns:

True on successful cancellation

async cleanup()

Cleanup UDS resources.

property interface_type: InterfaceType

Get interface type.

Dependencies

  • asyncio (Async I/O, built-in)

  • socket (Unix sockets, built-in)

  • json (Serialization, built-in)

See Also