---
name: data-pipeline-walkthrough
description: 卫星 Agent 端到端数据处理流程 10 步 walkthrough — 从 X 推文抓取到飞书通知。每步含 "做什么 / 怎么跑 / 决策点 / 输出 / 已知坑"。用作首次跑通、新人 onboarding、生产链回归验证、半年回顾时的脚本。
type: process
trigger:
  - "逐步跑一遍整个数据处理流程"
  - "走一遍卫星 agent 数据流"
  - "/data-pipeline-walkthrough"
  - "端到端跑一次 agent"
  - "把生产链路逐步演示一遍"
not_when:
  - 只想跑 cron 那一步(直接 `run_x_ingest.sh` 即可,不需要本 skill 的 10 步)
  - 生产已稳定 N 周需要的是性能/瓶颈分析(用 obs / stats,不是本 skill)
  - 想要新建 corpus / 改 ontology / 加主线(用 PRODUCT-STRATEGY.md + agent/PRD.md,不是本 skill)
---

# Skill · data-pipeline-walkthrough

> **意图**:逐步跑一遍卫星 Agent 端到端数据处理流程,每步先解释"要做什么"、给出"决策/输入选项"、跑完汇报"结果"和"判断"。沉淀为脚本式的 SOP,既是首次跑通指南,也是半年回顾时的"我们的生产链长这样"快照。

> **不做的事**:不替代 cron(daily 03:00 launchd 已经在跑)、不教如何改规则版、不替代 ADVICE D V2 真 LLM E2E (那是 follow-up,不在本 skill 范围)。

---

## 1. 何时调起本 skill

| 触发场景 | 例子 |
|---|---|
| 用户说 "逐步跑一遍整个数据处理流程" | / |
| 用户说 "走一遍卫星 agent 数据流给我看" | / |
| 用户说 "把生产链路演示一遍,看每步在做什么" | / |
| 显式 `/data-pipeline-walkthrough` | / |
| 用户想做"半年回顾",需要先把当下生产链跑一遍当 baseline | / |

**不要激活**:
- 用户问"今天 cron 跑得怎么样" → 直接 `tail agent/data/x-ingest.log` + `satagent job show x-ingest-daily`,不要走 10 步
- 用户问"加一条主线 / 加一个 source" → 走 PRODUCT-STRATEGY.md + agent/PRD.md
- 用户问"ADVICE 准确率怎么提高" → 走 ADVICE-INTEGRATION.md + Step 7 (validate) 单独跑

---

## 2. 数据流总图

```
┌──────────────────────────────────────────────────────────────────────────────┐
│  expert/X/x_agent  ──Step 1──▶  x.sqlite3 (3138 推文 / 95 账号)               │
│       (twikit + cookies)                                                      │
└──────────────────────────────────────────────────────────────────────────────┘
                                  │
                                  ▼  Step 2 (job x-ingest-daily, launchd 03:00)
┌──────────────────────────────────────────────────────────────────────────────┐
│  satellite_agent  ──fetch(x-sqlite)──▶  agent.db.events (376 条 / 5 主线)     │
│       reply/retweet 过滤 → ontology(103 词) 打 thread+impact → 去重写入       │
└──────────────────────────────────────────────────────────────────────────────┘
                                  │
                       ┌──────────┼──────────┬──────────┬──────────┐
                       ▼          ▼          ▼          ▼          ▼
                    Step 3     Step 4     Step 5     Step 6     Step 7
                    decision   thesis     debate    trigger     validate
                    (双视角)   (主线分)   (Bull/Bear) +alerts    (ADVICE D V1)
                       │          │          │          │          │
                       └──────────┴──────┬───┴──────────┴──────────┘
                                         ▼
                                      Step 8 report → Step 9 notify-test → 飞书
```

---

## 3. 10 步详解

### Step 1 · X 推文抓取 (scrape)

**做什么**:`expert/X/x_agent scrape` 用 cookie 抓 47 家种子公司最新推文进 `x.sqlite3`。

**怎么跑**:
```bash
# 全量 (47 账号, 30-60 分钟, 撞 429 sleep 900s)
/Users/john/InvesResearch/agent/scripts/run_x_scrape.sh

# 试水 (单/双账号, 秒级跑完)
/Users/john/InvesResearch/agent/scripts/run_x_scrape.sh --only SpaceX --tweets-per-account 30
```

**决策点**:
- 全量 / 试水 / 跳过 — 走 walkthrough 推荐**试水 2 账号**(SpaceX 是 SEED_COMPANIES 第一条,几乎一定能跑)
- log 看 `expert/X/data/scrape.log`

