返回交易笔记
TR Day 82

自动化执行 — IBKR API 定时跑

| 概念 | 解释 | 对应 cron 的差异 |

2026-07-30
Phase 3: 实盘+规模化+迁移
AutomationCronAirflowTaskSchedulerIBKRProductionDAG

日期: 2026-07-30 方向: Phase 3 / 自动化 阶段: Phase 3: 实盘+规模化+迁移 标签: #Automation #Cron #Airflow #TaskScheduler #IBKR #Production #DAG


今日目标

类型内容
学习三层自动化(cron / Task Scheduler / Airflow)的取舍、IBKR Gateway 自动重启的应对、幂等与状态机思路
实操把 Phase 1-2 三大策略(双因子月度 / Wheel 周度 / IC + LLM 事件触发)拆成可调度的 production 脚本
产出TR-DAY82 笔记 + rebalance.py / wheel_check.py / daily_report.py / state_machine.py 骨架 + cron 表

Phase 3 Week 12 终章前的「自动化收口」。Day 1-81 我们把三套策略跑通了,今天要回答的问题是:当我下个月忙到没空看盘时,这套系统能不能自己运行?


一、为什么 Day 82 才讲自动化(顺序的合理性)

很多人学量化时第一个想到的就是「写个 bot 让它自己跑」。这是反模式。理由:

  1. 没跑过手动版本就上自动化 = 把不确定性放大 N 倍:人在 loop 里至少能 catch 异常;移除人之后,bug 直接变成钱。
  2. 自动化前必须先有可观测性(Day 83 监控告警):你不能让一个看不见的东西替你下单。
  3. 自动化前必须先有幂等性:脚本任何一步失败重跑都不能爆仓。Day 1-81 我们把策略写成「拿到当前状态 → 计算目标状态 → 执行 diff」的模式,今天这套自动化才能站住。
阶段操作模式适用
Day 1-50手动执行(笔记本里跑 cell)学习阶段
Day 51-75半自动(脚本 + 人工审核 fill)实盘起步
Day 82+全自动(cron 触发,告警兜底)规模化

PM 视角的对应:产品从「人肉运营」到「自动化运营」的演进。先有 SOP,再有 workflow,最后有自驱动系统。不能跳级——你没写过 SOP 的流程是不可能跑自动化的,因为你都不知道异常该怎么处理。


二、自动化的总目标:把哪些动作搬走

Phase 1-2 三策略对自动化的需求频率完全不同,必须分开调度。

策略触发条件频率自动化必要性
双因子组合 rebalance每月 1 号开盘后月度 1 次中(手动也能做,但容易忘)
Wheel 仓位检查 + Roll周一开盘后每周 1 次高(Roll 决策时间敏感)
Iron Condor 财报触发财报前 N 天事件驱动极高(错过时间窗就废了)
LLM signal scan财报当天 / 重大新闻事件驱动极高
日报生成16:30 ET 收盘后每日 1 次中(看心情,但有 audit 价值)
周报周日晚 20:00每周 1 次低(PM 自己写更有价值)

核心认知:不是所有东西都要自动化。让 LLM 帮你想策略 / 让你自己拍板 / 让脚本执行——这是 <$5k 个人量化最舒服的人机配比。


三、三层自动化方案对比

3.1 三层方案

方案适合规模复杂度<$5k 推荐
L1本地 cron(Linux/Mac)个人单机✓✓
L2Windows Task SchedulerWindows 单机★★✓✓(Win 用户)
L3Apache Airflow多 DAG / 团队 / 复杂依赖★★★★✗(懂概念即可)

3.2 cron 的优势

  • 系统自带,零依赖
  • 配置一行解决
  • 失败一目了然(看 mail / syslog)
  • 用了 30+ 年的稳定性

3.3 Airflow 的优势(为什么大公司用)

  • DAG:一个任务依赖另一个,可视化
  • retry / SLA:失败自动重试,超 SLA 告警
  • 回填(backfill):错过的任务可以补跑
  • UI:哪个任务跑了多久,一目了然
  • scheduler 容错:服务挂了重启后能接着跑

