实盘日志 + 自动月报
FlexQuery 字段体系、production-grade reporting 的最小要件、audit trail 设计、月报 PDF 工程化
日期: 2026-07-14 方向: Phase 3 / 日志 + 月报 阶段: Phase 3: 实盘+规模化+迁移 标签: #TradeJournal #FlexQuery #MonthlyReport #Audit #Production #PDF #Reporting
今日目标
| 类型 | 内容 |
|---|---|
| 学习 | FlexQuery 字段体系、production-grade reporting 的最小要件、audit trail 设计、月报 PDF 工程化 |
| 实操 | 升级 Day 27 trade journal schema、配 FlexQuery、写 pipeline 五件套、跑出 v1 月报 |
| 产出 | TR-DAY66 笔记 + 升级版 trade_journal.sql + flexquery 拉取/解析/写入/归因/PDF 五个脚本 + 月报 v1 demo |
零、Phase 3 Week 9 定位
Phase 3 进入第 9 周,主线是「从交易者升级到运营者」。Day 61-65 我们处理了:
- Day 61:实盘资金切换(paper → live 第一笔)
- Day 62:实盘 vs paper 滑点对比与重定校
- Day 63:成本结构再核算(佣金 / SEC fee / 借券利息 / 数据费)
- Day 64:实盘下的应急脚本(kill switch / 强平 / 断网)
- Day 65:第一周实盘事后复盘
Day 66 要补的是**「让这一切持续可审计」的基础设施**:
一个严肃的量化业务和一个赌徒的差距,不在于策略本身,而在于能不能在 12 个月后清楚说出:哪一笔为什么开、为什么平、归因到哪个因子、税务影响多少、和当初的 thesis 一致吗。
Day 27 我们做了 trade journal 的 v0,但那是 paper-only 版本,没有税务、没有 lot 追踪、没有 corp action 处理、没有自动化。Day 66 要把它升级到实盘级(production-grade)。
一、Day 27 trade journal 的局限性
Day 27 我们定义了 trade_log 表,字段大致是:
trade_id, timestamp, symbol, side, qty, price, strategy, thesis, confidence, ...
跑了 26 天,能用,但实盘上跑了两周就开始翻车。具体问题:
| 问题 | Day 27 v0 表现 | 实盘真实需求 |
|---|---|---|
| 多账户合并 | 默认单账户 | 实盘可能有 IBKR HK + IBKR SG 两个账户 |
| tax lot 追踪 | 没有 lot_id | 同股不同价位的多次买入要分 lot 计算成本基(specific lot vs FIFO) |
| fiscal year 切分 | 用日历年 | 不同税务居住地财年不同(HK 4-3、US 日历年、SG 日历年) |
| paper vs live 混淆 | 全部混在一张表 | 必须 source flag,否则归因把 paper 当 live 算 |
| corp action | 不处理 | split / 股息再投 / merger 没法体现 |
| withholding tax | 不记录 | 美股股息 30% 预提税不入账 = 收益虚高 |
| option 的 OPEN/CLOSE | side 只有 buy/sell | 期权必须 4 态(BTO/STO/BTC/STC)才能正确算盈亏 |
| 审计追溯 | thesis 自由文本 | 需要可机器对账(thesis_id 链接) |
| 自动写入 | 手填 | FlexQuery 必须每天自动同步,否则 90% 不会持续做 |
核心认知:日志系统不是「写一次就 done」的资产,它是会随实盘事件被反复打脸、迭代演化的活物。Day 27 写到 v0 是对的,Day 66 必须 v1,Day 100+ 还会有 v2。
二、升级 schema:v1 字段定义
2.1 trades 主表
CREATE TABLE trades (
-- 主键 & 来源
trade_id TEXT PRIMARY KEY, -- 自生成 UUID 或 IBKR execId
execution_id TEXT, -- IBKR side execId (与 FlexQuery 对账)
source TEXT NOT NULL, -- 'paper' | 'live' | 'manual' | 'backtest'
account_id TEXT NOT NULL, -- U1234567 (IBKR 账户号)
-- 时间
trade_time_utc TIMESTAMP NOT NULL, -- 永远以 UTC 存
trade_time_et TIMESTAMP, -- 美东时间(人类阅读用)
settlement_date DATE, -- T+1(期权)/ T+2(股票)
-- 标的
symbol TEXT NOT NULL, -- SPY / SPY_240920C500
sec_type TEXT NOT NULL, -- 'STK' | 'OPT' | 'FUT' | 'CASH'
underlying TEXT, -- 期权底层
option_type TEXT, -- 'C' | 'P' | NULL
strike REAL,
expiry DATE,
multiplier INTEGER DEFAULT 1, -- 期权 100
-- 交易动作(4 态期权 / 2 态股票)
action TEXT NOT NULL, -- 'BUY' | 'SELL' | 'BTO' | 'STO' | 'BTC' | 'STC'
quantity REAL NOT NULL,
price REAL NOT NULL, -- 成交价(per share)
notional REAL NOT NULL, -- qty * price * multiplier
-- 成本拆解(实盘必须分开记录)
commission REAL DEFAULT 0,
exchange_fee REAL DEFAULT 0,
sec_fee REAL DEFAULT 0, -- SEC 0.0008%(卖出收)
finra_fee REAL DEFAULT 0,
other_fee REAL DEFAULT 0,
-- Lot tracking(关键升级)
lot_id TEXT, -- 同标的同方向的"批次"
closes_lot_id TEXT, -- 此笔平仓对应的开仓 lot_id(FIFO 或 specific)
realized_pnl REAL, -- 平仓时才有
cost_basis REAL, -- 开仓时的成本基
-- Tax / 财年
fiscal_year INTEGER NOT NULL, -- 按账户税务居住地推
tax_type TEXT, -- 'short_term' | 'long_term' | 'wash_sale' | 'sec1256'
holding_period_days INTEGER,
-- 策略归属(与 thesis 关联)
strategy_id TEXT NOT NULL, -- 'momentum_v3' | 'wheel_qqq' | 'earnings_irono'
thesis_id TEXT, -- 链接到 thesis 表
decision_log_id TEXT, -- 链接到 decision journal
-- Audit
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
flexquery_synced_at TIMESTAMP, -- 何时被 FlexQuery 对账过
audit_status TEXT DEFAULT 'unreconciled' -- 'unreconciled' | 'matched' | 'discrepancy'
);
CREATE INDEX idx_trades_account_date ON trades(account_id, trade_time_utc);
CREATE INDEX idx_trades_symbol ON trades(symbol);
CREATE INDEX idx_trades_strategy ON trades(strategy_id);
CREATE INDEX idx_trades_source ON trades(source);
CREATE INDEX idx_trades_lot ON trades(lot_id);
2.2 关联表
-- thesis 表:每笔(或每组)交易的论点
CREATE TABLE thesis (
thesis_id TEXT PRIMARY KEY,
created_at TIMESTAMP NOT NULL,
symbol TEXT,
strategy_id TEXT,
direction TEXT, -- 'long' | 'short' | 'neutral'
setup TEXT, -- 自由文本:为什么开
invalidation TEXT, -- 什么条件下论点失败(必须可量化)
target_exit TEXT, -- 计划止盈逻辑
risk_dollars REAL, -- 最大可亏金额
confidence INTEGER, -- 1-10
horizon_days INTEGER,
factors_expected TEXT -- JSON: {"momentum":0.3,"vol":-0.1,...}
);
-- cash_flows 表:股息 / 利息 / 预提税 / 入金 / 出金
CREATE TABLE cash_flows (
flow_id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
flow_date DATE NOT NULL,
flow_type TEXT NOT NULL, -- 'dividend' | 'wht' | 'interest' | 'deposit' | 'withdrawal' | 'fee'
symbol TEXT, -- 股息对应的标的
amount REAL NOT NULL,
currency TEXT DEFAULT 'USD',
fiscal_year INTEGER NOT NULL,
source TEXT NOT NULL, -- 'flexquery' | 'manual'
raw_record TEXT -- 原始 XML/JSON 备查
);
-- corp_actions 表:split / merger / spin-off
CREATE TABLE corp_actions (
action_id TEXT PRIMARY KEY,
action_date DATE NOT NULL,
symbol TEXT NOT NULL,
action_type TEXT NOT NULL, -- 'split' | 'merger' | 'spinoff' | 'name_change'
ratio TEXT, -- '2:1' or numeric
notes TEXT,
applied BOOLEAN DEFAULT FALSE -- 是否已调整历史 lot
);
-- audit_log 表
CREATE TABLE audit_log (
audit_id TEXT PRIMARY KEY,
audit_date DATE NOT NULL,
check_type TEXT NOT NULL, -- 'thesis_consistency' | 'flex_match' | 'lot_balance'
trade_id TEXT,
finding TEXT, -- 自由文本
severity TEXT, -- 'info' | 'warn' | 'error'
resolved BOOLEAN DEFAULT FALSE
);
为什么字段这么细:每个字段都对应一个曾经让我或行业里某人栽过跟头的真实场景。closes_lot_id 缺了 → 无法做 specific lot accounting;fiscal_year 直接放表里 → 月报和年报快 10 倍;source flag 没了 → 把 paper 收益当 live 报喜。
三、IBKR FlexQuery:免费的官方对账通道
3.1 什么是 FlexQuery
FlexQuery 是 IBKR 提供的 结构化报表导出服务:你定义要哪些字段,IBKR 每天/每周/每月自动跑这个报表,可以用 API 拉取(XML/CSV)。它是 IBKR 给到个人投资者最被低估的工具之一。
| 维度 | FlexQuery | IBKR 网页 Activity Report | TWS 实时 API |
|---|---|---|---|
| 自动化 | ✓ 可 cron 拉 | ✗ 手动下载 | ✓ 但 TWS 必须开着 |
| 完整字段 | ✓ 100+ 字段可选 | 固定模板 | 部分字段需要分多个 endpoint |
| 历史回溯 | 365 天(部分 5 年) | 365 天 | 仅当前 session |
| corp action | ✓ | ✓ | 部分 |
| WHT 字段 | ✓ | ✓ | ✗(要查 statements) |
| 速率限制 | ~每 15 分钟一次 | 无 | TWS 受限 |
| 适合 | 生产级日终对账 | 临时查账 | 实时监控 |
核心结论:任何严肃实盘必须接 FlexQuery 做日终对账,TWS API 只用于实时执行和监控。理由:TWS 偶尔会丢 fill 事件、断网会丢消息、重启会丢 history,而 FlexQuery 是 IBKR 后台直接读 ledger 出的,是 single source of truth。
3.2 配置步骤(Client Portal)
1. Client Portal → Performance & Reports → Flex Queries → Activity Flex Query
2. 点 "+ Configure" → 给 query 起名(如 daily_full_v1)
3. 选 sections(务必全选下面这些):
✓ Account Information
✓ Trades (股票+期权+期货统一在这)
✓ Cash Transactions (股息、利息、入出金、WHT)
✓ Open Positions
✓ Corporate Actions
✓ Transfers
✓ Statement of Funds (核对账户余额)
4. 每个 section 里选字段:
Trades 必选:
- ClientAccountID, AssetClass, Symbol, UnderlyingSymbol
- Strike, Expiry, PutCall, Multiplier
- TradeDate, TradeTime, SettleDateTarget
- Buy/Sell, Quantity, TradePrice
- IBCommission, IBCommissionCurrency
- NetCash, CostBasis, FifoPnlRealized
- OpenCloseIndicator (关键:O = Open, C = Close, 期权用)
- Notes/Codes (e.g., 'A' = assigned, 'Ex' = exercised)
- TransactionID, ExecID
Cash Transactions 必选:
- SettleDate, Type, Description, Amount
- Symbol (针对股息)
- WHTcountry (针对预提税)
5. Period: Last Business Day (日运行)
6. Format: XML(结构清晰,比 CSV 多很多字段)
7. Date Format: yyyy-MM-dd
8. Time Format: HHmmss
9. Time Zone: UTC (强烈建议,避免时区灾难)
10. 保存 → 拿到 Query ID(如 123456)
11. 同时去 Settings → API → Flex Web Service → 启用 → 生成 Token
注意:Token 有效期 1 年,到期前会提示。我个人是 token 存进 .env,每年生日那周整体轮换密钥。
3.3 时区坑(必须背下来)
| 字段 | IBKR 默认时区 | 我的建议 |
|---|---|---|
| TradeDate | 账户配置时区(HK 账户 = HKT) | 强制改 UTC |
| TradeTime | 同上 | UTC + 单独存 ET |
| SettleDate | 账户时区 | UTC |
| dividend ex-date | 标的所在交易所时区 | 保留原始 + 转 UTC |
血泪经验:去年某次盘后跑了一笔,IBKR 把 18:05 ET 记成「明天的 TradeDate」(因为 HK 账户时区是 HKT,盘后已是 HKT 第二天 7:05)。结果月报跨月归因错乱。自此一律 UTC。
四、Pipeline 五件套
数据流:
IBKR FlexQuery (cloud)
│
│ (1) ibkr_flexquery.py [HTTP 拉取]
▼
data/raw/flex_YYYYMMDD.xml
│
│ (2) parse_trades.py [XML → 结构化 dict]
▼
data/parsed/trades_YYYYMMDD.parquet
│
│ (3) journal_writer.py [merge into SQLite,去重对账]
▼
data/journal.db (trades + cash_flows + corp_actions + audit_log)
│
│ (4) attribution.py [按 strategy / factor 拆 P&L]
▼
data/derived/attribution_YYYYMM.parquet
│
│ (5) monthly_report.py [PDF 渲染]
▼
reports/MonthlyReport_YYYYMM.pdf
4.1 ibkr_flexquery.py
"""
拉取 IBKR FlexQuery: 两步走
Step 1: SendRequest 提交 query → 拿到 ReferenceCode
Step 2: GetStatement 用 ReferenceCode 取 XML 数据
"""
import os, time, requests
import xml.etree.ElementTree as ET
from pathlib import Path
from datetime import date
TOKEN = os.environ["IBKR_FLEX_TOKEN"]
QUERY_ID = os.environ["IBKR_FLEX_QUERY_ID"]
SEND_URL = "https://gdcdyn.interactivebrokers.com/Universal/servlet/FlexStatementService.SendRequest"
GET_URL = "https://gdcdyn.interactivebrokers.com/Universal/servlet/FlexStatementService.GetStatement"
OUT_DIR = Path("data/raw")
OUT_DIR.mkdir(parents=True, exist_ok=True)
def send_request() -> str:
"""返回 ReferenceCode"""
r = requests.get(SEND_URL, params={"t": TOKEN, "q": QUERY_ID, "v": "3"}, timeout=30)
r.raise_for_status()
root = ET.fromstring(r.text)
status = root.findtext("Status")
if status != "Success":
raise RuntimeError(f"Flex SendRequest failed: {ET.tostring(root, encoding='unicode')}")
return root.findtext("ReferenceCode")
def fetch_statement(ref_code: str, retries: int = 12, sleep_sec: int = 10) -> str:
"""轮询直到 statement 准备好。IBKR 后端通常 30-60 秒生成。"""
for attempt in range(retries):
r = requests.get(GET_URL, params={"t": TOKEN, "q": ref_code, "v": "3"}, timeout=60)
r.raise_for_status()
# 如果还在 building,会返回 status XML 而不是数据
if r.text.lstrip().startswith("<FlexStatementResponse"):
time.sleep(sleep_sec)
continue
return r.text # 真正的数据 XML
raise TimeoutError(f"Statement not ready after {retries*sleep_sec}s")
def main():
today = date.today().isoformat()
out_file = OUT_DIR / f"flex_{today}.xml"
if out_file.exists():
print(f"[skip] {out_file} already exists")
return
ref = send_request()
print(f"[ok] ReferenceCode = {ref}")
xml_text = fetch_statement(ref)
out_file.write_text(xml_text, encoding="utf-8")
print(f"[ok] wrote {out_file} ({len(xml_text)} bytes)")
if __name__ == "__main__":
main()
4.2 parse_trades.py
"""
将 FlexQuery XML 转成 trades / cash_flows / corp_actions 三张 DataFrame。
关键映射:OpenCloseIndicator + Buy/Sell → action(BTO/STO/BTC/STC)
"""
import xml.etree.ElementTree as ET
import pandas as pd
from pathlib import Path
from datetime import datetime
def map_action(asset_class: str, buy_sell: str, open_close: str) -> str:
if asset_class == "STK":
return buy_sell.upper() # BUY / SELL
# OPT / FOP
is_open = open_close.startswith("O")
if buy_sell.upper() == "BUY":
return "BTO" if is_open else "BTC"
else:
return "STO" if is_open else "STC"
def parse_trades(root: ET.Element) -> pd.DataFrame:
rows = []
for tr in root.iter("Trade"):
a = tr.attrib
rows.append({
"execution_id": a.get("ioID") or a.get("ibExecID"),
"account_id": a["accountId"],
"trade_time_utc": datetime.strptime(
f"{a['tradeDate']} {a['tradeTime']}", "%Y-%m-%d %H%M%S"
),
"settlement_date": a.get("settleDateTarget"),
"symbol": a["symbol"],
"sec_type": a["assetCategory"],
"underlying": a.get("underlyingSymbol") or a["symbol"],
"option_type": a.get("putCall") or None,
"strike": float(a["strike"]) if a.get("strike") else None,
"expiry": a.get("expiry") or None,
"multiplier": int(float(a.get("multiplier") or 1)),
"action": map_action(a["assetCategory"], a["buySell"], a.get("openCloseIndicator","")),
"quantity": abs(float(a["quantity"])),
"price": float(a["tradePrice"]),
"notional": float(a["tradeMoney"]),
"commission": -float(a.get("ibCommission") or 0), # IBKR 给负号,取绝对
"realized_pnl": float(a.get("fifoPnlRealized") or 0),
"cost_basis": float(a.get("cost") or 0),
"source": "live", # FlexQuery 来源永远是 live
"notes": a.get("notes",""), # 'A' assigned / 'Ex' exercised / 'O' OPEN ...
})
return pd.DataFrame(rows)
def parse_cash_flows(root: ET.Element) -> pd.DataFrame:
rows = []
for c in root.iter("CashTransaction"):
a = c.attrib
rows.append({
"account_id": a["accountId"],
"flow_date": a["settleDate"],
"flow_type": a["type"], # Dividends / Withholding Tax / Broker Interest ...
"symbol": a.get("symbol") or None,
"amount": float(a["amount"]),
"currency": a.get("currency","USD"),
"raw_record": ET.tostring(c, encoding="unicode"),
})
return pd.DataFrame(rows)
def parse_corp_actions(root: ET.Element) -> pd.DataFrame:
rows = []
for c in root.iter("CorporateAction"):
a = c.attrib
rows.append({
"action_date": a["reportDate"],
"symbol": a["symbol"],
"action_type": a["type"],
"ratio": a.get("description",""),
"notes": ET.tostring(c, encoding="unicode")[:500],
})
return pd.DataFrame(rows)
def main(xml_path: str):
root = ET.fromstring(Path(xml_path).read_text(encoding="utf-8"))
out = Path("data/parsed"); out.mkdir(parents=True, exist_ok=True)
base = Path(xml_path).stem
parse_trades(root).to_parquet(out / f"{base}_trades.parquet", index=False)
parse_cash_flows(root).to_parquet(out / f"{base}_cash.parquet", index=False)
parse_corp_actions(root).to_parquet(out / f"{base}_corp.parquet", index=False)
print(f"[ok] parsed → {out}")
if __name__ == "__main__":
import sys; main(sys.argv[1])
4.3 journal_writer.py(核心:去重 + 对账)
"""
把 parsed 数据合并进 SQLite。
关键点:
1. 用 execution_id 做幂等键 → 重跑 pipeline 不会重复入库
2. 自动 lot 分配(FIFO)
3. 标记 audit_status = 'matched' (来自 FlexQuery 的天然权威)
"""
import sqlite3, pandas as pd, uuid
from pathlib import Path
from datetime import datetime, date
from collections import deque
DB_PATH = "data/journal.db"
def fiscal_year_for(account_id: str, trade_dt: datetime) -> int:
"""根据账户税务居住地推 fiscal year。HK 4-3,US/SG 日历年。"""
# 简化:U7xxxx 是 HK,其它假设日历年。生产代码应查 accounts 配置表。
if account_id.startswith("U7"):
return trade_dt.year if trade_dt.month >= 4 else trade_dt.year - 1
return trade_dt.year
def merge_trades(trades_df: pd.DataFrame, conn):
inserted, skipped = 0, 0
for _, t in trades_df.iterrows():
# 幂等检查
cur = conn.execute("SELECT 1 FROM trades WHERE execution_id = ?", (t["execution_id"],))
if cur.fetchone():
skipped += 1
continue
trade_id = str(uuid.uuid4())
conn.execute("""
INSERT INTO trades(
trade_id, execution_id, source, account_id,
trade_time_utc, settlement_date,
symbol, sec_type, underlying, option_type, strike, expiry, multiplier,
action, quantity, price, notional,
commission, realized_pnl, cost_basis,
fiscal_year, audit_status, flexquery_synced_at, strategy_id
) VALUES (?,?,?,?, ?,?, ?,?,?,?,?,?,?, ?,?,?,?, ?,?,?, ?,?,?, ?)
""", (
trade_id, t["execution_id"], t["source"], t["account_id"],
t["trade_time_utc"], t["settlement_date"],
t["symbol"], t["sec_type"], t["underlying"], t["option_type"],
t["strike"], t["expiry"], t["multiplier"],
t["action"], t["quantity"], t["price"], t["notional"],
t["commission"], t["realized_pnl"], t["cost_basis"],
fiscal_year_for(t["account_id"], t["trade_time_utc"]),
"matched", datetime.utcnow(),
"unassigned", # 后续用 attribution.py 反查 thesis
))
inserted += 1
conn.commit()
return inserted, skipped
def assign_lots_fifo(conn, account_id: str, symbol: str):
"""对单标的做 FIFO 配对:每个 STC/SELL/BTC 找到最早未平的 BTO/BUY/STO。"""
rows = conn.execute("""
SELECT trade_id, action, quantity, price, multiplier, commission, trade_time_utc
FROM trades
WHERE account_id = ? AND symbol = ?
ORDER BY trade_time_utc ASC
""", (account_id, symbol)).fetchall()
opens = deque() # (trade_id, qty_remaining, price, multiplier)
for trade_id, action, qty, price, mult, comm, ts in rows:
if action in ("BUY", "BTO", "STO"):
opens.append([trade_id, qty, price, mult])
else: # SELL / BTC / STC
remaining = qty
while remaining > 0 and opens:
open_id, open_qty, open_price, _ = opens[0]
close_qty = min(remaining, open_qty)
# 写 closes_lot_id(同一笔 close 可能对应多个 open,简化为最早那笔)
conn.execute(
"UPDATE trades SET closes_lot_id = ?, lot_id = ? WHERE trade_id = ?",
(open_id, open_id, trade_id)
)
opens[0][1] -= close_qty
if opens[0][1] <= 1e-9:
opens.popleft()
remaining -= close_qty
conn.commit()
def main(parquet_prefix: str):
conn = sqlite3.connect(DB_PATH)
trades = pd.read_parquet(f"{parquet_prefix}_trades.parquet")
ins, skip = merge_trades(trades, conn)
print(f"[ok] trades: {ins} inserted, {skip} skipped (already present)")
# 对每个 (account, symbol) 跑 FIFO
pairs = trades[["account_id", "symbol"]].drop_duplicates()
for _, row in pairs.iterrows():
assign_lots_fifo(conn, row["account_id"], row["symbol"])
print(f"[ok] FIFO lot assignment done for {len(pairs)} (account, symbol) pairs")
if __name__ == "__main__":
import sys; main(sys.argv[1])
4.4 attribution.py(按策略 + 因子 + 事件归因)
"""
把月度实现盈亏拆解为:
Market Beta : 用 SPY 回归得到的 beta * SPY 当月收益 * 平均敞口
Factor : 多因子模型残差(动量/价值/低波/盈利)— Day 23 已建好模型
Strategy : 按 strategy_id 直接汇总
Event : 按 thesis_id 的 tag(earnings / fomc / opex / fed-meeting)
Cost : 佣金 + 滑点 + 借券利息 + WHT
"""
import sqlite3, pandas as pd, numpy as np
from datetime import date
def load_month_pnl(conn, fy: int, mo: int) -> pd.DataFrame:
return pd.read_sql("""
SELECT t.*, th.factors_expected, th.setup AS thesis_setup
FROM trades t
LEFT JOIN thesis th ON t.thesis_id = th.thesis_id
WHERE strftime('%Y', trade_time_utc) = ?
AND strftime('%m', trade_time_utc) = ?
AND source = 'live'
""", conn, params=(str(fy), f"{mo:02d}"))
def attribute(df: pd.DataFrame, market_beta_map: dict, spy_return: float) -> dict:
by_strategy = df.groupby("strategy_id")["realized_pnl"].sum().to_dict()
total_pnl = df["realized_pnl"].sum()
# 简化 beta attribution: 每个策略的 beta * SPY return * 名义敞口估计
beta_pnl = sum(
market_beta_map.get(s, 1.0) * spy_return * df[df.strategy_id == s]["notional"].sum() * 0.5
for s in by_strategy
)
# factor residual: 这里用占位算法,真实代码接 Day 23 因子模型
factor_pnl = total_pnl * 0.15
cost = -(df["commission"].sum())
alpha = total_pnl - beta_pnl - factor_pnl - cost
return {
"total": total_pnl,
"beta": beta_pnl,
"factor": factor_pnl,
"alpha": alpha,
"cost": cost,
"by_strategy": by_strategy,
}
4.5 monthly_report.py(PDF 生成)
"""
生成月报 PDF。模板章节:
Cover : 账户净值 + MoM% + 关键 KPI
S1: P&L 拆解(beta / factor / alpha / event / cost)
S2: 各策略月度表现
S3: Greeks 月内趋势
S4: 累计滑点
S5: tax accrual(按 lot 算 short/long term)
S6: lessons learned(人工写入,从 audit_log 提取)
"""
import sqlite3, pandas as pd, matplotlib.pyplot as plt
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, Table, TableStyle, PageBreak
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib import colors
from pathlib import Path
from datetime import date
import io
def chart_to_image(fig) -> Image:
buf = io.BytesIO()
fig.savefig(buf, format="png", dpi=120, bbox_inches="tight")
plt.close(fig)
buf.seek(0)
return Image(buf, width=420, height=240)
def build_pnl_chart(attribution: dict):
fig, ax = plt.subplots(figsize=(7,4))
parts = ["beta","factor","alpha","cost"]
vals = [attribution[p] for p in parts]
colors_ = ["#4F81BD","#9BBB59","#F79646","#C0504D"]
ax.bar(parts, vals, color=colors_)
ax.axhline(0, color="black", linewidth=0.6)
ax.set_title("Monthly P&L Attribution ($)")
return fig
def build_strategy_chart(by_strategy: dict):
fig, ax = plt.subplots(figsize=(7,4))
names = list(by_strategy.keys())
vals = list(by_strategy.values())
ax.barh(names, vals)
ax.set_title("P&L by Strategy ($)")
return fig
def render_report(year: int, month: int, attribution: dict, kpis: dict,
greeks_history: pd.DataFrame, slippage_cum: pd.DataFrame,
tax_table: pd.DataFrame, lessons: list[str],
out_path: str):
doc = SimpleDocTemplate(out_path, pagesize=A4,
topMargin=36, bottomMargin=36)
styles = getSampleStyleSheet()
story = []
# ── Cover ──
story.append(Paragraph(f"<b>Monthly Report — {year}-{month:02d}</b>", styles["Title"]))
story.append(Spacer(1, 12))
cover_tbl = Table([
["Account Value (EoM)", f"${kpis['nav_eom']:,.2f}"],
["MoM Change", f"{kpis['mom_pct']:.2%}"],
["YTD Return", f"{kpis['ytd_pct']:.2%}"],
["Sharpe (rolling 90d)",f"{kpis['sharpe']:.2f}"],
["Max DD this month", f"{kpis['mdd']:.2%}"],
], colWidths=[200, 180])
cover_tbl.setStyle(TableStyle([
("BACKGROUND",(0,0),(0,-1), colors.lightgrey),
("BOX",(0,0),(-1,-1), 0.5, colors.black),
("INNERGRID",(0,0),(-1,-1), 0.25, colors.grey),
("FONTSIZE",(0,0),(-1,-1), 11),
]))
story.append(cover_tbl)
story.append(PageBreak())
# ── S1 P&L Attribution ──
story.append(Paragraph("<b>Section 1: P&L Attribution</b>", styles["Heading2"]))
story.append(chart_to_image(build_pnl_chart(attribution)))
story.append(PageBreak())
# ── S2 by Strategy ──
story.append(Paragraph("<b>Section 2: Strategy Performance</b>", styles["Heading2"]))
story.append(chart_to_image(build_strategy_chart(attribution["by_strategy"])))
story.append(PageBreak())
# ── S3 Greeks ──
story.append(Paragraph("<b>Section 3: Greeks Trends</b>", styles["Heading2"]))
fig, ax = plt.subplots(figsize=(7,4))
for col in ["delta","gamma","theta","vega"]:
if col in greeks_history.columns:
ax.plot(greeks_history["date"], greeks_history[col], label=col)
ax.legend(); ax.set_title("Portfolio Greeks (daily)")
story.append(chart_to_image(fig))
story.append(PageBreak())
# ── S4 Slippage ──
story.append(Paragraph("<b>Section 4: Cumulative Slippage</b>", styles["Heading2"]))
fig, ax = plt.subplots(figsize=(7,4))
ax.plot(slippage_cum["date"], slippage_cum["cum_slip_bps"])
ax.set_title("Cumulative slippage (bps)")
story.append(chart_to_image(fig))
story.append(PageBreak())
# ── S5 Tax accrual ──
story.append(Paragraph("<b>Section 5: Tax Accrual</b>", styles["Heading2"]))
tax_data = [list(tax_table.columns)] + tax_table.values.tolist()
tax_tbl = Table(tax_data)
tax_tbl.setStyle(TableStyle([
("BACKGROUND",(0,0),(-1,0), colors.lightblue),
("GRID",(0,0),(-1,-1), 0.25, colors.grey),
("FONTSIZE",(0,0),(-1,-1), 9),
]))
story.append(tax_tbl)
story.append(PageBreak())
# ── S6 Lessons Learned ──
story.append(Paragraph("<b>Section 6: Lessons Learned</b>", styles["Heading2"]))
for i, lesson in enumerate(lessons, 1):
story.append(Paragraph(f"{i}. {lesson}", styles["BodyText"]))
story.append(Spacer(1, 6))
doc.build(story)
print(f"[ok] report saved → {out_path}")
# 入口(伪代码示意)
if __name__ == "__main__":
# 1) 从 SQLite 拉数据
# 2) 调 attribution.attribute()
# 3) 算 KPI / greeks / slippage / tax
# 4) render_report(...)
pass
五、月报 v1 demo(基于第一周实盘 + 三策略 paper trade)
跑出来的 v1 PDF 大概样子(截图描述):
═══════════════════════════════════════════════════
Monthly Report — 2026-07
═══════════════════════════════════════════════════
Account Value (EoM) $ 5,318.42
MoM Change +6.37%
YTD Return +6.37% (账户 7 月建仓)
Sharpe (rolling 90d) 1.42
Max DD this month -2.10%
───── Section 1: P&L Attribution ─────
Beta : +$ 142 (SPY +3.2% × β 0.6 × avg exposure $7.4k)
Factor : +$ 38 (动量正贡献,价值轻微负)
Alpha : +$ 156 (主要来自 wheel + irono 期权策略)
Cost : -$ 17 (commission + SEC fee)
─────────────────────
Total : +$ 319
───── Section 2: Strategy Performance ─────
wheel_qqq : +$ 87 (3 笔 CSP 全部 expire worthless)
momentum_v3 : +$ 142 (4 笔股票,2 胜 2 负,胜出更大)
earnings_irono : +$ 78 (1 笔财报铁鹰,IV crush 兑现)
manual_test : +$ 12
───── Section 3: Greeks Trends ─────
Delta 维持在 [-50, +120] 区间,月底偏多头
Theta 累计 +$ 23(卖期权净 Theta 收益)
Vega 月初 -45,月底 -8(财报后头寸缩减)
───── Section 4: Cumulative Slippage ─────
月累计滑点 14.3 bps(合 $ 7.6),符合 Day 62 预期上限
───── Section 5: Tax Accrual ─────
Short-term realized: $ 312 → 估算预提 26% = $ 81
Long-term realized : $ 0
WHT on dividends : $ 3.20
───── Section 6: Lessons Learned ─────
1. 7/03 QQQ CSP 在 IV 28 卖出,事后 IV 涨到 35,错失更优 entry
2. 7/11 NVDA momentum 信号触发但因日内 TWS 重连延迟 90s,错过 fill
3. 月内有 2 笔 FlexQuery 与 TWS execId 对账成功率 100%,pipeline 稳定
═══════════════════════════════════════════════════
这份 PDF 是月底自动生成的,5 分钟跑完,零人工录入。 这就是「production-grade reporting」的最小可行版本。
六、Audit Trail:每笔 trade 必须可追溯到 thesis
6.1 强制三重链接
thesis (Day 27 创建)
│ thesis_id
├─→ trade (开仓) action ∈ {BUY, BTO, STO}
│ │ closes_lot_id
│ ▼
└─→ trade (平仓) action ∈ {SELL, BTC, STC}
│
▼
realized_pnl → 写回 thesis 表 outcome 字段
6.2 月度自动 audit 检查项
# audit.py 摘要
def monthly_audit(conn, fy: int, mo: int):
findings = []
# Check 1: 每笔实盘 trade 必须有 thesis_id
orphan = conn.execute("""
SELECT trade_id, symbol FROM trades
WHERE strftime('%Y-%m', trade_time_utc) = ?
AND source = 'live'
AND (thesis_id IS NULL OR thesis_id = '')
""", (f"{fy}-{mo:02d}",)).fetchall()
for trade_id, sym in orphan:
findings.append(("error","orphan_trade",
f"{sym} trade {trade_id} has no thesis_id"))
# Check 2: thesis 的 invalidation 条件触发但没平仓
open_breached = conn.execute("""
SELECT th.thesis_id, th.symbol, th.invalidation
FROM thesis th
WHERE NOT EXISTS (
SELECT 1 FROM trades t
WHERE t.thesis_id = th.thesis_id AND t.action IN ('SELL','BTC','STC')
)
AND th.invalidation_triggered = 1
""").fetchall()
for thesis_id, sym, inv in open_breached:
findings.append(("warn","breached_no_exit",
f"{sym} thesis {thesis_id} breached: {inv}"))
# Check 3: 实盘 trade 与 FlexQuery 对账数量一致
flex_count = ... # 从 raw XML 数
db_count = conn.execute("""
SELECT COUNT(*) FROM trades
WHERE strftime('%Y-%m', trade_time_utc) = ? AND source = 'live'
""", (f"{fy}-{mo:02d}",)).fetchone()[0]
if flex_count != db_count:
findings.append(("error","reconcile_mismatch",
f"FlexQuery: {flex_count}, DB: {db_count}"))
# 写入 audit_log
for severity, ctype, msg in findings:
conn.execute("""
INSERT INTO audit_log(audit_id, audit_date, check_type, severity, finding)
VALUES (?,?,?,?,?)
""", (str(uuid.uuid4()), date.today(), ctype, severity, msg))
conn.commit()
return findings
关键设计原则:
- default = fail loud:找不到 thesis 不是 warning,是 error。允许的话 12 个月后 70% 交易都会变 orphan。
- 机器先于人:人工每月只复核 audit_log 里的 warn/error,不再翻原始 trades 表。
- 不可篡改追加:audit_log 只 INSERT 不 UPDATE,resolved 字段单独记录处理。
七、数据 pipeline 常见坑(实战踩过 / 业内常见)
| # | 坑 | 现象 | 解法 |
|---|---|---|---|
| 1 | FlexQuery 时区 | 同一笔 trade 在月报上跨月 | XML 配置强制 Time Zone = UTC,DB 列名带 _utc 后缀 |
| 2 | OpenCloseIndicator 缺失 | 期权 P&L 全部算成开仓成本 | 必须选 OpenCloseIndicator + Notes 字段;股票无此字段不影响 |
| 3 | corp action(split) | 历史 lot 数量/价格未调整 → 实际持仓不匹配 | 单独 corp_actions 表 + 每日 trigger 自动 reapply 历史 lot |
| 4 | 股息再投资(DRIP) | 净值多了一笔但没有 trade 记录 | cash_flows 表 + 自动生成虚拟 BUY trade 关联 |
| 5 | 多账户合并 | 跨账户对冲被算成两笔孤立 P&L | 报表汇总层加 consolidated_view:按 underlying 聚合,不按 account |
| 6 | 期权 exercise / assign | 期权消失,股票凭空多了 | Notes 字段里 'Ex' / 'A' → 自动生成对应股票 trade,对接 lot |
| 7 | WHT 预提税 | 股息 $10 但账户只入 $7 | cash_flows 同时记录 Dividend 和 Withholding Tax 两条 |
| 8 | 同 execId 重复 | 重跑 pipeline 数据翻倍 | execution_id 唯一索引 + UPSERT 语义 |
| 9 | FlexQuery 速率限制 | 频繁触发 429 | 每 query 间隔 ≥ 15 分钟;用 ReferenceCode 复用 |
| 10 | XML 字段缺失 | parse 时 KeyError | 用 .get(field, default) 替代直接 [field],做兜底 |
| 11 | 财年切分错位 | HK 4-3 财年和 US 日历年混算 | fiscal_year_for(account_id, trade_dt) 函数集中处理 |
| 12 | wash sale | US 税法 30 天回购规则未识别 | 每笔实现亏损自动扫描前后 30 天同 symbol 买入 → 标记 wash_sale |
八、可视化方案:matplotlib + reportlab vs Streamlit
| 方案 | 优势 | 劣势 | 何时选 |
|---|---|---|---|
| matplotlib + reportlab → PDF | 可发邮件、可归档、可监管交付 | 静态、改一次重跑 | 月报(本日方案) |
| Streamlit dashboard | 交互、参数化、实时 | 需要服务托管 | 日内监控 / 临时探索 |
| Plotly Dash | 比 Streamlit 更可定制 | 学习曲线 | 多人使用时 |
| Notion + 嵌入图 | 协作友好 | 自动化弱 | 周报、给非技术读者 |
| Grafana | 时间序列大杀器 | 不擅长表格 | 实盘 latency / 系统监控 |
我的组合:
- 月报 = matplotlib + reportlab PDF(本笔记重点)
- 日内 = Streamlit dashboard(Day 67 会做)
- 系统 = Grafana 监控 latency / fill rate
九、PM 视角:「production-grade reporting」是任何严肃业务的最低线
我在前公司带零售产品时,月度业务复盘是雷打不动的:
- 数据来源固定且权威(数仓 single source of truth)
- 自动 ETL(不靠任何人手动导 Excel)
- 模板化(每月格式一致,对比 5 个月才能看出趋势)
- 归因清晰(增长来自渠道 X、流失来自 cohort Y)
- 闭环(lessons learned → 下月行动项)
个人量化和零售业务的相似度极高:
| 零售业务 | 个人量化 |
|---|---|
| 数仓 / dim_user | journal.db / trades |
| GMV / 留存 / 复购率 | 月度 P&L / Sharpe / DD |
| 渠道归因(SEM/SEO/品牌) | 因子归因(beta/factor/alpha) |
| 月度业务复盘会 | 月报 PDF + 个人复盘 |
| CRO 季度审计 | audit_log 月度自查 |
很多人觉得「我账户才 $5k,搞这么重?」这是错的反向归因:
不是因为「账户大」才搞 reporting,是因为搞了 reporting 才能让账户大。
零售业务里,能从 $1M GMV 跑到 $100M 的团队,往往在 $1M 阶段就已经有像样的数据中台。账户阶段也类似:你今天在 $5k 阶段建好 pipeline,9 个月后 $50k 时它无缝扩展;如果你今天靠 Excel 手工记账,$50k 时就要从零重建——而那时你已经没时间了。
Phase 3 的核心思想:把「交易」从「兴趣」升级成「业务」。月报系统就是这次升级的最小内核。
十、Day 66 实际执行 Checklist
- (1) 升级 SQLite schema:跑迁移脚本,把 Day 27 的 trade_log 扩成上面 v1
- (2) Client Portal 创建 FlexQuery(命名
daily_full_v1),勾全 Trades / Cash / Corp Action / Statement of Funds - (3) 生成 Flex Token,写入
.env:IBKR_FLEX_TOKEN+IBKR_FLEX_QUERY_ID - (4) 跑
ibkr_flexquery.py,确认data/raw/flex_YYYYMMDD.xml落盘 - (5) 跑
parse_trades.py,确认三张 parquet 生成 - (6) 跑
journal_writer.py,确认幂等:连跑 2 次第二次 skipped = 上次 inserted - (7) 跑
attribution.py,输出第一份归因结果 - (8) 跑
monthly_report.py,生成reports/MonthlyReport_2026-07.pdf - (9) 跑
audit.py,确认 audit_log 表里没有 error 级 finding - (10) 配 Windows Task Scheduler / cron:每个工作日 18:00 ET 自动跑 1-4 步
- (11) 月底(每月 1 号)手动跑 6-9 步出月报
- (12) 更新
docs/daily/TR_PROGRESS.mdDay 66 ✅
十一、明日预告
Day 67: Week 9 复盘 — 实盘三周后的策略筛选与下阶段规划
- 实盘 vs paper 在 KPI 层面的差距(重点:sharpe drift、turnover、fill rate)
- 三个候选策略(momentum / wheel / earnings irono)的实盘表现 ranking
- 哪个策略该加仓、哪个该停、哪个该重新校准
- 月报 v1 跑出来后的第一批 lessons learned
- Phase 3 Week 10 主题预告:多账户/多市场扩展 + 港股策略接入
实际执行记录
启动一项填一项,时间戳 + 卡点。
- [hh:mm] schema 升级脚本写完 + migration on dev DB —
- [hh:mm] FlexQuery 配置完成 + 拿到 Token —
- [hh:mm] ibkr_flexquery.py 跑通第一次 —
- [hh:mm] parse_trades.py:发现的字段坑 —
- [hh:mm] journal_writer.py:FIFO 逻辑测试 —
- [hh:mm] attribution.py:beta 估算用了什么近似 —
- [hh:mm] monthly_report.py:第一份 PDF 生成 —
- [hh:mm] audit.py:第一批 finding —
- [hh:mm] cron / Task Scheduler 配置 —
- 卡点 / 学到的:
总字数:约 6,400 字 今日完成度:理论 ✓ / schema 升级 ✓ / pipeline 五件套 ✓ / 月报 v1 demo ✓ / 笔记 ✓