data-pipeline

star 0

YQuant 数据管道框架。所有外部数据(API采集、文件导入、图片解析、消息提取)统一经由本管道处理,完成 Extract → Transform → Validate → Load 全流程。

PascalMore By PascalMore schedule Updated 6/12/2026

name: data-pipeline description: YQuant 数据管道框架。所有外部数据(API采集、文件导入、图片解析、消息提取)统一经由本管道处理,完成 Extract → Transform → Validate → Load 全流程。

Data Pipeline

核心定位

data-pipeline 是 YQuant 的统一数据摄入框架,负责将各种来源的原始数据转化为结构化、已校验的存储数据。

所有数据流均走同一套管道,只是 Extractor(采集节点) 不同:

  • ApiExtractor — 从 Tushare / AKShare 等 API 拉取
  • ImageParser — 从图片(飞书/Telegram截图)中提取
  • FileExtractor — 从 CSV / Excel 导入
  • MessageExtractor — 从聊天消息中解析

架构

数据输入(图片/API/文件/消息)
        ↓
  ┌─────────────┐
  │  Extractor  │  ← 采集节点(按来源切换)
  └─────────────┘
        ↓
  ┌─────────────┐
  │ Transformer │  ← 数据清洗、字段映射、类型转换
  └─────────────┘
        ↓
  ┌─────────────┐
  │  Validator  │  ← Schema 校验、必填检查、范围校验
  └─────────────┘
        ↓
  ┌─────────────┐
  │   Loader    │  ← 写入 MongoDB / 文件 / 推送通知
  └─────────────┘
        ↓
  ┌─────────────┐
  │   Codec     │  ← 序列化传输编码(JSON↔Base64,可选 zlib 压缩)
  └─────────────┘
        ↓
  存储 / 跨系统传输

目录结构

data-pipeline/
├── SKILL.md                     ← 本文件
├── scripts/
│   ├── pipeline.py              ← 管道引擎(核心编排)
│   ├── extractors/
│   │   ├── __init__.py
│   │   ├── base.py              ← Extractor 基类
│   │   ├── image_parser.py      ← 图片解析(Vision 模型)
│   │   ├── api_extractor.py     ← API 拉取(Tushare/AKShare)
│   │   ├── file_extractor.py   ← 文件导入(CSV/Excel)
│   │   └── message_extractor.py ← 聊天消息解析
│   ├── transformers/
│   │   ├── __init__.py
│   │   ├── base.py             ← Transformer 基类
│   │   ├── field_mapper.py    ← 字段映射
│   │   └── normalizer.py       ← NaN/None 规范化
│   ├── validators/
│   │   ├── __init__.py
│   │   ├── base.py            ← Validator 基类
│   │   └── schema_validator.py ← Schema 校验
│   ├── loaders/
│   │   ├── __init__.py
│   │   ├── base.py             ← Loader 基类
│   │   └── mongodb_loader.py   ← MongoDB 写入
│   └── serializers/
│       ├── __init__.py
│       └── base64_codec.py     ← JSON↔Base64 序列化(Transport 层)
└── references/
    └── schemas/                ← 各数据类型 Schema 定义
        ├── financial.yaml       ← 财务数据 Schema
        ├── price.yaml           ← 行情数据 Schema
        └── manual_input.yaml    ← 手工输入数据 Schema

Extractor 基类定义

# scripts/extractors/base.py
from abc import ABC, abstractmethod
from typing import Any

class BaseExtractor(ABC):
    """数据采集节点基类"""
    
    @property
    @abstractmethod
    def source_type(self) -> str:
        """数据来源标识,如 'tushare', 'image', 'csv'"""
        pass
    
    @abstractmethod
    async def extract(self, source: Any, **kwargs) -> list[dict]:
        """
        执行采集
        - source: 数据来源(图片路径/URL、API 参数、文件路径等)
        - 返回: 结构化数据列表,每条为 dict
        """
        pass
    
    @abstractmethod
    async def validate_source(self, source: Any) -> bool:
        """校验数据来源是否有效"""
        pass

Transformer 基类定义

# scripts/transformers/base.py
from abc import ABC, abstractmethod

class BaseTransformer(ABC):
    """数据清洗转换节点基类"""
    
    @abstractmethod
    async def transform(self, records: list[dict]) -> list[dict]:
        """对 records 进行清洗转换"""
        pass

Validator 基类定义

# scripts/validators/base.py
from abc import ABC, abstractmethod
from typing import Optional

