# Source code for vyra_base.state.unified

"""
Integration module for easy unified state machine access.

Provides a simple API that combines all three layers into a single
unified interface for module state management.
"""

import logging
from typing import Optional, Dict, Any, Callable

from .state_machine import StateMachine, StateMachineConfig
from .lifecycle_layer import LifecycleLayer
from .operational_layer import OperationalLayer
from .health_layer import HealthLayer
from .state_types import LifecycleState, OperationalState, HealthState, StateType


# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)


class UnifiedStateMachine:
    """
    Unified interface for the 3-layer state machine.

    Combines the Lifecycle, Operational, and Health layers into a single
    easy-to-use API. Recommended for most use cases.

    Almost every method is a thin delegation: lifecycle calls go to
    ``self.lifecycle``, operational calls to ``self.operational``, health
    calls to ``self.health``, and cross-layer queries to the shared core
    ``self.fsm``. Only ``standard_startup`` and ``standard_shutdown`` add
    sequencing and error handling on top of those calls.

    Example:
        >>> usm = UnifiedStateMachine()
        >>> usm.start()  # Lifecycle
        >>> usm.complete_initialization()
        >>> usm.set_ready()  # Operational
        >>> usm.start_task({'task_id': '123'})
        >>> usm.report_warning({'cpu': '85%'})  # Health
        >>> usm.get_all_states()
        {'lifecycle': 'Active', 'operational': 'Running', 'health': 'Warning'}
    """

    def __init__(self, config: Optional[StateMachineConfig] = None) -> None:
        """
        Initialize the unified state machine.

        Args:
            config: Configuration for state machine behavior. Passed
                unchanged to ``StateMachine``; ``None`` is forwarded as-is.
        """
        # Create core FSM
        self.fsm = StateMachine(config)

        # Create layer interfaces; all three wrap the same FSM instance.
        self.lifecycle = LifecycleLayer(self.fsm)
        self.operational = OperationalLayer(self.fsm)
        self.health = HealthLayer(self.fsm)

        logger.info("UnifiedStateMachine initialized")

    # -------------------------------------------------------------------------
    # Unified State Query
    # -------------------------------------------------------------------------

    def get_all_states(self) -> Dict[str, str]:
        """Get the current state of all layers, as reported by the core FSM."""
        return self.fsm.get_current_state()

    def get_lifecycle_state(self) -> LifecycleState:
        """Get the lifecycle state."""
        return self.lifecycle.get_state()

    def get_operational_state(self) -> OperationalState:
        """Get the operational state."""
        return self.operational.get_state()

    def get_health_state(self) -> HealthState:
        """Get the health state."""
        return self.health.get_state()

    def is_operational(self) -> bool:
        """Check if the module can execute tasks."""
        return self.fsm.is_operational()

    def is_healthy(self) -> bool:
        """Check if the module health is OK."""
        return self.fsm.is_healthy()

    # Lifecycle state shortcuts

    def is_initializing(self) -> bool:
        """Check if the module is initializing."""
        return self.lifecycle.is_initializing()

    def is_active(self) -> bool:
        """Check if the module lifecycle is Active."""
        return self.lifecycle.is_active()

    def is_suspended(self) -> bool:
        """Check if the module is suspended."""
        return self.lifecycle.is_suspended()

    def is_recovering(self) -> bool:
        """Check if the module is in recovery."""
        return self.lifecycle.is_recovering()

    def is_shutting_down(self) -> bool:
        """Check if the module is shutting down."""
        return self.lifecycle.is_shutting_down()

    def is_offline(self) -> bool:
        """Check if the module is offline."""
        return self.lifecycle.is_offline()

    # -------------------------------------------------------------------------
    # Lifecycle Methods (delegated)
    # -------------------------------------------------------------------------

    def start(self, metadata: Optional[Dict[str, Any]] = None) -> LifecycleState:
        """Start module initialization."""
        return self.lifecycle.start(metadata)

    def complete_initialization(
        self, result: Optional[Dict[str, Any]] = None
    ) -> LifecycleState:
        """Complete initialization successfully."""
        return self.lifecycle.complete_initialization(result)

    def fail_initialization(self, error: Optional[str] = None) -> LifecycleState:
        """Mark initialization as failed."""
        return self.lifecycle.fail_initialization(error)

    def shutdown(self, reason: Optional[str] = None) -> LifecycleState:
        """Begin controlled shutdown."""
        return self.lifecycle.shutdown(reason)

    def complete_shutdown(self) -> LifecycleState:
        """Complete the shutdown process."""
        return self.lifecycle.complete_shutdown()

    def suspend(self, reason: Optional[str] = None) -> LifecycleState:
        """
        Suspend the module temporarily (e.g. for maintenance or updates).

        Transitions: Active → Suspended

        Args:
            reason: Optional reason for suspension

        Returns:
            New lifecycle state
        """
        # The lifecycle layer names this operation ``enter_suspend``.
        return self.lifecycle.enter_suspend(reason)

    def resume_from_suspend(
        self, info: Optional[Dict[str, Any]] = None
    ) -> LifecycleState:
        """
        Resume the module from the Suspended state.

        Transitions: Suspended → Active

        Args:
            info: Optional context about the resume

        Returns:
            New lifecycle state
        """
        return self.lifecycle.resume_from_suspend(info)

    def enter_recovery(
        self, fault_info: Optional[Dict[str, Any]] = None
    ) -> LifecycleState:
        """
        Enter recovery mode due to a fault.

        Transitions: Active → Recovering

        Args:
            fault_info: Fault description and context

        Returns:
            New lifecycle state
        """
        return self.lifecycle.enter_recovery(fault_info)

    def complete_recovery(
        self, recovery_info: Optional[Dict[str, Any]] = None
    ) -> LifecycleState:
        """
        Complete recovery successfully.

        Transitions: Recovering → Active

        Args:
            recovery_info: Recovery details

        Returns:
            New lifecycle state
        """
        return self.lifecycle.complete_recovery(recovery_info)

    def fail_recovery(self, error: Optional[str] = None) -> LifecycleState:
        """
        Mark recovery as failed.

        Transitions: Recovering → ShuttingDown

        Args:
            error: Recovery failure reason

        Returns:
            New lifecycle state
        """
        return self.lifecycle.fail_recovery(error)

    # -------------------------------------------------------------------------
    # Operational Methods (delegated)
    # -------------------------------------------------------------------------

    def set_ready(self, metadata: Optional[Dict[str, Any]] = None) -> OperationalState:
        """Signal readiness for tasks."""
        return self.operational.set_ready(metadata)

    def start_task(
        self, task_info: Optional[Dict[str, Any]] = None
    ) -> OperationalState:
        """Start task execution."""
        return self.operational.start_task(task_info)

    def pause(self, reason: Optional[str] = None) -> OperationalState:
        """Pause the current task."""
        return self.operational.pause(reason)

    def resume(self) -> OperationalState:
        """Resume a paused task."""
        return self.operational.resume()

    def stop(self, result: Optional[Dict[str, Any]] = None) -> OperationalState:
        """Mark the task as completed."""
        return self.operational.stop(result)

    def reset(self) -> OperationalState:
        """Reset the operational state to Idle."""
        return self.operational.reset()

    # -------------------------------------------------------------------------
    # Health Methods (delegated)
    # -------------------------------------------------------------------------

    def report_warning(
        self, warning_info: Optional[Dict[str, Any]] = None
    ) -> HealthState:
        """Report a non-critical warning."""
        return self.health.report_warning(warning_info)

    def report_fault(self, fault_info: Optional[Dict[str, Any]] = None) -> HealthState:
        """Report a critical fault."""
        return self.health.report_fault(fault_info)

    def recover(self, recovery_info: Optional[Dict[str, Any]] = None) -> HealthState:
        """Attempt recovery from a fault."""
        return self.health.recover(recovery_info)

    def emergency_stop(self, reason: str) -> Dict[str, str]:
        """Trigger an emergency stop; ``reason`` is required here."""
        return self.health.emergency_stop(reason)

    # -------------------------------------------------------------------------
    # Callbacks and Monitoring
    # -------------------------------------------------------------------------

    def on_lifecycle_change(self, callback: Callable, priority: int = 0) -> None:
        """Subscribe to lifecycle state changes."""
        self.lifecycle.on_state_change(callback, priority)

    def on_operational_change(self, callback: Callable, priority: int = 0) -> None:
        """Subscribe to operational state changes."""
        self.operational.on_state_change(callback, priority)

    def on_health_change(self, callback: Callable, priority: int = 0) -> None:
        """Subscribe to health state changes."""
        self.health.on_state_change(callback, priority)

    def on_any_change(self, callback: Callable, priority: int = 0) -> None:
        """Subscribe to any state change across all layers."""
        # Registered directly on the core FSM under the StateType.ANY key
        # rather than on one of the layer wrappers.
        self.fsm.subscribe(StateType.ANY, callback, priority)

    # -------------------------------------------------------------------------
    # Diagnostics
    # -------------------------------------------------------------------------

    def get_diagnostic_info(self) -> Dict[str, Any]:
        """
        Get comprehensive diagnostic information.

        Returns:
            A dict with the keys ``states``, ``lifecycle_info``,
            ``operational_info``, ``health_info`` and ``fsm_diagnostics``,
            each filled from the corresponding layer or the core FSM.
        """
        return {
            "states": self.get_all_states(),
            "lifecycle_info": self.lifecycle.get_info(),
            "operational_info": self.operational.get_info(),
            "health_info": self.health.get_info(),
            "fsm_diagnostics": self.fsm.get_diagnostic_info(),
        }

    def get_history(self, limit: Optional[int] = None):
        """
        Get the state transition history.

        Args:
            limit: Forwarded unchanged to the core FSM's ``get_history``.
        """
        return self.fsm.get_history(limit)

    def clear_history(self) -> None:
        """Clear the transition history."""
        self.fsm.clear_history()

    # -------------------------------------------------------------------------
    # Event Handling (for OperationalStateMachine compatibility)
    # -------------------------------------------------------------------------

    def send_event(self, event) -> None:
        """
        Send an event to the state machine.

        This method is required for compatibility with OperationalStateMachine
        which calls _state_machine.send_event(event).

        Args:
            event: StateEvent to send to the FSM
        """
        self.fsm.send_event(event)

    # -------------------------------------------------------------------------
    # Standard Startup Sequence
    # -------------------------------------------------------------------------

    def standard_startup(self) -> bool:
        """
        Execute the standard module startup sequence.

        Sequence:
            1. Start initialization
            2. Complete initialization
            3. Set operational ready

        If any step raises, the error is logged and the lifecycle is asked to
        record an initialization failure. That follow-up call is itself
        guarded, so this method never propagates an exception to the caller.

        Returns:
            True if startup successful, False otherwise
        """
        try:
            self.start()
            logger.info("Startup: Initializing...")

            # Allow custom initialization logic here
            # In real implementation, this would initialize resources

            self.complete_initialization()
            logger.info("Startup: Initialization complete")

            self.set_ready()
            logger.info("Startup: Ready for tasks")

            return True

        except Exception as e:
            logger.error(f"Startup failed: {e}")
            # fail_initialization() may itself raise, e.g. if the failing step
            # left the lifecycle in a state from which the layer rejects this
            # call. Guard it so the "return False on failure" contract holds.
            try:
                self.fail_initialization(str(e))
            except Exception as fail_error:
                logger.error(
                    f"Startup: could not mark initialization as failed: {fail_error}"
                )
            return False

    def standard_shutdown(self) -> bool:
        """
        Execute the standard module shutdown sequence.

        Sequence:
            1. Begin shutdown
            2. Complete any pending tasks (module responsibility)
            3. Complete shutdown

        Returns:
            True if shutdown successful, False otherwise
        """
        try:
            self.shutdown("Standard shutdown")
            logger.info("Shutdown: Initiated")

            # Allow cleanup logic here
            # In real implementation, this would clean up resources

            self.complete_shutdown()
            logger.info("Shutdown: Complete")

            return True

        except Exception as e:
            logger.error(f"Shutdown failed: {e}")
            return False

    def __repr__(self) -> str:
        """Return a string showing the current state of each layer."""
        states = self.get_all_states()
        return (
            f"UnifiedStateMachine(lifecycle={states['lifecycle']}, "
            f"operational={states['operational']}, health={states['health']})"
        )