让计划自由启停:跨进程任务同步的 Redis 实践

让计划自由启停:跨进程任务同步的 Redis 实践

让计划自由启停:跨进程任务同步的 Redis 实践

当 WebUI 和调度器各走各的路,谁来牵线搭桥?

「量化实战手记」— 记录从想法到落地的真实开发历程

引言:一个被忽视的断层

量化系统通常由多个进程协作运行。Web 服务负责用户交互,后台引擎负责定时任务执行。用户在前端轻轻一点「停止计划」,期待一切随之安静下来——但事实并非如此。

在我们的系统中,Flask API 处理 WebUI 请求,APScheduler 在另一个进程里默默驱动选股、做T、终止检查三类定时任务。两者通过 MongoDB 共享计划状态,却对彼此的运行时状态一无所知。

用户停掉一个计划,MongoDB 里状态确实变成了 STOPPED。但 APScheduler 进程里,那个计划的选股 cron 任务还在忠实地每隔一段时间触发一次——像一台没人关掉的机器,在空荡荡的工厂里反复运转。

第一章:问题全貌

先来看看 APScheduler 中注册了哪些任务,它们和计划状态的关系是什么:

任务 ID 引擎 触发方式 按计划注册?
termination_checker 终止条件检查 全局 interval 5min
t_trading_engine 做T引擎 全局 interval 3min
screening_{plan_id} 选股调度 每个计划一个 cron

前两个是全局任务,每次运行时从数据库查询 ACTIVE 状态的计划,天然过滤掉了已停止的计划。它们不受影响。

真正的问题是第三类:选股 cron 任务。每个活跃计划在启动时注册一个 cron job,绑定到计划 ID。计划被停掉后,cron job 依然存在,每隔设定时间就会触发。

这就是「断层」:状态变更写入了数据库,却没有传播到调度器的运行时。数据库说「停了」,调度器说「我还在」。

原则:共享存储(数据库)只能保证数据一致,无法保证运行时一致。进程间状态的同步,需要专门的通信机制。

第二章:方案选择

怎么让两个互不相识的进程「对话」?常见方案有三:

方案一:轮询。Scheduler 进程定期扫描数据库,发现状态变更就调整任务。实现简单,但有延迟——轮询间隔内,任务已经在空跑了。

方案二:Redis pub/sub。WebUI 发布一条消息,Scheduler 实时收到并处理。零延迟,实现也不复杂。

方案三:消息队列。用 RabbitMQ 或 Kafka 做事件总线。功能强大,但引入新中间件,对这个问题来说过重了。

我们的系统已经重度依赖 Redis——行情推送、订单队列、去重缓存都在用它。Redis pub/sub 是自然选择:不引入新依赖,实时性好,实现简洁。

原则:选择通信方案时,优先复用已有基础设施。引入新中间件的成本,往往比方案本身的优劣更重要。

第三章:核心实现

整个方案的核心是三个动作:发布、订阅、回调。我们逐一实现。

事件总线:发布与订阅

新增 plan_event_bus.py,两个角色:publish_plan_status_changed 给发布端用,PlanEventSubscriber 给订阅端用。

发布端只需要一行代码——往 Redis 频道塞一条 JSON:

CHANNEL = "plan:status_changed"

def publish_plan_status_changed(plan_id: str, status: str) -> None:
    payload = json.dumps({"plan_id": plan_id, "status": status})
    redis_db.publish(CHANNEL, payload)

订阅端跑在守护线程里,阻塞式监听 Redis pubsub,收到消息就调用回调函数:

class PlanEventSubscriber:
    def __init__(self, callback: Callable[[str, str], None]) -> None:
        self._callback = callback
        self._stop_event = threading.Event()

    def _listen_loop(self) -> None:
        pubsub = redis_db.pubsub()
        pubsub.subscribe(CHANNEL)
        for message in pubsub.listen():
            if self._stop_event.is_set():
                break
            if message["type"] != "message":
                continue
            data = json.loads(message["data"])
            self._callback(data["plan_id"], data["status"])

为什么要用守护线程?因为 pubsub.listen() 是阻塞调用,不能放在 APScheduler 的工作线程里——会卡住整个调度器。

回调:增删 cron job

回调逻辑很直接——收到 ACTIVE 就注册,收到 STOPPED 就移除:

def _on_plan_status_changed(self, plan_id: str, status: str) -> None:
    if status == PlanStatus.ACTIVE.value:
        self.screening_scheduler.register_one(self.scheduler, plan_id)
    elif status == PlanStatus.STOPPED.value:
        self.screening_scheduler.unregister_one(self.scheduler, plan_id)

register_one 从数据库读取计划配置,检查是否有选股策略和 cron 表达式,满足条件就注册。unregister_one 更简单——按 job ID 移除即可。

发布端接入

WebUI 和 CLI 都是计划的操控入口。状态变更成功后,紧接着发布一条事件:

