Redis Transport Module

Provides Redis-based transport implementation with layered architecture.

Overview

The Redis transport module implements the AbstractProtocolProvider interface for Redis Pub/Sub and Request-Response patterns.

Architecture Layers:

  • communication/: Core Redis functionality (RedisClient, connection handling)

  • vyra_models/: VYRA abstractions (RedisCallable, RedisSpeaker)

  • provider.py: Protocol provider implementation

Features:

  • ✅ Request-Response pattern (RedisCallable)

  • ✅ Pub/Sub pattern (RedisSpeaker)

  • ✅ Server/Client pattern with is_callable flag

  • ✅ Connection pooling

  • ✅ TLS/SSL support

  • ✅ Automatic reconnection

  • ✅ JSON message serialization

Usage

Basic Provider Setup

from vyra_base.com.transport.t_redis import RedisProvider, REDIS_AVAILABLE
from vyra_base.com.core.types import ProtocolType

if REDIS_AVAILABLE:
    # Create provider
    provider = RedisProvider(
        module_name="my_module",
        module_id="abc123",
        protocol=ProtocolType.REDIS
    )

    # Initialize with config
    await provider.initialize(config={
        "host": "redis.example.com",
        "port": 6379,
        "ssl": True,
        "ssl_ca_certs": "/path/to/ca-cert.pem",
        "ssl_certfile": "/path/to/client-cert.pem",
        "ssl_keyfile": "/path/to/client-key.pem"
    })

Create Request-Response Server

# Server callback
async def handle_request(request_data):
    # Process request
    result = {"sum": request_data["a"] + request_data["b"]}
    return result

# Create server (is_callable=True)
server = await provider.create_callable(
    name="calculation_service",
    callback=handle_request
)

Create Request-Response Client

# Create client (is_callable=False)
client = await provider.create_callable(
    name="calculation_service",
    callback=None,  # No callback for client
    is_callable=False
)

# Make request
request = {"a": 5, "b": 3}
response = await client.call(request, timeout=5.0)
print(f"Result: {response['sum']}")

Create Publisher

# Create publisher (is_publisher=True)
publisher = await provider.create_speaker(
    name="status_channel",
    is_publisher=True
)

# Publish message
await publisher.shout({"status": "running", "progress": 75})

Create Subscriber

# Subscriber callback
def on_message(msg_data):
    print(f"Received: {msg_data}")

# Create subscriber (is_publisher=False)
subscriber = await provider.create_speaker(
    name="status_channel",
    is_publisher=False
)

# Start listening
await subscriber.listen(on_message)

Configuration

Connection Configuration

config = {
    "host": "redis.example.com",
    "port": 6379,
    "db": 0,
    "password": "secret",
    "ssl": True,
    "ssl_ca_certs": "/path/to/ca-cert.pem",
    "ssl_certfile": "/path/to/client-cert.pem",
    "ssl_keyfile": "/path/to/client-key.pem",
    "ssl_cert_reqs": "required",  # 'required', 'optional', 'none'
    "socket_timeout": 5.0,
    "socket_connect_timeout": 5.0,
    "retry_on_timeout": True,
    "max_connections": 50
}

await provider.initialize(config=config)
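The `ssl_cert_reqs` entry is given as a string, so somewhere it has to be turned into a certificate verification mode. The following is a minimal sketch of one way to do that with the standard library, assuming the three documented strings correspond to Python's `ssl.CERT_*` constants. `resolve_cert_reqs` is a hypothetical helper and not part of vyra_base.

```python
import ssl

# Assumed mapping from the documented config strings to stdlib verify modes.
_CERT_REQS = {
    "required": ssl.CERT_REQUIRED,
    "optional": ssl.CERT_OPTIONAL,
    "none": ssl.CERT_NONE,
}


def resolve_cert_reqs(value: str) -> ssl.VerifyMode:
    """Translate an 'ssl_cert_reqs' config string into an ssl.VerifyMode."""
    try:
        return _CERT_REQS[value.lower()]
    except KeyError:
        # Fail loudly on typos instead of silently disabling verification.
        raise ValueError(f"unsupported ssl_cert_reqs: {value!r}") from None
```

Rejecting unknown values keeps a misspelled setting from quietly falling back to a weaker mode.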

Connection Pooling

from redis.connection import ConnectionPool

pool = ConnectionPool(
    host="redis.example.com",
    port=6379,
    max_connections=100,
    decode_responses=True
)

# Use pool in provider (module_name and module_id are required arguments)
provider = RedisProvider(
    module_name="my_module",
    module_id="abc123",
    connection_pool=pool
)

Server/Client Flags

Interface Type             Flag                  Behavior
-------------------------  --------------------  ----------------------------------------
Request-Response Server    is_callable=True      Responds to requests (callback required)
Request-Response Client    is_callable=False     Makes requests (no callback)
Publisher                  is_publisher=True     Publishes messages
Subscriber                 is_publisher=False    Receives messages (callback required)

Advanced Usage

Custom Message Serialization

import msgpack

# Use MessagePack for binary serialization
class MsgPackProvider(RedisProvider):
    def serialize(self, data):
        return msgpack.packb(data)

    def deserialize(self, data):
        return msgpack.unpackb(data, raw=False)
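For comparison, the JSON serialization listed under Features can be pictured as a pair of functions with the same shape as the hooks above. This is only a sketch using the standard library. The function names mirror the `serialize`/`deserialize` methods shown here, but the compact separators and UTF-8 encoding are assumptions, not confirmed behavior of the provider.

```python
import json


def serialize(data) -> bytes:
    """Encode a JSON-compatible object as compact UTF-8 bytes."""
    return json.dumps(data, separators=(",", ":")).encode("utf-8")


def deserialize(raw: bytes):
    """Decode UTF-8 JSON bytes back into Python objects."""
    return json.loads(raw.decode("utf-8"))
```

JSON cannot represent raw `bytes` or `datetime` values directly, which is one common reason to switch to a binary format such as MessagePack.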

Pattern Subscriptions

# Subscribe to pattern (e.g., "sensor.*")
subscriber = await provider.create_speaker(
    name="sensor.*",  # Pattern with wildcard
    is_publisher=False
)

def on_sensor_data(msg_data, channel):
    print(f"From {channel}: {msg_data}")

await subscriber.listen(on_sensor_data)
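To get a feel for which channels a wildcard such as "sensor.*" would cover, the matching can be approximated locally. The sketch below uses `fnmatch.fnmatchcase` as a stand-in for Redis glob-style matching. The two agree for `*` and `?`, but differ in some details (for example character-class negation), so treat it as an illustration rather than an exact model. `matching_channels` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def matching_channels(pattern, channels):
    """Return the channels a glob-style pattern would select (approximation)."""
    return [name for name in channels if fnmatchcase(name, pattern)]


channels = ["sensor.temp", "sensor.imu.raw", "status", "sensor"]
selected = matching_channels("sensor.*", channels)
```

Here `selected` contains "sensor.temp" and "sensor.imu.raw". The bare "sensor" channel is excluded because the pattern requires the literal dot.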

API Reference

Redis Transport Provider

Implements AbstractProtocolProvider for Redis transport. Provides message queue and key-value storage via Redis.

class vyra_base.com.transport.t_redis.provider.RedisProvider(module_name, module_id, protocol=ProtocolType.REDIS, **redis_kwargs)

Bases: AbstractProtocolProvider

Protocol provider for Redis communication.

Features:

  • Pub/Sub messaging via Publisher interface

  • Key-Value storage via Callable interface (get/set)

  • TLS support with ACL authentication

  • Streaming support (Redis Streams)

Wraps existing vyra_base.com.transport.redis.communication.RedisClient for seamless integration with established infrastructure.

Example

>>> # Initialize provider (module_id is a required argument)
>>> provider = RedisProvider(
...     module_name="my_module",
...     module_id="abc123",
...     protocol=ProtocolType.REDIS
... )
>>>
>>> if await provider.check_availability():
...     await provider.initialize()
...
...     # Listen for messages (subscribe before publishing)
...     async def on_message(data):
...         print(f"Received: {data}")
...     subscriber = await provider.create_subscriber(
...         "sensor_updates",
...         subscriber_callback=on_message
...     )
...     await subscriber.subscribe()
...
...     # Create publisher (Pub/Sub)
...     publisher = await provider.create_publisher(
...         "sensor_updates",
...         module_name="robot"
...     )
...     await publisher.publish({"temperature": 23.5})

__init__(module_name, module_id, protocol=ProtocolType.REDIS, **redis_kwargs)

Initialize Redis provider.

Parameters:
  • protocol (ProtocolType) – Protocol type (must be REDIS)

  • module_name (str) – Module name for Redis namespace

  • module_id (str) – Module ID for Redis namespace

  • **redis_kwargs – Additional Redis connection options:

      - host: Redis host (default: from env REDIS_HOST)
      - port: Redis port (default: from env REDIS_PORT)
      - username: Redis ACL username
      - password: Redis ACL password
      - use_tls: Enable TLS (default: from env)
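The parameter notes say that host and port fall back to the REDIS_HOST and REDIS_PORT environment variables. A small sketch of that precedence is shown below. `resolve_connection` is a hypothetical helper, and the final fallbacks ("localhost" and 6379) are assumptions, since the source does not say what happens when neither a keyword argument nor an environment variable is present.

```python
import os


def resolve_connection(env=None, **redis_kwargs):
    """Pick host/port from kwargs first, then environment, then defaults."""
    env = os.environ if env is None else env
    # Explicit keyword arguments win over environment variables.
    host = redis_kwargs.get("host") or env.get("REDIS_HOST", "localhost")
    # Environment values arrive as strings, so normalize the port to int.
    port = int(redis_kwargs.get("port") or env.get("REDIS_PORT", 6379))
    return {"host": host, "port": port}
```

Passing `env` explicitly makes the resolution easy to exercise in tests without touching the real process environment.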

async check_availability()

Check if Redis is available.

Returns:

True if Redis client can be imported

Return type:

bool
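Since availability is described as "the Redis client can be imported", a check of that kind can be done without importing the package. The sketch below uses `importlib.util.find_spec` from the standard library. `module_importable` is a hypothetical helper, and the actual method may be implemented differently.

```python
import importlib.util


def module_importable(name: str) -> bool:
    """Return True if a top-level module with this name can be located."""
    return importlib.util.find_spec(name) is not None


# True only when the 'redis' package is installed in this environment.
redis_present = module_importable("redis")
```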

async initialize(config=None)

Initialize Redis client.

Parameters:

config (Optional[Dict[str, Any]]) – Optional configuration overrides

Returns:

True if initialization successful

Return type:

bool

Raises:

ProviderError – If initialization fails

async shutdown()

Shutdown Redis client.

Return type:

None

async create_publisher(name, topic_builder=None, message_type=None, **kwargs)

Create Redis Publisher (Pub/Sub).

Parameters:
  • name (str) – Publisher name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • message_type (type) – Message type class

  • **kwargs – Additional publisher options

Return type:

VyraPublisher

Returns:

RedisPublisherImpl instance

async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)

Create Redis Subscriber (Pub/Sub).

Parameters:
  • name (str) – Subscriber name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • subscriber_callback (Callable) – Async callback for received messages

  • message_type (type) – Message type class

  • **kwargs – Additional subscriber options

Return type:

VyraSubscriber

Returns:

RedisSubscriberImpl instance

async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)

Create Redis Server (request/response pattern).

Parameters:
  • name (str) – Server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • response_callback (Optional[Callable]) – Async callback for handling requests

  • service_type (Optional[type]) – Service type class

  • **kwargs – Additional server options

Return type:

VyraServer

Returns:

RedisServerImpl instance

async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)

Create Redis Client (request/response pattern).

Parameters:
  • name (str) – Client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • service_type (type) – Service type class

  • request_callback (Optional[Callable]) – Optional async callback for responses

  • **kwargs – Additional client options

Return type:

VyraClient

Returns:

RedisClientImpl instance

async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)

Create Redis Action Server (state tracking + pub/sub).

Parameters:
  • name (str) – Action server name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • handle_goal_request (Callable) – Async callback for goal requests

  • handle_cancel_request (Callable) – Async callback for cancel requests

  • execution_callback (Callable) – Async callback for goal execution

  • action_type (type) – Action type class

  • **kwargs – Additional action server options

Return type:

VyraActionServer

Returns:

RedisActionServerImpl instance

async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)

Create Redis Action Client.

Parameters:
  • name (str) – Action client name

  • topic_builder (Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)

  • action_type (type) – Action type class

  • direct_response_callback (Optional[Callable]) – Optional async callback for results

  • feedback_callback (Optional[Callable]) – Optional async callback for feedback

  • goal_callback (Optional[Callable]) – Optional async callback for goal responses

  • **kwargs – Additional action client options

Return type:

VyraActionClient

Returns:

RedisActionClientImpl instance

get_client()

Get underlying Redis client for advanced operations.

Returns:

Underlying vyra_base Redis client

Return type:

Any

Raises:

ProviderError – If provider not initialized

Redis VYRA Models

VYRA abstraction layer for Redis transport.

Legacy patterns (deprecated):

  • Callable: Request-response via key-value

  • Speaker: Pub/Sub via channels

  • Job: Long-running tasks via Pub/Sub + key-value

Unified transport layer:

  • Publisher/Subscriber (pub/sub)

  • Server/Client (request/response via key-value)

  • ActionServer/ActionClient (long-running tasks with feedback)

Example

>>> from vyra_base.com.transport.t_redis.vyra_models import (
...     RedisPublisherImpl, RedisSubscriberImpl, RedisServerImpl
... )
>>>
>>> # Publisher
>>> publisher = RedisPublisherImpl(
...     name="sensor_data",
...     topic_builder=builder,
...     message_type=SensorMsg,
...     redis_client=client
... )
>>> await publisher.initialize()
>>> await publisher.publish({"temperature": 23.5})
>>>
>>> # Server
>>> async def handle_request(req):
...     return {"result": req["value"] * 2}
>>>
>>> server = RedisServerImpl(
...     name="calculate",
...     topic_builder=builder,
...     response_callback=handle_request,
...     service_type=CalcService,
...     redis_client=client
... )
>>> await server.initialize()

class vyra_base.com.transport.t_redis.vyra_models.RedisPublisherImpl(name, topic_builder, redis_client, message_type, **kwargs)

Bases: VyraPublisher

Redis-based publisher implementation using Pub/Sub.

Pattern: Publishes messages to Redis channel at topic name.

__init__(name, topic_builder, redis_client, message_type, **kwargs)

async initialize()

Initialize Redis publisher (no setup needed for pub).

Return type:

bool

async publish(message)

Publish message to Redis channel.

Parameters:

message (Any) – Message instance or dict

Return type:

bool

Returns:

True on success

async cleanup()

Cleanup resources (none needed for publisher).

async shutdown()

Shutdown publisher.

Return type:

None

class vyra_base.com.transport.t_redis.vyra_models.RedisSubscriberImpl(name, topic_builder, subscriber_callback, redis_client, message_type, **kwargs)

Bases: VyraSubscriber

Redis-based subscriber implementation using Pub/Sub.

Pattern: Subscribes to Redis channel and listens for messages.

__init__(name, topic_builder, subscriber_callback, redis_client, message_type, **kwargs)

async initialize()

Initialize Redis subscriber.

Return type:

bool

async subscribe()

Start subscribing to Redis channel.

Return type:

bool

Returns:

True on success

async cleanup()

Cleanup Redis resources.

async shutdown()

Shutdown subscriber.

Return type:

None

class vyra_base.com.transport.t_redis.vyra_models.RedisServerImpl(name, topic_builder, response_callback, redis_client, service_type, **kwargs)

Bases: VyraServer

Redis-based server implementation using request/response pattern.

Pattern:

  • Listens on channel “srv:{service_name}:requests”

  • Receives requests with {request_id, data, response_channel}

  • Processes request via callback

  • Publishes response to response_channel with {request_id, data}
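The request handling described for the server can be sketched without a Redis connection. The example below assumes the envelope is a JSON object carrying the three named fields (request_id, data, response_channel). `handle_envelope` and `double` are hypothetical names, and actual publishing is left to the caller.

```python
import asyncio
import json


async def handle_envelope(raw: str, callback):
    """Decode one request envelope, run the callback, and build the reply."""
    envelope = json.loads(raw)
    result = await callback(envelope["data"])
    # Echo the request_id so the client can match the reply to its request.
    reply = {"request_id": envelope["request_id"], "data": result}
    # The caller would publish the reply on the returned channel.
    return envelope["response_channel"], json.dumps(reply)


async def double(req):
    """Example callback: doubles the 'value' field of the request."""
    return {"result": req["value"] * 2}
```

Returning the channel and payload instead of publishing inside the function keeps the envelope logic testable on its own.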

__init__(name, topic_builder, response_callback, redis_client, service_type, **kwargs)

async initialize()

Initialize Redis server.

Return type:

bool

async serve()

Start serving requests.

Return type:

bool

Returns:

True on success

async cleanup()

Cleanup Redis resources.

async shutdown()

Shutdown interface.

Return type:

None

class vyra_base.com.transport.t_redis.vyra_models.RedisClientImpl(name, topic_builder, redis_client, request_callback=None, service_type=None, **kwargs)

Bases: VyraClient

Redis-based client implementation using request/response pattern.

Pattern:

  • Publishes request to “srv:{service_name}:requests” with unique response_channel

  • Listens on own response_channel for response

  • Matches responses by request_id
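Matching responses by request_id is commonly done with a table of pending futures. The sketch below illustrates that idea with asyncio only. `PendingRequests` is a hypothetical class, and the use of a UUID as request_id is an assumption rather than documented behavior.

```python
import asyncio
import uuid


class PendingRequests:
    """Tracks in-flight requests so replies can be matched by request_id."""

    def __init__(self):
        self._futures = {}

    def new_request(self, data, response_channel):
        """Build a request envelope and a future that will receive its reply."""
        request_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._futures[request_id] = future
        envelope = {
            "request_id": request_id,
            "data": data,
            "response_channel": response_channel,
        }
        return envelope, future

    def resolve(self, reply):
        """Deliver a reply to its waiting future; ignore unknown request ids."""
        future = self._futures.pop(reply["request_id"], None)
        if future is None or future.done():
            return False
        future.set_result(reply["data"])
        return True
```

Replies with an unknown request_id are dropped, which covers late answers to requests that already timed out.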

__init__(name, topic_builder, redis_client, request_callback=None, service_type=None, **kwargs)

async initialize()

Initialize Redis client.

Return type:

bool

async call(request, timeout=5.0)

Send request and wait for response.

Parameters:
  • request (Any) – Request message instance or dict

  • timeout (float) – Response timeout in seconds

Return type:

Optional[Any]

Returns:

Response object on success, None on failure
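Because `call` is documented to return None on failure rather than raise, callers need to check the result before using it. The timeout half of that contract can be sketched with `asyncio.wait_for`. `wait_for_reply` is a hypothetical helper, and the real method may handle other failure modes as well.

```python
import asyncio


async def wait_for_reply(future, timeout=5.0):
    """Await a reply future, returning None if it does not arrive in time."""
    try:
        return await asyncio.wait_for(future, timeout)
    except asyncio.TimeoutError:
        # Mirror the documented contract: None signals failure.
        return None


async def demo():
    loop = asyncio.get_running_loop()
    never = loop.create_future()      # no reply will ever arrive
    done = loop.create_future()
    done.set_result({"sum": 8})       # reply already available
    return (
        await wait_for_reply(never, timeout=0.01),
        await wait_for_reply(done, timeout=0.01),
    )
```

A caller would typically write `if response is None:` and handle the failure before indexing into the response.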

async cleanup()

Cleanup Redis resources.

async shutdown()

Shutdown interface.

Return type:

None

class vyra_base.com.transport.t_redis.vyra_models.RedisActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, redis_client=None, action_type=None, **kwargs)

Bases: VyraActionServer

Redis-based action server with state tracking.

Architecture:

Keys:

  • {fn}/{goal_id}/state → JSON: {status, timestamp}

  • {fn}/{goal_id}/feedback → JSON: Latest feedback

  • {fn}/{goal_id}/result → JSON: Final result

Channels:

  • {fn}/goal → Goal requests

  • {fn}/cancel → Cancel requests

  • {fn}/{goal_id}/updates → Feedback/result notifications

All keys are built via _action_channel() so that namespace and subsection from the config are respected automatically.
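The key and channel layout can be made concrete with a small naming helper. This is only an illustration of the `{fn}/...` scheme listed above. `action_name` is hypothetical and does not reproduce `_action_channel()`, which additionally applies namespace and subsection from the config. The prefix "robot/move" is a made-up example value for `{fn}`.

```python
from typing import Optional


def action_name(fn: str, suffix: str, goal_id: Optional[str] = None) -> str:
    """Join the parts with '/', skipping the goal id when it is not given."""
    parts = [fn, suffix] if goal_id is None else [fn, goal_id, suffix]
    return "/".join(parts)


goal_channel = action_name("robot/move", "goal")
state_key = action_name("robot/move", "state", goal_id="g1")
updates_channel = action_name("robot/move", "updates", goal_id="g1")
```

With these inputs, `goal_channel` is "robot/move/goal" and `state_key` is "robot/move/g1/state".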

__init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, redis_client=None, action_type=None, **kwargs)

async initialize()

Initialize Redis action server.

Return type:

bool

async publish_feedback(goal_id, feedback)

Publish feedback for active goal.

Parameters:
  • goal_id (str)

  • feedback (Any)

async cleanup()

Cleanup Redis resources.

async shutdown()

Shutdown interface.

Return type:

None

class vyra_base.com.transport.t_redis.vyra_models.RedisActionClientImpl(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, redis_client=None, action_type=None, **kwargs)

Bases: VyraActionClient

Redis-based action client with state tracking.

Architecture:

  • Publishes goals to {fn}/goal channel

  • Subscribes to {fn}/{goal_id}/updates for feedback/result

  • Reads {fn}/{goal_id}/feedback and {fn}/{goal_id}/result keys

All keys are built via _action_channel() following VYRA naming conventions.

__init__(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, redis_client=None, action_type=None, **kwargs)

async initialize()

Initialize Redis action client.

Return type:

bool

async send_goal(goal, timeout=5.0)

Send goal to action server.

Parameters:
  • goal (Any) – Goal message instance or dict

  • timeout (float) – Goal acceptance timeout

Return type:

Optional[str]

Returns:

goal_id on success, None on failure

async cancel_goal(goal_id, timeout=5.0)

Cancel active goal.

Parameters:
  • goal_id (str) – ID of goal to cancel

  • timeout (float) – Cancel timeout

Return type:

bool

Returns:

True on successful cancellation

async cleanup()

Cleanup Redis resources.

async shutdown()

Shutdown interface.

Return type:

None

Dependencies

  • redis (Redis Python client)

  • hiredis (Optional, for faster parsing)

See Also