3.4 关键决策

账户 <$5k & 单人 → cron / Task Scheduler
账户 $5k-$50k & 多策略 → cron 够用,但加好 logging
账户 $50k+ / 多人 / 跨市场 → Airflow / Prefect / Dagster

今天的实操路径:cron(如果在 Linux/Mac)或 Task Scheduler(如果在 Windows)。Airflow 第七章讲概念,不实操。


四、cron 语法速记

cron 表 5 字段:分 时 日 月 周

cron 表达式含义
0 9 * * 1每周一 09:00
30 9 1 * *每月 1 号 09:30
30 16 * * 1-5工作日 16:30
*/5 * * * *每 5 分钟
0 20 * * 0每周日 20:00
0 9-16 * * 1-5工作日每小时(9-16 点)

4.1 Phase 1-2 策略的 cron 表(美东时间 ET)

# === Phase 1-2 量化策略调度(机器时区配置为 America/New_York) ===
# 月度:双因子 rebalance(每月 1 号开盘 30 分钟后)
30 9 1 * *  /home/quant/venv/bin/python /home/quant/strategies/rebalance.py >> /var/log/quant/rebalance.log 2>&1

# 周度:Wheel 仓位检查 + Roll 决策(每周一 9:30)
30 9 * * 1  /home/quant/venv/bin/python /home/quant/strategies/wheel_check.py >> /var/log/quant/wheel.log 2>&1

# 日度:财报触发 + LLM signal scan(工作日 7:00 提前扫一遍当天财报)
0 7 * * 1-5  /home/quant/venv/bin/python /home/quant/strategies/earnings_scan.py >> /var/log/quant/earnings.log 2>&1

# 日度:日报生成(工作日 16:30 收盘后)
30 16 * * 1-5  /home/quant/venv/bin/python /home/quant/reports/daily_report.py >> /var/log/quant/daily.log 2>&1

# 周度:周报(每周日 20:00)
0 20 * * 0  /home/quant/venv/bin/python /home/quant/reports/weekly_report.py >> /var/log/quant/weekly.log 2>&1

# 健康检查:每小时 ping IB Gateway,确保还活着
0 * * * *  /home/quant/venv/bin/python /home/quant/ops/health_check.py >> /var/log/quant/health.log 2>&1

4.2 关键坑

解释解决
时区:cron 默认用机器时区美股交易要的是 ET,机器可能是 UTC在脚本头 os.environ['TZ'] = 'America/New_York' 或服务器装 ET 时区
PATH 不全:cron 环境跟 shell 不一样pip / python 找不到用绝对路径调 venv 里的 python
% 在 cron 里是特殊字符用了会被当 newline\% 转义
stderr 不重定向就丢了报错看不到永远加 2>&1
失败没人知道cron 默认只发 local mail在脚本里加 Telegram/邮件告警(见第八章)

五、Windows Task Scheduler

Windows 用户的对应方案。GUI 比 cron 易用,但坑也不少。

5.1 创建任务(GUI 流程)

任务计划程序 → 创建任务(不是「创建基本任务」,那个功能太弱)

[常规]:
  - 名称:Quant_Wheel_Check
  - ☑ 不管用户是否登录都要运行
  - ☑ 使用最高权限运行

[触发器]:
  - 每周一 9:30 ET(注意时区!Win 不支持时区,需要在脚本里转)

[操作]:
  - 启动程序
  - 程序:C:\quant\venv\Scripts\python.exe
  - 参数:C:\quant\strategies\wheel_check.py
  - 起始于:C:\quant\strategies\

[条件]:
  - ☐ 只在交流电源时启动(笔记本党必须取消勾选)
  - ☑ 唤醒计算机运行此任务  ← 关键!

[设置]:
  - ☑ 允许按需运行
  - ☑ 如果任务失败,按以下频率重试:1 分钟,3 次
  - ☑ 如果运行时间超过 30 分钟,则停止任务

5.2 Windows 特殊坑

