from datetime import datetime
import json
import logging
from typing import Any, Optional
from sqlalchemy import event
from sqlalchemy import inspect
from vyra_base.com import InterfaceFactory, ProtocolType, remote_service
from vyra_base.helper.error_handler import ErrorTraceback
from vyra_base.storage.db_access import DBSTATUS, DbAccess
from vyra_base.storage.db_manipulator import DBReturnValue, DbManipulator
from vyra_base.storage.tb_params import Parameter as DbParameter
from vyra_base.storage.tb_params import TypeEnum
from vyra_base.core.parameter_validator import ParameterValidator, _convert_value
# Lookup table from a TypeEnum string value to the Python type used to
# convert into it. An explicit table is required because eval(type_string)
# breaks on "string": the Python builtin is called "str".
# Each converter is listed once together with every alias that selects it.
_TYPE_CONVERTERS: dict = {
    alias: converter
    for converter, aliases in (
        (int, ("int", "integer")),
        (str, ("string", "str")),
        (bool, ("bool", "boolean")),
        (float, ("float",)),
        (list, ("list",)),
        (dict, ("dict",)),
    )
    for alias in aliases
}
# Module-level singleton validator loaded from parameter_rules.yaml.
# It is shared by all Parameter instances in this process; its rules are
# replaced via Parameter.replace_validation_rules() and reloaded via
# Parameter.load_validation_rules_from_file().
_validator = ParameterValidator()
# Logger for this module, named after the module's import path.
logger = logging.getLogger(__name__)
[docs]
class Parameter:
    """
    Manage the parameters of a module.

    Parameters are persisted as ``DbParameter`` rows and accessed through a
    ``DbManipulator``. Most operations come in two flavours: an ``*_impl``
    coroutine holding the actual logic for in-process callers, and a
    ``@remote_service`` wrapper that copies the ``*_impl`` result onto a
    response object. Value and metadata validation is delegated to the
    module-level ``_validator``.
    """
[docs]
def __init__(
    self,
    parameter_base_types: dict[str, Any],
    node: Any,
    storage_access_persistant: Any,
    storage_access_persistant_default: Any=None,
    storage_access_transient: Any = None
) -> None:
    """
    Set up the parameter manager of a vyra module.

    Parameters hold the basic settings of a module. They can be used by the
    processes of the module and are configured by the user or by other
    modules. They are stored in a database table, and other modules can
    subscribe to a parameter change event to be informed about changes.

    The constructor only stores its arguments and builds the manipulator
    for the persistent parameter table; it does not validate the storage
    objects and does not create the update publisher (see
    ``_init_publisher``).

    :param parameter_base_types: A dictionary of base types for parameters.
    :type parameter_base_types: dict[str, Any]
    :param node: The node handed to the publisher factory when the update
        publisher is created.
    :type node: Any
    :param storage_access_persistant: The database access object for
        persistent parameters.
    :type storage_access_persistant: DbAccess
    :param storage_access_persistant_default: Database access object that
        ``load_defaults`` falls back to when it is called without an
        explicit default storage.
    :type storage_access_persistant_default: DbAccess | None
    :param storage_access_transient: The database access object for
        transient parameters. It is only stored here.
    :type storage_access_transient: DbAccess | None
    """
    # Plain configuration values.
    self.parameter_base_types = parameter_base_types
    self.update_param_event_ident = "param_update_event_publisher"

    # The node is kept so the publisher can be created lazily later on.
    self._node = node
    self.update_parameter_publisher: Optional[Any] = None  # Will be VYRA/Zenoh publisher

    # Storage handles.
    self.storage_access_persistant = storage_access_persistant
    self.storage_access_persistant_default = storage_access_persistant_default
    self.storage_access_transient: Any|None = storage_access_transient

    # Manipulator bound to the persistent parameter table.
    self.persistant_manipulator = DbManipulator(
        storage_access_persistant, DbParameter)
async def _init_publisher(self) -> None:
    """
    Create the parameter update publisher on first use.

    Does nothing when ``update_parameter_publisher`` is already set.
    """
    if self.update_parameter_publisher is not None:
        return
    publisher = await InterfaceFactory.create_publisher(
        name=self.update_param_event_ident,
        protocols=[ProtocolType.ZENOH],
        node=self._node,
        is_publisher=True,
    )
    self.update_parameter_publisher = publisher
