Nico
Published on 2025-10-11 / 3 Visits
0
0

python 状态机demo

好的,下面给出在前面“定时器驱动”和“事件驱动”两种方案中,将“停止位”传感改为信号量 StopPoint(True=找到,False=未找到)的实现示例。你只需要把原来的 at_stop_position 替换为 StopPoint,并在验证时读取该信号即可。

一、定时器驱动版本(基于周期检查 StopPoint)

  • 要点:
    • Hardware 中维护 StopPoint: bool
    • 电机动作命令不阻塞;StopPoint 的更新由硬件层或模拟逻辑设置
    • sampling 状态下定期检查 StopPoint;True 则进入 feeding,否则继续下发找停止位命令

示例代码:

from transitions import Machine, State
import threading
import time

class Scheduler:
    def __init__(self):
        self.tasks = set()

    def schedule_once(self, delay_s, fn):
        t = threading.Timer(delay_s, fn)
        t.start()
        self.tasks.add(t)
        return t

    def schedule_periodic(self, interval_s, fn, stop_event: threading.Event):
        def loop():
            while not stop_event.is_set():
                fn()
                time.sleep(interval_s)
        t = threading.Thread(target=loop, daemon=True)
        t.start()
        return t

    def cancel_all(self):
        for t in list(self.tasks):
            try:
                t.cancel()
            except Exception:
                pass
            finally:
                self.tasks.discard(t)

class Hardware:
    def __init__(self):
        self.operation = False
        self.valve_open = False
        self.current_pressure = 0.0
        self.target_pressure = 0.6
        self.StopPoint = False         # 信号量:True=找到停止位,False=未找到
        self.target_rpm = 120

    def open_valve(self):
        print("[HW] 打开气压阀")
        self.valve_open = True

    def close_valve(self):
        print("[HW] 关闭气压阀")
        self.valve_open = False

    def read_pressure(self):
        if self.valve_open:
            self.current_pressure = min(self.target_pressure, self.current_pressure + 0.1)
        else:
            self.current_pressure = max(0.0, self.current_pressure - 0.05)
        print(f"[HW] 压力={self.current_pressure:.2f}")
        return self.current_pressure

    # 电机相关命令均为非阻塞,下发后由硬件更新 StopPoint
    def command_motor_find_stop(self, direction='cw'):
        print(f"[HW] 寻找停止位({direction}一圈)命令下发")
        # 真实硬件:一圈完成后由传感器置 StopPoint
        # 示例模拟:稍后由外部/测试代码设置 StopPoint=True

    def command_motor_reverse_one_turn(self):
        print("[HW] 反转一圈命令下发")
        # 真实硬件:一圈完成后根据传感器置 StopPoint
        # 示例模拟:稍后由外部/测试代码设置 StopPoint=True/False

    def command_motor_feed_start(self, rpm):
        print(f"[HW] 喂料开始,RPM={rpm}")

    def command_motor_feed_stop(self):
        print("[HW] 喂料停止")

