# Source code for vyra_base.defaults.entries

from __future__ import annotations

import uuid
from dataclasses import asdict, dataclass, field, is_dataclass, fields
from datetime import datetime
from enum import Enum, IntEnum
from re import DEBUG
from typing import Any, Callable, Union
from uuid import UUID

try:
    from rclpy.qos import QoSProfile
except ImportError:  # ROS2 not installed (e.g. dashboard-only containers)
    pass
from vyra_base.helper.error_handler import ErrorTraceback


class FunctionConfigParamTypes(Enum):
    """
    Data types a function-configuration parameter or return value can have.

    Every member's value is the lowercase identifier string of the type.

    :cvar BOOL: Boolean type
    :cvar BYTE: Byte type
    :cvar CHAR: Char type
    :cvar INT8: 8-bit integer
    :cvar UINT8: 8-bit unsigned integer
    :cvar INT16: 16-bit integer
    :cvar UINT16: 16-bit unsigned integer
    :cvar INT32: 32-bit integer
    :cvar UINT32: 32-bit unsigned integer
    :cvar INT64: 64-bit integer
    :cvar UINT64: 64-bit unsigned integer
    :cvar FLOAT32: 32-bit float
    :cvar FLOAT64: 64-bit float
    :cvar STRING: String type
    :cvar TIME: Time type
    :cvar DURATION: Duration type
    :cvar ANY: Any type (user defined)
    :cvar NONE: None type
    :cvar ARRAY: Array type
    """

    # Single-value scalars
    BOOL = "bool"
    BYTE = "byte"
    CHAR = "char"

    # Fixed-width integers, signed and unsigned
    INT8 = "int8"
    UINT8 = "uint8"
    INT16 = "int16"
    UINT16 = "uint16"
    INT32 = "int32"
    UINT32 = "uint32"
    INT64 = "int64"
    UINT64 = "uint64"

    # Floating point
    FLOAT32 = "float32"
    FLOAT64 = "float64"

    # Text and time
    STRING = "string"
    TIME = "time"
    DURATION = "duration"

    # Special markers
    ANY = "any"
    NONE = "none"
    ARRAY = "array"
[Doku] @dataclass(slots=True) class DCBase: """ Base dataclass providing utility methods for all VYRA dataclass entries. Provides common functionality like dict conversion for dataclass instances. """
[Doku] def asdict(self): """ Convert the dataclass instance to a dictionary. :return: Dictionary representation of the dataclass. :rtype: dict """ return asdict(self)
class FunctionConfigBaseTypes(Enum):
    """
    Kinds of function a function configuration can describe.

    :cvar message: Represents a message function (simple message)
    :cvar service: Represents a service function (request-reply pattern)
    :cvar action: Represents an action function
        (request-feedback-reply pattern)
    """

    # Member names are lowercase on purpose: they equal their values.
    message = 'message'
    service = 'service'
    action = 'action'
class FunctionConfigTags(str, Enum):
    """
    Transport protocol tags of a function entry.

    Defines which transport protocol(s) a function entry supports. Matches
    the protocol identifiers used in interface metadata JSON files.

    The ``str`` mix-in makes each member compare equal to its plain string
    value and serialise cleanly to JSON.

    :cvar ROS2: ROS2 transport (DDS topics, services, actions)
    :cvar ZENOH: Zenoh transport (pub/sub, queryable)
    :cvar REDIS: Redis transport (pub/sub, streams)
    :cvar UDS: Unix Domain Socket transport
    """

    ROS2 = "ros2"
    ZENOH = "zenoh"
    REDIS = "redis"
    UDS = "uds"
@dataclass(slots=True)
class FunctionConfigBaseParams(DCBase):
    """
    Dataclass describing one parameter of a function configuration.

    :ivar datatype: The data type of the parameter
        (:class:`FunctionConfigParamTypes`)
    :ivar displayname: The display name of the parameter
    :ivar description: A description of the parameter
    """

    # All three fields are required; none has a default.
    datatype: FunctionConfigParamTypes
    displayname: str
    description: str
