---
name: agentflow
description: Build and run multi-agent pipelines using AgentFlow. Use when the user wants to orchestrate codex, claude, or kimi agents in parallel, in sequence, or in iterative loops. Trigger when the user mentions multi-agent workflows, fan-out tasks, code review pipelines, iterative implementation loops, running agents on EC2/ECS, or any task that needs multiple AI agents coordinated together. Also trigger for "agentflow", "pipeline", "graph of agents", "fanout", "shard", or "run codex on remote".
---
AgentFlow
Build multi-agent pipelines where codex, claude, and kimi work together in dependency graphs with parallel fanout, iterative cycles, and remote execution.
Quick Start
```python
from agentflow import Graph, codex, claude

with Graph(
    "review-pipeline",
    concurrency=3,
    optimizer="codex",
    n_run=2,
) as g:
    plan = codex(task_id="plan", prompt="Plan the work.", tools="read_only")
    impl = claude(task_id="impl", prompt="Implement:\n{{ nodes.plan.output }}", tools="read_write")
    review = codex(task_id="review", prompt="Review:\n{{ nodes.impl.output }}")
    plan >> impl >> review

print(g.to_json())
```
Run: agentflow run pipeline.py
Setting optimizer and n_run like this makes AgentFlow optimize the graph between rounds. Codex rewrites the pipeline, and the runtime then verifies that the edited pipeline still loads and matches the schema before the next round runs.
Imports
```python
from agentflow import Graph, agent, codex, claude, kimi, evolve
from agentflow import fanout, merge             # parallel shards
from agentflow import shell, python_node, sync  # utility nodes
```
Nodes
Create agent nodes with codex(), claude(), or kimi(). Required: task_id, prompt.
```python
codex(
    task_id="name",          # unique ID (required)
    prompt="...",            # Jinja2 template (required)
    tools="read_only",       # "read_only" | "read_write"
    timeout_seconds=300,
    retries=1,
    success_criteria=[{"kind": "output_contains", "value": "PASS"}],
    target={...},            # execution target (local/ssh/ec2/ecs)
    env={"KEY": "val"},
)
```
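The success_criteria and retries options work together: an attempt whose output fails a criterion presumably counts as a failed attempt and can be retried. Below is a minimal plain-Python sketch of that check. It rests on three assumptions: output_contains is a simple substring test, only that criterion kind exists, and retries=1 means one extra attempt after the first. meets_criteria and run_with_retries are hypothetical helpers, not AgentFlow APIs.

```python
from typing import Callable


def meets_criteria(output: str, criteria: list) -> bool:
    """Return True if every criterion passes (only output_contains is modelled)."""
    for criterion in criteria:
        if criterion["kind"] != "output_contains":
            raise ValueError(f"unsupported criterion kind: {criterion['kind']}")
        if criterion["value"] not in output:
            return False
    return True


def run_with_retries(attempt: Callable[[int], str], criteria: list, retries: int = 1) -> tuple:
    """Run attempt(n) up to 1 + retries times; stop at the first output that passes."""
    output = ""
    for n in range(retries + 1):
        output = attempt(n)
        if meets_criteria(output, criteria):
            return True, output
    return False, output


criteria = [{"kind": "output_contains", "value": "PASS"}]
replies = ["tests FAIL", "all tests PASS"]  # fake agent replies, one per attempt
ok, out = run_with_retries(lambda n: replies[n], criteria, retries=1)
print(ok, out)  # True all tests PASS
```

Because the criterion looks for a literal token, prompts should tell the agent to print that exact token (for example PASS) on success.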
Dependencies
Use >> to set execution order:
```python
plan >> [impl, review]     # plan runs before impl AND review (which run in parallel)
[impl, review] >> combine  # both finish before combine runs
```
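The >> syntax works because Python lets classes overload the operator. `node >> [a, b]` calls the node's `__rshift__`. `[a, b] >> node` falls back to the node's reflected `__rrshift__`, because list defines no `>>` operator.

The toy sketch below shows how such a DSL can record edges and derive an execution order with the standard library's graphlib. The Node class is hypothetical and is not AgentFlow's implementation.

```python
from graphlib import TopologicalSorter


class Node:
    """Toy stand-in for a pipeline node; records edges in a shared dict."""

    def __init__(self, task_id: str, graph: dict):
        self.task_id = task_id
        self.graph = graph                 # task_id -> set of upstream task_ids
        graph.setdefault(task_id, set())

    def __rshift__(self, other):
        # node >> node  or  node >> [nodes]
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.graph[target.task_id].add(self.task_id)
        return other                       # returning the right side allows a >> b >> c

    def __rrshift__(self, other):
        # [nodes] >> node: called because list has no >> operator of its own
        for upstream in other:
            self.graph[self.task_id].add(upstream.task_id)
        return self


deps: dict = {}
plan, impl, review, combine = (Node(name, deps) for name in ["plan", "impl", "review", "combine"])
plan >> [impl, review]
[impl, review] >> combine

order = list(TopologicalSorter(deps).static_order())
print(order)  # plan first, combine last; impl and review in between
```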
Template Variables
Prompts are Jinja2 templates rendered at runtime:
```text
{{ nodes.plan.output }}                 # output of a completed node
{{ nodes.plan.status }}                 # "completed", "failed"
{{ fanouts.shards.nodes }}              # all fanout members
{{ fanouts.shards.summary.completed }}
{{ item.number }}                       # fields of the current fanout member
```
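To make the dotted names concrete, the sketch below resolves `{{ a.b.c }}` placeholders against a nested dict. The context shape and its values are illustrative assumptions. The regex resolver is a toy stand-in for Jinja2: real prompts are rendered by Jinja2, which also supports `{% if %}` blocks and filters that this sketch does not handle.

```python
import re

# Assumed, illustrative shape of the data a prompt template can see.
context = {
    "nodes": {"plan": {"output": "1. add tests\n2. fix bug", "status": "completed"}},
    "item": {"number": 3, "count": 8},
}


def render(template: str, ctx: dict) -> str:
    """Replace each {{ dotted.path }} with the value found by walking ctx."""
    def lookup(match):
        value = ctx
        for key in match.group(1).split("."):
            value = value[key]
        return str(value)

    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", lookup, template)


prompt = render("Shard {{ item.number }}/{{ item.count }} after plan ({{ nodes.plan.status }})", context)
print(prompt)  # Shard 3/8 after plan (completed)
```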
Fanout (Parallel Shards)
fanout(node, source) -- source type determines mode:
```python
# int = count (N identical copies)
shards = fanout(codex(task_id="shard", prompt="Shard {{ item.number }}/{{ item.count }}"), 128)

# list = values (one per item)
reviews = fanout(
    codex(task_id="review", prompt="Review {{ item.repo }}"),
    [{"repo": "api"}, {"repo": "billing"}],
)

# dict = matrix (cartesian product)
fuzz = fanout(
    codex(task_id="fuzz", prompt="{{ item.target }} + {{ item.sanitizer }}"),
    {"lib": [{"target": "libpng"}], "check": [{"sanitizer": "asan"}, {"sanitizer": "ubsan"}]},
)
```
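The three source types expand into members as follows:

- an int gives N empty copies;
- a list gives one member per value;
- a dict gives the cartesian product of its axes, with each combination's dicts merged into one member.

The standalone sketch below reproduces that expansion with itertools.product. The expand function is hypothetical; for instance, it assumes later axes overwrite earlier ones on key clashes. It is meant to show which members the examples above would produce.

```python
from itertools import product


def expand(source):
    """Hypothetical sketch: turn a fanout source into one dict of item fields per member."""
    if isinstance(source, int):        # count mode: N identical copies
        return [{} for _ in range(source)]
    if isinstance(source, list):       # values mode: one member per value
        return [dict(value) for value in source]
    if isinstance(source, dict):       # matrix mode: one member per combination of axes
        members = []
        for combo in product(*source.values()):
            merged = {}
            for part in combo:
                merged.update(part)    # lift each axis's keys into the member
            members.append(merged)
        return members
    raise TypeError(f"unsupported fanout source: {type(source).__name__}")


matrix = expand({"lib": [{"target": "libpng"}], "check": [{"sanitizer": "asan"}, {"sanitizer": "ubsan"}]})
print(matrix)
# [{'target': 'libpng', 'sanitizer': 'asan'}, {'target': 'libpng', 'sanitizer': 'ubsan'}]
```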
item fields
| Field | Type | Example |
|---|---|---|
| `item.index` | int | 0, 1, 2 |
| `item.number` | int | 1, 2, 3 (1-indexed) |
| `item.count` | int | total number of copies |
| `item.suffix` | str | "000", "001" (zero-padded) |
| `item.node_id` | str | "shard_001" |
| `item.<key>` | Any | dict keys lifted from the values |
derive (computed fields)
```python
fanout(node, 128, derive={"workspace": "agents/{{ item.suffix }}"})
```
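The item fields in the table can be computed directly from the member index. The sketch below derives them for a count fanout and then adds a computed workspace field the way derive does.

It makes several assumptions:

- the suffix is zero-padded to 3 digits, as in the table's examples;
- node_id is the task_id joined to the suffix;
- Python lambdas can stand in for the Jinja2 strings that real derive values use.

member_fields and apply_derive are hypothetical helpers.

```python
def member_fields(task_id: str, count: int, width: int = 3) -> list:
    """Hypothetical sketch of per-member item fields for a count fanout."""
    members = []
    for index in range(count):
        suffix = str(index).zfill(width)     # assumed padding width
        members.append({
            "index": index,                  # 0-based
            "number": index + 1,             # 1-based
            "count": count,
            "suffix": suffix,
            "node_id": f"{task_id}_{suffix}",
        })
    return members


def apply_derive(members: list, derive: dict) -> None:
    """Add computed fields; callables stand in for derive's Jinja2 templates."""
    for item in members:
        for name, compute in derive.items():
            item[name] = compute(item)


shards = member_fields("shard", 128)
apply_derive(shards, {"workspace": lambda item: f"agents/{item['suffix']}"})
print(shards[1]["node_id"], shards[1]["workspace"])  # shard_001 agents/001
```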
Merge (Reduce Fanout)
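merge(node, source, by=[...]) or merge(node, source, size=N). Pass by to group members by field values, or size to reduce fixed-size batches: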
```python
# Batch reduce: one reducer per 16 shards
batch = merge(
    codex(task_id="batch", prompt="Reduce shards {{ item.start_number }}-{{ item.end_number }}"),
    shards, size=16,
)

# Group by field value
family = merge(
    codex(task_id="family", prompt="Reduce {{ item.target }}"),
    fuzz, by=["target"],
)
```
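Each reducer's item exposes item.member_ids, item.members, item.size, and item.source_group. Size-based batches also provide item.start_number and item.end_number, as used in the batch prompt above.
At runtime, item.scope exposes the reduced members' results: item.scope.nodes, item.scope.outputs, item.scope.summary, and item.scope.with_output.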
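The two merge modes partition the fanout members in different ways. size=N cuts them into consecutive batches of N, so 128 shards with size=16 give 8 reducers. by=[...] gives one group per distinct combination of the listed field values.

The sketch below models both modes with the standard library. merge_by_size and merge_by_fields are hypothetical helpers that only mirror the field names listed above; AgentFlow's real item objects may carry more data.

```python
from collections import defaultdict


def merge_by_size(member_ids: list, size: int) -> list:
    """One reducer item per consecutive batch of `size` members."""
    batches = []
    for start in range(0, len(member_ids), size):
        chunk = member_ids[start:start + size]
        batches.append({
            "member_ids": chunk,
            "size": len(chunk),
            "start_number": start + 1,            # 1-based, like item.number
            "end_number": start + len(chunk),
        })
    return batches


def merge_by_fields(members: list, by: list) -> dict:
    """Group member node_ids by the tuple of their `by` field values."""
    groups = defaultdict(list)
    for member in members:
        groups[tuple(member[field] for field in by)].append(member["node_id"])
    return dict(groups)


shard_ids = [f"shard_{i:03d}" for i in range(128)]
batches = merge_by_size(shard_ids, 16)

fuzz_members = [
    {"node_id": "fuzz_000", "target": "libpng", "sanitizer": "asan"},
    {"node_id": "fuzz_001", "target": "libpng", "sanitizer": "ubsan"},
]
families = merge_by_fields(fuzz_members, ["target"])
print(len(batches), families)  # 8 {('libpng',): ['fuzz_000', 'fuzz_001']}
```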
Cycles (Iterative Loops)
Use on_failure back-edges for retry-until-success patterns:
```python
from agentflow import Graph, codex, claude

with Graph("iterative", max_iterations=5) as g:
    write = codex(task_id="write", prompt=(
        "Write the code.\n"
        "{% if nodes.review.output %}Fix: {{ nodes.review.output }}{% endif %}"
    ), tools="read_write")
    review = claude(task_id="review", prompt=(
        "Review:\n{{ nodes.write.output }}\n"
        "If complete, say LGTM. Otherwise list issues."
    ), success_criteria=[{"kind": "output_contains", "value": "LGTM"}])
    done = codex(task_id="done", prompt="Summarize:\n{{ nodes.write.output }}")

    write >> review
    review.on_failure >> write  # loop back until LGTM
    review >> done              # exit on success
```
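The control flow of this cycle can be simulated without any agents.

1. write runs, seeing the previous review if there is one.
2. review runs.
3. Output containing LGTM exits toward done; anything else follows the on_failure edge back to write.
4. max_iterations caps the number of passes.

The sketch below is a hypothetical simulation with fake agents, assuming max_iterations counts write/review passes. It is not AgentFlow's scheduler.

```python
def run_loop(write, review, max_iterations: int) -> dict:
    """Hypothetical sketch of write -> review with an on_failure back-edge to write."""
    feedback = None
    draft = ""
    for iteration in range(1, max_iterations + 1):
        draft = write(feedback)               # write sees the last review, if any
        verdict = review(draft)
        if "LGTM" in verdict:                 # success_criteria: output_contains "LGTM"
            return {"status": "completed", "iterations": iteration, "output": draft}
        feedback = verdict                    # on_failure: loop back to write
    return {"status": "failed", "iterations": max_iterations, "output": draft}


def fake_write(feedback):
    return "v2 with fix" if feedback else "v1"


def fake_review(draft):
    return "LGTM" if "fix" in draft else "missing error handling"


result = run_loop(fake_write, fake_review, max_iterations=5)
print(result)  # {'status': 'completed', 'iterations': 2, 'output': 'v2 with fix'}
```

Only the completed outcome would let the downstream done node run.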
Execution Targets
Local (default)
No target needed. Runs on the host machine.
SSH
```python
target={"kind": "ssh", "host": "server", "username": "deploy"}
# forward_credentials=True: use your local codex/claude/kimi auth on the remote instead of the remote's own
target={"kind": "ssh", "host": "server", "forward_credentials": True}
```
EC2 (auto-discovers AMI, key pair, VPC)
```python
target={"kind": "ec2", "region": "us-east-1"}
# Optional: instance_type, terminate, snapshot, shared, spot
```
ECS Fargate (auto-discovers VPC, builds agent image)
```python
target={"kind": "ecs", "region": "us-east-1"}
# Optional: image, cpu, memory, install_agents, cluster
```
Shared instances
Nodes with the same shared ID run on the same instance:
```python
plan = codex(task_id="plan", prompt="...", target={"kind": "ec2", "shared": "dev"})
impl = codex(task_id="impl", prompt="...", target={"kind": "ec2", "shared": "dev"})
# Both run on the same EC2 instance, so files persist between them
```
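The sharing rule amounts to keying instances by the shared ID: every node whose target carries the same ID maps to one instance.

The sketch below shows that mapping. It assumes two things:

- keys combine kind and shared ID;
- a target without a shared ID gets a dedicated instance per node.

assign_instances is a hypothetical helper, not part of AgentFlow.

```python
def assign_instances(targets: dict) -> dict:
    """Map each task_id to an instance key; equal shared IDs map to the same key."""
    assignment = {}
    for task_id, target in targets.items():
        shared = target.get("shared")
        if shared:
            assignment[task_id] = f"{target['kind']}:shared:{shared}"
        else:
            # assumption: an unshared target gets its own instance
            assignment[task_id] = f"{target['kind']}:{task_id}"
    return assignment


instances = assign_instances({
    "plan": {"kind": "ec2", "shared": "dev"},
    "impl": {"kind": "ec2", "shared": "dev"},
    "scan": {"kind": "ec2"},
})
print(instances)
# {'plan': 'ec2:shared:dev', 'impl': 'ec2:shared:dev', 'scan': 'ec2:scan'}
```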
Worktrees
Isolate each agent in its own git worktree so they can edit files without conflicts:
```python
from agentflow import Graph, codex, fanout

with Graph("review", use_worktree=True) as g:
    reviewers = fanout(
        codex(task_id="reviewer", prompt="Review {{ item.file }}", tools="read_write"),
        [{"file": "api.py"}, {"file": "auth.py"}, {"file": "db.py"}],
    )
```
Each agent gets its own full checkout of the repository at `.agentflow/worktrees/<run_id>/<node_id>/`. The worktrees are cleaned up after execution.
Utility Nodes
Non-LLM nodes handle deterministic operations and make no model API calls:
```python
# Run a shell script
build = shell(task_id="build", script="npm run build && echo OK")

# Run Python code
validate = python_node(task_id="validate", code="import json; print(json.dumps({'ok': True}))")

# Sync local repo to remote (rclone or tar+ssh fallback)
deploy = sync(task_id="deploy", mode="full", target={
    "kind": "ssh", "host": "server", "username": "deploy", "remote_workdir": "/app",
})
# mode="repo": .git + stash only (lightweight)
# mode="full": entire directory
```
Mix with agent nodes freely: build >> codex(...) >> deploy
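The python_node example prints JSON so that a downstream step can parse its result. A reasonable guess is that a downstream prompt would receive this JSON via {{ nodes.validate.output }}; this sketch assumes a utility node's output is its stdout.

To see what the code string produces, you can run it in a fresh interpreter with the standard library, as below. This approximates a python_node run; it is not AgentFlow's executor.

```python
import json
import subprocess
import sys

# Same code string as the validate node above.
code = "import json; print(json.dumps({'ok': True}))"

# Run it in a separate interpreter and capture what it prints.
proc = subprocess.run([sys.executable, "-c", code], capture_output=True, text=True, check=True)
result = json.loads(proc.stdout)
print(result)  # {'ok': True}
```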
Tuned Agents
Use evolve(...) when you already have one or more completed Codex nodes and want AgentFlow to turn their traces into a reusable tuned agent:
```python
from agentflow import Graph, codex, evolve

with Graph("improve-codex", working_dir=".") as g:
    source = codex(task_id="plan", prompt="Inspect the repository and summarize the main problems.")
    tuned = evolve(source, target="codex", optimizer="codex")
```
What this does:
- Collects `trace.jsonl` from the selected Codex nodes
- Loads `agent_tuner/<profile>.yaml` from the pipeline `working_dir`
- Clones the target repo, lets the optimizer agent patch it, then runs build/test/smoke checks
- Registers the resulting version under `.agentflow/tuned_agents/<name>/versions/<version>/`
Typical CLI flow after a completed run:
```shell
agentflow runs
agentflow evolve <run_id> -n <node_id> --target codex --profile codex --optimizer codex
agentflow tuned-agents
agentflow tuned-agent codex_tuned --output json
```
To reuse the generated tuned agent in a later pipeline, use a custom agent name:
```python
from agentflow import Graph, agent

with Graph("use-tuned", working_dir=".") as g:
    agent("codex_tuned", task_id="verify", prompt="Reply with exactly READY.")
```
Important constraints:
- The pipeline `working_dir` must be the workspace that contains `agent_tuner/` and `.agentflow/`
- The source run must include Codex trace artifacts
- Tuned agents currently require a local target
- If Codex's own sandbox cannot start in an externally sandboxed or containerized environment, pass `env={"AGENTFLOW_CODEX_SANDBOX_MODE": "danger-full-access"}` on the source node or in the tuner profile's `env:` block
Scratchboard
Enable shared memory across all agents:
```python
with Graph("campaign", scratchboard=True) as g:
    ...
```
All agents share one scratchboard.md file, which they read for context and write their findings to.
Graph Options
```python
Graph(
    "name",
    concurrency=4,           # max parallel nodes
    fail_fast=False,         # skip downstream on failure
    max_iterations=10,       # cycle iteration limit
    scratchboard=False,      # shared memory file
    use_worktree=False,      # git worktree per agent
    node_defaults={...},     # defaults for all nodes
    agent_defaults={...},    # per-agent defaults
)
```
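node_defaults and agent_defaults layer settings on top of each other. The sketch below shows one plausible precedence order:

1. node_defaults, which apply to every node;
2. agent_defaults for that agent, which override node_defaults;
3. arguments passed to the node itself, which override both.

Both the order and the assumption that agent_defaults is keyed by agent name are unconfirmed here. resolve_node_config is a hypothetical helper.

```python
def resolve_node_config(agent: str, explicit: dict, node_defaults: dict, agent_defaults: dict) -> dict:
    """Assumed precedence: node_defaults < agent_defaults[agent] < explicit node arguments."""
    config = dict(node_defaults)                 # lowest priority: applies to every node
    config.update(agent_defaults.get(agent, {}))  # then settings for this agent only
    config.update(explicit)                       # highest priority: the node's own arguments
    return config


cfg = resolve_node_config(
    "codex",
    explicit={"task_id": "review", "timeout_seconds": 900},
    node_defaults={"timeout_seconds": 300, "retries": 1},
    agent_defaults={"codex": {"tools": "read_only"}, "claude": {"tools": "read_write"}},
)
print(cfg)
```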
Add optimizer controls when you want AgentFlow to rewrite the graph between rounds:
- optimizer: the interactive agent (one of codex, claude, kimi) that rewrites the graph for the next round.
- n_run: total number of graph rounds to execute; set it to 2 or higher to enable optimization rounds before the final run.
Graph Optimization Rounds
Set the top-level optimizer and n_run arguments on Graph to optimize the pipeline graph between execution rounds.
Supported optimizers: codex, claude, kimi.
Optimization only happens when n_run > 1.
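The round structure runs the graph n_run times in total. Before every round except the first, the optimizer proposes an edited pipeline, which is validated before use.

The sketch below shows that control flow with stub callables. It assumes an edit that fails validation is discarded and the previous graph is reused; what AgentFlow actually does on a failed validation is not stated here. run_rounds is a hypothetical helper.

```python
def run_rounds(graph, n_run, execute, optimize, validate):
    """Hypothetical control flow: n_run executions with an optimizer pass between them."""
    executed = []
    for round_index in range(n_run):
        if round_index > 0:
            edited = optimize(graph)    # optimizer agent proposes a rewritten pipeline
            if validate(edited):        # must still load and match the schema
                graph = edited
            # assumption: an invalid edit is discarded and the previous graph is reused
        execute(graph)
        executed.append(graph)
    return executed


history = run_rounds(
    "plan >> review >> summary",
    n_run=2,
    execute=lambda g: None,
    optimize=lambda g: g + " (edited)",
    validate=lambda g: True,
)
print(history)  # ['plan >> review >> summary', 'plan >> review >> summary (edited)']
```

With n_run=1 the optimizer is never called, which matches the rule that optimization needs n_run > 1.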
Artifacts and logs for each round are written under `.agentflow/runs/<run_id>/optimization/round-XXX/`:
- pipeline.original.py
- pipeline.edited.py
- graph_report.json
- optimizer-prompt.txt
- optimizer-result.json
- optimizer-validation.json
Example pipeline with optimizer rounds:
```python
from agentflow import Graph, codex

with Graph(
    "optimization-demo",
    optimizer="codex",
    n_run=2,
    concurrency=2,
) as g:
    plan = codex(task_id="plan", prompt="Outline the tasks in the ticket.")
    review = codex(task_id="review", prompt="Review the plan for missing steps.")
    summary = codex(task_id="summary", prompt="Summarize approved next actions.")
    plan >> review >> summary

print(g.to_json())
```
CLI
```shell
agentflow run pipeline.py                   # run pipeline
agentflow run pipeline.py --output summary
agentflow inspect pipeline.py               # show graph structure
agentflow validate pipeline.py              # check without running
agentflow templates                         # list starter templates
agentflow init > pipeline.py                # scaffold starter
```