class ValidationResult:
    def __init__(self):
        self.valid: bool = True
        self.errors: list[str] = []
        self.warnings: list[str] = []

class BaseValidator(ABC):
    """数据校验节点基类"""
    
    @abstractmethod
    async def validate(self, records: list[dict]) -> ValidationResult:
        pass

Loader 基类定义

# scripts/loaders/base.py
from abc import ABC, abstractmethod

class BaseLoader(ABC):
    """数据加载节点基类"""
    
    @property
    @abstractmethod
    def target_type(self) -> str:
        pass
    
    @abstractmethod
    async def load(self, records: list[dict]) -> dict:
        """
        执行加载
        - 返回: {"inserted": N, "updated": M, "skipped": K, "errors": [...]}
        """
        pass

Pipeline 引擎

# scripts/pipeline.py
class DataPipeline:
    """
    统一数据管道引擎
    将 Extractor → Transformer → Validator → Loader 串联
    """
    def __init__(
        self,
        extractor: BaseExtractor,
        transformer: BaseTransformer,
        validator: BaseValidator,
        loader: BaseLoader
    ):
        self.extractor = extractor
        self.transformer = transformer
        self.validator = validator
        self.loader = loader
    
    async def run(self, source: Any) -> dict:
        # 1. Extract
        raw_records = await self.extractor.extract(source)
        
        # 2. Transform
        clean_records = await self.transformer.transform(raw_records)
        
        # 3. Validate
        result = await self.validator.validate(clean_records)
        if not result.valid:
            raise ValueError(f"Validation failed: {result.errors}")
        
        # 4. Load
        load_result = await self.loader.load(clean_records)
        
        return load_result

使用示例

from pipeline import DataPipeline
from extractors.minimax_image_extractor import MiniMaxImageExtractor
from transformers.normalizer import NaNNormalizer
from validators.schema_validator import SchemaValidator
from loaders.mongodb_loader import MongoDBLoader

pipeline = DataPipeline(
    extractor=MiniMaxImageExtractor(),
    transformer=NaNNormalizer(),
    validator=SchemaValidator(schema_name="manual_input"),
    loader=MongoDBLoader(db_name="tradingagents", collection="manual_input_data")
)

# 图片输入 → 全自动管道处理
result = await pipeline.run(source="/path/to/screenshot.png")
# result: {"inserted": 1, "updated": 0, "skipped": 0, "errors": []}

已有 Extractor

Extractor 状态 说明
MiniMaxImageExtractor ✅ 已完成 从图片解析结构化数据(MiniMax CLI Vision)
ApiExtractor 待实现 从 Tushare/AKShare API 拉取
FileExtractor 待实现 从 CSV/Excel 导入
MessageExtractor 待实现 从聊天消息解析结构化数据

已有 Codec

Codec 状态 说明
Base64Codec ✅ 已完成 JSON↔Base64,支持嵌套结构聚合 + zlib level=9 压缩

Base64Codec 使用

from serializers.base64_codec import Base64Codec, encode_json, decode_base64

# 嵌套结构 + zlib 压缩(默认,推荐,用于传输/存储)
codec = Base64Codec(
    compress=True,             # zlib level=9 压缩
    data_layout="nested",     # 按 group_key 分组聚合
    group_key="产品名称",     # 分组字段
    position_fields=[        # positions 内保留的字段
        "Wind代码", "资产名称", "持仓比例", "数量", "市值(本币)"
    ]
)
b64 = codec.encode(records)   # list[dict] → Base64
data = codec.decode(b64)       # Base64 → nested dict

# 扁平结构 + 无压缩(调试场景)
codec_flat = Base64Codec(compress=False, data_layout="flat")

# 便捷函数(单行调用,默认嵌套+压缩)
b64_str = encode_json(records, compress=True,
                      group_key="产品名称",
                      position_fields=["Wind代码","资产名称","持仓比例","数量","市值(本币)"])
data = decode_base64(b64_str)

嵌套结构说明

模式 Base64 长度 适用场景
nested + gzip ~5,700 chars 生产/传输(默认)
flat + plain ~89,000 chars 调试/可读性要求高

嵌套结构将每行重复的产品级字段提取到外层,只在 positions 数组内保留持仓字段,zlib 压缩前就消除了大量冗余文本。

定位说明

JSON↔Base64 不属于传统 ETL 的 Extract/Transform/Load 步骤,而是 Transport / Serialization 层

