以数据为中心 · 从哪里来 · 怎么处理 · 到哪里去

数据的一生

这一页只回答一个问题:仓库里跑的每一字节,它从哪里进来(5 类来源)、它怎么被处理(7 步管线)、它落在哪里(4 张表)、谁会再读它(7 个分析模块)、最后变成什么(5 种出口)。先看顶部总图建立心智模型,再逐层钻入细节,末尾用一条 demo 事件走完整链路。

来源5 类 管线7 步 4 张 分析模块7 个 出口5 种
Bird's-eye · 总图

一张图看所有数据怎么流

横向 3 列对应数据生命周期的三个阶段。左列是来源:5 类 input 都最终走同一个 ingest_raw() 入口。中列是处理 + 存储:7 步管线 +1 张 SQLite。右列是读取 + 输出:7 个分析模块读 DB 后,通过 CLI / REST / 多格式文件 / Prometheus 出去。流向都是从左到右,只有一个例外 —— thesis_state 会被 thesis.update_thesis_scores 写回 DB,所以右列也有反向虚线箭头隐含。

1来源 · INPUT
JSONL 文件file
samples/events.jsonl · 8 条 demo 事件 · satagent ingest-file
纯文本文件file
公司公告 / 通稿 · 单文件单事件 · satagent fetch --source text
RSS / Atom HTTPhttpnew
通用 publisher-agnostic 源 · UA + 重试 · 可 fixture 离线测 · satagent fetch --source rss
REST POSTapi
实时单条录入 · POST /events/ingest · 可选 Bearer token
种子数据seed
一次性写入 · 84 行 market_model + 6 家公司卡片 · satagent init 触发 ensure_seed
本体词典 (常量)const
ontology.py · 4 主线 / 6 场景 / 6 维度 / 365 词 · 不入 DB, 直接被 classify()
2处理 + 存储 · TRANSFORM
1
classify(text)
关键词规则 → threads / scenarios / dimensions / thesis_impact / confidence
2
match_companies(conn, text)
扫公司名 + aliases → 命中公司列表
3
enrich_with_company_threads
命中公司的所属主线 → 追加到 threads(可能 over-fire)
4
classify_with_llm (可选)
触发门 conf<0.55 或 ≥3 threads · 无 key 时 no-op · LLM 收口纠错
5
extract_numeric(text)
中文金额 (亿/万换算) · 时间窗 · 客户主体 · numeric_evidence
6
numeric_overrides 覆盖
RawEvent 或人工指定的字段 → 覆盖自动提取(抓取层最终决定权)
7
insert_event(conn, ...)
写入 events 表 · 返回 event_id
SQLite · data/agent.db
events17 列
companies13 列
market_model6 列
thesis_state4 列
3分析 + 出口 · OUTPUT
decision.decideread
Phase 3a · 读 events + companies · 产出 top drivers + 战略 + 公司位势 + 仓位信号
valuation.value_allreadnew
Phase 3b · 读 companies.orders + revenue · PE 法估值映射(假设可覆盖)
market_model.dynamicreadnew
读 market_model (seed) + window 内 events 的 capex/订单 · 不持久化, 只读对照
thesis.compute / updatereadnew
实时评分 · refresh=true 时 写回 thesis_state 表(唯一反向流) · 推 alerts
debate.run_debatereadnew
Bull / Bear / Judge 三角色 · 读窗口 events · 输出裁决 + 置信度乘数
report.weekly_reportread
Phase 1 周报 · markdown / JSON · 简单聚合不做决策
observability.collectreadnew
扫所有表 · 计 events / threads / thesis 计数 · 渲染 Prometheus 格式
最终出口 · 5 种
stdout · CLI 13+ 子命令
JSON · REST 14+ 端点
md / html / csv / docx / pptx · export.py
text · Prometheus /metrics
2 张 brief · CEO + 投资视角
1 · 来源

数据从哪里来(5 类)

所有外部 input 都被规约为同一个 RawEvent 数据契约(title / text / source / url / occurred_at / numeric_overrides / next_indicators / companies),由 Source.fetch() → Iterable[RawEvent] 产出。下面 5 张卡按入口形态拆开。