class Controller:
    states = [
        State(name='init', on_enter=['do_init'], on_exit=['cleanup_timers']),
        State(name='idle', on_enter=['on_enter_idle'], on_exit=['cleanup_timers']),
        State(name='sampling', on_enter=['on_enter_sampling'], on_exit=['cleanup_timers']),
        State(name='feeding', on_enter=['on_enter_feeding'], on_exit=['cleanup_timers'])
    ]

    def __init__(self, hw: Hardware, sched: Scheduler):
        self.hw = hw
        self.sched = sched
        self.periodic_stop = threading.Event()
        self.machine = Machine(model=self, states=Controller.states, initial='init', ignore_invalid_triggers=True)

        self.machine.add_transition('start', 'init', 'idle')
        self.machine.add_transition('to_sampling', ['idle', 'feeding'], 'sampling')
        self.machine.add_transition('to_feeding', 'sampling', 'feeding')
        self.machine.add_transition('to_idle', ['sampling', 'feeding'], 'idle')

    def cleanup_timers(self):
        print("[FSM] 清理定时器")
        self.periodic_stop.set()
        self.periodic_stop = threading.Event()
        self.sched.cancel_all()

    def do_init(self):
        print("[FSM] init:初始化寻找停止位")
        self.hw.command_motor_find_stop(direction='cw')
        self.start()

    def on_enter_idle(self):
        print("[FSM] idle:等待 operation=True")
        self.hw.close_valve()

        def check_op():
            if self.hw.operation:
                self.hw.open_valve()
                # 气压周期检查
                def check_pressure():
                    if self.hw.read_pressure() >= self.hw.target_pressure:
                        print("[FSM] 气压达标 -> sampling")
                        self.to_sampling()
                self.sched.schedule_periodic(0.1, check_pressure, self.periodic_stop)

        check_op()
        self.sched.schedule_periodic(0.2, check_op, self.periodic_stop)

    def on_enter_sampling(self):
        print("[FSM] sampling:反转一圈并检查 StopPoint")
        if not self.hw.operation:
            self.to_idle()
            return

        self.hw.command_motor_reverse_one_turn()

        def verify_stop():
            # 使用 StopPoint 信号量替代传感标志
            if self.hw.StopPoint is True:
                print("[FSM] StopPoint=True -> feeding")
                self.to_feeding()
            else:
                print("[FSM] StopPoint=False,继续寻找停止位")
                self.hw.command_motor_find_stop('cw')
                # 保持周期检查,直到 StopPoint=True

        self.sched.schedule_periodic(0.2, verify_stop, self.periodic_stop)

    def on_enter_feeding(self):
        print("[FSM] feeding:喂料10s")
        if not self.hw.operation:
            self.to_idle()
            return

        self.hw.command_motor_feed_start(self.hw.target_rpm)

        def finish_feed():
            self.hw.command_motor_feed_stop()
            if self.hw.operation:
                # 喂料结束后进入取样,形成循环
                self.to_sampling()
            else:
                self.to_idle()

        self.sched.schedule_once(10.0, finish_feed)

# 示例运行(StopPoint 的设置仅为模拟)
if __name__ == "__main__":
    hw = Hardware()
    sched = Scheduler()
    ctl = Controller(hw, sched)

    # 启动 operation
    hw.operation = True

    # 模拟:1秒后气压达标,进入 sampling;随后 0.5 秒后置 StopPoint=True
    threading.Timer(1.0, lambda: setattr(hw, 'current_pressure', hw.target_pressure)).start()
    threading.Timer(1.5, lambda: setattr(hw, 'StopPoint', True)).start()

    # 15秒后关闭操作
    threading.Timer(15.0, lambda: setattr(hw, 'operation', False)).start()
    time.sleep(20)

二、事件驱动版本(StopPoint 作为事件来源)

  • 要点:
    • StopPoint 的变化通过事件总线发出 stop_point_changed(True/False)
    • sampling 状态下等待事件;True 则进入 feeding;False 则继续发找停止位命令

示例代码:

from transitions import Machine, State
import threading

class EventBus:
    def __init__(self):
        self.handlers = {}

    def on(self, event_name, handler):
        self.handlers.setdefault(event_name, []).append(handler)

    def emit(self, event_name, **kwargs):
        for h in self.handlers.get(event_name, []):
            h(**kwargs)

class Hardware:
    def __init__(self, bus: EventBus):
        self.bus = bus
        self.operation = False
        self.valve_open = False
        self.current_pressure = 0.0
        self.target_pressure = 0.6
        self.StopPoint = False
        self.target_rpm = 120

    def open_valve(self):
        self.valve_open = True
        # 真实系统:压力由传感器上报;这里直接达标并发事件
        self.current_pressure = self.target_pressure
        self.bus.emit('pressure_updated', value=self.current_pressure)

    def close_valve(self):
        self.valve_open = False
        self.current_pressure = 0.0
        self.bus.emit('pressure_updated', value=self.current_pressure)

    def command_motor_find_stop(self, direction='cw'):
        print(f"[HW] 命令:寻找停止位({direction})")
        # 真实系统:旋转完成后由传感器发 StopPoint 事件
        # 示例:不立即发事件,等待外部模拟

    def command_motor_reverse_one_turn(self):
        print("[HW] 命令:反转一圈")
        # 真实系统:一圈后由传感器发 StopPoint 事件
        # 示例:等待外部模拟

    def command_motor_feed_start(self, rpm, duration_s=10):
        print(f"[HW] 喂料开始 RPM={rpm}")
        threading.Timer(duration_s, lambda: self.bus.emit('feed_finished')).start()

    def set_stop_point(self, value: bool):
        self.StopPoint = value
        self.bus.emit('stop_point_changed', value=value)