数据输入 → Extract → Transform → [Codec: JSON↔Base64] → Load → 存储/传输
                                              ↑
                                   用于跨边界传输
                          (HTTP Header / JSON 字段 / 文件存储 / 消息队列)

开发进度

  • SKILL.md(本文档)
  • 基类定义(base.py 各层)
  • MongoDBLoader(PortfolioMongoLoader)
  • NaNNormalizer(normalize_all)
  • SchemaValidator
  • ImageParserExtractor(MiniMax Vision OCR)
  • 消息 Portfolio Pipeline(run_message_pipeline.py)
  • ApiExtractor(Tushare / AKShare)
  • FileExtractor(CSV / Excel)
  • MessageExtractor(聊天消息解析)
  • 飞书消息接入
  • Telegram 消息接入

Message Portfolio Pipeline(消息直接输入)

用户提供 TSV/CSV 文本,无需 OCR,直接解析 → Excel → normalize → MongoDB。

入口脚本

python scripts/run_message_pipeline.py -i "raw text..." -d 2026-05-03
python scripts/run_message_pipeline.py -f /path/to/data.txt -d 2026-05-03 --dry-run

Python API

import asyncio
from scripts.run_message_pipeline import run_pipeline

result = asyncio.run(run_pipeline(
    raw_text="截止日期\t产品名称\t...",
    date_str="2026-05-03",
    source_root=Path("skills/data/source/smart-money"),
    dry_run=False,
))

流程

  • Step 1: 文本解析 → Excel → source/smart-money/{date}/portfolio_{YYYYMMDD}.xlsx
  • Step 2: Excel → PaddleOCRExcelTransformernormalize_all() → basic_info / nav / position
  • Step 3: validate + MongoDB(复用现有模块)

Extractor

extractors/MessagePortfolioExtractor 读取已保存的 Excel:

from extractors import MessagePortfolioExtractor
ext = MessagePortfolioExtractor()
records = await ext.extract("2026-05-03")  # 传入日期字符串
# → [{"df": DataFrame, "source_path": "..."}]

Image Portfolio / Trade Pipeline(图片 OCR 输入)

图片 pipeline 使用 MiniMax Vision OCR,将截图解析为 DataFrame,再自动识别为 portfolio 或 trade 格式,后续统一进入 Transform → Validate → MongoDB。

批量图片并行处理模式

当用户一次发送多张图片时(≥3张),正确做法是每张图独立提交后台任务,而不是在一个 for 循环里批量后台执行:

# ❌ 错误:6张图写在一个后台命令里,输出截断,无法追踪
for img in $IMAGES; do
  run_unified_image_pipeline.py --image $img --date $DATE &
done
wait   # 输出可能被截断

# ✅ 正确:每张图独立后台任务,独立 session_id,可分别追踪
PYBIN=/home/pascal/workspace/yquant-investment/skills/apps/TradingAgents-CN/.venv/bin/python
for img in $IMAGES; do
  terminal(background=true, notify_on_complete=true,
    command="cd /home/pascal/workspace/yquant-investment && $PYBIN skills/data/data-pipeline/scripts/run_unified_image_pipeline.py --image $img --date $DATE")
done
# 等全部 notify 后再汇总

