返回交易笔记
TR Day 83

监控告警系统 — 异常 PnL / Greeks 越界 / 网络断

production 监控告警体系、P0/P1/P2 分级方法、告警去重与抑制、SRE 视角下的可观测性

2026-07-31
Phase 3: 实盘+规模化+迁移
MonitoringAlertsTelegramDashboardSREIBGatewayProductionGrade

日期: 2026-07-31 方向: Phase 3 / 监控告警 阶段: Phase 3: 实盘+规模化+迁移 标签: #Monitoring #Alerts #Telegram #Dashboard #SRE #IBGateway #ProductionGrade


今日目标

类型内容
学习production 监控告警体系、P0/P1/P2 分级方法、告警去重与抑制、SRE 视角下的可观测性
实操Telegram bot 告警通道、Streamlit 实时 dashboard、IB Gateway 自动重连守护进程
产出TR-DAY83 笔记 + alerts.py + dashboard.py + gateway_watchdog.py + P0/P1 自检清单

一、为什么 Day 83 必须做监控告警:从「能跑」到「production」的那道分水岭

Day 1-60 我们把策略跑起来了,Day 61-82 我们把仓位规模做上去了,但有一个问题一直没认真处理:当我不在屏幕前时,系统在干什么?

10 年金融业经验告诉我一件事:生产事故从来不是策略本身的问题,而是「事故发生了 4 小时没人知道」的问题。2025 年 4 月某个周六凌晨我 IB Gateway 断了 11 小时——策略没出问题,因为根本没下成单;但如果那一晚是头寸需要平仓的 earnings 前夜呢?

监控告警的目的不是阻止异常发生(异常一定会发生),而是:

  1. 缩短 MTTD(Mean Time To Detect):从异常发生到我知道的时间
  2. 缩短 MTTR(Mean Time To Respond):从我知道到处理完成的时间
  3. 建立 audit trail:事后复盘有完整时间线
  4. 区分信号与噪声:避免「狼来了」让真正的 P0 被忽略

这套思路跟 SRE / DevOps 完全同构——量化交易系统本质就是一个 24/7 跑的生产服务,它的「SLO」不是 99.9% uptime,而是「关键决策点上不能漏单 + 不能错单 + 异常 30 分钟内有人响应」。


二、三级告警模型:抄 PagerDuty 的家底

散户最常见的错误是把所有告警当 P0:策略一回撤就 push,结果一周后开始忽略所有通知——心理学上叫「告警疲劳」(alert fatigue)。告警分级的本质是把「响应承诺」显式化

2.1 三级定义

级别含义响应 SLO通道触发示例
P0紧急 / 资金安全 / 系统失能5 分钟内响应Telegram + SMS + 电话账户冻结 / 单日亏损 >5% NAV / Gateway 持续断开 >15 分钟
P1重要 / 风险接近边界30 分钟内响应Telegram + Email单仓位 -10% / Net Delta 超限 / VIX >30 / 数据延迟 >5min
P2信息 / 日常监控24 小时内查看Email / Slack 日报日 P&L 报告 / 仓位变化总结 / 月度复盘

2.2 区分 P0 与 P1 的硬规则

我自己用的判定问句:「如果我现在在睡觉 / 开会 / 上厕所,这件事能等 30 分钟吗?

  • 能等 → P1
  • 不能等 → P0
  • 我都不在乎 → P2

反例:「策略今日 P&L = -2%」不是 P0,因为 -2% 在我们的 Sharpe 框架内是正常波动(年化 sigma 20%, 日 sigma ≈ 1.3%, 2σ 内)。把这个发 P0 就会让我下次真正 -5% 的告警被我下意识 swipe 掉。

2.3 P0 的资金线:必须用「绝对损失」而非「相对回撤」

P0 触发条件(任一):
  - 单日 P&L < -3% NAV         # 我们日 sigma 1.3%, -3% 是 2.3σ
  - 累计 drawdown > 10% NAV    # 提前 stop-trading 阈值
  - 单仓位 P&L < -$1,500       # 绝对金额,避免「小仓位 -50%」噪声
  - IB Gateway 断开 > 15 min   # 流动性事件期间致命
  - 账户被 risk margin call    # IBKR 主动通知
  - 异常委托:单笔 > $20k notional