[docs]
@staticmethod
def replace_validation_rules(rules: dict) -> None:
    """
    Swap the active validation ruleset used by all :class:`Parameter` instances.

    The rules are handed to the *module-level* ``_validator`` singleton, so
    the change is visible to every Parameter instance in this process.

    :param rules: New ruleset dict – must match the structure of
        ``parameter_rules.yaml``.

    Example::

        import yaml
        with open("/etc/myapp/custom_param_rules.yaml") as f:
            Parameter.replace_validation_rules(yaml.safe_load(f))
    """
    shared_validator = _validator
    shared_validator.replace_rules(rules)
[docs]
@staticmethod
def load_validation_rules_from_file(path: str) -> None:
    """
    Activate the YAML ruleset stored in a file.

    The path is passed on to the module-level ``_validator``, which loads
    the rules from it.

    :param path: Absolute or relative path to the YAML rules file.
    """
    shared_validator = _validator
    shared_validator.load_rules_from_file(path)
@ErrorTraceback.w_check_error_exist
async def load_defaults(
        self,
        storage_access_default: Optional[DbAccess] = None,
        override_exist: bool = False) -> bool:
    """
    Copy default parameters into the persistent parameter table.

    Every parameter of the default database that is missing in the
    persistent table is added. Parameters that already exist are left
    untouched unless ``override_exist`` is set, in which case the row with
    the same name is overwritten with the default record. After a
    successful run the ``after_update`` listener for ``DbParameter`` is
    registered (once per instance).

    :param storage_access_default: The database access object for default
        parameters. When omitted, the ``storage_access_persistant_default``
        given to the constructor is used.
    :param override_exist: If True, existing parameters will be overridden
        with defaults.
    :return: True when the defaults were processed, False when no default
        storage is available or its parameter table does not exist.
    :rtype: bool
    :raises ValueError: If the default storage is not of type DbAccess, or
        if a database read does not yield a list.
    """
    # Fall back to the default storage configured on the instance.
    if storage_access_default is None:
        storage_access_default = self.storage_access_persistant_default
        if storage_access_default is None:
            logger.warning("No default storage access provided. Skipping default "
                           "parameter loading.")
            return False
    if not isinstance(storage_access_default, DbAccess):
        raise ValueError("Invalid storage_access_default. Must be of type DbAccess.")

    default_manipulator = DbManipulator(storage_access_default, DbParameter)
    if not await storage_access_default.check_table_exists(DbParameter):
        logger.info(
            f"Table '{DbParameter.__tablename__}' does not exist in the default "
            "database. Skipping default parameter loading.")
        return False

    # NOT_FOUND is treated as "no rows", as the other methods of this class
    # do, so an empty table does not abort the default loading.
    all_return: DBReturnValue = await self.persistant_manipulator.get_all()
    persisted_rows = (
        [] if all_return.status == DBSTATUS.NOT_FOUND else all_return.value)
    if not isinstance(persisted_rows, list):
        raise ValueError(f"all_return.value must be a list. Is: {all_return.value}")

    all_default_return: DBReturnValue = await default_manipulator.get_all()
    default_rows = (
        [] if all_default_return.status == DBSTATUS.NOT_FOUND
        else all_default_return.value)
    if not isinstance(default_rows, list):
        raise ValueError("all_default.value must be a list.")

    # Persisted rows are converted with the persistent manipulator; a set
    # gives constant-time membership tests in the loop below.
    existing_names: set[str] = {
        self.persistant_manipulator.to_dict(row)["name"] for row in persisted_rows
    }

    for default_row in default_rows:
        default_item: dict = default_manipulator.to_dict(default_row)
        name = default_item["name"]
        # The primary key of the default DB must not leak into the target DB.
        param_obj: dict[str, Any] = {
            field: content for field, content in default_item.items()
            if field != "id"
        }
        if name not in existing_names:
            logger.info("Loading default parameter "
                        f"'{name}: {default_item['value']}'")
            write_ret: DBReturnValue = await self.persistant_manipulator.add(param_obj)
        elif override_exist:
            logger.info("Overriding existing parameter "
                        f"'{name}: {default_item['value']}'")
            # Address the row by name, like set_parameter_impl does; the
            # record itself carries no primary key any more.
            write_ret = await self.persistant_manipulator.update(
                param_obj, filters={"name": name})
        else:
            continue
        if write_ret.status != DBSTATUS.SUCCESS:
            logger.warning(f"Failed to store default parameter '{name}'.")

    # Register the change listener only once per instance; repeated calls
    # would otherwise make the callback fire several times per update.
    if not getattr(self, "_after_update_listener_registered", False):
        event.listen(DbParameter, "after_update", self.after_update_param_callback)
        self._after_update_listener_registered = True
    return True
