Quellcode fΓΌr vyra_base.com.transport.t_redis.provider

"""
Redis Transport Provider

Implements AbstractProtocolProvider for Redis transport.
Provides message queue and key-value storage via Redis.
"""
import logging
from typing import Any, Callable, Optional, Dict, cast

from vyra_base.com.core.topic_builder import TopicBuilder
from vyra_base.com.core.types import (
    ProtocolType,
    VyraPublisher,
    VyraSubscriber,
    VyraServer,
    VyraClient,
    VyraActionServer,
    VyraActionClient,
)
from vyra_base.com.core.exceptions import (
    ProtocolUnavailableError,
    ProviderError,
)
from vyra_base.com.providers.protocol_provider import AbstractProtocolProvider
from vyra_base.com.core.callback_registry import CallbackRegistry
from vyra_base.com.transport.t_redis.communication.redis_client import RedisClient
from vyra_base.com.transport.t_redis.vyra_models import (
    RedisPublisherImpl,
    RedisSubscriberImpl,
    RedisServerImpl,
    RedisClientImpl,
    RedisActionServerImpl,
    RedisActionClientImpl,
)
from vyra_base.com.core.blueprints import (
    ActionBlueprint, 
    ServiceBlueprint,
    PublisherBlueprint,
    SubscriberBlueprint,
) 

logger = logging.getLogger(__name__)