解决
睡眠 / 休眠时任务不跑勾选「唤醒计算机运行此任务」,并在电源选项里允许唤醒计时器
没有时区支持在脚本内部用 pytz / zoneinfo 校验当前 ET 时间是不是预期窗口
PowerShell 执行策略阻止python.exe 直接调 .py,不要套 .ps1 wrapper
任务历史默认关闭任务计划程序 → 启用所有任务历史,否则失败查不到
UAC 弹窗阻塞勾选「使用最高权限运行」+ 用户登录态运行

5.3 Linux 子系统(WSL)方案

如果 Windows 主机想用 cron 语法,可以装 WSL2 + Ubuntu,里面跑 cron。但注意:WSL 关闭时 cron 不跑,需要把 WSL 设成开机启动 + 不自动休眠。综合下来不如直接用 Task Scheduler。


六、Airflow 入门(懂概念,不实操)

<$5k 不需要 Airflow,但面试 / 简历 / 未来扩展都需要会讲。

6.1 核心概念

概念解释对应 cron 的差异
DAGDirected Acyclic Graph,一组任务 + 依赖关系cron 是孤立任务,DAG 是任务图
TaskDAG 中的单个步骤对应 cron 一行
Operator任务的执行器(PythonOperator / BashOperator / ...)cron 只能 shell
Scheduler决定什么时候跑什么cron 守护进程
Executor怎么跑(Local / Celery / Kubernetes)cron 没这层
SLA任务超时阈值,超了告警cron 无
Retry失败自动重试 N 次cron 无(需要脚本内实现)
Backfill历史日期补跑cron 无
XCom任务间传递数据cron 无(要走文件 / DB)

6.2 一个 Airflow DAG 长什么样

# dags/wheel_strategy.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'quant',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(minutes=30),
    'email_on_failure': True,
    'email': ['info@veloxwallet.com'],
}

with DAG(
    'wheel_weekly',
    default_args=default_args,
    schedule_interval='30 9 * * 1',          # 每周一 9:30
    start_date=datetime(2026, 7, 1),
    catchup=False,
    tags=['quant', 'wheel'],
) as dag:

    check_ib_gateway = PythonOperator(
        task_id='check_ib_gateway',
        python_callable=lambda: __import__('ops.health').check_gw(),
    )

    fetch_positions = PythonOperator(
        task_id='fetch_positions',
        python_callable=lambda: __import__('strategies.wheel').fetch(),
    )

    compute_rolls = PythonOperator(
        task_id='compute_rolls',
        python_callable=lambda: __import__('strategies.wheel').compute(),
    )

    execute_rolls = PythonOperator(
        task_id='execute_rolls',
        python_callable=lambda: __import__('strategies.wheel').execute(),
    )

    send_report = PythonOperator(
        task_id='send_report',
        python_callable=lambda: __import__('reports').wheel_report(),
    )

    check_ib_gateway >> fetch_positions >> compute_rolls >> execute_rolls >> send_report

这是 cron 给不了的check_ib_gateway 挂了,下游全部不执行,UI 里红一片,自动告警。cron 写出来需要在每个脚本里手写依赖检查,难维护。

6.3 什么时候真的要上 Airflow

  • 任务数 > 20 个
  • 任务之间有复杂依赖
  • 需要 backfill(比如想重跑过去 6 个月每天的因子计算)
  • 需要团队协作(Airflow UI 大家都能看)
  • 不要为「显得专业」而上 Airflow——单 DAG + 单机 cron 也能跑生产,少了几十 GB 的部署复杂度。

七、Production 脚本骨架(4 个文件)

7.1 state_machine.py:共用的防重复执行机制

# ops/state_machine.py
"""幂等性兜底:每个任务实例只跑一次。
失败可重跑,成功不重跑。"""
import json, hashlib, os, fcntl, datetime as dt
from pathlib import Path

STATE_DIR = Path("/var/lib/quant/state")
STATE_DIR.mkdir(parents=True, exist_ok=True)

class TaskState:
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED  = "failed"