[docs]
@remote_service()
async def get_parameter(self, request: Any, response: Any) -> bool:
    """
    Remote service endpoint: look up one parameter by its key.

    Thin wrapper around :meth:`get_parameter_impl`, which should be used
    directly for calls from Python code. The outcome is copied onto
    ``response`` (``json_value``, ``success``, ``message``); on failure
    ``json_value`` is ``"[]"`` and ``success`` is False.

    :param request: The request object; its ``key`` attribute names the
        parameter to retrieve.
    :type request: Any
    :param response: The response object to store the result.
    :type response: Any
    :return: True if the parameter was retrieved, False otherwise.
    :rtype: bool

    **ROS2 Service Usage:**

    .. code-block:: bash

        ros2 service call /module_name/get_parameter \\
            vyra_base_interfaces/srv/GetParameter \\
            "{key: 'param_name'}"

    **Internal Usage:**

    .. code-block:: python

        result = await param_manager.get_parameter_impl("param_name")
        if result:
            print(result["value"])
    """
    requested_key = request.key
    outcome = await self.get_parameter_impl(requested_key)
    retrieved = outcome is not None
    if retrieved:
        response.json_value = outcome["value"]
        response.success = outcome["success"]
        response.message = outcome["message"]
    else:
        response.json_value = "[]"
        response.success = False
        response.message = f"Failed to retrieve parameter with key '{requested_key}'."
    return retrieved
[docs]
async def get_parameter_impl(self, key: str) -> Optional[dict]:
    """
    Get a parameter by its key (internal implementation).

    Contains the actual lookup logic and can be called directly from
    Python code without going through the service layer. The first row
    matching ``key`` is converted to a dict and serialized to JSON.

    :param key: The key of the parameter to retrieve.
    :type key: str
    :return: Dictionary with the parameter data, or None when the lookup
        fails or no parameter with that key exists.
    :rtype: Optional[dict]

    **Return format:**

    .. code-block:: python

        {
            "value": "<json_serialized_parameter>",
            "success": True,
            "message": "Parameter 'key' retrieved successfully."
        }

    **Example:**

    .. code-block:: python

        result = await param_manager.get_parameter_impl("robot_speed")
        if result and result["success"]:
            param_data = json.loads(result["value"])
            print(f"Speed: {param_data['value']}")
    """
    lookup: DBReturnValue = await self.persistant_manipulator.get_all(
        filters={"name": key})

    # Any status other than SUCCESS (including NOT_FOUND) is a failed get.
    if lookup.status != DBSTATUS.SUCCESS:
        logger.warning(f"Failed to retrieve parameter with key '{key}'.")
        return None
    if not isinstance(lookup.value, list):
        logger.warning("Internal error: all_params.value is not a list.")
        return None
    # A successful query without rows also means "no such parameter";
    # indexing the empty list would raise IndexError.
    if not lookup.value:
        logger.warning(f"Failed to retrieve parameter with key '{key}'.")
        return None

    record = self.persistant_manipulator.to_dict(lookup.value[0])
    return {
        'value': json.dumps(record),
        'message': f"Parameter '{key}' retrieved successfully.",
        'success': True,
    }
[docs]
@remote_service()
async def set_parameter(self, request: Any, response: Any) -> None:
    """
    Remote service endpoint: set one parameter by its key.

    Thin wrapper around :meth:`set_parameter_impl`, which should be used
    directly for calls from Python code. ``success`` and ``message`` of the
    result are copied onto ``response``.

    :param request: The request object carrying ``key`` and ``value``.
    :type request: Any
    :param response: The response object to update with the result.
    :type response: Any
    :return: None
    :rtype: None

    **ROS2 Service Usage:**

    .. code-block:: bash

        ros2 service call /module_name/set_parameter \\
            vyra_base_interfaces/srv/SetParameter \\
            "{key: 'robot_speed', value: '2.5'}"

    **Internal Usage:**

    .. code-block:: python

        result = await param_manager.set_parameter_impl("robot_speed", "2.5")
        if result and result["success"]:
            print("Parameter updated!")
    """
    outcome = await self.set_parameter_impl(request.key, request.value)
    if outcome is not None:
        response.success = outcome["success"]
        response.message = outcome["message"]
    else:
        response.success = False
        response.message = "Internal error occurred"
    return None
