name: create-pipeline
description: Use when the user asks to create a GlassFlow ETL pipeline. Walks through source configuration (Kafka or OTLP), optional dedup/filter/transform stages, and ClickHouse sink mapping, then submits the v3 config to the GlassFlow API. Triggers on phrases like "create a pipeline", "set up a Kafka to ClickHouse pipeline", "ingest logs from OTLP into ClickHouse".
tools: Bash, Read, Write
Create a GlassFlow pipeline
Turn the user's intent into a v3 pipeline config and submit it to the GlassFlow API at POST /api/v1/pipeline.
Prerequisites
- A reachable GlassFlow API. Ask the user for the endpoint if not given. Default `http://localhost:8081`.
- For Kafka sources: broker addresses and credentials (PLAINTEXT / SASL_PLAINTEXT / SSL / SASL_SSL).
- For OTLP sources: no source-side credentials needed; the receiver routes by header.
- ClickHouse target: host, port, database, username, password, table name. The destination table must exist before the pipeline runs.
Inputs to gather
Ask the user upfront. Do not invent values, especially credentials or table names.
Pipeline identity
- `pipeline_id`: lowercase letters/numbers/hyphens, 5-40 chars, starts with a letter, ends with a letter or number, no consecutive hyphens. Example: `orders-prod`.
- `name`: human-readable label.
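The ID rules above can be checked locally before anything is sent to the API. The following is a minimal sketch in portable shell; `validate_pipeline_id` is a hypothetical helper name, not part of GlassFlow, and the pattern only encodes the length and character rules as they are listed here.

```shell
#!/bin/sh
# Hypothetical helper (not part of GlassFlow): returns 0 when the ID
# satisfies the rules listed above, 1 otherwise.
validate_pipeline_id() {
  # 5-40 chars total: one leading letter, 3-38 middle chars, one trailing
  # letter or digit. LC_ALL=C forces ASCII ranges so [a-z] never matches uppercase.
  printf '%s\n' "$1" | LC_ALL=C grep -Eq '^[a-z][a-z0-9-]{3,38}[a-z0-9]$' || return 1
  case "$1" in
    *--*) return 1 ;;  # consecutive hyphens are not allowed
  esac
  return 0
}

# Sample candidates; the last three each break one rule.
for candidate in "orders-prod" "Orders-Prod" "orders--prod" "abc"; do
  if validate_pipeline_id "$candidate"; then
    echo "$candidate: ok"
  else
    echo "$candidate: rejected"
  fi
done
```

Of the four sample candidates only `orders-prod` passes; the others fail on uppercase letters, consecutive hyphens, and length respectively.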
Source (exactly one in v1 of this skill)
- Type: `kafka` or `otlp.logs` or `otlp.metrics`.
- Kafka: `brokers[]`, `protocol`, `mechanism` (if SASL), `username`, `password`, `topic`, `schema_fields[]` (each `{name, type}`; type is `string|int|float|bool|array|map`).
- OTLP: just a `source_id`. Schema is fixed by signal type.
Optional transforms (any subset, applied in order)
- `dedup`: key field + time window (e.g. `30s`, `1m`, `1h`).
- `filter`: an expr-lang expression returning bool. Events where the expression is TRUE are KEPT.
- `stateless`: array of `{expression, output_name, output_type}` entries.
Sink (ClickHouse)
- `host`, `port` (default `9000`), `http_port` (default `8123`), `database`, `username`, `password`, `secure` (bool).
- `table` name.
- `mapping[]`: each entry `{name, column_name, column_type}`. `name` references either a source `schema_fields` entry or a stateless transform `output_name`.
- `max_batch_size` (default `1000`), `max_delay_time` (default `60s`).
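Because every mapping `name` has to resolve to a source schema field or a stateless output, a quick local cross-check can catch typos before submission. The sketch below assumes the names have already been collected into space-separated lists; `unresolved_mapping_names` and the sample field names are hypothetical and only illustrate the rule.

```shell
#!/bin/sh
# Hypothetical helper: prints every mapping name that is not among the known
# names (source schema_fields names plus stateless output_name values).
# $1: known names, space-separated; $2: mapping names, space-separated.
# Returns 1 if any mapping name is unresolved, 0 otherwise.
unresolved_mapping_names() {
  umn_known=" $1 "
  umn_status=0
  for umn_name in $2; do
    case "$umn_known" in
      *" $umn_name "*) ;;  # resolved
      *) echo "unresolved: $umn_name"; umn_status=1 ;;
    esac
  done
  return "$umn_status"
}

# Illustrative field names only, not taken from a real pipeline.
unresolved_mapping_names "order_id amount amount_cents" "order_id amount_cents" \
  && echo "mapping ok"
unresolved_mapping_names "order_id amount" "order_id total" \
  || echo "fix the mapping before submitting"
```

Whether the API performs the same check on its side is not stated here, so this is best treated as an early local sanity check rather than a replacement for reading the API's error response.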
Steps
1. Build the JSON
Skeleton:
{
  "version": "v3",
  "pipeline_id": "<id>",
  "name": "<name>",
  "sources": [{"type": "kafka", "source_id": "...", "topic": "...", "connection_params": {...}, "schema_fields": [...]}],
  "transforms": [],
  "sink": {
    "type": "clickhouse",
    "connection_params": {"host": "...", "port": "9000", "http_port": "8123", "database": "...", "username": "...", "password": "...", "secure": false},
    "table": "...",
    "max_batch_size": 1000,
    "max_delay_time": "5s",
    "mapping": [{"name": "...", "column_name": "...", "column_type": "..."}]
  }
}
Write it to /tmp/pipeline-<pipeline_id>.json and cat it back so the user sees what will be submitted.
2. Confirm before POST
Always confirm the destination before submitting. Example:
"About to create pipeline `orders-prod` at `http://localhost:8081`. Sink writes to `analytics.orders`. Proceed?"
Wait for explicit yes.
3. Submit
curl -s -o /tmp/pipeline-resp.json -w "%{http_code}\n" -X POST \
  "<api-endpoint>/api/v1/pipeline" \
  -H 'Content-Type: application/json' \
  --data @/tmp/pipeline-<id>.json
cat /tmp/pipeline-resp.json
Expected: HTTP 200 with body {}.
4. Handle errors
| HTTP | Likely cause | Next move |
|---|---|---|
| 400 / 422 | API rejected the config. Body contains a specific error path. | Read the error, identify the offending field, propose a fix, ask the user before resubmitting. |
| 409 | Pipeline ID already exists. | Pick a different ID, or DELETE /api/v1/pipeline/<id> first if the user confirms removal is OK. |
| 500 | Server-side issue. | Capture the body verbatim, show the user, do not retry silently. |
| 000 / timeout | Network or unreachable API. | Ask the user to confirm endpoint and any port-forward state. |
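The table above can be turned into a small dispatcher that takes the status code printed by curl's `-w "%{http_code}"` and returns the next move. This is a sketch only: `next_move` is a hypothetical helper, the messages paraphrase the table, and the catch-all branch for codes the table does not list is an assumption.

```shell
#!/bin/sh
# Hypothetical helper: maps the HTTP status printed by curl's -w "%{http_code}"
# to the next move from the table above.
next_move() {
  case "$1" in
    200)     echo "accepted: wait for overall_status to reach Running" ;;
    400|422) echo "rejected: read the error path in the body, propose a fix, ask before resubmitting" ;;
    409)     echo "conflict: pick a different ID, or DELETE the existing pipeline only if the user confirms" ;;
    500)     echo "server error: show the body verbatim, do not retry silently" ;;
    000)     echo "unreachable: ask the user to confirm the endpoint and any port-forward state" ;;
    # Assumption: anything not in the table is surfaced to the user unchanged.
    *)       echo "unexpected status $1: show the body verbatim and ask the user how to proceed" ;;
  esac
}

# Hard-coded codes for illustration; in real use the code comes from the curl call in step 3.
for code in 200 409 000; do
  echo "$code -> $(next_move "$code")"
done
```

Keeping the mapping in one place makes it harder to retry silently by accident, since every non-200 branch ends in showing something to the user.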
5. Wait for the pipeline to reach Running
A successful POST means the config was accepted, not that the pipeline is ready to process events. The operator reconciles the workloads in the background; the pipeline only ingests once overall_status reaches Running. Poll the health endpoint until then:
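ENDPOINT="<api-endpoint>"
PID="<id>"
STATUS="?"
for i in {1..60}; do
  # Read overall_status from the health body; report "?" if the call or the JSON parse fails.
  STATUS=$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status','?'))" 2>/dev/null || echo "?")
  echo " attempt $i: overall_status=$STATUS"
  case "$STATUS" in
    Running) echo " ready"; break ;;
    Failed) echo " pipeline failed; capture the health body and surface to the user"; exit 1 ;;
    Stopping|Stopped|Terminating) echo " pipeline is being torn down; aborting wait"; exit 1 ;;
  esac
  sleep 5
done
# 60 attempts at 5 s each is about 5 minutes; running out of attempts counts as a failure.
[ "$STATUS" = "Running" ] || { echo " timed out waiting for Running"; exit 1; }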
Typical transition: Created immediately after POST, then Running within 10-60 seconds once the operator has stamped the workloads. If the status stays at Created for more than ~2 minutes, surface the full health body to the user; the most common causes are missing K8s resources or an image-pull issue on the data-plane Pods.
Note: Running reflects control-plane state, not data flow. The components are up; whether events are actually moving needs the verify step below.
6. Verify end to end
curl -s "<api-endpoint>/api/v1/pipeline/<id>" | head -c 500
Confirm the pipeline shape is what you submitted. Then exercise it:
- Kafka source: produce a test message matching the schema (the `send-kafka-messages` skill if available).
- OTLP source: POST a sample to `<otlp-endpoint>/v1/logs` (or `/v1/metrics`) with header `x-glassflow-pipeline-id: <id>`.
Query ClickHouse to confirm rows landed:
SELECT count(*), max(<timestamp_column>) FROM <db>.<table>
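Since the same query is also part of the final report, it can help to generate it from the sink values rather than retype it. A minimal sketch, assuming the database, table, and timestamp column are already known; `verification_query` and the `created_at` column are hypothetical names used for illustration.

```shell
#!/bin/sh
# Hypothetical helper: prints the verification query for a sink table.
# $1: database, $2: table, $3: timestamp column
verification_query() {
  printf 'SELECT count(*), max(%s) FROM %s.%s\n' "$3" "$1" "$2"
}

# created_at is an illustrative column name; use the sink table's real timestamp column.
verification_query "analytics" "orders" "created_at"
```

The helper only builds the query text; running it against ClickHouse is left to whatever client the user already has.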
Common pitfalls
- Pipeline ID format. Uppercase letters or consecutive hyphens cause a 500 from the API today (not a 4xx), so the error message is unhelpful. Validate locally before submitting: regex `^[a-z][a-z0-9-]{3,38}[a-z0-9]$`, no `--`.
- OTLP requires `x-glassflow-pipeline-id` header on every request. The receiver returns 400 without it, and a 5xx with a retry hang on an unknown ID.
- Disable gzip on OTLP exporters. The receiver does not decompress; `compression: none` on the otlp_http exporter.
- The destination table must exist. The pipeline does not create it. If it is missing, the sink fails on first batch.
Output
When the pipeline is created successfully, report:
- Pipeline ID and API endpoint.
- Path to the JSON used (`/tmp/pipeline-<id>.json`).
- Concrete test-message recipe for the configured source.
- The verification query for the sink table.
When the API rejects the config, surface the error verbatim and propose one fix.