# Source code for vyra_base.core.volatile

from __future__ import annotations

import asyncio
from typing import Any, Type, Optional

try:
    from vyra_base.com.transport.t_ros2.node import VyraNode  # noqa: F401
except ImportError:
    VyraNode = None  # type: ignore[assignment,misc]
from vyra_base.com.core.types import VyraPublisher
from vyra_base.helper.error_handler import ErrorTraceback
from vyra_base.com.transport.t_redis import RedisClient, REDIS_TYPE
from vyra_base.com import InterfaceFactory, ProtocolType, remote_service

import logging
logger = logging.getLogger(__name__)
class Volatile:
    """
    Manage the volatile (non-persisted) parameters of a module.

    Volatile parameters are temporary values that are not persisted in the
    database. They are stored in Redis under a module-specific key prefix
    (``<module_name>_<module_id>/volatile/``) and can be read, written,
    deleted and published on change.

    API:

    - Volatiles can only be written by the module itself.
    - Volatiles can be subscribed to by other modules to get notified on
      changes.
    - Volatiles are not persisted and will be lost on system restart.
    - Volatiles are not shared between modules; each module has its own set
      of volatiles.
    - Volatiles are identified by their keys, which are strings.
    - Volatiles can be of different types, such as string, hash, list, set.

    Example usage:

    - current robot state
    - io states
    - status information
    - temporary data storage
    - any custom data defined by the module
    """

    EVENT_TOPIC_PREFIX = "volatile/"

    def __init__(
        self,
        storage_access_transient: RedisClient,
        module_name: str,
        module_id: str,
        node: Optional[Any],
        transient_base_types: dict[str, Any],
    ):
        """
        Initialize the Volatile class.

        :param storage_access_transient: Redis client used as storage backend.
        :param module_name: Name of the owning module (part of the key prefix).
        :param module_id: Id of the owning module (part of the key prefix).
        :param node: Communication node handed to the publisher factory; may
            be ``None``, in which case ``publish_volatile_to_ros2`` raises.
        :param transient_base_types: Mapping that is looked up for the keys
            ``'VolatileString'``, ``'VolatileHash'``, ``'VolatileList'`` and
            ``'VolatileSet'`` to obtain the message type per Redis type.
        """
        self.module_name: str = module_name
        self.module_id: str = module_id
        self._volatile_prefix: str = f"{self.module_name}_{self.module_id}/volatile/"
        self.communication_node: Optional[Any] = node
        # Missing entries map to None; publish_volatile_to_ros2 rejects those.
        self.REDIS_TYPE_MAP: dict[REDIS_TYPE, Any] = {
            REDIS_TYPE.STRING: transient_base_types.get('VolatileString'),
            REDIS_TYPE.HASH: transient_base_types.get('VolatileHash'),
            REDIS_TYPE.LIST: transient_base_types.get('VolatileList'),
            REDIS_TYPE.SET: transient_base_types.get('VolatileSet'),
        }
        self.redis: RedisClient = storage_access_transient
        # Publishers keyed by the *qualified* Redis key of the volatile.
        self._active_shouter: dict[str, VyraPublisher] = {}
        self._listener: asyncio.Task | None = None

    def _qualified_key(self, key: str) -> str:
        """Return ``key`` with the module prefix prepended (idempotent)."""
        if key.startswith(self._volatile_prefix):
            return key
        return f"{self._volatile_prefix}{key}"

    def _external_key(self, key: str) -> str:
        """Return ``key`` with the module prefix stripped, if present."""
        if key.startswith(self._volatile_prefix):
            return key[len(self._volatile_prefix):]
        return key

    @staticmethod
    def _to_json(value: Any) -> str:
        """
        Serialize ``value`` to JSON, additionally accepting sets and bytes.

        Sets are emitted as lists (ordered by ``str`` of the elements so the
        output is deterministic) and bytes are decoded as UTF-8 with
        replacement characters. Any other unsupported type still raises
        ``TypeError`` exactly like plain ``json.dumps``.
        """
        import json as _json

        def _fallback(obj: Any) -> Any:
            if isinstance(obj, (set, frozenset)):
                return sorted(obj, key=str)
            if isinstance(obj, (bytes, bytearray)):
                return bytes(obj).decode("utf-8", errors="replace")
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable")

        return _json.dumps(value, default=_fallback)

    def __del__(self):
        """
        Clean up the Volatile instance.

        Cancels the listener task (if any) and drops all publisher
        references.

        Note: Cannot await in a destructor, so publishers are NOT shut down
        here. Use the explicit ``cleanup()`` coroutine for proper cleanup.
        """
        # getattr: __init__ may have failed before these attributes existed.
        listener = getattr(self, "_listener", None)
        if listener is not None:
            listener.cancel()
            self._listener = None
        shouters = getattr(self, "_active_shouter", None)
        if shouters is not None:
            shouters.clear()

    async def cleanup(self):
        """
        Async cleanup: cancel the listener and shut down all publishers.

        Every registered publisher gets a shutdown attempt; a failure is
        logged and does not prevent the remaining publishers from being shut
        down. The registry is empty afterwards in any case.
        """
        if self._listener:
            self._listener.cancel()
            self._listener = None

        # Snapshot and clear first: the registry may be mutated by other
        # coroutines while we are suspended in ``await`` below.
        publishers = list(self._active_shouter.items())
        self._active_shouter.clear()
        for key, publisher in publishers:
            try:
                await publisher.shutdown()
            except Exception:
                logger.exception(
                    "Failed to shut down publisher for volatile '%s'.", key)

    @ErrorTraceback.w_check_error_exist
    async def activate_listener(self, channel: str | list[str]):
        """
        Activate the Redis pub/sub listener for the given volatile(s).

        Only channels that have a registered publisher (see
        ``publish_volatile_to_ros2``) and are not yet reported as active by
        the Redis client are passed to ``create_pubsub_listener``. If nothing
        is left after filtering, a warning is logged and nothing is created.

        :param channel: Volatile key(s) to listen to. Both plain keys
            (``"temperature"``) and already qualified keys are accepted.
        :type channel: str | list[str]
        """
        requested = channel if isinstance(channel, list) else [channel]
        # Publishers are registered under the qualified key, so plain names
        # must be qualified before comparing. dict.fromkeys de-duplicates
        # while keeping the caller's order.
        qualified = list(dict.fromkeys(self._qualified_key(name) for name in requested))

        # Channels that are already being listened to.
        active_listener = (await self.redis.get_active_listeners())['active_channels']

        new_listener: list[str] = [
            name for name in qualified
            if name in self._active_shouter and name not in active_listener
        ]

        if not new_listener:
            logger.warning(
                "No new volatile channels to listen to. "
                "Active listeners: %s, Requested: %s",
                active_listener, qualified)
            return

        await self.redis.create_pubsub_listener(
            channels=new_listener,
            callback_handler=self.on_volatile_change_received
        )

    async def deactivate_listener(self, channel: str | list[str]):
        """
        Deactivate the Redis pub/sub listener for the given volatile(s).

        Only the given channels are removed; other listeners stay active.

        :param channel: Volatile key(s) to stop listening to. Both plain and
            already qualified keys are accepted.
        :type channel: str | list[str]
        """
        requested = channel if isinstance(channel, list) else [channel]
        # Listener channels are the qualified keys (see activate_listener).
        qualified = [self._qualified_key(name) for name in requested]
        logger.info("Deactivating listeners for channels: %s", qualified)
        await self.redis.remove_listener_channels(channels=qualified)

    @ErrorTraceback.w_check_error_exist
    async def on_volatile_change_received(self, message: dict, callback_context):
        """
        Callback for Redis pub/sub messages; forwards changes to a publisher.

        A message is forwarded only if its ``type`` is ``"message"`` and its
        ``channel`` has a registered publisher; in that case
        ``message["data"]`` is published. Everything else (e.g.
        subscribe/unsubscribe events) is ignored.

        :param message: Redis pub/sub message containing type, channel, data.
        :type message: dict
        :param callback_context: Context passed by the Redis listener
            (not used here).
        :type callback_context: Any

        **Expected message format:**

        .. code-block:: python

            {
                "type": "message",
                "channel": "<qualified volatile key>",
                "data": <value>
            }
        """
        message_type = message.get("type", None)
        channel = message.get("channel", None)

        # Only process actual messages (not subscribe/unsubscribe events)
        if message_type == "message" and channel in self._active_shouter:
            await self._active_shouter[channel].publish(message["data"])

    @ErrorTraceback.w_check_error_exist
    async def read_all_volatile_names(self) -> list:
        """
        Retrieve the names of all volatile parameters of this module.

        All keys reported by the Redis client are filtered by this module's
        volatile prefix; the prefix is stripped from the returned names.

        :return: List of volatile parameter key names (without prefix).
        :rtype: list[str]

        **Example:**

        .. code-block:: python

            keys = await volatile.read_all_volatile_names()
            print(f"Available volatiles: {keys}")
            # Output: ['temperature', 'pressure', 'humidity']
        """
        keys = await self.redis.get_all_keys()
        return [
            self._external_key(k) for k in keys
            if str(k).startswith(self._volatile_prefix)
        ]

    @ErrorTraceback.w_check_error_exist
    async def set_volatile_value(self, key: Any, value: Any):
        """
        Set or update the value of a volatile parameter in Redis.

        The key is converted to ``str`` and qualified with the module prefix
        before the value is handed to the Redis client.

        :param key: The unique identifier for the volatile parameter.
        :type key: str
        :param value: The value to store.
        :type value: Any

        **Example:**

        .. code-block:: python

            # Store simple value
            await volatile.set_volatile_value("temperature", 23.5)

            # Store complex data
            await volatile.set_volatile_value("sensor_data", {
                "temp": 23.5,
                "humidity": 60.2
            })
        """
        await self.redis.set(self._qualified_key(str(key)), value)

    @ErrorTraceback.w_check_error_exist
    async def get_volatile_value(self, key: str) -> Any:
        """
        Retrieve the current value of a volatile parameter from Redis.

        :param key: The unique identifier of the volatile parameter.
        :type key: str
        :return: Whatever the Redis client returns for the qualified key
            (presumably ``None`` for a missing key -- confirm against
            ``RedisClient.get``).
        :rtype: Any

        **Example:**

        .. code-block:: python

            value = await volatile.get_volatile_value("temperature")
            if value is not None:
                print(f"Current temperature: {value}°C")
        """
        return await self.redis.get(self._qualified_key(key))

    @ErrorTraceback.w_check_error_exist
    async def publish_volatile_to_ros2(self, volatile_key: str, ros2_topic_name: str | None = None):
        """
        Create a ROS2 publisher for a volatile parameter and subscribe to it.

        **Setup workflow:**

        1. Verify that the volatile key exists in Redis
        2. Determine the Redis data type (string, hash, list, set)
        3. Create a publisher with the message type mapped to that data type
        4. Subscribe to the key via the Redis client

        The publisher is registered under the qualified key so that
        ``on_volatile_change_received`` can forward incoming changes.

        :param volatile_key: The name of the volatile parameter to monitor.
        :type volatile_key: str
        :param ros2_topic_name: Optional custom name for the topic.
            If None, ``volatile_key`` is used as the publisher name.
        :type ros2_topic_name: str | None
        :raises KeyError: If the volatile key does not exist in Redis or its
            type cannot be determined.
        :raises ValueError: If the Redis data type is not supported or has no
            message type mapped.
        :raises RuntimeError: If no communication node was provided.

        **Example usage:**

        .. code-block:: python

            # Step 1: Create volatile value
            await volatile.set_volatile_value("temperature", 23.5)

            # Step 2: Create ROS2 publisher with default topic name
            await volatile.publish_volatile_to_ros2("temperature")

            # Alternative: Use custom topic name
            await volatile.publish_volatile_to_ros2("temperature", "sensor_temp")

            # Step 3: Activate listener to receive notifications
            await volatile.activate_listener("temperature")

            # Step 4: Any change now publishes to the ROS2 topic
            await volatile.set_volatile_value("temperature", 24.1)

        NOTE(review): the final ROS2 topic path is decided by
        ``InterfaceFactory.create_publisher``; only the ``name`` argument is
        controlled here.
        """
        # Step 1: Verify volatile key exists
        qualified_key = self._qualified_key(volatile_key)
        if not await self.redis.exists(qualified_key):
            raise KeyError(
                f"Volatile key '{volatile_key}' does not exist in Redis. "
                f"Create it first with set_volatile_value().")

        # Step 2: Get Redis data type
        redis_type: REDIS_TYPE | None = await self.redis.get_type(qualified_key)

        if redis_type is None:
            raise KeyError(f"Could not determine type for key '{volatile_key}'.")

        if redis_type not in self.REDIS_TYPE_MAP:
            raise ValueError(
                f"Unsupported Redis type: {redis_type}. "
                f"Supported types: {list(self.REDIS_TYPE_MAP.keys())}")

        if self.communication_node is None:
            raise RuntimeError(
                "ROS2 node not available. publish_volatile_to_ros2 requires an active ROS2 node. "
                "Pass a VyraNode instance when constructing Volatile in FULL mode."
            )

        mapped_type = self.REDIS_TYPE_MAP[redis_type]
        if mapped_type is None:
            # The lookup keys used in __init__ are 'Volatile' + capitalized
            # enum member name (STRING -> 'VolatileString', ...).
            expected_key = f"Volatile{redis_type.name.capitalize()}"
            raise ValueError(
                f"No message type mapped for Redis type '{redis_type}'. "
                f"Ensure transient_base_types contains a '{expected_key}' entry."
            )

        # Use custom topic name or default to volatile_key
        topic_name = ros2_topic_name if ros2_topic_name is not None else volatile_key

        # Step 3: Create ROS2 publisher for this volatile type
        publisher: VyraPublisher = await InterfaceFactory.create_publisher(
            name=topic_name,
            protocols=[ProtocolType.ROS2],
            message_type=mapped_type,
            node=self.communication_node,
            is_publisher=True
        )
        # NOTE(review): a publisher already registered for this key is
        # replaced without being shut down -- confirm whether that is wanted.
        self._active_shouter[qualified_key] = publisher

        # Step 4: Subscribe to the key in Redis
        await self.redis.subscribe_to_key(qualified_key)

    @ErrorTraceback.w_check_error_exist
    async def unsubscribe_from_changes(self, volatile_key: str):
        """
        Unsubscribe from change notifications for a specific volatile.

        Unsubscribes from the key in Redis and, if a publisher is registered
        for it, shuts that publisher down and removes it from the registry.

        :param volatile_key: The name of the volatile to stop monitoring.
        :type volatile_key: str
        :raises KeyError: If the volatile key does not exist in Redis.

        **Example:**

        .. code-block:: python

            # Stop monitoring temperature changes
            await volatile.unsubscribe_from_changes("temperature")
        """
        qualified_key = self._qualified_key(volatile_key)

        if not await self.redis.exists(qualified_key):
            raise KeyError(
                f"Volatile key '{volatile_key}' does not exist in Redis.")

        await self.redis.unsubscribe_from_key(qualified_key)

        # Remove publisher from registry if it exists
        if qualified_key in self._active_shouter:
            await self._active_shouter[qualified_key].shutdown()
            del self._active_shouter[qualified_key]

    async def has_volatile(self, key: str) -> bool:
        """
        Check if a volatile parameter with the given key exists in Redis.

        :param key: The key of the volatile parameter to check.
        :type key: str
        :return: True if the volatile exists, False otherwise.
        :rtype: bool

        **Example:**

        .. code-block:: python

            if await volatile.has_volatile("temperature"):
                value = await volatile.get_volatile_value("temperature")
        """
        return await self.redis.exists(self._qualified_key(key))

    async def get_volatile_impl(self, key: str) -> dict[str, Any]:
        """
        Read a volatile and wrap the outcome in a result dict.

        String values are returned as-is; every other value is JSON-encoded
        (sets and bytes included).

        :param key: The volatile key to read.
        :type key: str
        :return: Dict with ``success``, ``message`` and ``value``; ``value``
            is ``""`` when the key is missing or cannot be serialized.
        :rtype: dict[str, Any]
        """
        if not await self.has_volatile(key):
            return {
                "success": False,
                "message": f"Volatile key '{key}' does not exist.",
                "value": "",
            }

        value = await self.get_volatile_value(key)
        try:
            encoded = value if isinstance(value, str) else self._to_json(value)
        except (TypeError, ValueError) as exc:
            # Report instead of raising so the remote caller gets a response.
            return {
                "success": False,
                "message": f"Volatile '{key}' could not be serialized: {exc}",
                "value": "",
            }
        return {
            "success": True,
            "message": f"Volatile '{key}' retrieved successfully.",
            "value": encoded,
        }

    async def set_volatile_impl(self, key: str, value: Any) -> dict[str, Any]:
        """
        Write a volatile and wrap the outcome in a result dict.

        :param key: The volatile key to write.
        :type key: str
        :param value: The value to store.
        :type value: Any
        :return: Dict with ``success`` and ``message``; any exception raised
            while writing is reported as ``success=False``.
        :rtype: dict[str, Any]
        """
        try:
            await self.set_volatile_value(key, value)
        except Exception as e:
            return {
                "success": False,
                "message": f"Failed to set volatile '{key}': {e}",
            }
        return {
            "success": True,
            "message": f"Volatile '{key}' set successfully.",
        }

    async def create_new_volatile_impl(self, key: str, value: Any) -> dict[str, Any]:
        """
        Create a volatile; fails if the key already exists.

        :param key: The volatile key to create.
        :type key: str
        :param value: The initial value.
        :type value: Any
        :return: Dict with ``success`` and ``message``.
        :rtype: dict[str, Any]
        """
        if await self.has_volatile(key):
            return {
                "success": False,
                "message": f"Volatile '{key}' already exists.",
            }
        return await self.set_volatile_impl(key, value)

    async def read_all_volatiles_impl(self) -> dict[str, Any]:
        """
        Collect all volatiles of this module as a JSON-encoded list.

        Each list entry holds ``key``, ``value``, ``type`` (the Redis type's
        value, or ``"unknown"``) and ``address`` (the qualified Redis key).

        :return: Dict with the single entry ``all_volatiles_json``.
        :rtype: dict[str, Any]
        """
        result = []
        for key in await self.read_all_volatile_names():
            address = self._qualified_key(key)
            value = await self.get_volatile_value(key)
            redis_type = await self.redis.get_type(address)
            result.append(
                {
                    "key": key,
                    "value": value,
                    "type": redis_type.value if redis_type is not None else "unknown",
                    "address": address,
                }
            )
        return {"all_volatiles_json": self._to_json(result)}

    @remote_service()
    async def get_volatile(self, request: Any, response: Any) -> None:
        """
        Get the current value of a volatile parameter (remote service interface).

        :param request: Request object with ``key`` attribute.
        :param response: Response object updated with ``success``, ``message``, ``value``.
        """
        result = await self.get_volatile_impl(request.key)
        response.success = result["success"]
        response.message = result["message"]
        response.value = result.get("value", "")
        return None

    @remote_service()
    async def set_volatile(self, request: Any, response: Any) -> None:
        """
        Set the value of a volatile parameter (remote service interface).

        :param request: Request object with ``key`` and ``value`` attributes.
        :param response: Response object updated with ``success`` and ``message``.
        """
        result = await self.set_volatile_impl(request.key, request.value)
        response.success = result["success"]
        response.message = result["message"]
        return None

    @remote_service()
    async def create_new_volatile(self, request: Any, response: Any) -> None:
        """
        Create a new volatile key (fails if key already exists).

        :param request: Request object with ``key`` and ``value`` attributes.
        :param response: Response object updated with ``success`` and ``message``.
        """
        result = await self.create_new_volatile_impl(request.key, request.value)
        response.success = result["success"]
        response.message = result["message"]
        return None

    @remote_service()
    async def delete_volatile(self, request: Any, response: Any) -> None:
        """
        Delete a volatile parameter from Redis by key.

        :param request: Request object with ``key`` attribute (required).
        :param response: Response object updated with ``success`` and ``message``.
        """
        result = await self.delete_volatile_impl(key=str(request.key))
        response.success = result["success"]
        response.message = result["message"]
        return None

    async def delete_volatile_impl(self, key: str) -> dict[str, Any]:
        """
        Delete a volatile parameter from Redis.

        If a publisher is registered for the volatile, it is unsubscribed
        first on a best-effort basis: a failure there is logged and the
        deletion proceeds regardless.

        :param key: The volatile key to delete.
        :type key: str
        :return: Result dict with ``success`` and ``message``.
        :rtype: dict[str, Any]
        """
        if not key:
            return {"success": False, "message": "Volatile key is required."}
        if not await self.has_volatile(key):
            return {"success": False, "message": f"Volatile '{key}' not found."}

        # Stop any active ROS2 publisher for this volatile
        qualified = self._qualified_key(key)
        if qualified in self._active_shouter:
            try:
                await self.unsubscribe_from_changes(key)
            except Exception as exc:
                logger.warning(
                    "Could not unsubscribe volatile '%s' before deletion: %s",
                    key, exc)

        await self.redis.delete(qualified)
        logger.info("Volatile '%s' deleted successfully.", key)
        return {"success": True, "message": f"Volatile '{key}' deleted successfully."}

    @remote_service()
    async def read_all_volatiles(self, request: Any, response: Any) -> None:
        """
        Read all volatile parameters with their current values (remote service interface).

        Returns a JSON-encoded list of objects with ``key``, ``value``,
        ``type`` and ``address`` fields. On any error the failure is logged
        and an empty JSON list (``"[]"``) is returned.

        :param request: Request object (unused).
        :param response: Response object updated with ``all_volatiles_json``.
        """
        try:
            result = await self.read_all_volatiles_impl()
            response.all_volatiles_json = result["all_volatiles_json"]
        except Exception as e:
            logger.error("Failed to read all volatiles: %s", e)
            response.all_volatiles_json = "[]"
        return None