**已知坑**:
- `--only` 只匹配 SEED_COMPANIES 里的账号。**`elonmusk` 不在 seed (那是公司账号清单)**, 会被静默忽略, log 不会报错
- launchd 是 daily 但**只调 ingest, 不调 scrape**。scrape 要 operator 手动触发,因为单次可能 30-60 分钟,跟 ingest 1-2 秒不同量级,不能挂同一条 launchd 链
- scrape 中段如果撞 429, `KeyError: 'value'` 是 twikit 库已知问题, 不影响其他账号

---

### Step 2 · 抓取入库 (ingest)

**做什么**:从 `x.sqlite3` 流式读最近 500 条推文 → reply/retweet 过滤 → ontology 词典匹配打 thread+impact → 按 event_id 哈希去重 → 写 `agent.db.events`。

**怎么跑**:
```bash
# 完全等价 cron (job 内部封装 x-sqlite source)
cd /Users/john/InvesResearch/agent
X_SQLITE_PATH=/Users/john/InvesResearch/expert/X/data/x.sqlite3 \
  /usr/bin/python3 -m satellite_agent.cli job run x-ingest-daily
```

**输出 schema**:
```json
{
  "job": "x-ingest-daily",
  "status": "success",
  "result": {
    "source": "x-sqlite",
    "ingested": 0,           // 真正写入 events 的条数
    "duplicates": 353,       // event_id 哈希命中已存在
    "stats": {
      "fetched_rows": 500,   // sqlite 拉了 500 行
      "filtered_reply": 19,  // 过滤 reply
      "filtered_retweet": 128,
      "emitted": 353         // 喂给 classifier 的数量
    }
  }
}
```

**已知坑**:
- **`fetch --source` CLI 不支持 x-sqlite**。x-sqlite 只能通过 `job run x-ingest-daily` 间接调,因为它走 `ingest_pipeline.py` 而不是 `fetch.py`
- 没 `X_SQLITE_PATH` env 时 db_path 默认 `expert/X/data/x.sqlite3` (相对),从非 agent/ 目录跑会找不到。**总是显式 export**
- `ingested=0 / duplicates=N` 不是 bug, 是 "x.sqlite3 没新增" 的真实信号

---

### Step 3 · 决策周报 (decision)

**做什么**:Phase 3a 双视角决策周报。规则版 V1.1 baseline。CEO 视角(5 维 + 战略建议 + 反方 + falsification + Quality Check) + 投资人视角(主线相对热度 + 公司位势 + 仓位调节)。

**怎么跑**:
```bash
# 默认 7 天 + 4 周基线, 双视角
/usr/bin/python3 -m satellite_agent.cli decision --view both --window 7 --format md

# 导出 html 给浏览器看
/usr/bin/python3 -m satellite_agent.cli decision --view both --window 7 \
  --format html --out reports/decision-$(date +%Y-%m-%d).html
```

**决策点**:
- `--view ceo|investor|both` 选视角
- `--window 7|30` — 短窗口对短期波动敏感,长窗口稳但拖尾
- `--debate-thread X --debate-llm` 集成 debate multiplier (需 LLM key)

**已知坑**:
- **窗口空 (无新 events) 时所有 sentiment Δ 都 = 0**, 5 主线全 "观察"。这不是 bug 是窗口现实
- ground truth 缺市场反应 5d / 研究共识时, CEO 视角 falsification 章节会显示 "未配置任何证伪触发器" — 跑 Step 6 给主线挂 trigger 后即可填上

---

### Step 4 · Thesis 主线评分

**做什么**:5 主线 thesis_state 表的 baseline 计算/刷新。每次 `--refresh` 会重新算并写库。

**怎么跑**:
```bash
# 只看, 不写
/usr/bin/python3 -m satellite_agent.cli thesis --window 30

# 重算并写库 (推荐 30 天窗口建 baseline, 7 天太短)
/usr/bin/python3 -m satellite_agent.cli thesis --refresh --window 30
```

**已知坑**:
- 首次跑 `stats` 会显示 `thesis_state_score: {}` — 因为表空。`--refresh` 后才有
- `--window 7` 在 events 稀疏期(比如本周)会**全 0**, 一定要先 30 天建 baseline 再 7 天滚动
- thesis_state 只写 5 主线汇总, **不写公司级** (公司级在 decision 视角实时算 composite)

---

### Step 5 · 多 agent 辩论 (debate)

**做什么**:对指定主线跑 Bull/Bear/Judge 三方辩论。规则版从 events 抽 evidence 评分; `--llm` 版走 LiteLLM 调真模型。

**怎么跑**:
```bash
# 规则版 (无 key 也可)
/usr/bin/python3 -m satellite_agent.cli debate --thread 核心网 --window 30 --format md

# LLM 版 (需 SATAGENT_LLM_API_KEY 或 ANTHROPIC_API_KEY)
/usr/bin/python3 -m satellite_agent.cli debate --thread 核心网 --window 30 --llm
```