YQuant 项目 venv 查找顺序(优先级从高到低):

  1. 模块自身 venv(如 skills/xxx/.venv
  2. 项目根目录 .venv — fallback,统一环境(需手动创建)

⚠️ 不再经过 TradingAgents-CN 作为中转。每个 skill 的 venv 只管理自己。

入口脚本

cd /home/pascal/workspace/yquant-investment && \
  PYTHONPATH=/home/pascal/workspace/yquant-investment \
  .venv/bin/python \
  skills/data/data-pipeline/scripts/run_unified_image_pipeline.py \
  --image /path/to/image.jpg --date 2026-06-22

注意:根目录 .venv 需要 PYTHONPATH 才能解析项目内部 skills.xxx 导入。TradingAgents-CN venv 因架构差异不经过根目录 fallback。

关键目录语义:

  • skills/data/source/smart-money/{system_date}/image/:归档原始图片、OCR raw JSON、生成的 Excel。
  • skills/data/source/smart-money/{system_date}/review_pending/:需要人工确认的 Wind code / asset name 异常 CSV 与 JSON。
  • --date 表示截图内容里的业务日期;目录日期使用系统接收日期,避免跨日批处理混淆。

MiniMax raw/debug JSON 文件命名为 pic_{timestamp}_vision_raw.jsonpic_{timestamp}_vision_retry.jsonpic_{timestamp}_vision_error.json,只作为 OCR 审计与问题复盘材料,不作为后续入库入口。

批量图片并行处理模式

当用户一次发送多张图片时(≥3张),正确做法是每张图独立提交后台任务,而不是在一个 for 循环里批量后台执行:

# ❌ 错误:6张图写在一个后台命令里,输出截断,无法追踪
for img in $IMAGES; do
  run_unified_image_pipeline.py --image $img --date $DATE &
done
wait   # 输出可能被截断

# ✅ 正确:每张图独立后台任务,独立 session_id,可分别追踪
for img in $IMAGES; do
  terminal(background=true, notify_on_complete=true,
    command="cd /home/pascal/workspace/yquant-investment && PYTHONPATH=/home/pascal/workspace/yquant-investment .venv/bin/python skills/data/data-pipeline/scripts/run_unified_image_pipeline.py --image $img --date $DATE")
done
# 等全部 notify 后再汇总

原因:同一个 shell 命令内的多个后台子进程(&)共享一个输出流,Hermes 的 process 工具只能追踪通过 terminal(background=true) 创建的独立任务,不识别 shell 内嵌的后台子进程。输出截断时只能重新运行。

会话累积模式:如果用户分多次发送图片,每次新图片到来时继续独立后台提交,所有 pending 记录跨所有图片累积,等用户发送「就这些」等触发词后一次性展示所有 pending 汇总。

批次 closeout 后 pending 汇总命令

# 汇总所有 pending 文件中的待确认记录
from pathlib import Path
import pandas as pd

pending_files = sorted(Path(f"skills/data/source/smart-money/{date}/review_pending").glob("*_pending.csv"))
for pf in pending_files:
    df = pd.read_csv(pf)
    pending = df[df["名称复核状态"] == "pending_review"]
    if len(pending):
        print(f"\n=== {pf.name} ({len(pending)} pending) ===")
        print(pending[["产品代码","Wind代码","资产名称","持仓比例","数量","市值(本币)","名称复核状态","主数据名称","名称复核原因"]].to_string(index=False))

关键 Pitfall:不要在单个终端命令里用 & 批量后台执行多张图片。每张图必须作为独立的 terminal(background=true) 任务提交。

⚠️ Python 环境与工具链注意事项

Python 环境

venv 查找顺序:模块自身 venv/ → 项目根目录 .venv/。项目根目录 venv 已安装 pandas / openpyxl / pymongo / python-dotenv,可直接使用。

# ✅ 正确 — 根目录 .venv + PYTHONPATH
cd /home/pascal/workspace/yquant-investment && \
  PYTHONPATH=/home/pascal/workspace/yquant-investment \
  .venv/bin/python \
  skills/data/data-pipeline/scripts/run_unified_image_pipeline.py --image ... --date ...

# ❌ 错误 — 系统默认 python3 没有 pandas
python3 skills/data/data-pipeline/scripts/run_unified_image_pipeline.py
# → ModuleNotFoundError: No module named 'pandas'

Hermes execute_code 工具使用自己的 venv(无 pandas),禁止用于执行 pipeline。

Hermes 工具阻塞与 Python 执行路径

  • execute_code 工具使用 Hermes Agent 自己的 venv(无 pandas),禁止用于执行 pipeline
  • 完整 Python path 命令(如 .venv/bin/python ...不触发 Hermes 安全审批,可直接执行
  • 脚本通过 -e/-c 参数注入代码时会触发审批,不要依赖这种方式
  • 若 pipeline 执行被安全策略拦截,切换 terminal(background=true) + notify_on_complete=true 提交后台任务

图片 Vision 策略(关键 Pitfall — 两层问题)

图片处理涉及两个独立环节,每个环节都有坑:

环节 1:Gateway 自动路由(入站图片 → agent 看到什么)

Hermes gateway 收到图片时,_decide_image_input_mode() 决定两种路由:

  • native:图片作为 image_url 附在用户 turn 上,agent 直接"看到"像素。
  • text:先调 vision_analyze 把图片转成文字描述,拼到消息前面,agent 只看到文字摘要。

决策逻辑agent/image_routing.py):

  1. agent.image_input_mode 显式设置 → 遵从
  2. auxiliary.vision.provider 显式设置(非 auto/空)→ text
  3. _lookup_supports_vision(provider, model) 查 models.dev → True 则 native
  4. 否则 → text

⚠️ Pitfall:custom: 前缀的 provider 无法查 models.dev。 PROVIDER_TO_MODELS_DEV 只映射内置 provider 名(minimaxzai 等), 不包含 custom:minimax。因此当 primary model 用 custom: provider 时, _lookup_supports_vision 返回 None → 走 text 路径 → 每张图都额外调一次 vision_analyze

修复:在 config.yaml 里显式声明 supports_vision

model:
  default: MiniMax-M3
  provider: custom:minimax
  supports_vision: true    # ← 绕过 models.dev 查找,直接 native

设为 true 后,MiniMax-M3 原生视觉生效,图片作为像素直接传给模型, 省掉一次 vision_analyze 调用,且 agent 拿到的是原图而非 lossy 文字摘要。

环节 2:Agent 手动调 Vision 工具(对话中临时看图)

当 agent 需要在对话中主动分析图片时(非 gateway 自动路由):

优先 MiniMax MCP,vision_analyze 作兜底:

# Step 1: 直接用 MiniMax MCP
mcp_MiniMax_Token_Plan_MCP_understand_image(
    image_source="/path/to/img.jpg",
    prompt="识别图片中所有文字、数字、表格内容..."
)

# Step 2: 如果 MiniMax 也失败,再降级到 vision_analyze
# vision_analyze(...)  # Hermes 内置 Vision — 兜底选项

理由:MiniMax MCP 的 structured JSON 输出更完整(含 structuredContent.tables), 更适合持仓截图、交易记录等结构化数据。vision_analyze 在图片分析场景中容易 产生 Duplicate tool output 或截断输出。

双重分析问题

如果不设 supports_vision: true,同一张图会被分析两次

  1. Gateway 自动调 vision_analyze(text 路径预分析)
  2. Pipeline 正式 OCR 调 MiniMax CLI(run_unified_image_pipeline.py

两次调用走不同端点、不同额度,第二次才是正式入库。设 supports_vision: true 后环节 1 消失,agent 直接看到原图,自主决定是否调 pipeline。

静态名称更名文件路径: skills/data/data-pipeline/scripts/stock_name_corrections.py。该文件用于 OCR 识别后的 Wind code → 标准名称映射,修改后对新图片立即生效,无需重启任何服务。

Vision Provider Fallback(zai MCP)

MiniMaxImageExtractor 当前没有 fallback——MiniMax 套餐一旦耗尽,pipeline 立即失败。Fallback 走 agent 层:zai 提供的 @z_ai/mcp-server(GLM Coding Plan 套餐专属视觉 MCP,共享 5h prompt 池子,按 token 付费)。

  • 配置方法、环境变量、启用开关、协议约定:references/provider-fallback.md
  • MCP server 完整配置片段(可直接 merge 到 config.yaml):templates/mcp-servers-zai-vision.yaml
  • ⚠️ 关键变量名区分:Z_AI_API_KEY(zai MCP 用)≠ GLM_API_KEY(zai pay-as-you-go API 用)。两者在 Z.AI 后台是独立条目。

禁止行为(Pitfall)

不要为图片入库写自定义脚本或手动连接 MongoDB。正确做法只需一条命令:

cd /home/pascal/workspace/yquant-investment && \
  PYTHONPATH=/home/pascal/workspace/yquant-investment \
  .venv/bin/python \
  skills/data/data-pipeline/scripts/run_unified_image_pipeline.py \
  --image /path/to/image.jpg --date 2026-06-22

待确认行补录命令(同样需要根目录 venv):

cd /home/pascal/workspace/yquant-investment && \
  PYTHONPATH=/home/pascal/workspace/yquant-investment \
  .venv/bin/python \
  skills/data/data-pipeline/scripts/load_pending_confirmed.py \
  --csv "skills/data/source/smart-money/{date}/review_pending/{filename}_pending.csv" \
  --confirm-all

该 pipeline 自动完成:MiniMax OCR → 格式自动检测(portfolio/trade)→ Transform → Validate → MongoDB 入库。

不要写自定义 Python 脚本插入 MongoDB 记录。正确做法:

  1. 运行 run_unified_image_pipeline.py 入库主体数据
  2. 待确认行由 load_pending_confirmed.py --confirm-all 统一处理

不要用 sed 批量修改 CSV 状态再多次调用脚本。正确做法:

  • 单次确认:--confirm-all 一行命令完成
  • 分步确认:先不加 --confirm-all 入库自动通过部分,再加 --confirm-all 入库确认部分

不要绕过 pipeline 自己解析图片。用户说"图片数据入库" → 加载 data-pipeline skill → 调用 run_unified_image_pipeline.py

手动连接 MongoDB 的典型错误:凭证从 skills/.env 读取时 os.getenv('MONGODB_PASSWORD') 在非项目目录运行返回 None。正确做法是 pipeline 内部已实现的 PortfolioMongoLoader,它通过 .env 文件加载凭证,不需要也不应该手动拼接连接字符串。

Smart Money Batch Closeout(YQuant 会话集成)

当用户在飞书会话中发送图片批次后跟「图片批次已上传」等触发词时,YQuant 需要:

  1. 累积每张图片的处理结果
  2. 检测触发词后调用批次 closeout
  3. 发送 closeout 文本给用户

触发词

BATCH_END_PHRASES = [
    "图片批次已上传",
    "就这些",
    "处理完了",
    "发完了",
    "没有了",
]


def is_batch_end(message: str) -> bool:
    """检测用户消息是否为批次结束信号。"""
    return any(phrase in message for phrase in BATCH_END_PHRASES)

模块使用

所有函数在 scripts/image_batch_state.py

import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "scripts"))
from image_batch_state import add_image_result, close_batch_now, check_and_send_pending_closeout