三、监控指标全景:四大类 12 个核心 metrics

3.1 P&L 类

指标计算告警阈值级别
Daily P&Lrealized + unrealized - prev_day-3% NAVP0
Weekly P&L7 日累计-7% NAVP1
Drawdown from peak(NAV - peak) / peak-10%P0
单仓位 P&Lper-position MTM-10% 或 -$1,500P1

3.2 Greeks 类(期权专属)

指标含义告警阈值级别
Net Delta净方向暴露abs > 0.3 × NAV / $SPYP1
Net Vega净波动率暴露abs > 0.5% × NAVP1
Net Theta净时间衰减单日 < -0.5% NAVP2
Gamma 集中度单 strike gamma> 30% of totalP1

3.3 流动性 / 市场结构类

指标含义告警阈值级别
Bid-ask spread持仓品种 spread> 3σ of 30-day medianP1
Volume持仓品种成交量< 20% of 30-day medianP1
VIX level大盘隐波> 30P1
Funding rate期货持仓基差abs > 3σP1

3.4 基础设施类

指标含义告警阈值级别
IB Gateway 连接ib.isConnected()断开 > 5 minP1
IB Gateway 连接ib.isConnected()断开 > 15 minP0
API 响应延迟reqMktData round-trip> 2 秒P1
数据延迟last tick 时间 vs now> 60 秒(开盘时段)P1
磁盘空间log + data 目录< 10% freeP1

四、告警通道选型:成本 / 可达性 / 友好度三角

通道成本可达性富文本适用级别我的取舍
Telegram bot免费高(push notification)✓ 图片/markdownP0 / P1主通道
Email免费中(容易被忽略)✓ HTMLP2 / 备份次通道
SMS (Twilio)$0.0075/条最高(手机原生)✗ 纯文本P0 onlyP0 用
电话 (Twilio Voice)$0.013/分钟最高(必须接)✗ TTSP0 关键资金 >$50k 后启用
Slack / Discord免费✓ 富文本团队场景团队协作时用
Push 推送(Pushover)$5 一次性简单P0 / P1Telegram 替代

4.1 为什么 Telegram 是性价比之王

  • 完全免费、无每月配额
  • bot API 极简:发 message 一行 HTTP POST
  • 支持图片(可以推 P&L 曲线截图)
  • 在手机锁屏可见、有声音
  • 桌面 / 手机多端同步
  • 国内可达(用 VPN 也行)

4.2 为什么必须有 SMS 备份

Telegram 依赖网络 + 应用进程 + 通知权限。任何一个挂了 P0 就漏掉。SMS 走运营商基础设施,可达性是最后一道防线。

资金小(<$10k)时可以省 Twilio 费用,资金 >$50k 后这点钱必须花——一次漏掉的 P0 远超一年的 SMS 成本。


五、Telegram bot 完整实现

5.1 创建 bot

1. Telegram 找 @BotFather → /newbot → 取名 → 拿到 BOT_TOKEN
2. /setprivacy → Disable(让 bot 能收到 group 消息,可选)
3. 把 bot 加进一个只有自己的 group → 拿到 CHAT_ID(用 @userinfobot)
4. 存到环境变量:
   export TG_BOT_TOKEN=123456:ABC...
   export TG_CHAT_ID=-100123456789

5.2 alerts.py 核心代码

# alerts.py
import os
import time
import logging
import hashlib
from enum import Enum
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta

import requests

log = logging.getLogger(__name__)

class Level(Enum):
    P0 = "P0"  # 紧急
    P1 = "P1"  # 重要
    P2 = "P2"  # 信息

