create-pipeline


Use when the user asks to create a GlassFlow ETL pipeline. Walks through source configuration (Kafka or OTLP), optional dedup/filter/transform stages, and ClickHouse sink mapping, then submits the v3 config to the GlassFlow API. Triggers on phrases like "create a pipeline", "set up a Kafka to ClickHouse pipeline", "ingest logs from OTLP into ClickHouse".

By glassflow. Updated 6/8/2026.

name: create-pipeline
tools: Bash, Read, Write

Create a GlassFlow pipeline

Turn the user's intent into a v3 pipeline config and submit it to the GlassFlow API at POST /api/v1/pipeline.

Prerequisites

  • A reachable GlassFlow API. Ask the user for the endpoint if it is not given; the default is http://localhost:8081.
  • For Kafka sources: broker addresses, the security protocol (PLAINTEXT, SASL_PLAINTEXT, SSL, or SASL_SSL), and any credentials that protocol requires.
  • For OTLP sources: no source-side credentials are needed; the receiver routes each request to a pipeline by its x-glassflow-pipeline-id header.
  • ClickHouse target: host, port, database, username, password, table name. The destination table must exist before the pipeline runs.
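The pipeline will not create the destination table, so it can be worth checking for it before you build the config. The sketch below makes a few assumptions:

- The ClickHouse HTTP interface is reachable over plain HTTP on http_port.
- CH_HOST, CH_HTTP_PORT, CH_USER, and CH_PASSWORD are hypothetical shell variables that you set from the sink details.
- ClickHouse answers EXISTS TABLE with 1 or 0.

```shell
# Build the statement that asks ClickHouse whether <db>.<table> exists.
ch_exists_query() {
  printf 'EXISTS TABLE %s.%s' "$1" "$2"
}

# Run the check over the ClickHouse HTTP interface (http_port, 8123 by default).
# CH_HOST, CH_HTTP_PORT, CH_USER and CH_PASSWORD are hypothetical variables
# holding the sink details gathered from the user. Plain HTTP is assumed.
# Returns 0 if the table exists, 1 if it does not, 2 if the request failed.
ch_table_exists() {
  local result
  result=$(curl -sf --user "${CH_USER}:${CH_PASSWORD}" \
    "http://${CH_HOST}:${CH_HTTP_PORT:-8123}/" \
    --data-binary "$(ch_exists_query "$1" "$2")") || return 2
  [ "$result" = "1" ]
}

# Example:
#   ch_table_exists analytics orders || echo "create analytics.orders before submitting"
```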

Inputs to gather

Ask the user upfront. Do not invent values, especially credentials or table names.

Pipeline identity

  • pipeline_id: lowercase letters/numbers/hyphens, 5-40 chars, starts with a letter, ends with a letter or number, no consecutive hyphens. Example: orders-prod.
  • name: human-readable label.
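You can check the pipeline_id rules locally before submitting anything. This matters because a bad ID currently comes back as an unhelpful 500. The sketch below is minimal: validate_pipeline_id is a hypothetical helper name, and the regex is the one given under Common pitfalls.

```shell
# Check a pipeline_id against the documented rules before submitting:
# 5-40 characters of lowercase letters, digits and hyphens, starting with a
# letter, ending with a letter or digit, with no consecutive hyphens.
validate_pipeline_id() {
  if ! printf '%s\n' "$1" | grep -Eq '^[a-z][a-z0-9-]{3,38}[a-z0-9]$'; then
    echo "invalid pipeline_id '$1': wrong characters or length" >&2
    return 1
  fi
  case "$1" in
    *--*) echo "invalid pipeline_id '$1': consecutive hyphens" >&2; return 1 ;;
  esac
  return 0
}
```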

Source (exactly one in v1 of this skill)

  • Type: kafka, otlp.logs, or otlp.metrics.
  • Kafka: brokers[], protocol, mechanism (if SASL), username, password, topic, schema_fields[] (each {name, type}; type is string | int | float | bool | array | map).
  • OTLP: only a source_id. The schema is fixed by the signal type.
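For Kafka sources, you can assemble the schema_fields array from name:type pairs and check it against the allowed types in one step. The sketch builds the JSON by string concatenation, so it assumes field names are plain identifiers with no quotes or backslashes. schema_fields_json is a hypothetical helper.

```shell
# Build a schema_fields JSON array from "name:type" arguments, rejecting any
# type outside the documented set. Assumes names need no JSON escaping.
schema_fields_json() {
  local out="" sep="" pair name type
  for pair in "$@"; do
    name=${pair%%:*}
    type=${pair#*:}
    case "$type" in
      string|int|float|bool|array|map) ;;
      *) echo "unsupported type '$type' for field '$name'" >&2; return 1 ;;
    esac
    out="${out}${sep}{\"name\": \"${name}\", \"type\": \"${type}\"}"
    sep=", "
  done
  printf '[%s]\n' "$out"
}
```

For example, `schema_fields_json order_id:string amount:float` prints a two-element array ready to paste into the source.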

Optional transforms (any subset, applied in order)

  • dedup: key field + time window (e.g. 30s, 1m, 1h).
  • filter: an expr-lang expression returning bool. Events where the expression is TRUE are KEPT.
  • stateless: array of {expression, output_name, output_type} entries.
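This is a quick, deliberately conservative sanity check on the dedup window. It accepts only a positive integer followed by s, m, or h, which are the forms used in the examples above. Whether the API also accepts compound durations such as 1h30m is not stated here. Treat this hypothetical validate_dedup_window helper as a guard against typos, not as the API's own rule.

```shell
# Accept only the simple window forms shown in the examples (30s, 1m, 1h).
# Assumption: the API may accept more; this only catches obvious typos.
validate_dedup_window() {
  printf '%s\n' "$1" | grep -Eq '^[1-9][0-9]*(s|m|h)$'
}
```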

Sink (ClickHouse)

  • host, port (default 9000), http_port (default 8123), database, username, password, secure (bool).
  • table name.
  • mapping[]: each entry {name, column_name, column_type}. name references either a source schema_fields entry or a stateless transform output_name.
  • max_batch_size (default 1000), max_delay_time (default 60s).
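Every mapping name must refer to either a schema_fields entry or a stateless output_name, so a dangling reference can be caught before the POST. The sketch below makes these assumptions:

- python3 is available; the polling loop in step 5 already relies on it.
- The skeleton leaves transforms empty, so the sketch collects every output_name found anywhere under transforms rather than assuming a particular nesting.
- check_mapping_refs is a hypothetical helper that takes the path to the JSON file.

```shell
# Report sink mapping names that match no source field or transform output.
check_mapping_refs() {
  python3 - "$1" <<'EOF'
import json, sys

with open(sys.argv[1]) as f:
    cfg = json.load(f)

known = set()
for src in cfg.get("sources", []):
    known.update(field["name"] for field in src.get("schema_fields", []))

def collect_outputs(node):
    # Walk the transforms tree and record every output_name it contains.
    if isinstance(node, dict):
        if "output_name" in node:
            known.add(node["output_name"])
        for value in node.values():
            collect_outputs(value)
    elif isinstance(node, list):
        for item in node:
            collect_outputs(item)

collect_outputs(cfg.get("transforms", []))

missing = [m["name"] for m in cfg["sink"].get("mapping", []) if m["name"] not in known]
if missing:
    print("mapping names with no source field or transform output:", ", ".join(missing))
    sys.exit(1)
print("all mapping names resolve")
EOF
}
```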

Steps

1. Build the JSON

Skeleton:

{
  "version": "v3",
  "pipeline_id": "<id>",
  "name": "<name>",
  "sources": [{"type": "kafka", "source_id": "...", "topic": "...", "connection_params": {...}, "schema_fields": [...]}],
  "transforms": [],
  "sink": {
    "type": "clickhouse",
    "connection_params": {"host": "...", "port": "9000", "http_port": "8123", "database": "...", "username": "...", "password": "...", "secure": false},
    "table": "...",
    "max_batch_size": 1000,
    "max_delay_time": "5s",
    "mapping": [{"name": "...", "column_name": "...", "column_type": "..."}]
  }
}

Write it to /tmp/pipeline-<pipeline_id>.json and cat it back so the user sees what will be submitted.
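Here is one way to write the file, reject malformed JSON early, and echo it back in a single step. It is a sketch: write_pipeline_config is a hypothetical helper that reads the config on stdin. It uses python3 -m json.tool only as a syntax check, not as validation of the v3 schema.

```shell
# Save the config from stdin to /tmp/pipeline-<id>.json, stop if it is not
# valid JSON, and print it back so the user can review it.
write_pipeline_config() {
  local path="/tmp/pipeline-$1.json"
  cat > "$path"
  if ! python3 -m json.tool "$path" > /dev/null; then
    echo "$path is not valid JSON; fix it before submitting" >&2
    return 1
  fi
  cat "$path"
}
```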

2. Confirm before POST

Always confirm the destination before submitting. Example:

"About to create pipeline orders-prod at http://localhost:8081. Sink writes to analytics.orders. Proceed?"

Wait for an explicit yes.

3. Submit

curl -s -o /tmp/pipeline-resp.json -w "%{http_code}\n" -X POST \
  "<api-endpoint>/api/v1/pipeline" \
  -H 'Content-Type: application/json' \
  --data @/tmp/pipeline-<id>.json
cat /tmp/pipeline-resp.json

Expected: HTTP 200 with body {}.

4. Handle errors

| HTTP status | Likely cause | Next move |
|---|---|---|
| 400 / 422 | The API rejected the config. The body contains a specific error path. | Read the error, identify the offending field, propose a fix, and ask the user before resubmitting. |
| 409 | The pipeline ID already exists. | Pick a different ID, or DELETE /api/v1/pipeline/<id> first if the user confirms that removal is OK. |
| 500 | Server-side issue. A malformed pipeline ID currently also returns 500 (see Common pitfalls). | Capture the body verbatim, show it to the user, and do not retry silently. |
| 000 / timeout | Network error or unreachable API. | Ask the user to confirm the endpoint and any port-forward state. |
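The table can be turned into a small dispatcher on the code that curl prints with `-w "%{http_code}"`. This is only a sketch: next_move_for_status is a hypothetical name, and its messages paraphrase the table rather than reproduce API output.

```shell
# Map the HTTP status from the submit step to the next move in the table.
# curl prints 000 when it never got a response (network error or timeout).
next_move_for_status() {
  case "$1" in
    200)     echo "accepted: poll health until overall_status is Running" ;;
    400|422) echo "rejected: read the error path in the body, propose a fix, ask the user before resubmitting" ;;
    409)     echo "duplicate ID: pick another ID, or DELETE the existing pipeline only if the user confirms" ;;
    500)     echo "server error: show the body verbatim to the user, do not retry silently" ;;
    000)     echo "unreachable: confirm the endpoint and any port-forward state" ;;
    *)       echo "unexpected status $1: show the body verbatim to the user" ;;
  esac
}

# Example:
#   CODE=$(curl -s -o /tmp/pipeline-resp.json -w "%{http_code}" -X POST ...)
#   next_move_for_status "$CODE"; cat /tmp/pipeline-resp.json
```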

5. Wait for the pipeline to reach Running

A successful POST means the config was accepted, not that the pipeline is ready to process events. The operator reconciles the workloads in the background; the pipeline only ingests once overall_status reaches Running. Poll the health endpoint until then:

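ENDPOINT="<api-endpoint>"
PID="<id>"
STATUS="?"
for i in {1..60}; do
  # Fall back to "?" if the API is unreachable or the body is not JSON.
  STATUS=$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" \
    | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status','?'))" 2>/dev/null || echo "?")
  echo "  attempt $i: overall_status=$STATUS"
  case "$STATUS" in
    Running)                       echo "  ready"; break ;;
    Failed)                        echo "  pipeline failed; capture the health body and surface it to the user"; exit 1 ;;
    Stopping|Stopped|Terminating)  echo "  pipeline is being torn down; aborting wait"; exit 1 ;;
  esac
  sleep 5
done
# Report a timeout instead of falling through silently.
if [ "$STATUS" != "Running" ]; then
  echo "  still $STATUS after about 5 minutes; surface the full health body to the user"
  exit 1
fi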

Typical transition: Created immediately after POST, then Running within 10-60 seconds once the operator has stamped the workloads. If the status stays at Created for more than ~2 minutes, surface the full health body to the user; the most common causes are missing K8s resources or an image-pull issue on the data-plane Pods.

Note: Running reflects control-plane state, not data flow. It means the components are up; confirming that events actually move requires the verification step below.

6. Verify end to end

curl -s "<api-endpoint>/api/v1/pipeline/<id>" | head -c 500
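To compare the stored pipeline with what was submitted more precisely than `head -c 500` allows, extract a few key fields from both. This assumes the GET response mirrors the submitted v3 config at the top level (pipeline_id, sources[].type, sink.table), which is not guaranteed here. summarize_pipeline is a hypothetical helper.

```shell
# Print "pipeline_id source_types sink_table" for a config read on stdin.
# Assumption: the GET response uses the same top-level shape as the v3 config.
summarize_pipeline() {
  python3 -c 'import json, sys; c = json.load(sys.stdin); print(c.get("pipeline_id"), ",".join(s.get("type", "?") for s in c.get("sources", [])), c.get("sink", {}).get("table"))'
}

# Example (bash process substitution):
#   diff <(summarize_pipeline < /tmp/pipeline-<id>.json) \
#        <(curl -s "<api-endpoint>/api/v1/pipeline/<id>" | summarize_pipeline)
```

Matching lines from both sides suggest the config round-tripped. Show any mismatch to the user.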

Confirm the pipeline shape is what you submitted. Then exercise it:

  • Kafka source: produce a test message that matches the schema (use the send-kafka-messages skill if it is available).
  • OTLP source: POST a sample to <otlp-endpoint>/v1/logs (or /v1/metrics) with header x-glassflow-pipeline-id: <id>.
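A minimal OTLP log request can be sent with curl. The payload follows the OTLP/HTTP JSON encoding (resourceLogs, scopeLogs, logRecords). It is an assumption that the GlassFlow receiver accepts JSON as well as protobuf; if it rejects the request, send the sample through an OpenTelemetry SDK or collector instead. The body is sent uncompressed, in line with the gzip pitfall below. otlp_sample_logs and send_otlp_sample are hypothetical helpers.

```shell
# Build a one-record OTLP/HTTP JSON log payload. The optional message must be
# plain text (no quotes), since it is inserted into the JSON verbatim.
otlp_sample_logs() {
  local now_ns="$(date +%s)000000000"
  printf '{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"glassflow-test"}}]},"scopeLogs":[{"scope":{"name":"manual-test"},"logRecords":[{"timeUnixNano":"%s","severityText":"INFO","body":{"stringValue":"%s"}}]}]}]}\n' \
    "$now_ns" "${1:-hello from create-pipeline}"
}

# POST the sample uncompressed, with the routing header the receiver requires.
send_otlp_sample() {
  local endpoint="$1" pipeline_id="$2"
  otlp_sample_logs | curl -s -w "%{http_code}\n" -X POST "$endpoint/v1/logs" \
    -H 'Content-Type: application/json' \
    -H "x-glassflow-pipeline-id: $pipeline_id" \
    --data-binary @-
}

# Example:
#   send_otlp_sample "<otlp-endpoint>" "<id>"
```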

Query ClickHouse to confirm rows landed:

SELECT count(*), max(<timestamp_column>) FROM <db>.<table>
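The same query can be run from the shell with clickhouse-client over the native port. CH_HOST, CH_PORT, CH_USER, and CH_PASSWORD are hypothetical variables holding the sink details. verify_query simply fills in the placeholders of the query above. Add --secure if the sink uses secure: true.

```shell
# Fill in the verification query for <db>.<table> and its timestamp column.
verify_query() {
  printf 'SELECT count(*), max(%s) FROM %s.%s' "$3" "$1" "$2"
}

# Run it with clickhouse-client. CH_HOST, CH_PORT, CH_USER and CH_PASSWORD
# are hypothetical variables for the sink connection details.
run_verify_query() {
  clickhouse-client --host "$CH_HOST" --port "${CH_PORT:-9000}" \
    --user "$CH_USER" --password "$CH_PASSWORD" \
    --query "$(verify_query "$1" "$2" "$3")"
}

# Example:
#   run_verify_query analytics orders created_at
```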

Common pitfalls

  • Pipeline ID format. Uppercase letters or consecutive hyphens currently cause a 500 from the API (not a 4xx), so the error message is unhelpful. Validate locally before submitting: the ID must match ^[a-z][a-z0-9-]{3,38}[a-z0-9]$ and must not contain --.
  • OTLP requires the x-glassflow-pipeline-id header on every request. The receiver returns 400 when the header is missing. For an unknown pipeline ID it returns a 5xx, which leaves exporters hanging in retries.
  • Disable gzip on OTLP exporters. The receiver does not decompress request bodies, so set compression: none on the otlp_http exporter.
  • The destination table must exist. The pipeline does not create it; if the table is missing, the sink fails on the first batch.

Output

When the pipeline is created successfully, report:

  • Pipeline ID and API endpoint.
  • Path to the JSON used (/tmp/pipeline-<id>.json).
  • Concrete test-message recipe for the configured source.
  • The verification query for the sink table.

When the API rejects the config, surface the error verbatim and offer one proposed fix.

Install via CLI
npx skills add https://github.com/glassflow/agent-skills --skill create-pipeline
Repository details: branch main, path SKILL.md.