file source

JSONL 批量

逐行 JSON 文件 · 用于演示与回归基线。samples/events.jsonl(8 条 demo) 与 samples/labeled_*.jsonl(回归集)。
# satagent ingest-file samples/events.jsonl class JsonlSource(Source): def fetch() → Iterable[RawEvent]
代码:sources/jsonl.py
file source

纯文本单事件

一个文件 = 一条事件文本。最适合手工保存的公告 / 通稿。CLI 同时接收 --source-name 标注来源。
# satagent fetch --source text --path ann.txt # --source-name "公司公告"
代码:sources/text.py
http source · NEW

RSS / Atom 通用

不绑定具体发布方,任意 RSS / Atom feed 都能跑。带 User-Agent + 重试 + 关键词过滤。可用 fixture XML 完全离线验证 (test_rss 覆盖)。当前运行环境出网被拦,真实闭环需放开 egress。
# satagent fetch --source rss \ # --url https://... \ # --keywords 卫星 终端
代码:sources/rss.py
tests:tests/test_rss.py
api source

REST 实时录入

最直接的入口 · POST /events/ingest 接受 IngestRequest · 可带 numeric 字段绕过自动提取(顺位最高) · 配 SATAGENT_API_TOKEN 后强制 Bearer。
POST /events/ingest {"text": "...", "numeric": {"order_amount_cny_yi": 5.2}}
代码:api.py · api_ingest()
seed source

种子数据(一次性)

不是事件流,而是 schema 初始化时的占位数据。satagent initensure_seed 写入 84 行 2025-2031 × 4 主线 × 3 情景市场模型 + 6 家公司卡片。仅用于 MVP 演示,接实盘需要被真实数据覆盖。
# satagent init ensure_seed(conn) → market_model[84] + companies[6]
代码:seed.py
const · 非外部

本体词典(代码常量)

严格说不是"数据源",而是 classify 的规则依据ontology.py 里写死 4 主线 / 6 场景 / 6 维度 / 365 关键词 + 极性词典。不入 DB,改了需重启服务。
THREAD_KEYWORDS = {"核心网": [...], "终端": [...], "芯片": [...]} POLARITY_POS = [...] POLARITY_NEG = [...]
代码:ontology.py · 文档 docs/ontology.md
2 · 处理

数据怎么处理(7 步管线)

所有 RawEvent 进入 ingest_pipeline.ingest_raw() 后,严格按下面 7 步串行处理。顺序非常关键:第 4 步 LLM 放在「公司反哺」之后, 是为了让 LLM 能纠正反哺引入的 over-fire 主线;第 6 步 numeric_overrides 放在 extract 之后, 保证抓取层 / 人工指定有最终决定权。

规则分类

classify(text)always
扫 ontology 词典, 命中 主线 / 场景 / 维度; 极性词正/负 → thesis_impact (增强 / 削弱 / 中性); 命中数转 confidence ∈ [0,1]
RawEvent.text
{threads, scenarios, dimensions, thesis_impact, confidence}

公司命中

match_companies(conn, text)always · 除非 raw 自带
扫 companies 表的 name + aliases, 返回命中公司列表。如果 RawEvent 已经带 companies 字段(抓取层指定), 跳过此步。
text + companies 表(name/aliases)
["海格通信", "信科移动", ...]

公司主线反哺

enrich_with_company_threadsalways
用命中公司的 thread 字段(在 companies 表中预定义)追加到 cls.threads。这一步对纯产品 / 财务通稿很有用 —— 文本里没主线关键词, 公司本身能告诉我们是哪条线。副作用:可能引入 over-fire(公司有多个业务线时, 文本只讲了一个但全主线都被加进来)。
cls.threads + 命中公司
cls.threads ∪ companies[i].thread

LLM 收口(可选)