@dataclass
class AlertManager:
    bot_token: str = field(default_factory=lambda: os.getenv("TG_BOT_TOKEN", ""))
    chat_id: str = field(default_factory=lambda: os.getenv("TG_CHAT_ID", ""))
    # dedup: (level, key) -> last_sent_time
    _last_sent: dict = field(default_factory=dict)
    # throttle window per level
    _throttle: dict = field(default_factory=lambda: {
        Level.P0: timedelta(minutes=5),    # P0 5 分钟同 key 只 1 次
        Level.P1: timedelta(minutes=30),   # P1 30 分钟
        Level.P2: timedelta(hours=24),     # P2 一天
    })

    def _dedup_key(self, level: Level, message: str) -> str:
        # 取 message 前 60 字符做 key(够区分不同事件,不至于把数字变化也当新事件)
        h = hashlib.md5(message[:60].encode()).hexdigest()[:8]
        return f"{level.value}:{h}"

    def _should_send(self, level: Level, message: str) -> bool:
        key = self._dedup_key(level, message)
        last = self._last_sent.get(key)
        window = self._throttle[level]
        if last and datetime.now() - last < window:
            return False
        self._last_sent[key] = datetime.now()
        return True

    def send(self, level: Level, title: str, body: str,
             image_path: Optional[str] = None, force: bool = False) -> bool:
        message = f"{title}\n{body}"
        if not force and not self._should_send(level, message):
            log.info(f"alert throttled: {title}")
            return False

        emoji = {"P0": "🚨", "P1": "⚠️", "P2": "ℹ️"}[level.value]
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        text = f"{emoji} *{level.value}* — {title}\n```\n{body}\n```\n_{ts}_"

        try:
            if image_path:
                with open(image_path, "rb") as f:
                    r = requests.post(
                        f"https://api.telegram.org/bot{self.bot_token}/sendPhoto",
                        data={"chat_id": self.chat_id, "caption": text,
                              "parse_mode": "Markdown"},
                        files={"photo": f}, timeout=10,
                    )
            else:
                r = requests.post(
                    f"https://api.telegram.org/bot{self.bot_token}/sendMessage",
                    data={"chat_id": self.chat_id, "text": text,
                          "parse_mode": "Markdown"}, timeout=10,
                )
            r.raise_for_status()
            return True
        except Exception as e:
            log.error(f"telegram send failed: {e}")
            # P0 失败 → 退化到 SMS
            if level == Level.P0:
                self._fallback_sms(title, body)
            return False

    def _fallback_sms(self, title: str, body: str):
        """P0 Telegram 失败时走 SMS(Twilio)。"""
        from twilio.rest import Client
        sid, token = os.getenv("TW_SID"), os.getenv("TW_TOKEN")
        if not sid:
            log.critical(f"SMS fallback unavailable! {title}")
            return
        client = Client(sid, token)
        client.messages.create(
            body=f"[P0] {title}\n{body[:140]}",
            from_=os.getenv("TW_FROM"),
            to=os.getenv("TW_TO"),
        )

    def clear(self, level: Level, title: str):
        """异常恢复时发 all-clear。"""
        body = "状态已恢复正常"
        self.send(level, f"[CLEARED] {title}", body, force=True)

# 单例
alerter = AlertManager()

5.3 调用示例

from alerts import alerter, Level

# P0 — 单日亏损越线
if daily_pnl_pct < -0.03:
    alerter.send(Level.P0,
        title="单日亏损越限",
        body=f"Daily P&L: {daily_pnl_pct:.2%}\nNAV: ${nav:,.0f}\n"
             f"主要拖累: {worst_position}")

# P1 — Net Delta 越界
if abs(net_delta) > 0.3 * nav / spy_price:
    alerter.send(Level.P1,
        title="Net Delta 越界",
        body=f"Net Delta: {net_delta:.2f}\nLimit: {0.3 * nav / spy_price:.2f}")

六、告警去重与抑制:避免「告警海啸」

6.1 去重(dedup)

同一事件触发 N 次只发 1 次。我的做法:

  • Key 化:把 message 前 60 字符 hash 作为 key(变化的数字不会算新告警)
  • 滑动窗口:P0 = 5 min,P1 = 30 min,P2 = 24 h
  • 强制 override:恢复通知用 force=True 跳过 dedup

6.2 抑制(suppression)

特定时段 / 状态主动关闭某些告警:

def is_market_closed():
    """周末 + 美东 16:00-09:30 之间是闭市,闭市数据空洞不报警。"""
    now = datetime.now(tz=pytz.timezone("US/Eastern"))
    if now.weekday() >= 5:  # Sat / Sun
        return True
    if now.hour < 9 or (now.hour == 9 and now.minute < 30):
        return True
    if now.hour >= 16:
        return True
    return False