[docs]
async def set_parameter_impl(self, key: str, value: Any) -> Optional[dict]:
    """
    Set a parameter value by its key (internal implementation).

    If no parameter with ``key`` exists, it is created as a string
    parameter (upsert behavior). Otherwise the new value is validated
    against the stored record (type, min/max bounds and range constraints
    via the module-level validator), converted to the record's type and
    written to the database. When several records share the key, the
    first one is used for validation.

    :param key: The key of the parameter to set.
    :type key: str
    :param value: The value to set for the parameter.
    :type value: Any
    :return: Dictionary with ``success`` and ``message``. Every path of
        this method returns such a dictionary.
    :rtype: Optional[dict]
    """
    def _reply(success: bool, message: str, log: Any) -> dict:
        """Emit *message* through *log* and wrap it into a result dict."""
        log(message)
        return {'success': success, 'message': message}

    lookup: DBReturnValue = await self.persistant_manipulator.get_all(
        filters={"name": key})

    # DBSTATUS.NOT_FOUND means the query ran fine but no rows matched, so
    # it is handled like an empty result and falls through to the upsert.
    if lookup.status == DBSTATUS.NOT_FOUND:
        rows: Any = []
    elif lookup.status != DBSTATUS.SUCCESS:
        return _reply(
            False,
            f"Failed to retrieve parameter with key '{key}' (DB error).",
            logger.warning)
    else:
        rows = lookup.value
    if not isinstance(rows, list):
        return _reply(
            False, "Internal error: param_ret.value is not a list.", logger.error)

    if not rows:
        # Parameter does not exist — create it as a string parameter.
        try:
            new_param: dict[str, Any] = {
                "name": key,
                "value": str(value),
                "default_value": str(value),
                "type": TypeEnum.string.value,
                "displayname": key,
            }
            add_ret: DBReturnValue = await self.persistant_manipulator.add(new_param)
            if add_ret.status != DBSTATUS.SUCCESS:
                return _reply(
                    False,
                    f"Failed to create parameter with key '{key}'.",
                    logger.error)
            return _reply(
                True, f"Parameter '{key}' created successfully.", logger.info)
        except Exception as e:
            # Service boundary: report the failure instead of raising.
            return _reply(
                False, f"Failed to create parameter '{key}': {e}", logger.error)

    if len(rows) > 1:
        logger.warning(
            f"Multiple parameters found with key '{key}'. "
            "Updating the first one found.")
    param_record = rows[0]

    # ── Validate type / min / max / range via ParameterValidator ────────
    validation_errors = _validator.validate_set(
        param_record=param_record,
        new_value=value,
    )
    if validation_errors:
        logger.warning(
            "set_parameter validation failed for '%s': %s", key, validation_errors[0])
        return {'success': False, 'message': validation_errors[0]}

    # ── Type conversion ──────────────────────────────────────────────────
    # The type name is resolved before the try block so the error message
    # in the handler can always refer to it.
    record_type = param_record.type
    type_str = record_type.value if hasattr(record_type, 'value') else str(record_type)
    try:
        converted, conv_error = _convert_value(value, type_str)
        if conv_error:
            raise ValueError(conv_error)
    except (ValueError, TypeError) as ve:
        return _reply(
            False,
            f"Failed to convert value '{value}' to type '{type_str}': {ve}",
            logger.error)

    update_ret: DBReturnValue = await self.persistant_manipulator.update(
        {"value": converted}, filters={"name": key})
    if update_ret.status != DBSTATUS.SUCCESS:
        return _reply(
            False, f"Failed to update parameter with key '{key}'.", logger.error)
    return _reply(True, f"Parameter '{key}' updated successfully.", logger.info)