classify_with_llm(text, rules, llm)conf<0.55 或 ≥3 threads · 无 key 则 no-op
Phase 2.1 兜底层 · provider 可插拔(OpenAI / Claude / Anthropic / DeepSeek / Qwen 等任何 OpenAI 兼容接口)· prompt 带 ontology 收口校验 · 输出 LLM 修订过的 threads / scenarios / dimensions / thesis_impact / confidence。放在反哺之后,可以纠正反哺引入的 over-fire。无 API key 时优雅降级,不报错。
text + rules-based cls (含反哺结果)
LLM 修订的 cls,或原 cls(未触发 / 无 key 时)

数字字段抽取

extract_numeric(text)always
正则扫中文金额(亿 / 万自动换算) · 时间窗短语 · 客户主体(命名实体) · numeric_evidence 留原文片段做可追溯。Phase 2 的核心新能力。
RawEvent.text
{order_amount_cny_yi, capex_cny_yi, opex_cny_yi, time_window, customer_subject, numeric_evidence}

numeric_overrides 覆盖

numeric.update(raw.numeric_overrides)仅当 raw 带 override
RawEvent 或 API 调用者显式传的 numeric 字段, 整体覆盖 extract 自动提取的结果 — 抓取层 / 人工对数字字段有最终决定权。只覆盖 非 None 的字段。
numeric + raw.numeric_overrides
numeric (with overrides applied)

落盘入 events 表

insert_event(conn, ...)always
把 title / content / source / url / occurred_at + cls + companies + numeric 写入 events · 返回新 event_id · 列表 / dict 字段以 JSON 字符串存(threads / scenarios / dimensions / next_indicators / companies / numeric_evidence)。
完整 event payload
event_id · 持久化到 SQLite
3 · 存储

数据落在哪里(4 张表 · SQLite)

所有持久化数据落在 agent/data/agent.db(可由 SATAGENT_DB 环境变量重指向)。schema 在 db.py 维护,带幂等 migrate_schema —— 已有 DB 在加新列时不丢数据。列里凡是会塞 list / dict 的, 一律以 JSON 字符串存。

events17 列 · 主表
idINTEGER PKautoincrement
titleTEXTRawEvent.title
contentTEXTRawEvent.text
source / urlTEXTRawEvent.source/url
occurred_atTEXTRawEvent.occurred_at
created_atTEXTDB 默认 CURRENT_TIMESTAMP
threads (JSON)TEXTclassify + enrich + LLM
scenarios (JSON)TEXTclassify + LLM
dimensions (JSON)TEXTclassify + LLM
thesis_impactTEXTclassify (极性词)
confidenceREALclassify (命中数)
next_indicatorsTEXTRawEvent
companies (JSON)TEXTmatch_companies
order_amount_cny_yiREALextract / override · NEW
capex / opex_cny_yiREALextract / override · NEW
time_windowTEXTextract · NEW
customer_subjectTEXTextract · NEW
numeric_evidenceTEXTextract (原文片段) · NEW
companies13 列 · 公司卡片
idINTEGER PKautoincrement
name UNIQUETEXTseed
threadTEXTseed · 供反哺
products / customers / aliasesTEXT(JSON)seed
revenue_mappingTEXTseed
moat / riskTEXTseed
scoreREALseed · 评分
ordersTEXT(JSON)seed / 用户更新 · NEW
peer_rank / deltaINTEGERdecision 更新 · NEW
last_quarter_revenue_cny_yiREAL用户填入 · valuation 用 · NEW
market_model6 列 · 84 行种子
idINTEGER PKautoincrement
threadTEXTseed
yearINTEGERseed (2025-2031)
scenarioTEXTseed (悲观/中性/乐观)
value_cny_yiREALseed · 占位数
noteTEXTseed
UNIQUE(thread, year, scenario) · market_model.dynamic_market_model 不写本表, 只读 + 拼窗口 events
thesis_state4 列 · 实时评分(写回)
idINTEGER PKautoincrement
threadTEXTthesis.update_thesis_scores · NEW
statementTEXTthesis · 命题文本 · NEW
scoreREALthesis · 窗口聚合 · NEW
updated_atTEXTDB CURRENT_TIMESTAMP · NEW
唯一一张被分析层反向写的表 · GET /thesis/state?refresh=true 时触发
4 · 分析

