# Source code for vyra_base.state.operational_metaclass
"""
Metaclass for automatic operational state management.
This module provides a metaclass that automatically wraps lifecycle methods
with state validation and transition logic, following industrial automation
best practices.
"""
import asyncio
import functools
import logging
from typing import Callable, Any, Optional
from .state_types import OperationalState
logger = logging.getLogger(__name__)
[docs]
class OperationalStateError(Exception):
    """Signal that a lifecycle operation was invoked from a state that does not permit it."""
[docs]
class MetaOperationalState(type):
    """
    Metaclass that automatically manages operational state transitions.

    Lifecycle methods listed in ``STATE_RULES`` that are defined directly in
    a class body are wrapped at class-creation time. Each wrapper handles:

    - Pre-condition state validation
    - Optional state transition before method execution
    - Success/failure state transitions after method execution
    - Error handling and logging

    A wrapped call counts as a *failure* when the user method raises an
    ``Exception`` (which is re-raised after the failure transition) or when
    it returns the boolean ``False``. Any other return value, including
    ``None``, counts as success.

    Instances of classes using this metaclass are expected to provide
    ``get_operational_state()``, ``_set_operational_state(state)``,
    ``get_operation_counter()`` and ``_reset_operation_counter()``; the
    wrappers call these.

    Supported lifecycle methods and their state transitions:

    ``initialize()``:
        Pre-condition: IDLE
        On success: IDLE -> READY (resets operation counter)
        On failure: IDLE -> ERROR
    ``pause()``:
        Pre-condition: RUNNING
        On success: RUNNING -> PAUSED
        On failure: RUNNING -> ERROR
    ``resume()``:
        Pre-condition: PAUSED
        On success: PAUSED -> READY (resets operation counter)
        On failure: PAUSED -> ERROR
    ``stop()``:
        Pre-condition: RUNNING, PAUSED
        On success: (current) -> STOPPED
        On failure: (current) -> ERROR
    ``reset()``:
        Pre-condition: STOPPED, ERROR
        On success: (current) -> IDLE
        On failure: No state change

    Example:
        >>> class MyModule(OperationalStateMachine):
        ...     def initialize(self):
        ...         # Setup hardware, load config, etc.
        ...         print("Initializing...")
        ...         return True
        ...
        >>>
        >>> module = MyModule(state_machine)
        >>> module.initialize()  # Automatically: IDLE -> READY on success
    """

    # Mapping of lifecycle methods to their state transition rules.
    # A transition value of None means "leave the state untouched".
    STATE_RULES = {
        'initialize': {
            'required_states': {OperationalState.IDLE},
            'pre_transition': None,
            'success_transition': OperationalState.READY,
            'failure_transition': OperationalState.ERROR,
            'reset_counter': True,  # Reset operation counter on success
        },
        'pause': {
            'required_states': {OperationalState.RUNNING},
            'pre_transition': None,
            'success_transition': OperationalState.PAUSED,
            'failure_transition': OperationalState.ERROR,
            'reset_counter': False,
        },
        'resume': {
            'required_states': {OperationalState.PAUSED},
            'pre_transition': None,
            'success_transition': OperationalState.READY,
            'failure_transition': OperationalState.ERROR,
            'reset_counter': True,  # Reset operation counter on success
        },
        'stop': {
            'required_states': {OperationalState.RUNNING, OperationalState.PAUSED},
            'pre_transition': None,
            'success_transition': OperationalState.STOPPED,
            'failure_transition': OperationalState.ERROR,
            'reset_counter': False,
        },
        'reset': {
            'required_states': {OperationalState.STOPPED, OperationalState.ERROR},
            'pre_transition': None,
            'success_transition': OperationalState.IDLE,
            'failure_transition': None,
            'reset_counter': False,
        },
    }

    def __new__(cls, name, bases, attrs):
        """
        Create a new class with wrapped lifecycle methods.

        Only names present in ``attrs`` (i.e. defined in this class body)
        are wrapped; inherited methods keep whatever wrapping their defining
        class received.

        Args:
            name: Class name
            bases: Base classes
            attrs: Class attributes and methods

        Returns:
            New class with wrapped methods
        """
        # Find and wrap all lifecycle methods that have state rules
        for method_name, rules in cls.STATE_RULES.items():
            if method_name in attrs:
                original_method = attrs[method_name]
                attrs[method_name] = cls._wrap_method(original_method, method_name, rules)
                logger.debug(f"Wrapped method '{method_name}' with state transitions")
        return super().__new__(cls, name, bases, attrs)

    @staticmethod
    def _begin_operation(instance: Any, method_name: str, rules: dict) -> None:
        """Validate the pre-condition state and apply the optional pre-transition.

        Raises:
            OperationalStateError: If the instance's current state is not in
                ``rules['required_states']``.
        """
        current_state = instance.get_operational_state()
        required_states = rules['required_states']
        if current_state not in required_states:
            error_msg = (
                f"Cannot call {method_name} in state {current_state.value}. "
                f"Required states: {[s.value for s in required_states]}"
            )
            logger.error(error_msg)
            raise OperationalStateError(error_msg)

        logger.info(f"Executing {method_name} from state {current_state.value}")

        pre_transition = rules['pre_transition']
        if pre_transition is not None:
            logger.debug(f"Pre-transition: {current_state.value} -> {pre_transition.value}")
            instance._set_operational_state(pre_transition)

    @staticmethod
    def _is_success(result: Any) -> bool:
        """Interpret a user method's return value: a bool is used as-is, anything else is success."""
        if isinstance(result, bool):
            return result
        return True

    @staticmethod
    def _apply_post_transition(instance: Any, rules: dict, success: bool) -> None:
        """Apply the success or failure transition from ``rules``, if one is configured."""
        if success and rules['success_transition'] is not None:
            new_state = rules['success_transition']
            logger.info(
                f"Success transition: {instance.get_operational_state().value} -> {new_state.value}"
            )
            # Reset operation counter if specified (before the state changes)
            if rules.get('reset_counter', False):
                logger.debug(
                    f"Resetting operation counter (was {instance.get_operation_counter()})"
                )
                instance._reset_operation_counter()
            instance._set_operational_state(new_state)
        elif not success and rules['failure_transition'] is not None:
            new_state = rules['failure_transition']
            logger.warning(
                f"Failure transition: {instance.get_operational_state().value} -> {new_state.value}"
            )
            instance._set_operational_state(new_state)

    @staticmethod
    def _wrap_method(original_method: Callable, method_name: str, rules: dict) -> Callable:
        """
        Wrap a lifecycle method with state validation and transition logic.

        Supports both sync and async (coroutine) original methods. When the
        wrapped method is async the returned wrapper is also ``async def`` so
        that callers can properly ``await`` it. Both wrappers share the same
        helpers, so their state handling is identical.

        Args:
            original_method: Original user-defined method (sync or async)
            method_name: Name of the method
            rules: State transition rules

        Returns:
            Wrapped method with state management (async if original is async)
        """
        begin = MetaOperationalState._begin_operation
        is_success = MetaOperationalState._is_success
        finish = MetaOperationalState._apply_post_transition

        def log_failure(exc: Exception) -> None:
            # Must be called from inside the ``except`` block so that
            # exc_info=True picks up the active exception.
            logger.error(f"{method_name} failed with exception: {exc}", exc_info=True)

        def log_completed(instance: Any) -> None:
            logger.info(
                f"{method_name} completed. "
                f"Current state: {instance.get_operational_state().value}"
            )

        # NOTE(review): asyncio.iscoroutinefunction is reported as deprecated
        # in recent Python releases in favour of inspect.iscoroutinefunction;
        # confirm the supported interpreter range before switching.
        if asyncio.iscoroutinefunction(original_method):

            @functools.wraps(original_method)
            async def async_wrapper(self, *args, **kwargs) -> Any:
                begin(self, method_name, rules)
                try:
                    result = await original_method(self, *args, **kwargs)
                except Exception as exc:
                    log_failure(exc)
                    finish(self, rules, False)
                    # Bare raise keeps the original traceback intact.
                    raise
                finish(self, rules, is_success(result))
                log_completed(self)
                return result

            return async_wrapper

        @functools.wraps(original_method)
        def wrapper(self, *args, **kwargs) -> Any:
            begin(self, method_name, rules)
            try:
                result = original_method(self, *args, **kwargs)
            except Exception as exc:
                log_failure(exc)
                finish(self, rules, False)
                # Bare raise keeps the original traceback intact.
                raise
            finish(self, rules, is_success(result))
            log_completed(self)
            return result

        return wrapper