自动化执行 — IBKR API 定时跑
| 概念 | 解释 | 对应 cron 的差异 |
日期: 2026-07-30 方向: Phase 3 / 自动化 阶段: Phase 3: 实盘+规模化+迁移 标签: #Automation #Cron #Airflow #TaskScheduler #IBKR #Production #DAG
今日目标
| 类型 | 内容 |
|---|---|
| 学习 | 三层自动化(cron / Task Scheduler / Airflow)的取舍、IBKR Gateway 自动重启的应对、幂等与状态机思路 |
| 实操 | 把 Phase 1-2 三大策略(双因子月度 / Wheel 周度 / IC + LLM 事件触发)拆成可调度的 production 脚本 |
| 产出 | TR-DAY82 笔记 + rebalance.py / wheel_check.py / daily_report.py / state_machine.py 骨架 + cron 表 |
Phase 3 Week 12 终章前的「自动化收口」。Day 1-81 我们把三套策略跑通了,今天要回答的问题是:当我下个月忙到没空看盘时,这套系统能不能自己运行?
一、为什么 Day 82 才讲自动化(顺序的合理性)
很多人学量化时第一个想到的就是「写个 bot 让它自己跑」。这是反模式。理由:
- 没跑过手动版本就上自动化 = 把不确定性放大 N 倍:人在 loop 里至少能 catch 异常;移除人之后,bug 直接变成钱。
- 自动化前必须先有可观测性(Day 83 监控告警):你不能让一个看不见的东西替你下单。
- 自动化前必须先有幂等性:脚本任何一步失败重跑都不能爆仓。Day 1-81 我们把策略写成「拿到当前状态 → 计算目标状态 → 执行 diff」的模式,今天这套自动化才能站住。
| 阶段 | 操作模式 | 适用 |
|---|---|---|
| Day 1-50 | 手动执行(笔记本里跑 cell) | 学习阶段 |
| Day 51-75 | 半自动(脚本 + 人工审核 fill) | 实盘起步 |
| Day 82+ | 全自动(cron 触发,告警兜底) | 规模化 |
PM 视角的对应:产品从「人肉运营」到「自动化运营」的演进。先有 SOP,再有 workflow,最后有自驱动系统。不能跳级——你没写过 SOP 的流程是不可能跑自动化的,因为你都不知道异常该怎么处理。
二、自动化的总目标:把哪些动作搬走
Phase 1-2 三策略对自动化的需求频率完全不同,必须分开调度。
| 策略 | 触发条件 | 频率 | 自动化必要性 |
|---|---|---|---|
| 双因子组合 rebalance | 每月 1 号开盘后 | 月度 1 次 | 中(手动也能做,但容易忘) |
| Wheel 仓位检查 + Roll | 周一开盘后 | 每周 1 次 | 高(Roll 决策时间敏感) |
| Iron Condor 财报触发 | 财报前 N 天 | 事件驱动 | 极高(错过时间窗就废了) |
| LLM signal scan | 财报当天 / 重大新闻 | 事件驱动 | 极高 |
| 日报生成 | 16:30 ET 收盘后 | 每日 1 次 | 中(看心情,但有 audit 价值) |
| 周报 | 周日晚 20:00 | 每周 1 次 | 低(PM 自己写更有价值) |
核心认知:不是所有东西都要自动化。让 LLM 帮你想策略 / 让你自己拍板 / 让脚本执行——这是 <$5k 个人量化最舒服的人机配比。
三、三层自动化方案对比
3.1 三层方案
| 层 | 方案 | 适合规模 | 复杂度 | <$5k 推荐 |
|---|---|---|---|---|
| L1 | 本地 cron(Linux/Mac) | 个人单机 | ★ | ✓✓ |
| L2 | Windows Task Scheduler | Windows 单机 | ★★ | ✓✓(Win 用户) |
| L3 | Apache Airflow | 多 DAG / 团队 / 复杂依赖 | ★★★★ | ✗(懂概念即可) |
3.2 cron 的优势
- 系统自带,零依赖
- 配置一行解决
- 失败一目了然(看 mail / syslog)
- 用了 30+ 年的稳定性
3.3 Airflow 的优势(为什么大公司用)
- DAG:一个任务依赖另一个,可视化
- retry / SLA:失败自动重试,超 SLA 告警
- 回填(backfill):错过的任务可以补跑
- UI:哪个任务跑了多久,一目了然
- scheduler 容错:服务挂了重启后能接着跑
3.4 关键决策
账户 <$5k & 单人 → cron / Task Scheduler
账户 $5k-$50k & 多策略 → cron 够用,但加好 logging
账户 $50k+ / 多人 / 跨市场 → Airflow / Prefect / Dagster
今天的实操路径:cron(如果在 Linux/Mac)或 Task Scheduler(如果在 Windows)。Airflow 第七章讲概念,不实操。
四、cron 语法速记
cron 表 5 字段:分 时 日 月 周。
| cron 表达式 | 含义 |
|---|---|
0 9 * * 1 | 每周一 09:00 |
30 9 1 * * | 每月 1 号 09:30 |
30 16 * * 1-5 | 工作日 16:30 |
*/5 * * * * | 每 5 分钟 |
0 20 * * 0 | 每周日 20:00 |
0 9-16 * * 1-5 | 工作日每小时(9-16 点) |
4.1 Phase 1-2 策略的 cron 表(美东时间 ET)
# === Phase 1-2 量化策略调度(机器时区配置为 America/New_York) ===
# 月度:双因子 rebalance(每月 1 号开盘 30 分钟后)
30 9 1 * * /home/quant/venv/bin/python /home/quant/strategies/rebalance.py >> /var/log/quant/rebalance.log 2>&1
# 周度:Wheel 仓位检查 + Roll 决策(每周一 9:30)
30 9 * * 1 /home/quant/venv/bin/python /home/quant/strategies/wheel_check.py >> /var/log/quant/wheel.log 2>&1
# 日度:财报触发 + LLM signal scan(工作日 7:00 提前扫一遍当天财报)
0 7 * * 1-5 /home/quant/venv/bin/python /home/quant/strategies/earnings_scan.py >> /var/log/quant/earnings.log 2>&1
# 日度:日报生成(工作日 16:30 收盘后)
30 16 * * 1-5 /home/quant/venv/bin/python /home/quant/reports/daily_report.py >> /var/log/quant/daily.log 2>&1
# 周度:周报(每周日 20:00)
0 20 * * 0 /home/quant/venv/bin/python /home/quant/reports/weekly_report.py >> /var/log/quant/weekly.log 2>&1
# 健康检查:每小时 ping IB Gateway,确保还活着
0 * * * * /home/quant/venv/bin/python /home/quant/ops/health_check.py >> /var/log/quant/health.log 2>&1
4.2 关键坑
| 坑 | 解释 | 解决 |
|---|---|---|
| 时区:cron 默认用机器时区 | 美股交易要的是 ET,机器可能是 UTC | 在脚本头 os.environ['TZ'] = 'America/New_York' 或服务器装 ET 时区 |
| PATH 不全:cron 环境跟 shell 不一样 | pip / python 找不到 | 用绝对路径调 venv 里的 python |
% 在 cron 里是特殊字符 | 用了会被当 newline | 用 \% 转义 |
| stderr 不重定向就丢了 | 报错看不到 | 永远加 2>&1 |
| 失败没人知道 | cron 默认只发 local mail | 在脚本里加 Telegram/邮件告警(见第八章) |
五、Windows Task Scheduler
Windows 用户的对应方案。GUI 比 cron 易用,但坑也不少。
5.1 创建任务(GUI 流程)
任务计划程序 → 创建任务(不是「创建基本任务」,那个功能太弱)
[常规]:
- 名称:Quant_Wheel_Check
- ☑ 不管用户是否登录都要运行
- ☑ 使用最高权限运行
[触发器]:
- 每周一 9:30 ET(注意时区!Win 不支持时区,需要在脚本里转)
[操作]:
- 启动程序
- 程序:C:\quant\venv\Scripts\python.exe
- 参数:C:\quant\strategies\wheel_check.py
- 起始于:C:\quant\strategies\
[条件]:
- ☐ 只在交流电源时启动(笔记本党必须取消勾选)
- ☑ 唤醒计算机运行此任务 ← 关键!
[设置]:
- ☑ 允许按需运行
- ☑ 如果任务失败,按以下频率重试:1 分钟,3 次
- ☑ 如果运行时间超过 30 分钟,则停止任务
5.2 Windows 特殊坑
| 坑 | 解决 |
|---|---|
| 睡眠 / 休眠时任务不跑 | 勾选「唤醒计算机运行此任务」,并在电源选项里允许唤醒计时器 |
| 没有时区支持 | 在脚本内部用 pytz / zoneinfo 校验当前 ET 时间是不是预期窗口 |
| PowerShell 执行策略阻止 | 用 python.exe 直接调 .py,不要套 .ps1 wrapper |
| 任务历史默认关闭 | 任务计划程序 → 启用所有任务历史,否则失败查不到 |
| UAC 弹窗阻塞 | 勾选「使用最高权限运行」+ 用户登录态运行 |
5.3 Linux 子系统(WSL)方案
如果 Windows 主机想用 cron 语法,可以装 WSL2 + Ubuntu,里面跑 cron。但注意:WSL 关闭时 cron 不跑,需要把 WSL 设成开机启动 + 不自动休眠。综合下来不如直接用 Task Scheduler。
六、Airflow 入门(懂概念,不实操)
<$5k 不需要 Airflow,但面试 / 简历 / 未来扩展都需要会讲。
6.1 核心概念
| 概念 | 解释 | 对应 cron 的差异 |
|---|---|---|
| DAG | Directed Acyclic Graph,一组任务 + 依赖关系 | cron 是孤立任务,DAG 是任务图 |
| Task | DAG 中的单个步骤 | 对应 cron 一行 |
| Operator | 任务的执行器(PythonOperator / BashOperator / ...) | cron 只能 shell |
| Scheduler | 决定什么时候跑什么 | cron 守护进程 |
| Executor | 怎么跑(Local / Celery / Kubernetes) | cron 没这层 |
| SLA | 任务超时阈值,超了告警 | cron 无 |
| Retry | 失败自动重试 N 次 | cron 无(需要脚本内实现) |
| Backfill | 历史日期补跑 | cron 无 |
| XCom | 任务间传递数据 | cron 无(要走文件 / DB) |
6.2 一个 Airflow DAG 长什么样
# dags/wheel_strategy.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'quant',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'sla': timedelta(minutes=30),
'email_on_failure': True,
'email': ['info@veloxwallet.com'],
}
with DAG(
'wheel_weekly',
default_args=default_args,
schedule_interval='30 9 * * 1', # 每周一 9:30
start_date=datetime(2026, 7, 1),
catchup=False,
tags=['quant', 'wheel'],
) as dag:
check_ib_gateway = PythonOperator(
task_id='check_ib_gateway',
python_callable=lambda: __import__('ops.health').check_gw(),
)
fetch_positions = PythonOperator(
task_id='fetch_positions',
python_callable=lambda: __import__('strategies.wheel').fetch(),
)
compute_rolls = PythonOperator(
task_id='compute_rolls',
python_callable=lambda: __import__('strategies.wheel').compute(),
)
execute_rolls = PythonOperator(
task_id='execute_rolls',
python_callable=lambda: __import__('strategies.wheel').execute(),
)
send_report = PythonOperator(
task_id='send_report',
python_callable=lambda: __import__('reports').wheel_report(),
)
check_ib_gateway >> fetch_positions >> compute_rolls >> execute_rolls >> send_report
这是 cron 给不了的:check_ib_gateway 挂了,下游全部不执行,UI 里红一片,自动告警。cron 写出来需要在每个脚本里手写依赖检查,难维护。
6.3 什么时候真的要上 Airflow
- 任务数 > 20 个
- 任务之间有复杂依赖
- 需要 backfill(比如想重跑过去 6 个月每天的因子计算)
- 需要团队协作(Airflow UI 大家都能看)
- 不要为「显得专业」而上 Airflow——单 DAG + 单机 cron 也能跑生产,少了几十 GB 的部署复杂度。
七、Production 脚本骨架(4 个文件)
7.1 state_machine.py:共用的防重复执行机制
# ops/state_machine.py
"""幂等性兜底:每个任务实例只跑一次。
失败可重跑,成功不重跑。"""
import json, hashlib, os, fcntl, datetime as dt
from pathlib import Path
STATE_DIR = Path("/var/lib/quant/state")
STATE_DIR.mkdir(parents=True, exist_ok=True)
class TaskState:
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
class TaskRun:
def __init__(self, task_name: str, run_date: dt.date | None = None):
self.task_name = task_name
self.run_date = run_date or dt.date.today()
self.key = hashlib.md5(f"{task_name}:{self.run_date}".encode()).hexdigest()
self.path = STATE_DIR / f"{task_name}_{self.run_date}.json"
def acquire(self) -> bool:
"""返回 True 表示拿到锁、可以执行;False 表示已经跑过 / 正在跑。"""
if self.path.exists():
state = json.loads(self.path.read_text())
if state["status"] == TaskState.SUCCESS:
print(f"[{self.task_name}] already SUCCESS on {self.run_date}, skip")
return False
if state["status"] == TaskState.RUNNING:
started = dt.datetime.fromisoformat(state["started_at"])
if (dt.datetime.now() - started).total_seconds() < 3600:
print(f"[{self.task_name}] still RUNNING (started {started}), skip")
return False
# 超过 1 小时算挂了,允许重跑
self._write(TaskState.RUNNING, started_at=dt.datetime.now().isoformat())
return True
def succeed(self, payload: dict | None = None):
self._write(TaskState.SUCCESS, finished_at=dt.datetime.now().isoformat(),
payload=payload or {})
def fail(self, err: str):
self._write(TaskState.FAILED, finished_at=dt.datetime.now().isoformat(),
error=err)
def _write(self, status: str, **kw):
data = {"task": self.task_name, "date": str(self.run_date), "status": status, **kw}
self.path.write_text(json.dumps(data, indent=2))
这一层就是「production 思维」的核心:任何任务的状态都落盘,可重启、可审计、可手动干预。
7.2 rebalance.py:月度双因子组合再平衡
# strategies/rebalance.py
"""月度双因子组合 rebalance。每月 1 号开盘后跑一次。
幂等:同一月份重跑不会重复下单。"""
from ib_insync import IB
from ops.state_machine import TaskRun
from ops.alerts import notify, alert_fail
from ops.ib_session import connect_with_retry
import datetime as dt, traceback
TASK = "monthly_rebalance"
def main():
run = TaskRun(TASK, run_date=dt.date.today().replace(day=1))
if not run.acquire():
return
ib = None
try:
ib = connect_with_retry(client_id=11)
positions = ib.positions()
targets = compute_targets(ib) # 双因子打分 → 目标权重
orders = diff_to_orders(positions, targets)
# 全部用 LMT 而不是 MKT,避开开盘 5 分钟的高波动
trades = []
for o in orders:
t = ib.placeOrder(o.contract, o.order)
trades.append(t)
ib.sleep(60)
filled = [t for t in trades if t.orderStatus.status == "Filled"]
partial = [t for t in trades if t.orderStatus.status == "Submitted"]
notify(f"[REBALANCE] filled={len(filled)} partial={len(partial)}")
run.succeed(payload={"filled": len(filled), "partial": len(partial)})
except Exception as e:
err = traceback.format_exc()
alert_fail(TASK, err)
run.fail(err)
raise
finally:
if ib and ib.isConnected():
ib.disconnect()
if __name__ == "__main__":
main()
7.3 wheel_check.py:周度 Wheel 检查
# strategies/wheel_check.py
"""每周一开盘 30 分钟后检查 Wheel 仓位。
1) 列出所有当前 short put / covered call
2) 对快到期的(<7DTE)/被深度 ITM 的算 Roll 候选
3) 输出建议 → 看 AUTO_EXECUTE 决定是否直接下单"""
from ops.state_machine import TaskRun
from ops.ib_session import connect_with_retry
from ops.alerts import notify, alert_fail
import os, datetime as dt, traceback
TASK = "wheel_weekly_check"
AUTO_EXECUTE = os.environ.get("WHEEL_AUTO_EXECUTE", "false").lower() == "true"
def main():
run = TaskRun(TASK)
if not run.acquire():
return
ib = None
try:
ib = connect_with_retry(client_id=12)
opt_positions = [p for p in ib.positions() if p.contract.secType == "OPT"]
roll_candidates = []
for p in opt_positions:
dte = days_to_expiry(p.contract.lastTradeDateOrContractMonth)
moneyness = compute_moneyness(ib, p.contract)
if dte < 7 or moneyness < -0.05: # ITM 5% 或 7 天内到期
roll_candidates.append((p, dte, moneyness))
if not roll_candidates:
notify("[WHEEL] no roll candidates this week.")
run.succeed(payload={"candidates": 0})
return
suggestions = [build_roll_suggestion(ib, p) for p, _, _ in roll_candidates]
msg = format_wheel_report(suggestions)
notify(msg)
if AUTO_EXECUTE:
for s in suggestions:
ib.placeOrder(s.contract, s.combo_order)
ib.sleep(30)
run.succeed(payload={"candidates": len(roll_candidates),
"auto_executed": AUTO_EXECUTE})
except Exception as e:
err = traceback.format_exc()
alert_fail(TASK, err)
run.fail(err)
raise
finally:
if ib and ib.isConnected():
ib.disconnect()
if __name__ == "__main__":
main()
关键设计:AUTO_EXECUTE 默认 false。即便我把 cron 设好,初期 4 周脚本只发 Telegram 建议,我手动确认再开 AUTO_EXECUTE=true。这是 production deployment 的标准做法——shadow 模式先跑一段,再切实流量。
7.4 daily_report.py:每日日报
# reports/daily_report.py
"""每个交易日 16:30 ET 生成日报:
- 当日 PnL(realized + unrealized)
- 持仓 Greeks 暴露
- 当日订单成交摘要
- 异常 flag(回撤超阈 / 单仓集中 / margin 紧张)"""
from ops.state_machine import TaskRun
from ops.ib_session import connect_with_retry
from ops.alerts import notify_html
import datetime as dt, traceback
TASK = "daily_report"
def main():
run = TaskRun(TASK)
if not run.acquire():
return
ib = None
try:
ib = connect_with_retry(client_id=13)
account = ib.accountSummary()
positions = ib.positions()
executions = ib.executions()
report = {
"date": str(dt.date.today()),
"nav": account_value(account, "NetLiquidation"),
"daily_pnl": account_value(account, "DailyPnL"),
"gross_exposure": sum(abs(p.marketValue) for p in positions),
"greeks": aggregate_greeks(ib, positions),
"executions": len(executions),
"alerts": check_alerts(account, positions),
}
html = render_html_report(report)
notify_html("Daily Report", html)
run.succeed(payload=report)
except Exception as e:
alert_fail(TASK, traceback.format_exc())
run.fail(str(e))
raise
finally:
if ib and ib.isConnected():
ib.disconnect()
if __name__ == "__main__":
main()
八、错误处理:retry / 告警 / DLQ
8.1 三层失败处理
任务失败
│
▼
1) 自动 retry 3 次(指数 backoff:30s, 2min, 5min)
│
├── 成功 → 继续
└── 仍失败
│
▼
2) Telegram + Email 告警
│
▼
3) 写入 dead letter queue(本地 SQLite)等待人工处理
8.2 retry 包装器
# ops/retry.py
import time, functools, traceback
def retry(times=3, delays=(30, 120, 300), exceptions=(Exception,)):
def deco(fn):
@functools.wraps(fn)
def wrapper(*a, **k):
last = None
for i in range(times):
try:
return fn(*a, **k)
except exceptions as e:
last = e
if i < len(delays):
print(f"[retry] attempt {i+1} failed: {e}; sleep {delays[i]}s")
time.sleep(delays[i])
raise last
return wrapper
return deco
8.3 Telegram 告警
# ops/alerts.py
import os, requests, datetime as dt
BOT = os.environ["TG_BOT_TOKEN"]
CHAT = os.environ["TG_CHAT_ID"]
def notify(msg: str):
requests.post(
f"https://api.telegram.org/bot{BOT}/sendMessage",
json={"chat_id": CHAT, "text": f"{dt.datetime.now():%H:%M} {msg}"},
timeout=10,
)
def alert_fail(task: str, err: str):
notify(f"🚨 [FAIL] {task}\n{err[:500]}")
8.4 Dead Letter Queue
任务最终失败之后不要丢掉——写入本地 SQLite,每周日做一次复盘:
# ops/dlq.py
import sqlite3, json, datetime as dt
def push_dlq(task: str, payload: dict, err: str):
con = sqlite3.connect("/var/lib/quant/dlq.db")
con.execute("""
CREATE TABLE IF NOT EXISTS dlq(
id INTEGER PRIMARY KEY AUTOINCREMENT,
task TEXT, payload TEXT, err TEXT, created_at TEXT
)
""")
con.execute("INSERT INTO dlq(task,payload,err,created_at) VALUES (?,?,?,?)",
(task, json.dumps(payload), err, dt.datetime.now().isoformat()))
con.commit()
这是产品思维迁移:用户提交订单失败不能就消失,必须进可观测的「失败队列」。
九、IBKR API 特殊考虑
9.1 IB Gateway 每日重启
| 重启时间 | 影响 |
|---|---|
| 凌晨 ~04:00 ET(每日自动 reboot) | 所有 client 连接断开,需要重新登录 |
| 周日完整维护 | 系统服务全部下线 ~6 小时 |
| 周二重启(TWS) | 用 IB Gateway 不受影响 |
应对:所有调度任务都不能假设「连接还在」。每次跑都从 connect_with_retry 开始。
9.2 auto-reconnect 包装
# ops/ib_session.py
from ib_insync import IB
import time, os
GW_HOST = os.environ.get("IB_HOST", "127.0.0.1")
GW_PORT = int(os.environ.get("IB_PORT", "4002")) # Paper Gateway
def connect_with_retry(client_id: int, max_attempts: int = 5) -> IB:
ib = IB()
last = None
for i in range(max_attempts):
try:
ib.connect(GW_HOST, GW_PORT, clientId=client_id, timeout=20)
if ib.isConnected():
# 端口护栏:Paper 必须是 4002,Live 4001
assert ib.client.port in (4001, 4002), "wrong port"
if os.environ.get("ALLOW_LIVE") != "true":
assert ib.client.port == 4002, "LIVE port not allowed without ALLOW_LIVE=true"
return ib
except Exception as e:
last = e
print(f"[ib_session] attempt {i+1} failed: {e}")
time.sleep(15 * (i + 1))
raise RuntimeError(f"Cannot connect to IB Gateway after {max_attempts} attempts: {last}")
注意护栏两层:
- 端口必须是 IBKR 已知端口;
- 实盘端口(4001)必须配合
ALLOW_LIVE=true环境变量才允许,否则脚本拒绝执行。这是 Day 1 那条assert ib.client.port == 4002原则的工程化版本。
9.3 clientId 管理
不同任务用不同 clientId,避免互相挤占:
| 脚本 | clientId | 原因 |
|---|---|---|
rebalance.py | 11 | 月度任务 |
wheel_check.py | 12 | 周度任务 |
daily_report.py | 13 | 日度只读 |
earnings_scan.py | 14 | 事件触发 |
health_check.py | 99 | 健康探针,最高 ID 防冲突 |
clientId 相同时新连接会踢掉旧连接——多任务并发时血洗。
十、Cloud 替代方案(懂概念)
如果未来要把单机搬到 cloud:
10.1 AWS Lambda + EventBridge
EventBridge cron rule (每周一 9:30 ET)
│
▼
Lambda function (装好 ib_insync 的 layer)
│
▼
连接 IB Gateway (运行在 EC2 上)
│
▼
CloudWatch Logs / SNS 告警
问题:Lambda 不能跑 IB Gateway(无 GUI),必须额外 EC2 跑 GW,反而比本地复杂。
10.2 真正的 cloud 路径
- EC2 t3.small 跑 IB Gateway + cron + 脚本(约 $15/月)
- 快照每日备份
- CloudWatch alarms 接 SNS → 短信告警
但**<$5k 完全没必要**:电费 < $5/月,家里电脑本地跑性价比最高。Cloud 是规模到了之后(多账户 / 7×24 监控)才考虑。
十一、PM 视角:从 manual ops 到 autopilot
| 演进阶段 | 产品对应 | 量化对应 |
|---|---|---|
| 0. Manual | 客服手动处理工单 | 我每天看盘下单 |
| 1. SOP | 工单处理流程文档 | 策略 SOP 写清楚 |
| 2. Tool-assisted | 客服后台 + 脚本 | Jupyter notebook 半自动 |
| 3. Workflow | 工单系统 + 自动分派 | cron + 脚本 |
| 4. Autopilot | 系统自动决策 + 异常升级 | Airflow + 告警 + DLQ |
| 5. Self-healing | 自动修复 + AI 介入 | LLM + 监控 + 自动回滚 |
今天我们走的是第 3 步到第 4 步。前面 81 天都是在做第 1-3 步的准备。关键认知:
- 跳级失败率极高:没有第 2 步的 tooling 直接到第 4 步 = 自动化加速失败
- 每升一级需要的可观测性多 10 倍:自动驾驶意味着「人不在 loop 里」,那么仪表盘必须比有人时清晰 10 倍
- Manual 的价值不应该被低估:人在 loop 里的判断力是有价值的,自动化的目的是「让人去看更重要的」,不是「让人失业」
对应到产品:当你设计一个「自动审批」类功能,问自己——
- 失败时谁知道?
- 失败的损失谁兜底?
- 自动决策的边界在哪?
- 用户能不能 override?
Day 82 写的脚本里我都加了 AUTO_EXECUTE 开关、ALLOW_LIVE 护栏、Telegram 告警、DLQ——这些不是「锦上添花」,是从手动模式跨到自动模式的最低要求。少一个都不行。
十二、今日 checklist
按顺序执行:
- (1) 把 Phase 1-2 的 3 大策略列出来(rebalance / wheel / IC+earnings),写出每个的触发频率和预估执行时长
- (2) 装好
state_machine.py+retry.py+alerts.py+ib_session.py四个工具模块 - (3) 把 Day 60+ 写过的
rebalance.py/wheel_check.py重构成本笔记的骨架(加 state_machine + alerts) - (4) 注册 Telegram bot,拿到 TG_BOT_TOKEN 和 TG_CHAT_ID,写进
.env - (5) 写
health_check.py—— 简单 ping IB Gateway,每小时跑一次 - (6) 配置 cron(Linux/Mac)或 Task Scheduler(Win)
- (7) shadow 模式跑 1 周:所有脚本
AUTO_EXECUTE=false,只发告警不下单。一周后人工 review - (8) 在
docs/daily/TR_PROGRESS.md把 Day 82 标 ✅ - (9) 阅读 Airflow 官方文档前两章(30 分钟概念扫盲)
十三、明日预告
Day 83: 监控告警 — 当自动化系统出问题时
- Heartbeat / Health check 模式
- 异常检测:什么算「正常」?什么需要告警?
- 告警分级(P0 / P1 / P2)和送达渠道(Telegram / Email / 电话)
- Alert fatigue:怎么避免每天 50 条告警把自己淹死
- 监控指标:策略指标 vs 系统指标 vs 业务指标
- Grafana / Prometheus 入门(懂概念,单机用 SQLite + 简单 dashboard)
- 关键报警场景:margin 不足 / IB Gateway 挂了 / 单仓集中 / 回撤超阈
- PM 视角:从 incident 到 postmortem 的标准流程
Day 82+83 是 Phase 3 自动化的双胞胎——有自动化没监控 = 蒙眼裸奔,必须配对学。
实际执行记录
启动一项填一项,时间戳 + 卡点。
- [hh:mm] 三策略调度频率表整理完成 — ...
- [hh:mm]
state_machine.py实现 + 单元测试 — ... - [hh:mm] Telegram bot 注册、第一条告警跑通 — ...
- [hh:mm]
rebalance.py/wheel_check.py重构,加幂等 — ... - [hh:mm] cron / Task Scheduler 配置完成 — ...
- [hh:mm] shadow 模式连跑 3 天验证 — ...
- 卡点 / 学到的:
总字数:约 6,400 字 今日完成度:理论 ✓ / 实操(你自己执行)/ 笔记 ✓