Quellcode für vyra_base.core.parameter

from datetime import datetime
import json
import logging
from typing import Any, Optional

from sqlalchemy import event
from sqlalchemy import inspect

from vyra_base.com import InterfaceFactory, ProtocolType, remote_service
from vyra_base.helper.error_handler import ErrorTraceback
from vyra_base.storage.db_access import DBSTATUS, DbAccess
from vyra_base.storage.db_manipulator import DBReturnValue, DbManipulator
from vyra_base.storage.tb_params import Parameter as DbParameter
from vyra_base.storage.tb_params import TypeEnum
from vyra_base.core.parameter_validator import ParameterValidator, _convert_value

# Safe mapping from TypeEnum string values to actual Python types.
# Using eval(type_string) fails for "string" (Python uses "str", not "string").
_TYPE_CONVERTERS: dict = {
    "int": int,
    "integer": int,
    "string": str,
    "str": str,
    "bool": bool,
    "boolean": bool,
    "float": float,
    "list": list,
    "dict": dict,
}

# Module-level singleton validator loaded from parameter_rules.yaml
_validator = ParameterValidator()

logger = logging.getLogger(__name__)


[Doku] class Parameter: """ Managing module parameters. """
[Doku] def __init__( self, parameter_base_types: dict[str, Any], node: Any, storage_access_persistant: Any, storage_access_persistant_default: Any=None, storage_access_transient: Any = None ) -> None: """ Initialize vyra module parameters. Parameter containing basic settings for the module which could be used bei the specific processes in the module and will be configured by the user or by other modules. They will be stored in a database table. Other modules could subscribe to a parameter change event to be informed about changes on a parameter. :param storage_access: An object that provides access to the storage system. :param node: The ROS2 node to attach the parameter manager to. :param parameter_base_types: A dictionary of base types for parameters. :type parameter_base_types: dict[str, Any] :param storage_access_persistant: The database access object for persistent parameters. :type storage_access_persistant: DbAccess :param storage_access_transient: The database access object for transient parameters. :type storage_access_transient: DbAccess | None :raises ValueError: If the storage_access is not of type DbAccess. """ self.storage_access_persistant = storage_access_persistant self.storage_access_persistant_default = storage_access_persistant_default self.persistant_manipulator = DbManipulator( storage_access_persistant, DbParameter ) self.parameter_base_types = parameter_base_types self.update_param_event_ident = "param_update_event_publisher" # Store node reference for lazy publisher initialization self._node = node self.update_parameter_publisher: Optional[Any] = None # Will be VYRA/Zenoh publisher self.storage_access_transient: Any|None = storage_access_transient
async def _init_publisher(self) -> None: """Initialize the parameter update publisher lazily.""" if self.update_parameter_publisher is None: self.update_parameter_publisher = await InterfaceFactory.create_publisher( name=self.update_param_event_ident, protocols=[ProtocolType.ZENOH], node=self._node, is_publisher=True )
[Doku] @staticmethod def replace_validation_rules(rules: dict) -> None: """ Replace the active validation ruleset for all :class:`Parameter` instances. This modifies the *module-level* ``_validator`` singleton, so every Parameter instance created in this process will use the new rules. :param rules: New ruleset dict – must match the structure of ``parameter_rules.yaml``. Example:: import yaml with open("/etc/myapp/custom_param_rules.yaml") as f: Parameter.replace_validation_rules(yaml.safe_load(f)) """ _validator.replace_rules(rules)
[Doku] @staticmethod def load_validation_rules_from_file(path: str) -> None: """ Load a YAML ruleset from a file and make it active. :param path: Absolute or relative path to the YAML rules file. """ _validator.load_rules_from_file(path)
@ErrorTraceback.w_check_error_exist async def load_defaults( self, storage_access_default: Optional[DbAccess] = None, override_exist: bool = False) -> bool: """ Initialize parameters. Load default parameters if they does not exist. Also add them to the transient storage. This method should be implemented to set up initial parameters. :param storage_access_default: The database access object for default parameters. :param override_exist: If True, existing parameters will be overridden with defaults. :return: None :raises ValueError: If the storage_access_default is not of type DbAccess. """ if storage_access_default is not None and not isinstance(storage_access_default, DbAccess): raise ValueError("Invalid storage_access_default. Must be of type DbAccess.") if storage_access_default is None: if self.storage_access_persistant_default is None: logger.warning("No default storage access provided. Skipping default " "parameter loading.") return False storage_access_default = self.storage_access_persistant_default if not isinstance(storage_access_default, DbAccess): raise ValueError("Invalid storage_access_default. Must be of type DbAccess.") default_manipulator = DbManipulator( storage_access_default, DbParameter ) if not await storage_access_default.check_table_exists(DbParameter): logger.info( f"Table '{DbParameter.__tablename__}' does not exist in the default " "database. Skipping default parameter loading.") return False all_return: DBReturnValue = await self.persistant_manipulator.get_all() if not isinstance(all_return.value, list): raise ValueError(f"all_return.value must be a list. Is: {all_return.value}") all_params: list[dict] = [default_manipulator.to_dict(p) for p in all_return.value] all_default_return: DBReturnValue = await default_manipulator.get_all() if not isinstance(all_default_return.value, list): raise ValueError("all_default.value must be a list.") all_default: list[dict] = [ default_manipulator.to_dict(p) for p in all_default_return.value ] param_name_list: list[str] = [n["name"] for n in all_params] for default_item in all_default: if default_item["name"] not in param_name_list: logger.info("Loading default parameter " f"'{default_item['name']}: {default_item['value']}'") param_obj: dict[str, Any] = { key: default_item[key] for key in default_item if key != "id" } await self.persistant_manipulator.add(param_obj) elif override_exist: logger.info("Overriding existing parameter " f"'{default_item['name']}: {default_item['value']}'") param_obj: dict[str, Any] = { key: default_item[key] for key in default_item if key != "id" } await self.persistant_manipulator.update(param_obj) event.listen(DbParameter, "after_update", self.after_update_param_callback) return True
[Doku] @remote_service() async def get_parameter(self, request: Any, response: Any) -> bool: """ Get a parameter value by its key (ROS2 service interface). This is the external ROS2 service endpoint. For internal calls, use :meth:`get_parameter_impl` instead. :param request: The request object containing the key of the parameter to retrieve. :type request: Any :param response: The response object to store the result. :type response: Any :return: True if successful, False otherwise. :rtype: bool **ROS2 Service Usage:** .. code-block:: bash ros2 service call /module_name/get_parameter \\ vyra_base_interfaces/srv/GetParameter \\ "{key: 'param_name'}" **Internal Usage:** .. code-block:: python result = await param_manager.get_parameter_impl("param_name") if result: print(result["value"]) """ key = request.key param_return = await self.get_parameter_impl(key) if param_return is None: response.json_value = "[]" response.success = False response.message = f"Failed to retrieve parameter with key '{key}'." return False response.json_value = param_return["value"] response.success = param_return["success"] response.message = param_return["message"] return True
[Doku] async def get_parameter_impl(self, key: str) -> Optional[dict]: """ Get a parameter value by its key (internal implementation). This method contains the actual business logic for retrieving a parameter. It can be called directly from Python code without going through the ROS2 service layer. :param key: The key of the parameter to retrieve. :type key: str :return: Dictionary with parameter data or None on error. :rtype: Optional[dict] **Return format:** .. code-block:: python { "value": "<json_serialized_parameter>", "success": True, "message": "Parameter 'key' retrieved successfully." } **Example:** .. code-block:: python result = await param_manager.get_parameter_impl("robot_speed") if result and result["success"]: param_data = json.loads(result["value"]) print(f"Speed: {param_data['value']}") """ response: Any = {} response['value'] = "[]" all_params: DBReturnValue = await self.persistant_manipulator.get_all( filters={"name": key}) # Treat NOT_FOUND the same as a real DB error for get (parameter doesn't exist) if all_params.status == DBSTATUS.NOT_FOUND: all_params.status = DBSTATUS.ERROR # map to failure so caller gets proper error if all_params.status != DBSTATUS.SUCCESS: response['success'] = False response['message'] = f"Failed to retrieve parameter with key '{key}'." logger.warn(response['message']) return None if not isinstance(all_params.value, list): response['success'] = False response['message'] = "Internal error: all_params.value is not a list." logger.warn(response['message']) return None all_params.value = self.persistant_manipulator.to_dict(all_params.value[0]) response['message'] = f"Parameter '{key}' retrieved successfully." response['success'] = True response['value'] = json.dumps(all_params.value) return response
[Doku] @remote_service() async def set_parameter(self, request: Any, response: Any) -> None: """ Set a parameter value by its key (ROS2 service interface). This is the external ROS2 service endpoint. For internal calls, use :meth:`set_parameter_impl` instead. :param request: The request object containing key and value. :type request: Any :param response: The response object to update with the result. :type response: Any :return: None :rtype: None **ROS2 Service Usage:** .. code-block:: bash ros2 service call /module_name/set_parameter \\ vyra_base_interfaces/srv/SetParameter \\ "{key: 'robot_speed', value: '2.5'}" **Internal Usage:** .. code-block:: python result = await param_manager.set_parameter_impl("robot_speed", "2.5") if result and result["success"]: print("Parameter updated!") """ key = request.key value = request.value param_return = await self.set_parameter_impl(key, value) if param_return is None: response.success = False response.message = "Internal error occurred" return None response.success = param_return["success"] response.message = param_return["message"] return None
[Doku] async def set_parameter_impl(self, key: str, value: Any) -> Optional[dict]: """ Set a parameter value by its key (internal implementation). Validates type compatibility, min/max bounds and range constraints before writing to the database. :param key: The key of the parameter to set. :type key: str :param value: The value to set for the parameter. :type value: Any :return: Dictionary with success status and message, or None on error. :rtype: Optional[dict] """ response: dict = {} param_ret: DBReturnValue = await self.persistant_manipulator.get_all( filters={"name": key}) # DBSTATUS.NOT_FOUND means the query ran fine but no rows matched. # Treat NOT_FOUND the same as SUCCESS+empty-list: fall through to the upsert path. if param_ret.status == DBSTATUS.NOT_FOUND: param_ret.status = DBSTATUS.SUCCESS param_ret.value = [] if param_ret.status != DBSTATUS.SUCCESS: response['success'] = False response['message'] = f"Failed to retrieve parameter with key '{key}' (DB error)." logger.warning(response['message']) return response if not isinstance(param_ret.value, list): response['success'] = False response['message'] = "Internal error: param_ret.value is not a list." logger.error(response['message']) return response if len(param_ret.value) == 0: # Parameter does not exist — create it as a string parameter (upsert behavior) try: new_param: dict[str, Any] = { "name": key, "value": str(value), "default_value": str(value), "type": TypeEnum.string.value, "displayname": key, } add_ret: DBReturnValue = await self.persistant_manipulator.add(new_param) if add_ret.status != DBSTATUS.SUCCESS: response['success'] = False response['message'] = f"Failed to create parameter with key '{key}'." logger.error(response['message']) return response response['success'] = True response['message'] = f"Parameter '{key}' created successfully." logger.info(response['message']) return response except Exception as e: response['success'] = False response['message'] = f"Failed to create parameter '{key}': {e}" logger.error(response['message']) return response if len(param_ret.value) > 1: logger.warning( f"Multiple parameters found with key '{key}'. " "Updating the first one found.") param_record = param_ret.value[0] # ── Validate type / min / max / range via ParameterValidator ──────── validation_errors = _validator.validate_set( param_record=param_record, new_value=value, ) if validation_errors: response['success'] = False response['message'] = validation_errors[0] logger.warning("set_parameter validation failed for '%s': %s", key, validation_errors[0]) return response # ── Type conversion ────────────────────────────────────────────────── try: type_str = param_record.type.value if hasattr(param_record.type, 'value') else str(param_record.type) converted, conv_error = _convert_value(value, type_str) if conv_error: raise ValueError(conv_error) param_obj: dict[str, Any] = {"value": converted} except (ValueError, TypeError) as ve: response['success'] = False response['message'] = ( f"Failed to convert value '{value}' to type '{type_str}': {ve}") logger.error(response['message']) return response update_ret: DBReturnValue = await self.persistant_manipulator.update( param_obj, filters={"name": key}) if update_ret.status != DBSTATUS.SUCCESS: response['success'] = False response['message'] = f"Failed to update parameter with key '{key}'." logger.error(response['message']) return response response['success'] = True response['message'] = f"Parameter '{key}' updated successfully." logger.info(response['message']) return response
[Doku] @remote_service() async def create_new_parameter(self, request: Any, response: Any) -> None: """ Create a new parameter (strict mode, no upsert). Fails if the parameter already exists. """ key = request.key value = request.value result = await self.create_new_parameter_impl(key, value) if result is None: response.success = False response.message = "Internal error occurred" return None response.success = result["success"] response.message = result["message"] return None
[Doku] async def create_new_parameter_impl(self, key: str, value: Any) -> Optional[dict]: """ Create a new parameter and fail if key already exists. """ response: dict[str, Any] = {} param_ret: DBReturnValue = await self.persistant_manipulator.get_all( filters={"name": key} ) if param_ret.status == DBSTATUS.NOT_FOUND: param_ret.status = DBSTATUS.SUCCESS param_ret.value = [] if param_ret.status != DBSTATUS.SUCCESS: response['success'] = False response['message'] = f"Failed to retrieve parameter with key '{key}' (DB error)." logger.warning(response['message']) return response if not isinstance(param_ret.value, list): response['success'] = False response['message'] = "Internal error: param_ret.value is not a list." logger.error(response['message']) return response if len(param_ret.value) > 0: response['success'] = False response['message'] = f"Parameter '{key}' already exists." logger.warning(response['message']) return response new_param: dict[str, Any] = { "name": key, "value": str(value), "default_value": str(value), "type": TypeEnum.string.value, "displayname": key, } add_ret: DBReturnValue = await self.persistant_manipulator.add(new_param) if add_ret.status != DBSTATUS.SUCCESS: response['success'] = False response['message'] = f"Failed to create parameter with key '{key}'." logger.error(response['message']) return response response['success'] = True response['message'] = f"Parameter '{key}' created successfully." logger.info(response['message']) return response
# ------------------------------------------------------------------ # create_parameter_full – create with full metadata & validation # ------------------------------------------------------------------
[Doku] @remote_service() async def create_parameter_full(self, request: Any, response: Any) -> None: """ Create a new parameter with full metadata and validation. Request fields: name (str): Parameter key (required). default_value (Any): Default value (required). type (str): Type string — one of int, float, string, bool, list, dict (required). description (str): Human-readable description (required). displayname (str, optional): Display name (max 255 chars). visible (bool, optional): Visibility flag (default False). editable (bool, optional): Editable flag (default False). min_value (str, optional): Minimum value/length constraint. max_value (str, optional): Maximum value/length constraint. range_value (dict, optional): Allowed values/types dict. """ result = await self.create_parameter_full_impl( name=str(request.name), default_value=getattr(request, "default_value", None), type_str=str(request.type), description=str(request.description), displayname=getattr(request, "displayname", None), visible=bool(getattr(request, "visible", False)), editable=bool(getattr(request, "editable", False)), min_value=getattr(request, "min_value", None), max_value=getattr(request, "max_value", None), range_value=getattr(request, "range_value", None), ) if result is None: response.success = False response.message = "Internal error occurred" return None response.success = result["success"] response.message = result["message"] return None
[Doku] async def create_parameter_full_impl( self, name: str, default_value: Any, type_str: str, description: str, displayname: Optional[str] = None, visible: bool = False, editable: bool = False, min_value: Any = None, max_value: Any = None, range_value: Optional[dict] = None, ) -> Optional[dict]: """ Create a new parameter with full metadata and strict validation. Validation is delegated to the module-level :data:`_validator` (a :class:`~vyra_base.core.parameter_validator.ParameterValidator` instance loaded from ``parameter_rules.yaml``). """ resp: dict[str, Any] = {} # Delegate all field/type/range validation to ParameterValidator errors = _validator.validate_create( name=name, default_value=default_value, type_str=type_str, description=description, displayname=displayname, min_value=min_value, max_value=max_value, range_value=range_value, ) if errors: resp['success'] = False resp['message'] = errors[0] logger.warning(resp['message']) return resp # Convert default_value to the correct Python type default_value, conv_error = _convert_value(default_value, type_str) if conv_error: resp['success'] = False resp['message'] = f"Cannot convert default_value to type '{type_str}': {conv_error}" logger.warning(resp['message']) return resp dn = displayname if displayname else name # Handle range_value JSON deserialization if passed as string if isinstance(range_value, str): try: range_value = json.loads(range_value) except json.JSONDecodeError: range_value = None # --- Check for duplicate --- param_ret: DBReturnValue = await self.persistant_manipulator.get_all( filters={"name": name} ) if param_ret.status not in (DBSTATUS.SUCCESS, DBSTATUS.NOT_FOUND): resp['success'] = False resp['message'] = f"DB error while checking for existing parameter '{name}'." logger.error(resp['message']) return resp existing = param_ret.value if isinstance(param_ret.value, list) else [] if len(existing) > 0: resp['success'] = False resp['message'] = f"Parameter '{name}' already exists." logger.warning(resp['message']) return resp # --- Build record --- new_param: dict[str, Any] = { "name": name, "value": default_value, "default_value": default_value, "type": type_str, "description": description, "displayname": dn, "visible": visible, "editable": editable, } if min_value not in (None, ''): new_param["min_value"] = str(min_value) if max_value not in (None, ''): new_param["max_value"] = str(max_value) if range_value is not None: new_param["range_value"] = range_value add_ret: DBReturnValue = await self.persistant_manipulator.add(new_param) if add_ret.status != DBSTATUS.SUCCESS: resp['success'] = False resp['message'] = f"Failed to create parameter '{name}'." logger.error(resp['message']) return resp resp['success'] = True resp['message'] = f"Parameter '{name}' created successfully." logger.info(resp['message']) return resp
# ------------------------------------------------------------------ # update_parameter_metadata – update displayname / visible / editable # ------------------------------------------------------------------
[Doku] @remote_service() async def update_parameter_metadata(self, request: Any, response: Any) -> None: """ Update display metadata for an existing parameter. Request fields: name (str): Parameter key (required). displayname (str, optional): New display name. visible (bool, optional): New visibility flag. editable (bool, optional): New editable flag. """ result = await self.update_parameter_metadata_impl( key=str(request.name), displayname=getattr(request, "displayname", None), visible=getattr(request, "visible", None), editable=getattr(request, "editable", None), default_value=getattr(request, "default_value", None), ) if result is None: response.success = False response.message = "Internal error occurred" return None response.success = result["success"] response.message = result["message"] return None
[Doku] async def update_parameter_metadata_impl( self, key: str, displayname: Optional[str] = None, visible: Optional[bool] = None, editable: Optional[bool] = None, default_value: Optional[Any] = None, ) -> Optional[dict]: """ Update only displayname, visible, editable, and/or default_value for an existing parameter. """ resp: dict[str, Any] = {} if not key: resp['success'] = False resp['message'] = "Parameter name (key) is required." logger.warning(resp['message']) return resp # Fetch existing param_ret: DBReturnValue = await self.persistant_manipulator.get_all( filters={"name": key} ) if param_ret.status == DBSTATUS.NOT_FOUND or ( isinstance(param_ret.value, list) and len(param_ret.value) == 0 ): resp['success'] = False resp['message'] = f"Parameter '{key}' not found." logger.warning(resp['message']) return resp if param_ret.status != DBSTATUS.SUCCESS: resp['success'] = False resp['message'] = f"DB error while fetching parameter '{key}'." logger.error(resp['message']) return resp updates: dict[str, Any] = {} if displayname is not None: if len(str(displayname)) > 255: resp['success'] = False resp['message'] = "displayname exceeds 255 characters." logger.warning(resp['message']) return resp updates["displayname"] = str(displayname) if visible is not None: updates["visible"] = bool(visible) if editable is not None: updates["editable"] = bool(editable) if default_value is not None: updates["default_value"] = str(default_value) if not updates: resp['success'] = True resp['message'] = "No metadata fields to update." return resp upd_ret: DBReturnValue = await self.persistant_manipulator.update( filters={"name": key}, data=updates, ) if upd_ret.status != DBSTATUS.SUCCESS: resp['success'] = False resp['message'] = f"Failed to update metadata for parameter '{key}'." logger.error(resp['message']) return resp resp['success'] = True resp['message'] = f"Metadata for parameter '{key}' updated successfully." logger.info(resp['message']) return resp
# ------------------------------------------------------------------ # delete_parameter – remove a parameter from persistence # ------------------------------------------------------------------
[Doku] @remote_service() async def delete_parameter(self, request: Any, response: Any) -> None: """ Delete a parameter from persistence by key. Request fields: key (str): Parameter key to delete (required). """ result = await self.delete_parameter_impl(key=str(request.key)) if result is None: response.success = False response.message = "Internal error occurred" return None response.success = result["success"] response.message = result["message"] return None
[Doku] async def delete_parameter_impl(self, key: str) -> Optional[dict]: """ Delete a parameter from the database by key. :param key: The parameter name/key to delete. :type key: str :return: Result dict with ``success`` and ``message``, or None on internal error. :rtype: Optional[dict] """ resp: dict[str, Any] = {} if not key: resp['success'] = False resp['message'] = "Parameter key is required." logger.warning(resp['message']) return resp param_ret: DBReturnValue = await self.persistant_manipulator.get_all( filters={"name": key} ) if param_ret.status == DBSTATUS.NOT_FOUND or ( isinstance(param_ret.value, list) and len(param_ret.value) == 0 ): resp['success'] = False resp['message'] = f"Parameter '{key}' not found." logger.warning(resp['message']) return resp if param_ret.status != DBSTATUS.SUCCESS: resp['success'] = False resp['message'] = f"DB error while fetching parameter '{key}'." logger.error(resp['message']) return resp # Retrieve primary key from the first matching record if isinstance(param_ret.value, list) and len(param_ret.value) > 0: param_record = param_ret.value[0] else: resp['success'] = False resp['message'] = "Internal error: param_ret.value is not a list or is empty." logger.error(resp['message']) return resp pkey_field = self.persistant_manipulator._read_pkey() record_id = getattr(param_record, pkey_field, None) if record_id is None: record_dict = self.persistant_manipulator.to_dict(param_record) record_id = record_dict.get(pkey_field) if record_id is None: resp['success'] = False resp['message'] = f"Could not determine primary key for parameter '{key}'." logger.error(resp['message']) return resp del_ret: DBReturnValue = await self.persistant_manipulator.delete(record_id) if del_ret.status != DBSTATUS.SUCCESS: resp['success'] = False resp['message'] = f"Failed to delete parameter '{key}'." logger.error(resp['message']) return resp resp['success'] = True resp['message'] = f"Parameter '{key}' deleted successfully." logger.info(resp['message']) return resp
[Doku] @remote_service() async def read_all_params(self, request: Any, response: Any) -> None: """ Read all parameters (ROS2 service interface). This is the external ROS2 service endpoint. For internal calls, use :meth:`read_all_params_impl` instead. Returns a JSON string containing all parameters with their metadata. :param request: The request object (unused). :type request: Any :param response: The response object to update with the result. :type response: Any :return: None :rtype: None **Response format:** .. code-block:: python response.all_params_json = '[{"name": "param1", "value": "value1", ...}, ...]' **ROS2 Service Usage:** .. code-block:: bash ros2 service call /module_name/read_all_params \\ vyra_base_interfaces/srv/ReadAllParams "{}" **Internal Usage:** .. code-block:: python result = await param_manager.read_all_params_impl() if result: params = json.loads(result["all_params_json"]) for param in params: print(f"{param['name']}: {param['value']}") """ param_return = await self.read_all_params_impl() if param_return is None: response.all_params_json = "[]" return None response.all_params_json = param_return["all_params_json"] return None
[Doku] async def read_all_params_impl(self) -> Optional[dict]: """ Read all parameters (internal implementation). This method retrieves all parameters from the database and serializes them to JSON format. :return: Dictionary containing all parameters as JSON string, or None on error. :rtype: Optional[dict] **Return format:** .. code-block:: python { "all_params_json": '[{"name": "...", "value": "...", "type": "..."}, ...]' } **Example:** .. code-block:: python result = await param_manager.read_all_params_impl() if result: params = json.loads(result["all_params_json"]) for param in params: print(f"Parameter: {param['name']}") print(f" Value: {param['value']}") print(f" Type: {param['type']}") """ response: dict = {} ret_obj: DBReturnValue = await self.persistant_manipulator.get_all() if not isinstance(ret_obj.value, list): logger.error("Internal error: ret_obj.value is not a list.") return None all_params = [ self.persistant_manipulator.to_dict(p) for p in ret_obj.value] response['all_params_json'] = json.dumps(all_params) return response
@ErrorTraceback.w_check_error_exist def after_update_param_callback(self, mapper, connection, target) -> None: """ Callback function that is called after a parameter is updated. Will shout the parameter change event. :param mapper: The mapper. :param connection: The database connection. :param target: The target object that was updated. """ logger.info(f"Parameter '{target.name}' updated to '{target.value}'") insp = inspect(target) for attr in insp.attrs: if attr.history.has_changes(): old_value = attr.history.deleted new_value = attr.history.added logger.debug(f"Feld {attr.key} changed: {old_value} -> {new_value}") # NOTE: SQLAlchemy event callbacks must be synchronous - cannot use await # TODO: Implement async event queue for parameter change notifications via ROS2 logger.debug(f"Parameter '{target.name}' updated (ROS2 change event not published)")
[Doku] @remote_service() async def param_changed_topic(self, request: Any, response: Any) -> None: """ Get the ROS2 topic name for parameter change events (ROS2 service interface). This is the external ROS2 service endpoint. For internal calls, use :meth:`param_changed_topic_impl` instead. Returns the topic name where parameter change notifications are published. Other modules can subscribe to this topic to receive updates. :param request: The request object (unused). :type request: Any :param response: The response object to update with the result. :type response: Any :return: None :rtype: None **ROS2 Service Usage:** .. code-block:: bash ros2 service call /module_name/param_changed_topic \\ vyra_base_interfaces/srv/ParamChangedTopic "{}" **Internal Usage:** .. code-block:: python result = await param_manager.param_changed_topic_impl() if result: topic = result["topic"] print(f"Subscribe to parameter changes: {topic}") """ param_return = await self.param_changed_topic_impl() if param_return is None: response.topic = "" return None response.topic = param_return["topic"] return None
[Doku] async def param_changed_topic_impl(self) -> Optional[dict]: """ Get the ROS2 topic name for parameter change events (internal implementation). This method retrieves the topic name from the internal publisher server. The topic is used to broadcast parameter changes to other modules. :return: Dictionary containing the topic name, or None on error. :rtype: Optional[dict] **Return format:** .. code-block:: python { "topic": "/module_name/param_update_event" } **Example:** .. code-block:: python result = await param_manager.param_changed_topic_impl() if result: # Subscribe to parameter change events node.create_subscription( UpdateParamEvent, result["topic"], callback=on_param_changed ) """ response: dict = {} # Ensure publisher is initialized if self.update_parameter_publisher is None: await self._init_publisher() # Access internal publisher from ROS2Publisher pub_server = getattr(self.update_parameter_publisher, '_publisher', None) if pub_server is None: logger.error("Publisher server is not initialized.") return None response['topic'] = pub_server.publisher_info.name return response
[Doku] async def has_parameter(self, key: str) -> bool: """ Check if a parameter with the given key exists. :param key: The key of the parameter to check. :type key: str :return: True if the parameter exists, False otherwise. :rtype: bool **Example:** .. code-block:: python if await param_manager.has_parameter("robot_speed"): result = await param_manager.get_parameter_impl("robot_speed") """ result: DBReturnValue = await self.persistant_manipulator.get_all( filters={"name": key} ) return ( result.status == DBSTATUS.SUCCESS and isinstance(result.value, list) and len(result.value) > 0 )