def is_holiday():
    return today() in NYSE_HOLIDAYS_2026

# 在监控循环里
if metric_stale():
    if is_market_closed() or is_holiday():
        log.debug("market closed, suppress stale-data alert")
        continue
    alerter.send(Level.P1, "数据陈旧", ...)

6.3 恢复通知(all-clear)

异常恢复时主动发:

class StatefulMonitor:
    def __init__(self):
        self.active_alerts = {}  # key -> Level

    def check(self, key: str, level: Level, title: str, body: str, triggered: bool):
        was_active = key in self.active_alerts
        if triggered and not was_active:
            alerter.send(level, title, body)
            self.active_alerts[key] = level
        elif not triggered and was_active:
            alerter.clear(self.active_alerts[key], title)
            del self.active_alerts[key]

七、Streamlit 实时 Dashboard

7.1 设计原则

  • localhost onlystreamlit run dashboard.py --server.address 127.0.0.1
  • 不要暴露公网(涉及 NAV、API key、仓位)
  • 关键指标 above the fold(不滚动可见)
  • 颜色:绿 = ok / 黄 = P1 / 红 = P0
  • 24/7 accessible:systemd service 守护

7.2 dashboard.py 骨架

# dashboard.py — streamlit run dashboard.py --server.port 8080 --server.address 127.0.0.1
import time
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
from datetime import datetime

from portfolio import get_account_summary, get_positions, get_greeks
from pnl import load_pnl_history
from alerts_log import load_recent_alerts

st.set_page_config(page_title="TR Monitor", layout="wide", page_icon="📊")

# Auto-refresh every 30 sec
st_autorefresh = st.empty()
REFRESH = 30  # seconds

# Top metrics row
acct = get_account_summary()
col1, col2, col3, col4, col5 = st.columns(5)
col1.metric("NAV", f"${acct['nav']:,.0f}", f"{acct['daily_pnl_pct']:+.2%}")
col2.metric("Cash", f"${acct['cash']:,.0f}")
col3.metric("Buying Power", f"${acct['bp']:,.0f}")
col4.metric("Margin Used", f"{acct['margin_used_pct']:.1%}")
col5.metric("Drawdown", f"{acct['dd_pct']:.1%}",
            delta_color="inverse")

# Health status
st.subheader("System Health")
hcols = st.columns(4)
gateway_ok = acct["gateway_connected"]
hcols[0].markdown(f"**IB Gateway**: {'🟢' if gateway_ok else '🔴'}")
hcols[1].markdown(f"**Last Tick**: {acct['last_tick_age_s']}s ago")
hcols[2].markdown(f"**API Latency**: {acct['api_latency_ms']}ms")
hcols[3].markdown(f"**VIX**: {acct['vix']:.1f}")

# Greeks
st.subheader("Greeks")
greeks = get_greeks()
gcols = st.columns(4)
gcols[0].metric("Net Delta", f"{greeks['delta']:+.2f}")
gcols[1].metric("Net Vega", f"${greeks['vega']:+.0f}")
gcols[2].metric("Net Theta", f"${greeks['theta']:+.0f}")
gcols[3].metric("Net Gamma", f"{greeks['gamma']:+.4f}")

# P&L chart
st.subheader("P&L Curve (30d)")
pnl_df = load_pnl_history(days=30)
fig = go.Figure()
fig.add_trace(go.Scatter(x=pnl_df["date"], y=pnl_df["nav"],
                          mode="lines", name="NAV"))
st.plotly_chart(fig, use_container_width=True)

# Positions
st.subheader("Open Positions")
positions = get_positions()
st.dataframe(positions, use_container_width=True)

# Recent alerts
st.subheader("Recent Alerts (24h)")
alerts_df = load_recent_alerts(hours=24)
st.dataframe(alerts_df, use_container_width=True)

# Footer
st.caption(f"Last update: {datetime.now():%Y-%m-%d %H:%M:%S} — auto-refresh {REFRESH}s")
time.sleep(REFRESH)
st.rerun()

7.3 部署:systemd service

# /etc/systemd/system/tr-dashboard.service
[Unit]
Description=TR Trading Dashboard
After=network.target