class TaskRun:
    def __init__(self, task_name: str, run_date: dt.date | None = None):
        self.task_name = task_name
        self.run_date = run_date or dt.date.today()
        self.key = hashlib.md5(f"{task_name}:{self.run_date}".encode()).hexdigest()
        self.path = STATE_DIR / f"{task_name}_{self.run_date}.json"

    def acquire(self) -> bool:
        """返回 True 表示拿到锁、可以执行;False 表示已经跑过 / 正在跑。"""
        if self.path.exists():
            state = json.loads(self.path.read_text())
            if state["status"] == TaskState.SUCCESS:
                print(f"[{self.task_name}] already SUCCESS on {self.run_date}, skip")
                return False
            if state["status"] == TaskState.RUNNING:
                started = dt.datetime.fromisoformat(state["started_at"])
                if (dt.datetime.now() - started).total_seconds() < 3600:
                    print(f"[{self.task_name}] still RUNNING (started {started}), skip")
                    return False
                # 超过 1 小时算挂了,允许重跑
        self._write(TaskState.RUNNING, started_at=dt.datetime.now().isoformat())
        return True

    def succeed(self, payload: dict | None = None):
        self._write(TaskState.SUCCESS, finished_at=dt.datetime.now().isoformat(),
                    payload=payload or {})

    def fail(self, err: str):
        self._write(TaskState.FAILED, finished_at=dt.datetime.now().isoformat(),
                    error=err)

    def _write(self, status: str, **kw):
        data = {"task": self.task_name, "date": str(self.run_date), "status": status, **kw}
        self.path.write_text(json.dumps(data, indent=2))

这一层就是「production 思维」的核心:任何任务的状态都落盘,可重启、可审计、可手动干预。

7.2 rebalance.py:月度双因子组合再平衡

# strategies/rebalance.py
"""月度双因子组合 rebalance。每月 1 号开盘后跑一次。
幂等:同一月份重跑不会重复下单。"""
from ib_insync import IB
from ops.state_machine import TaskRun
from ops.alerts import notify, alert_fail
from ops.ib_session import connect_with_retry
import datetime as dt, traceback

TASK = "monthly_rebalance"

def main():
    run = TaskRun(TASK, run_date=dt.date.today().replace(day=1))
    if not run.acquire():
        return

    ib = None
    try:
        ib = connect_with_retry(client_id=11)
        positions = ib.positions()
        targets = compute_targets(ib)              # 双因子打分 → 目标权重
        orders  = diff_to_orders(positions, targets)

        # 全部用 LMT 而不是 MKT,避开开盘 5 分钟的高波动
        trades = []
        for o in orders:
            t = ib.placeOrder(o.contract, o.order)
            trades.append(t)
        ib.sleep(60)

        filled = [t for t in trades if t.orderStatus.status == "Filled"]
        partial = [t for t in trades if t.orderStatus.status == "Submitted"]

        notify(f"[REBALANCE] filled={len(filled)} partial={len(partial)}")
        run.succeed(payload={"filled": len(filled), "partial": len(partial)})

    except Exception as e:
        err = traceback.format_exc()
        alert_fail(TASK, err)
        run.fail(err)
        raise
    finally:
        if ib and ib.isConnected():
            ib.disconnect()

if __name__ == "__main__":
    main()

7.3 wheel_check.py:周度 Wheel 检查

# strategies/wheel_check.py
"""每周一开盘 30 分钟后检查 Wheel 仓位。
1) 列出所有当前 short put / covered call
2) 对快到期的(<7DTE)/被深度 ITM 的算 Roll 候选
3) 输出建议 → 看 AUTO_EXECUTE 决定是否直接下单"""
from ops.state_machine import TaskRun
from ops.ib_session import connect_with_retry
from ops.alerts import notify, alert_fail
import os, datetime as dt, traceback

TASK = "wheel_weekly_check"
AUTO_EXECUTE = os.environ.get("WHEEL_AUTO_EXECUTE", "false").lower() == "true"

