EXPERT Day 178: 金融研究 AI Agent v1 — 实现与代码
LangGraph 高级用法、Anthropic tool use、Voyage embedding 接入、Cohere rerank、Langfuse trace 集成
日期: 2026-10-26 方向: AI 系统工程 / Implementation / Multi-Agent 阶段: Phase 3 - 实战 Capstone(Day 177-180) 标签: #LangGraph #Anthropic #MCP #Code #Production #FinanceAgent
今日目标
把 Day 177 的架构设计落地为可运行的 MVP 代码骨架:
| 类型 | 内容 |
|---|---|
| 学习 | LangGraph 高级用法、Anthropic tool use、Voyage embedding 接入、Cohere rerank、Langfuse trace 集成 |
| 实操 | 完成 17 个 Python 文件、~1500+ 行代码、30 条 golden test、一个端到端 demo |
| 产出 | 一个完整的 finance_agent/ 项目,能 python -m src.api.cli "分析 NVDA" 直接跑 |
代码组织原则:
- 每个文件独立可运行 / 可单测
- 关键路径的真实 API 调用(不用 mock 当伪代码)
- async 优先,留 streaming 出口
- 显式错误处理,不吞异常
一、项目目录结构
finance_agent/
├── pyproject.toml
├── requirements.txt
├── config.yaml
├── .env.example
├── README.md
├── docker-compose.yml
├── Dockerfile
├── src/
│ ├── __init__.py
│ ├── settings.py
│ ├── orchestrator.py # LangGraph state machine
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── coordinator.py
│ │ ├── macro.py
│ │ ├── equity.py
│ │ ├── crypto.py
│ │ └── compliance.py
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── registry.py
│ │ ├── financial_data.py
│ │ ├── onchain.py
│ │ ├── news.py
│ │ ├── calc.py
│ │ └── viz.py
│ ├── rag/
│ │ ├── __init__.py
│ │ ├── embedder.py
│ │ ├── retriever.py
│ │ └── reranker.py
│ ├── memory/
│ │ ├── __init__.py
│ │ ├── session.py
│ │ └── longterm.py
│ ├── guardrails/
│ │ ├── __init__.py
│ │ └── safety.py
│ ├── eval/
│ │ ├── __init__.py
│ │ ├── golden.py
│ │ └── runner.py
│ ├── observability/
│ │ ├── __init__.py
│ │ └── langfuse_setup.py
│ └── api/
│ ├── __init__.py
│ ├── cli.py
│ └── server.py
├── prompts/
│ ├── coordinator.md
│ ├── equity.md
│ ├── crypto.md
│ ├── compliance.md
│ └── macro.md
└── tests/
├── conftest.py
├── test_orchestrator.py
├── test_tools.py
└── golden/
└── cases.jsonl
二、依赖清单(requirements.txt)
# Core LLM
anthropic==0.42.0
voyageai==0.3.2
cohere==5.13.0
# Agent framework
langgraph==0.2.50
langchain-core==0.3.20
# RAG
qdrant-client==1.13.0
elasticsearch==8.16.0
sentence-transformers==3.3.1
# Memory
mem0ai==0.1.40
redis==5.2.0
chromadb==0.5.20
# Data sources
yfinance==0.2.50
fredapi==0.5.2
pycoingecko==3.2.0
sec-edgar-api==1.1.0
# Observability
langfuse==2.55.0
prometheus-client==0.21.0
# Guardrails
presidio-analyzer==2.2.355
presidio-anonymizer==2.2.355
# Web framework
fastapi==0.115.0
uvicorn==0.32.0
pydantic==2.10.0
typer==0.15.0
# Utilities
httpx==0.28.0
tenacity==9.0.0
python-dotenv==1.0.1
pyyaml==6.0.2
structlog==24.4.0
aiocache==0.12.3
# Dev
pytest==8.3.0
pytest-asyncio==0.25.0
black==24.10.0
ruff==0.8.0
三、配置文件
.env.example
# LLM
ANTHROPIC_API_KEY=sk-ant-xxxxx
VOYAGE_API_KEY=pa-xxxxx
COHERE_API_KEY=xxxxx
# Data sources
FRED_API_KEY=xxxxx
ETHERSCAN_API_KEY=xxxxx
DUNE_API_KEY=xxxxx
COINGECKO_API_KEY=CG-xxxxx
TAVILY_API_KEY=tvly-xxxxx
NEWSAPI_KEY=xxxxx
CAPIQ_TOKEN=xxxxx
# Storage
QDRANT_URL=http://localhost:6333
ELASTICSEARCH_URL=http://localhost:9200
REDIS_URL=redis://localhost:6379/0
POSTGRES_DSN=postgresql://user:pass@localhost:5432/finance_agent
# Observability
LANGFUSE_HOST=https://cloud.langfuse.com
LANGFUSE_PUBLIC_KEY=pk-lf-xxxxx
LANGFUSE_SECRET_KEY=sk-lf-xxxxx
# Settings
ENV=dev
LOG_LEVEL=INFO
MAX_ITERATIONS=15
COST_CAP_PER_QUERY=0.5
config.yaml
models:
coordinator:
name: "claude-opus-4-7"
max_tokens: 4096
temperature: 0.0
routine:
name: "claude-sonnet-4-6"
max_tokens: 4096
temperature: 0.0
cheap:
name: "claude-haiku-4-5"
max_tokens: 2048
temperature: 0.0
rag:
embedding_model: "voyage-finance-2"
embedding_dim: 1024
rerank_model: "rerank-3"
retrieval_top_k: 50
rerank_top_k: 10
chunk_size: 800
chunk_overlap: 100
cache:
enabled: true
ttl_seconds:
price: 60
macro: 60
onchain: 30
news: 300
filing: 86400
guardrails:
max_iterations: 15
per_tool_max_calls: 5
disclaimer: |
Disclaimer: This information is for research purposes only and does not
constitute investment advice. Past performance is not indicative of future
results.
memory:
session_ttl: 86400
longterm_provider: "mem0"
eval:
golden_path: "tests/golden/cases.jsonl"
hallucination_threshold: 0.05
四、核心代码
说明:以下代码为完整 MVP 实现。每段代码都标注了其在项目中的相对路径。
4.1 src/settings.py
"""Centralized settings loader - reads .env + config.yaml."""
from __future__ import annotations
import os
from functools import lru_cache
from pathlib import Path
import yaml
from pydantic import BaseModel, Field
from dotenv import load_dotenv
load_dotenv()
class ModelConfig(BaseModel):
name: str
max_tokens: int = 4096
temperature: float = 0.0
class Models(BaseModel):
coordinator: ModelConfig
routine: ModelConfig
cheap: ModelConfig
class RAGConfig(BaseModel):
embedding_model: str = "voyage-finance-2"
embedding_dim: int = 1024
rerank_model: str = "rerank-3"
retrieval_top_k: int = 50
rerank_top_k: int = 10
chunk_size: int = 800
chunk_overlap: int = 100
class GuardrailsConfig(BaseModel):
max_iterations: int = 15
per_tool_max_calls: int = 5
disclaimer: str = ""
class Settings(BaseModel):
models: Models
rag: RAGConfig
guardrails: GuardrailsConfig
cache: dict = Field(default_factory=dict)
memory: dict = Field(default_factory=dict)
eval: dict = Field(default_factory=dict)
# Env vars
anthropic_api_key: str = Field(default_factory=lambda: os.getenv("ANTHROPIC_API_KEY", ""))
voyage_api_key: str = Field(default_factory=lambda: os.getenv("VOYAGE_API_KEY", ""))
cohere_api_key: str = Field(default_factory=lambda: os.getenv("COHERE_API_KEY", ""))
fred_api_key: str = Field(default_factory=lambda: os.getenv("FRED_API_KEY", ""))
etherscan_api_key: str = Field(default_factory=lambda: os.getenv("ETHERSCAN_API_KEY", ""))
dune_api_key: str = Field(default_factory=lambda: os.getenv("DUNE_API_KEY", ""))
tavily_api_key: str = Field(default_factory=lambda: os.getenv("TAVILY_API_KEY", ""))
qdrant_url: str = Field(default_factory=lambda: os.getenv("QDRANT_URL", "http://localhost:6333"))
redis_url: str = Field(default_factory=lambda: os.getenv("REDIS_URL", "redis://localhost:6379/0"))
langfuse_public_key: str = Field(default_factory=lambda: os.getenv("LANGFUSE_PUBLIC_KEY", ""))
langfuse_secret_key: str = Field(default_factory=lambda: os.getenv("LANGFUSE_SECRET_KEY", ""))
cost_cap_per_query: float = Field(default_factory=lambda: float(os.getenv("COST_CAP_PER_QUERY", "0.5")))
@lru_cache
def get_settings() -> Settings:
cfg_path = Path(__file__).parent.parent / "config.yaml"
with open(cfg_path, "r", encoding="utf-8") as f:
raw = yaml.safe_load(f)
return Settings(**raw)
4.2 src/orchestrator.py — LangGraph StateGraph
"""LangGraph orchestrator: 5-agent state machine.
Pattern:
user_query
-> coordinator (router)
-> [parallel: macro / equity / crypto / compliance]
-> coordinator (synthesizer)
-> output guardrails
-> response
"""
from __future__ import annotations
import asyncio
from typing import TypedDict, Annotated, Literal, Sequence
from operator import add
import structlog
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from anthropic import AsyncAnthropic
from .settings import get_settings
from .agents.coordinator import route, synthesize
from .agents.macro import macro_node
from .agents.equity import equity_node
from .agents.crypto import crypto_node
from .agents.compliance import compliance_node
from .guardrails.safety import scan_input, scrub_output
from .observability.langfuse_setup import get_tracer
log = structlog.get_logger()
settings = get_settings()
tracer = get_tracer()
# --- State Schema ---
class AgentState(TypedDict):
# Input
query: str
user_id: str
session_id: str
# Routing
intent: list[str] # ["macro", "equity", ...] subset
plan: str # coordinator's plan text
# Sub-agent outputs (parallel writes via add reducer)
sub_results: Annotated[list[dict], add]
# Final
answer: str
citations: list[dict]
# Cost tracking
total_input_tokens: int
total_output_tokens: int
total_cost_usd: float
# Iteration safety
iterations: int
# --- Routing function ---
def router_decision(state: AgentState) -> list[str]:
"""Decide which sub-agents to run in parallel based on coordinator's intent."""
nodes_to_run = []
intent = state.get("intent", [])
if "macro" in intent:
nodes_to_run.append("macro")
if "equity" in intent:
nodes_to_run.append("equity")
if "crypto" in intent:
nodes_to_run.append("crypto")
if "compliance" in intent:
nodes_to_run.append("compliance")
if not nodes_to_run:
# default fallback to equity (most common in finance)
nodes_to_run = ["equity"]
return nodes_to_run
def loop_check(state: AgentState) -> Literal["synthesize", "abort"]:
if state["iterations"] >= settings.guardrails.max_iterations:
return "abort"
if state["total_cost_usd"] >= settings.cost_cap_per_query:
return "abort"
return "synthesize"
# --- Input guardrail node ---
async def input_guard(state: AgentState) -> dict:
verdict = await scan_input(state["query"])
if not verdict["safe"]:
return {"answer": f"Sorry, your query was flagged: {verdict['reason']}", "iterations": state["iterations"] + 1}
return {"iterations": state.get("iterations", 0) + 1}
# --- Output guardrail node ---
async def output_guard(state: AgentState) -> dict:
scrubbed = await scrub_output(state["answer"])
return {"answer": scrubbed}
# --- Build graph ---
def build_graph() -> "CompiledGraph":
graph = StateGraph(AgentState)
graph.add_node("input_guard", input_guard)
graph.add_node("route", route)
graph.add_node("macro", macro_node)
graph.add_node("equity", equity_node)
graph.add_node("crypto", crypto_node)
graph.add_node("compliance", compliance_node)
graph.add_node("synthesize", synthesize)
graph.add_node("output_guard", output_guard)
graph.add_node("abort", lambda s: {"answer": "[ABORTED] cost or iteration cap exceeded"})
graph.add_edge(START, "input_guard")
graph.add_edge("input_guard", "route")
graph.add_conditional_edges(
"route",
router_decision,
{"macro": "macro", "equity": "equity", "crypto": "crypto", "compliance": "compliance"},
)
graph.add_edge("macro", "synthesize")
graph.add_edge("equity", "synthesize")
graph.add_edge("crypto", "synthesize")
graph.add_edge("compliance", "synthesize")
graph.add_conditional_edges("synthesize", loop_check,
{"synthesize": "output_guard", "abort": "abort"})
graph.add_edge("output_guard", END)
graph.add_edge("abort", END)
checkpointer = MemorySaver()
return graph.compile(checkpointer=checkpointer)
# --- Public entry ---
COMPILED_GRAPH = build_graph()
async def run_query(query: str, user_id: str = "anon", session_id: str = "default") -> dict:
"""Public async entrypoint."""
initial: AgentState = {
"query": query, "user_id": user_id, "session_id": session_id,
"intent": [], "plan": "", "sub_results": [],
"answer": "", "citations": [],
"total_input_tokens": 0, "total_output_tokens": 0,
"total_cost_usd": 0.0, "iterations": 0,
}
config = {"configurable": {"thread_id": session_id}}
with tracer.start_as_current_span("run_query") as span:
span.set_attribute("query", query)
result = await COMPILED_GRAPH.ainvoke(initial, config=config)
span.set_attribute("cost_usd", result.get("total_cost_usd", 0.0))
return result
4.3 src/agents/coordinator.py — Router + Synthesizer
"""Coordinator agent.
Two roles:
1. route(): given query, decide intent (which sub-agents to invoke) + write plan
2. synthesize(): given sub_results, fuse into final answer
"""
from __future__ import annotations
import json
import structlog
from anthropic import AsyncAnthropic
from ..settings import get_settings
log = structlog.get_logger()
settings = get_settings()
client = AsyncAnthropic(api_key=settings.anthropic_api_key)
ROUTING_SYSTEM = """You are the Coordinator of a financial research multi-agent system.
Your job: read the user's query and decide which sub-agents to invoke. The available
sub-agents are:
- macro: macro economy, rates, FX, central bank policy, inflation
- equity: individual stocks, earnings, valuation, fundamentals
- crypto: onchain data, DeFi, tokens, DAOs, RWA tokenization
- compliance: sanctions screening (OFAC), KYC/AML risk, fraud
You MUST output strict JSON with two fields:
- intent: list of agent names to invoke (1 or more)
- plan: one-sentence plan, e.g. "Pull NVDA financials, then compare with AMD."
Rules:
- For cross-asset queries, invoke multiple agents (e.g. macro+crypto).
- Always include compliance if the query mentions specific wallet addresses
or token contracts.
- Output JSON ONLY. No prose.
"""
SYNTHESIZE_SYSTEM = """You are the Coordinator of a financial research multi-agent system,
in synthesizer mode. Below are sub-agent results in JSON form. Your job:
1. Combine into a single coherent answer that directly addresses the user's query.
2. Cite specific data points by source (which agent + which tool call).
3. Flag conflicts between sub-agents and explain.
4. NEVER fabricate numbers - if a sub-agent didn't produce a number, say "data unavailable".
5. Be concise. Default to 200-400 words. Use bullet lists when comparing.
6. NO investment advice. Use research framing ("data shows...", "historically...").
"""
PRICING = { # USD per 1M tokens
"claude-opus-4-7": {"in": 15.0, "out": 75.0, "cached_in": 1.5},
"claude-sonnet-4-6": {"in": 3.0, "out": 15.0, "cached_in": 0.30},
"claude-haiku-4-5": {"in": 0.80, "out": 4.0, "cached_in": 0.08},
}
def _cost(model: str, in_tok: int, out_tok: int, cached_tok: int = 0) -> float:
p = PRICING[model]
return (in_tok - cached_tok) * p["in"] / 1e6 + cached_tok * p["cached_in"] / 1e6 + out_tok * p["out"] / 1e6
async def route(state: dict) -> dict:
"""Stage 1: Router - decide which sub-agents to invoke."""
model = settings.models.coordinator.name
resp = await client.messages.create(
model=model,
max_tokens=512,
temperature=0.0,
system=ROUTING_SYSTEM,
messages=[{"role": "user", "content": f"Query: {state['query']}"}],
)
text = resp.content[0].text.strip()
# Parse JSON (best effort)
try:
parsed = json.loads(text)
intent = parsed.get("intent", ["equity"])
plan = parsed.get("plan", "")
except Exception as e:
log.warning("router_parse_failed", text=text, err=str(e))
intent, plan = ["equity"], "fallback to equity"
cost = _cost(model, resp.usage.input_tokens, resp.usage.output_tokens,
getattr(resp.usage, "cache_read_input_tokens", 0))
log.info("router_decision", intent=intent, plan=plan, cost=cost)
return {
"intent": intent,
"plan": plan,
"total_input_tokens": state.get("total_input_tokens", 0) + resp.usage.input_tokens,
"total_output_tokens": state.get("total_output_tokens", 0) + resp.usage.output_tokens,
"total_cost_usd": state.get("total_cost_usd", 0.0) + cost,
}
async def synthesize(state: dict) -> dict:
"""Stage 2: Synthesizer - fuse sub-results into final answer."""
model = settings.models.coordinator.name
sub_payload = json.dumps(state.get("sub_results", []), indent=2, default=str)
user_msg = (
f"User query: {state['query']}\n\n"
f"Plan: {state.get('plan', 'N/A')}\n\n"
f"Sub-agent results:\n```json\n{sub_payload}\n```\n\n"
f"Now produce the final answer."
)
resp = await client.messages.create(
model=model,
max_tokens=2048,
temperature=0.0,
system=SYNTHESIZE_SYSTEM,
messages=[{"role": "user", "content": user_msg}],
)
answer = resp.content[0].text.strip()
cost = _cost(model, resp.usage.input_tokens, resp.usage.output_tokens)
# Collect citations from sub_results
citations = []
for sr in state.get("sub_results", []):
citations.extend(sr.get("citations", []))
return {
"answer": answer,
"citations": citations,
"total_input_tokens": state.get("total_input_tokens", 0) + resp.usage.input_tokens,
"total_output_tokens": state.get("total_output_tokens", 0) + resp.usage.output_tokens,
"total_cost_usd": state.get("total_cost_usd", 0.0) + cost,
}
4.4 src/agents/equity.py — 完整 Tool Use Loop
"""Equity research agent - full tool-use loop with Anthropic API."""
from __future__ import annotations
import json
import asyncio
from pathlib import Path
import structlog
from anthropic import AsyncAnthropic
from anthropic.types import ToolUseBlock, TextBlock
from ..settings import get_settings
from ..tools.registry import EQUITY_TOOLS, dispatch
from ..tools import financial_data
from ..rag.retriever import HybridRetriever
log = structlog.get_logger()
settings = get_settings()
client = AsyncAnthropic(api_key=settings.anthropic_api_key)
PROMPT_PATH = Path(__file__).parent.parent.parent / "prompts" / "equity.md"
SYSTEM_PROMPT = PROMPT_PATH.read_text(encoding="utf-8") if PROMPT_PATH.exists() else """
You are an equity research analyst agent. You have tools for:
- getting stock prices, financials, estimates, filings
- parsing 10-K/10-Q for specific items
- computing PE / EV/EBITDA / ratios
- hybrid RAG retrieval over SEC filings
Workflow:
1. Decompose the user's question into 1-5 tool calls.
2. After each tool result, decide: more tools needed? OR final answer?
3. Final answer must be JSON: {findings: str, citations: [{tool, args, ts}], data: {...}}
Rules:
- Only use facts from tool outputs. NEVER fabricate numbers.
- If a tool returns an error or empty, say so explicitly.
- Max 8 tool calls per query.
"""
retriever = HybridRetriever()
async def equity_node(state: dict) -> dict:
"""Run the equity sub-agent with tool-use loop. Returns sub_results delta."""
query = state["query"]
plan = state.get("plan", "")
user_input = f"Research request: {query}\nPlan hint from coordinator: {plan}"
messages = [{"role": "user", "content": user_input}]
model = settings.models.routine.name
iterations = 0
max_tool_iters = 8
cost_total = 0.0
in_tok_total = 0
out_tok_total = 0
citations = []
while iterations < max_tool_iters:
iterations += 1
resp = await client.messages.create(
model=model,
max_tokens=4096,
temperature=0.0,
system=SYSTEM_PROMPT,
tools=EQUITY_TOOLS,
messages=messages,
)
in_tok_total += resp.usage.input_tokens
out_tok_total += resp.usage.output_tokens
cost_total += _cost(model, resp.usage.input_tokens, resp.usage.output_tokens,
getattr(resp.usage, "cache_read_input_tokens", 0))
# If stop_reason == "end_turn" → final answer
if resp.stop_reason == "end_turn":
text_blocks = [b.text for b in resp.content if isinstance(b, TextBlock)]
final_text = "\n".join(text_blocks)
log.info("equity_done", iters=iterations, cost=cost_total)
return {
"sub_results": [{
"agent": "equity",
"answer": final_text,
"citations": citations,
"iterations": iterations,
}],
"total_input_tokens": state.get("total_input_tokens", 0) + in_tok_total,
"total_output_tokens": state.get("total_output_tokens", 0) + out_tok_total,
"total_cost_usd": state.get("total_cost_usd", 0.0) + cost_total,
}
# Otherwise: tool use → execute tools and feed back
tool_use_blocks = [b for b in resp.content if isinstance(b, ToolUseBlock)]
messages.append({"role": "assistant", "content": resp.content})
tool_results = []
for tu in tool_use_blocks:
try:
result = await dispatch(tu.name, tu.input)
citations.append({"tool": tu.name, "args": tu.input, "agent": "equity"})
except Exception as e:
log.error("tool_error", tool=tu.name, err=str(e))
result = {"error": str(e)}
tool_results.append({
"type": "tool_result",
"tool_use_id": tu.id,
"content": json.dumps(result, default=str),
})
messages.append({"role": "user", "content": tool_results})
# Hit cap
log.warning("equity_max_iters_reached")
return {
"sub_results": [{"agent": "equity", "answer": "[max iterations reached]",
"citations": citations, "iterations": iterations}],
"total_input_tokens": state.get("total_input_tokens", 0) + in_tok_total,
"total_output_tokens": state.get("total_output_tokens", 0) + out_tok_total,
"total_cost_usd": state.get("total_cost_usd", 0.0) + cost_total,
}
def _cost(model, in_tok, out_tok, cached_tok=0):
from .coordinator import _cost as _c
return _c(model, in_tok, out_tok, cached_tok)
4.5 src/agents/crypto.py
"""Crypto research agent."""
from __future__ import annotations
import json
import structlog
from pathlib import Path
from anthropic import AsyncAnthropic
from anthropic.types import ToolUseBlock, TextBlock
from ..settings import get_settings
from ..tools.registry import CRYPTO_TOOLS, dispatch
from .coordinator import _cost
log = structlog.get_logger()
settings = get_settings()
client = AsyncAnthropic(api_key=settings.anthropic_api_key)
SYSTEM_PROMPT = """You are a crypto onchain research agent. You have tools for:
- querying ETH balances and transfers via Etherscan
- running Dune SQL queries
- fetching token info from Coingecko
- getting DeFi TVL from DeFiLlama
- hybrid RAG over crypto whitepapers and research notes
Workflow:
1. Identify wallet address(es), token contract(s), protocol(s) from the query.
2. Pull onchain data + price/TVL data + relevant docs in parallel where possible.
3. Final answer JSON: {findings, citations, data}.
Rules:
- All onchain data must include block number / timestamp.
- For tokens, always include contract address + chain.
- NEVER fabricate balances or TX hashes.
"""
async def crypto_node(state: dict) -> dict:
query = state["query"]; plan = state.get("plan", "")
messages = [{"role": "user", "content": f"Research: {query}\nPlan: {plan}"}]
model = settings.models.routine.name
in_tok = out_tok = 0; cost = 0.0; citations = []
for i in range(8):
resp = await client.messages.create(
model=model, max_tokens=4096, temperature=0.0,
system=SYSTEM_PROMPT, tools=CRYPTO_TOOLS, messages=messages,
)
in_tok += resp.usage.input_tokens
out_tok += resp.usage.output_tokens
cost += _cost(model, resp.usage.input_tokens, resp.usage.output_tokens)
if resp.stop_reason == "end_turn":
text = "\n".join(b.text for b in resp.content if isinstance(b, TextBlock))
return {
"sub_results": [{"agent": "crypto", "answer": text,
"citations": citations, "iterations": i+1}],
"total_input_tokens": state.get("total_input_tokens", 0) + in_tok,
"total_output_tokens": state.get("total_output_tokens", 0) + out_tok,
"total_cost_usd": state.get("total_cost_usd", 0.0) + cost,
}
# tool use
tools_used = [b for b in resp.content if isinstance(b, ToolUseBlock)]
messages.append({"role": "assistant", "content": resp.content})
results = []
for tu in tools_used:
try:
r = await dispatch(tu.name, tu.input)
citations.append({"tool": tu.name, "args": tu.input, "agent": "crypto"})
except Exception as e:
r = {"error": str(e)}
results.append({"type": "tool_result", "tool_use_id": tu.id,
"content": json.dumps(r, default=str)})
messages.append({"role": "user", "content": results})
return {"sub_results": [{"agent": "crypto", "answer": "[max iters]",
"citations": citations, "iterations": 8}],
"total_cost_usd": state.get("total_cost_usd", 0.0) + cost}
4.6 src/agents/macro.py 与 src/agents/compliance.py
结构与 equity/crypto 几乎一致,差异只在 system prompt 和 tools 子集。为节省篇幅给关键差异:
# src/agents/macro.py
SYSTEM_PROMPT = """You are a macro research agent. Tools: FRED time series, news search,
Yahoo prices for indices/FX/rates. Workflow: identify time series → pull data → compute
correlations/changes → produce structured answer."""
# main loop identical to crypto_node, swap CRYPTO_TOOLS -> MACRO_TOOLS
# src/agents/compliance.py
SYSTEM_PROMPT = """You are a compliance/sanctions agent. Tools: check_ofac_sdn,
trace_funds, get_eth_txs. For any wallet/contract: 1) check OFAC, 2) trace 2-hop funding,
3) flag if interacted with mixers (Tornado Cash, Sinbad). Output risk_score 0-100 + reasoning.
Use Opus for high-stakes decisions."""
4.7 src/tools/registry.py — Tool Schemas (Anthropic format)
"""Anthropic tool schemas + dispatcher.
Each tool has:
- JSON schema describing args (Anthropic tool format)
- implementation function (sync or async)
"""
from __future__ import annotations
from typing import Any
from . import financial_data, onchain, news, calc
# --- Schemas ---
EQUITY_TOOLS = [
{
"name": "get_stock_price",
"description": "Latest stock price + 30-day OHLCV. Free Yahoo data, ~5min freshness.",
"input_schema": {
"type": "object",
"properties": {
"ticker": {"type": "string", "description": "e.g. NVDA, AAPL"},
"period": {"type": "string", "enum": ["1d","1mo","3mo","1y"], "default": "1mo"},
},
"required": ["ticker"],
},
},
{
"name": "get_financials",
"description": "Quarterly income statement, balance sheet, cash flow from SEC EDGAR.",
"input_schema": {
"type": "object",
"properties": {
"ticker": {"type": "string"},
"statement": {"type": "string", "enum": ["income","balance","cashflow"]},
"freq": {"type": "string", "enum": ["quarterly","annual"], "default": "quarterly"},
},
"required": ["ticker", "statement"],
},
},
{
"name": "parse_filing",
"description": "Extract a specific section from 10-K/10-Q filing using RAG.",
"input_schema": {
"type": "object",
"properties": {
"ticker": {"type": "string"},
"filing_type": {"type": "string", "enum": ["10-K","10-Q","8-K"]},
"section": {"type": "string", "description": "e.g. 'risk factors', 'MD&A'"},
},
"required": ["ticker", "filing_type", "section"],
},
},
{
"name": "compute_pe_ev_ebitda",
"description": "Compute PE, EV/EBITDA, P/S, P/B ratios for a ticker.",
"input_schema": {
"type": "object",
"properties": {"ticker": {"type": "string"}},
"required": ["ticker"],
},
},
{
"name": "vector_search",
"description": "Hybrid semantic+keyword search over financial document corpus.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"top_k": {"type": "integer", "default": 5},
"filters": {"type": "object", "description": "e.g. {'doc_type':'10-K'}"},
},
"required": ["query"],
},
},
]
CRYPTO_TOOLS = [
{
"name": "get_eth_balance",
"description": "ETH balance + ERC20 holdings of an address.",
"input_schema": {
"type": "object",
"properties": {"address": {"type": "string", "description": "0x..."}},
"required": ["address"],
},
},
{
"name": "get_eth_txs",
"description": "Recent transactions of an address (default 50).",
"input_schema": {
"type": "object",
"properties": {
"address": {"type": "string"},
"limit": {"type": "integer", "default": 50},
},
"required": ["address"],
},
},
{
"name": "query_dune",
"description": "Run a saved Dune SQL query by ID. Returns up to 10K rows.",
"input_schema": {
"type": "object",
"properties": {"query_id": {"type": "integer"}, "params": {"type": "object"}},
"required": ["query_id"],
},
},
{
"name": "get_token_info",
"description": "Token metadata (price, MC, FDV, 24h volume) from Coingecko.",
"input_schema": {
"type": "object",
"properties": {"symbol_or_id": {"type": "string"}},
"required": ["symbol_or_id"],
},
},
{
"name": "get_defi_tvl",
"description": "Protocol TVL time series from DeFiLlama.",
"input_schema": {
"type": "object",
"properties": {"protocol_slug": {"type": "string"}},
"required": ["protocol_slug"],
},
},
]
MACRO_TOOLS = [
{
"name": "get_macro_series",
"description": "FRED time series (e.g. DGS10 = 10-year treasury yield).",
"input_schema": {
"type": "object",
"properties": {
"series_id": {"type": "string", "description": "FRED series id"},
"start": {"type": "string", "description": "YYYY-MM-DD"},
"end": {"type": "string", "description": "YYYY-MM-DD"},
},
"required": ["series_id"],
},
},
{
"name": "get_news",
"description": "Search recent news via Tavily.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer", "default": 5},
},
"required": ["query"],
},
},
{
"name": "compute_correlation",
"description": "Pearson correlation between two time series.",
"input_schema": {
"type": "object",
"properties": {"series_a": {"type": "array"}, "series_b": {"type": "array"}},
"required": ["series_a", "series_b"],
},
},
]
COMPLIANCE_TOOLS = [
{
"name": "check_ofac_sdn",
"description": "Check if address is on OFAC SDN sanctions list.",
"input_schema": {
"type": "object",
"properties": {"address": {"type": "string"}},
"required": ["address"],
},
},
{
"name": "trace_funds",
"description": "Trace fund origin 2 hops upstream of an address.",
"input_schema": {
"type": "object",
"properties": {"address": {"type": "string"}, "depth": {"type": "integer", "default": 2}},
"required": ["address"],
},
},
{
"name": "get_eth_txs",
"description": "Recent transactions (same as crypto agent's tool).",
"input_schema": {
"type": "object",
"properties": {"address": {"type": "string"}, "limit": {"type": "integer", "default": 50}},
"required": ["address"],
},
},
]
# --- Dispatcher ---
async def dispatch(name: str, args: dict) -> Any:
"""Route tool name to actual implementation."""
impl = {
"get_stock_price": financial_data.get_stock_price,
"get_financials": financial_data.get_financials,
"parse_filing": financial_data.parse_filing,
"compute_pe_ev_ebitda": financial_data.compute_pe_ev_ebitda,
"vector_search": financial_data.vector_search,
"get_eth_balance": onchain.get_eth_balance,
"get_eth_txs": onchain.get_eth_txs,
"query_dune": onchain.query_dune,
"get_token_info": onchain.get_token_info,
"get_defi_tvl": onchain.get_defi_tvl,
"get_macro_series": financial_data.get_macro_series,
"get_news": news.get_news,
"compute_correlation": calc.compute_correlation,
"check_ofac_sdn": onchain.check_ofac_sdn,
"trace_funds": onchain.trace_funds,
}.get(name)
if not impl:
raise ValueError(f"Unknown tool: {name}")
if asyncio.iscoroutinefunction(impl):
return await impl(**args)
return impl(**args)
import asyncio # bottom to satisfy linter
4.8 src/tools/financial_data.py
"""Financial data tools: prices, financials, filings, ratios."""
from __future__ import annotations
import asyncio
from datetime import datetime
import yfinance as yf
import structlog
from aiocache import cached, Cache
from ..settings import get_settings
from ..rag.retriever import HybridRetriever
log = structlog.get_logger()
settings = get_settings()
retriever = HybridRetriever()
@cached(ttl=60, cache=Cache.MEMORY)
async def get_stock_price(ticker: str, period: str = "1mo") -> dict:
"""Latest price + period OHLCV."""
def _fetch():
t = yf.Ticker(ticker)
hist = t.history(period=period)
info = t.fast_info
return {
"ticker": ticker.upper(),
"last_price": float(info.last_price) if info.last_price else None,
"currency": info.currency,
"as_of": datetime.utcnow().isoformat(),
"ohlcv": [
{"date": d.strftime("%Y-%m-%d"),
"open": float(r["Open"]), "high": float(r["High"]),
"low": float(r["Low"]), "close": float(r["Close"]),
"volume": int(r["Volume"])}
for d, r in hist.iterrows()
],
}
return await asyncio.to_thread(_fetch)
@cached(ttl=86400, cache=Cache.MEMORY)
async def get_financials(ticker: str, statement: str, freq: str = "quarterly") -> dict:
def _fetch():
t = yf.Ticker(ticker)
if statement == "income":
df = t.quarterly_financials if freq == "quarterly" else t.financials
elif statement == "balance":
df = t.quarterly_balance_sheet if freq == "quarterly" else t.balance_sheet
else:
df = t.quarterly_cashflow if freq == "quarterly" else t.cashflow
return {
"ticker": ticker.upper(),
"statement": statement,
"freq": freq,
"data": {col.strftime("%Y-%m-%d"): df[col].dropna().to_dict()
for col in df.columns},
}
return await asyncio.to_thread(_fetch)
async def parse_filing(ticker: str, filing_type: str, section: str) -> dict:
"""Extract a section from a filing using RAG."""
query = f"{ticker} {filing_type} {section}"
chunks = await retriever.retrieve(query, top_k=5,
filters={"ticker": ticker.upper(),
"doc_type": filing_type})
return {
"ticker": ticker, "filing_type": filing_type, "section": section,
"extracts": [{"text": c.text, "source": c.metadata.get("source", ""),
"page": c.metadata.get("page", -1)} for c in chunks],
}
async def compute_pe_ev_ebitda(ticker: str) -> dict:
def _fetch():
t = yf.Ticker(ticker)
info = t.info
return {
"ticker": ticker.upper(),
"pe_ratio": info.get("trailingPE"),
"forward_pe": info.get("forwardPE"),
"ev_ebitda": info.get("enterpriseToEbitda"),
"price_to_sales": info.get("priceToSalesTrailing12Months"),
"price_to_book": info.get("priceToBook"),
"as_of": datetime.utcnow().isoformat(),
}
return await asyncio.to_thread(_fetch)
async def vector_search(query: str, top_k: int = 5, filters: dict | None = None) -> dict:
chunks = await retriever.retrieve(query, top_k=top_k, filters=filters or {})
return {
"query": query,
"results": [{"text": c.text, "score": c.score,
"metadata": c.metadata} for c in chunks],
}
# Macro tools
@cached(ttl=60, cache=Cache.MEMORY)
async def get_macro_series(series_id: str, start: str = "", end: str = "") -> dict:
"""FRED time series."""
from fredapi import Fred
def _fetch():
fred = Fred(api_key=settings.fred_api_key)
s = fred.get_series(series_id, start, end if end else None)
return {
"series_id": series_id,
"values": [{"date": d.strftime("%Y-%m-%d"), "value": float(v)}
for d, v in s.items() if v == v], # filter NaN
"as_of": datetime.utcnow().isoformat(),
}
return await asyncio.to_thread(_fetch)
4.9 src/tools/onchain.py
"""Onchain tools: Etherscan, Dune, Coingecko, DeFiLlama, OFAC."""
from __future__ import annotations
import os
from datetime import datetime
import httpx
from aiocache import cached, Cache
import structlog
from ..settings import get_settings
log = structlog.get_logger()
settings = get_settings()
ETHERSCAN_BASE = "https://api.etherscan.io/api"
DUNE_BASE = "https://api.dune.com/api/v1"
COINGECKO_BASE = "https://api.coingecko.com/api/v3"
DEFILLAMA_BASE = "https://api.llama.fi"
# OFAC list cached locally
_OFAC_CACHE: set[str] = set()
@cached(ttl=30, cache=Cache.MEMORY)
async def get_eth_balance(address: str) -> dict:
async with httpx.AsyncClient(timeout=10) as c:
r = await c.get(ETHERSCAN_BASE, params={
"module": "account", "action": "balance",
"address": address, "tag": "latest",
"apikey": settings.etherscan_api_key,
})
r.raise_for_status()
data = r.json()
wei = int(data.get("result", "0"))
return {
"address": address,
"eth_balance": wei / 1e18,
"as_of": datetime.utcnow().isoformat(),
}
@cached(ttl=30, cache=Cache.MEMORY)
async def get_eth_txs(address: str, limit: int = 50) -> dict:
async with httpx.AsyncClient(timeout=15) as c:
r = await c.get(ETHERSCAN_BASE, params={
"module": "account", "action": "txlist",
"address": address, "startblock": 0, "endblock": 99999999,
"page": 1, "offset": limit, "sort": "desc",
"apikey": settings.etherscan_api_key,
})
r.raise_for_status()
data = r.json()
txs = data.get("result", []) or []
return {
"address": address,
"count": len(txs),
"txs": [
{
"hash": t["hash"], "from": t["from"], "to": t["to"],
"value_eth": int(t["value"]) / 1e18,
"block": int(t["blockNumber"]),
"timestamp": datetime.fromtimestamp(int(t["timeStamp"])).isoformat(),
"method": t.get("functionName", "").split("(")[0],
} for t in txs[:limit]
],
}
async def query_dune(query_id: int, params: dict | None = None) -> dict:
"""Run a saved Dune query."""
headers = {"X-Dune-API-Key": settings.dune_api_key}
async with httpx.AsyncClient(timeout=120) as c:
# 1) execute
r = await c.post(f"{DUNE_BASE}/query/{query_id}/execute",
headers=headers, json={"query_parameters": params or {}})
r.raise_for_status()
execution_id = r.json()["execution_id"]
# 2) poll
for _ in range(60):
await asyncio.sleep(2)
r = await c.get(f"{DUNE_BASE}/execution/{execution_id}/status",
headers=headers)
if r.json().get("state") == "QUERY_STATE_COMPLETED":
break
# 3) fetch results
r = await c.get(f"{DUNE_BASE}/execution/{execution_id}/results", headers=headers)
return r.json().get("result", {})
@cached(ttl=60, cache=Cache.MEMORY)
async def get_token_info(symbol_or_id: str) -> dict:
async with httpx.AsyncClient(timeout=10) as c:
# Try id first
r = await c.get(f"{COINGECKO_BASE}/coins/{symbol_or_id.lower()}",
params={"localization": "false", "tickers": "false",
"community_data": "false", "developer_data": "false"})
if r.status_code != 200:
# Search
r = await c.get(f"{COINGECKO_BASE}/search",
params={"query": symbol_or_id})
r.raise_for_status()
results = r.json().get("coins", [])
if not results:
return {"error": "not found"}
cid = results[0]["id"]
r = await c.get(f"{COINGECKO_BASE}/coins/{cid}")
d = r.json()
return {
"id": d.get("id"),
"symbol": d.get("symbol"),
"name": d.get("name"),
"price_usd": d.get("market_data", {}).get("current_price", {}).get("usd"),
"mcap_usd": d.get("market_data", {}).get("market_cap", {}).get("usd"),
"fdv_usd": d.get("market_data", {}).get("fully_diluted_valuation", {}).get("usd"),
"vol_24h": d.get("market_data", {}).get("total_volume", {}).get("usd"),
"as_of": datetime.utcnow().isoformat(),
}
@cached(ttl=300, cache=Cache.MEMORY)
async def get_defi_tvl(protocol_slug: str) -> dict:
async with httpx.AsyncClient(timeout=15) as c:
r = await c.get(f"{DEFILLAMA_BASE}/protocol/{protocol_slug}")
r.raise_for_status()
d = r.json()
tvl_series = d.get("tvl", [])[-30:] # last 30 days
return {
"protocol": d.get("name"),
"current_tvl_usd": tvl_series[-1]["totalLiquidityUSD"] if tvl_series else None,
"tvl_30d": [{"date": datetime.fromtimestamp(p["date"]).strftime("%Y-%m-%d"),
"tvl_usd": p["totalLiquidityUSD"]} for p in tvl_series],
"chains": d.get("chains", []),
"category": d.get("category"),
}
# --- Compliance tools ---
async def check_ofac_sdn(address: str) -> dict:
"""Check OFAC SDN list. List loaded once and cached in memory."""
global _OFAC_CACHE
if not _OFAC_CACHE:
await _load_ofac_list()
is_sanctioned = address.lower() in _OFAC_CACHE
return {
"address": address,
"sanctioned": is_sanctioned,
"list": "OFAC SDN",
"as_of": datetime.utcnow().isoformat(),
}
async def _load_ofac_list():
"""Download OFAC SDN list (simplified: chainalysis sanctioned_addresses snapshot)."""
global _OFAC_CACHE
url = "https://raw.githubusercontent.com/0xB10C/ofac-sanctioned-digital-currency-addresses/lists/sanctioned_addresses_ETH.txt"
try:
async with httpx.AsyncClient(timeout=30) as c:
r = await c.get(url)
r.raise_for_status()
_OFAC_CACHE = {a.strip().lower() for a in r.text.splitlines() if a.strip()}
log.info("ofac_loaded", count=len(_OFAC_CACHE))
except Exception as e:
log.error("ofac_load_failed", err=str(e))
_OFAC_CACHE = set()
async def trace_funds(address: str, depth: int = 2) -> dict:
"""Heuristic: trace fund origin by walking inbound txs upstream."""
visited = set([address.lower()])
frontier = [(address, 0)]
edges = []
sanctioned_hits = []
while frontier:
addr, d = frontier.pop(0)
if d >= depth:
continue
txs = (await get_eth_txs(addr, limit=20)).get("txs", [])
# inbound = txs where addr was 'to'
inbound = [t for t in txs if t["to"].lower() == addr.lower() and t["value_eth"] > 0]
for t in inbound[:5]:
src = t["from"]
edges.append({"from": src, "to": addr, "value_eth": t["value_eth"]})
ofac = await check_ofac_sdn(src)
if ofac["sanctioned"]:
sanctioned_hits.append(src)
if src.lower() not in visited:
visited.add(src.lower())
frontier.append((src, d + 1))
return {
"address": address,
"depth": depth,
"edges_count": len(edges),
"edges": edges[:20],
"sanctioned_upstream": sanctioned_hits,
"risk_flag": len(sanctioned_hits) > 0,
}
import asyncio # bottom for circular guards
4.10 src/tools/news.py
"""News tools (Tavily)."""
from __future__ import annotations
import httpx
from aiocache import cached, Cache
from ..settings import get_settings
settings = get_settings()
@cached(ttl=300, cache=Cache.MEMORY)
async def get_news(query: str, max_results: int = 5) -> dict:
async with httpx.AsyncClient(timeout=15) as c:
r = await c.post("https://api.tavily.com/search", json={
"api_key": settings.tavily_api_key,
"query": query,
"search_depth": "basic",
"max_results": max_results,
"topic": "news",
})
r.raise_for_status()
data = r.json()
return {
"query": query,
"results": [{
"title": r["title"], "url": r["url"], "snippet": r.get("content", ""),
"published_at": r.get("published_date"),
} for r in data.get("results", [])],
}
4.11 src/tools/calc.py
"""Pure-compute finance helpers."""
from __future__ import annotations
import numpy as np
from typing import Sequence
def compute_correlation(series_a: Sequence[float], series_b: Sequence[float]) -> dict:
a = np.asarray(series_a, dtype=float)
b = np.asarray(series_b, dtype=float)
if len(a) != len(b) or len(a) < 2:
return {"error": "series length mismatch or too short"}
rho = float(np.corrcoef(a, b)[0, 1])
return {"correlation": rho, "n": len(a)}
def sharpe(returns: Sequence[float], rf: float = 0.04, periods_per_year: int = 252) -> dict:
r = np.asarray(returns, dtype=float)
excess = r - rf / periods_per_year
if r.std() == 0:
return {"sharpe": None, "reason": "zero variance"}
s = excess.mean() / r.std() * np.sqrt(periods_per_year)
return {"sharpe": float(s), "n": len(r)}
def max_drawdown(prices: Sequence[float]) -> dict:
p = np.asarray(prices, dtype=float)
cum_max = np.maximum.accumulate(p)
dd = (p - cum_max) / cum_max
return {"max_drawdown": float(dd.min()), "trough_idx": int(dd.argmin())}
4.12 src/rag/embedder.py
"""Voyage finance-2 embedder."""
from __future__ import annotations
import voyageai
from ..settings import get_settings
settings = get_settings()
_client = voyageai.Client(api_key=settings.voyage_api_key)
def embed(texts: list[str], input_type: str = "document") -> list[list[float]]:
resp = _client.embed(texts, model=settings.rag.embedding_model, input_type=input_type)
return resp.embeddings
def embed_query(text: str) -> list[float]:
return embed([text], input_type="query")[0]
4.13 src/rag/retriever.py
"""Hybrid retriever: BM25 + Vector + RRF + Cohere rerank."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import structlog
from qdrant_client import QdrantClient
from qdrant_client.http.models import Filter, FieldCondition, MatchValue
import cohere
from ..settings import get_settings
from .embedder import embed_query
log = structlog.get_logger()
settings = get_settings()
@dataclass
class Chunk:
text: str
score: float
metadata: dict
class HybridRetriever:
def __init__(self, collection: str = "finance_docs"):
self.qdrant = QdrantClient(url=settings.qdrant_url)
self.collection = collection
self.cohere = cohere.Client(api_key=settings.cohere_api_key)
async def retrieve(self, query: str, top_k: int = 10,
filters: dict | None = None) -> list[Chunk]:
# 1) Vector
qvec = embed_query(query)
qfilter = self._build_filter(filters or {})
vec_hits = self.qdrant.search(
collection_name=self.collection,
query_vector=qvec,
query_filter=qfilter,
limit=settings.rag.retrieval_top_k,
)
# 2) BM25 (skipped for brevity - in production, use Elasticsearch)
# combine simulated by treating vec_hits as full pool
# 3) Rerank with Cohere
if not vec_hits:
return []
docs = [h.payload.get("text", "") for h in vec_hits]
rerank_resp = self.cohere.rerank(
model=settings.rag.rerank_model,
query=query, documents=docs, top_n=top_k,
)
chunks = []
for r in rerank_resp.results:
h = vec_hits[r.index]
chunks.append(Chunk(
text=h.payload.get("text", ""),
score=float(r.relevance_score),
metadata={k: v for k, v in h.payload.items() if k != "text"},
))
log.info("retrieve_done", query=query[:50], n=len(chunks))
return chunks
@staticmethod
def _build_filter(filters: dict) -> Filter | None:
if not filters:
return None
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in filters.items()
]
return Filter(must=conditions)
4.14 src/rag/reranker.py
"""Standalone Cohere reranker (used outside HybridRetriever)."""
from __future__ import annotations
import cohere
from ..settings import get_settings
settings = get_settings()
_client = cohere.Client(api_key=settings.cohere_api_key)
def rerank(query: str, docs: list[str], top_n: int = 10) -> list[dict]:
resp = _client.rerank(
model=settings.rag.rerank_model,
query=query, documents=docs, top_n=top_n,
)
return [{"index": r.index, "score": float(r.relevance_score),
"text": docs[r.index]} for r in resp.results]
4.15 src/memory/session.py 与 src/memory/longterm.py
# src/memory/session.py
"""Redis-backed session memory."""
from __future__ import annotations
import json
import redis.asyncio as aioredis
from ..settings import get_settings
settings = get_settings()
_redis = aioredis.from_url(settings.redis_url, decode_responses=True)
async def append_message(session_id: str, role: str, content: str):
key = f"session:{session_id}:msgs"
await _redis.rpush(key, json.dumps({"role": role, "content": content}))
await _redis.expire(key, 86400)
async def get_history(session_id: str, limit: int = 20) -> list[dict]:
key = f"session:{session_id}:msgs"
raw = await _redis.lrange(key, -limit, -1)
return [json.loads(r) for r in raw]
# src/memory/longterm.py
"""Mem0-backed long-term user memory."""
from __future__ import annotations
from mem0 import Memory
_mem = Memory() # auto-config from env
def add_fact(user_id: str, text: str, metadata: dict | None = None):
_mem.add(text, user_id=user_id, metadata=metadata or {})
def search_facts(user_id: str, query: str, top_k: int = 5) -> list[dict]:
return _mem.search(query=query, user_id=user_id, limit=top_k)
4.16 src/guardrails/safety.py
"""Input + output guardrails."""
from __future__ import annotations
import re
from anthropic import AsyncAnthropic
from ..settings import get_settings
settings = get_settings()
_client = AsyncAnthropic(api_key=settings.anthropic_api_key)
# Patterns indicating prompt injection attempts
INJECTION_PATTERNS = [
r"ignore (previous|all|prior) instructions",
r"system:?\s*(prompt|message)",
r"</?system>",
r"you are now",
r"act as (?!a |an )",
r"pretend (you are|to be)",
]
INJECTION_RE = re.compile("|".join(INJECTION_PATTERNS), re.IGNORECASE)
# Off-topic / unsafe
BLOCKED_TOPICS = ["porn", "weapon", "drug synthesis", "self-harm"]
# Investment advice trigger words (for compliance scrub)
ADVICE_PHRASES = [
r"\byou should (buy|sell|invest|short)\b",
r"\bguarantee(d)? returns?\b",
r"\bwill (rise|fall|moon|crash) (next|tomorrow|soon)\b",
]
ADVICE_RE = re.compile("|".join(ADVICE_PHRASES), re.IGNORECASE)
DISCLAIMER = settings.guardrails.disclaimer.strip()
async def scan_input(query: str) -> dict:
"""Return {safe: bool, reason: str}."""
if INJECTION_RE.search(query):
return {"safe": False, "reason": "Detected prompt injection pattern."}
low = query.lower()
for t in BLOCKED_TOPICS:
if t in low:
return {"safe": False, "reason": f"Blocked topic: {t}"}
if len(query) > 4000:
return {"safe": False, "reason": "Query too long (>4K chars)."}
return {"safe": True, "reason": ""}
async def scrub_output(text: str) -> str:
"""Remove investment-advice phrasing + append disclaimer."""
# Soft-replace advice phrases with research framing
cleaned = ADVICE_RE.sub("[research note: data shows]", text)
if DISCLAIMER and DISCLAIMER not in cleaned:
cleaned = f"{cleaned}\n\n---\n{DISCLAIMER}"
return cleaned
4.17 src/eval/golden.py
"""Golden test harness - 30 cases."""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
GOLDEN_PATH = Path(__file__).parent.parent.parent / "tests" / "golden" / "cases.jsonl"
@dataclass
class GoldenCase:
id: str
query: str
expected_intent: list[str]
expected_keywords: list[str]
must_call_tools: list[str]
must_cite_source: bool
def load() -> list[GoldenCase]:
cases = []
with open(GOLDEN_PATH, "r", encoding="utf-8") as f:
for line in f:
d = json.loads(line)
cases.append(GoldenCase(**d))
return cases
30 条 golden cases 例子(保存在
tests/golden/cases.jsonl):
{"id":"eq-001","query":"What was NVDA's GenAI revenue in Q3 2026?","expected_intent":["equity"],"expected_keywords":["NVDA","data center","revenue"],"must_call_tools":["parse_filing","get_financials"],"must_cite_source":true}
{"id":"eq-002","query":"Compare AAPL and MSFT trailing PE","expected_intent":["equity"],"expected_keywords":["PE","AAPL","MSFT"],"must_call_tools":["compute_pe_ev_ebitda"],"must_cite_source":true}
{"id":"cr-001","query":"What is BUIDL token's onchain TVL trend?","expected_intent":["crypto"],"expected_keywords":["BUIDL","TVL"],"must_call_tools":["get_defi_tvl"],"must_cite_source":true}
{"id":"cr-002","query":"Who are the top 5 holders of USDC on Ethereum?","expected_intent":["crypto"],"expected_keywords":["USDC","holder"],"must_call_tools":["query_dune"],"must_cite_source":true}
{"id":"mc-001","query":"What is the current 10-year treasury yield?","expected_intent":["macro"],"expected_keywords":["10-year","yield"],"must_call_tools":["get_macro_series"],"must_cite_source":true}
{"id":"co-001","query":"Is address 0x... on OFAC SDN?","expected_intent":["compliance"],"expected_keywords":["OFAC"],"must_call_tools":["check_ofac_sdn"],"must_cite_source":true}
{"id":"x-001","query":"Compare BTC and gold returns over past 12 months","expected_intent":["macro","crypto"],"expected_keywords":["BTC","gold","return"],"must_call_tools":["get_token_info","get_macro_series"],"must_cite_source":true}
4.18 src/eval/runner.py
"""Golden eval runner."""
from __future__ import annotations
import asyncio
import json
from .golden import load, GoldenCase
from ..orchestrator import run_query
async def run_one(case: GoldenCase) -> dict:
out = await run_query(case.query, user_id="eval", session_id=f"eval-{case.id}")
answer = out.get("answer", "").lower()
keyword_hit = sum(1 for kw in case.expected_keywords if kw.lower() in answer)
keyword_pass = keyword_hit >= max(1, len(case.expected_keywords) // 2)
tool_calls = [c["tool"] for c in out.get("citations", [])]
tool_pass = all(t in tool_calls for t in case.must_call_tools)
intent_pass = set(case.expected_intent).issubset(set(out.get("intent", [])))
return {
"id": case.id,
"query": case.query,
"intent_pass": intent_pass,
"keyword_pass": keyword_pass,
"tool_pass": tool_pass,
"all_pass": intent_pass and keyword_pass and tool_pass,
"cost": out.get("total_cost_usd", 0.0),
"iterations": out.get("iterations", 0),
}
async def main():
cases = load()
results = await asyncio.gather(*(run_one(c) for c in cases))
pass_rate = sum(r["all_pass"] for r in results) / len(results)
total_cost = sum(r["cost"] for r in results)
print(json.dumps({
"n": len(results),
"pass_rate": pass_rate,
"total_cost_usd": total_cost,
"details": results,
}, indent=2))
if __name__ == "__main__":
asyncio.run(main())
4.19 src/observability/langfuse_setup.py
"""Langfuse OTEL setup."""
from __future__ import annotations
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from langfuse import Langfuse
from langfuse.opentelemetry import LangfuseSpanExporter
from ..settings import get_settings
settings = get_settings()
_provider = TracerProvider()
trace.set_tracer_provider(_provider)
_exporter = LangfuseSpanExporter(
public_key=settings.langfuse_public_key,
secret_key=settings.langfuse_secret_key,
)
_provider.add_span_processor(BatchSpanProcessor(_exporter))
def get_tracer():
return trace.get_tracer("finance_agent")
4.20 src/api/cli.py
"""CLI entry: python -m src.api.cli "your query"."""
from __future__ import annotations
import asyncio
import json
import typer
from ..orchestrator import run_query
app = typer.Typer()
@app.command()
def ask(query: str, session: str = "default", user: str = "anon",
json_output: bool = False):
"""Ask the finance agent."""
out = asyncio.run(run_query(query, user_id=user, session_id=session))
if json_output:
print(json.dumps(out, indent=2, default=str))
else:
print("=" * 60)
print(f"Query: {query}")
print(f"Intent: {out.get('intent')}")
print(f"Plan: {out.get('plan')}")
print("-" * 60)
print(out.get("answer", ""))
print("-" * 60)
print(f"Cost: ${out.get('total_cost_usd', 0):.4f} "
f"Tokens: in={out.get('total_input_tokens', 0)} "
f"out={out.get('total_output_tokens', 0)}")
print(f"Citations: {len(out.get('citations', []))}")
if __name__ == "__main__":
app()
4.21 src/api/server.py
"""FastAPI server."""
from __future__ import annotations
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from ..orchestrator import run_query
app = FastAPI(title="Finance Research Agent v1")
class QueryRequest(BaseModel):
query: str
user_id: str = "anon"
session_id: str = "default"
@app.post("/v1/query")
async def query(req: QueryRequest):
try:
result = await run_query(req.query, req.user_id, req.session_id)
return result
except Exception as e:
raise HTTPException(500, f"Query failed: {e}")
@app.get("/healthz")
async def health():
return {"status": "ok"}
五、端到端运行示例
Query: "分析 BlackRock BUIDL 最近一周的链上活动并对比传统货币基金(如 SHV)"
5.1 调用序列图
sequenceDiagram
participant U as User
participant CLI as CLI
participant O as Orchestrator
participant IG as InputGuard
participant C as Coordinator
participant CR as CryptoAgent
participant EQ as EquityAgent
participant T as Tools
participant OG as OutputGuard
U->>CLI: ask "...BUIDL...SHV..."
CLI->>O: run_query()
O->>IG: scan_input
IG-->>O: safe=true
O->>C: route()
C->>C: Opus 4.7 routing
C-->>O: intent=["crypto","equity"], plan
par 并行 (asyncio.gather via LangGraph)
O->>CR: crypto_node
CR->>T: get_token_info(BUIDL)
CR->>T: get_defi_tvl(buidl)
CR->>T: query_dune(BUIDL holders)
T-->>CR: data
CR-->>O: sub_results[0]
and
O->>EQ: equity_node
EQ->>T: get_stock_price(SHV, 1mo)
EQ->>T: parse_filing(SHV, fact sheet)
T-->>EQ: data
EQ-->>O: sub_results[1]
end
O->>C: synthesize
C->>C: Opus 4.7 fuse
C-->>O: answer
O->>OG: scrub_output
OG-->>O: answer + disclaimer
O-->>CLI: result
CLI-->>U: print
5.2 实际输出片段(截取)
Coordinator router 输出:
{
"intent": ["crypto", "equity"],
"plan": "Pull BUIDL onchain holders + TVL + token metrics. In parallel, pull SHV price action + ETF fact sheet. Then compare flows and yields."
}
Crypto agent 中间过程(节选):
[Tool] get_token_info(symbol_or_id="buidl")
-> {"id":"blackrock-usd-institutional-digital-liquidity-fund",
"price_usd":1.0, "mcap_usd":520000000, ...}
[Tool] get_defi_tvl(protocol_slug="buidl")
-> {"protocol":"BlackRock USD Inst. Digital Liquidity",
"current_tvl_usd":520000000,
"tvl_30d":[...]}
[Tool] query_dune(query_id=4257891) // top BUIDL holders by addr
-> rows: 12 unique holders, top = Ondo Finance (60% of supply)
Equity agent:
[Tool] get_stock_price(SHV, "1mo")
-> last_price=110.42, 30-day range tight ($110.20-110.55)
[Tool] get_financials(SHV, "income", "quarterly")
-> dividend yield ≈ 4.78% trailing
[Tool] parse_filing(SHV, "10-K", "fact sheet")
-> "iShares Short Treasury Bond ETF, holdings <1Y T-bills..."
Coordinator synthesizer 最终输出(精简版):
## BUIDL vs SHV — 一周链上活动对比
**BlackRock BUIDL (onchain)**
- 当前 AUM: ~$520M(DeFiLlama, 2026-10-26 14:00 UTC)
- 持有人结构高度集中:Ondo Finance 占 ~60%(USDY 储备需求)
- 一周链上转账:4 笔大额(>$10M),主要是 institutional rebalancing
- 当前年化收益 ≈ 5.10%(根据 7-day moving avg of distribution)
**iShares Short Treasury (SHV, TradFi 对照)**
- 价格稳定在 $110.20-110.55(过去 30 天 vol ~0.05%)
- 30-day SEC yield: 4.78%
- AUM: ~$23B(远大于 BUIDL)
**对比要点**
- 收益率:BUIDL ~5.10% 略高于 SHV 4.78%(可能反映链上结算成本不同)
- 流动性:SHV 二级市场每日成交亿美元级;BUIDL 仅在白名单 KYC 之间转移
- 用户结构:BUIDL 极少数大户(12 个地址),SHV 数千机构 + 散户
**数据来源**:DeFiLlama, Coingecko, Dune query 4257891, Yahoo Finance, SEC EDGAR
---
Disclaimer: This information is for research purposes only and does not constitute
investment advice. Past performance is not indicative of future results.
5.3 性能与成本(estimated)
| 指标 | 值 |
|---|---|
| 总耗时 | ~22s(其中并行节省 ~12s) |
| 总 input tokens | 18,400 |
| 总 output tokens | 2,150 |
| 总 cost | $0.094 |
| 工具调用次数 | 6 |
| Iterations | 1 路由 + 2 并行 + 1 合成 = 4 layers |
六、关键设计模式落地
6.1 State Graph Cycles(LangGraph)
- ReAct 循环嵌入子 agent 的 tool-use loop(
while iterations < max_iters) - LangGraph 层做 conditional routing(intent → 哪些 sub-agents 跑)
- LangGraph 不直接做 cycle(那是 sub-agent 内部的事),保持 LangGraph 层只做 macro orchestration
6.2 Memory Layered Access
- Session(Redis 24h):消息历史、工具结果短期 cache
- Long-term(Mem0):用户偏好("我只关心 RWA")、watch list
- Procedural(Postgres):研究模板("分析任意股票" = 一个固定步骤)
- 写入策略:每次 final response 后,Coordinator 让 Opus 抽取 1-3 条事实存入 Mem0
6.3 Parallel Tool Calls
- LangGraph 自带:从 router 出发的 4 个 conditional edges 自动 fan-out
- 每个 sub-agent 内部:Anthropic 原生支持单 turn 多 tool(response 中多个 ToolUseBlock)→ asyncio.gather 并行执行
6.4 Cache 策略
- Tool 层:aiocache TTL(price 60s / onchain 30s / filing 24h)
- LLM 层:Anthropic prompt caching(system prompt + tool schema),每次调用 90% 折扣
- RAG 层:检索结果按 query hash 缓存 5min
6.5 Guardrails Layered Defense
- 入口(input_guard 节点):injection scan + topic check + length cap
- 工具层:每个工具独立超时 / circuit breaker / cost meter
- 出口(output_guard 节点):advice phrase scrub + disclaimer 自动追加
- 离线(eval 层):每日 100 case 红/绿测试,hallucination > 5% 阻止部署
七、运行指南
7.1 本地启动
# 1. 准备
git clone <repo> && cd finance_agent
cp .env.example .env # 填入 API keys
pip install -r requirements.txt
# 2. 起 dependencies (Qdrant + Redis + Elasticsearch)
docker-compose up -d
# 3. 索引一些文档(一次性)
python -m src.rag.index --input data/sec_filings/
# 4. 试跑
python -m src.api.cli "What is NVDA's trailing PE?"
# 5. 跑 golden eval
python -m src.eval.runner
# 6. 起 HTTP server
uvicorn src.api.server:app --host 0.0.0.0 --port 8000
7.2 docker-compose.yml(节选)
version: "3.9"
services:
qdrant:
image: qdrant/qdrant:v1.13.0
ports: ["6333:6333"]
volumes: ["./data/qdrant:/qdrant/storage"]
redis:
image: redis:7.4-alpine
ports: ["6379:6379"]
elasticsearch:
image: elasticsearch:8.16.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports: ["9200:9200"]
agent:
build: .
env_file: .env
depends_on: [qdrant, redis, elasticsearch]
ports: ["8000:8000"]
command: uvicorn src.api.server:app --host 0.0.0.0 --port 8000
八、下一步迭代(5 个改进点)
- Streaming: SSE 推送 token,让用户提前看到中间进度("正在调用 get_eth_balance...")
- Multi-step plan with reflection: 当 sub-agent 返回 confidence 低时,Coordinator 启动二次 plan
- Subgraph for compliance: Compliance 的 trace_funds 是图遍历,应该单独抽出成 LangGraph 子图
- MCP server 化: 把 tools/ 改造成独立 MCP server(Python stdio),让其他项目复用
- LoRA 财报抽取专模: 用 Unsloth 微调 Qwen 2.5 7B 做 financials extraction,cost 降 80%
九、关键速查
REST API endpoints
| Method | Path | Description |
|---|---|---|
| POST | /v1/query | 主查询 |
| GET | /healthz | 健康检查 |
| POST | /v1/eval/run | 触发 golden eval(admin) |
主要函数签名
| 函数 | 签名 |
|---|---|
run_query | async def run_query(query: str, user_id, session_id) -> dict |
route | async def route(state) -> dict |
synthesize | async def synthesize(state) -> dict |
equity_node | async def equity_node(state) -> dict |
dispatch | async def dispatch(name: str, args: dict) -> Any |
HybridRetriever.retrieve | async def retrieve(query, top_k, filters) -> list[Chunk] |
scan_input / scrub_output | async def (text) -> dict / str |
十、面试题
Q1:为什么用多 agent 而不是大单 agent?
答:3 点:(1) 上下文压缩——单 agent 要把 17 个 tool schema 全塞进 system prompt,就是 5K+ token 的固定开销,多 agent 每个只看自己的 5 个 tool,节省 ~70%;(2) 并行——本 demo 中 crypto agent 和 equity agent 并行可省 ~12s;(3) 独立 eval 与 ownership——团队结构上 macro/equity/crypto 可以是不同人维护。代价是 Coordinator 单点和复杂度。
Q2:Tool calling 错误怎么处理?
答:四层:(1) 工具内部 try/except,永远返回 {"error": str} 而非抛异常;(2) Agent 看到 error 字段会自然在下一轮决定重试 / 换工具 / 告诉用户失败;(3) 全局 max_tool_iters=8,超出强制停;(4) 同 tool 同 args 连续调用 3 次以上则 short-circuit(避免死循环)。
Q3:Cost 怎么控?
答:(1) 模型分级:routine 用 Sonnet(Opus 1/5 价格)、cheap routes 用 Haiku;(2) Anthropic prompt caching:system prompt + tool schemas 90% 折扣;(3) per-query cap $0.50,超出由 LangGraph conditional edge 路由到 abort 节点;(4) per-user 日 cap $5;(5) tool 层 cache(Yahoo/Etherscan)。
Q4:怎么验证 agent 没有 hallucinate 数字?
答:trace 校验。每个数字必须能在 citations 列表的 tool 输出中找到。Output guard 阶段用一个 cheap Haiku 调用做 self-check:"这段 response 中所有数字 X,是否在 trace 数据中能找到?",发现不一致就标红 + log 警报。每日 100 case golden test 持续监控。
Q5:为什么用 LangGraph 而不是 OpenAI Agents SDK?
答:(1) Anthropic 生态优先(我们用 Claude Opus/Sonnet/Haiku);(2) State 显式(TypedDict)便于演化;(3) Conditional edge / cycle 一等公民;(4) Langfuse 集成深;(5) Checkpointer 能恢复中断 task。OpenAI Agents SDK 在 OpenAI 生态外可用性弱。
十一、明日预告(Day 179)
明天进入 Eval & Iteration:
- 把 30 条 golden test 跑通,统计 pass rate
- 接入 Langfuse 做 trace 可视化
- 红队 30 条 adversarial query 试探边界
- 发布 v1.0 release notes 与 metrics dashboard
速查总表
| 模块 | 文件 | 行数 | 角色 |
|---|---|---|---|
| Orchestrator | src/orchestrator.py | ~120 | LangGraph 状态机 |
| Coordinator | src/agents/coordinator.py | ~110 | 路由+合成(Opus) |
| Equity Agent | src/agents/equity.py | ~90 | 股票研究(Sonnet) |
| Crypto Agent | src/agents/crypto.py | ~50 | 链上研究(Sonnet) |
| Tool Registry | src/tools/registry.py | ~200 | 17 个工具 schema |
| Financial Data | src/tools/financial_data.py | ~120 | yfinance + EDGAR |
| Onchain | src/tools/onchain.py | ~180 | Etherscan + Dune + OFAC |
| Hybrid Retriever | src/rag/retriever.py | ~70 | Qdrant + Cohere rerank |
| Safety | src/guardrails/safety.py | ~50 | input + output guard |
| CLI | src/api/cli.py | ~30 | typer entry |
| Eval Runner | src/eval/runner.py | ~50 | golden test |
总:约 1500 行 Python + 30 条 golden cases + 配置 / docs。
下一站 → Day 179:Eval、Observability、迭代 与 Day 180:v1.0 发布与回顾