[Service]
User=trader
WorkingDirectory=/home/trader/momoweb3/tr
ExecStart=/home/trader/.venv/bin/streamlit run dashboard.py \
  --server.port 8080 --server.address 127.0.0.1
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
sudo systemctl enable --now tr-dashboard
# 手机访问:用 Tailscale / ngrok 内网穿透到 localhost:8080

八、「凌晨 4 点 IB Gateway 自动重启」守护进程

8.1 IBKR 每日强制重启的特性

模式重启时间(UTC)影响
Auto-restart用户自配置一次性仍然要 2FA
Daily restart美东 23:45-00:30 之间强制断开
Weekly Sat周六TWS 强制

重点:自动重启后 IBKR 不会自动恢复 API 客户端连接——你的 Python 进程会拿到 disconnect 事件,必须自己 reconnect。

8.2 watchdog 实现

# gateway_watchdog.py
import time
import logging
from ib_insync import IB
from alerts import alerter, Level

log = logging.getLogger(__name__)

HOST, PORT, CID = "127.0.0.1", 4002, 1

class GatewayWatchdog:
    def __init__(self):
        self.ib = IB()
        self.disconnect_at = None
        self.attempts = 0

    def connect_once(self):
        try:
            self.ib.connect(HOST, PORT, clientId=CID, timeout=15)
            log.info("connected")
            self.disconnect_at = None
            self.attempts = 0
            return True
        except Exception as e:
            log.warning(f"connect failed: {e}")
            self.attempts += 1
            return False

    def run(self):
        if not self.ib.isConnected():
            self.connect_once()

        while True:
            try:
                self.ib.sleep(10)  # keep event loop alive

                if self.ib.isConnected():
                    if self.disconnect_at:
                        # 恢复
                        alerter.clear(Level.P1, "Gateway 断开")
                        self.disconnect_at = None
                else:
                    now = time.time()
                    if self.disconnect_at is None:
                        self.disconnect_at = now
                        log.warning("gateway disconnected, will retry")

                    elapsed = now - self.disconnect_at
                    # P1 at 5 min
                    if 300 <= elapsed < 310:
                        alerter.send(Level.P1,
                            "IB Gateway 已断开 5 分钟",
                            f"尝试自动重连中... attempts={self.attempts}")
                    # P0 at 15 min
                    if 900 <= elapsed < 910:
                        alerter.send(Level.P0,
                            "IB Gateway 断开超过 15 分钟",
                            f"自动重连失败 {self.attempts} 次,需要人工介入!")

                    # 重试(指数退避,上限 5 分钟)
                    backoff = min(300, 2 ** min(self.attempts, 8))
                    log.info(f"retry in {backoff}s")
                    time.sleep(backoff)
                    self.connect_once()
            except KeyboardInterrupt:
                break
            except Exception as e:
                log.exception(f"watchdog loop error: {e}")
                time.sleep(30)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    GatewayWatchdog().run()

8.3 部署成 systemd service

# /etc/systemd/system/tr-watchdog.service
[Service]
ExecStart=/home/trader/.venv/bin/python gateway_watchdog.py
Restart=always

关键点:watchdog 进程只负责连接 + 告警,不下单。下单逻辑在另一个 strategy 进程里订阅同一个 IB 连接(用 clientId=2)。职责单一是 SRE 的核心原则。


九、Production checklist:上线前必须过的 12 道关

把这套搬到实盘前,按这个顺序自检:

  • (1) 所有 P0 都有 mock 触发脚本:能手动跑出一次告警证明通道活
  • (2) Telegram bot 测试:发送一条 P0 → 5 秒内在手机收到 + 有声音
  • (3) SMS 测试:Twilio test mode 发一条 → 手机收到
  • (4) Email 测试:SMTP 配置可发,且不进垃圾箱
  • (5) Dashboard 可达:localhost:8080 打开正常,移动端通过 Tailscale 可访问
  • (6) systemd service 测试:手动 kill -9 进程 → 10 秒内自动拉起
  • (7) Gateway 断网测试:拔网线 5 分钟 → 看到 P1 告警;15 分钟 → P0
  • (8) 闭市抑制:周末跑监控不应该告警
  • (9) Holiday 抑制:NYSE 节假日表至少覆盖未来 12 个月
  • (10) 告警去重测试:连续触发同一异常 10 次,只收到 1 条
  • (11) Recovery 通知:触发后恢复,收到 [CLEARED] 消息
  • (12) Log retention:监控日志至少保留 90 天,方便事后复盘

