投资计划引擎:把策略配置变成自动执行的利器

配置写好了,谁来执行?从「被动等信号」到「主动跑策略」的架构跃迁

投资计划引擎:把策略配置变成自动执行的利器

投资计划引擎:把策略配置变成自动执行的利器

配置写好了,谁来执行?从「被动等信号」到「主动跑策略」的架构跃迁

「量化实战手记」— 记录从想法到落地的真实开发历程

引言:配置与执行的断层

量化系统有一个常见的困境:策略配置写得很漂亮,但没人去执行

我们的投资计划模块正是这种情况。十个强类型的策略配置——选股、开仓、加仓、做T、止损、止盈、仓位管理、资金分配、终止条件、交易时段——每一个都定义了清晰的参数和规则。但在实际运行中,只有开仓、加仓和止损被信号触发时被动执行,其余六个配置躺在数据库里,从来没有代码去读它们。

这就像买了一辆配置齐全的车,但只有方向盘和刹车是接了线的。座椅加热、自动巡航、车道保持——都在配置表上打了勾,实际一个也没生效。

这篇文章记录的,就是把那些"写了但没用"的配置,变成真正自动运行的执行引擎的过程。

第一章:诊断——什么在运行,什么在沉睡

先搞清楚现状。现有的执行链路是这样的:

信号监控 计划路由 风控检查 Redis 下单 ✓ 已运行 ✗ 沉睡中:时段守卫 / 终止检查 / 做T引擎 / 选股调度

信号到来时,系统会做开仓判断、加仓节奏控制、止损止盈检查。这些被动信号驱动的部分运转正常。

但问题在于:有很多规则不该等信号,而应该主动扫描

比如终止条件——计划亏了 15% 应该自动停下来,但这需要有人定期去算;比如交易时段——凌晨三点的信号不该被处理,但代码里没有时段过滤;比如做T——持仓股日内振幅够大就该高抛低吸,但这需要主动去扫价格。

原则:系统中有两类规则——响应式(来了信号再判断)和主动式(定期去检查)。缺少主动式执行器,意味着一半的配置永远不会生效。

第二章:四个执行器——补全缺失的拼图

分析完十个策略配置的覆盖情况,缺的恰好是四个主动式执行器:

执行器 读取配置 做什么 触发方式
交易时段守卫 TradingWindowConfig 过滤非交易时段信号、排除黑名单 信号路径前置
终止条件检查 TerminationConfig 亏损/回撤/过期超限时停止计划 每 5 分钟扫描
做T引擎 TTradingConfig 日内价差满足条件时高抛低吸 每 3 分钟扫描
定时选股 ScreeningConfig 按 cron 表达式执行选股并导入池 Cron 定时

前三个是定时轮询型,最后一个是 cron 调度型。用一个独立的 APScheduler 服务把它们统一起来。

原则:识别出系统中"定义了规则但没有执行者"的配置项,为每一类规则匹配对应的触发模式——响应式走信号路径,主动式走定时调度。

第三章:设计——为什么用独立进程

触发方式有三种选择:集成到现有信号监控循环(30 秒一次)、集成到信号路径做后处理、独立 APScheduler 服务。

选择独立进程的原因很直接:

职责分离。信号监控的职责是"发现信号并推送",不应该关心计划是否该终止。如果监控循环崩溃,终止检查还得继续跑。

调度灵活。选股可能是每天早上 9 点跑一次,终止检查每 5 分钟跑一次,做T每 3 分钟扫一次。这些节奏差异很大,塞进同一个 30 秒循环会让代码变得复杂。

独立伸缩。做T引擎需要频繁获取实时行情,对 TDX 连接有压力。放在独立进程里,可以单独控制资源,不影响信号监控的稳定性。

PlanExecutorApp APScheduler BlockingScheduler 时段守卫 信号前置 终止检查 每 5 分钟 做T引擎 每 3 分钟 选股调度 Cron 定时 共享依赖 InvestmentPlan · StockPlanRegistry · Redis 下单队列 MongoDB Redis TDX 行情
原则:不同节奏的任务应该跑在不同的进程里。信号监控是高频低延迟的,策略执行是低频可容忍延迟的——混在一起只会互相拖累。

第四章:实现——每个执行器的核心逻辑

交易时段守卫:信号路径的前置门控

这是唯一一个不走定时调度的执行器。它挂在信号处理链路的最前面——信号进来先过这一关,不通过直接跳过,不进入后续的风控检查。

为什么要放在最前面?因为失败快速。一个凌晨两点的信号,没必要去查询持仓、查成交记录、做复杂的风控判断。时段不对,直接丢弃。

def check(self, plan, signal):
    cfg = plan.trading_window

    # 计划有效期
    if cfg.active_end and now > parse(cfg.active_end):
        return skip("计划已过有效期")

    # 交易时段(默认 09:30 ~ 15:00)
    if not (daily_start <= now.time() <= daily_end):
        return skip("不在交易时段")

    # 黑名单
    if signal.code in cfg.blacklist:
        return skip("标的在黑名单中")

    return None  # 通过,继续后续处理

这段代码的逻辑很直接:三层过滤,每层不通过就立即返回,不浪费后续计算。返回 None 表示放行,返回 TradeDecision 表示拦截。

终止条件检查器:定期扫描计划健康状态

这个执行器的工作模式像一个体检医生——每 5 分钟给所有活跃计划做一次检查,发现不健康的立即叫停。