@dataclass(slots=True)
class FunctionConfigBaseReturn(DCBase):
    """
    Dataclass describing one return value of a function configuration.

    :ivar datatype: The data type of the return value
        (:class:`FunctionConfigParamTypes`)
    :ivar displayname: The display name of the return value
    :ivar description: A description of the return value
    """

    # All three fields are required; none has a default.
    datatype: FunctionConfigParamTypes
    displayname: str
    description: str
@dataclass(slots=True)
class FunctionConfigDisplaystyle(DCBase):
    """
    Dataclass holding the display style of a function configuration.

    Defines how the function is displayed in the GUI. Both flags default
    to ``False``.

    :ivar visible: If True, the function is visible in the GUI
    :ivar published: If True, the function is published in the GUI
    """

    visible: bool = False
    published: bool = False
@dataclass(slots=True)
class FunctionConfigPeriodicPublisher(DCBase):
    """
    Stores the periodic publisher settings.

    :param caller: The function to be called periodically
    :type caller: Callable
    :param interval: The time interval in seconds for the periodic call
        (default: 1.0, must be between 0.01 and 10.0)
    :type interval: float
    :raises ValueError: If the interval is not between 0.01 and 10.0 seconds
    """

    caller: Callable
    interval: float = 1.0

    def __post_init__(self):
        """
        Validate ``interval`` right after the generated ``__init__`` ran.

        :raises ValueError: If ``interval`` lies outside 0.01..10.0
            (both bounds inclusive).
        """
        if not (0.01 <= self.interval <= 10.0):
            # The message names the offending field ("interval"); it used to
            # say "speed", which does not exist on this class.
            raise ValueError(
                f"interval {self.interval} out of bounds (0.01–10.0)"
            )

    def asdict(self):
        """
        Convert periodic publisher config to dictionary, replacing the
        callable with its name.

        ``caller`` is emitted as its ``__name__`` attribute, or ``None`` when
        the object has no such attribute. Every other field is copied as is.

        :return: Dictionary with caller function name instead of reference.
        :rtype: dict
        """
        result = {}
        for f in fields(self):
            if f.name == "caller":
                result[f.name] = getattr(self.caller, "__name__", None)
                continue
            result[f.name] = getattr(self, f.name)
        return result
@dataclass(slots=True)
class FunctionConfigEntry(DCBase):
    """
    Contains the function configuration.

    Stores the function settings for type safety and DDS communication
    node configuration.

    :param tags: Tags for the function
    :type tags: list[FunctionConfigTags]
    :param type: Type of the function
    :type type: FunctionConfigBaseTypes
    :param interfacetypes: Interface types of the function
    :type interfacetypes: Any
    :param functionname: Name of the function
    :type functionname: str
    :param displayname: Display name of the function
    :type displayname: str
    :param description: Description of the function
    :type description: str
    :param displaystyle: Display style of the function
    :type displaystyle: FunctionConfigDisplaystyle
    :param params: Parameters of the function (default: empty list)
    :type params: list[FunctionConfigBaseParams]
    :param returns: Return values of the function (default: empty list)
    :type returns: list[FunctionConfigBaseReturn]
    :param qosprofile: Quality of Service profile (default: 10)
    :type qosprofile: Union[int, QoSProfile]
    :param callbacks: Functions to be called when the function is invoked,
        keyed by name (only for callable)
    :type callbacks: dict[str, Callable], Optional
    :param periodic: Function to be called periodically (only for publisher)
    :type periodic: FunctionConfigPeriodicPublisher, Optional
    :param namespace: Optional topic namespace segment inserted between
        module id and function name
    :type namespace: str, Optional
    :param subsection: Optional sub-topic segment appended after the
        function name
    :type subsection: str, Optional
    """

    # Required fields
    tags: list[FunctionConfigTags]
    type: FunctionConfigBaseTypes
    interfacetypes: Any
    functionname: str
    displayname: str
    description: str
    displaystyle: FunctionConfigDisplaystyle

    # Optional fields
    params: list[FunctionConfigBaseParams] = field(default_factory=list)
    returns: list[FunctionConfigBaseReturn] = field(default_factory=list)
    qosprofile: Union[int, QoSProfile] = 10
    callbacks: Union[dict[str, Callable], None] = None
    periodic: Union[FunctionConfigPeriodicPublisher, None] = None
    namespace: Union[str, None] = None
    subsection: Union[str, None] = None

    def asdict(self):
        """
        Convert the entry to a dictionary, with special handling for three
        fields.

        * ``callbacks``: each callable is replaced by its ``__name__`` (or
          ``None`` if it has none); an empty or missing mapping becomes
          ``None``.
        * ``interfacetypes``: replaced by its ``__name__`` (or ``None``).
        * ``displaystyle``: replaced by the result of its own ``asdict()``.

        All remaining fields are copied unchanged.

        NOTE(review): ``params``, ``returns``, ``periodic``, ``qosprofile``,
        ``tags`` and ``type`` are therefore returned as the original objects,
        not as plain serialisable values -- confirm whether callers expect
        that.

        :return: Dictionary keyed by field name, in field order.
        :rtype: dict
        """
        # Start from a plain field -> value snapshot; overwriting an existing
        # key below keeps its position, so key order still follows the fields.
        snapshot = {spec.name: getattr(self, spec.name) for spec in fields(self)}

        if self.callbacks:
            snapshot["callbacks"] = {
                name: getattr(cb, "__name__", None)
                for name, cb in self.callbacks.items()
            }
        else:
            snapshot["callbacks"] = None

        snapshot["interfacetypes"] = getattr(self.interfacetypes, "__name__", None)
        snapshot["displaystyle"] = self.displaystyle.asdict()
        return snapshot
