Quellcode für vyra_base.state.operational_state_machine

"""
Base class for operational state machines using metaclass-based automation.

This module provides a base class that modules can inherit from to get
automatic operational state management through lifecycle methods.
"""

import logging
from typing import Optional, Dict, Any

from .operational_metaclass import MetaOperationalState
from .state_machine import StateMachine
from .state_types import OperationalState


logger = logging.getLogger(__name__)


[Doku] class OperationalStateMachine(metaclass=MetaOperationalState): """ Base class for modules that need automatic operational state management. This class integrates with the 3-layer StateMachine and provides a clean interface for defining lifecycle methods that automatically handle state transitions. Subclasses should implement one or more of the following methods: - initialize(): Setup and initialization logic - pause(): Pause current operation - resume(): Resume paused operation - stop(): Stop current operation - reset(): Reset to initial state For dynamic operations, use the @operation decorator. All methods should return True on success, False on failure. Exceptions are caught and treated as failures. Example: >>> class MyModule(OperationalStateMachine): ... def __init__(self, state_machine): ... super().__init__(state_machine) ... self.data = None ... ... def initialize(self): ... # This runs when initialize() is called ... # Pre-condition: IDLE state ... # On success: IDLE -> READY (operation counter reset) ... # On failure: IDLE -> ERROR ... self.data = [] ... print("Initialized!") ... return True >>> >>> # Usage >>> fsm = StateMachine() >>> module = MyModule(fsm) >>> module.initialize() # Automatic state management >>> # Now in READY state, ready for operations """
[Doku] def __init__(self, state_machine: StateMachine): """ Initialize the operational state machine. Args: state_machine: The underlying 3-layer StateMachine instance """ self._state_machine = state_machine self._operation_counter = 0 # Reference counter for active operations logger.info(f"{self.__class__.__name__} initialized with state machine")
# ------------------------------------------------------------------------- # State Query Methods # -------------------------------------------------------------------------
[Doku] def get_operational_state(self) -> OperationalState: """Get current operational state.""" return self._state_machine.get_operational_state()
[Doku] def get_all_states(self) -> Dict[str, str]: """Get all current states (lifecycle, operational, health).""" return self._state_machine.get_current_state()
[Doku] def is_idle(self) -> bool: """Check if in IDLE state.""" return self.get_operational_state() == OperationalState.IDLE
[Doku] def is_ready(self) -> bool: """Check if in READY state.""" return self.get_operational_state() == OperationalState.READY
[Doku] def is_running(self) -> bool: """Check if in RUNNING state.""" return self.get_operational_state() == OperationalState.RUNNING
[Doku] def is_paused(self) -> bool: """Check if in PAUSED state.""" return self.get_operational_state() == OperationalState.PAUSED
[Doku] def is_stopped(self) -> bool: """Check if in STOPPED state.""" return self.get_operational_state() == OperationalState.STOPPED
[Doku] def is_error(self) -> bool: """Check if in ERROR state.""" return self.get_operational_state() == OperationalState.ERROR
# ------------------------------------------------------------------------- # Internal State Transition Method # ------------------------------------------------------------------------- def _set_operational_state(self, target_state: OperationalState): """ Internal method to set operational state. This is called by the metaclass wrapper to perform state transitions. Args: target_state: Target operational state """ from .state_events import StateEvent, EventType # Map target states to appropriate events (context-sensitive for READY) current_state = self.get_operational_state() if target_state == OperationalState.READY and current_state == OperationalState.PAUSED: # Resuming from PAUSED uses TASK_RESUME, not SET_READY event_type = EventType.TASK_RESUME elif target_state == OperationalState.READY and current_state == OperationalState.RUNNING: # Completing from RUNNING uses TASK_COMPLETE, not SET_READY event_type = EventType.TASK_COMPLETE else: event_mapping = { OperationalState.READY: EventType.SET_READY, OperationalState.RUNNING: EventType.TASK_START, OperationalState.PAUSED: EventType.TASK_PAUSE, OperationalState.STOPPED: EventType.TASK_STOP, OperationalState.IDLE: EventType.TASK_RESET, OperationalState.ERROR: EventType.TASK_ERROR, } event_type = event_mapping.get(target_state) if event_type is None: logger.error(f"No event mapping for target state {target_state.value}") return event = StateEvent(event_type, payload={"source": "OperationalStateMachine"}) self._state_machine.send_event(event) # ------------------------------------------------------------------------- # Operation Reference Counting Methods # ------------------------------------------------------------------------- def _increment_operation_counter(self): """ Increment the operation reference counter. If counter goes from 0 to 1 and current state is READY, automatically transition to RUNNING. This is called by the @operation decorator when starting an operation. """ current_state = self.get_operational_state() # Only increment if in valid state if current_state not in {OperationalState.READY, OperationalState.RUNNING}: logger.warning( f"Cannot increment operation counter in state {current_state.value}. " f"Counter remains at {self._operation_counter}" ) return self._operation_counter += 1 logger.debug(f"Operation counter incremented to {self._operation_counter}") # Transition to RUNNING if this is the first operation if self._operation_counter == 1 and current_state == OperationalState.READY: logger.info("First operation started, transitioning READY -> RUNNING") self._set_operational_state(OperationalState.RUNNING) def _decrement_operation_counter(self): """ Decrement the operation reference counter. If counter reaches 0 and current state is RUNNING, automatically transition back to READY. This is called by the @operation decorator when completing an operation. """ if self._operation_counter <= 0: logger.warning("Operation counter already at 0, cannot decrement") return self._operation_counter -= 1 logger.debug(f"Operation counter decremented to {self._operation_counter}") # Transition to READY if all operations completed if self._operation_counter == 0 and self.get_operational_state() == OperationalState.RUNNING: logger.info("All operations completed, transitioning RUNNING -> READY") self._set_operational_state(OperationalState.READY)
[Doku] def get_operation_counter(self) -> int: """ Get the current operation reference counter value. Returns: Number of currently active operations """ return self._operation_counter
def _reset_operation_counter(self): """ Reset the operation reference counter to zero. This is called by on_initialize() and on_resume() to ensure a clean state after initialization or resuming from pause. """ if self._operation_counter != 0: logger.warning( f"Resetting operation counter from {self._operation_counter} to 0. " f"This may indicate incomplete operations." ) self._operation_counter = 0 logger.debug("Operation counter reset to 0") # ------------------------------------------------------------------------- # Public Lifecycle API # ------------------------------------------------------------------------- # Lifecycle Methods - Implement these in subclasses # ------------------------------------------------------------------------- # Subclasses should implement: initialize(), pause(), resume(), stop(), reset() # These methods will be automatically wrapped by the MetaOperationalState metaclass # with state validation, transitions, and error handling. # For dynamic operations, use the @operation decorator. def __repr__(self) -> str: """String representation.""" return f"{self.__class__.__name__}(state={self.get_operational_state().value})"