class Controller:
    states = [
        State(name='init', on_enter=['do_init']),
        State(name='idle', on_enter=['on_enter_idle']),
        State(name='sampling', on_enter=['on_enter_sampling']),
        State(name='feeding', on_enter=['on_enter_feeding'])
    ]

    def __init__(self, hw: Hardware, bus: EventBus):
        self.hw = hw
        self.bus = bus
        self.machine = Machine(model=self, states=Controller.states, initial='init', ignore_invalid_triggers=True)

        self.machine.add_transition('start', 'init', 'idle')
        self.machine.add_transition('to_sampling', ['idle', 'feeding'], 'sampling')
        self.machine.add_transition('to_feeding', 'sampling', 'feeding')
        self.machine.add_transition('to_idle', ['sampling', 'feeding'], 'idle')

        # 事件绑定
        bus.on('pressure_updated', self.on_pressure_updated)
        bus.on('stop_point_changed', self.on_stop_point_changed)
        bus.on('feed_finished', self.on_feed_finished)

    def do_init(self):
        print("[FSM] init:寻找停止位")
        self.hw.command_motor_find_stop('cw')
        self.start()

    def on_enter_idle(self):
        print("[FSM] idle:等待 operation=True")
        self.hw.close_valve()

    def start_operation(self):
        if self.state == 'idle':
            self.hw.open_valve()
            # 等待 pressure_updated 事件达到目标压力

    def on_pressure_updated(self, value):
        print(f"[Event] 压力={value}")
        if self.state == 'idle' and self.hw.operation and value >= self.hw.target_pressure:
            self.to_sampling()

    def on_enter_sampling(self):
        print("[FSM] sampling:反转一圈,等待 StopPoint 事件")
        if not self.hw.operation:
            self.to_idle()
            return
        self.hw.command_motor_reverse_one_turn()
        # 等待 stop_point_changed 事件

    def on_stop_point_changed(self, value: bool):
        print(f"[Event] StopPoint={value}")
        if self.state != 'sampling':
            # 在非 sampling 状态收到事件,可能是噪声或历史事件,忽略
            return
        if value is True:
            self.to_feeding()
        else:
            # 未找到停止位,继续寻找
            self.hw.command_motor_find_stop('cw')

    def on_enter_feeding(self):
        print("[FSM] feeding:喂料10s")
        if not self.hw.operation:
            self.to_idle()
            return
        self.hw.command_motor_feed_start(self.hw.target_rpm, duration_s=10)

    def on_feed_finished(self):
        print("[Event] 喂料完成")
        if self.hw.operation:
            self.to_sampling()
        else:
            self.to_idle()

# 示例运行
if __name__ == "__main__":
    bus = EventBus()
    hw = Hardware(bus)
    ctl = Controller(hw, bus)

    # 启动 operation
    hw.operation = True
    ctl.start_operation()

    # 模拟:2秒后 StopPoint=True
    threading.Timer(2.0, lambda: hw.set_stop_point(True)).start()

    # 12秒后 StopPoint=False(下次取样时会促使继续找停止位)
    threading.Timer(12.0, lambda: hw.set_stop_point(False)).start()

    # 20秒后停止
    threading.Timer(20.0, lambda: setattr(hw, 'operation', False)).start()

    import time
    time.sleep(25)

实战提示:

  • StopPoint 的来源应是硬件传感器或编码器到位信号,请确保:
    • 信号消抖与边沿触发(避免短暂毛刺导致误判)。
    • 在事件驱动版本中,保证事件在同一线程串行执行或在调用状态机前加锁,防止竞态。
    • 当进入 feeding 或 idle 时,考虑复位 StopPoint(例如在进入 feeding 时置 False),避免使用旧值影响下一次取样。
  • 故障与超时:
    • 长时间 StopPoint=False 时,应进入 fault 状态或报警,避免无限旋转。
    • 气压长时间不达标也应超时处理。
  • 如果 transitions 与事件在多线程运行,建议将所有状态迁移调用串行化(例如用一个队列把事件回调投递到状态机线程)。

Comment