IPC - Inter Process Communication

VYRA supports IPC (Inter Process Communication) via gRPC with Unix Domain Sockets. This enables fast inter process communication within a module.

Concept

IPC used for:

Use Cases:

  • Communication between processes in the same container/module

  • Fast internal API calls without network overhead

  • Decoupling of frontend and backend within the same module

  • Python ↔ Python communication within a module

Not suitable for:

  • Inter-module communication (use ROS2)

  • Communication across container boundaries

  • External API access

Note

For communication between modules, always use ROS2 (Job/Callable). IPC is only intended for communication within a module.

gRPC via Unix Domain Socket

VYRA uses Unix Domain Sockets instead of TCP for gRPC communication:

Advantages:

  • ⚡ Very fast (no network latency)

  • 🔒 Secure (local processes only)

  • 🎯 No port conflicts

  • 💾 Less overhead than TCP/IP

Socket Path: /tmp/<module_name>_ipc.sock

Setup

Server-Side (Backend)

Create a gRPC server with Unix Domain Socket:

import grpc
from concurrent import futures
from vyra_base.com.handler.ipc import IPCHandler

# gRPC Server with Unix Domain Socket
socket_path = "/tmp/my_module_ipc.sock"
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# Service register (Your gRPC Service)
MyServiceServicer.add_to_server(MyServiceImpl(), server)

# Bind Unix Domain Socket
server.add_insecure_port(f'unix://{socket_path}')
server.start()
print(f"IPC Server running on {socket_path}")

Client-Side (Frontend/and other Processes)

Connect to the gRPC server:

import grpc

# Connect via Unix Domain Socket
socket_path = "/tmp/my_module_ipc.sock"
channel = grpc.insecure_channel(f'unix://{socket_path}')

# Client-Stub create
stub = MyServiceStub(channel)

# RPC call
request = MyRequest(data="Hello")
response = stub.CallMethod(request)
print(f"Response: {response.result}")

Practical Example

Scenario: Frontend-Backend Communication

Backend (Python gRPC Server):

# backend_service.proto (gRPC Definition)
# service BackendService {
#     rpc GetStatus(StatusRequest) returns (StatusResponse);
#     rpc ProcessData(DataRequest) returns (DataResponse);
# }

from backend_service_pb2_grpc import BackendServiceServicer
import grpc
from concurrent import futures

class BackendServiceImpl(BackendServiceServicer):
    def GetStatus(self, request, context):
        # Backend logic
        return StatusResponse(
            status="running",
            uptime_seconds=12345
        )

    def ProcessData(self, request, context):
        # Data processing
        result = process(request.data)
        return DataResponse(result=result)

# Start server
def start_ipc_server():
    socket_path = "/tmp/my_module_ipc.sock"
    server = grpc.server(futures.ThreadPoolExecuto(max_workers=10))
    BackendServiceServicer.add_to_server(BackendServiceImpl(), server)
    server.add_insecure_port(f'unix://{socket_path}')
    server.start()
    server.wait_for_termination()

Frontend (Python gRPC Client):

from backend_service_pb2 import StatusRequest, DataRequest
from backend_service_pb2_grpc import BackendServiceStub
import grpc

class FrontendClient:
    def __init__(self):
        socket_path = "/tmp/my_module_ipc.sock"
        self.channel = grpc.insecure_channel(f'unix://{socket_path}')
        self.stub = BackendServiceStub(self.channel)

    def get_backend_status(self):
        request = StatusRequest()
        response = self.stub.GetStatus(request)
        return {
            "status": response.status,
            "uptime": response.uptime_seconds
        }

    def process_data(self, data):
        request = DataRequest(data=data)
        response = self.stub.ProcessData(request)
        return response.result

# Usage
client = FrontendClient()
status = client.get_backend_status()
print(f"Backend-Status: {status}")

gRPC Proto-Definitionen

Define your gRPC services in .proto files:

syntax = "proto3";

package vyra.ipc;

// Service-Definition
service ModuleService {
    rpc Initialize(InitRequest) returns (InitResponse);
    rpc ExecuteCommand(CommandRequest) returns (CommandResponse);
    rpc GetData(DataRequest) returns (DataResponse);
}

// Message-Definitions
message InitRequest {
    string config_path = 1;
}

message InitResponse {
    bool success = 1;
    string message = 2;
}

message CommandRequest {
    string command = 1;
    map<string, string> parameters = 2;
}

message CommandResponse {
    int32 status_code = 1;
    string result = 2;
}

Compiling the Proto Files:

# Generate Python gRPC code
python -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    module_service.proto

IPCHandler Class

VYRA provides a helper class for IPC connections:

from vyra_base.com.handler.ipc import IPCHandler

# IPC-Handler initialize
ipc = IPCHandler(
    socket_path="/tmp/my_module_ipc.sock",
    service_stub=MyServiceStub
)

# Establish connection
await ipc.connect()

# RPC call
response = await ipc.call_method("GetStatus", request)

# Close connection
await ipc.disconnect()

Note

Further details to IPCHandler implementation can be found in the IPCHandler API Reference.

Performance

Typical Latencies:

  • Unix Domain Socket: ~0.05 - 0.2 ms

  • TCP localhost: ~0.2 - 1 ms

  • ROS2 Service: ~1 - 5 ms

➡️ IPC is 10-100x faster than ROS2 for local communication.

Best Practices

Recommended:

  • Use IPC only within a module

  • Define clear Proto definitions

  • Implement error handling and timeouts

  • Clean up socket files on shutdown

  • Use connection pooling for frequent calls

Avoid:

  • IPC for inter-module communication (use ROS2)

  • Very large messages (> 10 MB, use shared memory)

  • Blocking calls without timeout

  • Hardcoded socket paths (use your configuration)

Error Handling

import grpc

try:
    channel = grpc.insecure_channel(f'unix://{socket_path}')
    stub = MyServiceStub(channel)

    # With timeout
    response = stub.CallMethod(
        request,
        timeout=5.0  # 5 seconds Timeout
    )
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.UNAVAILABLE:
        print("Server not reachable")
    elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
        print("Timeout")
    else:
        print(f"gRPC-Error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Cleanup

Socket files should be deleted on shutdown:

import os
import signal

def cleanup_socket(socket_path):
    if os.path.exists(socket_path):
        os.remove(socket_path)
        print(f"Socket {socket_path} removed")

# Clean up on shutdown
def shutdown_handler(signum, frame):
    cleanup_socket("/tmp/my_module_ipc.sock")
    server.stop(grace_period=5)
    sys.exit(0)

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

Further Information