@dataclass(slots=True)
class ErrorEntry(DCBase):
    """
    Container for the error entry.

    Stores the error details.

    :param level: The level of the error (default: 0, MINOR_FAULT)
    :type level: Union[ErrorEntry.ERROR_LEVEL, int]
    :param code: The error code (default: 0x00000000)
    :type code: int
    :param uuid: Unique identifier for the error entry; a fresh
        ``uuid.uuid4()`` is generated per instance when omitted
    :type uuid: Union[UUID, Any]
    :param timestamp: Timestamp of when the error occurred (default: ``''``)
    :type timestamp: Any
    :param description: Description of the error
    :type description: str
    :param solution: Suggested solution for the error
    :type solution: str
    :param miscellaneous: Additional information about the error
    :type miscellaneous: str
    :param module_name: Name of the module where the error occurred
        (default: ``'N/A'``)
    :type module_name: str
    :param module_id: Unique identifier for the module (default: ``'N/A'``)
    :type module_id: Union[UUID, Any]
    """

    level: Union["ErrorEntry.ERROR_LEVEL", int] = 0
    code: int = 0x00000000
    # On the right-hand side ``uuid`` still refers to the imported module,
    # because the field name is not bound yet when the default is evaluated.
    uuid: Union[UUID, Any] = field(default_factory=uuid.uuid4)
    timestamp: Any = ''
    description: str = ''
    solution: str = ''
    miscellaneous: str = ''
    module_name: str = 'N/A'
    module_id: Union[UUID, Any] = 'N/A'

    def __repr__(self) -> str:
        """
        Return a short representation with level, code, description and
        solution.

        :return: String of the form ``ErrorEntry(level=..., code=...,
            description=..., solution=...)``.
        :rtype: str
        """
        # The closing parenthesis was missing, which left the
        # representation unbalanced.
        return (
            f"ErrorEntry(level={self.level}, code={self.code}, "
            f"description={self.description}, solution={self.solution})"
        )

    class ERROR_LEVEL(IntEnum):
        """
        Enum for the error level.

        :cvar MINOR_FAULT: Minor fault (0)
        :cvar MAJOR_FAULT: Major fault (1)
        :cvar CRITICAL_FAULT: Critical fault (2)
        :cvar EMERGENCY_FAULT: Emergency fault (3)
        """

        MINOR_FAULT = 0
        MAJOR_FAULT = 1
        CRITICAL_FAULT = 2
        EMERGENCY_FAULT = 3