def main():
    run = TaskRun(TASK)
    if not run.acquire():
        return
    ib = None
    try:
        ib = connect_with_retry(client_id=12)
        opt_positions = [p for p in ib.positions() if p.contract.secType == "OPT"]

        roll_candidates = []
        for p in opt_positions:
            dte = days_to_expiry(p.contract.lastTradeDateOrContractMonth)
            moneyness = compute_moneyness(ib, p.contract)
            if dte < 7 or moneyness < -0.05:       # ITM 5% 或 7 天内到期
                roll_candidates.append((p, dte, moneyness))

        if not roll_candidates:
            notify("[WHEEL] no roll candidates this week.")
            run.succeed(payload={"candidates": 0})
            return

        suggestions = [build_roll_suggestion(ib, p) for p, _, _ in roll_candidates]
        msg = format_wheel_report(suggestions)
        notify(msg)

        if AUTO_EXECUTE:
            for s in suggestions:
                ib.placeOrder(s.contract, s.combo_order)
            ib.sleep(30)

        run.succeed(payload={"candidates": len(roll_candidates),
                             "auto_executed": AUTO_EXECUTE})

    except Exception as e:
        err = traceback.format_exc()
        alert_fail(TASK, err)
        run.fail(err)
        raise
    finally:
        if ib and ib.isConnected():
            ib.disconnect()

if __name__ == "__main__":
    main()

关键设计AUTO_EXECUTE 默认 false。即便我把 cron 设好,初期 4 周脚本只发 Telegram 建议,我手动确认再开 AUTO_EXECUTE=true这是 production deployment 的标准做法——shadow 模式先跑一段,再切实流量。

7.4 daily_report.py:每日日报

# reports/daily_report.py
"""每个交易日 16:30 ET 生成日报:
- 当日 PnL(realized + unrealized)
- 持仓 Greeks 暴露
- 当日订单成交摘要
- 异常 flag(回撤超阈 / 单仓集中 / margin 紧张)"""
from ops.state_machine import TaskRun
from ops.ib_session import connect_with_retry
from ops.alerts import notify_html
import datetime as dt, traceback

TASK = "daily_report"

def main():
    run = TaskRun(TASK)
    if not run.acquire():
        return
    ib = None
    try:
        ib = connect_with_retry(client_id=13)
        account = ib.accountSummary()
        positions = ib.positions()
        executions = ib.executions()

        report = {
            "date": str(dt.date.today()),
            "nav": account_value(account, "NetLiquidation"),
            "daily_pnl": account_value(account, "DailyPnL"),
            "gross_exposure": sum(abs(p.marketValue) for p in positions),
            "greeks": aggregate_greeks(ib, positions),
            "executions": len(executions),
            "alerts": check_alerts(account, positions),
        }
        html = render_html_report(report)
        notify_html("Daily Report", html)
        run.succeed(payload=report)
    except Exception as e:
        alert_fail(TASK, traceback.format_exc())
        run.fail(str(e))
        raise
    finally:
        if ib and ib.isConnected():
            ib.disconnect()

if __name__ == "__main__":
    main()

八、错误处理:retry / 告警 / DLQ

8.1 三层失败处理

任务失败
    │
    ▼
1) 自动 retry 3 次(指数 backoff:30s, 2min, 5min)
    │
    ├── 成功 → 继续
    └── 仍失败
            │
            ▼
2) Telegram + Email 告警
    │
    ▼
3) 写入 dead letter queue(本地 SQLite)等待人工处理

8.2 retry 包装器

# ops/retry.py
import time, functools, traceback

def retry(times=3, delays=(30, 120, 300), exceptions=(Exception,)):
    def deco(fn):
        @functools.wraps(fn)
        def wrapper(*a, **k):
            last = None
            for i in range(times):
                try:
                    return fn(*a, **k)
                except exceptions as e:
                    last = e
                    if i < len(delays):
                        print(f"[retry] attempt {i+1} failed: {e}; sleep {delays[i]}s")
                        time.sleep(delays[i])
            raise last
        return wrapper
    return deco

8.3 Telegram 告警

# ops/alerts.py
import os, requests, datetime as dt

BOT = os.environ["TG_BOT_TOKEN"]
CHAT = os.environ["TG_CHAT_ID"]

def notify(msg: str):
    requests.post(
        f"https://api.telegram.org/bot{BOT}/sendMessage",
        json={"chat_id": CHAT, "text": f"{dt.datetime.now():%H:%M} {msg}"},
        timeout=10,
    )

