Volatile - Volatile real-time data

The Volatile-class manages volatile data via a Redis backend., which must be read and written quickly - ideal for real-time applications.

Concept

Volatiles are temporary, fast data stores that:

  • Are stored in Redis (In-Memory Database)

  • Provide very fast access (~0.1ms)

  • Are lost on restart (volatile)

  • Support change events

  • Use Pub/Sub mechanisms

When to use Volatiles?

Ideal for:

  • Real-time sensor data

  • Status variables

  • Temporary calculation results

  • Event-based communication between modules

  • Caching of data

Not suitable for:

Access via Entity

from vyra_base.core.entity import VyraEntity

entity = VyraEntity(...)

# Volatile-Access (all methods are internal - no ROS2 services)
volatile = entity.volatile

Bemerkung

Important: Unlike Parameters, Volatile methods do not have a separate ROS2 service interface. All volatile operations (set_volatile_value, get_volatile_value, etc.) are internal methods designed for use within your module’s Python code.

However, you can publish volatile changes to ROS2 topics using subscribe_to_changes() - see Change-Events & ROS2 Topic Mapping for details.

Set value

All volatile operations are internal methods - they are called directly from your Python code without going through ROS2 services:

# Set value (internal method)
await entity.volatile.set_volatile_value("temperature", 23.5)

# Complex data (automatically serialized)
await entity.volatile.set_volatile_value("sensor_data", {
    "temperature": 23.5,
    "humidity": 60.2,
    "timestamp": "2026-01-19T10:30:00"
})

# Lists/Arrays
await entity.volatile.set_volatile_value("measurements", [1, 2, 3, 4, 5])

All read operations are internal methods:

# Read single value (internal method)
temperature = await entity.volatile.get_volatile_value("temperature")
print(f"Current temperature: {temperature}°C")

# Complex data
sensor_data = await entity.volatile.get_volatile_value("sensor_data")
print(f"Temperature: {sensor_data['temperature']}")

# Read all existing keys (internal method)ty.volatile.get_volatile_value("temperature")
print(f"Current temperature: {temperature}°C")

# Complex data
sensor_data = await entity.volatile.get_volatile_value("sensor_data")
print(f"Temperature: {sensor_data['temperature']}")

# Read all existing keys
all_keys = await entity.volatile.read_all_volatile_names()
for key in all_keys:
    value = await entity.volatile.get_volatile_value(key)

Change-Events & ROS2 Topic Mapping

While volatile methods themselves are internal, you can publish volatile changes to ROS2 topics for inter-module communication. This allows other modules to .. important:

**Internal vs. External:**
 (Internal)

First, create a volatile parameter with an initial value using the internal method:

# Create volatile with initial value (internal method)

Monitor changes to volatiles in real-time and publish them automatically to ROS2 topics.

Complete workflow to map a volatile to a ROS2 topic:

Step 1: Create the Volatile ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ (Setup ROS2 Publishing) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Subscribe to change notifications for the volatile. This creates a ROS2 speaker and sets up Redis key-space notifications:

# Subscribe to changes -> creates ROS2 topic (internal method

Step 2: Subscribe to Changes

Subscribe to change notifications for the volatile. This creates a ROS2 speaker and sets up Redis key-space notifications:

(Internal)


Activate the Redis pub/sub listener to receive change notifications:

# Activate the listener to receive notifications (internal method)

Step 3: Activate the Listener

Activate the Redis pub/sub listener to receive change notifications:

# Activate the listener to receive notifications
await entity.volatile.activate_listener("temperature")

Step 4: Changes Automatically Published (Internal → External)

Now, whenever the volatile value changes internally, it’s automatically published to the ROS2 topic for external subscribers:

# Any update now triggers ROS2 topic publication (internal method)
await entity.volatile.set_volatile_value("temperature", 24.1)
# -> Automatically published to /module_name/volatile/temperature!

# You update internally, other modules receive via ROS2 topic subscription

Complete Example

async def setup_volatile_ros2_mapping(entity: VyraEntity):
    \"\"\"Complete example: Map volatile to ROS2 topic\"\"\"

    # Step 1: Create volatile
    await entity.volatile.set_volatile_value("sensor_data", {
        "temperature": 23.5,
        "humidity": 60.2,
        "timestamp": "2026-01-21T10:30:00"
    })

    # Step 2: Subscribe to changes
    await entity.volatile.subscribe_to_changes("sensor_data")

    # Step 3: Activate listener
    await entity.volatile.activate_listener("sensor_data")

    # Step 4: Updates are now published to ROS2
    while True:
        # Read sensor (simulated)
        new_data = read_sensor()

        # Update volatile -> triggers ROS2 publication
        await entity.volatile.set_volatile_value("sensor_data", new_data)

        await asyncio.sleep(1)  # Update every second

