行情引擎的「体检」机制
行情引擎的「体检」机制
每 5 分钟一次全量扫描,让数据缺口无处藏身
「量化实战手记」— 记录从想法到落地的真实开发历程
引言:一块砖的缺失
行情引擎在实时推送 Bar 数据时,每根 Bar 到达后都会与上一根做时间连续性检查。如果中间有空档,立刻补数据。这套机制在大多数场景下运行良好。
但有一个盲区:如果最后一根 Bar 的时间完全正确,但前面的某些 Bar 丢了,系统是发现不了的。
这就好比一栋楼,顶层好好的,但中间某几层少了砖——你站在楼顶往下看,一切正常。只有把每层都走一遍,才能发现那些空洞。
第一章:为什么相邻检测不够
先看看现有的缺口检测是怎么工作的。引擎收到每一根 Bar 时,GapDetector 会记住「上次 Bar 到了哪个时间」,然后推算下一个应该出现的时间。
这种「相邻检测」能发现连续两根 Bar 之间的空档。核心逻辑很简洁:推算下一个应该出现的时间,如果实际到的更晚,中间就是缺口。
但问题是:一旦状态推进到最新时间,历史窗口就被永久关上了。GapDetector 只记住「现在到了哪里」,不记住「过去少了什么」。
原则:增量检测只能发现「此刻」的断裂,无法回溯「历史」的缺失。任何只追踪最新状态的检测机制,都存在历史盲区。
第二章:两种补漏思路
要补上这个盲区,有两条路可以走:
| 方案 | 做法 | 特点 |
|---|---|---|
| 启动时全量对齐 | 服务启动时从数据库读当日所有 Bar,与交易时间表做 diff | 只跑一次,运行期间不会再查 |
| 周期性全量扫描 | 每隔几分钟,拿数据库中实际 Bar 与完整交易时间表做对比 | 持续检测,运行中也能发现缺口 |
启动时全量对齐的问题在于:它只查一次。如果服务在交易时段中途出现短暂的数据源故障,恢复后历史缺口就再也没人管了。
周期性扫描更合适。它就像定期体检——不管你之前感觉多好,每隔一段时间都做一次全面检查,确保没有遗漏。
间隔多久合适?5 分钟。理由很简单:
- 实时相邻检测已经兜住了大部分情况,周期扫描主要补漏
- 5 分钟意味着最多 5 根 1m Bar 缺失就能被发现
- 一个交易日约 48 次扫描,对数据库压力可控
原则:当增量机制有盲区时,用周期性全量检查来补充。频率的选择要在「发现速度」和「资源开销」之间取平衡。
第三章:核心实现
全量扫描的核心思路:对每个标的,查询 MongoDB 中当日实际已有的 Bar 时间集合,与交易时间表生成的完整时间集合做差集。
关键代码——full_scan 方法,按标的逐一查询已有 Bar 时间,与完整时间表做差集:
async def full_scan(self, trading_date):
for period in self._scan_periods: # 默认 ["1m"]
# 生成当日完整交易时间表
expected = self._trading_hours.bar_times(trading_date, period)
expected_set = set(expected)
for symbol in self._symbols:
# 查询该标的当日已有的 Bar 时间
actual = await self._query_symbol_times(symbol, trading_date, period)
# 差集 = 缺失的 Bar
missing = sorted(expected_set - actual)
if missing:
yield BackfillTask(symbol, period, missing, "full_scan")
这段代码的设计意图很明确:不做全局模糊匹配,而是按标的精确查询。每个标的独立查 MongoDB,独立计算差集,所以能精确知道「哪个标的缺了哪些 Bar」。
查询 MongoDB 的部分只取 datetime 字段,不读完整文档,减少网络和内存开销:
cursor = collection.find(
{
"code": symbol,
"frequence": frequence,
"datetime": {"$gte": day_start, "$lte": day_end},
},
{"datetime": 1, "_id": 0}, # 只取时间字段
)
原则:全量扫描要精确到标的级别。全局聚合虽然快,但无法定位「谁缺了什么」,最终还是要拆分,不如一开始就按标的查。
第四章:踩坑与修正
初版实现犯了一个错误:先全局查询所有 Bar 时间,再想办法把缺失时间分配给各标的。
问题是——从时间点无法反推属于哪个标的。09:33 这根 Bar 缺了,但它是 sh600000 缺的还是 sz000001 缺的?全局聚合后这些信息全部丢失了。
第一反应是用某种均分策略把缺失时间分配给标的,让 fetch_history 自然过滤。但这很不优雅,而且会产生大量无效请求。
修正方案:把查询粒度下沉到标的级别。每个标的独立查自己的 Bar 时间,独立算差集。虽然查询次数多了(标的数 x 周期数),但每次查询有索引命中,实际开销可控。
原则:当聚合后的信息不够定位问题时,不要在聚合层做猜测和分配。回到原始粒度,用精确查询替代模糊分配。
第五章:自动化调度
全量扫描不能只跑一次,需要周期性执行。在 FeedRunner 中新增一个后台任务,每 5 分钟触发一次。
调度逻辑有两个要点:
第一,只在交易日执行。非交易日没有行情,扫描无意义,白白浪费数据库查询。
第二,通过 FeedEngine.enqueue_backfill() 入队,而不是直接访问内部队列。这是一个很小的封装,但让依赖关系更清晰——Runner 不需要知道 Engine 内部用什么数据结构。
async def _gap_scan_loop(self, gap_detector, interval):
await asyncio.sleep(interval) # 首次延迟
while not self._stopped:
now = datetime.now(_CST)
if gap_detector._trading_hours.is_trading_day(now.date()):
tasks = await gap_detector.full_scan(now.date())
for task in tasks:
await self._engine.enqueue_backfill(task)
await asyncio.sleep(interval)
间隔时间通过配置注入,默认 300 秒,可以通过环境变量 FRESHQUANT_GAP_SCAN_INTERVAL 调整。
原则:周期性任务的间隔应该是可配置的。不同场景对「发现速度」的要求不同,硬编码间隔等于替用户做了决定。
总结
这次改动涉及三个文件,新增约 80 行代码:
| 模块 | 改动 |
|---|---|
| GapDetector | 新增 bind()、full_scan()、_query_symbol_times() |
| FeedRunner | 新增 _gap_scan_loop(),FeedConfig 新增 gap_scan_interval |
| FeedEngine | 新增 enqueue_backfill() 公开方法 |
整体设计思路:增量检测 + 周期性全量扫描,形成互补的双保险。
- 实时相邻检测:发现「此刻」的断裂,延迟为 0
- 周期全量扫描:发现「历史」的缺失,延迟最多 5 分钟
核心原则:没有任何单一检测手段能覆盖所有场景。增量检测快但有盲区,全量扫描慢但能补漏。两者配合,才能让数据缺口无处藏身。
附录:技术速查
如果你想在自己的项目中实现类似的「体检」机制,几个关键参数供参考:
扫描间隔:5 分钟。对你的场景来说,需要权衡「数据延迟容忍度」和「数据库查询开销」。如果标的数量超过 500 只,考虑分批扫描。
查询优化:只取 datetime 字段,利用 {code, frequence, datetime} 复合索引。避免读取完整文档。
只扫描基础周期:1m Bar 是基础数据,5m/15m/60m/1d 由 Resampler 从 1m 合成。扫描 1m 就够了,不需要扫描多周期。