vyra_base.core package

Submodules

vyra_base.core.entity module

class vyra_base.core.entity.VyraEntity(**kwargs)

Bases: object

Base class for all V.Y.R.A. entities.

This class initializes the entity with a ROS2 node, state, news, and error feeders. It also provides methods to register remote callables and manage interfaces. It is designed to be extended by specific entities that require additional functionality.

Variables:
  • node (VyraNode) – The ROS2 node for the entity.

  • state_feeder (StateFeeder) – Feeder for state updates.

  • news_feeder (NewsFeeder) – Feeder for news updates.

  • error_feeder (ErrorFeeder) – Feeder for error updates.

  • state_machine (UnifiedStateMachine) – State machine for managing entity states.

  • security_manager (Optional[SecurityManager]) – Security manager for authentication and session management (optional).

  • _interface_list (list[FunctionConfigEntry]) – List of interface configurations.

  • _storage_list (list[Storage]) – List of registered storage objects.

Raises:

RuntimeError – If the node name already exists in the ROS2 system.

__init__(**kwargs)

Wrapper for synchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

property node: VyraNode | None

Get the ROS2 node of the entity.

Returns:

The ROS2 node.

Return type:

VyraNode | None

property namespace: str

Get the namespace of the ROS2 node.

Returns:

The namespace of the node.

Return type:

str

set_interface_paths(interface_paths)

Set interface base paths for dynamic interface discovery.

Updates the global InterfacePathRegistry used by all TopicBuilders and InterfaceLoaders in this entity’s protocol providers.

Must be called BEFORE set_interfaces() to affect interface loading.

Parameters:

interface_paths (list[str | Path]) – List of base interface directory paths. Each path should point to a directory containing:

  • /config/*.json – Interface metadata

  • /service/*.srv – ROS2 service definitions

  • /publisher/*.msg – ROS2 message definitions

  • /actionServer/*.action – ROS2 action definitions

  • /proto/*.proto – Protocol Buffer definitions

  • /proto/*_pb2.py – Generated Python protobuf modules

Raises:

ValueError – If no valid paths are provided.

Return type:

None

Examples

>>> from pathlib import Path
>>> from ament_index_python.packages import get_package_share_directory
>>> entity = VyraEntity(...)
>>>
>>> # Set custom interface paths
>>> module_interfaces = Path(get_package_share_directory("v2_modulemanager_interfaces"))
>>> vyra_interfaces = Path(get_package_share_directory("vyra_module_template_interfaces"))
>>> entity.set_interface_paths([module_interfaces, vyra_interfaces])
>>>
>>> # Now set_interfaces() will use dynamic loading from these paths
>>> await entity.set_interfaces(base_interfaces)

async startup_entity()

Execute the entity startup sequence using the state machine.

This method performs the complete initialization sequence:

  1. Start initialization (Lifecycle: Uninitialized → Initializing)

  2. Initialize resources (storages, parameters, etc.)

  3. Complete initialization (Lifecycle: Initializing → Active)

  4. Set operational ready (Operational: Idle → Ready)

Returns:

True if startup was successful, False otherwise.

Return type:

bool

async shutdown_entity()

Execute the entity shutdown sequence using the state machine.

This method performs a graceful shutdown:

  1. Begin shutdown (Lifecycle: Active → ShuttingDown)

  2. Clean up resources (operational tasks, storages, etc.)

  3. Complete shutdown (Lifecycle: ShuttingDown → Deactivated)

Returns:

True if shutdown was successful, False otherwise.

Return type:

bool
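
The two sequences above can be illustrated with a minimal, self-contained sketch. SketchEntity, its enums, and the init_resources/cleanup_resources helpers are hypothetical stand-ins rather than the real VyraEntity or UnifiedStateMachine, and returning False on a failed or out-of-order transition is an assumption.

```python
import asyncio
from enum import Enum


class Lifecycle(Enum):
    UNINITIALIZED = "Uninitialized"
    INITIALIZING = "Initializing"
    ACTIVE = "Active"
    SHUTTING_DOWN = "ShuttingDown"
    DEACTIVATED = "Deactivated"


class Operational(Enum):
    IDLE = "Idle"
    READY = "Ready"


class SketchEntity:
    """Hypothetical stand-in that mirrors the documented transitions only."""

    def __init__(self) -> None:
        self.lifecycle = Lifecycle.UNINITIALIZED
        self.operational = Operational.IDLE

    async def init_resources(self) -> None:
        # Placeholder for storages, parameters, etc. (assumed helper).
        await asyncio.sleep(0)

    async def cleanup_resources(self) -> None:
        # Placeholder for operational tasks, storages, etc. (assumed helper).
        await asyncio.sleep(0)

    async def startup_entity(self) -> bool:
        if self.lifecycle is not Lifecycle.UNINITIALIZED:
            return False  # assumption: out-of-order startup is rejected
        self.lifecycle = Lifecycle.INITIALIZING   # step 1
        try:
            await self.init_resources()           # step 2
        except Exception:
            return False
        self.lifecycle = Lifecycle.ACTIVE         # step 3
        self.operational = Operational.READY      # step 4
        return True

    async def shutdown_entity(self) -> bool:
        if self.lifecycle is not Lifecycle.ACTIVE:
            return False  # assumption: only an active entity can shut down
        self.lifecycle = Lifecycle.SHUTTING_DOWN  # step 1
        try:
            await self.cleanup_resources()        # step 2
        except Exception:
            return False
        self.lifecycle = Lifecycle.DEACTIVATED    # step 3
        return True


async def run_once() -> tuple:
    entity = SketchEntity()
    started = await entity.startup_entity()
    stopped = await entity.shutdown_entity()
    return started, stopped, entity.lifecycle
```

Calling asyncio.run(run_once()) walks one entity through startup and shutdown. In real code the boolean results of startup_entity() and shutdown_entity() would be checked the same way.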

async setup_storage(config, transient_base_types, parameter_base_types)

Set up the storage for the entity.

This method initializes the storage access for the entity, including persistent and transient storage. It should be called during the initialization of the entity.

Parameters:
  • config (dict[str, Any]) – Optional configuration for the storage setup.

  • transient_base_types (dict[str, Any]) – Dictionary containing base types for transient storage. Must be defined in the vyra module interfaces.

  • parameter_base_types (dict[str, Any])

Return type:

None

Returns:

None

async set_interfaces(settings)

Add communication interfaces to this module.

Protocol selection is driven by FunctionConfigTags:

  • empty tags → register on every available / applicable protocol.

  • non-empty tags → register only on the explicitly listed protocols.

Dispatch is fully delegated to InterfaceBuilder so this method stays lean and serves solely as orchestration glue.

Parameters:

settings (list[FunctionConfigEntry]) – List of FunctionConfigEntry.

Return type:

None
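
The tag rule described above can be sketched as a small selection function. The Protocol enum and select_protocols are hypothetical names used only for illustration; the real FunctionConfigTags and InterfaceBuilder may differ, and silently skipping a tagged but unavailable protocol is an assumption.

```python
from enum import Enum


class Protocol(Enum):
    # Hypothetical stand-in for the protocol values carried by FunctionConfigTags.
    ROS2 = "ros2"
    ZENOH = "zenoh"
    REDIS = "redis"
    UDS = "uds"


def select_protocols(tags: list, available: list) -> list:
    """Return the protocols an interface should be registered on."""
    if not tags:
        # Empty tags: register on every available protocol.
        return list(available)
    # Non-empty tags: register only on the explicitly listed protocols
    # that are actually available (assumption: others are skipped).
    return [protocol for protocol in available if protocol in tags]
```

For example, select_protocols([], [Protocol.ROS2, Protocol.ZENOH]) keeps both protocols, while passing [Protocol.ZENOH] as tags narrows the result to Zenoh.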

bind_interface_callbacks(component, settings=None)

Bind decorated callbacks from a component to interface settings.

This method discovers decorated methods (e.g., @remote_actionServer.on_goal/on_cancel/execute) from a component and binds them to the corresponding FunctionConfigEntry objects.

For ActionServers with the multi-callback pattern, callbacks are stored in:

  • setting.metadata['callbacks']['on_goal']

  • setting.metadata['callbacks']['on_cancel']

  • setting.metadata['callbacks']['execute']

Parameters:
  • component (Any) – Component instance with decorated methods.

  • settings (Optional[list[FunctionConfigEntry]]) – Optional list of FunctionConfigEntry to update. If None, uses self._interface_list.

Returns:

Mapping of interface names to binding success status.

Return type:

dict[str, bool]

Example

>>> class MyComponent:
...     @remote_actionServer.on_goal(name="process")
...     async def accept_goal(self, goal_request): return True
...
...     @remote_actionServer.on_cancel(name="process")
...     async def cancel(self, goal_handle): return True
...
...     @remote_actionServer.execute(name="process")
...     async def execute(self, goal_handle): return {"done": True}
>>>
>>> component = MyComponent()
>>> entity.bind_interface_callbacks(component)
{'process/on_goal': True, 'process/on_cancel': True, 'process/execute': True}

register_storage(storage)

Register a storage object with the entity.

Parameters:

storage (Storage) – The storage object to register.

Raises:

TypeError – If storage is not an instance of Storage.

Return type:

None

async get_interface_list(request, response)

Retrieves all capabilities (publisher, service, action) of the entity that are set to visible for external access (ROS2 service interface).

This is the external ROS2 service endpoint. For internal calls, use get_interface_list_impl() instead.

Returns a list of JSON-serialized interface configurations showing which ROS2 services, topics, and jobs are available for this module.

Parameters:
  • request (Any) – The request object (unused).

  • response (Any) – The response object to update with the result.

Returns:

None

Return type:

Any

ROS2 Service Usage:

ros2 service call /module_name/get_interface_list \
    vyra_base_interfaces/srv/GetInterfaceList "{}"

Internal Usage:

import json

result = await entity.get_interface_list_impl()
if result:
    interfaces = [json.loads(i) for i in result["interface_list"]]
    for interface in interfaces:
        print(f"Interface: {interface['functionname']}")
        print(f"  Type: {interface['type']}")
        print(f"  Description: {interface['description']}")

async get_interface_list_impl()

Retrieves all capabilities (publisher, service, action) of the entity that are set to visible for external access (internal implementation).

This method filters the registered interfaces for those marked as visible and returns them as JSON-serialized strings. Useful for service discovery and dynamic interface introspection.

Returns:

Dictionary containing the list of interfaces, or None on error.

Return type:

Optional[dict]

Return format:

{
    "interface_list": [
        '{"functionname": "...", "type": "service", ...}',
        '{"functionname": "...", "type": "publisher", ...}',
        ...
    ]
}

Example:

import json

result = await entity.get_interface_list_impl()
if result:
    for interface_json in result["interface_list"]:
        interface = json.loads(interface_json)

        if interface["type"] == "service":
            print(f"Service: {interface['functionname']}")
        elif interface["type"] == "publisher":
            print(f"Topic: {interface['functionname']}")

async get_log_history(request, response)

Return recent log lines from the in-memory ring-buffer.

Useful for the Dashboard "Logs" tab: the modulemanager polls this service every few seconds via Zenoh and streams new lines to the browser via Server-Sent Events.

Return type:

Any

Parameters:
  • request (Any)

  • response (Any)

Request fields:
  • limit (int) – Maximum number of lines to return (default 200, max 10000).

  • since_date (str, optional) – ISO date (YYYY-MM-DD) or datetime (YYYY-MM-DDTHH:MM:SS) string. When provided without a time component, the whole day (00:00:00 – 23:59:59 local) is included. Entries before this point are excluded from the result.

Response fields:
  • logs_json (list) – List of {level, message, logger_name, timestamp, seq} dicts, ordered oldest-to-newest. The Zenoh serializer will convert this list to JSON. Consumers should use the seq field (millisecond UNIX timestamp) to de-duplicate on polling.

Note

Previously this returned a JSON string, but that caused double-serialization when the Zenoh transport layer serialized the response object again, producing escaped backslashes that accumulated with each call.
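
A polling consumer can de-duplicate on the seq field roughly as follows. This is a minimal sketch: merge_new_lines and the sample entries are hypothetical, and only the seq key is taken from the response format documented above.

```python
def merge_new_lines(seen_seqs: set, polled: list) -> list:
    """Return only the entries whose seq has not been seen in earlier polls.

    seen_seqs is updated in place so it can be reused across polls.
    """
    fresh = []
    for entry in polled:
        seq = entry["seq"]
        if seq in seen_seqs:
            continue  # already delivered by an earlier poll
        seen_seqs.add(seq)
        fresh.append(entry)
    return fresh


# Two overlapping polls: the second repeats the entry with seq 2.
seen = set()
first = merge_new_lines(seen, [{"seq": 1, "message": "a"}, {"seq": 2, "message": "b"}])
second = merge_new_lines(seen, [{"seq": 2, "message": "b"}, {"seq": 3, "message": "c"}])
```

Because the input is ordered oldest-to-newest, the returned entries keep that order and can be appended to the displayed log directly.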

async acknowledge_error(request, response)

Acknowledge an error entry identified by its error_id.

Finds the matching entry in the error_logs ring-buffer table and sets its acknowledged column to {timestamp, user}.

Return type:

Any

Parameters:
  • request (Any)

  • response (Any)

Request fields:
  • error_id (str) – UUID of the error entry to acknowledge (required).

  • user (str) – Name of the acknowledging user (optional, default: "system").

Response fields:
  • success (bool) – True when the entry was found and updated.

  • message (str) – Human-readable status message.

async get_alias(request, response)

Return the human-readable alias for this module instance.

The alias is set by the module manager at startup (synced from the database) or updated at runtime via set_alias.

Return type:

Any

Parameters:
  • request (Any)

  • response (Any)

Response fields:

alias (str): The current alias, or an empty string when not set.

async set_alias(request, response)

Set the human-readable alias for this module instance.

Called by the module manager on startup to sync the persisted alias, or by the user via the browser UI to rename a module.

Return type:

Any

Parameters:
  • request (Any)

  • response (Any)

Request fields:

alias (str): New alias string (may be empty to clear the alias).

Response fields:

success (bool): Always True after a successful update.

static register_service_callbacks(callback_parent)

Registers all remote services defined in the callback_parent.

Remote services must be decorated with @remote_service.

Example:

from typing import Any

from vyra_base.com import remote_service

class MyParentClass:
    @remote_service()
    async def my_remote_function(self, request: Any, response: Any):
        pass

instance_my_parent = MyParentClass()

To register the remote service in this example, pass the instance_my_parent object to this function:

  • register_service_callbacks(instance_my_parent)

From within a method of MyParentClass, you can call the same function with callback_parent set to self to register the services of the instance itself:

  • register_service_callbacks(self)

This function iterates over all attributes of the instance and registers those marked as remote services with the DataSpace.

Warning

This function only registers the callbacks. Run entity.set_interfaces(your_config) afterwards to load the interfaces in vyra.

Parameters:

callback_parent (object) – The class or instance containing the remote services.

Raises:
  • TypeError – If callback_parent is not an instance of object.

  • ValueError – If callback_parent does not have any remote services.

  • RuntimeError – If the service registration fails.

Returns:

None

Return type:

None
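
The attribute scan described above can be sketched with a marker attribute set by the decorator. The names remote_service_sketch, _MARKER, and find_remote_services are hypothetical; the real @remote_service decorator and the DataSpace registration may work differently.

```python
import inspect
from typing import Any, Callable

_MARKER = "_is_remote_service"  # hypothetical marker attribute name


def remote_service_sketch() -> Callable:
    """Stand-in decorator that only tags the function (assumption)."""
    def decorate(func: Callable) -> Callable:
        setattr(func, _MARKER, True)
        return func
    return decorate


def find_remote_services(callback_parent: object) -> dict:
    """Collect all callables on callback_parent that carry the marker."""
    found = {}
    for name, member in inspect.getmembers(callback_parent, callable):
        # Bound methods expose the attributes of their underlying function.
        if getattr(member, _MARKER, False):
            found[name] = member
    if not found:
        raise ValueError("callback_parent does not have any remote services")
    return found


class MyParentClass:
    @remote_service_sketch()
    async def my_remote_function(self, request: Any, response: Any) -> None:
        pass

    def helper(self) -> None:
        # Not decorated, so the scan ignores it.
        pass
```

Here find_remote_services(MyParentClass()) yields only my_remote_function, and an object without decorated methods raises ValueError, matching the error listed above.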

Module contents

VYRA Base Core Module

Provides core functionality for VYRA applications, including entity management, ROS2 node integration, and storage handling.

Provides:

  • VyraEntity – Core entity class integrating ROS2 node and storage

class vyra_base.core.VyraEntity(**kwargs)

Bases: object

Package-level export of VyraEntity. Its attributes and methods are identical to those documented under vyra_base.core.entity.VyraEntity above.

class vyra_base.core.InterfaceBuilder

Bases: object

Static factory that creates transport interfaces for a FunctionConfigEntry.

All methods are classmethods; no instance state is held.

classmethod wants_ros2(tags)

Return type:

bool

Parameters:

tags (list[FunctionConfigTags])

classmethod wants_zenoh(tags)

Return type:

bool

Parameters:

tags (list[FunctionConfigTags])

classmethod wants_redis(tags)

Return type:

bool

Parameters:

tags (list[FunctionConfigTags])

classmethod wants_uds(tags)

Return type:

bool

Parameters:

tags (list[FunctionConfigTags])

classmethod has_ros2_type(interfacetypes)

Return True if interfacetypes contains at least one actual Python class (ROS2 type).

When a proto-only interface is configured, all entries in the list are strings. ROS2 creation should be attempted only if at least one entry is a resolved class.

Return type:

bool

Parameters:

interfacetypes (Any)
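
The check described above amounts to asking whether any entry is a class rather than a string. The sketch below uses the hypothetical name has_resolved_class; wrapping a non-list argument into a list is an assumption, not documented behaviour.

```python
import inspect
from typing import Any


def has_resolved_class(interfacetypes: Any) -> bool:
    """Return True if at least one entry is an actual Python class."""
    if not isinstance(interfacetypes, (list, tuple)):
        # Assumption: a single value is treated like a one-element list.
        interfacetypes = [interfacetypes]
    return any(inspect.isclass(entry) for entry in interfacetypes)
```

A proto-only configuration such as ["a.proto", "b.proto"] gives False, while a list that contains any class object gives True.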

async classmethod create_service(setting, callbacks, node, ros2_available, registry=None)

Create service servers for all protocols the entry’s tags request.

Parameters:
  • setting (FunctionConfigEntry) – Interface configuration entry.

  • callbacks (dict[str, Any] | None) – Dict with key 'response' holding the callback.

  • node (Any) – ROS2 node (may be None when ROS2 is unavailable).

  • ros2_available (bool) – Whether ROS2 is up and a node is present.

  • registry (ProviderRegistry | None) – Optional pre-created ProviderRegistry instance. A new one is created when None (avoids repeated look-ups when called for many interfaces in one loop).

Return type:

None

async classmethod create_action(setting, callbacks, node, ros2_available, registry=None)

Create action servers for all protocols the entry’s tags request.

Tries each tagged protocol in order (ROS2, Zenoh, Redis, UDS). A failure on one protocol is logged as a warning and the next is attempted.

Raises:

ValueError – When callbacks are missing (required for action servers).

Return type:

None

async classmethod create_publisher(setting, node, ros2_available)

Create publishers for all protocols the entry’s tags request.

Return type:

None

async classmethod upgrade_service(setting, new_callbacks, registry=None)

Upgrade a previously registered service that had no callback with a new one.

Called when a decorated callback is bound after set_interfaces ran (two-phase / blueprint initialisation pattern).

Return type:

None

class vyra_base.core.Parameter(parameter_base_types, node, storage_access_persistant, storage_access_persistant_default=None, storage_access_transient=None)

Bases: object

Manages module parameters.

Parameters:
  • parameter_base_types (dict[str, Any])

  • node (Any)

  • storage_access_persistant (Any)

  • storage_access_persistant_default (Any)

  • storage_access_transient (Any)

__init__(parameter_base_types, node, storage_access_persistant, storage_access_persistant_default=None, storage_access_transient=None)

Initialize vyra module parameters. Parameters contain basic settings for the module that can be used by the specific processes in the module and are configured by the user or by other modules. They are stored in a database table. Other modules can subscribe to a parameter change event to be informed about changes to a parameter.

Parameters:
  • storage_access – An object that provides access to the storage system.

  • node (Any) – The ROS2 node to attach the parameter manager to.

  • parameter_base_types (dict[str, Any]) – A dictionary of base types for parameters.

  • storage_access_persistant (Any) – The database access object for persistent parameters.

  • storage_access_transient (Any) – The database access object for transient parameters.

  • storage_access_persistant_default (Any)

Raises:

ValueError – If the storage_access is not of type DbAccess.

Return type:

None

static replace_validation_rules(rules)

Replace the active validation ruleset for all Parameter instances.

This modifies the module-level _validator singleton, so every Parameter instance created in this process will use the new rules.

Parameters:

rules (dict) – New ruleset dict; must match the structure of parameter_rules.yaml.

Return type:

None

Example:

import yaml
with open("/etc/myapp/custom_param_rules.yaml") as f:
    Parameter.replace_validation_rules(yaml.safe_load(f))

static load_validation_rules_from_file(path)

Load a YAML ruleset from a file and make it active.

Parameters:

path (str) – Absolute or relative path to the YAML rules file.

Return type:

None

async load_defaults(**kwargs)

Wrapper for asynchronous functions.

Parameters:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get_parameter(request, response)

Get a parameter value by its key (ROS2 service interface).

This is the external ROS2 service endpoint. For internal calls, use get_parameter_impl() instead.

Parameters:
  • request (Any) – The request object containing the key of the parameter to retrieve.

  • response (Any) – The response object to store the result.

Returns:

True if successful, False otherwise.

Return type:

bool

ROS2 Service Usage:

ros2 service call /module_name/get_parameter \
    vyra_base_interfaces/srv/GetParameter \
    "{key: 'param_name'}"

Internal Usage:

result = await param_manager.get_parameter_impl("param_name")
if result:
    print(result["value"])

async get_parameter_impl(key)

Get a parameter value by its key (internal implementation).

This method contains the actual business logic for retrieving a parameter. It can be called directly from Python code without going through the ROS2 service layer.

Parameters:

key (str) – The key of the parameter to retrieve.

Returns:

Dictionary with parameter data, or None on error.

Return type:

Optional[dict]

Return format:

{
    "value": "<json_serialized_parameter>",
    "success": True,
    "message": "Parameter 'key' retrieved successfully."
}

Example:

import json

result = await param_manager.get_parameter_impl("robot_speed")
if result and result["success"]:
    param_data = json.loads(result["value"])
    print(f"Speed: {param_data['value']}")

async set_parameter(request, response)

Set a parameter value by its key (ROS2 service interface).

This is the external ROS2 service endpoint. For internal calls, use set_parameter_impl() instead.

Parameters:
  • request (Any) – The request object containing key and value.

  • response (Any) – The response object to update with the result.

Returns:

None

Return type:

None

ROS2 Service Usage:

ros2 service call /module_name/set_parameter \
    vyra_base_interfaces/srv/SetParameter \
    "{key: 'robot_speed', value: '2.5'}"

Internal Usage:

result = await param_manager.set_parameter_impl("robot_speed", "2.5")
if result and result["success"]:
    print("Parameter updated!")
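
The split between a service endpoint and its _impl counterpart can be illustrated with a small stand-in. This is only a sketch of the pattern: DemoParams is hypothetical, the response fields success and message are assumptions, and the real class persists to a database rather than a dictionary.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Optional


class DemoParams:
    """Hypothetical stand-in, not the vyra_base class."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    async def set_parameter_impl(self, key: str, value: Any) -> Optional[dict]:
        # Business logic lives here and returns a plain dict.
        self._store[key] = value
        return {"success": True, "message": f"Parameter '{key}' updated."}

    async def set_parameter(self, request: Any, response: Any) -> None:
        # The endpoint only unpacks the request and fills the response.
        result = await self.set_parameter_impl(request.key, request.value)
        if result is None:
            response.success = False
            response.message = "Internal error."
            return
        response.success = result["success"]
        response.message = result["message"]


request = SimpleNamespace(key="robot_speed", value="2.5")
response = SimpleNamespace(success=None, message="")
asyncio.run(DemoParams().set_parameter(request, response))
print(response.success, response.message)
```

With this layout, Python callers can use the _impl method directly while remote callers go through the endpoint, and both share the same logic.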
async set_parameter_impl(key, value)[source]

Set a parameter value by its key (internal implementation).

Validates type compatibility, min/max bounds and range constraints before writing to the database.

Parameter:
  • key (str) – The key of the parameter to set.

  • value (Any) – The value to set for the parameter.

Returns:

Dictionary with success status and message, or None on error.

Return type:

Optional[dict]
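
The kind of check described above (type, then min/max bounds, then allowed values) might look roughly like the sketch below. The function check_value, its arguments, and its messages are hypothetical; the actual validation rules in vyra_base may differ.

```python
from typing import Any, Optional


def check_value(
    value: Any,
    expected_type: type,
    min_value: Optional[float] = None,
    max_value: Optional[float] = None,
    allowed: Optional[list] = None,
) -> dict:
    """Hypothetical pre-write check: type first, then bounds, then allowed values."""
    # bool is a subclass of int in Python, so reject it unless bool is expected.
    if isinstance(value, bool) and expected_type is not bool:
        return {"success": False, "message": "type mismatch"}
    if not isinstance(value, expected_type):
        return {"success": False, "message": "type mismatch"}
    if min_value is not None and value < min_value:
        return {"success": False, "message": "below minimum"}
    if max_value is not None and value > max_value:
        return {"success": False, "message": "above maximum"}
    if allowed is not None and value not in allowed:
        return {"success": False, "message": "value not allowed"}
    return {"success": True, "message": "ok"}


print(check_value(2.5, float, min_value=0.0, max_value=5.0))
print(check_value(9.0, float, min_value=0.0, max_value=5.0))
```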

async create_new_parameter(request, response)[source]

Create a new parameter (strict mode, no upsert).

Fails if the parameter already exists.

Return type:

None

Parameter:
  • request (Any)

  • response (Any)
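
Strict creation differs from an upsert in that an existing key is never overwritten. A small in-memory sketch of that contrast follows; DemoStore, its method names, and its messages are hypothetical and only illustrate the semantics.

```python
from typing import Any


class DemoStore:
    """Hypothetical in-memory store contrasting strict create with upsert."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def create_new(self, key: str, value: Any) -> dict:
        # Strict mode: refuse to touch a key that already exists.
        if key in self._data:
            return {"success": False, "message": f"Parameter '{key}' already exists."}
        self._data[key] = value
        return {"success": True, "message": f"Parameter '{key}' created."}

    def upsert(self, key: str, value: Any) -> dict:
        # Upsert: create or overwrite without complaint.
        self._data[key] = value
        return {"success": True, "message": f"Parameter '{key}' stored."}

    def get(self, key: str) -> Any:
        return self._data.get(key)


store = DemoStore()
first = store.create_new("robot_speed", 1.0)
second = store.create_new("robot_speed", 9.9)
print(first["success"], second["success"], store.get("robot_speed"))
```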

async create_new_parameter_impl(key, value)[source]

Create a new parameter and fail if key already exists.

Return type:

Optional[dict]

Parameter:
  • key

  • value

async create_parameter_full(request, response)[source]

Create a new parameter with full metadata and validation.

Return type:

None

Parameter:
  • request (Any)

  • response (Any)

Request fields:
  • name (str) – Parameter key (required).

  • default_value (Any) – Default value (required).

  • type (str) – Type string, one of int, float, string, bool, list, dict (required).

  • description (str) – Human-readable description (required).

  • displayname (str, optional) – Display name (max 255 chars).

  • visible (bool, optional) – Visibility flag (default False).

  • editable (bool, optional) – Editable flag (default False).

  • min_value (str, optional) – Minimum value/length constraint.

  • max_value (str, optional) – Maximum value/length constraint.

  • range_value (dict, optional) – Allowed values/types dict.

async create_parameter_full_impl(name, default_value, type_str, description, displayname=None, visible=False, editable=False, min_value=None, max_value=None, range_value=None)[source]

Create a new parameter with full metadata and strict validation.

Validation is delegated to the module-level _validator (a ParameterValidator instance loaded from parameter_rules.yaml).

Return type:

Optional[dict]

Parameter:
  • name (str)

  • default_value (Any)

  • type_str (str)

  • description (str)

  • displayname (str | None)

  • visible (bool)

  • editable (bool)

  • min_value (Any)

  • max_value (Any)

  • range_value (dict | None)
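
One way to assemble the arguments for this call is a small dataclass whose fields mirror the signature. This is a sketch only: ParameterSpec and ALLOWED_TYPES are hypothetical names, and the two checks shown (type string and display name length) are taken from the documented request fields, not from the rules in parameter_rules.yaml.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional

# Type strings listed in the create_parameter_full request fields.
ALLOWED_TYPES = {"int", "float", "string", "bool", "list", "dict"}


@dataclass
class ParameterSpec:
    """Hypothetical container mirroring the create_parameter_full_impl arguments."""

    name: str
    default_value: Any
    type_str: str
    description: str
    displayname: Optional[str] = None
    visible: bool = False
    editable: bool = False
    min_value: Any = None
    max_value: Any = None
    range_value: Optional[dict] = None

    def __post_init__(self) -> None:
        # Cheap client-side checks before anything is sent to the manager.
        if self.type_str not in ALLOWED_TYPES:
            raise ValueError(f"unsupported type: {self.type_str}")
        if self.displayname is not None and len(self.displayname) > 255:
            raise ValueError("displayname exceeds 255 characters")


spec = ParameterSpec("robot_speed", 1.0, "float", "Maximum speed",
                     min_value="0.0", max_value="5.0")
kwargs = asdict(spec)
print(sorted(kwargs))
```

Inside a coroutine, the resulting dictionary could then be passed on as `await param_manager.create_parameter_full_impl(**kwargs)`, where param_manager is assumed to be an existing manager instance.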

async update_parameter_metadata(request, response)[source]

Update display metadata for an existing parameter.

Return type:

None

Parameter:
  • request (Any)

  • response (Any)

Request fields:
  • name (str) – Parameter key (required).

  • displayname (str, optional) – New display name.

  • visible (bool, optional) – New visibility flag.

  • editable (bool, optional) – New editable flag.

async update_parameter_metadata_impl(key, displayname=None, visible=None, editable=None, default_value=None)[source]

Update only displayname, visible, editable, and/or default_value for an existing parameter.

Return type:

Optional[dict]

Parameter:
  • key (str)

  • displayname (str | None)

  • visible (bool | None)

  • editable (bool | None)

  • default_value (Any | None)

async delete_parameter(request, response)[source]

Delete a parameter from persistence by key.

Return type:

None

Parameter:
  • request (Any)

  • response (Any)

Request fields:

key (str): Parameter key to delete (required).

async delete_parameter_impl(key)[source]

Delete a parameter from the database by key.

Parameter:

key (str) – The parameter name/key to delete.

Returns:

Result dict with success and message, or None on internal error.

Return type:

Optional[dict]

async read_all_params(request, response)[source]

Read all parameters (ROS2 service interface).

This is the external ROS2 service endpoint. For internal calls, use read_all_params_impl() instead.

Returns a JSON string containing all parameters with their metadata.

Parameter:
  • request (Any) – The request object (unused).

  • response (Any) – The response object to update with the result.

Returns:

None

Return type:

None

Response format:

response.all_params_json = '[{"name": "param1", "value": "value1", ...}, ...]'

ROS2 Service Usage:

ros2 service call /module_name/read_all_params \
    vyra_base_interfaces/srv/ReadAllParams "{}"

Internal Usage:

import json

result = await param_manager.read_all_params_impl()
if result:
    params = json.loads(result["all_params_json"])
    for param in params:
        print(f"{param['name']}: {param['value']}")
async read_all_params_impl()[source]

Read all parameters (internal implementation).

This method retrieves all parameters from the database and serializes them to JSON format.

Returns:

Dictionary containing all parameters as a JSON string, or None on error.

Return type:

Optional[dict]

Return format:

{
    "all_params_json": '[{"name": "...", "value": "...", "type": "..."}, ...]'
}

Example:

import json

result = await param_manager.read_all_params_impl()
if result:
    params = json.loads(result["all_params_json"])
    for param in params:
        print(f"Parameter: {param['name']}")
        print(f"  Value: {param['value']}")
        print(f"  Type: {param['type']}")
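
When individual parameters need to be looked up repeatedly, the decoded list can be indexed by name once. A minimal sketch, assuming each entry carries a "name" field as in the return format above; index_by_name and the sample data are hypothetical.

```python
import json
from typing import Optional


def index_by_name(result: Optional[dict]) -> dict:
    """Map parameter name to its full entry; empty dict if the read failed."""
    if not result:
        return {}
    params = json.loads(result["all_params_json"])
    return {param["name"]: param for param in params}


# Hypothetical stand-in for what read_all_params_impl() might return.
sample = {
    "all_params_json": json.dumps([
        {"name": "robot_speed", "value": "2.5", "type": "float"},
        {"name": "robot_name", "value": "r1", "type": "string"},
    ])
}
by_name = index_by_name(sample)
print(by_name["robot_speed"]["type"])
```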
after_update_param_callback(**kwargs)

Wrapper for synchronous functions.

Parameter:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async param_changed_topic(request, response)[source]

Get the ROS2 topic name for parameter change events (ROS2 service interface).

This is the external ROS2 service endpoint. For internal calls, use param_changed_topic_impl() instead.

Returns the topic name where parameter change notifications are published. Other modules can subscribe to this topic to receive updates.

Parameter:
  • request (Any) – The request object (unused).

  • response (Any) – The response object to update with the result.

Returns:

None

Return type:

None

ROS2 Service Usage:

ros2 service call /module_name/param_changed_topic \
    vyra_base_interfaces/srv/ParamChangedTopic "{}"

Internal Usage:

result = await param_manager.param_changed_topic_impl()
if result:
    topic = result["topic"]
    print(f"Subscribe to parameter changes: {topic}")
async param_changed_topic_impl()[source]

Get the ROS2 topic name for parameter change events (internal implementation).

This method retrieves the topic name from the internal publisher server. The topic is used to broadcast parameter changes to other modules.

Returns:

Dictionary containing the topic name, or None on error.

Return type:

Optional[dict]

Return format:

{
    "topic": "/module_name/param_update_event"
}

Example:

result = await param_manager.param_changed_topic_impl()
if result:
    # Subscribe to parameter change events
    node.create_subscription(
        UpdateParamEvent,
        result["topic"],
        callback=on_param_changed,
        qos_profile=10,  # assumed history depth; rclpy requires a QoS argument
    )
async has_parameter(key)[source]

Check if a parameter with the given key exists.

Parameter:

key (str) – The key of the parameter to check.

Returns:

True if the parameter exists, False otherwise.

Return type:

bool

Example:

if await param_manager.has_parameter("robot_speed"):
    result = await param_manager.get_parameter_impl("robot_speed")
class vyra_base.core.Volatile(storage_access_transient, module_name, module_id, node, transient_base_types)[source]

Bases: object

Manages volatile parameters: temporary values that are stored in Redis instead of being persisted in the database. The class provides methods to read, write, and manage volatile parameters, and to subscribe to changes on them.

API:
  • Volatiles can only be written by the module itself.

  • Other modules can subscribe to volatiles to be notified of changes.

  • Volatiles are not persisted and are lost on system restart.

  • Volatiles are not shared between modules; each module has its own set.

  • Volatiles are identified by string keys.

  • Volatiles can be of different types, such as string, hash, list, or set.

Example usage:
  • current robot state

  • io states

  • status information

  • temporary data storage

  • any custom data defined by the module.

Parameter:
  • storage_access_transient (RedisClient)

  • module_name (str)

  • module_id (str)

  • node (Optional[Any])

  • transient_base_types (dict[str, Any])
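
The write-and-notify behaviour described for volatiles can be pictured with a purely in-memory stand-in. This sketch does not use Redis and does not reflect the real method signatures; DemoVolatiles, subscribe, and set_value are hypothetical names chosen for illustration.

```python
from collections import defaultdict
from typing import Any, Callable


class DemoVolatiles:
    """Hypothetical in-memory stand-in for a Redis-backed volatile store."""

    def __init__(self) -> None:
        self._values: dict[str, Any] = {}
        self._listeners: dict[str, list[Callable[[str, Any], None]]] = defaultdict(list)

    def subscribe(self, key: str, callback: Callable[[str, Any], None]) -> None:
        # Register a callback that fires whenever the key is written.
        self._listeners[key].append(callback)

    def set_value(self, key: str, value: Any) -> None:
        # Store the value, then notify every listener registered for this key.
        self._values[key] = value
        for callback in self._listeners[key]:
            callback(key, value)


seen = []
store = DemoVolatiles()
store.subscribe("temperature", lambda key, value: seen.append((key, value)))
store.set_value("temperature", 21.5)
store.set_value("io_state", "on")  # no listener registered for this key
print(seen)
```

Nothing here survives a restart of the process, which matches the non-persistent nature of volatiles.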

EVENT_TOPIC_PREFIX = 'volatile/'
__init__(storage_access_transient, module_name, module_id, node, transient_base_types)[source]

Initialize the Volatile class.

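Parameter:
  • storage_access_transient (RedisClient)

  • module_name (str)

  • module_id (str)

  • node (Optional[Any])

  • transient_base_types (dict[str, Any])
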
async cleanup()[source]

Async cleanup method for proper resource cleanup.

async activate_listener(**kwargs)

Wrapper for asynchronous functions.

Parameter:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async deactivate_listener(channel)[source]

Deactivate the Redis pub/sub listener for volatile parameter changes.

This method stops listening for changes on all registered volatile parameters. After calling this, no further change notifications will be received.

Parameter:

channel (str | list[str])

async on_volatile_change_received(**kwargs)

Wrapper for asynchronous functions.

Parameter:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async read_all_volatile_names(**kwargs)

Wrapper for asynchronous functions.

Parameter:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async set_volatile_value(**kwargs)

Wrapper for asynchronous functions.

Parameter:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async get_volatile_value(**kwargs)

Wrapper for asynchronous functions.

Parameter:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async publish_volatile_to_ros2(**kwargs)

Wrapper for asynchronous functions.

Parameter:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async unsubscribe_from_changes(**kwargs)

Wrapper for asynchronous functions.

Parameter:
  • args – Positional arguments.

  • kwargs – Keyword arguments.

Returns:

Result of the wrapped function.

async has_volatile(key)[source]

Check if a volatile parameter with the given key exists in Redis.

Parameter:

key (str) – The key of the volatile parameter to check.

Returns:

True if the volatile exists, False otherwise.

Return type:

bool

Example:

if await volatile.has_volatile("temperature"):
    value = await volatile.get_volatile_value("temperature")
async get_volatile_impl(key)[source]

Return type:

dict[str, Any]

Parameter:

key (str)

async set_volatile_impl(key, value)[source]

Return type:

dict[str, Any]

Parameter:
  • key

  • value

async create_new_volatile_impl(key, value)[source]

Return type:

dict[str, Any]

Parameter:
  • key

  • value

async read_all_volatiles_impl()[source]

Return type:

dict[str, Any]

async get_volatile(request, response)[source]

Get the current value of a volatile parameter (remote service interface).

Parameter:
  • request (Any) – Request object with key attribute.

  • response (Any) – Response object updated with success, message, value.

Return type:

None

async set_volatile(request, response)[source]

Set the value of a volatile parameter (remote service interface).

Parameter:
  • request (Any) – Request object with key and value attributes.

  • response (Any) – Response object updated with success and message.

Return type:

None

async create_new_volatile(request, response)[source]

Create a new volatile key (fails if key already exists).

Return type:

None

Parameter:
  • request (Any)

  • response (Any)

async delete_volatile(request, response)[source]

Delete a volatile parameter from Redis by key.

Return type:

None

Parameter:
  • request (Any)

  • response (Any)

Request fields:

key (str): Volatile key to delete (required).

async delete_volatile_impl(key)[source]

Delete a volatile parameter from Redis.

Parameter:

key (str) – The volatile key to delete.

Returns:

Result dict with success and message.

Return type:

dict[str, Any]

async read_all_volatiles(request, response)[source]

Read all volatile parameters with their current values (remote service interface).

Returns a JSON-encoded list of objects with key and value fields.

Parameter:
  • request (Any) – Request object (unused).

  • response (Any) – Response object updated with all_volatiles_json.

Return type:

None