66 lines
2.2 KiB
Python
66 lines
2.2 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from enum import StrEnum
|
||
|
|
|
||
|
|
|
||
|
|
class MachineState(StrEnum):
|
||
|
|
IDLE = "IDLE"
|
||
|
|
JOB_LOADED = "JOB_LOADED"
|
||
|
|
READY = "READY"
|
||
|
|
RUNNING = "RUNNING"
|
||
|
|
PAUSED = "PAUSED"
|
||
|
|
ABORTING = "ABORTING"
|
||
|
|
COMPLETED = "COMPLETED"
|
||
|
|
ABORTED = "ABORTED"
|
||
|
|
FAULTED = "FAULTED"
|
||
|
|
MANUAL = "MANUAL"
|
||
|
|
|
||
|
|
|
||
|
|
# Transition table: (current state, trigger name) -> next state.
# ``StateMachine.trigger`` treats any (state, trigger) pair that is missing
# from this table as an illegal transition.
TRANSITIONS: dict[tuple[MachineState, str], MachineState] = {
    # From IDLE
    (MachineState.IDLE, "load_job"): MachineState.JOB_LOADED,
    (MachineState.IDLE, "enter_manual"): MachineState.MANUAL,
    # From JOB_LOADED
    (MachineState.JOB_LOADED, "operator_acknowledge"): MachineState.READY,
    (MachineState.JOB_LOADED, "unload_job"): MachineState.IDLE,
    # From READY
    (MachineState.READY, "start"): MachineState.RUNNING,
    # From RUNNING
    (MachineState.RUNNING, "pause"): MachineState.PAUSED,
    # Self-transition: the machine stays in RUNNING between segments.
    (MachineState.RUNNING, "segment_complete"): MachineState.RUNNING,
    (MachineState.RUNNING, "all_segments_complete"): MachineState.COMPLETED,
    (MachineState.RUNNING, "abort"): MachineState.ABORTING,
    (MachineState.RUNNING, "fault"): MachineState.FAULTED,
    # From PAUSED
    (MachineState.PAUSED, "resume"): MachineState.RUNNING,
    (MachineState.PAUSED, "abort"): MachineState.ABORTING,
    # From MANUAL
    (MachineState.MANUAL, "exit_manual"): MachineState.IDLE,
    (MachineState.MANUAL, "fault"): MachineState.FAULTED,
    # From ABORTING
    (MachineState.ABORTING, "abort_complete"): MachineState.ABORTED,
    # End states all return to IDLE on "reset".
    (MachineState.COMPLETED, "reset"): MachineState.IDLE,
    (MachineState.ABORTED, "reset"): MachineState.IDLE,
    (MachineState.FAULTED, "reset"): MachineState.IDLE,
}
|
||
|
|
|
||
|
|
|
||
|
|
class IllegalTransitionError(RuntimeError):
    """Raised when a trigger is not defined for the machine's current state."""
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class TransitionEvent:
    """Record of a single state transition."""

    # State the machine was in when the trigger was applied.
    from_state: MachineState
    # Name of the trigger that was applied.
    trigger: str
    # State the machine moved to (equal to from_state for a self-transition).
    to_state: MachineState
|
||
|
|
|
||
|
|
|
||
|
|
class StateMachine:
    """Table-driven state machine that records every transition it makes.

    Attributes:
        state: The current state.
        events: One ``TransitionEvent`` per successful trigger, oldest first.
    """

    def __init__(self, initial: MachineState = MachineState.IDLE) -> None:
        """Start in *initial* with an empty event history."""
        self.state = initial
        self.events: list[TransitionEvent] = []

    def trigger(self, name: str) -> MachineState:
        """Apply the trigger *name* to the current state.

        Args:
            name: Trigger name; looked up in ``TRANSITIONS`` together with
                the current state.

        Returns:
            The state the machine is in after the transition.

        Raises:
            IllegalTransitionError: If ``TRANSITIONS`` has no entry for the
                current state and *name*. The state and the event history
                are left unchanged in that case.
        """
        previous = self.state
        try:
            # Single lookup; a missing key means the move is not allowed.
            target = TRANSITIONS[(previous, name)]
        except KeyError:
            # ``from None``: the KeyError is an implementation detail and
            # should not appear in the caller's traceback.
            raise IllegalTransitionError(
                f"Illegal transition: {previous} + {name}"
            ) from None

        self.state = target
        self.events.append(TransitionEvent(previous, name, target))
        return target
|