tune-pipeline

star 0

Use when a GlassFlow pipeline is experiencing backpressure, sustained sink lag, or throughput pain. Reads GlassFlow's emitted Prometheus metrics to confirm the bottleneck (backpressure gauges, stream depth ratio, source vs sink rate, sink batch size distribution), proposes targeted changes to replicas, batch sizes, stream limits, or dedup storage, applies them via the appropriate API endpoint, and verifies improvement by re-reading the same metrics. Triggers on phrases like "pipeline has backpressure", "tune pipeline X", "throughput is too low", "size resources for pipeline Y".

glassflow By glassflow schedule Updated 6/8/2026

name: tune-pipeline description: Use when a GlassFlow pipeline is experiencing backpressure, sustained sink lag, or throughput pain. Reads GlassFlow's emitted Prometheus metrics to confirm the bottleneck (backpressure gauges, stream depth ratio, source vs sink rate, sink batch size distribution), proposes targeted changes to replicas, batch sizes, stream limits, or dedup storage, applies them via the appropriate API endpoint, and verifies improvement by re-reading the same metrics. Triggers on phrases like "pipeline has backpressure", "tune pipeline X", "throughput is too low", "size resources for pipeline Y". tools: Bash, Read, Write

Tune a GlassFlow pipeline

Move a pipeline from "barely keeping up" to "comfortably keeping up" without overprovisioning. Confirm the bottleneck from metrics, propose a specific change tied to a measured signal, apply it via the right endpoint, then re-read the metrics to verify the change helped.

Prerequisites

  • A reachable GlassFlow API. Default http://localhost:8081; ask the user if remote.
  • Access to GlassFlow's emitted metrics. One of:
    • A Prometheus (or compatible) endpoint that scrapes the OTel Collector. The skill queries via <prometheus>/api/v1/query and /api/v1/query_range.
    • Direct access to the OTel Collector's /metrics endpoint at {release}-otel-collector.{namespace}.svc.cluster.local:9090/metrics.
  • A clear baseline: which signal is bad and by how much, captured before the change so the after-comparison is meaningful.

Inputs to gather

  • pipeline_id: required.
  • prometheus_endpoint: required.
  • Symptom: required. One of (each maps to a specific metric in step 2):
    • Ingestor backpressure (gauge stays at 1, episodes lengthening)
    • NATS stream near capacity (stream_depth_ratio sustained > 0.8)
    • Sink lag (sink rate trails source rate)
    • DLQ filling under load (dlq_records_written_total rate elevated)
    • Sink retry exhaustion (sink_retries_total{outcome="exhausted"} > 0)
  • Target: required. Either a target throughput (events/sec), a target lag (seconds or messages), or "eliminate backpressure".
  • Optional constraints: max replicas, memory ceiling, storage ceiling.
  • api_endpoint: optional; default http://localhost:8081.
  • metric_namespace: optional; defaults to glassflow.

Steps

1. Capture the baseline

Snapshot the current state of both the configuration and the metrics that describe the symptom. You need a clean baseline so step 6 (verify) has something to compare against.

ENDPOINT="<api-endpoint>"
PID="<id>"
NS="<metric_namespace>"
PROM="<prometheus_endpoint>"

# Config + resources
curl -s "$ENDPOINT/api/v1/pipeline/$PID" > /tmp/tune-$PID-pipeline.json
curl -s "$ENDPOINT/api/v1/pipeline/$PID/resources" > /tmp/tune-$PID-resources.json
curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -m json.tool

# Save the symptom metrics as a baseline (run all of these)
{
  echo "=== source rate (events/sec, 5m) ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_kafka_records_read_total{pipeline_id=\"$PID\"}[5m])"
  echo "=== sink rate (events/sec, 5m) ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_clickhouse_records_written_total{pipeline_id=\"$PID\"}[5m])"
  echo "=== ingestor backpressure active ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_ingestor_backpressure_active{pipeline_id=\"$PID\"}"
  echo "=== stream depth ratio ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_stream_depth_ratio{pipeline_id=\"$PID\"}"
  echo "=== sink batch size records (p50, p95) ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.5, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.95, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
  echo "=== sink retries (outcome) ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=sum by (outcome) (rate(${NS}_gfm_sink_retries_total{pipeline_id=\"$PID\"}[5m]))"
} > /tmp/tune-$PID-before.txt

