PySide6: moveToThread的周期工作者
moveToThread的周期工作者import sys from PySide6.QtCore import QObject, Signal, QTimer, Slot, QThread from PySide6.QtWidgets import QApplication, QPushButton class ThreadWorker(QObject): _signal_start Signal() # 开始运行的信号 _signal_stop Signal() # 停止信号 _signal_run_now Signal() # 打破定时器周期立即运行一次 def __init__(self, target, looping, loop_interval0, *args, **kwargs): :param target: 需要在子线程中运行的目标函数 :param looping: 是周期运行还是一次性任务 :param loop_interval: 周期运行的间隙时间 :param args: 附带参数 :param kwargs: super().__init__() self._loop_timer None # 循环运行定时器 self.looping looping # 是否循环运行 self.loop_interval loop_interval # 循环运行间隔 self.target target # 目标函数 self.args args self.kwargs kwargs self.started False # 已经开始 self._signal_start.connect(self._start) self._signal_stop.connect(self._stop) self._signal_run_now.connect(self._run_target) # 内部接口在子线程内用槽函数操作定时器 Slot() def _start(self): self._run_target() if self.looping: if self._loop_timer is None: self._loop_timer QTimer() self._loop_timer.setInterval(self.loop_interval) self._loop_timer.timeout.connect(self._run_target) self._loop_timer.start() # 内部接口在子线程内用槽函数操作定时器 Slot() def _stop(self): if self._loop_timer is not None: self._loop_timer.stop() # 外部接口在调用线程内发射信号 def start(self): self.started True self._signal_start.emit() # 外部接口在调用线程内发射信号 def stop(self): self.started False self._signal_stop.emit() # 外部接口在调用线程内发射信号 def run_now(self): self._signal_run_now.emit() # 槽函数运行目标函数 Slot() def _run_target(self): if self._loop_timer is not None: self._loop_timer.stop() # 停止定时器防止信号堆积 if self.started: self.target(*self.args, **self.kwargs) if self._loop_timer is not None: self._loop_timer.start() # 重启定时器 def f_once(): print(run once) def f_timer_loop(): print(run loop) if __name__ __main__: app QApplication(sys.argv) w1 ThreadWorker(targetf_once, loopingFalse) t1 QThread() w1.moveToThread(t1) w1.start() t1.start() w2 ThreadWorker(targetf_timer_loop, loopingTrue, loop_interval2000) t2 QThread() w2.moveToThread(t2) w2.start() t2.start() b QPushButton(run_once) b.clicked.connect(w1.run_now) b.show() sys.exit(app.exec())