# Source code for vyra_base.state.operational_state_machine
"""
Base class for operational state machines using metaclass-based automation.
This module provides a base class that modules can inherit from to get
automatic operational state management through lifecycle methods.
"""
import logging
from typing import Optional, Dict, Any
from .operational_metaclass import MetaOperationalState
from .state_machine import StateMachine
from .state_types import OperationalState
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# [docs]
class OperationalStateMachine(metaclass=MetaOperationalState):
    """
    Foundation for modules whose operational state is managed automatically.

    The class sits on top of the 3-layer StateMachine and offers a small
    set of lifecycle methods; the state transitions around those methods
    are applied automatically instead of being coded by hand.

    A subclass implements any subset of the following methods:

    - initialize(): setup and initialization logic
    - pause(): pause the current operation
    - resume(): resume a paused operation
    - stop(): stop the current operation
    - reset(): return to the initial state

    Dynamic operations are declared with the @operation decorator.

    Every method should return True on success and False on failure.
    A raised exception is caught and counted as a failure.

    Example:
        >>> class MyModule(OperationalStateMachine):
        ...     def __init__(self, state_machine):
        ...         super().__init__(state_machine)
        ...         self.data = None
        ...
        ...     def initialize(self):
        ...         # This runs when initialize() is called
        ...         # Pre-condition: IDLE state
        ...         # On success: IDLE -> READY (operation counter reset)
        ...         # On failure: IDLE -> ERROR
        ...         self.data = []
        ...         print("Initialized!")
        ...         return True
        >>>
        >>> # Usage
        >>> fsm = StateMachine()
        >>> module = MyModule(fsm)
        >>> module.initialize()  # Automatic state management
        >>> # Now in READY state, ready for operations
    """

    def __init__(self, state_machine: StateMachine):
        """
        Bind this instance to its backing state machine.

        Args:
            state_machine: The underlying 3-layer StateMachine instance
                that is queried for state and receives transition events.
        """
        self._state_machine = state_machine
        # Reference count of the operations currently in flight.
        self._operation_counter = 0
        logger.info(f"{self.__class__.__name__} initialized with state machine")

    # -------------------------------------------------------------------------
    # State Query Methods
    # -------------------------------------------------------------------------

    def get_operational_state(self) -> OperationalState:
        """Return the operational state reported by the state machine."""
        backing = self._state_machine
        return backing.get_operational_state()

    def get_all_states(self) -> Dict[str, str]:
        """Return every current state (lifecycle, operational, health)."""
        backing = self._state_machine
        return backing.get_current_state()

    def is_idle(self) -> bool:
        """Return True when the operational state is IDLE."""
        observed = self.get_operational_state()
        return observed == OperationalState.IDLE

    def is_ready(self) -> bool:
        """Return True when the operational state is READY."""
        observed = self.get_operational_state()
        return observed == OperationalState.READY

    def is_running(self) -> bool:
        """Return True when the operational state is RUNNING."""
        observed = self.get_operational_state()
        return observed == OperationalState.RUNNING

    def is_paused(self) -> bool:
        """Return True when the operational state is PAUSED."""
        observed = self.get_operational_state()
        return observed == OperationalState.PAUSED

    def is_stopped(self) -> bool:
        """Return True when the operational state is STOPPED."""
        observed = self.get_operational_state()
        return observed == OperationalState.STOPPED

    def is_error(self) -> bool:
        """Return True when the operational state is ERROR."""
        observed = self.get_operational_state()
        return observed == OperationalState.ERROR

    # -------------------------------------------------------------------------
    # Internal State Transition Method
    # -------------------------------------------------------------------------

    def _set_operational_state(self, target_state: OperationalState):
        """
        Request a transition to ``target_state`` by sending the matching event.

        Intended for the metaclass wrapper, which uses it to perform the
        state transitions around lifecycle methods.

        The event depends on the target and, for READY, also on the state
        being left. If no event is known for the target, an error is logged
        and nothing is sent.

        Args:
            target_state: Operational state that should be reached.
        """
        # Imported here rather than at module level, as in the original.
        from .state_events import StateEvent, EventType

        current_state = self.get_operational_state()
        wants_ready = target_state == OperationalState.READY

        if wants_ready and current_state == OperationalState.PAUSED:
            # PAUSED -> READY is a resume, so SET_READY is not used.
            event_type = EventType.TASK_RESUME
        elif wants_ready and current_state == OperationalState.RUNNING:
            # RUNNING -> READY is a completion, so SET_READY is not used.
            event_type = EventType.TASK_COMPLETE
        else:
            # Every other case depends on the target state alone.
            event_for_target = {
                OperationalState.READY: EventType.SET_READY,
                OperationalState.RUNNING: EventType.TASK_START,
                OperationalState.PAUSED: EventType.TASK_PAUSE,
                OperationalState.STOPPED: EventType.TASK_STOP,
                OperationalState.IDLE: EventType.TASK_RESET,
                OperationalState.ERROR: EventType.TASK_ERROR,
            }
            event_type = event_for_target.get(target_state)
            if event_type is None:
                logger.error(f"No event mapping for target state {target_state.value}")
                return

        self._state_machine.send_event(
            StateEvent(event_type, payload={"source": "OperationalStateMachine"})
        )

    # -------------------------------------------------------------------------
    # Operation Reference Counting Methods
    # -------------------------------------------------------------------------

    def _increment_operation_counter(self):
        """
        Register one more active operation.

        The counter only changes while the state is READY or RUNNING; in
        any other state a warning is logged and the counter is left alone.
        When the count goes from 0 to 1 while READY, a transition to
        RUNNING is requested.

        Intended to be called by the @operation decorator when an operation
        starts.
        """
        current_state = self.get_operational_state()
        countable_states = {OperationalState.READY, OperationalState.RUNNING}

        if current_state in countable_states:
            self._operation_counter += 1
            logger.debug(f"Operation counter incremented to {self._operation_counter}")

            # The first active operation moves READY to RUNNING.
            is_first_operation = self._operation_counter == 1
            if is_first_operation and current_state == OperationalState.READY:
                logger.info("First operation started, transitioning READY -> RUNNING")
                self._set_operational_state(OperationalState.RUNNING)
        else:
            logger.warning(
                f"Cannot increment operation counter in state {current_state.value}. "
                f"Counter remains at {self._operation_counter}"
            )

    def _decrement_operation_counter(self):
        """
        Unregister one active operation.

        A counter that is already at 0 (or below) is not decremented; a
        warning is logged instead. When the count reaches 0 while the state
        is RUNNING, a transition back to READY is requested.

        Intended to be called by the @operation decorator when an operation
        completes.
        """
        if self._operation_counter <= 0:
            logger.warning("Operation counter already at 0, cannot decrement")
            return

        self._operation_counter -= 1
        logger.debug(f"Operation counter decremented to {self._operation_counter}")

        # The state is only queried once no operation is left.
        if self._operation_counter != 0:
            return
        if self.get_operational_state() == OperationalState.RUNNING:
            logger.info("All operations completed, transitioning RUNNING -> READY")
            self._set_operational_state(OperationalState.READY)

    def get_operation_counter(self) -> int:
        """
        Report the operation reference counter.

        Returns:
            Number of currently active operations.
        """
        active_operations = self._operation_counter
        return active_operations

    def _reset_operation_counter(self):
        """
        Force the operation reference counter back to zero.

        A warning is logged first when the counter was not already zero.

        NOTE(review): the callers are outside this class. The original
        documentation names on_initialize() and on_resume(), so that the
        counter is clean after initialization or resuming from pause.
        """
        had_leftovers = self._operation_counter != 0
        if had_leftovers:
            logger.warning(
                f"Resetting operation counter from {self._operation_counter} to 0. "
                f"This may indicate incomplete operations."
            )
        self._operation_counter = 0
        logger.debug("Operation counter reset to 0")

    # -------------------------------------------------------------------------
    # Public Lifecycle API
    # -------------------------------------------------------------------------
    # Lifecycle methods are implemented by subclasses:
    #     initialize(), pause(), resume(), stop(), reset()
    # The MetaOperationalState metaclass wraps them automatically with
    # state validation, transitions, and error handling.
    # Dynamic operations use the @operation decorator instead.
    # -------------------------------------------------------------------------

    def __repr__(self) -> str:
        """Return the class name together with the current operational state."""
        description = f"{self.__class__.__name__}(state={self.get_operational_state().value})"
        return description