[Doku] class RedisProvider(AbstractProtocolProvider): """ Protocol provider for Redis communication. Features: - Pub/Sub messaging via Publisher interface - Key-Value storage via Callable interface (get/set) - TLS support with ACL authentication - Streaming support (Redis Streams) Wraps existing vyra_base.com.transport.redis.communication.RedisClient for seamless integration with established infrastructure. Example: >>> # Initialize provider >>> provider = RedisProvider( ... protocol=ProtocolType.REDIS, ... module_name="my_module" ... ) >>> >>> if await provider.check_availability(): ... await provider.initialize() ... ... # Create publisher (Pub/Sub) ... publisher = await provider.create_publisher( ... "sensor_updates", ... module_name="robot" ... ) ... await publisher.publish({"temperature": 23.5}) ... ... # Listen for messages ... async def on_message(data): ... print(f"Received: {data}") ... await publisher.listen(on_message) """
[Doku] def __init__( self, module_name: str, module_id: str, protocol: ProtocolType = ProtocolType.REDIS, **redis_kwargs ): """ Initialize Redis provider. Args: protocol: Protocol type (must be REDIS) module_name: Module name for Redis namespace module_id: Module ID for Redis namespace - host: Redis host (default: from env REDIS_HOST) - port: Redis port (default: from env REDIS_PORT) - username: Redis ACL username - password: Redis ACL password - use_tls: Enable TLS (default: from env) """ super().__init__(protocol) self.module_name = module_name self._redis_kwargs = redis_kwargs self._client: Optional[RedisClient] = None self._topic_builder = TopicBuilder(module_name, module_id)
[Doku] async def check_availability(self) -> bool: """ Check if Redis is available. Returns: bool: True if Redis client can be imported """ logger.debug("βœ… Redis available") self._available = True return True
[Doku] async def initialize(self, config: Optional[Dict[str, Any]] = None) -> bool: """ Initialize Redis client. Args: config: Optional configuration overrides Returns: bool: True if initialization successful Raises: ProviderError: If initialization fails """ if not await self.check_availability(): raise ProtocolUnavailableError("Redis not available") try: # Merge config redis_config = {**self._redis_kwargs} if config: redis_config.update(config) # Create Redis client self._client = RedisClient( module_name=self.module_name, **redis_config ) # Test connection await self._client.connect() if not await self._client.health_check(): raise ProviderError("Redis health check failed") self._initialized = True logger.info(f"βœ… Redis provider initialized for module: {self.module_name}") return True except Exception as e: raise ProviderError(f"Redis initialization failed: {e}")
async def _ensure_initialized(self) -> None: """ Ensure the provider is initialized, attempting lazy initialization if not. Raises: ProviderError: If initialization cannot be completed. """ if self._initialized and self._client: return logger.info("πŸ”„ Redis provider not initialized β€” attempting lazy initialization") try: await self.initialize() except Exception as e: raise ProviderError(f"Redis lazy initialization failed: {e}") if not self._client: raise ProviderError("Redis initialization succeeded but client is None") def _require_client(self) -> RedisClient: """ Return the initialized Redis client, raising ProviderError if not available. Must only be called after _ensure_initialized(). """ if self._client is None: raise ProviderError("Redis client is None after initialization") return cast(RedisClient, self._client)
[Doku] async def shutdown(self) -> None: """Shutdown Redis client.""" if self._client: try: await self._client.close() logger.info("βœ… Redis provider shutdown complete") except Exception as e: logger.error(f"❌ Redis shutdown error: {e}") self._initialized = False self._client = None
# ============================================================================ # UNIFIED TRANSPORT LAYER METHODS # ============================================================================
[Doku] async def create_publisher( self, name: str, topic_builder: Optional[TopicBuilder] = None, message_type: type = None, **kwargs ) -> VyraPublisher: """ Create Redis Publisher (Pub/Sub). Args: name: Publisher name topic_builder: TopicBuilder instance (uses provider's default if omitted) message_type: Message type class **kwargs: Additional publisher options Returns: RedisPublisherImpl instance """ await self._ensure_initialized() effective_topic_builder = topic_builder or self._topic_builder if message_type is None: message_type = effective_topic_builder.load_interface_type( name, self.protocol, ) if message_type is None: raise ProviderError(f"Message type for publisher '{name}' not found in topic builder") try: publisher = RedisPublisherImpl( name=name, topic_builder=effective_topic_builder, redis_client=self._require_client(), message_type=message_type, **kwargs ) await publisher.initialize() logger.info(f"βœ… Redis Publisher created: {name}") return publisher except Exception as e: logger.error(f"❌ Failed to create Redis Publisher '{name}': {e}") raise ProviderError(f"Failed to create publisher: {e}")
[Doku] async def create_subscriber( self, name: str, topic_builder: Optional[TopicBuilder] = None, subscriber_callback: Callable = None, message_type: type = None, **kwargs ) -> VyraSubscriber: """ Create Redis Subscriber (Pub/Sub). Args: name: Subscriber name topic_builder: TopicBuilder instance (uses provider's default if omitted) subscriber_callback: Async callback for received messages message_type: Message type class **kwargs: Additional subscriber options Returns: RedisSubscriberImpl instance """ await self._ensure_initialized() effective_topic_builder = topic_builder or self._topic_builder if message_type is None: message_type = effective_topic_builder.load_interface_type( name, self.protocol, ) if message_type is None: raise ProviderError(f"Message type for subscriber '{name}' not found in topic builder") if subscriber_callback is None: _bp = CallbackRegistry.get_blueprint(name) if _bp and _bp.is_bound(): subscriber_callback = _bp.callback if subscriber_callback is None: raise ProviderError( f"No subscriber_callback provided for '{name}' and no bound blueprint found in CallbackRegistry" ) try: subscriber = RedisSubscriberImpl( name=name, topic_builder=effective_topic_builder, subscriber_callback=subscriber_callback, redis_client=self._require_client(), message_type=message_type, **kwargs ) await subscriber.initialize() await subscriber.subscribe() logger.info(f"βœ… Redis Subscriber created: {name}") return subscriber except Exception as e: logger.error(f"❌ Failed to create Redis Subscriber '{name}': {e}") raise ProviderError(f"Failed to create subscriber: {e}")
[Doku] async def create_server( self, name: str, topic_builder: Optional[TopicBuilder] = None, response_callback: Optional[Callable] = None, service_type: Optional[type] = None, **kwargs ) -> VyraServer: """ Create Redis Server (request/response pattern). Args: name: Server name topic_builder: TopicBuilder instance (uses provider's default if omitted) response_callback: Async callback for handling requests service_type: Service type class **kwargs: Additional server options Returns: RedisServerImpl instance """ await self._ensure_initialized() effective_topic_builder = topic_builder or self._topic_builder if service_type is None: service_type = effective_topic_builder.load_interface_type( name, self.protocol, ) if service_type is None: raise ProviderError(f"Service type for server '{name}' not found in topic builder") if response_callback is None: _bp = CallbackRegistry.get_blueprint(name) if _bp and _bp.is_bound(): response_callback = _bp.callback if response_callback is None: raise ProviderError( f"No response_callback provided for server '{name}' and no bound blueprint found in CallbackRegistry" ) try: server = RedisServerImpl( name=name, topic_builder=effective_topic_builder, response_callback=response_callback, redis_client=self._require_client(), service_type=service_type, **kwargs ) await server.initialize() await server.serve() logger.info(f"βœ… Redis Server created: {name}") return server except Exception as e: logger.error(f"❌ Failed to create Redis Server '{name}': {e}") raise ProviderError(f"Failed to create server: {e}")
[Doku] async def create_client( self, name: str, topic_builder: Optional[TopicBuilder] = None, service_type: type = None, request_callback: Optional[Callable] = None, **kwargs ) -> VyraClient: """ Create Redis Client (request/response pattern). Args: name: Client name topic_builder: TopicBuilder instance (uses provider's default if omitted) service_type: Service type class request_callback: Optional async callback for responses **kwargs: Additional client options Returns: RedisClientImpl instance """ await self._ensure_initialized() effective_topic_builder = topic_builder or self._topic_builder if service_type is None: service_type = effective_topic_builder.load_interface_type( name, self.protocol, ) # service_type is optional for Redis β€” dict-based communication works without it try: client = RedisClientImpl( name=name, topic_builder=effective_topic_builder, request_callback=request_callback, redis_client=self._require_client(), service_type=service_type, **kwargs ) await client.initialize() logger.info(f"βœ… Redis Client created: {name}") return client except Exception as e: logger.error(f"❌ Failed to create Redis Client '{name}': {e}") raise ProviderError(f"Failed to create client: {e}")
[Doku] async def create_action_server( self, name: str, topic_builder: Optional[TopicBuilder] = None, handle_goal_request: Callable = None, handle_cancel_request: Callable = None, execution_callback: Callable = None, action_type: type = None, **kwargs ) -> VyraActionServer: """ Create Redis Action Server (state tracking + pub/sub). Args: name: Action server name topic_builder: TopicBuilder instance (uses provider's default if omitted) handle_goal_request: Async callback for goal requests handle_cancel_request: Async callback for cancel requests execution_callback: Async callback for goal execution action_type: Action type class **kwargs: Additional action server options Returns: RedisActionServerImpl instance """ await self._ensure_initialized() effective_topic_builder = topic_builder or self._topic_builder if action_type is None: action_type = effective_topic_builder.load_interface_type( name, self.protocol, ) if action_type is None: raise ProviderError(f"Action type for action server '{name}' not found in topic builder") if execution_callback is None: _bp = CallbackRegistry.get_blueprint(name) if _bp and _bp.is_bound(): execution_callback = _bp.callback else: raise ProviderError( f"No execution_callback provided for action server '{name}' and no bound blueprint found in CallbackRegistry" ) if not isinstance(_bp, ActionBlueprint): raise ProviderError( f"Blueprint for action server '{name}' must be an ActionBlueprint. Found: {type(_bp).__name__}" ) if handle_goal_request is None: _bp_goal = cast(Optional[ActionBlueprint], CallbackRegistry.get_blueprint(name)) if _bp_goal and _bp_goal.is_bound('on_goal'): handle_goal_request = _bp_goal.get_callback('on_goal') if handle_cancel_request is None: _bp_cancel = cast(Optional[ActionBlueprint], CallbackRegistry.get_blueprint(name)) if _bp_cancel and _bp_cancel.is_bound('on_cancel'): handle_cancel_request = _bp_cancel.get_callback('on_cancel') if execution_callback is None or handle_goal_request is None or handle_cancel_request is None: raise ProviderError( f"Missing callbacks for action server '{name}'. " f"Execution callback: {execution_callback is not None}, " f"Goal request callback: {handle_goal_request is not None}, " f"Cancel request callback: {handle_cancel_request is not None}. " f"Ensure they are provided or bound in CallbackRegistry." ) try: action_server = RedisActionServerImpl( name=name, topic_builder=effective_topic_builder, handle_goal_request=handle_goal_request, handle_cancel_request=handle_cancel_request, execution_callback=execution_callback, redis_client=self._require_client(), action_type=action_type, **kwargs ) await action_server.initialize() logger.info(f"βœ… Redis Action Server created: {name}") return action_server except Exception as e: logger.error(f"❌ Failed to create Redis Action Server '{name}': {e}") raise ProviderError(f"Failed to create action server: {e}")
[Doku] async def create_action_client( self, name: str, topic_builder: Optional[TopicBuilder] = None, action_type: type = None, direct_response_callback: Optional[Callable] = None, feedback_callback: Optional[Callable] = None, goal_callback: Optional[Callable] = None, **kwargs ) -> VyraActionClient: """ Create Redis Action Client. Args: name: Action client name topic_builder: TopicBuilder instance (uses provider's default if omitted) action_type: Action type class direct_response_callback: Optional async callback for results feedback_callback: Optional async callback for feedback goal_callback: Optional async callback for goal responses **kwargs: Additional action client options Returns: RedisActionClientImpl instance """ await self._ensure_initialized() effective_topic_builder = topic_builder or self._topic_builder if action_type is None: action_type = effective_topic_builder.load_interface_type( name, self.protocol, ) # action_type is optional for Redis β€” dict-based communication works without it try: action_client = RedisActionClientImpl( name=name, topic_builder=effective_topic_builder, direct_response=direct_response_callback, feedback_callback=feedback_callback, goal_response_callback=goal_callback, redis_client=self._require_client(), action_type=action_type, **kwargs ) await action_client.initialize() logger.info(f"βœ… Redis Action Client created: {name}") return action_client except Exception as e: logger.error(f"❌ Failed to create Redis Action Client '{name}': {e}") raise ProviderError(f"Failed to create action client: {e}")
[Doku] def get_client(self) -> Any: """ Get underlying Redis client for advanced operations. Returns: RedisClient: Underlying vyra_base Redis client Raises: ProviderError: If provider not initialized """ if not self._initialized or not self._client: raise ProviderError("Provider not initialized β€” call initialize() first or use create_client() for lazy init") return self._client