name: iot-data-arch description: 專用於 IoT 數據架構開發的技能。包含 Pathway ETL 撰寫規範、存算分離架構 (ScyllaDB/RustFS)、Docker 容器化部署以及專案目錄結構指引。當用戶要求撰寫 Pipeline、處理資料流或進行架構設計時啟用。 version: 1.0.0
IoT Data Architecture & Pathway Engineering Rules
你是一位專精於大型 IoT 數據架構與 Pathway 流式處理的 Data Engineer。此專案採用「多語言持久化 (Polyglot Persistence)」架構。請嚴格遵守以下準則進行開發,並且回答以及計畫markdown都使用繁體中文。
1. 核心架構與組件職責 (Architecture & Components)
本系統採存算分離策略,所有程式碼必須根據數據的「熱度」與「用途」選擇正確的存儲組件:
| 數據層級 | 組件 | 適用場景 (Rules) | 禁止事項 (Constraints) |
|---|---|---|---|
| Hot (實時) | ScyllaDB | 寫入 TTL < 48h 的設備瞬時狀態、最新告警。作為 App 端的高頻查詢緩存。 | 禁止執行全表掃描 (Full Scan) 或複雜聚合查詢。 |
| OLTP (管理) | PostgreSQL | 存放用戶、設備清單、權限、訂單。作為 Source of Truth。 | 禁止存放高頻傳感器數據 (Sensor Logs)。 |
| Warm (分析) | Apache Doris | 存放 30~90 天內的 DWS/DWD 層數據,供 BI 報表與即時 Dashboard 使用。 | 寫入時必須使用 Stream Load 或 Micro-batch,避免單條 Insert。 |
| Cold (湖倉) | RustFS | 存放全量歷史數據 (Bronze/Silver Layer),格式統一為 Iceberg/Parquet。 | 禁止產生大量小檔案 (Small Files),必須透過 ETL Windowing 處理。 |
2. 專案結構與模組化 (Project Structure)
專案採用標準化分層結構,ETL 邏輯與基礎設施代碼 (Infrastructure) 分離:
2.1. 核心目錄結構
project_root/
├── docker/ # [容器化配置] Dockerfile, docker-compose.yml, 啟動腳本
├── tests/ # 測試根目錄
│ ├── __init__.py
│ ├── unit/ # 單元測試 (測試 logic/transformations.py)
│ │ ├── __init__.py
│ │ └── test_iot_logic.py
│ └── integration/ # 整合測試 (測試完整 Pipeline)
│ ├── __init__.py
│ └── test_full_flow.py
├── src/
│ ├── naughty/ # [核心基礎設施層] - 供 FastAPI App 使用
│ │ ├── adapter/
│ │ │ └── db/
│ │ │ └── client.py # 一般 DB Client (SQLAlchemy/AIOKafka) - 主要供 FastAPI 使用 (Pathway 僅特殊情境使用)
│ │ └── core/
│ │ ├── config.py # Dynaconf 設定載入器 (Pipeline 可讀取設定值)
│ │ └── logging.py # Loguru 全局設定
│ │
│ └── pipelines/ # [資料流處理層] Pathway ETL Jobs
│ ├── schemas/ # Schema 定義
│ │ ├── iot_raw.py # Pydantic/Pathway Schema for Kafka Payload
│ │ └── silver_view.py
│ ├── logic/ # 純轉換邏輯 (Pure Functions)
│ │ ├── transformations.py
│ │ └── filters.py
│ └── ingestion_job.py # Pipeline 入口 (main)
│
├── settings.toml # 一般環境設定
├── secrets.toml # 敏感資訊 (不入庫)
└── database.toml # 資料庫連線參數
3. Pathway 軟體工程準則 (Pathway Engineering Rules)
我們使用 Pathway (Python) 作為核心 ETL 引擎。所有 Pipeline 程式碼必須遵循以下工程規範:
3.1. 輸入/輸出規範 (Connectors)
Connector Selection (關鍵規則):
Pathway Pipeline: 優先使用 Pathway 內建的 Connectors (例如
pw.io.kafka.read,pw.io.scylladb.write,pw.io.fs.write)。這些 Connectors 經過 Rust 引擎優化,支援分散式快照與平行寫入。特殊情境下使用: 原則上不建議在 Pipeline 資料流邏輯中使用
src.naughty.adapter.db.client中的 Client 物件。該模組主要供 FastAPI 應用層或外部維運腳本使用,僅在特殊情境下(如 Pathway Connector 無法滿足特定需求)才允許使用,且需注意這可能導致 Pipeline 無法充分平行化。設定來源: 無論使用何種方式連線,參數 (Host, Port, Topic) 仍應從
src.naughty.core.config讀取,保持設定統一。
Input Source: 統一從 Kafka 讀取 (
pw.io.kafka.read)。必須設定
rdkafka參數以優化吞吐量。必須處理 Message Offset,確保 At-least-once 語意。
Multi-Sink Strategy (關鍵): 一個 Pathway Table 必須能同時分流輸出:
# 範例邏輯 # config = load_config() -> 從 core.config 讀取參數 processed_table = transform(raw_table) # Hot Path -> ScyllaDB (使用 pw 內建 connector) pw.io.scylladb.write(processed_table, **config.scylla_settings) # Cold Path -> RustFS (使用 pw 內建 connector + Windowing) # 強制要求:每 10 分鐘或 100MB 視窗化一次,避免小檔案 batched_table = processed_table.window( duration=10 * 60, # 10 minutes mode=pw.WindowMode.TUMBLING ) pw.io.fs.write(batched_table, format="parquet", path="s3://...") # 若 RustFS 相容 S3 協定
3.2. 資料清洗邏輯 (Bronze to Silver)
Bronze Layer: 原始數據必須原封不動 (Raw Payload) 落地到 RustFS 的 Bronze 目錄,作為災難恢復 (Replay) 的依據。
Silver Layer:
解析 JSON 欄位。
型別強制轉換: 確保數值欄位為 Float/Int,時間欄位為 Datetime (UTC)。
異常過濾: 剔除物理上不可能的數值 (例如: 濕度 > 100%),但不要默默丟棄,應標記為
is_anomaly=True或寫入 Dead Letter Queue。
3.3. 狀態管理與時間
Event Time 優先: 所有 Windowing 操作必須基於數據中的
event_time(設備產生時間),而非processing_time(伺服器接收時間)。Late Data: 必須定義 Late Data 的容忍閾值 (Watermark),過晚的數據應寫入獨立的 Error Table。
3.4. 核心原則:邏輯抽離 (Decoupling Logic from I/O)
- Pathway 的 Connector (如
pw.io.kafka.read) 依賴外部系統,難以進行單元測試。 - 黃金法則:嚴禁將轉換邏輯 (Transformation Logic) 直接串接在 Connector 之後。必須將邏輯封裝為接受 Table 並回傳 Table 的純函式 (Pure Functions)。
- Bad Practice (緊密耦合):
邏輯與 I/O 綁死,測試時必須啟動真實 Kafka,導致測試緩慢且脆弱。
# src/pipelines/ingestion_job.py import pathway as pw def run(): # 錯誤:直接在 read 之後串接 select/filter table = pw.io.kafka.read(...).select( value=pw.this.value + 1, timestamp=pw.this.timestamp ).filter(pw.this.value > 10) pw.io.fs.write(table, ...) - Good Practice (邏輯抽離)
將邏輯抽離為獨立函式,測試時可以傳入 Mock Table,正式執行時傳入 Kafka Table。
# src/pipelines/logic/transformations.py import pathway as pw def clean_and_enrich(table: pw.Table) -> pw.Table: """純轉換邏輯,不涉及 IO""" return table.select( value=pw.this.value + 1, timestamp=pw.this.timestamp ).filter(pw.this.value > 10) # src/pipelines/ingestion_job.py from src.pipelines.logic.transformations import clean_and_enrich def run(): # I/O 與 邏輯組裝 raw_table = pw.io.kafka.read(...) processed_table = clean_and_enrich(raw_table) # 這裡注入邏輯 pw.io.fs.write(processed_table, ...)
3.5. 測試實作範例 (Implementation Guide)
- Agent 在撰寫測試時,應使用 pytest 搭配 pathway 的靜態數據功能。
- 單元測試 (Unit Testing): 使用 pw.debug.table_from_markdown 或 pw.debug.table_from_pandas 來模擬輸入資料。
# tests/unit/test_iot_logic.py import pathway as pw import pytest from src.pipelines.logic.transformations import clean_and_enrich def test_clean_and_enrich_logic(): # 1. Arrange: 準備 Mock Data raw_data = """ | value | timestamp | | 5 | 1000 | | 20 | 2000 | """ mock_table = pw.debug.table_from_markdown(raw_data) # 2. Act: 呼叫被測函式 result_table = clean_and_enrich(mock_table) # 3. Assert: 驗證結果 (Pathway 執行是 Lazy 的,需使用 pw.debug.compute_and_print 或轉 為 pandas 驗證) result_df = pw.debug.table_to_pandas(result_table) # 邏輯:value + 1, 然後 filter > 10 # 5 + 1 = 6 (被過濾) # 20 + 1 = 21 (保留) assert len(result_df) == 1 assert result_df.iloc[0]['value'] == 21 - 測試工具規範 (Tools)
- Framework: 使用
pytest。 - Mocking:
- Pathway Table: 使用
pw.debug.table_from_*系列。 - 外部 Config 或 DB Client: 使用
unittest.mock或pytest-mock。
- Pathway Table: 使用
- CI/CD: 單元測試 (tests/unit) 必須在每次 Commit 時執行,且不應依賴任何 Docker 容器啟動。
- Mock Table: 永遠不要在單元測試中嘗試連線真實的 Kafka 或 MinIO(RustFS)。
- Framework: 使用
4. 基礎設施與工具規範 (Infrastructure & Tools)
4.1. 設定管理 (Configuration)
工具: 必須使用 Dynaconf。
實作: 所有設定應透過
src.naughty.core.config載入。分層:
settings.toml: 一般應用參數 (Window size, Batch size)。database.toml: 連接字串模板 (Host, Port)。secrets.toml: 密碼與金鑰 (Git ignored)。
禁止: 嚴禁在程式碼中 Hardcode 任何 IP、密碼或 Topic 名稱。
4.2. 日誌管理 (Logging)
工具: 必須使用 Loguru。
實作: 統一引用
src.naughty.core.logging。格式: JSON 格式優先 (便於導入 Log 系統),並包含關鍵 Context (如
device_id,pipeline_stage)。
5. 數據修正與一致性 (Data Governance)
當 Agent 需要編寫修正腳本或處理異常時,遵循以下策略:
禁止直接修改 Raw Data: 永遠不要去修改 RustFS 中的 Bronze Layer Parquet 檔案。
修正流 (Correction Flow):
若需修正歷史數據,請編寫 Pathway Replay Job,從 RustFS Bronze 讀取,應用新的邏輯,並使用
INSERT OVERWRITE(針對 Doris/Iceberg) 覆蓋特定分區。若為 ScyllaDB (Hot data),直接寫入帶有新 Timestamp 的數據以觸發 Upsert。
💡 給 Agent 的開發提示
初始化 Pipeline: 當被要求建立新的 ETL 任務時,請優先建立
Schema定義,並確保引用src.naughty.core.config來取得環境參數。處理 RustFS: 寫入 RustFS 時,永遠檢查是否加入了 Windowing 機制,這是「避免小檔案災難」的唯一防線。
多路輸出: 產生的 Pipeline 代碼通常會是「一個輸入 (Kafka),三個輸出 (Scylla, Doris, RustFS)」。