**输出**:
- 裁决: `看多 / 看熊 / 分歧`
- 置信度乘数: `0.5 ~ 1.3` (规则版上限 1.3, margin 越大乘数越高)
- 多空双方各自的论据评分

**已知坑**:
- bear 论据完全为 0 时,**裁决无脑看多**,置信度顶到 1.3。在 events 稀疏期看多分数容易虚高
- LLM 版需要在 `agent/.env` 配 key (.env.example 有 5 provider 模板)

---

### Step 6 · 触发器 + 风险预警

**做什么**:
- `trigger set/list/delete/check` — 给主线挂硬阈值,扫描后命中即写 alerts 表
- `alerts` — 查询窗口内已有 alerts (含 trigger 写的 + 风险维度 events)

**怎么跑**:
```bash
# 挂个示例 trigger
/usr/bin/python3 -m satellite_agent.cli trigger set \
  --thread 核心网 --type thread_sentiment_below \
  --params '{"thread":"核心网","threshold":1.0,"window":7}' --severity med

# 立即扫描
/usr/bin/python3 -m satellite_agent.cli trigger check

# 30 天 alerts
/usr/bin/python3 -m satellite_agent.cli alerts --window 30

# 演示完清掉
/usr/bin/python3 -m satellite_agent.cli trigger delete \
  --thread 核心网 --type thread_sentiment_below
```

**已知坑(产品级 bug 候选)**:
- ⚠️ `thread_sentiment_below` 看的是 **7 天窗口内 events 实时 sentiment**, 不是 `thesis_state.score`。窗口空时 sentiment=0 必触发, **即使 thesis_state 在 3.25**。建议改成读 thesis_state.score 或加 `min_events` 兜底
- 已注册 check_types: `company_order_concentration` / `event_count_drop` / `thread_sentiment_below`
- 演示完务必 `trigger delete`,否则污染生产 alerts 表

---

### Step 7 · ADVICE 验证

**做什么**:`validate` 跑 corpus v2.0 (96 样本) 对照, 输出 4 字段(threads / impact / strategy / focus) 的 exact/partial/mismatch 矩阵, 反推 top-3 盲点。

**怎么跑**:
```bash
# 规则版 baseline (无 key 也可)
/usr/bin/python3 -m satellite_agent.cli validate --format md

# 规则 + LLM 兜底 (需 key)
/usr/bin/python3 -m satellite_agent.cli validate --llm --format md

# D V2 双跑对比 (rules-only vs rules+LLM, 同 corpus, 三视角 diff)
/usr/bin/python3 -m satellite_agent.cli validate --compare-llm --format md
```

**当前 baseline (2026-06-09 corpus v2.0)**:

| 字段 | exact_rate |
|---|---:|
| threads | 37% |
| thesis_impact | 55% |
| strategy | 53% |
| thread_in_focus | 48% |
| **Overall (≥3 字段 exact)** | **27%** |

**Top-3 盲点**:
1. strategy ×18: agent=观察 / gt=进入 (启发式太保守)
2. thesis_impact ×17: agent=中性 / gt=增强 (漏判正向)
3. thesis_impact ×17: agent=增强 / gt=中性 (错判正向)

**已知坑**:
- corpus 路径默认 `samples/labeled_validation.jsonl` — 在 agent/ 目录下跑才能找到
- `--compare-llm` 跑两遍,LLM 调用费用 = 单次 × 96, 注意 EH-1 budget

---

### Step 8 · 周报 (report)

**做什么**:7 天投研周报(默认), 包含 5 主线评分矩阵、thesis 变化、关键事件分线、风险预警、被点名公司、下周跟踪清单。

**怎么跑**:
```bash
/usr/bin/python3 -m satellite_agent.cli report --window 30 --format md
```

**已知坑**:
- 复用 thesis_state.score,所以 **Step 4 必须先跑 `thesis --refresh`** 否则报告里 thesis 一栏全 0
- 默认窗口 7 天,events 稀疏期请用 30 天

---

### Step 9 · 飞书通知 (notify-test)

**做什么**:发测试消息到飞书自定义机器人 webhook。验证 EH-3 飞书送达链路。

**怎么跑**:
```bash
# 简单文本
/usr/bin/python3 -m satellite_agent.cli notify-test \
  --webhook 'https://open.feishu.cn/open-apis/bot/v2/hook/XXX'

# 卡片格式 (底部带免责声明)
/usr/bin/python3 -m satellite_agent.cli notify-test \
  --webhook 'https://open.feishu.cn/open-apis/bot/v2/hook/XXX' --card
```

