iot-data-arch

star 0

專用於 IoT 數據架構開發的技能。包含 Pathway ETL 撰寫規範、存算分離架構 (ScyllaDB/RustFS)、Docker 容器化部署以及專案目錄結構指引。當用戶要求撰寫 Pipeline、處理資料流或進行架構設計時啟用。

luanya01 By luanya01 schedule Updated 1/21/2026

name: iot-data-arch description: 專用於 IoT 數據架構開發的技能。包含 Pathway ETL 撰寫規範、存算分離架構 (ScyllaDB/RustFS)、Docker 容器化部署以及專案目錄結構指引。當用戶要求撰寫 Pipeline、處理資料流或進行架構設計時啟用。 version: 1.0.0

IoT Data Architecture & Pathway Engineering Rules

你是一位專精於大型 IoT 數據架構與 Pathway 流式處理的 Data Engineer。此專案採用「多語言持久化 (Polyglot Persistence)」架構。請嚴格遵守以下準則進行開發,並且回答以及計畫markdown都使用繁體中文

1. 核心架構與組件職責 (Architecture & Components)

本系統採存算分離策略,所有程式碼必須根據數據的「熱度」與「用途」選擇正確的存儲組件:

數據層級 組件 適用場景 (Rules) 禁止事項 (Constraints)
Hot (實時) ScyllaDB 寫入 TTL < 48h 的設備瞬時狀態、最新告警。作為 App 端的高頻查詢緩存。 禁止執行全表掃描 (Full Scan) 或複雜聚合查詢。
OLTP (管理) PostgreSQL 存放用戶、設備清單、權限、訂單。作為 Source of Truth 禁止存放高頻傳感器數據 (Sensor Logs)。
Warm (分析) Apache Doris 存放 30~90 天內的 DWS/DWD 層數據,供 BI 報表與即時 Dashboard 使用。 寫入時必須使用 Stream Load 或 Micro-batch,避免單條 Insert。
Cold (湖倉) RustFS 存放全量歷史數據 (Bronze/Silver Layer),格式統一為 Iceberg/Parquet 禁止產生大量小檔案 (Small Files),必須透過 ETL Windowing 處理。

2. 專案結構與模組化 (Project Structure)

專案採用標準化分層結構,ETL 邏輯與基礎設施代碼 (Infrastructure) 分離:

2.1. 核心目錄結構

project_root/
├── docker/                  # [容器化配置] Dockerfile, docker-compose.yml, 啟動腳本
├── tests/                       # 測試根目錄
│   ├── __init__.py
│   ├── unit/                    # 單元測試 (測試 logic/transformations.py)
│   │   ├── __init__.py
│   │   └── test_iot_logic.py
│   └── integration/             # 整合測試 (測試完整 Pipeline)
│       ├── __init__.py
│       └── test_full_flow.py
├── src/
│   ├── naughty/                 # [核心基礎設施層] - 供 FastAPI App 使用
│   │   ├── adapter/
│   │   │   └── db/
│   │   │       └── client.py    # 一般 DB Client (SQLAlchemy/AIOKafka) - 主要供 FastAPI 使用 (Pathway 僅特殊情境使用)
│   │   └── core/
│   │       ├── config.py        # Dynaconf 設定載入器 (Pipeline 可讀取設定值)
│   │       └── logging.py       # Loguru 全局設定
│   │
│   └── pipelines/               # [資料流處理層] Pathway ETL Jobs
│       ├── schemas/             # Schema 定義
│       │   ├── iot_raw.py       # Pydantic/Pathway Schema for Kafka Payload
│       │   └── silver_view.py
│       ├── logic/               # 純轉換邏輯 (Pure Functions)
│       │   ├── transformations.py
│       │   └── filters.py
│       └── ingestion_job.py     # Pipeline 入口 (main)
│
├── settings.toml                # 一般環境設定
├── secrets.toml                 # 敏感資訊 (不入庫)
└── database.toml                # 資料庫連線參數

3. Pathway 軟體工程準則 (Pathway Engineering Rules)

我們使用 Pathway (Python) 作為核心 ETL 引擎。所有 Pipeline 程式碼必須遵循以下工程規範:

3.1. 輸入/輸出規範 (Connectors)

  • Connector Selection (關鍵規則):

    • Pathway Pipeline: 優先使用 Pathway 內建的 Connectors (例如 pw.io.kafka.read, pw.io.scylladb.write, pw.io.fs.write)。這些 Connectors 經過 Rust 引擎優化,支援分散式快照與平行寫入。

    • 特殊情境下使用: 原則上不建議在 Pipeline 資料流邏輯中使用 src.naughty.adapter.db.client 中的 Client 物件。該模組主要供 FastAPI 應用層或外部維運腳本使用,僅在特殊情境下(如 Pathway Connector 無法滿足特定需求)才允許使用,且需注意這可能導致 Pipeline 無法充分平行化。

    • 設定來源: 無論使用何種方式連線,參數 (Host, Port, Topic) 仍應src.naughty.core.config 讀取,保持設定統一。

  • Input Source: 統一從 Kafka 讀取 (pw.io.kafka.read)。

    • 必須設定 rdkafka 參數以優化吞吐量。

    • 必須處理 Message Offset,確保 At-least-once 語意。

  • Multi-Sink Strategy (關鍵): 一個 Pathway Table 必須能同時分流輸出:

    # 範例邏輯
    # config = load_config() -> 從 core.config 讀取參數
    processed_table = transform(raw_table)
    
    # Hot Path -> ScyllaDB (使用 pw 內建 connector)
    pw.io.scylladb.write(processed_table, **config.scylla_settings)
    
    # Cold Path -> RustFS (使用 pw 內建 connector + Windowing)
    # 強制要求:每 10 分鐘或 100MB 視窗化一次,避免小檔案
    batched_table = processed_table.window(
        duration=10 * 60,  # 10 minutes
        mode=pw.WindowMode.TUMBLING
    )
    pw.io.fs.write(batched_table, format="parquet", path="s3://...") # 若 RustFS 相容 S3 協定
    

3.2. 資料清洗邏輯 (Bronze to Silver)

  • Bronze Layer: 原始數據必須原封不動 (Raw Payload) 落地到 RustFS 的 Bronze 目錄,作為災難恢復 (Replay) 的依據。

  • Silver Layer:

    • 解析 JSON 欄位。

    • 型別強制轉換: 確保數值欄位為 Float/Int,時間欄位為 Datetime (UTC)。

    • 異常過濾: 剔除物理上不可能的數值 (例如: 濕度 > 100%),但不要默默丟棄,應標記為 is_anomaly=True 或寫入 Dead Letter Queue。

3.3. 狀態管理與時間

  • Event Time 優先: 所有 Windowing 操作必須基於數據中的 event_time (設備產生時間),而非 processing_time (伺服器接收時間)。

  • Late Data: 必須定義 Late Data 的容忍閾值 (Watermark),過晚的數據應寫入獨立的 Error Table。

3.4. 核心原則:邏輯抽離 (Decoupling Logic from I/O)

  • Pathway 的 Connector (如 pw.io.kafka.read) 依賴外部系統,難以進行單元測試。
  • 黃金法則:嚴禁將轉換邏輯 (Transformation Logic) 直接串接在 Connector 之後。必須將邏輯封裝為接受 Table 並回傳 Table 的純函式 (Pure Functions)。
  • Bad Practice (緊密耦合): 邏輯與 I/O 綁死,測試時必須啟動真實 Kafka,導致測試緩慢且脆弱。
    # src/pipelines/ingestion_job.py
    import pathway as pw
    
    def run():
        # 錯誤:直接在 read 之後串接 select/filter
        table = pw.io.kafka.read(...).select(
            value=pw.this.value + 1,
            timestamp=pw.this.timestamp
        ).filter(pw.this.value > 10)
        
        pw.io.fs.write(table, ...)
    
  • Good Practice (邏輯抽離) 將邏輯抽離為獨立函式,測試時可以傳入 Mock Table,正式執行時傳入 Kafka Table。
    # src/pipelines/logic/transformations.py
    import pathway as pw
    
    def clean_and_enrich(table: pw.Table) -> pw.Table:
        """純轉換邏輯,不涉及 IO"""
        return table.select(
            value=pw.this.value + 1,
            timestamp=pw.this.timestamp
        ).filter(pw.this.value > 10)
    
    # src/pipelines/ingestion_job.py
    from src.pipelines.logic.transformations import clean_and_enrich
    
    def run():
        # I/O 與 邏輯組裝
        raw_table = pw.io.kafka.read(...)
        processed_table = clean_and_enrich(raw_table) # 這裡注入邏輯
        pw.io.fs.write(processed_table, ...)
    

3.5. 測試實作範例 (Implementation Guide)

  • Agent 在撰寫測試時,應使用 pytest 搭配 pathway 的靜態數據功能。
  • 單元測試 (Unit Testing): 使用 pw.debug.table_from_markdown 或 pw.debug.table_from_pandas 來模擬輸入資料。
    # tests/unit/test_iot_logic.py
    import pathway as pw
    import pytest
    from src.pipelines.logic.transformations import clean_and_enrich
    
    def test_clean_and_enrich_logic():
        # 1. Arrange: 準備 Mock Data
        raw_data = """
        | value | timestamp |
        | 5     | 1000      |
        | 20    | 2000      |
        """
        mock_table = pw.debug.table_from_markdown(raw_data)
    
        # 2. Act: 呼叫被測函式
        result_table = clean_and_enrich(mock_table)
    
        # 3. Assert: 驗證結果 (Pathway 執行是 Lazy 的,需使用 pw.debug.compute_and_print 或轉  為 pandas 驗證)
        result_df = pw.debug.table_to_pandas(result_table)
        
        # 邏輯:value + 1, 然後 filter > 10
        # 5 + 1 = 6 (被過濾)
        # 20 + 1 = 21 (保留)
        assert len(result_df) == 1
        assert result_df.iloc[0]['value'] == 21
    
  • 測試工具規範 (Tools)
    • Framework: 使用 pytest
    • Mocking:
      • Pathway Table: 使用pw.debug.table_from_*系列。
      • 外部 Config 或 DB Client: 使用unittest.mockpytest-mock
    • CI/CD: 單元測試 (tests/unit) 必須在每次 Commit 時執行,且不應依賴任何 Docker 容器啟動。
    • Mock Table: 永遠不要在單元測試中嘗試連線真實的 Kafka 或 MinIO(RustFS)。

4. 基礎設施與工具規範 (Infrastructure & Tools)

4.1. 設定管理 (Configuration)

  • 工具: 必須使用 Dynaconf

  • 實作: 所有設定應透過 src.naughty.core.config 載入。

  • 分層:

    • settings.toml: 一般應用參數 (Window size, Batch size)。

    • database.toml: 連接字串模板 (Host, Port)。

    • secrets.toml: 密碼與金鑰 (Git ignored)。

  • 禁止: 嚴禁在程式碼中 Hardcode 任何 IP、密碼或 Topic 名稱。

4.2. 日誌管理 (Logging)

  • 工具: 必須使用 Loguru

  • 實作: 統一引用 src.naughty.core.logging

  • 格式: JSON 格式優先 (便於導入 Log 系統),並包含關鍵 Context (如 device_id, pipeline_stage)。

5. 數據修正與一致性 (Data Governance)

當 Agent 需要編寫修正腳本或處理異常時,遵循以下策略:

  • 禁止直接修改 Raw Data: 永遠不要去修改 RustFS 中的 Bronze Layer Parquet 檔案。

  • 修正流 (Correction Flow):

    1. 若需修正歷史數據,請編寫 Pathway Replay Job,從 RustFS Bronze 讀取,應用新的邏輯,並使用 INSERT OVERWRITE (針對 Doris/Iceberg) 覆蓋特定分區。

    2. 若為 ScyllaDB (Hot data),直接寫入帶有新 Timestamp 的數據以觸發 Upsert。

💡 給 Agent 的開發提示

  1. 初始化 Pipeline: 當被要求建立新的 ETL 任務時,請優先建立 Schema 定義,並確保引用 src.naughty.core.config 來取得環境參數。

  2. 處理 RustFS: 寫入 RustFS 時,永遠檢查是否加入了 Windowing 機制,這是「避免小檔案災難」的唯一防線。

  3. 多路輸出: 產生的 Pipeline 代碼通常會是「一個輸入 (Kafka),三個輸出 (Scylla, Doris, RustFS)」。

Install via CLI
npx skills add https://github.com/luanya01/naughty_etl --skill iot-data-arch
Repository Details
star Stars 0
call_split Forks 0
navigation Branch main
article Path SKILL.md
More from Creator