def alert_fail(task: str, err: str):
    notify(f"🚨 [FAIL] {task}\n{err[:500]}")

8.4 Dead Letter Queue

任务最终失败之后不要丢掉——写入本地 SQLite,每周日做一次复盘:

# ops/dlq.py
import sqlite3, json, datetime as dt

def push_dlq(task: str, payload: dict, err: str):
    con = sqlite3.connect("/var/lib/quant/dlq.db")
    con.execute("""
        CREATE TABLE IF NOT EXISTS dlq(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task TEXT, payload TEXT, err TEXT, created_at TEXT
        )
    """)
    con.execute("INSERT INTO dlq(task,payload,err,created_at) VALUES (?,?,?,?)",
                (task, json.dumps(payload), err, dt.datetime.now().isoformat()))
    con.commit()

这是产品思维迁移:用户提交订单失败不能就消失,必须进可观测的「失败队列」。


九、IBKR API 特殊考虑

9.1 IB Gateway 每日重启

重启时间影响
凌晨 ~04:00 ET(每日自动 reboot)所有 client 连接断开,需要重新登录
周日完整维护系统服务全部下线 ~6 小时
周二重启(TWS)用 IB Gateway 不受影响

应对:所有调度任务都不能假设「连接还在」。每次跑都从 connect_with_retry 开始。

9.2 auto-reconnect 包装

# ops/ib_session.py
from ib_insync import IB
import time, os

GW_HOST = os.environ.get("IB_HOST", "127.0.0.1")
GW_PORT = int(os.environ.get("IB_PORT", "4002"))   # Paper Gateway

def connect_with_retry(client_id: int, max_attempts: int = 5) -> IB:
    ib = IB()
    last = None
    for i in range(max_attempts):
        try:
            ib.connect(GW_HOST, GW_PORT, clientId=client_id, timeout=20)
            if ib.isConnected():
                # 端口护栏:Paper 必须是 4002,Live 4001
                assert ib.client.port in (4001, 4002), "wrong port"
                if os.environ.get("ALLOW_LIVE") != "true":
                    assert ib.client.port == 4002, "LIVE port not allowed without ALLOW_LIVE=true"
                return ib
        except Exception as e:
            last = e
            print(f"[ib_session] attempt {i+1} failed: {e}")
            time.sleep(15 * (i + 1))
    raise RuntimeError(f"Cannot connect to IB Gateway after {max_attempts} attempts: {last}")

注意护栏两层

  1. 端口必须是 IBKR 已知端口;
  2. 实盘端口(4001)必须配合 ALLOW_LIVE=true 环境变量才允许,否则脚本拒绝执行。这是 Day 1 那条 assert ib.client.port == 4002 原则的工程化版本。

9.3 clientId 管理

不同任务用不同 clientId,避免互相挤占:

脚本clientId原因
rebalance.py11月度任务
wheel_check.py12周度任务
daily_report.py13日度只读
earnings_scan.py14事件触发
health_check.py99健康探针,最高 ID 防冲突

clientId 相同时新连接会踢掉旧连接——多任务并发时血洗


十、Cloud 替代方案(懂概念)

如果未来要把单机搬到 cloud:

10.1 AWS Lambda + EventBridge

EventBridge cron rule (每周一 9:30 ET)
    │
    ▼
Lambda function (装好 ib_insync 的 layer)
    │
    ▼
连接 IB Gateway (运行在 EC2 上)
    │
    ▼
CloudWatch Logs / SNS 告警

问题:Lambda 不能跑 IB Gateway(无 GUI),必须额外 EC2 跑 GW,反而比本地复杂。

10.2 真正的 cloud 路径

  • EC2 t3.small 跑 IB Gateway + cron + 脚本(约 $15/月)
  • 快照每日备份
  • CloudWatch alarms 接 SNS → 短信告警

但**<$5k 完全没必要**:电费 < $5/月,家里电脑本地跑性价比最高。Cloud 是规模到了之后(多账户 / 7×24 监控)才考虑。


