vyra_base.core package
Submodules
vyra_base.core.entity module
- class vyra_base.core.entity.VyraEntity(**kwargs)[source]
Bases: object
Base class for all V.Y.R.A. entities.
This class initializes the entity with a ROS2 node, state, news, and error feeders. It also provides methods to register remote callables and manage interfaces. It is designed to be extended by specific entities that require additional functionality.
- Variables:
node (VyraNode) – The ROS2 node for the entity.
state_feeder (StateFeeder) – Feeder for state updates.
news_feeder (NewsFeeder) – Feeder for news updates.
error_feeder (ErrorFeeder) – Feeder for error updates.
state_machine (UnifiedStateMachine) – State machine for managing entity states.
security_manager (Optional[SecurityManager]) – Security manager for authentication and session management (optional).
_interface_list (list[FunctionConfigEntry]) – List of interface configurations.
_storage_list (list[Storage]) – List of registered storage objects.
- Raises:
RuntimeError – If the node name is already in use in the ROS2 system.
- __init__(**kwargs)
Wrapper for synchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- property node: VyraNode | None
Get the ROS2 node of the entity.
- Returns:
The ROS2 node.
- Return type:
VyraNode | None
- property namespace: str
Get the namespace of the ROS2 node.
- Returns:
The namespace of the node.
- Return type:
str
- set_interface_paths(interface_paths)[source]
Set interface base paths for dynamic interface discovery.
Updates the global InterfacePathRegistry used by all TopicBuilders and InterfaceLoaders in this entity’s protocol providers.
Must be called BEFORE set_interfaces() to affect interface loading.
- Parameters:
interface_paths (list[str|Path]) – List of base interface directory paths. Each path should point to a directory containing:
  - /config/*.json - Interface metadata
  - /service/*.srv - ROS2 service definitions
  - /publisher/*.msg - ROS2 message definitions
  - /actionServer/*.action - ROS2 action definitions
  - /proto/*.proto - Protocol Buffer definitions
  - /proto/*_pb2.py - Generated Python protobuf modules
- Raises:
ValueError – If no valid paths are provided.
- Return type:
Examples
    >>> from pathlib import Path
    >>> from ament_index_python.packages import get_package_share_directory
    >>> entity = VyraEntity(...)
    >>>
    >>> # Set custom interface paths
    >>> module_interfaces = Path(get_package_share_directory("v2_modulemanager_interfaces"))
    >>> vyra_interfaces = Path(get_package_share_directory("vyra_module_template_interfaces"))
    >>> entity.set_interface_paths([module_interfaces, vyra_interfaces])
    >>>
    >>> # Now set_interfaces() will use dynamic loading from these paths
    >>> await entity.set_interfaces(base_interfaces)
- async startup_entity()[source]
Execute the entity startup sequence using the state machine.
This method performs the complete initialization sequence:
1. Start initialization (Lifecycle: Uninitialized → Initializing)
2. Initialize resources (storages, parameters, etc.)
3. Complete initialization (Lifecycle: Initializing → Active)
4. Set operational ready (Operational: Idle → Ready)
- Returns:
True if startup was successful, False otherwise.
- Return type:
bool
- async shutdown_entity()[source]
Execute the entity shutdown sequence using the state machine.
This method performs a graceful shutdown:
1. Begin shutdown (Lifecycle: Active → ShuttingDown)
2. Clean up resources (operational tasks, storages, etc.)
3. Complete shutdown (Lifecycle: ShuttingDown → Deactivated)
- Returns:
True if shutdown was successful, False otherwise.
- Return type:
bool
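The two lifecycle sequences above can be sketched as a small transition table. This is only a toy model: the `Lifecycle` enum, `advance` and `run_startup` are hypothetical names made up for illustration and are not part of `UnifiedStateMachine` or `VyraEntity`. The state names are taken from the sequences described above.

```python
from enum import Enum


class Lifecycle(Enum):
    """Hypothetical enum; names mirror the lifecycle states listed above."""
    UNINITIALIZED = "Uninitialized"
    INITIALIZING = "Initializing"
    ACTIVE = "Active"
    SHUTTING_DOWN = "ShuttingDown"
    DEACTIVATED = "Deactivated"


# Allowed lifecycle transitions, taken from the startup and shutdown sequences.
ALLOWED = {
    (Lifecycle.UNINITIALIZED, Lifecycle.INITIALIZING),
    (Lifecycle.INITIALIZING, Lifecycle.ACTIVE),
    (Lifecycle.ACTIVE, Lifecycle.SHUTTING_DOWN),
    (Lifecycle.SHUTTING_DOWN, Lifecycle.DEACTIVATED),
}


def advance(current: Lifecycle, target: Lifecycle) -> Lifecycle:
    """Return target if the transition is allowed, otherwise raise ValueError."""
    if (current, target) not in ALLOWED:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target


def run_startup(state: Lifecycle) -> tuple[bool, Lifecycle]:
    """Mimic the True/False contract of startup_entity() on the toy model."""
    try:
        state = advance(state, Lifecycle.INITIALIZING)
        # Resource initialization (storages, parameters, ...) would happen here.
        state = advance(state, Lifecycle.ACTIVE)
    except ValueError:
        return False, state
    return True, state
```

In this sketch, a startup attempted from any state other than Uninitialized reports False and leaves the state unchanged.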
- async setup_storage(config, transient_base_types, parameter_base_types)[source]
Set up the storage for the entity.
This method initializes the storage access for the entity, including persistent and transient storage. It should be called during the initialization of the entity.
- Parameters:
- Raises:
ValueError – If the configuration is invalid or incomplete.
RuntimeError – If the storage setup fails.
- Returns:
None
- Return type:
None
- async set_interfaces(settings)[source]
Add communication interfaces to this module.
Protocol selection is driven by FunctionConfigTags:
  - empty tags → register on every available / applicable protocol.
  - non-empty tags → register only on the explicitly listed protocols.
Dispatch is fully delegated to InterfaceBuilder, so this method stays lean and serves solely as orchestration glue.
- Parameters:
settings (list[FunctionConfigEntry]) – List of FunctionConfigEntry.
- Return type:
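The tag rule described above (empty tags select everything, non-empty tags act as a filter) can be illustrated with a minimal sketch. The `Protocol` enum and `select_protocols` are hypothetical stand-ins; the real members of `FunctionConfigTags` and the dispatch logic inside `InterfaceBuilder` are not shown in this reference and may differ.

```python
from enum import Enum


class Protocol(Enum):
    """Hypothetical stand-in for FunctionConfigTags."""
    ROS2 = "ros2"
    ZENOH = "zenoh"
    REDIS = "redis"
    UDS = "uds"


def select_protocols(tags: list[Protocol], available: list[Protocol]) -> list[Protocol]:
    """Apply the documented rule to the protocols that are currently available."""
    if not tags:
        # Empty tags: register on every available protocol.
        return list(available)
    # Non-empty tags: register only on the explicitly listed protocols.
    return [protocol for protocol in available if protocol in tags]
```

A tag that names an unavailable protocol yields an empty selection here; how the library treats that case is not stated in this reference.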
- bind_interface_callbacks(component, settings=None)[source]
Bind decorated callbacks from a component to interface settings.
This method discovers decorated methods (e.g., @remote_actionServer.on_goal/on_cancel/execute) from a component and binds them to the corresponding FunctionConfigEntry objects.
For ActionServers with the multi-callback pattern, callbacks are stored in:
  - setting.metadata['callbacks']['on_goal']
  - setting.metadata['callbacks']['on_cancel']
  - setting.metadata['callbacks']['execute']
- Parameters:
component (Any) – Component instance with decorated methods.
settings (Optional[list[FunctionConfigEntry]]) – Optional list of FunctionConfigEntry to update. If None, uses self._interface_list.
- Returns:
Mapping of interface names to binding success status
- Return type:
dict[str, bool]
Example
    >>> class MyComponent:
    ...     @remote_actionServer.on_goal(name="process")
    ...     async def accept_goal(self, goal_request): return True
    ...
    ...     @remote_actionServer.on_cancel(name="process")
    ...     async def cancel(self, goal_handle): return True
    ...
    ...     @remote_actionServer.execute(name="process")
    ...     async def execute(self, goal_handle): return {"done": True}
    >>>
    >>> component = MyComponent()
    >>> entity.bind_interface_callbacks(component)
    {'process/on_goal': True, 'process/on_cancel': True, 'process/execute': True}
- register_storage(storage)[source]
Register a storage object to the entity.
- async get_interface_list(request, response)[source]
Retrieves all capabilities (publisher, service, action) of the entity that are set to visible for external access (ROS2 service interface).
This is the external ROS2 service endpoint. For internal calls, use get_interface_list_impl() instead.
Returns a list of JSON-serialized interface configurations showing which ROS2 services, topics, and jobs are available for this module.
- Parameters:
- Returns:
None
- Return type:
None
ROS2 Service Usage:
    ros2 service call /module_name/get_interface_list \
        vyra_base_interfaces/srv/GetInterfaceList "{}"
Internal Usage:
    import json

    result = await entity.get_interface_list_impl()
    if result:
        interfaces = [json.loads(i) for i in result["interface_list"]]
        for interface in interfaces:
            print(f"Interface: {interface['functionname']}")
            print(f"  Type: {interface['type']}")
            print(f"  Description: {interface['description']}")
- async get_interface_list_impl()[source]
Retrieves all capabilities (publisher, service, action) of the entity that are set to visible for external access (internal implementation).
This method filters the registered interfaces for those marked as visible and returns them as JSON-serialized strings. Useful for service discovery and dynamic interface introspection.
Return format:
    {
        "interface_list": [
            '{"functionname": "...", "type": "service", ...}',
            '{"functionname": "...", "type": "publisher", ...}',
            ...
        ]
    }
Example:
    import json

    result = await entity.get_interface_list_impl()
    if result:
        for interface_json in result["interface_list"]:
            interface = json.loads(interface_json)
            if interface["type"] == "service":
                print(f"Service: {interface['functionname']}")
            elif interface["type"] == "publisher":
                print(f"Topic: {interface['functionname']}")
- async get_log_history(request, response)[source]
Return recent log lines from the in-memory ring-buffer.
Useful for the Dashboard “Logs” tab: the modulemanager polls this service every few seconds via Zenoh and streams new lines to the browser via Server-Sent Events.
- Request fields:
limit (int): Max number of lines to return (default 200, max 10000).
since_date (str, optional): ISO date (YYYY-MM-DD) or datetime (YYYY-MM-DDTHH:MM:SS) string. When provided without a time component, the whole day (00:00:00 – 23:59:59 local) is included. Entries before this point are excluded from the result.
- Response fields:
logs_json (list): List of {level, message, logger_name, timestamp, seq} dicts, ordered oldest-to-newest. The Zenoh serializer will convert this list to JSON. Consumers should use the seq field (millisecond UNIX timestamp) to de-duplicate on polling.
Note
Previously this returned a JSON string, but that caused double-serialization when the Zenoh transport layer serialized the response object again, producing escaped backslashes that accumulated with each call.
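A polling consumer might handle the since_date and seq fields roughly as sketched below. Both helpers are hypothetical and are not part of vyra_base; they only assume the field names and formats listed above. The high-water-mark approach also assumes that seq values are strictly increasing, so two entries sharing the same millisecond would need a set of seen values instead.

```python
from datetime import date, datetime, time


def parse_since(since_date: str) -> datetime:
    """Parse since_date; a date without a time part maps to 00:00:00 of that day."""
    if "T" in since_date:
        return datetime.fromisoformat(since_date)
    return datetime.combine(date.fromisoformat(since_date), time.min)


def merge_new_lines(last_seq: int, logs: list[dict]) -> tuple[list[dict], int]:
    """Keep only entries newer than last_seq and return them with the new mark."""
    fresh = [entry for entry in logs if entry["seq"] > last_seq]
    if fresh:
        last_seq = max(entry["seq"] for entry in fresh)
    return fresh, last_seq
```

Each poll passes the mark returned by the previous call, so overlapping responses do not produce duplicate lines in the browser.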
- async acknowledge_error(request, response)[source]
Acknowledge an error entry identified by its error_id.
Finds the matching entry in the error_logs ring-buffer table and sets its acknowledged column to {timestamp, user}.
- Request fields:
error_id (str): UUID of the error entry to acknowledge (required).
user (str): Name of the acknowledging user (optional, default: "system").
- Response fields:
success (bool): True when the entry was found and updated.
message (str): Human-readable status message.
- async get_alias(request, response)[source]
Return the human-readable alias for this module instance.
The alias is set by the module manager at startup (synced from the database) or updated at runtime via set_alias.
- Response fields:
alias (str): The current alias, or an empty string when not set.
- async set_alias(request, response)[source]
Set the human-readable alias for this module instance.
Called by the module manager on startup to sync the persisted alias, or by the user via the browser UI to rename a module.
- Request fields:
alias (str): New alias string (may be empty to clear the alias).
- Response fields:
success (bool): Always True after a successful update.
- static register_service_callbacks(callback_parent)[source]
Registers all remote services defined in the callback_parent.
Remote services must be decorated with @remote_service.
Example:
    from typing import Any

    from vyra_base.com import remote_service

    class MyParentClass:
        @remote_service()
        async def my_remote_function(self, request: Any, response: Any):
            pass

    instance_my_parent = MyParentClass()
To register the remote service in this example, pass the instance_my_parent object to this function:
    register_service_callbacks(instance_my_parent)
Inside a method of MyParentClass, you can call the same function with callback_parent set to self to register the services of the instance itself:
    register_service_callbacks(self)
This function iterates over all attributes of the instance and registers those marked as remote services with the DataSpace.
Warning
This function only registers the callbacks. Run entity.set_interfaces(your_config) afterwards to load the interfaces in vyra.
- Parameters:
callback_parent (object) – The class or instance containing the remote services.
- Raises:
TypeError – If callback_parent is not an instance of object.
ValueError – If callback_parent does not have any remote services.
RuntimeError – If the service registration fails.
- Returns:
None
- Return type:
None
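The attribute scan described above can be pictured with a generic marker-decorator sketch. `mark_remote`, the `_is_remote` attribute and `find_marked_callbacks` are hypothetical names chosen for illustration; how `@remote_service` actually marks a method and how the DataSpace stores it is not documented here.

```python
import inspect


def mark_remote(func):
    """Hypothetical stand-in for @remote_service(): tag the function."""
    func._is_remote = True  # hypothetical marker attribute
    return func


def find_marked_callbacks(callback_parent: object) -> dict:
    """Collect all callable attributes of callback_parent that carry the marker."""
    found = {}
    for name, member in inspect.getmembers(callback_parent, callable):
        # Bound methods forward attribute reads to the underlying function.
        if getattr(member, "_is_remote", False):
            found[name] = member
    if not found:
        raise ValueError("callback_parent does not have any remote services")
    return found
```

The ValueError mirrors the documented behaviour for a parent without any remote services.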
Module contents
VYRA Base Core Module
Provides core functionalities for VYRA applications, including entity management, ROS2 node integration, and storage handling.
Provides:
- VyraEntity: Core entity class integrating ROS2 node and storage
- class vyra_base.core.VyraEntity(**kwargs)[source]
Same class as vyra_base.core.entity.VyraEntity; its variables and methods are documented in the vyra_base.core.entity module section above.
- class vyra_base.core.InterfaceBuilder[source]
Bases: object
Static factory that creates transport interfaces for a FunctionConfigEntry.
All methods are classmethods; no instance state is held.
- classmethod wants_ros2(tags)[source]
- Return type:
- Parameters:
tags (list[FunctionConfigTags])
- classmethod wants_zenoh(tags)[source]
- Return type:
- Parameters:
tags (list[FunctionConfigTags])
- classmethod wants_redis(tags)[source]
- Return type:
- Parameters:
tags (list[FunctionConfigTags])
- classmethod wants_uds(tags)[source]
- Return type:
- Parameters:
tags (list[FunctionConfigTags])
- classmethod has_ros2_type(interfacetypes)[source]
Return True if interfacetypes contains at least one actual Python class (ROS2 type).
When a proto-only interface is configured, all entries in the list are strings. Only if at least one entry is a resolved class should we attempt ROS2 creation.
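The check described above amounts to asking whether any list entry is a class rather than a string. A minimal sketch, assuming nothing about the real implementation beyond that description (`contains_resolved_class` is a hypothetical name):

```python
import inspect
from typing import Any


def contains_resolved_class(interfacetypes: list[Any]) -> bool:
    """Return True if at least one entry is an actual Python class."""
    # Unresolved proto-only entries are plain strings and are skipped.
    return any(inspect.isclass(entry) for entry in interfacetypes)
```

An empty list and a list of strings both give False, so ROS2 creation would be skipped in either case.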
- async classmethod create_service(setting, callbacks, node, ros2_available, registry=None)[source]
Create service servers for all protocols the entry’s tags request.
- Parameters:
setting (FunctionConfigEntry) – Interface configuration entry.
callbacks (dict[str, Any] | None) – Dict with key 'response' holding the callback.
node (Any) – ROS2 node (may be None when ROS2 is unavailable).
ros2_available (bool) – Whether ROS2 is up and a node is present.
registry (ProviderRegistry | None) – Optional pre-created ProviderRegistry instance. When None, a new one is created; passing one in avoids repeated look-ups when this method is called for many interfaces in one loop.
- Return type:
- async classmethod create_action(setting, callbacks, node, ros2_available, registry=None)[source]
Create action servers for all protocols the entry’s tags request.
Tries each tagged protocol in order (ROS2, Zenoh, Redis, UDS). A failure on one protocol is logged as a warning and the next is attempted.
- Raises:
ValueError – When callbacks are missing (required for action servers).
- Return type:
- Parameters:
setting (FunctionConfigEntry)
node (Any)
ros2_available (bool)
registry (ProviderRegistry | None)
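The "try each protocol, warn on failure, continue" behaviour described for create_action can be sketched as a plain loop. `create_on_each` and `demo_factory` are hypothetical helpers, and the protocol names are passed as strings only for illustration; the real method works on FunctionConfigEntry objects and provider registries.

```python
import asyncio
import logging

logger = logging.getLogger("interface_sketch")


async def create_on_each(protocols, factory):
    """Call factory(protocol) for each protocol in order and collect successes."""
    created = {}
    for protocol in protocols:
        try:
            created[protocol] = await factory(protocol)
        except Exception as exc:
            # A failure on one protocol is logged and the next one is attempted.
            logger.warning("creating server on %s failed: %s", protocol, exc)
    return created


async def demo_factory(protocol):
    """Hypothetical transport factory that fails for one protocol."""
    if protocol == "zenoh":
        raise RuntimeError("transport not reachable")
    return f"{protocol}-server"
```

Running `asyncio.run(create_on_each(["ros2", "zenoh", "redis", "uds"], demo_factory))` in this sketch returns servers for three protocols and logs one warning.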
- async classmethod create_publisher(setting, node, ros2_available)[source]
Create publishers for all protocols the entry’s tags request.
- Return type:
- Parameters:
setting (FunctionConfigEntry)
node (Any)
ros2_available (bool)
- async classmethod upgrade_service(setting, new_callbacks, registry=None)[source]
Upgrade a previously registered service that had no callback with a new one.
Called when a decorated callback is bound after set_interfaces ran (two-phase / blueprint initialisation pattern).
- Return type:
- Parameters:
setting (FunctionConfigEntry)
registry (ProviderRegistry | None)
- class vyra_base.core.Parameter(parameter_base_types, node, storage_access_persistant, storage_access_persistant_default=None, storage_access_transient=None)[source]
Bases: object
Manages module parameters.
- Parameters:
- __init__(parameter_base_types, node, storage_access_persistant, storage_access_persistant_default=None, storage_access_transient=None)[source]
Initialize vyra module parameters. Parameters contain basic settings for the module that can be used by the specific processes in the module and are configured by the user or by other modules. They are stored in a database table. Other modules can subscribe to a parameter change event to be informed about changes to a parameter.
- Parameters:
storage_access – An object that provides access to the storage system.
node (Any) – The ROS2 node to attach the parameter manager to.
parameter_base_types (dict[str, Any]) – A dictionary of base types for parameters.
storage_access_persistant (Any) – The database access object for persistent parameters.
storage_access_transient (Any) – The database access object for transient parameters.
storage_access_persistant_default (Any)
- Raises:
ValueError – If the storage_access is not of type DbAccess.
- Return type:
None
- static replace_validation_rules(rules)[source]
Replace the active validation ruleset for all Parameter instances.
This modifies the module-level _validator singleton, so every Parameter instance created in this process will use the new rules.
- Parameters:
rules (dict) – New ruleset dict; must match the structure of parameter_rules.yaml.
- Return type:
Example:
    import yaml

    with open("/etc/myapp/custom_param_rules.yaml") as f:
        Parameter.replace_validation_rules(yaml.safe_load(f))
- static load_validation_rules_from_file(path)[source]
Load a YAML ruleset from a file and make it active.
- async load_defaults(**kwargs)
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get_parameter(request, response)[source]
Get a parameter value by its key (ROS2 service interface).
This is the external ROS2 service endpoint. For internal calls, use get_parameter_impl() instead.
- Parameters:
- Returns:
True if successful, False otherwise.
- Return type:
ROS2 Service Usage:
    ros2 service call /module_name/get_parameter \
        vyra_base_interfaces/srv/GetParameter \
        "{key: 'param_name'}"
Internal Usage:
    result = await param_manager.get_parameter_impl("param_name")
    if result:
        print(result["value"])
- async get_parameter_impl(key)[source]
Get a parameter value by its key (internal implementation).
This method contains the actual business logic for retrieving a parameter. It can be called directly from Python code without going through the ROS2 service layer.
- Parameters:
key (str) – The key of the parameter to retrieve.
- Returns:
Dictionary with parameter data or None on error.
- Return type:
Return format:
    {
        "value": "<json_serialized_parameter>",
        "success": True,
        "message": "Parameter 'key' retrieved successfully."
    }
Example:
    import json

    result = await param_manager.get_parameter_impl("robot_speed")
    if result and result["success"]:
        param_data = json.loads(result["value"])
        print(f"Speed: {param_data['value']}")
- async set_parameter(request, response)[source]
Set a parameter value by its key (ROS2 service interface).
This is the external ROS2 service endpoint. For internal calls, use set_parameter_impl() instead.
- Parameters:
- Returns:
None
- Return type:
None
ROS2 Service Usage:
    ros2 service call /module_name/set_parameter \
        vyra_base_interfaces/srv/SetParameter \
        "{key: 'robot_speed', value: '2.5'}"
Internal Usage:
    result = await param_manager.set_parameter_impl("robot_speed", "2.5")
    if result and result["success"]:
        print("Parameter updated!")
- async set_parameter_impl(key, value)[source]
Set a parameter value by its key (internal implementation).
Validates type compatibility, min/max bounds and range constraints before writing to the database.
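The three checks named above (type compatibility, min/max bounds, range constraints) could look roughly like the sketch below. `validate_value` and its parameters are hypothetical; the actual rules live in the ParameterValidator and parameter_rules.yaml, which this reference does not reproduce.

```python
def validate_value(value, expected_type, min_value=None, max_value=None, allowed=None):
    """Return a list of violation messages; an empty list means the value is valid."""
    # bool is a subclass of int, so treat it as a mismatch unless bool is expected.
    type_ok = isinstance(value, expected_type) and not (
        isinstance(value, bool) and expected_type is not bool
    )
    if not type_ok:
        return [f"expected {expected_type.__name__}, got {type(value).__name__}"]
    errors = []
    if min_value is not None and value < min_value:
        errors.append(f"{value!r} is below the minimum {min_value!r}")
    if max_value is not None and value > max_value:
        errors.append(f"{value!r} is above the maximum {max_value!r}")
    if allowed is not None and value not in allowed:
        errors.append(f"{value!r} is not one of {list(allowed)!r}")
    return errors
```

Checking the type first keeps the bound comparisons from raising on incompatible values; a caller would write to the database only when the returned list is empty.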
- async create_new_parameter(request, response)[source]
Create a new parameter (strict mode, no upsert).
Fails if the parameter already exists.
- async create_new_parameter_impl(key, value)[source]
Create a new parameter and fail if key already exists.
- async create_parameter_full(request, response)[source]
Create a new parameter with full metadata and validation.
- Request fields:
name (str): Parameter key (required).
default_value (Any): Default value (required).
type (str): Type string, one of int, float, string, bool, list, dict (required).
description (str): Human-readable description (required).
displayname (str, optional): Display name (max 255 chars).
visible (bool, optional): Visibility flag (default False).
editable (bool, optional): Editable flag (default False).
min_value (str, optional): Minimum value/length constraint.
max_value (str, optional): Maximum value/length constraint.
range_value (dict, optional): Allowed values/types dict.
- async create_parameter_full_impl(name, default_value, type_str, description, displayname=None, visible=False, editable=False, min_value=None, max_value=None, range_value=None)[source]
Create a new parameter with full metadata and strict validation.
Validation is delegated to the module-level _validator (a ParameterValidator instance loaded from parameter_rules.yaml).
- async update_parameter_metadata(request, response)[source]
Update display metadata for an existing parameter.
- Request fields:
name (str): Parameter key (required).
displayname (str, optional): New display name.
visible (bool, optional): New visibility flag.
editable (bool, optional): New editable flag.
- async update_parameter_metadata_impl(key, displayname=None, visible=None, editable=None, default_value=None)[source]
Update only displayname, visible, editable, and/or default_value for an existing parameter.
- async delete_parameter(request, response)[source]
Delete a parameter from persistence by key.
- Request fields:
key (str): Parameter key to delete (required).
- async delete_parameter_impl(key)[source]
Delete a parameter from the database by key.
- async read_all_params(request, response)[source]
Read all parameters (ROS2 service interface).
This is the external ROS2 service endpoint. For internal calls, use read_all_params_impl() instead.
Returns a JSON string containing all parameters with their metadata.
- Parameters:
- Returns:
None
- Return type:
None
Response format:
    response.all_params_json = '[{"name": "param1", "value": "value1", ...}, ...]'
ROS2 Service Usage:
    ros2 service call /module_name/read_all_params \
        vyra_base_interfaces/srv/ReadAllParams "{}"
Internal Usage:
    import json

    result = await param_manager.read_all_params_impl()
    if result:
        params = json.loads(result["all_params_json"])
        for param in params:
            print(f"{param['name']}: {param['value']}")
- async read_all_params_impl()[source]
Read all parameters (internal implementation).
This method retrieves all parameters from the database and serializes them to JSON format.
- Returns:
Dictionary containing all parameters as JSON string, or None on error.
- Return type:
Return format:
{ "all_params_json": '[{"name": "...", "value": "...", "type": "..."}, ...]' }
Example:
result = await param_manager.read_all_params_impl()
if result:
    params = json.loads(result["all_params_json"])
    for param in params:
        print(f"Parameter: {param['name']}")
        print(f"  Value: {param['value']}")
        print(f"  Type: {param['type']}")
- after_update_param_callback(**kwargs)
Wrapper for synchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async param_changed_topic(request, response)[source]
Get the ROS2 topic name for parameter change events (ROS2 service interface).
This is the external ROS2 service endpoint. For internal calls, use param_changed_topic_impl() instead.
Returns the topic name where parameter change notifications are published. Other modules can subscribe to this topic to receive updates.
- Parameters:
- Returns:
None
- Return type:
ROS2 Service Usage:
ros2 service call /module_name/param_changed_topic \
    vyra_base_interfaces/srv/ParamChangedTopic "{}"
Internal Usage:
result = await param_manager.param_changed_topic_impl()
if result:
    topic = result["topic"]
    print(f"Subscribe to parameter changes: {topic}")
- async param_changed_topic_impl()[source]
Get the ROS2 topic name for parameter change events (internal implementation).
This method retrieves the topic name from the internal publisher server. The topic is used to broadcast parameter changes to other modules.
Return format:
{ "topic": "/module_name/param_update_event" }
Example:
result = await param_manager.param_changed_topic_impl()
if result:
    # Subscribe to parameter change events
    node.create_subscription(
        UpdateParamEvent,
        result["topic"],
        callback=on_param_changed,
        qos_profile=10,  # queue depth; rclpy requires a QoS argument
    )
- async has_parameter(key)[source]
Check if a parameter with the given key exists.
- Parameters:
key (str) – The key of the parameter to check.
- Returns:
True if the parameter exists, False otherwise.
- Return type:
Example:
if await param_manager.has_parameter("robot_speed"):
    result = await param_manager.get_parameter_impl("robot_speed")
- class vyra_base.core.Volatile(storage_access_transient, module_name, module_id, node, transient_base_types)[source]
Bases:
object
A class to manage volatile parameters. Volatile parameters are temporary parameters that are not persisted in the database. They are stored in Redis, which serves as the storage backend, and can be used for temporary data. This class provides methods to read, write, and manage volatile parameters, and to subscribe to changes on them. Each volatile parameter is identified by a string key.
API:
Volatiles can only be written by the module itself.
Volatiles can be subscribed to by other modules to get notified of changes.
Volatiles are not persisted and will be lost on system restart.
Volatiles are not shared between modules; each module has its own set of volatiles.
Volatiles are identified by their keys, which are strings.
Volatiles can be of different types, such as string, hash, list, or set.
- Example usage:
current robot state
io states
status information
temporary data storage
any custom data defined by the module.
- Parameters:
storage_access_transient (RedisClient)
module_name (str)
module_id (str)
node (Optional[Any])
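The behaviour described for volatiles (values held in a transient store, lost on restart, observable through change notifications) can be illustrated without Redis. The sketch below is an in-memory stand-in only. The class InMemoryVolatileStore and its methods are hypothetical and do not mirror the Volatile API or its Redis pub/sub mechanism.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

ChangeCallback = Callable[[str, Any], Awaitable[None]]


class InMemoryVolatileStore:
    """Hypothetical stand-in for a Redis-backed volatile store."""

    def __init__(self) -> None:
        self._values: dict[str, Any] = {}
        self._subscribers: dict[str, list[ChangeCallback]] = defaultdict(list)

    def subscribe(self, key: str, callback: ChangeCallback) -> None:
        """Register a coroutine callback that runs on every write to key."""
        self._subscribers[key].append(callback)

    async def set_value(self, key: str, value: Any) -> None:
        """Store the value, then notify all subscribers of this key."""
        self._values[key] = value
        for callback in self._subscribers[key]:
            await callback(key, value)

    def get_value(self, key: str) -> Any:
        """Return the stored value, or None if the key is unknown."""
        return self._values.get(key)


async def demo() -> list[tuple[str, Any]]:
    received: list[tuple[str, Any]] = []

    async def on_change(key: str, value: Any) -> None:
        received.append((key, value))

    store = InMemoryVolatileStore()
    store.subscribe("temperature", on_change)
    await store.set_value("temperature", 21.5)
    await store.set_value("io_state", "open")  # no subscriber, no event
    return received


events = asyncio.run(demo())
print(events)
```

Because the values live only in a Python dictionary, they disappear when the process exits, which matches the non-persistent nature of volatiles.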
- EVENT_TOPIC_PREFIX = 'volatile/'
- __init__(storage_access_transient, module_name, module_id, node, transient_base_types)[source]
Initialize the Volatile class.
- async cleanup()[source]
Async cleanup method for proper resource cleanup.
- async activate_listener(**kwargs)
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async deactivate_listener(channel)[source]
Deactivate the Redis pub/sub listener for volatile parameter changes.
This method stops listening for changes on all registered volatile parameters. After calling this, no further change notifications will be received.
- async on_volatile_change_received(**kwargs)
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async read_all_volatile_names(**kwargs)
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async set_volatile_value(**kwargs)
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async get_volatile_value(**kwargs)
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async publish_volatile_to_ros2(**kwargs)
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async unsubscribe_from_changes(**kwargs)
Wrapper for asynchronous functions.
- Parameters:
args – Positional arguments.
kwargs – Keyword arguments.
- Returns:
Result of the wrapped function.
- async has_volatile(key)[source]
Check if a volatile parameter with the given key exists in Redis.
- Parameters:
key (str) – The key of the volatile parameter to check.
- Returns:
True if the volatile exists, False otherwise.
- Return type:
Example:
if await volatile.has_volatile("temperature"):
    value = await volatile.get_volatile_value("temperature")
- async set_volatile_impl(key, value)[source]
- async create_new_volatile_impl(key, value)[source]
- async get_volatile(request, response)[source]
Get the current value of a volatile parameter (remote service interface).
- async set_volatile(request, response)[source]
Set the value of a volatile parameter (remote service interface).
- async create_new_volatile(request, response)[source]
Create a new volatile key (fails if key already exists).
- async delete_volatile(request, response)[source]
Delete a volatile parameter from Redis by key.
- Request fields:
key (str): Volatile key to delete (required).
- async delete_volatile_impl(key)[source]
Delete a volatile parameter from Redis.
- async read_all_volatiles(request, response)[source]
Read all volatile parameters with their current values (remote service interface).
Returns a JSON-encoded list of objects with key and value fields.
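Since the result is a JSON-encoded list of key/value objects, a caller can turn it into a lookup table with the standard json module. This is a minimal sketch using a made-up payload: the sample keys and values are illustrative, and the name of the response field that carries the string is not documented here, so it is left out.

```python
import json

# Made-up payload in the documented shape: a list of {"key", "value"} objects.
payload = (
    '[{"key": "temperature", "value": 21.5},'
    ' {"key": "io_state", "value": "open"}]'
)

# Build a plain lookup table from the decoded list.
volatiles = {item["key"]: item["value"] for item in json.loads(payload)}

for key, value in volatiles.items():
    print(f"{key}: {value}")
```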