数据怎么被再读 / 衍生(7 个分析模块)

分析层全部以 events + companies + market_model 为输入, 派生出决策视角的衍生数据。除 thesis.update_thesis_scores 会反向写 thesis_state 外, 其他都是纯读, 输出直接走 CLI / REST。

phase 3a · core

decision · 双视角决策

读窗口 events + companies → 计算 thread sentiment / money / event_count → 与 baseline_weeks 平均值对比 → 输出 top drivers / 战略 / 公司位势矩阵 / 仓位信号(加 / 减 / 持)+ 证据链。
读:events (窗口 + baseline) · companies
写:无(纯计算)
出口:/decision/weekly · satagent decision
phase 3b · new

valuation · PE 估值映射

读 companies.orders + last_quarter_revenue · 应用 ValuationAssumptions(净利率 / PE 悲观-基准-乐观 / 订单转年化营收率)→ PE 法估值。透明: 所有假设都在 API 参数里可覆盖。不构成投资建议。
读:companies(orders, last_quarter_revenue_cny_yi)
写:
出口:/valuation · satagent valuation
phase 3b' · new

market_model · 动态视图

读 market_model 表的 seed 基线 + 窗口内 events 的 capex / 订单数, 拼成 seed-vs-实测对照视图 + 证据链(具体哪些 events 提供了实测数)。只读不持久化, persist=False 是固定。
读:market_model · events (窗口)
写:
出口:/market-model/dynamic
phase 3 · new

thesis · 实时评分 + alerts

读窗口 events 计算各主线的 thesis 评分 · refresh=true 时写回 thesis_state 表(整层唯一反向流) · compute_alerts 输出风险预警(thesis 削弱事件 + 风险维度命中)。
读:events
写:thesis_state(refresh 时)
出口:/thesis/state · /alerts
skeleton · new

debate · 多 agent 辩论

读窗口 events 拆 Bull / Bear 证据 → _bull_case / _bear_case / _judge 三角色 → 输出裁决 + 置信度乘数。当前是 skeleton(143 行), ArgWriter 是 default,接 LLM 后 3 角色可真跑。
读:events (按 thread + 窗口)
写:
出口:/debate?thread=终端
phase 1

report · 周报聚合

读窗口 events 简单聚合 → markdown / JSON 周报。不做决策, 不算位势, 是 Phase 1 的输出形式, 现在被 decision 取代但保留。
读:events
写:
出口:/report/weekly · satagent report
cross-cutting · new

observability · 运行指标

扫所有表算计数 · 渲染 Prometheus 文本曝光格式。事件总数 / 各主线计数 / thesis 评分 / 错误率 等。零依赖,可被任何 Prometheus / Grafana 抓取。
读:events · companies · thesis_state
写:
出口:/metrics · satagent stats
5 · 出口

数据最终到哪里去(5 种出口)

同一份决策数据可以经过不同 sink 走出去 —— CLI 给运维 / 工程, REST 给前端 / 集成, 多格式文件给业务方分发, Prometheus 给监控, 两张 brief 给 CEO 和投资人。所有出口都基于同一个 decision report dict, 出口选择只决定包装。

stdout

CLI 子命令

13+ 个 satagent 子命令 · 全部走 click + json.dumps · 工程 / 运维 / 调试主用。
satagent decision --view ceo satagent valuation --thread 终端 satagent regress satagent stats
代码:cli.py
json · http

REST JSON

14+ FastAPI 端点 · 标准 JSON 响应 · 自动 OpenAPI 文档 (/docs) · 可选 Bearer token。
GET /events?thread=终端 GET /decision/weekly?view=both GET /thesis/state?refresh=true GET /metrics
代码:api.py
multi-format

分发文件 (export.py)

同一份 decision report → 5 种格式:md(默认)/ html(零依赖)/ csv(零依赖)/ docx / pptx(后两需 .[export] extra)。Phase 3c 落地。
GET /decision/weekly?format=html GET /decision/weekly?format=csv GET /decision/weekly?format=docx
代码:export.py
prometheus text

监控指标