[docs]
@remote_service()
async def create_new_parameter(self, request: Any, response: Any) -> None:
    """
    Remote service endpoint: create a parameter in strict mode (no upsert).

    Delegates to :meth:`create_new_parameter_impl` with ``request.key`` and
    ``request.value`` and copies ``success`` and ``message`` onto
    ``response``. The call fails if the parameter already exists.

    :param request: The request object carrying ``key`` and ``value``.
    :param response: The response object to update with the result.
    :return: None
    """
    outcome = await self.create_new_parameter_impl(request.key, request.value)
    if outcome is not None:
        response.success = outcome["success"]
        response.message = outcome["message"]
    else:
        response.success = False
        response.message = "Internal error occurred"
    return None
[docs]
async def create_new_parameter_impl(self, key: str, value: Any) -> Optional[dict]:
    """
    Create a new string parameter; fail if the key already exists.

    The value is stored in its ``str()`` form as both current and default
    value, and the key doubles as display name.

    :param key: Name of the parameter to create.
    :param value: Initial value of the parameter.
    :return: Dictionary with ``success`` and ``message``.
    """
    lookup: DBReturnValue = await self.persistant_manipulator.get_all(
        filters={"name": key}
    )

    # "No rows matched" is the expected outcome here, not an error.
    if lookup.status == DBSTATUS.NOT_FOUND:
        matches: Any = []
    elif lookup.status == DBSTATUS.SUCCESS:
        matches = lookup.value
    else:
        db_error = f"Failed to retrieve parameter with key '{key}' (DB error)."
        logger.warning(db_error)
        return {'success': False, 'message': db_error}

    if not isinstance(matches, list):
        shape_error = "Internal error: param_ret.value is not a list."
        logger.error(shape_error)
        return {'success': False, 'message': shape_error}

    if len(matches) > 0:
        duplicate = f"Parameter '{key}' already exists."
        logger.warning(duplicate)
        return {'success': False, 'message': duplicate}

    text_value = str(value)
    record: dict[str, Any] = {
        "name": key,
        "value": text_value,
        "default_value": text_value,
        "type": TypeEnum.string.value,
        "displayname": key,
    }
    inserted: DBReturnValue = await self.persistant_manipulator.add(record)
    if inserted.status != DBSTATUS.SUCCESS:
        insert_error = f"Failed to create parameter with key '{key}'."
        logger.error(insert_error)
        return {'success': False, 'message': insert_error}

    created = f"Parameter '{key}' created successfully."
    logger.info(created)
    return {'success': True, 'message': created}
# ------------------------------------------------------------------
# create_parameter_full – create with full metadata & validation
# ------------------------------------------------------------------
[docs]
@remote_service()
async def create_parameter_full(self, request: Any, response: Any) -> None:
    """
    Remote service endpoint: create a parameter with full metadata.

    Reads the fields below from ``request``, delegates to
    :meth:`create_parameter_full_impl` and copies ``success`` and
    ``message`` of the result onto ``response``.

    Request fields:
        name (str): Parameter key (required).
        default_value (Any): Default value (required).
        type (str): Type string — one of int, float, string, bool, list, dict (required).
        description (str): Human-readable description (required).
        displayname (str, optional): Display name (max 255 chars).
        visible (bool, optional): Visibility flag (default False).
        editable (bool, optional): Editable flag (default False).
        min_value (str, optional): Minimum value/length constraint.
        max_value (str, optional): Maximum value/length constraint.
        range_value (dict, optional): Allowed values/types dict.
    """
    def field(attribute: str, fallback: Any = None) -> Any:
        """Read an optional request attribute, defaulting to *fallback*."""
        return getattr(request, attribute, fallback)

    outcome = await self.create_parameter_full_impl(
        name=str(request.name),
        default_value=field("default_value"),
        type_str=str(request.type),
        description=str(request.description),
        displayname=field("displayname"),
        visible=bool(field("visible", False)),
        editable=bool(field("editable", False)),
        min_value=field("min_value"),
        max_value=field("max_value"),
        range_value=field("range_value"),
    )
    if outcome is not None:
        response.success = outcome["success"]
        response.message = outcome["message"]
    else:
        response.success = False
        response.message = "Internal error occurred"
    return None