The pipeline must be Running for metrics to reflect live state. If it is Failed or Stopped, use the debug-pipeline skill first.

2. Map the symptom to a metric and a tunable

Each row below names the canonical metric signature and the matching config knob. Pick the row that matches the user's symptom; do not propose changes without a matching metric.

Metric signature Likely bottleneck Tunable
ingestor_backpressure_active == 1 sustained, stream_depth_ratio near 1.0 Downstream consumer slower than the source; NATS stream filling Scale the slowest downstream component (often sink replicas) or grow the stream (resources.nats.stream.max_messages / .max_bytes)
kafka_records_read_total rate growing but consumer-group lag also growing, ingestor CPU low Ingestor parallelism capped by partition count Increase Kafka topic partitions (Kafka-side change, outside GlassFlow), then increase resources.sources[].replicas
kafka_records_read_total rate flat near zero with non-zero topic lag Ingestor under-provisioned per replica Raise resources.sources[].limits.cpu and .memory
clickhouse_records_written_total rate consistently below source rate; sink_batch_size_records p95 hitting the configured max_batch_size Sink throttled by batch frequency / size Tune sink.max_batch_size upward and/or sink.max_delay_time downward via /edit, or scale resources.sink.replicas
sink_retries_total{outcome="retry"} rate > 0 sustained Sink hitting retryable CH errors (overload, connection drops) Raise resources.sink.replicas; check ClickHouse capacity separately
sink_retries_total{outcome="exhausted"} rate > 0 Sink giving up on batches; data loss risk Out of tuning scope: surface to the user and run debug-pipeline to find the root cause
processing_duration_seconds bucket high on stage=dedup_* with growing PVC Dedup window too long for storage budget Shrink dedup time_window (via /edit) or grow resources.transform[].storage.size
sink_errors_by_classification_total{classification="terminal"} > 0 Schema or type mismatch on the destination table Out of tuning scope: this is a config problem, not a sizing problem

If no row matches, do not guess. Re-run debug-pipeline or ask the user to narrow the signal.

3. Propose a concrete diff

Build a specific change with rationale tied to a measured value. Example to show the user:

Bottleneck: sink rate is 5,200 events/sec while source rate is 14,800 events/sec (gap = 9,600). sink_batch_size_records p95 is at 5,000 (matches the configured max_batch_size), so the sink is batch-bound, not throughput-bound on individual inserts.

Proposed change:

  • sink.max_batch_size: 5000 -> 20000 (let the sink form larger batches; reduces per-insert overhead)
  • sink.max_delay_time: 5s -> 2s (flush sooner so larger batches do not wait too long)
  • resources.sink.replicas: 1 -> 2 (parallelize inserts; ClickHouse accepts concurrent INSERTs to the same table)

Expected effect: sink rate climbs to >= 14,000 events/sec within ~5 minutes, stream_depth_ratio falls below 0.5, ingestor_backpressure_active returns to 0.

Apply via: stop pipeline, PUT /resources for the replica change, POST /edit for the batch tuning, restart.

Wait for the user to confirm before submitting any change.

4. Apply the change

Choose the endpoint by what is changing.

Resource changes (replicas, requests/limits, storage, NATS stream sizing): require the pipeline to be Stopped, then PUT /api/v1/pipeline/<id>/resources.

# Stop
curl -s -o /tmp/tune-stop.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/stop"

# Wait for Stopped
until [ "$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status'))")" = "Stopped" ]; do sleep 2; done

# Apply resource diff (full PipelineResources body required)
curl -s -o /tmp/tune-put.json -w "%{http_code}\n" -X PUT "$ENDPOINT/api/v1/pipeline/$PID/resources" \
  -H 'Content-Type: application/json' \
  --data @/tmp/tune-$PID-new-resources.json

# Resume
curl -s -o /tmp/tune-resume.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/resume"