Prometheus 0.0.4 文本曝光格式 · 不要求装 prometheus_client(observability.py 自渲染)· 可被 Grafana / VictoriaMetrics / 阿里 ARMS 等抓取。
GET /metrics # HELP events_total ... # TYPE events_total counter events_total{thread="终端"} 12
代码:observability.py
human · brief

双视角决策简报

产品核心承载 · 2 张人类可读的 brief:CEO 视角(主驱动 + 战略)和投资视角(公司位势 + 仓位信号 + 证据链)· decision.render_ceo_view / render_investor_view / render_both。
satagent decision --view ceo satagent decision --view investor satagent decision --view both
代码:decision.py
downstream

(未来)前端 web/app.html

规划中的下一站 · web/app.html 切到真实 API (/decision/weekly + /events + /thesis/state) · 当前仍是 mock 数据 · sync 后 remaining option C。
// 待开发 fetch('/decision/weekly?view=both') .then(r => r.json())
代码:web/app.html (待改)
6 · 端到端追踪

一条事件的整段路

用一条典型的中文新闻短文, 逐步演示它从 RawEvent 到 CEO brief 的全过程。每一步显示数据的具体形态变化, 与上面的 7 步管线一一对应。

0 · RAW外部 input
// 来源:JSONL · samples/events.jsonl 第 1 行 RawEvent( title="海格通信中标某军种卫星终端订单", text="海格通信中标某军种卫星终端订单,合同金额 5.2 亿元,2026 年三季度交付", source="公司公告", occurred_at="2026-05-30", numeric_overrides=None, companies=None, )
1 · classify规则分类
classify(text) → { threads: ["终端"], // 命中"卫星终端" scenarios: [], dimensions: ["市场空间"], // 命中"订单" thesis_impact: "增强", // 极性词"中标" confidence: 0.62, }
2 · match公司命中
match_companies(conn, text) → ["海格通信"] // companies 表里有此 name
3 · enrich公司主线反哺
enrich_with_company_threads(cls, ["海格通信"]) → { threads: ["终端", "芯片"], // 海格主业是终端+芯片 // "芯片" 是 over-fire — 文本没讲 scenarios: [], dimensions: ["市场空间"], thesis_impact: "增强", confidence: 0.62, }
4 · LLM触发判定
// 触发门:confidence=0.62 ≥ 0.55 · 且 threads=2 < 3 → 不触发 // 即便配了 SATAGENT_LLM_API_KEY 也 skip,保留规则结果 // 实际 stub 验证:LLM 在真实集触发率约 20% cls unchanged
5 · extract数字字段
extract_numeric(text) → { order_amount_cny_yi: 5.2, // "5.2 亿元" → 5.2 capex_cny_yi: None, opex_cny_yi: None, time_window: "2026 年三季度", customer_subject: "某军种", numeric_evidence: "合同金额 5.2 亿元,2026 年三季度交付", }
6 · overrideraw 覆盖
// raw.numeric_overrides 是 None → skip numeric unchanged
7 · insert入库
insert_event(conn, ...) → event_id=17 // events 表插入一行,列表字段全部 json.dumps: threads='["终端","芯片"]' companies='["海格通信"]' order_amount_cny_yi=5.2 time_window='2026 年三季度' ...
8 · decision窗口聚合
decide(conn, window=7) → // 本周窗口 { window: {...}, event_count: 8, ceo: { top_drivers: [ {name: "终端订单加速", score: 2.3, evidence: [17, ...]}, ... ], strategy: ["进入", "加速终端 BOM 锁定"], }, investor: { company_matrix: [{name: "海格通信", peer_rank: 1, ...}], position_signals: [{company: "海格通信", signal: "加", ...}], } }
9 · renderbrief 输出
render_ceo_view(report) → "## 本周主驱动\n\n- 终端订单加速 (score 2.3)\n 证据:event#17 海格中标 5.2 亿\n..." // 同一份 report 走不同 sink: GET /decision/weekly?format=html → HTML GET /decision/weekly?format=csv → CSV (零依赖) GET /decision/weekly?format=docx → DOCX (.[export]) GET /decision/weekly?format=pptx → PPTX