YQuant 会话流程

# 1. 每张图片处理后,累积结果
pipeline_result = await run_unified_image_pipeline(...)
add_image_result(pipeline_result)

# 2. 用户发送触发词 → 调用 close_batch_now() 并发送
if is_batch_end(user_message):
    closeout = close_batch_now()
    if closeout:
        await message(closeout["message_text"])
    else:
        await message("当前没有待处理的图片批次。")

closeout 结构

close_batch_now() 返回:

{
    "kind": "smart_money_batch_closeout",
    "status": "closed_clean" | "closed_needs_confirmation" | "closed_with_failures" | "closed_dry_run" | "closed_empty",
    "totals": {"files": N, "success": N, ...},
    "mongodb_counts": {"position": N, "trade": N},
    "needs_confirmation_items": [...],
    "failed_items": [...],
    "confirmation": {"required": bool, "question": str, "expected_user_action": str},
    "message_text": "Smart Money 批次处理 Closeout\n\n状态:closed_clean\n..."
}

状态判定优先级

条件 status
0 个文件 closed_empty
有 failed closed_with_failures
有 pending_rows / partial / pending_review closed_needs_confirmation
有 dry_run closed_dry_run
全部成功 closed_clean

状态文件

运行时状态文件写在 WORKSPACE/.openclaw/

  • image_batch_results.json — 累积中的图片结果

正常情况下用户无感,仅用于 YQuant 重启后的状态恢复。本方案不使用 30s timer;批次结束完全由用户发送「图片批次已上传」等触发词决定。

待确认项补录工作流

批次处理后若有 closed_needs_confirmation 状态:

  1. YQuant 展示 closeout 报告,说明哪些行待确认(记录 pending_csv 路径)
  2. 用户确认(如"联讯仪器、惠科股份 已确认")
  3. 一行命令完成补录
python3 skills/data/data-pipeline/scripts/load_pending_confirmed.py \
  --csv "skills/data/source/smart-money/{date}/review_pending/{filename}_pending.csv" \
  --confirm-all

--confirm-all 会放行所有状态(包括 pending_reviewmissing_master),将 CSV 中 Wind 代码非空的行全部入库。

安全设计

状态 默认行为 --confirm-all 行为
pending_review ❌ 拦截 ✅ 入库
missing_master ❌ 拦截 ✅ 入库
resolved / confirmed ✅ 自动入库 ✅ 入库
空或其他 ✅ 自动入库 ✅ 入库

两次调用场景(可选):如果用户只确认了部分行,可以分步操作:

# Step 1: 先入自动通过的行
python3 load_pending_confirmed.py --csv pending.csv

# Step 2: 用户确认后,再入剩余行
python3 load_pending_confirmed.py --csv pending.csv --confirm-all
Install via CLI
npx skills add https://github.com/PascalMore/YQClaw-Investment --skill data-pipeline
Repository Details
star Stars 0
call_split Forks 0
navigation Branch main
article Path SKILL.md
More from Creator