Event System¶
The State Machine uses an event-driven architecture pattern. All state transitions are triggered by events.
Event Structure¶
Each event is an immutable StateEvent instance:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any
@dataclass(frozen=True)
class StateEvent:
event_type: EventType # Event type
timestamp: datetime # Timestamp
origin_layer: Optional[str] # Origin layer (lifecycle/operational/health)
payload: Optional[Dict[str, Any]] # Event-specific data
event_id: Optional[str] # Unique event ID
Example¶
from vyra_base.state.state_events import StateEvent, EventType
event = StateEvent(
event_type=EventType.TASK_START,
origin_layer="operational",
payload={"task_id": "task_123", "priority": "high"}
)
# Handle the event
state_machine.handle_event(event)
Event Types Overview¶
Lifecycle Events¶
Event |
Transition |
Description |
|---|---|---|
START |
Offline → Initializing |
Start module initialization |
INIT_SUCCESS |
Initializing → Active |
Initialization successful |
INIT_FAILURE |
Initializing → Recovering |
Initialization failed |
SHUTDOWN |
Active → ShuttingDown |
Controlled shutdown |
FINISHED |
ShuttingDown → Offline |
Shutdown completed |
FAULT_DETECTED |
Active → Recovering |
Error detected, recovery needed |
RECOVERY_SUCCESS |
Recovering → Active |
Recovery successful |
RECOVERY_FAILED |
Recovering → ShuttingDown |
Recovery failed |
Operational Events¶
Event |
Transition |
Description |
|---|---|---|
SET_READY |
Idle → Ready |
Signal readiness for tasks |
TASK_START |
Ready → Running |
Start task execution |
SET_BACKGROUND |
Running → BackgroundRunning |
Switch to background processing |
SET_FOREGROUND |
BackgroundRunning → Running |
Return to foreground execution |
TASK_PAUSE |
Running → Paused |
Pause task |
TASK_RESUME |
Paused → Ready |
Resume paused task |
TASK_COMPLETE |
Running → Ready |
Task completed successfully |
TASK_STOP |
Running → Stopped |
Stop task (requires reset) |
TASK_RESET |
Stopped → Idle |
Reset operational state |
Health Events¶
Event |
Transition |
Description |
|---|---|---|
WARN |
Healthy → Warning |
Report non-critical warning |
CLEAR_WARNING |
Warning → Healthy |
Warning resolved |
FAULT |
Any → Critical |
Critical error detected |
RECOVER |
Critical → Healthy/Warning |
Recovery from critical error |
RESET |
Any → Healthy |
Reset health state |
Cross-Layer Events¶
Event |
Impact |
|---|---|
INTERRUPT |
Interrupts current operation, can affect all layers |
EMERGENCY_STOP |
Emergency stop: Health→Critical, Operational→Stopped, Lifecycle→ShuttingDown |
PRIORITY_OVERRIDE |
Admin/Debug override for special system states |
Event Handling¶
Basic Usage¶
from vyra_base.state import UnifiedStateMachine
from vyra_base.state.state_events import EventType
# Create state machine
state_machine = UnifiedStateMachine(module_name="my_module")
# Handle events using EventType
state_machine.handle_event(EventType.START)
state_machine.handle_event(EventType.INIT_SUCCESS)
state_machine.handle_event(EventType.SET_READY)
With Payload¶
Events can carry additional data in the payload:
from vyra_base.state.state_events import StateEvent, EventType
# Create event with payload
event = StateEvent(
event_type=EventType.TASK_START,
origin_layer="operational",
payload={
"task_id": "task_123",
"priority": "high",
"parameters": {"speed": 100}
}
)
# Handle the event
state_machine.handle_event(event)
Event Callbacks¶
Register callbacks to react to events:
def on_lifecycle_event(old_state, new_state, event):
print(f"Lifecycle: {old_state} → {new_state}")
print(f"Event: {event.event_type}")
if event.payload:
print(f"Payload: {event.payload}")
def on_operational_event(old_state, new_state, event):
print(f"Operational: {old_state} → {new_state}")
def on_health_event(old_state, new_state, event):
print(f"Health: {old_state} → {new_state}")
# Register callbacks
state_machine.register_lifecycle_callback(on_lifecycle_event)
state_machine.register_operational_callback(on_operational_event)
state_machine.register_health_callback(on_health_event)
Event Validation¶
Invalid Events¶
The state machine validates events before processing:
from vyra_base.state.state_types import InvalidTransitionError
try:
# Try to start task when not ready
state_machine.handle_event(EventType.TASK_START)
except InvalidTransitionError as e:
print(f"Invalid event: {e}")
# Output: "Cannot transition from Idle to Running with event TASK_START"
Event Sequence Validation¶
Events must follow valid transition sequences:
# ✅ Valid sequence
state_machine.handle_event(EventType.START) # Offline → Initializing
state_machine.handle_event(EventType.INIT_SUCCESS) # Initializing → Active
state_machine.handle_event(EventType.SET_READY) # Idle → Ready
state_machine.handle_event(EventType.TASK_START) # Ready → Running
# ❌ Invalid sequence
state_machine.handle_event(EventType.TASK_START) # Error: Not in Ready state
Event Priority¶
Events have different priorities:
High Priority (Immediate)¶
EMERGENCY_STOPFAULTINTERRUPT
These events are processed immediately and can override current operations.
Normal Priority¶
Most lifecycle events (
START,INIT_SUCCESS,SHUTDOWN)Most operational events (
TASK_START,TASK_STOP)Health monitoring events (
WARN,CLEAR_WARNING)
Low Priority (Queued)¶
Status updates
Monitoring events
Non-critical notifications
Event Queue¶
Events are processed through an internal queue:
# Events are queued and processed in order
state_machine.handle_event(EventType.SET_READY)
state_machine.handle_event(EventType.TASK_START)
state_machine.handle_event(EventType.TASK_COMPLETE)
# High priority events jump the queue
state_machine.handle_event(EventType.EMERGENCY_STOP) # Processed immediately
Async Event Handling¶
For async operations:
async def handle_events_async():
# Handle event asynchronously
await state_machine.handle_event_async(EventType.TASK_START)
# Wait for task completion
await task_execution()
await state_machine.handle_event_async(EventType.TASK_COMPLETE)
Event Patterns¶
Task Execution Pattern¶
# Standard task execution pattern
state_machine.handle_event(EventType.SET_READY)
state_machine.handle_event(EventType.TASK_START)
try:
# Execute task
execute_task()
state_machine.handle_event(EventType.TASK_COMPLETE)
except Exception as e:
# Handle error
state_machine.handle_event(EventType.TASK_STOP)
state_machine.handle_event(EventType.FAULT)
Error Recovery Pattern¶
# Error recovery pattern
def handle_error():
# Detect error
state_machine.handle_event(EventType.FAULT_DETECTED)
# Attempt recovery
try:
perform_recovery()
state_machine.handle_event(EventType.RECOVERY_SUCCESS)
except Exception:
state_machine.handle_event(EventType.RECOVERY_FAILED)
Graceful Shutdown Pattern¶
# Graceful shutdown pattern
def shutdown():
# Stop current tasks
if state_machine.get_operational_state() == OperationalState.RUNNING:
state_machine.handle_event(EventType.TASK_STOP)
# Initiate shutdown
state_machine.handle_event(EventType.SHUTDOWN)
# Cleanup
cleanup_resources()
# Finish shutdown
state_machine.handle_event(EventType.FINISHED)
Event Logging¶
All events are automatically logged:
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
# Events will be logged automatically
state_machine.handle_event(EventType.TASK_START)
# Output: INFO: Handling event TASK_START: Ready → Running
Custom Event Logging¶
Add custom logging via callbacks:
def log_event(old_state, new_state, event):
logging.info(
f"State change: {old_state} → {new_state}",
extra={
"event_type": event.event_type,
"event_id": event.event_id,
"timestamp": event.timestamp,
"payload": event.payload
}
)
state_machine.register_lifecycle_callback(log_event)
Best Practices¶
Always use EventType enum
❌ Don’t use strings:
state_machine.handle_event("task_start") # Wrong
✅ Use EventType:
state_machine.handle_event(EventType.TASK_START) # Correct
Handle exceptions
try: state_machine.handle_event(EventType.TASK_START) except InvalidTransitionError as e: logger.error(f"Transition failed: {e}")
Use payload for context
event = StateEvent( event_type=EventType.FAULT, payload={"error": str(exception), "component": "database"} ) state_machine.handle_event(event)
Register callbacks for monitoring
state_machine.register_lifecycle_callback(monitor_lifecycle) state_machine.register_operational_callback(monitor_operations) state_machine.register_health_callback(monitor_health)
Use emergency stop for critical situations
if critical_error_detected(): state_machine.handle_event(EventType.EMERGENCY_STOP)
See Also¶
3-Layer State Machine Architecture - Architecture overview
State Definitions - State definitions
State Transitions - Transition rules
API Reference - Complete API reference