Config changes (batch size, delay, dedup window, transforms, filter): use POST /api/v1/pipeline/<id>/edit with the full new pipeline JSON. /edit applies the config and resumes the pipeline in one call.

curl -s -o /tmp/tune-edit.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/edit" \
  -H 'Content-Type: application/json' \
  --data @/tmp/tune-$PID-new-pipeline.json

Mixed changes (resources + config) require both calls in sequence: stop, PUT /resources, then /edit (which resumes) or /resume.

5. Wait for the pipeline to be Running again

for i in {1..60}; do
  STATUS=$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status','?'))")
  echo "  attempt $i: overall_status=$STATUS"
  case "$STATUS" in
    Running)                       echo "  ready"; break ;;
    Failed)                        echo "  pipeline failed after change; revert"; exit 1 ;;
    Stopping|Stopped|Terminating)  echo "  pipeline did not resume; investigate"; exit 1 ;;
  esac
  sleep 5
done

6. Verify the change helped (metrics, not assumptions)

Wait at least 5 minutes after the pipeline returns to Running so the rate windows refill, then re-pull the same baseline queries from step 1:

{
  echo "=== source rate (events/sec, 5m) ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_kafka_records_read_total{pipeline_id=\"$PID\"}[5m])"
  echo "=== sink rate (events/sec, 5m) ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_clickhouse_records_written_total{pipeline_id=\"$PID\"}[5m])"
  echo "=== ingestor backpressure active ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_ingestor_backpressure_active{pipeline_id=\"$PID\"}"
  echo "=== stream depth ratio ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_stream_depth_ratio{pipeline_id=\"$PID\"}"
  echo "=== sink batch size records (p50, p95) ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.5, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.95, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
  echo "=== sink retries (outcome) ==="
  curl -sG "$PROM/api/v1/query" --data-urlencode "query=sum by (outcome) (rate(${NS}_gfm_sink_retries_total{pipeline_id=\"$PID\"}[5m]))"
} > /tmp/tune-$PID-after.txt

Compare before and after, then report:

Change applied:    <diff summary>
Observation window: 5 minutes
Symptom metric:    <metric name>
Before:            <value>
After:             <value>
Other signals:
  source rate:        <before> -> <after>
  sink rate:          <before> -> <after>
  stream_depth_ratio: <before> -> <after>
  ingestor BP active: <before> -> <after>
Verdict:           <improved | no change | regressed>
Next move:         <accept | revert | try a different tunable>

If the metric improved as predicted, accept. If it did not, revert the change before trying anything else. Stacking unverified changes hides which one helped or hurt.

Common pitfalls

  • resources.nats.stream.max_age and .max_bytes are immutable after pipeline creation. Changing them requires a new pipeline. Other NATS-stream fields are mutable. Surface this constraint when the proposed change involves these fields.
  • Resource updates require the pipeline to be Stopped. PUT /resources returns 409 with a current_status error if you forget. Always stop first.
  • /edit requires Running (or the appropriate transition target). It is not a universal "apply config" endpoint; it has lifecycle preconditions.
  • More replicas does not always help. Increasing ingestor replicas beyond the Kafka topic's partition count buys nothing; replicas idle on partitions they cannot claim. Check partition count before suggesting parallelism increases.
  • Increasing sink.max_batch_size shifts memory pressure to the sink Pods. If the sink limit is memory: 512Mi, a 50k batch with heavy rows may OOM. Raise limits.memory alongside.
  • Dedup window changes do not retroactively re-emit duplicates. Shrinking the window only affects records evaluated after the change lands; it does not flush state already in BadgerDB.
  • Counters are cumulative; always wrap in rate() over the same window for before/after comparison. Comparing raw counter values across the change is meaningless.
  • {namespace} in metric names is the Helm release namespace, not the per-pipeline one. A non-default install changes every metric name.
  • Changes are not free. Stopping a pipeline incurs reconciliation time (10-60s). Doing many small tuning iterations is expensive. Batch related changes when you can.
Install via CLI
npx skills add https://github.com/glassflow/agent-skills --skill tune-pipeline
Repository Details
star Stars 0
call_split Forks 0
navigation Branch main
article Path SKILL.md
More from Creator