[docs]
async def create_parameter_full_impl(
    self,
    name: str,
    default_value: Any,
    type_str: str,
    description: str,
    displayname: Optional[str] = None,
    visible: bool = False,
    editable: bool = False,
    min_value: Any = None,
    max_value: Any = None,
    range_value: Optional[dict] = None,
) -> Optional[dict]:
    """
    Create a new parameter with full metadata and strict validation.

    Validation is delegated to the module-level :data:`_validator`
    (a :class:`~vyra_base.core.parameter_validator.ParameterValidator`
    instance loaded from ``parameter_rules.yaml``). After validation the
    default value is converted to ``type_str`` and stored as both current
    and default value. Creation fails if a parameter with the same name
    already exists.

    :param name: Parameter key.
    :param default_value: Default value; also used as the initial value.
    :param type_str: Type string of the parameter.
    :param description: Human-readable description.
    :param displayname: Display name; falls back to ``name`` when empty.
    :param visible: Visibility flag.
    :param editable: Editable flag.
    :param min_value: Minimum constraint; stored as string unless None or ''.
    :param max_value: Maximum constraint; stored as string unless None or ''.
    :param range_value: Allowed values. A string is parsed as JSON; if it
        cannot be parsed, the constraint is dropped and, for non-blank
        input, a warning is logged.
    :return: Dictionary with ``success`` and ``message``.
    :rtype: Optional[dict]
    """
    def _reply(success: bool, message: str, log: Any) -> dict:
        """Emit *message* through *log* and wrap it into a result dict."""
        log(message)
        return {'success': success, 'message': message}

    # Delegate all field/type/range validation to ParameterValidator
    errors = _validator.validate_create(
        name=name,
        default_value=default_value,
        type_str=type_str,
        description=description,
        displayname=displayname,
        min_value=min_value,
        max_value=max_value,
        range_value=range_value,
    )
    if errors:
        return _reply(False, errors[0], logger.warning)

    # Convert default_value to the correct Python type
    default_value, conv_error = _convert_value(default_value, type_str)
    if conv_error:
        return _reply(
            False,
            f"Cannot convert default_value to type '{type_str}': {conv_error}",
            logger.warning)

    dn = displayname if displayname else name

    # Handle range_value JSON deserialization if passed as string
    if isinstance(range_value, str):
        try:
            range_value = json.loads(range_value)
        except json.JSONDecodeError:
            # A blank string simply means "no constraint"; anything else
            # was meant as a constraint and must not vanish unnoticed.
            if range_value.strip():
                logger.warning(
                    "Ignoring range_value of parameter '%s': not valid JSON.", name)
            range_value = None

    # --- Check for duplicate ---
    param_ret: DBReturnValue = await self.persistant_manipulator.get_all(
        filters={"name": name}
    )
    if param_ret.status not in (DBSTATUS.SUCCESS, DBSTATUS.NOT_FOUND):
        return _reply(
            False,
            f"DB error while checking for existing parameter '{name}'.",
            logger.error)
    existing = param_ret.value if isinstance(param_ret.value, list) else []
    if len(existing) > 0:
        return _reply(False, f"Parameter '{name}' already exists.", logger.warning)

    # --- Build record ---
    new_param: dict[str, Any] = {
        "name": name,
        "value": default_value,
        "default_value": default_value,
        "type": type_str,
        "description": description,
        "displayname": dn,
        "visible": visible,
        "editable": editable,
    }
    # Optional constraints are only stored when actually provided.
    if min_value not in (None, ''):
        new_param["min_value"] = str(min_value)
    if max_value not in (None, ''):
        new_param["max_value"] = str(max_value)
    if range_value is not None:
        new_param["range_value"] = range_value

    add_ret: DBReturnValue = await self.persistant_manipulator.add(new_param)
    if add_ret.status != DBSTATUS.SUCCESS:
        return _reply(False, f"Failed to create parameter '{name}'.", logger.error)
    return _reply(True, f"Parameter '{name}' created successfully.", logger.info)
# ------------------------------------------------------------------
# update_parameter_metadata – update displayname / visible / editable
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# delete_parameter – remove a parameter from persistence
# ------------------------------------------------------------------
[docs]
@remote_service()
async def delete_parameter(self, request: Any, response: Any) -> None:
    """
    Remote service endpoint: delete a parameter from persistence by key.

    Delegates to :meth:`delete_parameter_impl` and copies ``success`` and
    ``message`` of the result onto ``response``.

    Request fields:
        key (str): Parameter key to delete (required).
    """
    outcome = await self.delete_parameter_impl(key=str(request.key))
    if outcome is not None:
        response.success = outcome["success"]
        response.message = outcome["message"]
    else:
        response.success = False
        response.message = "Internal error occurred"
    return None