十、常见 false positive 排查表

误报场景根因解决
周六上午报「Net Delta 越界」周末用了周五收盘价 + Monday morning gap 假设闭市抑制 Greeks 告警
期权 rollover 当日 Greeks 跳变老合约平、新合约开瞬间 net 临时偏容忍 1 天 + 在 rollover script 里手动 suppress
数据延迟 60 秒告警频发标的低流动性 + 没成交就没 tick改成「最后 5 min 内无 tick + 在交易时段」
假日告警美国 / 香港不同假日表按市场维度独立 holiday calendar
「-10% 单仓位」频繁触发小仓位($50 期权)波动 % 很大加绝对金额下限 abs($) > 1500 AND pct < -10%
Gateway 重启窗口的 P1每天 23:50 UTC 必断把 23:30-00:30 UTC 设为「计划维护窗口」抑制 P1
VIX > 30 一直挂着高波动期间持续触发一次性 latched alert,恢复才再发

十一、PM 视角:监控告警是 production maturity 的真正分水岭

  1. 「能跑」不是「能投产」:策略跑通只是 0→1,从 1→100 中间的 99 步全是 ops。我十年金融业看过太多策略「回测惊艳实盘平庸」,其中一半的问题不在 alpha 而在「事故没被发现」。
  2. MTTD + MTTR 是新的 KPI:不要再纠结「策略 Sharpe 又涨 0.1」——花同样时间把告警 SLO 从 30 分钟降到 5 分钟,对长期 P&L 的贡献更大。这是 SRE 给量化的最大启示。
  3. 告警分级是「自我承诺管理」:你能保证 5 分钟内响应的事情才能叫 P0。承诺越多越廉价——这条做产品也通用,「P0 issue 满天飞」的团队没有真正的优先级。
  4. 职责单一原则:watchdog 不下单,strategy 不重连,dashboard 不修改状态。每个进程做一件事,挂了影响最小。Conway's Law 在你一个人的系统里也成立——你怎么切分进程,你就会怎么调试事故。
  5. observability 优先级:先有 logs(事后看),再有 metrics(事中看),最后有 traces(事中查链路)。Day 83 的告警系统本质是「metrics → alerts」那一层,logs 早就应该有了。下一步 Day 84+ 我们会补 metrics 持久化(InfluxDB 或 Prometheus)。
  6. 可移植到 web3:这套监控完全可以套用到链上策略——把「IB Gateway 连接」换成「RPC 节点延迟」,把「Greeks」换成「Vault Health Factor」,整个框架几乎零改动。Web3 PM 简历上写一句「为多链 DeFi 仓位设计三级告警 + 自动重连守护进程」绝对加分。

十二、明日预告

Day 84: 月度报告自动化 — P&L 归因 / Greeks 复盘 / 改进点 ranking

  • 月末自动跑出一份 markdown 报告(含图表)
  • P&L 归因:按策略 / 按标的 / 按 Greeks 分解
  • 与基准(SPY / 60-40)对比
  • 自动识别「该止损没止损」的事件
  • 给下个月的策略调整提建议(heuristic)
  • 直接 push 到 Telegram + 存 docs/daily/ 备查

实际执行记录

启动一项填一项,时间戳 + 卡点。

  • [hh:mm] 创建 Telegram bot + 拿到 BOT_TOKEN / CHAT_ID — ...
  • [hh:mm] alerts.py 写完 + 单测 P0/P1/P2 三个通道 — ...
  • [hh:mm] dedup 逻辑验证(连续触发 10 次只收 1 条) — ...
  • [hh:mm] Streamlit dashboard 跑通 + Tailscale 内网穿透 — ...
  • [hh:mm] gateway_watchdog.py 跑通 + 拔网测试 — ...
  • [hh:mm] systemd service 部署 + reboot 后自启 — ...
  • [hh:mm] 12 道 production checklist 全过 — ...
  • 卡点 / 学到的:

总字数:约 6,400 字 今日完成度:理论 ✓ / 实操(你自己执行)/ 笔记 ✓