name: tune-pipeline
description: Use when a GlassFlow pipeline is experiencing backpressure, sustained sink lag, or throughput pain. Reads GlassFlow's emitted Prometheus metrics to confirm the bottleneck (backpressure gauges, stream depth ratio, source vs sink rate, sink batch size distribution), proposes targeted changes to replicas, batch sizes, stream limits, or dedup storage, applies them via the appropriate API endpoint, and verifies improvement by re-reading the same metrics. Triggers on phrases like "pipeline has backpressure", "tune pipeline X", "throughput is too low", "size resources for pipeline Y".
tools: Bash, Read, Write
Tune a GlassFlow pipeline
Move a pipeline from "barely keeping up" to "comfortably keeping up" without overprovisioning. Confirm the bottleneck from metrics, propose a specific change tied to a measured signal, apply it via the right endpoint, then re-read the metrics to verify the change helped.
Prerequisites
- A reachable GlassFlow API. Default `http://localhost:8081`; ask the user if remote.
- Access to GlassFlow's emitted metrics. One of:
  - A Prometheus (or compatible) endpoint that scrapes the OTel Collector. The skill queries via `<prometheus>/api/v1/query` and `/api/v1/query_range`.
  - Direct access to the OTel Collector's `/metrics` endpoint at `{release}-otel-collector.{namespace}.svc.cluster.local:9090/metrics`.
- A clear baseline: which signal is bad and by how much, captured before the change so the after-comparison is meaningful.
Inputs to gather
- `pipeline_id`: required.
- `prometheus_endpoint`: required.
- Symptom: required. One of (each maps to a specific metric in step 2):
  - Ingestor backpressure (gauge stays at 1, episodes lengthening)
  - NATS stream near capacity (`stream_depth_ratio` sustained > 0.8)
  - Sink lag (sink rate trails source rate)
  - DLQ filling under load (`dlq_records_written_total` rate elevated)
  - Sink retry exhaustion (`sink_retries_total{outcome="exhausted"}` > 0)
- Target: required. Either a target throughput (events/sec), a target lag (seconds or messages), or "eliminate backpressure".
- Optional constraints: max replicas, memory ceiling, storage ceiling.
- `api_endpoint`: optional; default `http://localhost:8081`.
- `metric_namespace`: optional; defaults to `glassflow`.
Steps
1. Capture the baseline
Snapshot the current state of both the configuration and the metrics that describe the symptom. You need a clean baseline so step 6 (verify) has something to compare against.
ENDPOINT="<api-endpoint>"
PID="<id>"
NS="<metric_namespace>"
PROM="<prometheus_endpoint>"
# Config + resources
curl -s "$ENDPOINT/api/v1/pipeline/$PID" > /tmp/tune-$PID-pipeline.json
curl -s "$ENDPOINT/api/v1/pipeline/$PID/resources" > /tmp/tune-$PID-resources.json
curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -m json.tool
# Save the symptom metrics as a baseline (run all of these)
{
echo "=== source rate (events/sec, 5m) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_kafka_records_read_total{pipeline_id=\"$PID\"}[5m])"
echo "=== sink rate (events/sec, 5m) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_clickhouse_records_written_total{pipeline_id=\"$PID\"}[5m])"
echo "=== ingestor backpressure active ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_ingestor_backpressure_active{pipeline_id=\"$PID\"}"
echo "=== stream depth ratio ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_stream_depth_ratio{pipeline_id=\"$PID\"}"
echo "=== sink batch size records (p50, p95) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.5, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.95, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
echo "=== sink retries (outcome) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=sum by (outcome) (rate(${NS}_gfm_sink_retries_total{pipeline_id=\"$PID\"}[5m]))"
} > /tmp/tune-$PID-before.txt
The pipeline must be Running for metrics to reflect live state. If it is Failed or Stopped, use the debug-pipeline skill first.
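The baseline file holds raw Prometheus JSON, which is hard to compare by eye. Below is a minimal sketch of pulling the numeric samples out of one instant-query response, assuming the standard Prometheus HTTP API vector shape (each sample carries a `[timestamp, "value"]` pair). The helper name `instant_values` and the sample response are hypothetical and not part of GlassFlow.

```python
import json


def instant_values(response_text):
    """Return the sample values of a Prometheus instant-query response as floats.

    Assumes the standard /api/v1/query vector shape; raises ValueError on a
    non-success status so a failed query is not mistaken for "no data".
    """
    body = json.loads(response_text)
    if body.get("status") != "success":
        raise ValueError(f"query failed: {body.get('error', 'unknown error')}")
    results = body.get("data", {}).get("result", [])
    # Each vector sample looks like {"metric": {...}, "value": [unix_ts, "123.4"]}.
    return [float(sample["value"][1]) for sample in results]


# Illustrative response, not real pipeline output.
example = (
    '{"status":"success","data":{"resultType":"vector","result":'
    '[{"metric":{"pipeline_id":"demo"},"value":[1700000000,"5200"]}]}}'
)
print(instant_values(example))
```

An empty list means no series matched the selector, which may indicate a wrong metric prefix or pipeline ID rather than a zero rate.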
2. Map the symptom to a metric and a tunable
Each row below names the canonical metric signature and the matching config knob. Pick the row that matches the user's symptom; do not propose changes without a matching metric.
| Metric signature | Likely bottleneck | Tunable |
|---|---|---|
| `ingestor_backpressure_active == 1` sustained, `stream_depth_ratio` near 1.0 | Downstream consumer slower than the source; NATS stream filling | Scale the slowest downstream component (often sink replicas) or grow the stream (`resources.nats.stream.max_messages` / `.max_bytes`) |
| `kafka_records_read_total` rate growing but consumer-group lag also growing, ingestor CPU low | Ingestor parallelism capped by partition count | Increase Kafka topic partitions (Kafka-side change, outside GlassFlow), then increase `resources.sources[].replicas` |
| `kafka_records_read_total` rate flat near zero with non-zero topic lag | Ingestor under-provisioned per replica | Raise `resources.sources[].limits.cpu` and `.memory` |
| `clickhouse_records_written_total` rate consistently below source rate; `sink_batch_size_records` p95 hitting the configured `max_batch_size` | Sink throttled by batch frequency / size | Tune `sink.max_batch_size` upward and/or `sink.max_delay_time` downward via `/edit`, or scale `resources.sink.replicas` |
| `sink_retries_total{outcome="retry"}` rate > 0 sustained | Sink hitting retryable CH errors (overload, connection drops) | Raise `resources.sink.replicas`; check ClickHouse capacity separately |
| `sink_retries_total{outcome="exhausted"}` rate > 0 | Sink giving up on batches; data loss risk | Out of tuning scope: surface to the user and run debug-pipeline to find the root cause |
| `processing_duration_seconds` bucket high on `stage=dedup_*` with growing PVC | Dedup window too long for storage budget | Shrink dedup `time_window` (via `/edit`) or grow `resources.transform[].storage.size` |
| `sink_errors_by_classification_total{classification="terminal"}` > 0 | Schema or type mismatch on the destination table | Out of tuning scope: this is a config problem, not a sizing problem |
If no row matches, do not guess. Re-run debug-pipeline or ask the user to narrow the signal.
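The "match a row or stop" rule can be sketched as a small classifier. This is an illustration covering only four of the rows, and the thresholds (`depth_threshold`, `batch_full_fraction`) are assumptions rather than GlassFlow defaults; the `Signals` type and `match_row` function are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Signals:
    backpressure_active: float     # gauge, 1 while the ingestor is throttled
    stream_depth_ratio: float      # 0.0 to 1.0
    source_rate: float             # events/sec
    sink_rate: float               # events/sec
    sink_batch_p95: float          # records per batch
    max_batch_size: int            # configured sink.max_batch_size
    retries_exhausted_rate: float  # rate of outcome="exhausted"
    terminal_errors: float         # classification="terminal" count


def match_row(s, depth_threshold=0.8, batch_full_fraction=0.95):
    """Return a label for the first matching row, or None if nothing matches."""
    # Out-of-scope rows come first so a sizing row never masks them.
    if s.terminal_errors > 0:
        return "out of scope: terminal sink errors (config problem)"
    if s.retries_exhausted_rate > 0:
        return "out of scope: sink retries exhausted (run debug-pipeline)"
    # The more specific sink signature is checked before generic backpressure.
    batch_full = s.sink_batch_p95 >= batch_full_fraction * s.max_batch_size
    if s.sink_rate < s.source_rate and batch_full:
        return "sink batch-bound: tune batch size/delay or sink replicas"
    if s.backpressure_active >= 1 and s.stream_depth_ratio >= depth_threshold:
        return "downstream slower than source: scale sink or grow stream"
    return None  # no row matches: do not guess


observed = Signals(1, 0.9, 14_800, 5_200, 5_000, 5_000, 0, 0)
print(match_row(observed))
```

Returning `None` instead of a default keeps the "do not guess" rule explicit in the calling code.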
3. Propose a concrete diff
Build a specific change with rationale tied to a measured value. Example to show the user:
Bottleneck: sink rate is 5,200 events/sec while source rate is 14,800 events/sec (gap = 9,600). `sink_batch_size_records` p95 is at 5,000 (matches the configured `max_batch_size`), so the sink is batch-bound, not throughput-bound on individual inserts.

Proposed change:

- `sink.max_batch_size`: 5000 -> 20000 (let the sink form larger batches; reduces per-insert overhead)
- `sink.max_delay_time`: 5s -> 2s (flush sooner so larger batches do not wait too long)
- `resources.sink.replicas`: 1 -> 2 (parallelize inserts; ClickHouse accepts concurrent INSERTs to the same table)

Expected effect: sink rate climbs to >= 14,000 events/sec within ~5 minutes, `stream_depth_ratio` falls below 0.5, `ingestor_backpressure_active` returns to 0.

Apply via: stop pipeline, PUT /resources for the replica change, POST /edit for the batch tuning, restart.
Wait for the user to confirm before submitting any change.
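The numbers in a proposal like the example above are worth checking with a few lines of arithmetic before showing them to the user. The sketch below computes the gap and the factor by which sink throughput would need to grow; `sizing_gap` is a hypothetical helper and the inputs are the example's illustrative values.

```python
def sizing_gap(source_rate, sink_rate):
    """Return (gap in events/sec, factor by which sink throughput must grow)."""
    if sink_rate <= 0:
        raise ValueError("sink rate must be positive to compute a speedup factor")
    return source_rate - sink_rate, source_rate / sink_rate


gap, factor = sizing_gap(14_800, 5_200)
print(f"gap={gap} events/sec, required speedup={factor:.2f}x")
```

A required factor of roughly 2.85x suggests that doubling replicas alone would probably fall short even under ideal linear scaling, which is consistent with the example pairing the replica change with batch tuning.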
4. Apply the change
Choose the endpoint by what is changing.
Resource changes (replicas, requests/limits, storage, NATS stream sizing): require the pipeline to be Stopped, then PUT /api/v1/pipeline/<id>/resources.
# Stop
curl -s -o /tmp/tune-stop.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/stop"
# Wait for Stopped
until [ "$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status'))")" = "Stopped" ]; do sleep 2; done
# Apply resource diff (full PipelineResources body required)
curl -s -o /tmp/tune-put.json -w "%{http_code}\n" -X PUT "$ENDPOINT/api/v1/pipeline/$PID/resources" \
-H 'Content-Type: application/json' \
--data @/tmp/tune-$PID-new-resources.json
# Resume
curl -s -o /tmp/tune-resume.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/resume"
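Because the PUT needs the full body, one way to produce the new-resources file is to start from the saved baseline and change only the targeted field. The sketch below assumes the JSON keys mirror the dotted paths used in this document (for example `sink.replicas`) and does not handle list paths such as `sources[]`; `with_change` and the sample body are hypothetical.

```python
import copy
import json


def with_change(doc, dotted_path, value):
    """Return a deep copy of doc with the value at dotted_path replaced.

    Raises KeyError if any path segment is missing, so a typo cannot
    silently add a new field to the request body.
    """
    updated = copy.deepcopy(doc)
    node = updated
    *parents, leaf = dotted_path.split(".")
    for key in parents:
        node = node[key]
    if leaf not in node:
        raise KeyError(dotted_path)
    node[leaf] = value
    return updated


# Hypothetical saved body; the real one comes from GET .../resources.
current = {"sink": {"replicas": 1}, "nats": {"stream": {"max_messages": 100000}}}
new_body = with_change(current, "sink.replicas", 2)
print(json.dumps(new_body))
```

In practice the input would be loaded from the saved `/tmp/tune-$PID-resources.json` and the result written to `/tmp/tune-$PID-new-resources.json`, so every untouched field is carried over unchanged.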
Config changes (batch size, delay, dedup window, transforms, filter): use POST /api/v1/pipeline/<id>/edit with the full new pipeline JSON. /edit applies the config and resumes the pipeline in one call.
curl -s -o /tmp/tune-edit.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/edit" \
-H 'Content-Type: application/json' \
--data @/tmp/tune-$PID-new-pipeline.json
Mixed changes (resources + config) require both calls in sequence: stop, PUT /resources, then /edit (which resumes) or /resume.
5. Wait for the pipeline to be Running again
for i in {1..60}; do
STATUS=$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status','?'))")
echo " attempt $i: overall_status=$STATUS"
case "$STATUS" in
Running) echo " ready"; break ;;
Failed) echo " pipeline failed after change; revert"; exit 1 ;;
Stopping|Stopped|Terminating) echo " pipeline did not resume; investigate"; exit 1 ;;
esac
sleep 5
done
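The shell loop above ends without an error if all 60 attempts pass without a decisive status. The same decision logic can be sketched with an explicit timeout result. The status getter is injected so the function can be exercised without a live API; `wait_for_running` and the placeholder status `"Pending"` are hypothetical.

```python
import time


def wait_for_running(get_status, attempts=60, delay=5.0, sleep=time.sleep):
    """Poll get_status() and return "running", "failed", "not_resumed", or "timeout"."""
    for _ in range(attempts):
        status = get_status()
        if status == "Running":
            return "running"
        if status == "Failed":
            return "failed"
        if status in ("Stopping", "Stopped", "Terminating"):
            return "not_resumed"
        sleep(delay)  # any other status is treated as still transitioning
    return "timeout"


# Simulated status sequence; "Pending" stands in for any transitional state.
statuses = iter(["Pending", "Pending", "Running"])
result = wait_for_running(lambda: next(statuses), sleep=lambda _seconds: None)
print(result)
```

Treating a timeout as its own outcome keeps step 6 from running against a pipeline that never came back.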
6. Verify the change helped (metrics, not assumptions)
Wait at least 5 minutes after the pipeline returns to Running so the rate windows refill, then re-pull the same baseline queries from step 1:
{
echo "=== source rate (events/sec, 5m) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_kafka_records_read_total{pipeline_id=\"$PID\"}[5m])"
echo "=== sink rate (events/sec, 5m) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_clickhouse_records_written_total{pipeline_id=\"$PID\"}[5m])"
echo "=== ingestor backpressure active ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_ingestor_backpressure_active{pipeline_id=\"$PID\"}"
echo "=== stream depth ratio ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_stream_depth_ratio{pipeline_id=\"$PID\"}"
echo "=== sink batch size records (p50, p95) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.5, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.95, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
echo "=== sink retries (outcome) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=sum by (outcome) (rate(${NS}_gfm_sink_retries_total{pipeline_id=\"$PID\"}[5m]))"
} > /tmp/tune-$PID-after.txt
Compare before and after, then report:
Change applied: <diff summary>
Observation window: 5 minutes
Symptom metric: <metric name>
Before: <value>
After: <value>
Other signals:
source rate: <before> -> <after>
sink rate: <before> -> <after>
stream_depth_ratio: <before> -> <after>
ingestor BP active: <before> -> <after>
Verdict: <improved | no change | regressed>
Next move: <accept | revert | try a different tunable>
If the metric improved as predicted, accept. If it did not, revert the change before trying anything else. Stacking unverified changes hides which one helped or hurt.
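The verdict line can be derived mechanically once the before and after values are known. This is a minimal sketch, assuming a 5% relative change is treated as noise; the tolerance and the `verdict` helper are illustrative choices, not something GlassFlow prescribes.

```python
def verdict(before, after, lower_is_better, tolerance=0.05):
    """Classify a before/after pair as "improved", "no change", or "regressed".

    tolerance is the relative change treated as noise (an assumed 5%).
    """
    scale = max(abs(before), abs(after))
    if scale == 0 or abs(after - before) / scale <= tolerance:
        return "no change"
    got_smaller = after < before
    return "improved" if got_smaller == lower_is_better else "regressed"


# Illustrative values only.
print(verdict(0.92, 0.41, lower_is_better=True))      # stream_depth_ratio
print(verdict(5_200, 14_100, lower_is_better=False))  # sink rate
```

Passing `lower_is_better` explicitly avoids misreading rate metrics, where a higher value is the goal, alongside ratio and lag metrics, where a lower one is.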
Common pitfalls
- `resources.nats.stream.max_age` and `.max_bytes` are immutable after pipeline creation. Changing them requires a new pipeline. Other NATS-stream fields are mutable. Surface this constraint when the proposed change involves these fields.
- Resource updates require the pipeline to be `Stopped`. `PUT /resources` returns 409 with a `current_status` error if you forget. Always stop first.
- `/edit` requires `Running` (or the appropriate transition target). It is not a universal "apply config" endpoint; it has lifecycle preconditions.
- More replicas does not always help. Increasing ingestor replicas beyond the Kafka topic's partition count buys nothing; replicas idle on partitions they cannot claim. Check partition count before suggesting parallelism increases.
- Increasing `sink.max_batch_size` shifts memory pressure to the sink Pods. If the sink limit is `memory: 512Mi`, a 50k batch with heavy rows may OOM. Raise `limits.memory` alongside.
- Dedup window changes do not retroactively re-emit duplicates. Shrinking the window only affects records evaluated after the change lands; it does not flush state already in BadgerDB.
- Counters are cumulative; always wrap in `rate()` over the same window for before/after comparison. Comparing raw counter values across the change is meaningless.
- `{namespace}` in metric names is the Helm release namespace, not the per-pipeline one. A non-default install changes every metric name.
- Changes are not free. Stopping a pipeline incurs reconciliation time (10-60s). Doing many small tuning iterations is expensive. Batch related changes when you can.
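Two of the pitfalls above reduce to quick arithmetic that is worth running before proposing a change. The sketch below estimates the payload held by one full batch and caps ingestor replicas at the partition count. The 12 KiB average row size is an assumed figure for "heavy rows", and both helper names are hypothetical.

```python
def batch_bytes(max_batch_size, avg_row_bytes):
    """Rough lower bound on memory held by one full batch (payload only)."""
    return max_batch_size * avg_row_bytes


def useful_ingestor_replicas(requested, partitions):
    """Replicas beyond the partition count cannot claim a partition."""
    return min(requested, partitions)


MIB = 1024 * 1024
limit = 512 * MIB
estimate = batch_bytes(50_000, 12 * 1024)  # assumed 12 KiB per row
print(f"batch ~{estimate / MIB:.0f} MiB vs limit {limit / MIB:.0f} MiB")
print(useful_ingestor_replicas(8, 6))
```

The estimate ignores serialization buffers and runtime overhead, so real usage would likely be higher; it is only meant to flag proposals that are clearly over the limit.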