好的,下面给出在前面“定时器驱动”和“事件驱动”两种方案中,将“停止位”传感改为信号量 StopPoint(True=找到,False=未找到)的实现示例。你只需要把原来的 at_stop_position 替换为 StopPoint,并在验证时读取该信号即可。
一、定时器驱动版本(基于周期检查 StopPoint)
- 要点:
- Hardware 中维护 StopPoint: bool
- 电机动作命令不阻塞;StopPoint 的更新由硬件层或模拟逻辑设置
- sampling 状态下定期检查 StopPoint;True 则进入 feeding,否则继续下发找停止位命令
示例代码:
from transitions import Machine, State
import threading
import time
class Scheduler:
def __init__(self):
self.tasks = set()
def schedule_once(self, delay_s, fn):
t = threading.Timer(delay_s, fn)
t.start()
self.tasks.add(t)
return t
def schedule_periodic(self, interval_s, fn, stop_event: threading.Event):
def loop():
while not stop_event.is_set():
fn()
time.sleep(interval_s)
t = threading.Thread(target=loop, daemon=True)
t.start()
return t
def cancel_all(self):
for t in list(self.tasks):
try:
t.cancel()
except Exception:
pass
finally:
self.tasks.discard(t)
class Hardware:
def __init__(self):
self.operation = False
self.valve_open = False
self.current_pressure = 0.0
self.target_pressure = 0.6
self.StopPoint = False # 信号量:True=找到停止位,False=未找到
self.target_rpm = 120
def open_valve(self):
print("[HW] 打开气压阀")
self.valve_open = True
def close_valve(self):
print("[HW] 关闭气压阀")
self.valve_open = False
def read_pressure(self):
if self.valve_open:
self.current_pressure = min(self.target_pressure, self.current_pressure + 0.1)
else:
self.current_pressure = max(0.0, self.current_pressure - 0.05)
print(f"[HW] 压力={self.current_pressure:.2f}")
return self.current_pressure
# 电机相关命令均为非阻塞,下发后由硬件更新 StopPoint
def command_motor_find_stop(self, direction='cw'):
print(f"[HW] 寻找停止位({direction}一圈)命令下发")
# 真实硬件:一圈完成后由传感器置 StopPoint
# 示例模拟:稍后由外部/测试代码设置 StopPoint=True
def command_motor_reverse_one_turn(self):
print("[HW] 反转一圈命令下发")
# 真实硬件:一圈完成后根据传感器置 StopPoint
# 示例模拟:稍后由外部/测试代码设置 StopPoint=True/False
def command_motor_feed_start(self, rpm):
print(f"[HW] 喂料开始,RPM={rpm}")
def command_motor_feed_stop(self):
print("[HW] 喂料停止")
class Controller:
states = [
State(name='init', on_enter=['do_init'], on_exit=['cleanup_timers']),
State(name='idle', on_enter=['on_enter_idle'], on_exit=['cleanup_timers']),
State(name='sampling', on_enter=['on_enter_sampling'], on_exit=['cleanup_timers']),
State(name='feeding', on_enter=['on_enter_feeding'], on_exit=['cleanup_timers'])
]
def __init__(self, hw: Hardware, sched: Scheduler):
self.hw = hw
self.sched = sched
self.periodic_stop = threading.Event()
self.machine = Machine(model=self, states=Controller.states, initial='init', ignore_invalid_triggers=True)
self.machine.add_transition('start', 'init', 'idle')
self.machine.add_transition('to_sampling', ['idle', 'feeding'], 'sampling')
self.machine.add_transition('to_feeding', 'sampling', 'feeding')
self.machine.add_transition('to_idle', ['sampling', 'feeding'], 'idle')
def cleanup_timers(self):
print("[FSM] 清理定时器")
self.periodic_stop.set()
self.periodic_stop = threading.Event()
self.sched.cancel_all()
def do_init(self):
print("[FSM] init:初始化寻找停止位")
self.hw.command_motor_find_stop(direction='cw')
self.start()
def on_enter_idle(self):
print("[FSM] idle:等待 operation=True")
self.hw.close_valve()
def check_op():
if self.hw.operation:
self.hw.open_valve()
# 气压周期检查
def check_pressure():
if self.hw.read_pressure() >= self.hw.target_pressure:
print("[FSM] 气压达标 -> sampling")
self.to_sampling()
self.sched.schedule_periodic(0.1, check_pressure, self.periodic_stop)
check_op()
self.sched.schedule_periodic(0.2, check_op, self.periodic_stop)
def on_enter_sampling(self):
print("[FSM] sampling:反转一圈并检查 StopPoint")
if not self.hw.operation:
self.to_idle()
return
self.hw.command_motor_reverse_one_turn()
def verify_stop():
# 使用 StopPoint 信号量替代传感标志
if self.hw.StopPoint is True:
print("[FSM] StopPoint=True -> feeding")
self.to_feeding()
else:
print("[FSM] StopPoint=False,继续寻找停止位")
self.hw.command_motor_find_stop('cw')
# 保持周期检查,直到 StopPoint=True
self.sched.schedule_periodic(0.2, verify_stop, self.periodic_stop)
def on_enter_feeding(self):
print("[FSM] feeding:喂料10s")
if not self.hw.operation:
self.to_idle()
return
self.hw.command_motor_feed_start(self.hw.target_rpm)
def finish_feed():
self.hw.command_motor_feed_stop()
if self.hw.operation:
# 喂料结束后进入取样,形成循环
self.to_sampling()
else:
self.to_idle()
self.sched.schedule_once(10.0, finish_feed)
# 示例运行(StopPoint 的设置仅为模拟)
if __name__ == "__main__":
hw = Hardware()
sched = Scheduler()
ctl = Controller(hw, sched)
# 启动 operation
hw.operation = True
# 模拟:1秒后气压达标,进入 sampling;随后 0.5 秒后置 StopPoint=True
threading.Timer(1.0, lambda: setattr(hw, 'current_pressure', hw.target_pressure)).start()
threading.Timer(1.5, lambda: setattr(hw, 'StopPoint', True)).start()
# 15秒后关闭操作
threading.Timer(15.0, lambda: setattr(hw, 'operation', False)).start()
time.sleep(20)
二、事件驱动版本(StopPoint 作为事件来源)
- 要点:
- StopPoint 的变化通过事件总线发出 stop_point_changed(True/False)
- sampling 状态下等待事件;True 则进入 feeding;False 则继续发找停止位命令
示例代码:
from transitions import Machine, State
import threading
class EventBus:
def __init__(self):
self.handlers = {}
def on(self, event_name, handler):
self.handlers.setdefault(event_name, []).append(handler)
def emit(self, event_name, **kwargs):
for h in self.handlers.get(event_name, []):
h(**kwargs)
class Hardware:
def __init__(self, bus: EventBus):
self.bus = bus
self.operation = False
self.valve_open = False
self.current_pressure = 0.0
self.target_pressure = 0.6
self.StopPoint = False
self.target_rpm = 120
def open_valve(self):
self.valve_open = True
# 真实系统:压力由传感器上报;这里直接达标并发事件
self.current_pressure = self.target_pressure
self.bus.emit('pressure_updated', value=self.current_pressure)
def close_valve(self):
self.valve_open = False
self.current_pressure = 0.0
self.bus.emit('pressure_updated', value=self.current_pressure)
def command_motor_find_stop(self, direction='cw'):
print(f"[HW] 命令:寻找停止位({direction})")
# 真实系统:旋转完成后由传感器发 StopPoint 事件
# 示例:不立即发事件,等待外部模拟
def command_motor_reverse_one_turn(self):
print("[HW] 命令:反转一圈")
# 真实系统:一圈后由传感器发 StopPoint 事件
# 示例:等待外部模拟
def command_motor_feed_start(self, rpm, duration_s=10):
print(f"[HW] 喂料开始 RPM={rpm}")
threading.Timer(duration_s, lambda: self.bus.emit('feed_finished')).start()
def set_stop_point(self, value: bool):
self.StopPoint = value
self.bus.emit('stop_point_changed', value=value)
class Controller:
states = [
State(name='init', on_enter=['do_init']),
State(name='idle', on_enter=['on_enter_idle']),
State(name='sampling', on_enter=['on_enter_sampling']),
State(name='feeding', on_enter=['on_enter_feeding'])
]
def __init__(self, hw: Hardware, bus: EventBus):
self.hw = hw
self.bus = bus
self.machine = Machine(model=self, states=Controller.states, initial='init', ignore_invalid_triggers=True)
self.machine.add_transition('start', 'init', 'idle')
self.machine.add_transition('to_sampling', ['idle', 'feeding'], 'sampling')
self.machine.add_transition('to_feeding', 'sampling', 'feeding')
self.machine.add_transition('to_idle', ['sampling', 'feeding'], 'idle')
# 事件绑定
bus.on('pressure_updated', self.on_pressure_updated)
bus.on('stop_point_changed', self.on_stop_point_changed)
bus.on('feed_finished', self.on_feed_finished)
def do_init(self):
print("[FSM] init:寻找停止位")
self.hw.command_motor_find_stop('cw')
self.start()
def on_enter_idle(self):
print("[FSM] idle:等待 operation=True")
self.hw.close_valve()
def start_operation(self):
if self.state == 'idle':
self.hw.open_valve()
# 等待 pressure_updated 事件达到目标压力
def on_pressure_updated(self, value):
print(f"[Event] 压力={value}")
if self.state == 'idle' and self.hw.operation and value >= self.hw.target_pressure:
self.to_sampling()
def on_enter_sampling(self):
print("[FSM] sampling:反转一圈,等待 StopPoint 事件")
if not self.hw.operation:
self.to_idle()
return
self.hw.command_motor_reverse_one_turn()
# 等待 stop_point_changed 事件
def on_stop_point_changed(self, value: bool):
print(f"[Event] StopPoint={value}")
if self.state != 'sampling':
# 在非 sampling 状态收到事件,可能是噪声或历史事件,忽略
return
if value is True:
self.to_feeding()
else:
# 未找到停止位,继续寻找
self.hw.command_motor_find_stop('cw')
def on_enter_feeding(self):
print("[FSM] feeding:喂料10s")
if not self.hw.operation:
self.to_idle()
return
self.hw.command_motor_feed_start(self.hw.target_rpm, duration_s=10)
def on_feed_finished(self):
print("[Event] 喂料完成")
if self.hw.operation:
self.to_sampling()
else:
self.to_idle()
# 示例运行
if __name__ == "__main__":
bus = EventBus()
hw = Hardware(bus)
ctl = Controller(hw, bus)
# 启动 operation
hw.operation = True
ctl.start_operation()
# 模拟:2秒后 StopPoint=True
threading.Timer(2.0, lambda: hw.set_stop_point(True)).start()
# 12秒后 StopPoint=False(下次取样时会促使继续找停止位)
threading.Timer(12.0, lambda: hw.set_stop_point(False)).start()
# 20秒后停止
threading.Timer(20.0, lambda: setattr(hw, 'operation', False)).start()
import time
time.sleep(25)
实战提示:
- StopPoint 的来源应是硬件传感器或编码器到位信号,请确保:
- 信号消抖与边沿触发(避免短暂毛刺导致误判)。
- 在事件驱动版本中,保证事件在同一线程串行执行或在调用状态机前加锁,防止竞态。
- 当进入 feeding 或 idle 时,考虑复位 StopPoint(例如在进入 feeding 时置 False),避免使用旧值影响下一次取样。
- 故障与超时:
- 长时间 StopPoint=False 时,应进入 fault 状态或报警,避免无限旋转。
- 气压长时间不达标也应超时处理。
- 如果 transitions 与事件在多线程运行,建议将所有状态迁移调用串行化(例如用一个队列把事件回调投递到状态机线程)。