五个检查项按优先级排列:过期最优先(时间到了就该停),然后是累计亏损最大回撤(账户层面的硬约束),再是当日亏损(日内风控),最后是全部平仓(业务层面的退出条件)。

def _check_plan(self, plan):
    cfg = plan.termination

    # 1. 计划过期
    if cfg.expire_at and now >= cfg.expire_at:
        return "expire_at"

    # 2. 累计亏损超限
    if cfg.max_loss_pct and plan.pnl_pct <= -cfg.max_loss_pct:
        return "max_loss"

    # 3. 最大回撤超限
    if plan.max_drawdown >= cfg.max_drawdown_pct:
        return "max_drawdown"

    # 4~5 ...

触发终止时的动作很重:状态转换 + 释放注册表 + 日志告警。释放注册表意味着这个计划的股票代码不再被占用,可以被其他计划接管。

做T引擎:日内高抛低吸

做T是最有技术含量的执行器。它需要实时价格、持仓均价、日内振幅,然后在毫秒级别做决策。

核心思路是:当前价偏离持仓均价超过阈值时,执行反向操作。偏高就先卖后买(高抛),偏低就先买后卖(低吸)。

def _evaluate(self, code, current_price, plan):
    cfg = plan.t_trading

    # 持仓均价
    avg_price = total_amount / total_quantity

    # 日内价差
    spread_pct = (current_price - avg_price) / avg_price * 100

    # 高抛:当前价高于均价超过阈值
    if spread_pct >= cfg.min_spread_pct:
        return sell_signal

    # 低吸:当前价低于均价超过阈值
    if spread_pct <= -cfg.min_spread_pct:
        return buy_signal

做T的关键不是"能不能赚",而是风险可控round_trip_max_hours 控制回转时限——超过时限就不开新仓;quantity_ratio 控制仓位比例——永远只动一部分持仓,不全仓做T。

实时价格从哪里来?复用已有的 TDX 行情接口 get_security_quotes,批量获取,减少连接开销。

定时选股:让策略自己找标的

这是四个执行器中最灵活的一个。每个计划的 screening.schedule_cron 字段直接存 cron 表达式,比如 "0 9 * * 1-5" 表示工作日 9 点执行。

启动时遍历所有活跃计划,为每个有 cron 配置的计划注册一个 APScheduler job。选股结果如果配置了 auto_import_to_pool,自动导入到计划的股票池。

def register_all(self, scheduler):
    for plan in active_plans:
        cfg = plan.screening
        if not cfg.strategy_name or not cfg.schedule_cron:
            continue

        scheduler.add_job(
            run_screening,
            "cron",
            **parse_cron(cfg.schedule_cron),  # "0 9 * * 1-5"
        )

策略类通过延迟加载的方式获取——不在启动时全量导入,用到哪个才加载哪个。这样即使某个策略依赖的服务不可用,也不会影响其他策略的注册。

原则:每个执行器只关注一件事,只读自己需要的配置。时段守卫不关心止损,终止检查不关心做T。单一职责让每个执行器可以独立测试、独立部署、独立修改。

第五章:部署——容器化运行

执行引擎作为独立进程运行,在 Docker 环境中对应一个独立容器。和已有的信号监控容器 fq_guardian 并列,共享同一组基础设施依赖。

MongoDB + Redis + TDX 行情 fq_guardian 信号监控 30s 轮询 fq_plan_executor APScheduler 执行引擎 Redis 下单队列(共享)

两个容器通过Redis 下单队列协同。信号监控把计划路由后的信号推到队列,执行引擎的做T结果也推到同一个队列。下游的交易执行服务从队列消费,统一处理。

Docker Compose 中的配置和已有容器保持一致——同样的镜像、同样的环境变量、同样的健康检查依赖:

fq_plan_executor:
  image: fq_rear:latest
  command: ["python", "-m", "freshquant.plan.executor"]
  depends_on:
    fq_mongodb: { condition: service_healthy }
    fq_redis:   { condition: service_healthy }
原则:容器之间通过消息队列解耦,不直接调用。一个容器挂了不影响另一个,重启后自动恢复工作。

总结

回顾这次改动,核心思路只有一条:为每一类配置规则匹配对应的执行模式

信号驱动的规则(开仓、加仓、止损)走现有链路,定期检查的规则(终止条件、做T)走定时调度,时段过滤走前置守卫,选股走 cron 定时。十个配置项,三种触发模式,四个执行器——刚好覆盖。

改动范围是六个新文件加一个修改:

文件 作用
trading_window_guard.py 时段 + 黑名单守卫
termination_checker.py 终止条件定期扫描
t_trading_engine.py 日内做T引擎
screening_scheduler.py 定时选股调度
executor.py APScheduler 服务入口
lifecycle_service.py 集成时段守卫(修改)
核心原则:定义规则的成本很低,执行规则的成本很高。如果系统中存在"配置了但不执行"的规则,问题不在配置,而在缺少对应的执行器。补全执行链路,才能让配置真正变成行为。

附录:技术速查

启动命令python -m freshquant.plan.executor

执行器调度频率

执行器 频率 配置来源
TerminationChecker 5 min TerminationConfig
TTradingEngine 3 min TTradingConfig
ScreeningScheduler Cron ScreeningConfig.schedule_cron
TradingWindowGuard 每信号 TradingWindowConfig

信号完整路径

信号到达 时段守卫 上下文 风控检查 Redis 黄色节点 = 新增的时段守卫层,信号路径的第一道关卡
浙ICP备2026022231号-1      浙公网安备33011002019439号