**已知坑**:
- webhook URL 是机密,**不要写进仓库**, 也不要让 shell history 留底。用 env / .env (gitignored) / 1Password CLI 注入
- 飞书机器人有 security 选项 (IP 白名单 / 签名校验),目前 notify.py 只支持无签名模式
- 卡片格式底部强制带"不构成投资建议"免责声明 (定义在 `agent/satellite_agent/notify.py`)

---

### Step 10 · 沉淀 skill + 文档

**做什么**:跑完 1-9 后,把全部输出归档到 `agent/reports/walkthrough-YYYY-MM-DD/`, 保留 7 份 md 当回归 baseline。

**怎么跑**:已在本 skill 内嵌(本文件本身就是 Step 10 的产出)。

**输出文件**:
```
agent/reports/walkthrough-YYYY-MM-DD/
├── 03-decision-both-7d.md
├── 05-debate-核心网.md
├── 05-debate-终端.md
├── 05-debate-芯片.md
├── 07-validate.md
└── 08-report-30d.md
```

---

## 4. 全流程时间预算

| 阶段 | 最快 | 全量 |
|---|---:|---:|
| Step 1 scrape | 秒级 (跳过) | 30-60 分钟 (47 账号) |
| Step 2 ingest | 1-2 秒 | 1-2 秒 |
| Step 3 decision | 1 秒 | 2 秒 |
| Step 4 thesis | 1 秒 | 1 秒 |
| Step 5 debate × 3 | 3 秒 | 30 秒 (LLM 版) |
| Step 6 trigger+alerts | 1 秒 | 1 秒 |
| Step 7 validate | 5 秒 | 2 分钟 (`--compare-llm` × 96) |
| Step 8 report | 1 秒 | 1 秒 |
| Step 9 notify-test | 0 (跳过) | 1 秒 |
| Step 10 沉淀 | 手工 | 手工 |
| **合计 (最快路径)** | **~15 秒** | — |
| **合计 (全量含 LLM)** | — | **~65 分钟** |

---

## 5. 跑完之后该看什么

1. **`agent/reports/walkthrough-YYYY-MM-DD/`** — 6 份 md 是这次跑通的硬证据
2. **`agent/data/x-ingest.log`** — cron 历史 (与 Step 2 输出对齐)
3. **`agent/data/agent.db`** — events / thesis_state / alerts / job_runs 四张表本期变化
4. **memory** — 把 "本次跑通发现的 product bug 候选" 写一条新 memory (本次发现的 trigger thread_sentiment_below 该读 thesis_state 而非窗口 sentiment 是典型例子)

---

## 6. 跨节点回归测试用法

每 30 / 90 天复跑本 skill,然后:
1. `diff agent/reports/walkthrough-2026-06-09/ agent/reports/walkthrough-2026-09-09/`
2. 看 7 份 md 对应章节差异 — 是数据增长(正常)还是结构异常(回归)
3. Step 7 validate 的 overall_exact 必须 **单调不降**, 否则规则版退化警告

---

## 7. 关联文档

- `agent/PRD.md` — 完整产品定义
- `PRODUCT-STRATEGY.md` — 战略层 5 主线 / 4 数据源 / 6 skill
- `ADVICE-INTEGRATION.md` — Step 7 validate 完整设计
- `FE-SKILLS-INTEGRATION.md` — Step 5 debate / Step 6 trigger / wyhtb 的 FE 设计
- `NEXT-STEPS.md` — 下轮开发候选 (D V2 真 LLM E2E / AC1 cron 4 链 / strategy V1.2)
- `module-graph.html` — 32 agent module 代码图谱 (可视化本 skill 用到的所有 CLI 子命令)
- `data.html` — 第 5 视角门户 (4 数据源 / 6 skill / 3 类产物 / 19 行状态矩阵)

---

## 8. 已知遗留(本 skill 不解决,但要记)

| 项 | 详情 | 下一步 |
|---|---|---|
| Step 1 elonmusk 静默忽略 | `--only` 不在 SEED_COMPANIES 时没 warn | 加 warn 日志即可 (10 行代码 follow-up) |
| Step 2 fetch CLI 不支持 x-sqlite | 只能走 job 间接调 | 想直接 CLI 跑,加 `--source x-sqlite` 即可 |
| Step 6 trigger 假警报 | thread_sentiment_below 用窗口 sentiment 不是 thesis_state | 改成读 thesis_state.score 或加 min_events 兜底 |
| Step 9 飞书 webhook 无签名 | 当前 notify.py 不支持签名 | 接 sign 模式 (.env 配 secret) |
| Step 7 ADVICE 27% baseline | 规则版盲点已暴露 | D V2 真 LLM E2E (用户填 key 后跑) |
