Redis Transport Module¶
Provides Redis-based transport implementation with layered architecture.
Overview¶
The Redis transport module implements the AbstractProtocolProvider interface
for Redis Pub/Sub and Request-Response patterns.
Architecture Layers:
communication/: Core Redis functionality (RedisClient, connection handling)
vyra_models/: VYRA abstractions (RedisCallable, RedisSpeaker)
provider.py: Protocol provider implementation
Features:
✅ Request-Response pattern (RedisCallable)
✅ Pub/Sub pattern (RedisSpeaker)
✅ Server/Client pattern with
is_callableflag✅ Connection pooling
✅ TLS/SSL support
✅ Automatic reconnection
✅ JSON message serialization
Usage¶
Basic Provider Setup¶
from vyra_base.com.transport.t_redis import RedisProvider, REDIS_AVAILABLE
from vyra_base.com.core.types import ProtocolType
if REDIS_AVAILABLE:
# Create provider
provider = RedisProvider(
module_name="my_module",
module_id="abc123",
protocol=ProtocolType.REDIS
)
# Initialize with config
await provider.initialize(config={
"host": "redis.example.com",
"port": 6379,
"ssl": True,
"ssl_ca_certs": "/path/to/ca-cert.pem",
"ssl_certfile": "/path/to/client-cert.pem",
"ssl_keyfile": "/path/to/client-key.pem"
})
Create Request-Response Server¶
# Server callback
async def handle_request(request_data):
# Process request
result = {"sum": request_data["a"] + request_data["b"]}
return result
# Create server (is_callable=True)
server = await provider.create_callable(
name="calculation_service",
callback=handle_request
)
Create Request-Response Client¶
# Create client (is_callable=False)
client = await provider.create_callable(
name="calculation_service",
callback=None, # No callback for client
is_callable=False
)
# Make request
request = {"a": 5, "b": 3}
response = await client.call(request, timeout=5.0)
print(f"Result: {response['sum']}")
Create Publisher¶
# Create publisher (is_publisher=True)
publisher = await provider.create_speaker(
name="status_channel",
is_publisher=True
)
# Publish message
await publisher.shout({"status": "running", "progress": 75})
Create Subscriber¶
# Subscriber callback
def on_message(msg_data):
print(f"Received: {msg_data}")
# Create subscriber (is_publisher=False)
subscriber = await provider.create_speaker(
name="status_channel",
is_publisher=False
)
# Start listening
await subscriber.listen(on_message)
Configuration¶
Connection Configuration¶
config = {
"host": "redis.example.com",
"port": 6379,
"db": 0,
"password": "secret",
"ssl": True,
"ssl_ca_certs": "/path/to/ca-cert.pem",
"ssl_certfile": "/path/to/client-cert.pem",
"ssl_keyfile": "/path/to/client-key.pem",
"ssl_cert_reqs": "required", # 'required', 'optional', 'none'
"socket_timeout": 5.0,
"socket_connect_timeout": 5.0,
"retry_on_timeout": True,
"max_connections": 50
}
await provider.initialize(config=config)
Connection Pooling¶
from redis.connection import ConnectionPool
pool = ConnectionPool(
host="redis.example.com",
port=6379,
max_connections=100,
decode_responses=True
)
# Use pool in provider
provider = RedisProvider(connection_pool=pool)
Server/Client Flags¶
Interface Type |
Flag |
Behavior |
|---|---|---|
Request-Response Server |
|
Responds to requests (callback required) |
Request-Response Client |
|
Makes requests (no callback) |
Publisher |
|
Publishes messages |
Subscriber |
|
Receives messages (callback required) |
Advanced Usage¶
Custom Message Serialization¶
import json
import msgpack
# Use MessagePack for binary serialization
class MsgPackProvider(RedisProvider):
def serialize(self, data):
return msgpack.packb(data)
def deserialize(self, data):
return msgpack.unpackb(data, raw=False)
Pattern Subscriptions¶
# Subscribe to pattern (e.g., "sensor.*")
subscriber = await provider.create_speaker(
name="sensor.*", # Pattern with wildcard
is_publisher=False
)
def on_sensor_data(msg_data, channel):
print(f"From {channel}: {msg_data}")
await subscriber.listen(on_sensor_data)
API Reference¶
Redis Transport Provider
Implements AbstractProtocolProvider for Redis transport. Provides message queue and key-value storage via Redis.
- class vyra_base.com.transport.t_redis.provider.RedisProvider(module_name, module_id, protocol=ProtocolType.REDIS, **redis_kwargs)[source]¶
Bases:
AbstractProtocolProviderProtocol provider for Redis communication.
Features: - Pub/Sub messaging via Publisher interface - Key-Value storage via Callable interface (get/set) - TLS support with ACL authentication - Streaming support (Redis Streams)
Wraps existing vyra_base.com.transport.redis.communication.RedisClient for seamless integration with established infrastructure.
Example
>>> # Initialize provider >>> provider = RedisProvider( ... protocol=ProtocolType.REDIS, ... module_name="my_module" ... ) >>> >>> if await provider.check_availability(): ... await provider.initialize() ... ... # Create publisher (Pub/Sub) ... publisher = await provider.create_publisher( ... "sensor_updates", ... module_name="robot" ... ) ... await publisher.publish({"temperature": 23.5}) ... ... # Listen for messages ... async def on_message(data): ... print(f"Received: {data}") ... await publisher.listen(on_message)
- Parameters:
module_name (str)
module_id (str)
protocol (ProtocolType)
- __init__(module_name, module_id, protocol=ProtocolType.REDIS, **redis_kwargs)[source]¶
Initialize Redis provider.
- Parameters:
protocol (
ProtocolType) – Protocol type (must be REDIS)module_name (
str) – Module name for Redis namespacemodule_id (
str) – Module ID for Redis namespace - host: Redis host (default: from env REDIS_HOST) - port: Redis port (default: from env REDIS_PORT) - username: Redis ACL username - password: Redis ACL password - use_tls: Enable TLS (default: from env)
- async check_availability()[source]¶
Check if Redis is available.
- Returns:
True if Redis client can be imported
- Return type:
- async create_publisher(name, topic_builder=None, message_type=None, **kwargs)[source]¶
Create Redis Publisher (Pub/Sub).
- Parameters:
name (
str) – Publisher nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)message_type (
type) – Message type class**kwargs – Additional publisher options
- Return type:
- Returns:
RedisPublisherImpl instance
- async create_subscriber(name, topic_builder=None, subscriber_callback=None, message_type=None, **kwargs)[source]¶
Create Redis Subscriber (Pub/Sub).
- Parameters:
name (
str) – Subscriber nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)subscriber_callback (
Callable) – Async callback for received messagesmessage_type (
type) – Message type class**kwargs – Additional subscriber options
- Return type:
- Returns:
RedisSubscriberImpl instance
- async create_server(name, topic_builder=None, response_callback=None, service_type=None, **kwargs)[source]¶
Create Redis Server (request/response pattern).
- Parameters:
- Return type:
- Returns:
RedisServerImpl instance
- async create_client(name, topic_builder=None, service_type=None, request_callback=None, **kwargs)[source]¶
Create Redis Client (request/response pattern).
- Parameters:
- Return type:
- Returns:
RedisClientImpl instance
- async create_action_server(name, topic_builder=None, handle_goal_request=None, handle_cancel_request=None, execution_callback=None, action_type=None, **kwargs)[source]¶
Create Redis Action Server (state tracking + pub/sub).
- Parameters:
name (
str) – Action server nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)handle_goal_request (
Callable) – Async callback for goal requestshandle_cancel_request (
Callable) – Async callback for cancel requestsexecution_callback (
Callable) – Async callback for goal executionaction_type (
type) – Action type class**kwargs – Additional action server options
- Return type:
- Returns:
RedisActionServerImpl instance
- async create_action_client(name, topic_builder=None, action_type=None, direct_response_callback=None, feedback_callback=None, goal_callback=None, **kwargs)[source]¶
Create Redis Action Client.
- Parameters:
name (
str) – Action client nametopic_builder (
Optional[TopicBuilder]) – TopicBuilder instance (uses provider’s default if omitted)action_type (
type) – Action type classdirect_response_callback (
Optional[Callable]) – Optional async callback for resultsfeedback_callback (
Optional[Callable]) – Optional async callback for feedbackgoal_callback (
Optional[Callable]) – Optional async callback for goal responses**kwargs – Additional action client options
- Return type:
- Returns:
RedisActionClientImpl instance
- get_client()[source]¶
Get underlying Redis client for advanced operations.
- Returns:
Underlying vyra_base Redis client
- Return type:
- Raises:
ProviderError – If provider not initialized
Redis VYRA Models
VYRA abstraction layer for Redis transport.
Legacy patterns (deprecated): - Callable: Request-response via key-value - Speaker: Pub/Sub via channels - Job: Long-running tasks via Pub/Sub + key-value
Unified transport layer: - Publisher/Subscriber (pub/sub) - Server/Client (request/response via key-value) - ActionServer/ActionClient (long-running tasks with feedback)
Example
>>> from vyra_base.com.transport.t_redis.vyra_models import (
... RedisPublisherImpl, RedisSubscriberImpl, RedisServerImpl
... )
>>>
>>> # Publisher
>>> publisher = RedisPublisherImpl(
... name="sensor_data",
... topic_builder=builder,
... message_type=SensorMsg,
... redis_client=client
... )
>>> await publisher.initialize()
>>> await publisher.publish({"temperature": 23.5})
>>>
>>> # Server
>>> async def handle_request(req):
... return {"result": req["value"] * 2}
>>>
>>> server = RedisServerImpl(
... name="calculate",
... topic_builder=builder,
... response_callback=handle_request,
... service_type=CalcService,
... redis_client=client
... )
>>> await server.initialize()
- class vyra_base.com.transport.t_redis.vyra_models.RedisPublisherImpl(name, topic_builder, redis_client, message_type, **kwargs)[source]¶
Bases:
VyraPublisherRedis-based publisher implementation using Pub/Sub.
Pattern: Publishes messages to Redis channel at topic name.
- Parameters:
name (str)
topic_builder (TopicBuilder)
redis_client (RedisClient)
message_type (type)
- __init__(name, topic_builder, redis_client, message_type, **kwargs)[source]¶
- Parameters:
name (str)
topic_builder (TopicBuilder)
redis_client (RedisClient)
message_type (type)
- class vyra_base.com.transport.t_redis.vyra_models.RedisSubscriberImpl(name, topic_builder, subscriber_callback, redis_client, message_type, **kwargs)[source]¶
Bases:
VyraSubscriberRedis-based subscriber implementation using Pub/Sub.
Pattern: Subscribes to Redis channel and listens for messages.
- Parameters:
name (str)
topic_builder (TopicBuilder)
redis_client (RedisClient)
message_type (type)
- __init__(name, topic_builder, subscriber_callback, redis_client, message_type, **kwargs)[source]¶
- Parameters:
name (str)
topic_builder (TopicBuilder)
redis_client (RedisClient)
message_type (type)
- class vyra_base.com.transport.t_redis.vyra_models.RedisServerImpl(name, topic_builder, response_callback, redis_client, service_type, **kwargs)[source]¶
Bases:
VyraServerRedis-based server implementation using request/response pattern.
Pattern: - Listens on channel “srv:{service_name}:requests” - Receives requests with {request_id, data, response_channel} - Processes request via callback - Publishes response to response_channel with {request_id, data}
- Parameters:
name (str)
topic_builder (TopicBuilder)
redis_client (RedisClient)
service_type (type)
- __init__(name, topic_builder, response_callback, redis_client, service_type, **kwargs)[source]¶
- Parameters:
name (str)
topic_builder (TopicBuilder)
redis_client (RedisClient)
service_type (type)
- class vyra_base.com.transport.t_redis.vyra_models.RedisClientImpl(name, topic_builder, redis_client, request_callback=None, service_type=None, **kwargs)[source]¶
Bases:
VyraClientRedis-based client implementation using request/response pattern.
Pattern: - Publishes request to “srv:{service_name}:requests” with unique response_channel - Listens on own response_channel for response - Matches responses by request_id
- Parameters:
name (str)
topic_builder (TopicBuilder)
redis_client (RedisClient)
service_type (type)
- __init__(name, topic_builder, redis_client, request_callback=None, service_type=None, **kwargs)[source]¶
- Parameters:
name (str)
topic_builder (TopicBuilder)
redis_client (RedisClient)
service_type (type)
- class vyra_base.com.transport.t_redis.vyra_models.RedisActionServerImpl(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, redis_client=None, action_type=None, **kwargs)[source]¶
Bases:
VyraActionServerRedis-based action server with state tracking.
Architecture: Keys: -
{fn}/{goal_id}/state→ JSON: {status, timestamp} -{fn}/{goal_id}/feedback→ JSON: Latest feedback -{fn}/{goal_id}/result→ JSON: Final resultChannels: -
{fn}/goal→ Goal requests -{fn}/cancel→ Cancel requests -{fn}/{goal_id}/updates→ Feedback/result notificationsAll keys are built via
_action_channel()so thatnamespaceandsubsectionfrom the config are respected automatically.- Parameters:
- __init__(name, topic_builder, handle_goal_request, handle_cancel_request, execution_callback, redis_client=None, action_type=None, **kwargs)[source]¶
- class vyra_base.com.transport.t_redis.vyra_models.RedisActionClientImpl(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, redis_client=None, action_type=None, **kwargs)[source]¶
Bases:
VyraActionClientRedis-based action client with state tracking.
Architecture: - Publishes goals to
{fn}/goalchannel - Subscribes to{fn}/{goal_id}/updatesfor feedback/result - Reads{fn}/{goal_id}/feedbackand{fn}/{goal_id}/resultkeysAll keys are built via
_action_channel()following VYRA naming conventions.- Parameters:
- __init__(name, topic_builder, direct_response=None, feedback_callback=None, goal_response_callback=None, redis_client=None, action_type=None, **kwargs)[source]¶
Dependencies¶
redis(Redis Python client)hiredis(Optional, for faster parsing)
See Also¶
Interface Factory - InterfaceFactory for protocol-agnostic usage
ROS2 Transport Module - ROS2 transport provider
UDS Transport Module - Unix Domain Socket transport provider