[docs]
async def delete_parameter_impl(self, key: str) -> Optional[dict]:
    """
    Remove a parameter from the database by key.

    The first record matching ``key`` is looked up, its primary key is
    determined (from the record attribute, or from the record's dict form
    as a fallback) and the record is deleted by that primary key.

    :param key: The parameter name/key to delete.
    :type key: str
    :return: Result dict with ``success`` and ``message``.
    :rtype: Optional[dict]
    """
    def _failure(message: str, log: Any) -> dict:
        """Emit *message* through *log* and wrap it into a failed result."""
        log(message)
        return {'success': False, 'message': message}

    if not key:
        return _failure("Parameter key is required.", logger.warning)

    lookup: DBReturnValue = await self.persistant_manipulator.get_all(
        filters={"name": key}
    )
    rows = lookup.value
    has_row_list = isinstance(rows, list)

    # Nothing to delete: explicit NOT_FOUND or an empty result list.
    if lookup.status == DBSTATUS.NOT_FOUND or (has_row_list and len(rows) == 0):
        return _failure(f"Parameter '{key}' not found.", logger.warning)
    if lookup.status != DBSTATUS.SUCCESS:
        return _failure(f"DB error while fetching parameter '{key}'.", logger.error)
    if not (has_row_list and len(rows) > 0):
        return _failure(
            "Internal error: param_ret.value is not a list or is empty.",
            logger.error)

    # Retrieve primary key from the first matching record
    target_record = rows[0]
    pkey_field = self.persistant_manipulator._read_pkey()
    record_id = getattr(target_record, pkey_field, None)
    if record_id is None:
        record_id = self.persistant_manipulator.to_dict(target_record).get(pkey_field)
    if record_id is None:
        return _failure(
            f"Could not determine primary key for parameter '{key}'.", logger.error)

    removal: DBReturnValue = await self.persistant_manipulator.delete(record_id)
    if removal.status != DBSTATUS.SUCCESS:
        return _failure(f"Failed to delete parameter '{key}'.", logger.error)

    done = f"Parameter '{key}' deleted successfully."
    logger.info(done)
    return {'success': True, 'message': done}
[docs]
@remote_service()
async def read_all_params(self, request: Any, response: Any) -> None:
    """
    Remote service endpoint: read all parameters.

    Thin wrapper around :meth:`read_all_params_impl`, which should be used
    directly for calls from Python code. ``response.all_params_json``
    receives a JSON string with all parameters, or ``"[]"`` when the
    parameters could not be read.

    :param request: The request object (unused).
    :type request: Any
    :param response: The response object to update with the result.
    :type response: Any
    :return: None
    :rtype: None

    **Response format:**

    .. code-block:: python

        response.all_params_json = '[{"name": "param1", "value": "value1", ...}, ...]'

    **ROS2 Service Usage:**

    .. code-block:: bash

        ros2 service call /module_name/read_all_params \\
            vyra_base_interfaces/srv/ReadAllParams "{}"

    **Internal Usage:**

    .. code-block:: python

        result = await param_manager.read_all_params_impl()
        if result:
            params = json.loads(result["all_params_json"])
            for param in params:
                print(f"{param['name']}: {param['value']}")
    """
    outcome = await self.read_all_params_impl()
    if outcome is not None:
        response.all_params_json = outcome["all_params_json"]
    else:
        response.all_params_json = "[]"
    return None
[docs]
async def read_all_params_impl(self) -> Optional[dict]:
    """
    Read all parameters (internal implementation).

    Fetches every row of the persistent parameter table, converts each row
    to a dict and serializes the resulting list to JSON.

    :return: Dictionary holding all parameters as a JSON string, or None
        when the database result is not a list.
    :rtype: Optional[dict]

    **Return format:**

    .. code-block:: python

        {
            "all_params_json": '[{"name": "...", "value": "...", "type": "..."}, ...]'
        }

    **Example:**

    .. code-block:: python

        result = await param_manager.read_all_params_impl()
        if result:
            params = json.loads(result["all_params_json"])
            for param in params:
                print(f"Parameter: {param['name']}")
                print(f"  Value: {param['value']}")
                print(f"  Type: {param['type']}")
    """
    fetched: DBReturnValue = await self.persistant_manipulator.get_all()
    rows = fetched.value
    if not isinstance(rows, list):
        logger.error("Internal error: ret_obj.value is not a list.")
        return None

    as_dict = self.persistant_manipulator.to_dict
    serialized = json.dumps([as_dict(row) for row in rows])
    return {'all_params_json': serialized}