十一、PM 视角:从 manual ops 到 autopilot

演进阶段产品对应量化对应
0. Manual客服手动处理工单我每天看盘下单
1. SOP工单处理流程文档策略 SOP 写清楚
2. Tool-assisted客服后台 + 脚本Jupyter notebook 半自动
3. Workflow工单系统 + 自动分派cron + 脚本
4. Autopilot系统自动决策 + 异常升级Airflow + 告警 + DLQ
5. Self-healing自动修复 + AI 介入LLM + 监控 + 自动回滚

今天我们走的是第 3 步到第 4 步。前面 81 天都是在做第 1-3 步的准备。关键认知

  1. 跳级失败率极高:没有第 2 步的 tooling 直接到第 4 步 = 自动化加速失败
  2. 每升一级需要的可观测性多 10 倍:自动驾驶意味着「人不在 loop 里」,那么仪表盘必须比有人时清晰 10 倍
  3. Manual 的价值不应该被低估:人在 loop 里的判断力是有价值的,自动化的目的是「让人去看更重要的」,不是「让人失业」

对应到产品:当你设计一个「自动审批」类功能,问自己——

  • 失败时谁知道?
  • 失败的损失谁兜底?
  • 自动决策的边界在哪?
  • 用户能不能 override?

Day 82 写的脚本里我都加了 AUTO_EXECUTE 开关、ALLOW_LIVE 护栏、Telegram 告警、DLQ——这些不是「锦上添花」,是从手动模式跨到自动模式的最低要求。少一个都不行。


十二、今日 checklist

按顺序执行:

  • (1) 把 Phase 1-2 的 3 大策略列出来(rebalance / wheel / IC+earnings),写出每个的触发频率预估执行时长
  • (2) 装好 state_machine.py + retry.py + alerts.py + ib_session.py 四个工具模块
  • (3) 把 Day 60+ 写过的 rebalance.py / wheel_check.py 重构成本笔记的骨架(加 state_machine + alerts)
  • (4) 注册 Telegram bot,拿到 TG_BOT_TOKEN 和 TG_CHAT_ID,写进 .env
  • (5)health_check.py —— 简单 ping IB Gateway,每小时跑一次
  • (6) 配置 cron(Linux/Mac)或 Task Scheduler(Win)
  • (7) shadow 模式跑 1 周:所有脚本 AUTO_EXECUTE=false,只发告警不下单。一周后人工 review
  • (8)docs/daily/TR_PROGRESS.md 把 Day 82 标 ✅
  • (9) 阅读 Airflow 官方文档前两章(30 分钟概念扫盲)

十三、明日预告

Day 83: 监控告警 — 当自动化系统出问题时

  • Heartbeat / Health check 模式
  • 异常检测:什么算「正常」?什么需要告警?
  • 告警分级(P0 / P1 / P2)和送达渠道(Telegram / Email / 电话)
  • Alert fatigue:怎么避免每天 50 条告警把自己淹死
  • 监控指标:策略指标 vs 系统指标 vs 业务指标
  • Grafana / Prometheus 入门(懂概念,单机用 SQLite + 简单 dashboard)
  • 关键报警场景:margin 不足 / IB Gateway 挂了 / 单仓集中 / 回撤超阈
  • PM 视角:从 incident 到 postmortem 的标准流程

Day 82+83 是 Phase 3 自动化的双胞胎——有自动化没监控 = 蒙眼裸奔,必须配对学。


实际执行记录

启动一项填一项,时间戳 + 卡点。

  • [hh:mm] 三策略调度频率表整理完成 — ...
  • [hh:mm] state_machine.py 实现 + 单元测试 — ...
  • [hh:mm] Telegram bot 注册、第一条告警跑通 — ...
  • [hh:mm] rebalance.py / wheel_check.py 重构,加幂等 — ...
  • [hh:mm] cron / Task Scheduler 配置完成 — ...
  • [hh:mm] shadow 模式连跑 3 天验证 — ...
  • 卡点 / 学到的:

总字数:约 6,400 字 今日完成度:理论 ✓ / 实操(你自己执行)/ 笔记 ✓