class NewsEntry(dict):
    """
    News feed entry object.

    Writes informational data to the graphical user interface.

    The values are stored as slot attributes; the underlying ``dict`` is
    left empty by the constructor.

    :param level: The level of the message (default: 2, INFO)
    :type level: Union[NewsEntry.MESSAGE_LEVEL, int]
    :param message: The message content
    :type message: str
    :param timestamp: Timestamp of when the message was created; when
        omitted (``None``) the current ``datetime.now()`` is taken at
        construction time
    :type timestamp: Any
    :param uuid: Unique identifier for the news entry; when omitted
        (``None``) a fresh ``uuid4`` is generated for this entry
    :type uuid: Union[UUID, Any]
    :param module_name: Name of the module that created the news entry
    :type module_name: str
    :param module_id: Unique identifier for the module
    :type module_id: Union[UUID, Any]
    """

    __slots__ = (
        'level', 'message', 'timestamp', 'uuid', 'module_name', 'module_id'
    )

    def __init__(
        self,
        level: Union["NewsEntry.MESSAGE_LEVEL", int] = 2,
        message: str = '',
        timestamp: Any = None,
        uuid: Union[UUID, Any] = None,
        module_name: str = 'N/A',
        # NOTE(review): this default is still evaluated once, when the class
        # is defined, so every entry created without ``module_id`` shares the
        # same placeholder id. Left unchanged because a per-process value may
        # be relied on -- confirm the intent.
        module_id: Union[UUID, Any] = uuid.uuid4(),
    ) -> None:
        # The ``uuid`` parameter shadows the ``uuid`` module inside this
        # function, so the generator is imported under its own name.
        from uuid import uuid4

        super().__init__()
        self.message: str = message
        self.level: Union[NewsEntry.MESSAGE_LEVEL, int] = level
        # Defaults are resolved per call. Previously ``datetime.now()`` and
        # ``uuid.uuid4()`` sat in the signature and were evaluated only once,
        # so all entries shared the same timestamp and identifier.
        self.timestamp: Union[datetime, Any] = (
            datetime.now() if timestamp is None else timestamp
        )
        self.uuid: Union[UUID, Any] = uuid4() if uuid is None else uuid
        self.module_name: str = module_name
        self.module_id: Union[UUID, Any] = module_id

    def __repr__(self) -> str:
        """
        Return a short representation with level and message.

        :return: String of the form ``NewsEntry(level=..., message=...)``.
        :rtype: str
        """
        # The closing parenthesis was missing before.
        return (
            f"NewsEntry(level={self.level}, message={self.message})"
        )

    class MESSAGE_LEVEL(IntEnum):
        """
        Enum for the message type.

        :cvar ACTION: Action message (0)
        :cvar DEBUG: Debug message (1)
        :cvar INFO: Info message (2)
        :cvar HINT: Hint message (3)
        :cvar WARNING: Warning message (4)
        """

        ACTION = 0
        DEBUG = 1
        INFO = 2
        HINT = 3
        WARNING = 4
@dataclass(slots=True)
class PullRequestEntry(DCBase):
    """
    Contains the pull request entry.

    Stores the pull request details. All fields are required.

    :param uuid: Unique identifier for the pull request
    :type uuid: str
    :param ack_by_user: User who acknowledged the pull request
    :type ack_by_user: str
    :param ack_on_date: Date when the pull request was acknowledged
    :type ack_on_date: str
    :param request_structure: Structure of the request
    :type request_structure: str
    :param request_on_date: Date when the request was made
    :type request_on_date: str
    :param request_description: Description of the request
    :type request_description: str
    :param response: Response to the request
    :type response: str
    :param request_action: Action requested in the pull request
    :type request_action: str
    :param request_action_args: Arguments for the requested action
    :type request_action_args: list
    :param module_id: Unique identifier for the module associated with the
        pull request
    :type module_id: str
    :param color: Color code for visual representation in GUI
    :type color: int
    """

    uuid: str
    ack_by_user: str
    ack_on_date: str
    request_structure: str
    request_on_date: str
    request_description: str
    response: str
    request_action: str
    request_action_args: list
    module_id: str
    color: int

    def __repr__(self) -> str:
        """
        Return a representation listing every field as ``name=value``.

        :return: String of the form ``PullRequestEntry(uuid=..., ...)``.
        :rtype: str
        """
        # Explicit tuple rather than ``fields(self)`` so the output stays
        # fixed to exactly these attributes, in this order.
        shown = (
            "uuid",
            "ack_by_user",
            "ack_on_date",
            "request_structure",
            "request_on_date",
            "request_description",
            "response",
            "request_action",
            "request_action_args",
            "module_id",
            "color",
        )
        pairs = ", ".join(f"{name}={getattr(self, name)}" for name in shown)
        return f"PullRequestEntry({pairs})"