@ErrorTraceback.w_check_error_exist
def after_update_param_callback(self, mapper, connection, target) -> None:
    """
    Log a parameter update; registered as ``after_update`` listener.

    Logs the new value of the updated parameter and, on debug level, every
    attribute whose history reports changes. No change event is published
    yet (see the TODO below).

    :param mapper: The mapper.
    :param connection: The database connection.
    :param target: The target object that was updated.
    """
    logger.info(f"Parameter '{target.name}' updated to '{target.value}'")

    state = inspect(target)
    for attr in state.attrs:
        history = attr.history
        if not history.has_changes():
            continue
        old_value, new_value = history.deleted, history.added
        logger.debug(f"Feld {attr.key} changed: {old_value} -> {new_value}")

    # NOTE: SQLAlchemy event callbacks must be synchronous - cannot use await
    # TODO: Implement async event queue for parameter change notifications via ROS2
    logger.debug(f"Parameter '{target.name}' updated (ROS2 change event not published)")
[docs]
@remote_service()
async def param_changed_topic(self, request: Any, response: Any) -> None:
    """
    Remote service endpoint: report the topic of parameter change events.

    Thin wrapper around :meth:`param_changed_topic_impl`, which should be
    used directly for calls from Python code. ``response.topic`` receives
    the topic name, or an empty string when it could not be determined.
    Other modules can subscribe to this topic to receive updates.

    :param request: The request object (unused).
    :type request: Any
    :param response: The response object to update with the result.
    :type response: Any
    :return: None
    :rtype: None

    **ROS2 Service Usage:**

    .. code-block:: bash

        ros2 service call /module_name/param_changed_topic \\
            vyra_base_interfaces/srv/ParamChangedTopic "{}"

    **Internal Usage:**

    .. code-block:: python

        result = await param_manager.param_changed_topic_impl()
        if result:
            topic = result["topic"]
            print(f"Subscribe to parameter changes: {topic}")
    """
    outcome = await self.param_changed_topic_impl()
    if outcome is not None:
        response.topic = outcome["topic"]
    else:
        response.topic = ""
    return None
[docs]
async def param_changed_topic_impl(self) -> Optional[dict]:
    """
    Report the topic of parameter change events (internal implementation).

    Makes sure the update publisher exists, then reads the topic name from
    ``publisher_info.name`` of the publisher's internal ``_publisher``
    object.

    :return: Dictionary containing the topic name, or None when the
        publisher has no internal ``_publisher`` object.
    :rtype: Optional[dict]

    **Return format:**

    .. code-block:: python

        {
            "topic": "/module_name/param_update_event"
        }

    **Example:**

    .. code-block:: python

        result = await param_manager.param_changed_topic_impl()
        if result:
            # Subscribe to parameter change events
            node.create_subscription(
                UpdateParamEvent,
                result["topic"],
                callback=on_param_changed
            )
    """
    # The publisher is created lazily on first use.
    if self.update_parameter_publisher is None:
        await self._init_publisher()

    inner_publisher = getattr(self.update_parameter_publisher, '_publisher', None)
    if inner_publisher is None:
        logger.error("Publisher server is not initialized.")
        return None
    return {'topic': inner_publisher.publisher_info.name}
[docs]
async def has_parameter(self, key: str) -> bool:
    """
    Tell whether a parameter with the given key exists.

    :param key: The key of the parameter to check.
    :type key: str
    :return: True if the lookup succeeds and yields at least one record,
        False otherwise.
    :rtype: bool

    **Example:**

    .. code-block:: python

        if await param_manager.has_parameter("robot_speed"):
            result = await param_manager.get_parameter_impl("robot_speed")
    """
    lookup: DBReturnValue = await self.persistant_manipulator.get_all(
        filters={"name": key}
    )
    if lookup.status != DBSTATUS.SUCCESS:
        return False
    rows = lookup.value
    return isinstance(rows, list) and len(rows) > 0