投资计划引擎:把策略配置变成自动执行的利器
配置写好了,谁来执行?从「被动等信号」到「主动跑策略」的架构跃迁
投资计划引擎:把策略配置变成自动执行的利器
配置写好了,谁来执行?从「被动等信号」到「主动跑策略」的架构跃迁
「量化实战手记」— 记录从想法到落地的真实开发历程
引言:配置与执行的断层
量化系统有一个常见的困境:策略配置写得很漂亮,但没人去执行。
我们的投资计划模块正是这种情况。十个强类型的策略配置——选股、开仓、加仓、做T、止损、止盈、仓位管理、资金分配、终止条件、交易时段——每一个都定义了清晰的参数和规则。但在实际运行中,只有开仓、加仓和止损被信号触发时被动执行,其余六个配置躺在数据库里,从来没有代码去读它们。
这就像买了一辆配置齐全的车,但只有方向盘和刹车是接了线的。座椅加热、自动巡航、车道保持——都在配置表上打了勾,实际一个也没生效。
这篇文章记录的,就是把那些"写了但没用"的配置,变成真正自动运行的执行引擎的过程。
第一章:诊断——什么在运行,什么在沉睡
先搞清楚现状。现有的执行链路是这样的:
信号到来时,系统会做开仓判断、加仓节奏控制、止损止盈检查。这些被动信号驱动的部分运转正常。
但问题在于:有很多规则不该等信号,而应该主动扫描。
比如终止条件——计划亏了 15% 应该自动停下来,但这需要有人定期去算;比如交易时段——凌晨三点的信号不该被处理,但代码里没有时段过滤;比如做T——持仓股日内振幅够大就该高抛低吸,但这需要主动去扫价格。
原则:系统中有两类规则——响应式(来了信号再判断)和主动式(定期去检查)。缺少主动式执行器,意味着一半的配置永远不会生效。
第二章:四个执行器——补全缺失的拼图
分析完十个策略配置的覆盖情况,缺的恰好是四个主动式执行器:
| 执行器 | 读取配置 | 做什么 | 触发方式 |
|---|---|---|---|
| 交易时段守卫 | TradingWindowConfig | 过滤非交易时段信号、排除黑名单 | 信号路径前置 |
| 终止条件检查 | TerminationConfig | 亏损/回撤/过期超限时停止计划 | 每 5 分钟扫描 |
| 做T引擎 | TTradingConfig | 日内价差满足条件时高抛低吸 | 每 3 分钟扫描 |
| 定时选股 | ScreeningConfig | 按 cron 表达式执行选股并导入池 | Cron 定时 |
前三个是定时轮询型,最后一个是 cron 调度型。用一个独立的 APScheduler 服务把它们统一起来。
原则:识别出系统中"定义了规则但没有执行者"的配置项,为每一类规则匹配对应的触发模式——响应式走信号路径,主动式走定时调度。
第三章:设计——为什么用独立进程
触发方式有三种选择:集成到现有信号监控循环(30 秒一次)、集成到信号路径做后处理、独立 APScheduler 服务。
选择独立进程的原因很直接:
职责分离。信号监控的职责是"发现信号并推送",不应该关心计划是否该终止。如果监控循环崩溃,终止检查还得继续跑。
调度灵活。选股可能是每天早上 9 点跑一次,终止检查每 5 分钟跑一次,做T每 3 分钟扫一次。这些节奏差异很大,塞进同一个 30 秒循环会让代码变得复杂。
独立伸缩。做T引擎需要频繁获取实时行情,对 TDX 连接有压力。放在独立进程里,可以单独控制资源,不影响信号监控的稳定性。
原则:不同节奏的任务应该跑在不同的进程里。信号监控是高频低延迟的,策略执行是低频可容忍延迟的——混在一起只会互相拖累。
第四章:实现——每个执行器的核心逻辑
交易时段守卫:信号路径的前置门控
这是唯一一个不走定时调度的执行器。它挂在信号处理链路的最前面——信号进来先过这一关,不通过直接跳过,不进入后续的风控检查。
为什么要放在最前面?因为失败快速。一个凌晨两点的信号,没必要去查询持仓、查成交记录、做复杂的风控判断。时段不对,直接丢弃。
def check(self, plan, signal):
cfg = plan.trading_window
# 计划有效期
if cfg.active_end and now > parse(cfg.active_end):
return skip("计划已过有效期")
# 交易时段(默认 09:30 ~ 15:00)
if not (daily_start <= now.time() <= daily_end):
return skip("不在交易时段")
# 黑名单
if signal.code in cfg.blacklist:
return skip("标的在黑名单中")
return None # 通过,继续后续处理
这段代码的逻辑很直接:三层过滤,每层不通过就立即返回,不浪费后续计算。返回 None 表示放行,返回 TradeDecision 表示拦截。
终止条件检查器:定期扫描计划健康状态
这个执行器的工作模式像一个体检医生——每 5 分钟给所有活跃计划做一次检查,发现不健康的立即叫停。
五个检查项按优先级排列:过期最优先(时间到了就该停),然后是累计亏损和最大回撤(账户层面的硬约束),再是当日亏损(日内风控),最后是全部平仓(业务层面的退出条件)。
def _check_plan(self, plan):
cfg = plan.termination
# 1. 计划过期
if cfg.expire_at and now >= cfg.expire_at:
return "expire_at"
# 2. 累计亏损超限
if cfg.max_loss_pct and plan.pnl_pct <= -cfg.max_loss_pct:
return "max_loss"
# 3. 最大回撤超限
if plan.max_drawdown >= cfg.max_drawdown_pct:
return "max_drawdown"
# 4~5 ...
触发终止时的动作很重:状态转换 + 释放注册表 + 日志告警。释放注册表意味着这个计划的股票代码不再被占用,可以被其他计划接管。
做T引擎:日内高抛低吸
做T是最有技术含量的执行器。它需要实时价格、持仓均价、日内振幅,然后在毫秒级别做决策。
核心思路是:当前价偏离持仓均价超过阈值时,执行反向操作。偏高就先卖后买(高抛),偏低就先买后卖(低吸)。
def _evaluate(self, code, current_price, plan):
cfg = plan.t_trading
# 持仓均价
avg_price = total_amount / total_quantity
# 日内价差
spread_pct = (current_price - avg_price) / avg_price * 100
# 高抛:当前价高于均价超过阈值
if spread_pct >= cfg.min_spread_pct:
return sell_signal
# 低吸:当前价低于均价超过阈值
if spread_pct <= -cfg.min_spread_pct:
return buy_signal
做T的关键不是"能不能赚",而是风险可控。round_trip_max_hours 控制回转时限——超过时限就不开新仓;quantity_ratio 控制仓位比例——永远只动一部分持仓,不全仓做T。
实时价格从哪里来?复用已有的 TDX 行情接口 get_security_quotes,批量获取,减少连接开销。
定时选股:让策略自己找标的
这是四个执行器中最灵活的一个。每个计划的 screening.schedule_cron 字段直接存 cron 表达式,比如 "0 9 * * 1-5" 表示工作日 9 点执行。
启动时遍历所有活跃计划,为每个有 cron 配置的计划注册一个 APScheduler job。选股结果如果配置了 auto_import_to_pool,自动导入到计划的股票池。
def register_all(self, scheduler):
for plan in active_plans:
cfg = plan.screening
if not cfg.strategy_name or not cfg.schedule_cron:
continue
scheduler.add_job(
run_screening,
"cron",
**parse_cron(cfg.schedule_cron), # "0 9 * * 1-5"
)
策略类通过延迟加载的方式获取——不在启动时全量导入,用到哪个才加载哪个。这样即使某个策略依赖的服务不可用,也不会影响其他策略的注册。
原则:每个执行器只关注一件事,只读自己需要的配置。时段守卫不关心止损,终止检查不关心做T。单一职责让每个执行器可以独立测试、独立部署、独立修改。
第五章:部署——容器化运行
执行引擎作为独立进程运行,在 Docker 环境中对应一个独立容器。和已有的信号监控容器 fq_guardian 并列,共享同一组基础设施依赖。
两个容器通过Redis 下单队列协同。信号监控把计划路由后的信号推到队列,执行引擎的做T结果也推到同一个队列。下游的交易执行服务从队列消费,统一处理。
Docker Compose 中的配置和已有容器保持一致——同样的镜像、同样的环境变量、同样的健康检查依赖:
fq_plan_executor:
image: fq_rear:latest
command: ["python", "-m", "freshquant.plan.executor"]
depends_on:
fq_mongodb: { condition: service_healthy }
fq_redis: { condition: service_healthy }
原则:容器之间通过消息队列解耦,不直接调用。一个容器挂了不影响另一个,重启后自动恢复工作。
总结
回顾这次改动,核心思路只有一条:为每一类配置规则匹配对应的执行模式。
信号驱动的规则(开仓、加仓、止损)走现有链路,定期检查的规则(终止条件、做T)走定时调度,时段过滤走前置守卫,选股走 cron 定时。十个配置项,三种触发模式,四个执行器——刚好覆盖。
改动范围是六个新文件加一个修改:
| 文件 | 作用 |
|---|---|
trading_window_guard.py |
时段 + 黑名单守卫 |
termination_checker.py |
终止条件定期扫描 |
t_trading_engine.py |
日内做T引擎 |
screening_scheduler.py |
定时选股调度 |
executor.py |
APScheduler 服务入口 |
lifecycle_service.py |
集成时段守卫(修改) |
核心原则:定义规则的成本很低,执行规则的成本很高。如果系统中存在"配置了但不执行"的规则,问题不在配置,而在缺少对应的执行器。补全执行链路,才能让配置真正变成行为。
附录:技术速查
启动命令:python -m freshquant.plan.executor
执行器调度频率:
| 执行器 | 频率 | 配置来源 |
|---|---|---|
| TerminationChecker | 5 min | TerminationConfig |
| TTradingEngine | 3 min | TTradingConfig |
| ScreeningScheduler | Cron | ScreeningConfig.schedule_cron |
| TradingWindowGuard | 每信号 | TradingWindowConfig |
信号完整路径: