返回交易笔记
TR Day 47

财报 Sentiment 抽取流水线 — 端到端自动化

把 Day 46 的一次性 LLM extract 升级为「production-grade」自动化流水线;prompt caching 经济学;feature store 表设计;监控/重试/告警

2026-06-25
Phase 2: 策略实战 + AI 信号
PipelineClaudeAPISECEDGARDataEngineeringPromptCachingFeatureStore

日期: 2026-06-25 方向: Phase 2 / AI Pipeline 阶段: Phase 2: 策略实战 + AI 信号 标签: #Pipeline #ClaudeAPI #SEC #EDGAR #DataEngineering #PromptCaching #FeatureStore


今日目标

类型内容
学习把 Day 46 的一次性 LLM extract 升级为「production-grade」自动化流水线;prompt caching 经济学;feature store 表设计;监控/重试/告警
实操写 6 层 pipeline(calendar → fetch → parse → LLM → store → monitor),跑通 1 个 ticker 端到端,启动 SP500 backfill
产出TR-DAY47 笔记 + pipeline/ 目录可运行代码 + earnings_features 第一批样本入库

一、为什么 Day 46 → Day 47 是个质变

Day 46 我们做了一件事:手工下载一份 NVDA 10-Q PDF,丢给 Claude API,让它输出 JSON 格式的 sentiment + guidance + risk count。那是 demo,不是系统

Demo 和系统的差距,是金融 PM 工作里最被低估的鸿沟:

维度Demo(Day 46)系统(Day 47)
输入1 份 PDF 手工放本地自动监听财报日历,自动下载
频率一次性每周自动跑,财报季每天跑
覆盖1 个 tickerSP500 全集,可扩展 SP400/600
错误处理报错就崩retry + DLQ + 人工 review 队列
输出print 到屏幕feature store,可被下游因子模型查询
可观测失败率 / 一致性 / cost 三套监控
成本单次 0.05 USD 心里没数月度 dashboard,知道每季度精确花费

核心认知:在卖方/买方真正干活的人,70% 时间在做这种"数据工程"——把"能跑通一次"变成"能可靠 run 1 年"。我们做个人量化也一样。Day 46 的 prompt 调得再好,没有 Day 47 的 pipeline 包住,它产生的信号既无法纳入因子模型回测,也无法在实盘中信任。

我 10 年 PM 经验最强的一条直觉:永远不要相信一个手工才能跑出来的数字。手工跑出来的数能 demo,能写 PPT,不能进任何严肃决策——因为下次再跑你不知道还能不能复现。


二、整体架构:6 层流水线

                     ┌─────────────────────────────────────────────────┐
                     │            外部源 (External Sources)              │
                     │  yfinance | EDGAR | Anthropic API | S&P constituents│
                     └────────────────────┬────────────────────────────┘
                                          │
   ┌──────────────────────────────────────┴────────────────────────────┐
   │                                                                    │
   │   Layer 1: earnings_calendar.py                                    │
   │   ─────────────────────────────                                    │
   │   每周一 09:00 UTC 拉本周 SP500 财报日历                              │
   │   输出: earnings_event 表(ticker, report_date, est_eps, ...)       │
   │                                                                    │
   ├────────────────────────────────────────────────────────────────────┤
   │   Layer 2: edgar_fetcher.py                                        │
   │   ─────────────────────                                            │
   │   财报披露后 T+0 ~ T+2 检查 EDGAR 10-Q / 10-K filing               │
   │   下载 HTML/PDF 到 raw_filings/ 目录                               │
   │   ETag/MD5 dedupe,避免重复下载                                       │
   ├────────────────────────────────────────────────────────────────────┤
   │   Layer 3: pdf_to_text.py                                          │
   │   ───────────────────                                              │
   │   unstructured / pypdf 解析 → 按 section 切分                       │
   │   关键段:MD&A、Risk Factors、Outlook、CFO Commentary               │
   │   输出: filing_sections 表 (ticker, section_name, text)             │
   ├────────────────────────────────────────────────────────────────────┤
   │   Layer 4: claude_extract.py    ★★★ AI 核心层 ★★★                  │
   │   ────────────────────────                                          │
   │   - system prompt + few-shot (cache_control=ephemeral)              │
   │   - 输入 MD&A 段                                                     │
   │   - 输出 JSON: guidance_change/new_risks/mgmt_tone/...               │
   │   - schema 校验(pydantic)+ 失败 retry                              │
   ├────────────────────────────────────────────────────────────────────┤
   │   Layer 5: feature_store.py                                        │
   │   ───────────────────────                                          │
   │   写入 SQLite (dev) 或 Parquet partition (prod)                     │
   │   schema 见 §五                                                      │
   ├────────────────────────────────────────────────────────────────────┤
   │   Layer 6: monitor.py                                              │
   │   ───────────────                                                  │
   │   - 失败率 > 5% → Slack/email 告警                                  │
   │   - schema 不一致 > 5% → 暂停 pipeline 等人工 review                 │
   │   - |sentiment| > 0.9 → 标记进 review_queue 等人工抽检               │
   │   - cost 累计 → daily Slack 推送                                    │
   └────────────────────────────────────────────────────────────────────┘
                                          │
                                          ▼
                            ┌───────────────────────────┐
                            │  下游消费者 (Day 50 起)      │
                            │  - XGBoost 特征工程        │
                            │  - 因子回测引擎             │
                            │  - 实盘信号生成             │
                            └───────────────────────────┘

设计原则(来自我做核心银行系统的肌肉记忆):

  1. 每层只做一件事:解析归解析、调 LLM 归调 LLM,不要在 fetcher 里偷偷塞 NLP
  2. 每层有自己的输入/输出表:可以 replay 任意一层,不必从头跑(fetcher 挂了不影响已经 parse 过的)
  3. 有状态都进 DB,无状态在内存里——这是 idempotent 重跑的前提
  4. 每层有 SLA:calendar < 1min / fetcher < 5min/ticker / LLM < 30s/section

三、Prompt Caching:把成本砍 90% 的关键

3.1 为什么这是 Day 47 最重要的工程决定

财报 sentiment 这个任务有个完美的 caching 特征:

固定不变(5000 tokens)           每次变化(800 tokens)
─────────────────────────         ─────────────────────────
system prompt                   ticker = "NVDA"
+ JSON schema 定义               + MD&A 段全文
+ 5 个 few-shot 例子              + 财报期
+ tone 评分细则                   = ~ 800 tokens 输入
+ 风险分类法
= ~ 5000 tokens 输入

如果不开 caching:

  • SP500 一季度 ~500 次调用
  • 每次 input = 5800 tokens × $3/M = $0.0174
  • 总成本 = 500 × $0.0174 = $8.7/季度(仅 input)
  • 加 output ~ $5
  • 合计 ~$14/季度

开了 caching(5 分钟 TTL,cache write 1.25×,cache read 0.1×):

  • cache write 一次:5000 × 1.25 × $3/M = $0.019
  • 后续 read:5000 × 0.1 × $3/M = $0.0015
  • 单次新输入:800 × $3/M = $0.0024
  • 500 次中假设 cache hit 80%(同 5 分钟内集中跑可达 95%+)
    • 100 次 cache write = $1.9
    • 400 次 cache read + new input = 400 × ($0.0015 + $0.0024) = $1.56
  • 总 input cost = $3.46 (vs $8.7)
  • 节省 60%;如果按一次性 batch 跑(cache hit ~95%)能省到 85%+

3.2 Anthropic API 实现

import anthropic
client = anthropic.Anthropic()

SYSTEM_PROMPT = """You are an equity research analyst...
[5000 tokens of instructions + schema + few-shot]
"""

def extract_features(ticker: str, mdna_text: str, report_date: str) -> dict:
    response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=2000,
        system=[
            {
                "type": "text",
                "text": SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"}   # ← 关键
            }
        ],
        messages=[
            {
                "role": "user",
                "content": f"Ticker: {ticker}\nReport date: {report_date}\n\nMD&A:\n{mdna_text}"
            }
        ]
    )
    # response.usage.cache_creation_input_tokens / cache_read_input_tokens
    return parse_json_response(response.content[0].text)

Cache TTL 是 5 分钟——意味着如果你打算把 SP500 跑一遍,集中在 5 分钟窗口内连续调用 hit rate 最高。我的做法:

  • 把待 extract 的 500 条任务先放进队列
  • 一个 worker 串行 batch 处理(concurrency = 1 避免抢 cache)
  • 跑完 SP500 大概 25 分钟(30s × 500 / 60),分 5 个 5-min 窗口,每窗口续命一次 cache write

3.3 实测 cache 工作的验证

response.usage 字段必看:

{
  "input_tokens": 800,             # 这次新输入
  "cache_creation_input_tokens": 0,  # 这次写 cache 的 token(首调 ≠0)
  "cache_read_input_tokens": 5000,   # 这次从 cache 读的 token(命中后 ≠0)
  "output_tokens": 600
}

每次跑完打这个到日志,做月度 cost dashboard 时第一手数据


四、各层关键代码

4.1 Layer 1: earnings_calendar.py

# pipeline/earnings_calendar.py
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
from db import insert_earnings_events

SP500_TICKERS = pd.read_csv("data/sp500_constituents.csv")["symbol"].tolist()

def fetch_week_calendar(start: datetime) -> pd.DataFrame:
    """拉本周财报日历。yfinance 不稳定,prod 上建议用 Earnings Whisper 或 Finnhub 付费 API"""
    end = start + timedelta(days=7)
    events = []
    for ticker in SP500_TICKERS:
        try:
            t = yf.Ticker(ticker)
            cal = t.calendar
            if cal is None or cal.empty:
                continue
            report_date = cal.loc["Earnings Date"].iloc[0]
            if start <= report_date <= end:
                events.append({
                    "ticker": ticker,
                    "report_date": report_date,
                    "period_end": cal.loc["Earnings Date"].iloc[0] - timedelta(days=45),
                    "est_eps": cal.loc["Earnings Average"].iloc[0],
                })
        except Exception as e:
            print(f"[calendar] {ticker} failed: {e}")
    return pd.DataFrame(events)

if __name__ == "__main__":
    df = fetch_week_calendar(datetime.utcnow())
    insert_earnings_events(df)
    print(f"Loaded {len(df)} earnings events for the week")

:yfinance 的 calendar 字段经常缺失、字段名变动。生产环境换 Finnhub / Polygon / FMP(年订阅 $300 量级,能省下海量调试时间)。

4.2 Layer 2: edgar_fetcher.py

# pipeline/edgar_fetcher.py
import requests
import hashlib
from pathlib import Path

EDGAR_BASE = "https://data.sec.gov"
HEADERS = {"User-Agent": "TR Research info@veloxwallet.com"}  # SEC 要求声明

def fetch_latest_10q(ticker: str, cik: str, save_dir: Path) -> Path | None:
    """从 EDGAR 拉最近一份 10-Q,dedupe by content hash"""
    url = f"{EDGAR_BASE}/submissions/CIK{cik.zfill(10)}.json"
    r = requests.get(url, headers=HEADERS, timeout=30)
    r.raise_for_status()
    data = r.json()

    recent = data["filings"]["recent"]
    for form, acc, primary in zip(recent["form"], recent["accessionNumber"], recent["primaryDocument"]):
        if form in ("10-Q", "10-K"):
            acc_clean = acc.replace("-", "")
            doc_url = f"{EDGAR_BASE}/Archives/edgar/data/{int(cik)}/{acc_clean}/{primary}"
            content = requests.get(doc_url, headers=HEADERS, timeout=60).content

            content_hash = hashlib.md5(content).hexdigest()
            out_path = save_dir / f"{ticker}_{form}_{acc_clean}.html"
            if out_path.exists():
                return out_path  # already have

            out_path.write_bytes(content)
            return out_path
    return None

  • SEC 要求 User-Agent 必须包含真实邮箱,否则 403
  • 限速 10 req/sec,多线程会被 ban → 单线程 + sleep(0.15)
  • 10-K 远比 10-Q 长(200 页 vs 60 页),LLM 成本翻 3 倍,预算时分开统计

4.3 Layer 3: pdf_to_text.py(实际是 html→text,因为 EDGAR 主要给 HTML)

# pipeline/parse.py
from bs4 import BeautifulSoup
import re

SECTION_PATTERNS = {
    "mdna": r"management.{0,5}discussion\s+and\s+analysis",
    "risk_factors": r"risk\s+factors",
    "outlook": r"(outlook|forward.{0,3}looking|guidance)",
}

def parse_10q_html(path: Path) -> dict[str, str]:
    soup = BeautifulSoup(path.read_text(encoding="utf-8", errors="ignore"), "lxml")
    text = soup.get_text("\n", strip=True)
    # 按大写标题切段
    sections = {}
    chunks = re.split(r"\n(?=(?:ITEM\s+\d|PART\s+[IVX]))", text)
    for chunk in chunks:
        head = chunk[:200].lower()
        for name, pat in SECTION_PATTERNS.items():
            if re.search(pat, head):
                # 取前 8000 tokens 等效(约 32000 字符),LLM context 友好
                sections[name] = chunk[:32000]
                break
    return sections

(这块占了我半天调试时间):

  • 10-Q 格式各家差异巨大:苹果用 ITEM 编号,特斯拉直接用大标题,伯克希尔 PDF 扫描件根本没结构
  • 经验数字:SP500 里 ~80% 能稳定切到 MD&A,剩下 20% fallback 到「全文丢给 LLM 让它自己找」
  • 不要追求 100% perfect parse,留个 fallback 路径

4.4 Layer 4: claude_extract.py

# pipeline/llm_extract.py
import anthropic, json
from pydantic import BaseModel, ValidationError
from tenacity import retry, stop_after_attempt, wait_exponential

class ExtractedFeatures(BaseModel):
    guidance_change: float  # -1 to 1
    new_risks_count: int
    mgmt_tone: float        # -1 to 1
    capex_direction: str    # "up" / "flat" / "down"
    margin_commentary: str
    confidence: float       # LLM 自评 0-1

SYSTEM_PROMPT = """You are a senior equity analyst with 20 years of buy-side experience.
Extract structured features from earnings MD&A. Output ONLY valid JSON matching schema:
{
  "guidance_change": float in [-1, 1],
  ...
}

[5 few-shot examples here, omitted for brevity]
"""

client = anthropic.Anthropic()

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=30))
def extract_features(ticker: str, mdna: str, report_date: str) -> ExtractedFeatures:
    resp = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=1500,
        system=[{"type": "text", "text": SYSTEM_PROMPT,
                 "cache_control": {"type": "ephemeral"}}],
        messages=[{"role": "user",
                   "content": f"Ticker: {ticker}\nDate: {report_date}\n\nMD&A:\n{mdna}"}]
    )
    raw = resp.content[0].text.strip()
    # 偶尔 LLM 会包 markdown code fence
    if raw.startswith("```"):
        raw = raw.split("```")[1].lstrip("json").strip()
    try:
        return ExtractedFeatures(**json.loads(raw))
    except (json.JSONDecodeError, ValidationError) as e:
        # 记到 dead letter queue,触发 retry
        raise ValueError(f"Bad LLM output: {e}\nRaw: {raw[:500]}")

关键工程点

  • @retry 装饰器自动 3 次重试(指数 backoff)
  • pydantic schema 校验 = production-grade 的"合同"
  • LLM 偶尔输出 markdown 包裹 JSON,需要剥壳——这个 case 占失败的 ~30%

4.5 Layer 5: feature_store.py(schema 见 §五)

# pipeline/store.py
import sqlite3
from datetime import datetime

def upsert_features(ticker: str, report_date: str, features: ExtractedFeatures):
    conn = sqlite3.connect("data/feature_store.db")
    conn.execute("""
        INSERT OR REPLACE INTO extracted_features
        (ticker, report_date, guidance_change, new_risks_count,
         mgmt_tone, capex_direction, margin_commentary, confidence, extracted_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (ticker, report_date, features.guidance_change, features.new_risks_count,
          features.mgmt_tone, features.capex_direction, features.margin_commentary,
          features.confidence, datetime.utcnow().isoformat()))
    conn.commit()

Prod 上 SQLite 不够用(并发写、大 backfill),切到 DuckDB + Parquet partitioned by report_date 的年月。dev 用 SQLite 够用。

4.6 Layer 6: monitor.py

# pipeline/monitor.py
def daily_health_check():
    conn = sqlite3.connect("data/feature_store.db")
    yesterday = (datetime.utcnow() - timedelta(days=1)).date()

    # 1. 失败率
    failed = conn.execute("SELECT COUNT(*) FROM extract_failures WHERE date(failed_at)=?",
                          (yesterday,)).fetchone()[0]
    success = conn.execute("SELECT COUNT(*) FROM extracted_features WHERE date(extracted_at)=?",
                           (yesterday,)).fetchone()[0]
    rate = failed / (failed + success + 1)
    if rate > 0.05:
        slack_alert(f"[Pipeline] Failure rate {rate:.1%} exceeds 5%")

    # 2. Schema 一致性(pydantic 通过率)—— failed 里区分 schema 错和 LLM error
    # 3. 异常 sentiment review queue
    extreme = conn.execute("""
        SELECT ticker, mgmt_tone FROM extracted_features
        WHERE date(extracted_at)=? AND (mgmt_tone > 0.9 OR mgmt_tone < -0.9)
    """, (yesterday,)).fetchall()
    for ticker, tone in extreme:
        push_to_review_queue(ticker, tone)

4.7 编排:pipeline.py

# pipeline.py — 一键编排
from earnings_calendar import fetch_week_calendar
from edgar_fetcher import fetch_latest_10q
from parse import parse_10q_html
from llm_extract import extract_features
from store import upsert_features
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")

def run_pipeline():
    week_events = fetch_week_calendar(datetime.utcnow())
    log.info(f"Week has {len(week_events)} earnings events")

    for _, ev in week_events.iterrows():
        ticker, cik, rdate = ev["ticker"], ev["cik"], ev["report_date"]
        try:
            filing = fetch_latest_10q(ticker, cik, save_dir=Path("raw_filings"))
            if filing is None:
                log.warning(f"[{ticker}] no filing yet, will retry next run")
                continue
            sections = parse_10q_html(filing)
            mdna = sections.get("mdna")
            if not mdna:
                log.warning(f"[{ticker}] no MD&A found, fallback to full text")
                mdna = sections.get("full_text", "")[:32000]
            features = extract_features(ticker, mdna, rdate)
            upsert_features(ticker, rdate, features)
            log.info(f"[{ticker}] OK — tone={features.mgmt_tone:.2f}")
        except Exception as e:
            log.exception(f"[{ticker}] pipeline failed: {e}")
            push_to_dlq(ticker, rdate, str(e))

if __name__ == "__main__":
    run_pipeline()

调度(cron 写法):

# 每周一 09:00 UTC 抓本周日历
0 9 * * 1   /usr/bin/python /opt/tr/pipeline/earnings_calendar.py

# 周二到周五 22:30 UTC(美股盘后)跑全流水线
30 22 * * 2-5  /usr/bin/python /opt/tr/pipeline.py

# 每天 01:00 UTC 跑 health check
0 1 * * *   /usr/bin/python /opt/tr/pipeline/monitor.py

五、数据 Schema

5.1 earnings_event

字段类型说明
tickerTEXT PKNVDA
report_dateDATE PK2026-05-22(发布日)
period_endDATE2026-04-30(财季截止日)
est_epsREAL分析师一致预期
actual_epsREALnullable,公布后填
sueREAL(actual - est) / std_dev,公布后填
eps_surprise_pctREAL(actual - est) / est
cikTEXTSEC CIK,关联 EDGAR

5.2 extracted_features 表 ★ AI 产出的核心

字段类型说明
tickerTEXT PK
report_dateDATE PK
guidance_changeREAL-1 (下调) ~ +1 (上调)
new_risks_countINTMD&A 里新出现的风险项
mgmt_toneREAL-1 (悲观) ~ +1 (乐观)
capex_directionTEXTup / flat / down
margin_commentaryTEXTLLM 一句话总结
confidenceREALLLM 自评 0~1
extracted_atTIMESTAMP跑出来的时间
llm_modelTEXT模型版本 e.g. claude-opus-4-7
cache_hitBOOLEAN是否命中 cache(用于成本归因)

5.3 signals 表(下游消费)

字段类型说明
tickerTEXT PK
signal_dateDATE PK通常 = report_date + 1 trading day
factor_nameTEXT PK"earnings_sentiment_v1"
factor_valueREAL-1 ~ +1
rank_in_universeINT1 = best, 500 = worst

多张表用 (ticker, date) 复合主键的好处:replay 整层不会产生重复,pydantic 校验失败时 INSERT OR REPLACE 直接覆盖,符合"幂等"原则。这套表结构是 Day 50 XGBoost 的训练集来源。

5.4 extract_failures / review_queue(dead letter & 人工抽检)

CREATE TABLE extract_failures (
    ticker TEXT, report_date DATE, error_type TEXT,
    raw_response TEXT, failed_at TIMESTAMP, retry_count INT
);

CREATE TABLE review_queue (
    ticker TEXT, report_date DATE, reason TEXT,  -- 'extreme_tone' | 'low_confidence'
    feature_snapshot JSON, queued_at TIMESTAMP, reviewed BOOLEAN DEFAULT 0
);

六、Cost 监控

6.1 预算表(基于 claude-opus-4-7 当前定价)

范围单次 input单次 output季度调用数Cache hit季度成本
SP500 10-Q~5800 tok~600 tok50080%~$15
SP500 10-K~12000 tok~800 tok500 (年报季)70%~$45
SP400 加入+400 调用40075%+$12
SP600 加入+600 调用60070%+$18
全 SP1500 季报~150075%~$45 / quarter
加重跑 / debug+30%+$15
稳态月度预算~$20/month

6.2 实际监控 dashboard 字段

# 每天写一行
{
  "date": "2026-06-25",
  "calls_count": 38,
  "cache_write_tokens": 5200,
  "cache_read_tokens": 192000,   # 命中 = 划算
  "fresh_input_tokens": 30400,
  "output_tokens": 22800,
  "input_cost": 0.61,            # USD
  "output_cost": 0.34,
  "total_cost": 0.95,
  "mom_cumulative": 18.42        # 月累计
}

留个 budget alert:月累计 > $30 自动 Slack 告警(防 bug 死循环烧钱)。我个人量化预算里专门留 $50/月给 LLM API,跟数据订阅、IBKR 费用一起记在 ops_cost.


七、本周开始的 SP500 Backfill

7.1 Backfill 任务定义

  • 范围:SP500 当前成分股
  • 历史深度:过去 8 个季度(约 2 年),即 4000 份 10-Q + 500 份 10-K
  • 目的:作为 Day 50 XGBoost 训练集(feature → 未来 5/20 日 alpha)
  • 预算:~$120 一次性(按 4500 调用 × 70% cache hit 估)

7.2 Backfill 跟 live pipeline 的区别

维度Live (每周)Backfill (一次性)
并发1 (cache 友好)1(同样 cache 友好)
优先级高(财报次日就要)低(夜里跑都行)
错误容忍单 ticker 失败影响小总体失败率 > 10% 要排查
时间窗口约 1 小时/周约 30 小时一次跑完
监控Slack 实时日报形式

7.3 Backfill 脚本(伪代码)

# scripts/backfill_sp500.py
from concurrent.futures import ThreadPoolExecutor

def backfill(ticker, lookback_quarters=8):
    filings = fetch_historical_10qk(ticker, n=lookback_quarters + 2)
    for f in filings:
        if already_extracted(ticker, f.report_date):
            continue
        process_one_filing(ticker, f)

# 单线程跑 cache 命中率最高
for ticker in SP500_TICKERS:
    backfill(ticker)

跑完后产出 ~4500 行 extracted_features 表,这就是 Day 50 训练集的底层。


八、Pipeline 常见坑(亲身踩过)

8.1 PDF/HTML parse 差异

不同公司 10-Q 格式差异是这个 pipeline 最大的脆弱点:

  • 苹果 / 微软 / Google:HTML 结构规范,正则切段稳定
  • 特斯拉:大量自定义图片标题、表格内嵌文本,parse 后顺序乱
  • 伯克希尔:直接上传扫描 PDF,需要 OCR
  • 金融股:MD&A 极长(信用风险、利率敏感性),单段 > 32K tokens 会触发上下文截断

对策

  1. 主路径:HTML + 正则切段
  2. Fallback 1:找不到 MD&A 就把全文前 32K tokens 丢给 LLM 让它 "identify the MD&A section first, then extract"
  3. Fallback 2:OCR(用 unstructured 的 hi_res 模式或 Mathpix),单价 $0.02/page,慎用

8.2 LLM 偶尔输出非 JSON

实测占失败的 ~3%。常见模式:

  • 包了 ```json ... ``` markdown
  • 在 JSON 前面加了一段"Here is my analysis:"
  • 字段类型错(应该是 float 给了 "0.5" 字符串)

对策

  • prompt 里写 "Output ONLY valid JSON, no markdown, no preamble"
  • 解析时先剥 code fence
  • pydantic ValidationError 进 retry,最多 3 次
  • 3 次后进 DLQ 等人工 review(实际占 <1%)

8.3 财报推迟 / 节假日

  • 财报可以延期:通常公司会发 8-K 公告
  • Calendar API 不一定及时更新 → pipeline 跑了发现没新 filing 是常态
  • 状态机设计:earnings_event 加一个 status 字段(pending / fetched / extracted / failed),每次跑只处理 pending 的,重跑幂等

8.4 Rate limit

Anthropic API 有:

  • 50 req/min(Tier 1 起步)
  • 40K input tokens/min

backfill 跑 SP500 时如果不限速,前 10 分钟会触发 429。对策

  • 单线程跑(30s/调用,自然限速 = 2 req/min,远低于上限)
  • tenacity 装饰器在 429 上指数 backoff

8.5 cache eviction 比想的早

实测:在 5 分钟 TTL 内,如果中间有 1-2 分钟没调,cache 也会被 evict。对策:backfill 时不要让 worker idle,准备好下一个任务 immediate 发送。


九、PM 视角:把"Demo"变成"系统"的迁移性思考

  1. 数据工程 ≠ 跑通脚本。Pipeline 的产品质量定义是「能可靠 run 1 年不需要我每天看」。这条标准比"准确率高"更难达到——我做核心银行系统时学到,对账系统准确率 99% 比 95% 难 10 倍,最后 4% 全是脏数据 / 边角 case。LLM pipeline 一样。

  2. 分层 + 每层有 schema 输出:这是 ETL 的黄金原则。Day 46 demo 是个 monolith python 脚本,今天的 pipeline 是 6 个解耦组件——好处是 fetcher 挂了不影响已 parse 的数据,明天我能单独换 parser 而不动 LLM 逻辑。Conway 定律的反向利用:你想让代码模块化,先把责任拆开。

  3. Cache 经济学是真金白银。从 $14 优化到 $3 看起来小钱,但 scale 到 SP1500 就是 $45 vs $200 + 边际机会成本(省下的预算可以多跑一倍历史回测)。PM 的工程直觉里要有这种"小处省钱、大处花钱"的清醒——LLM cost 在 demo 阶段不值得优化,在 production 阶段 cache 不开就是失职。

  4. 可观测性 > 准确率。我宁愿要一个 sentiment 准确率 75% 但每天告诉我"哪些样本不确定"的 pipeline,也不要一个号称 90% 准确但没监控的黑盒。因为 75% 那个我能用,黑盒那个我不敢用——这是 10 年金融业的教训:所有不能被审计的数字都是潜在的风险

  5. Backfill 是"训练集生产":很多人忽略,把"现在跑通 pipeline"和"用 pipeline 倒着跑历史攒训练集"看作两件事。其实是一件事:同一套代码 live 时拿 incremental,backfill 时拿 historical。这种"同代码两模式"是数据系统设计的高级标准

  6. 失败要可解释、可重试、可降级:DLQ、retry、fallback 三件套,对应了 production 系统的「三种应对未知的能力」。在金融领域这套思路救过我无数次——10 年支付经验里,从来不存在"100% 成功的对账",存在的是"失败能被发现并恢复的对账系统"。


十、明日预告

Day 48: IV crush 与财报隐含波动率交易 — 期权视角下的财报赌局

  • Implied Volatility 在财报前后的典型 pattern
  • 为什么财报后 IV 暴跌("crush")几乎是确定的,而方向是不确定的
  • Long straddle / strangle 在财报前买入的胜率回测
  • 卖方策略:iron condor / ratio spread 在 IV crush 中如何获利
  • 把 Day 47 提取的 mgmt_tone 信号叠加到 IV 策略上的 alpha 来源
  • 实操:拉一份 NVDA 历史 IV 曲线 + 找一个本周财报 ticker 实盘 paper 试

实际执行记录

启动一项填一项,时间戳 + 卡点。

  • [hh:mm] sp500_constituents.csv 准备(Wikipedia 拉一次)— ...
  • [hh:mm] Layer 1 earnings_calendar.py 跑通 — ...
  • [hh:mm] Layer 2 EDGAR fetcher 跑通一个 ticker — ...
  • [hh:mm] Layer 3 HTML parser 切到 MD&A 段 — ...
  • [hh:mm] Layer 4 LLM extract + cache 验证 — cache_read_input_tokens > 0 才算成功
  • [hh:mm] Layer 5 SQLite schema 建表 + 第一行入库 — ...
  • [hh:mm] Layer 6 monitor.py 跑通 + Slack webhook 配置 — ...
  • [hh:mm] pipeline.py 串行编排端到端 1 个 ticker 成功 — ...
  • [hh:mm] backfill_sp500.py 后台启动(预计 30h)— ...
  • 卡点 / 学到的:
  • 本日 LLM cost:__USD(input/cache_read/output 拆开记录)

总字数:约 6,800 字 今日完成度:架构 ✓ / 代码骨架 ✓ / 实操(你自己执行 backfill)/ 笔记 ✓