@dataclass(repr=True, slots=True)
class StateEntry(DCBase):
    """
    Contains the state entry.

    Stores the state details for life cycle overview of the module.

    :param current: Current state of the module
    :type current: str
    :param trigger: Trigger that caused the state change
    :type trigger: str
    :param module_id: Unique identifier for the module
    :type module_id: Union[UUID, Any]
    :param module_name: Name of the module
    :type module_name: str
    :param timestamp: Timestamp of when the state was recorded
    :type timestamp: Any
    :param previous: Previous state of the module (default: 'N/A')
    :type previous: str
    """

    current: str
    trigger: str
    module_id: Union[UUID, Any]
    module_name: str
    timestamp: Any
    previous: str = 'N/A'

    def __repr__(self) -> str:
        """
        Return a short representation with current, trigger and previous.

        This explicit method is the one in effect; the dataclass decorator
        does not replace a ``__repr__`` defined in the class body.

        :return: String of the form ``StateEntry(current=..., trigger=...,
            previous=...)``.
        :rtype: str
        """
        template = "StateEntry(current={}, trigger={}, previous={})"
        return template.format(self.current, self.trigger, self.previous)
@dataclass(slots=True)
class ModuleEntry(DCBase):
    """
    Contains the module entry.

    Stores the module details.

    :param uuid: Unique identifier for the module, held as a string
        (see :meth:`gen_uuid` and :meth:`restructure_uuid`, which use the
        hex form)
    :type uuid: str
    :param name: Name of the module
    :type name: str
    :param template: Template identifier for the module
    :type template: str
    :param description: Description of the module
    :type description: str
    :param version: Version of the module (semantic versioning, e.g. '1.0.0')
    :type version: str
    """

    uuid: str
    name: str
    template: str
    description: str
    version: str

    def __repr__(self) -> str:
        """
        Return a representation listing all five fields as ``name=value``.

        :return: String of the form ``ModuleEntry(uuid=..., name=...,
            template=..., description=..., version=...)``.
        :rtype: str
        """
        shown = ("uuid", "name", "template", "description", "version")
        pairs = ", ".join(f"{name}={getattr(self, name)}" for name in shown)
        return f"ModuleEntry({pairs})"

    def to_dict(self) -> dict:
        """
        Returns the dictionary representation of the ModuleEntry.

        ``uuid`` is passed through ``str()``; the other fields are copied
        as they are.

        :return: Dictionary containing the module details.
        :rtype: dict
        """
        details = {'uuid': str(self.uuid)}
        for key in ('name', 'template', 'description', 'version'):
            details[key] = getattr(self, key)
        return details

    @staticmethod
    def gen_uuid() -> str:
        """
        Generates a new UUID for the module entry in hex format.

        :return: Hex string of a freshly generated ``uuid4``.
        :rtype: str
        """
        # Inside a method ``uuid`` is the imported module, not the field.
        fresh = uuid.uuid4()
        return fresh.hex

    def restructure_uuid(self) -> UUID:
        """
        Converts the UUID string to a UUID object.

        The stored string is interpreted as the hex form of the UUID.

        :return: UUID object of the module entry.
        :rtype: UUID
        """
        return UUID(hex=self.uuid)
@dataclass(slots=True)
class AvailableModuleEntry(DCBase):
    """
    Placeholder dataclass without fields of its own.

    Not used in the current implementation.
    """