docs: scaffold polisher-control foundation
This commit is contained in:
65
host/polisher_control/state_machine.py
Normal file
65
host/polisher_control/state_machine.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
|
||||
|
||||
class MachineState(StrEnum):
|
||||
IDLE = "IDLE"
|
||||
JOB_LOADED = "JOB_LOADED"
|
||||
READY = "READY"
|
||||
RUNNING = "RUNNING"
|
||||
PAUSED = "PAUSED"
|
||||
ABORTING = "ABORTING"
|
||||
COMPLETED = "COMPLETED"
|
||||
ABORTED = "ABORTED"
|
||||
FAULTED = "FAULTED"
|
||||
MANUAL = "MANUAL"
|
||||
|
||||
|
||||
TRANSITIONS: dict[tuple[MachineState, str], MachineState] = {
|
||||
(MachineState.IDLE, "load_job"): MachineState.JOB_LOADED,
|
||||
(MachineState.IDLE, "enter_manual"): MachineState.MANUAL,
|
||||
(MachineState.JOB_LOADED, "operator_acknowledge"): MachineState.READY,
|
||||
(MachineState.JOB_LOADED, "unload_job"): MachineState.IDLE,
|
||||
(MachineState.READY, "start"): MachineState.RUNNING,
|
||||
(MachineState.RUNNING, "pause"): MachineState.PAUSED,
|
||||
(MachineState.RUNNING, "segment_complete"): MachineState.RUNNING,
|
||||
(MachineState.RUNNING, "all_segments_complete"): MachineState.COMPLETED,
|
||||
(MachineState.RUNNING, "abort"): MachineState.ABORTING,
|
||||
(MachineState.RUNNING, "fault"): MachineState.FAULTED,
|
||||
(MachineState.PAUSED, "resume"): MachineState.RUNNING,
|
||||
(MachineState.PAUSED, "abort"): MachineState.ABORTING,
|
||||
(MachineState.MANUAL, "exit_manual"): MachineState.IDLE,
|
||||
(MachineState.MANUAL, "fault"): MachineState.FAULTED,
|
||||
(MachineState.ABORTING, "abort_complete"): MachineState.ABORTED,
|
||||
(MachineState.COMPLETED, "reset"): MachineState.IDLE,
|
||||
(MachineState.ABORTED, "reset"): MachineState.IDLE,
|
||||
(MachineState.FAULTED, "reset"): MachineState.IDLE,
|
||||
}
|
||||
|
||||
|
||||
class IllegalTransitionError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class TransitionEvent:
|
||||
from_state: MachineState
|
||||
trigger: str
|
||||
to_state: MachineState
|
||||
|
||||
|
||||
class StateMachine:
|
||||
def __init__(self, initial: MachineState = MachineState.IDLE) -> None:
|
||||
self.state = initial
|
||||
self.events: list[TransitionEvent] = []
|
||||
|
||||
def trigger(self, name: str) -> MachineState:
|
||||
key = (self.state, name)
|
||||
if key not in TRANSITIONS:
|
||||
raise IllegalTransitionError(f"Illegal transition: {self.state} + {name}")
|
||||
previous = self.state
|
||||
self.state = TRANSITIONS[key]
|
||||
self.events.append(TransitionEvent(previous, name, self.state))
|
||||
return self.state
|
||||
Reference in New Issue
Block a user