Unsubscribe from Changes

To stop monitoring and remove the ROS2 topic:

# Stop publishing changes to ROS2
await entity.volatile.unsubscribe_from_changes("temperature")

Channel-Listener

For advanced Pub/Sub communication:

# Listener for a channel activate
await entity.volatile.activate_listener("sensor_channel")

# Now all messages on 'sensor_channel' are received

Practical Examples

Sensor Data with ROS2 Publishing

async def sensor_monitoring_example(entity: VyraEntity):
    \"\"\"Monitor sensor data and publish changes to ROS2 topics\"\"\"

    # Setup: Create volatiles and subscribe to changes
    await entity.volatile.set_volatile_value("temperature", 20.0)
    await entity.volatile.set_volatile_value("humidity", 50.0)

    await entity.volatile.subscribe_to_changes("temperature")
    await entity.volatile.subscribe_to_changes("humidity")
    await entity.volatile.activate_listener("temperature")

    # Monitor and publish sensor data
    while True:
        # Read data from sensor (simulated)
        current_temp = read_sensor_temperature()
        current_humidity = read_sensor_humidity()

        # Update volatiles -> automatically published to ROS2 topics
        await entity.volatile.set_volatile_value("temperature", current_temp)
        await entity.volatile.set_volatile_value("humidity", current_humidity)

        # React to critical values
        if current_temp > 30:
            await entity.news_feeder.feed("⚠️ Temperature critical!")

        await asyncio.slvia ROS2

# Module A: Publish status via volatile -> ROS2 topic
async def module_a_publisher(entity_a: VyraEntity):
    # Setup volatile with ROS2 publishing
    await entity_a.volatile.set_volatile_value("module_a_status", "idle")
    await entity_a.volatile.subscribe_to_changes("module_a_status")
    await entity_a.volatile.activate_listener("module_a_status")

    # Status updates are now published to ROS2
    await entity_a.volatile.set_volatile_value("module_a_status", "running")
    await entity_a.volatile.set_volatile_value("module_a_status", "completed")

# Module B: Subscribe to Module A's ROS2 topic
async def module_b_subscriber(entity_b: VyraNode):
    # Subscribe to the ROS2 topic created by Module A
    # Topic name: /module_a/volatile/module_a_status

    def on_status_received(msg):
        print(f"Module A status: {msg}")

    entity_b.create_subscription(
        msg_type=String,  # or appropriate type
        topic='/module_a/volatile/module_a_status',
        callback=on_status_received,
        qos_profile=10
    volatile_value("shared_status")
print(f"Status of Module A: {status}")

Bemerkung

Both modules must use the same Redis server (default in the VYRA system).

Redis-Backend

Connection: Automatic via entity.storage.redis_client

Storage Location: In-Memory (RAM of the Redis server)

Persistence: None - Data is lost on Redis restart .. tip:

Redis can optionally be configured to create snapshots.
This is done via the Redis configuration, not in VYRA itself.
Further information: https://redis.io/docs/management/persistence/

Performance

Typical access times:

  • Writing: ~0.1 - 0.5 ms

  • Reading: ~0.1 - 0.3 ms

  • EveSubscribe to changes for values that other modules need

    await volatile.subscribe_to_changes(„robot_pose“) await volatile.activate_listener(„robot_pose“)

    # Always create the volatile before subscribing await volatile.set_volatile_value(„status“, „idle“) # First! await volatile.subscribe_to_changes(„status“) # Then!

Avoid:

# Don't subscribe before creating the volatile
await volatile.subscribe_to_changes("status")  # ❌ KeyError!
await volatile.set_volatile_value("status", "idle")

# Very long keys
await volatile.set_volatile_value("this_is_a_very_long_key_name...", value)

# Very large data structures (> 1 MB)
await volatile.set_volatile_value("huge_data", very_large_object)

# High-frequency subscriptions without need
for i in range(10000):
    await volatile.subscribe_to_changess
await volatile.set_volatile_value("robot_pose", {
    "x": 1.5, "y": 2.3, "theta": 0.78
})

# Change-Events for critical values
await volatile.add_change_event("emergency_stop")

Avoid:

# Very long keys
await volatile.set_volatile_value("this_is_a_very_long_key_name...", value)

# Very large data structures (> 1 MB)
await volatile.set_volatile_value("huge_data", very_large_object)

# High-frequency Events without need
for i in range(10000):
    await volatile.add_change_event(f"sensor_{i}")

Error Handling

try:
    value = await entity.volatile.get_volatile_value("nonexistent_key")
except KeyError:
    print("Key does not exist")
    # Set default value
    await entity.volatile.set_volatile_value("nonexistent_key", 0)
except Exception as e:
    print(f"Redis connection error: {e}")

Further Information