# routes.py - WebUI 端点
plan.transition_to(target_status, reason)
plan.save()
publish_plan_status_changed(plan.id, target_status.value)  # 新增

# plan_commands.py - CLI 命令
plan.transition_to(PlanStatus.STOPPED, reason)
plan.save()
publish_plan_status_changed(plan.id, PlanStatus.STOPPED.value)  # 新增

三行代码,把「状态已变更」这个事实广播出去。

原则:事件发布应该紧跟状态变更,作为同一操作的最后一步。先写库,再发事件——确保订阅者收到消息时,数据库已经是最新状态。

第四章:防御性设计

跨进程通信天然不可靠。Redis 可能断连,消息可能丢失,发布端可能比订阅端先启动。怎么兜底?

第一层:状态守卫

即使 cron job 没被移除,执行时也要检查计划状态。在 _run_for_plan_async 中加了守卫:

if plan.status != PlanStatus.ACTIVE:
    return ScreeningRunResult(
        plan_id=plan_id, plan_name=plan.name,
        errors=[f"计划状态 {plan.status.value},跳过"],
    )

这保证了一个最坏情况:即使 Redis 消息丢了,cron 触发后查到 STOPPED 状态,直接跳过,不会产生实际副作用。

第二层:终止时主动移除

系统的 TerminationChecker 每 5 分钟扫描一次活跃计划,检查终止条件(亏损超限、过期等)。触发终止时,除了更新数据库状态,也主动移除 APScheduler 中对应的 cron job:

def _terminate(self, plan, reason):
    plan.transition_to(PlanStatus.STOPPED, reason=reason)
    plan.save()
    # 主动移除 cron job(进程内,直接操作 scheduler)
    if self.scheduler is not None:
        job_id = f"screening_{plan.id[:8]}"
        self.scheduler.remove_job(job_id)

这条路径是进程内的,不经过 Redis,所以天然可靠。

两层防御形成互补:主动移除负责正常路径,状态守卫兜底异常路径。

原则:跨进程通信永远要假设「消息可能丢失」。正路走发布订阅,岔路走状态守卫——双保险比单依赖可靠。

第五章:启动与关闭

整个事件系统在 PlanExecutorApp 中完成组装。启动时注册订阅线程,关闭时优雅停止:

class PlanExecutorApp:
    def __init__(self):
        self.scheduler = BlockingScheduler(...)
        self.subscriber = PlanEventSubscriber(self._on_plan_status_changed)

    def run(self):
        self.setup()
        self.subscriber.start()             # 启动订阅守护线程
        atexit.register(self.subscriber.stop)  # 退出时停止订阅
        atexit.register(lambda: self.scheduler.shutdown(wait=False))
        self.scheduler.start()              # 阻塞

订阅线程设为 daemon=True,主进程退出时自动回收。即使 atexit 没触发,也不会留下僵尸线程。

原则:守护线程配 atexit 双保险。deamon 保证异常退出时不会卡住进程,atexit 保证正常退出时优雅清理。

总结

这次改动涉及 6 个文件,核心逻辑不到 100 行:

文件 改动
plan_event_bus.py 新增:Redis pub/sub 事件总线
screening_scheduler.py 重构 register_all,新增 register_one / unregister_one
termination_checker.py 终止时移除 cron job
routes.py WebUI 状态变更后发布事件
plan_commands.py CLI 状态变更后发布事件
executor.py 集成订阅线程,回调增删 cron

回顾整个过程,有三个设计决策值得记录:

第一,只同步需要同步的。全局任务(做T引擎、终止检查)天然过滤,不需要跨进程同步。只对「按计划注册」的选股 cron 做增删,避免过度设计。

第二,发布紧跟变更。事件发布是状态变更操作的最后一步,确保订阅者收到消息时数据已经落库。

第三,消息可丢,逻辑不能错。Redis pub/sub 不保证送达,所以每个执行入口都加了状态守卫。消息到了是锦上添花,消息没到也不会出事。

核心原则:跨进程状态同步 = 消息通道(尽力而为) + 状态守卫(兜底保底)。前者追求实时性,后者保证正确性。

附录:技术细节

如果你关心实现细节,这里是一些关键技术点:

Redis pub/sub vs Stream:pub/sub 是「发后即忘」,不持久化、不重放。Stream 支持持久化和消费组,但对这个场景太重了。选择 pub/sub 是因为:消息丢了有状态守卫兜底,不需要「恰好一次」语义。

pubsub.listen() 的线程安全:redis-py 的 pubsub 对象不是线程安全的。PlanEventSubscriberlisten() 放在独立线程中,通过 threading.Event 控制停止,避免与 APScheduler 的工作线程冲突。

job_id 截断:使用 plan.id[:8] 生成 job ID。这是因为 APScheduler 的 job ID 不宜过长,8 位十六进制足够区分不同计划,冲突概率可忽略。

浙ICP备2026022231号-1      浙公网安备33011002019439号