name: agently-dynamic-task description: Use when the user needs Agently TaskDAG / Dynamic Task, model-generated or app-submitted DAG planning, TaskDAG validation, DynamicTaskResolver handlers, TaskDAGExecutor execution, or the Agently.create_dynamic_task compatibility facade. TaskDAG is the DAG foundation capability; Dynamic Task is the convenience facade over it and uses TriggerFlow as the execution substrate.
Agently Dynamic Task
Use this skill when the problem is a dynamic task graph: a model or application submits DAG data that must be planned, validated, pruned, resolved to handlers, and executed.
TaskDAG is the Agently DAG foundation capability. Dynamic Task is the current
compatibility and convenience facade over TaskDAG planning and execution, not a
TriggerFlow sub-API and not the strategic Agent task lifecycle. TriggerFlow is
the execution substrate under TaskDAGExecutor, while
Agently.create_dynamic_task(...) and agent.create_dynamic_task(...) remain
compact app-facing facade entrypoints.
Native-First Rules
- use
Agently.create_dynamic_task(...)oragent.create_dynamic_task(...)when ordinary app code wants the compact facade - when a DAG is only one bounded phase inside a broader business task,
keep
agent.create_task(...)oragent.create_task_loop(...)as the outer AgentExecution owner and expose the DAG as an explicitly enabled Agent candidate; do not turn the Dynamic Task facade into the long-running AgentTask lifecycle owner - split into
AgentlyTaskDAGPlanner,TaskDAGValidator,DynamicTaskResolver, andTaskDAGExecutoronly when staged control is required - use
plan=<TaskDAG>when the caller already owns the DAG and should skip model planning - use
TaskDAG.from_yaml(...)orTaskDAG.from_json(...)when the submitted DAG is a reviewed config artifact - use submitted DAG
inputsplaceholders for small runtime wiring:${INIT}/${INIT.foo}for initial graph input,${DEPS.task_id.path}for dependency results,${STATE...}for execution-local state, and${TRIGGER...}for the raw TriggerFlow trigger payload. Slot names are case-insensitive, but examples should use uppercase. Whole-string placeholders preserve the original value type; embedded placeholders stringify into the surrounding text. Missing runtime paths fail closed during execution - when a submitted DAG runs through
agent.create_execution(), let${INIT...}readuse_dynamic_task(graph_input=...)when explicit, otherwise the frozen execution promptinputslot, then{"target": task_target}; do not add a secondinputs.inputor task-local prompt mapping surface - use
planner=<agent-or-provider-settings>when the model must generate the DAG - use
model=<agent-or-provider-settings>for model task execution resources - use
handlers={"risk_check_handler": handler}for local/custom tasks; handler names should be explicit and usually end in_handler - pass
actions=...orskills=...only when the planner should be allowed to use those task kinds; do not expose them by default - use Agently output schemas for model-owned semantic outputs instead of parsing model text with example-local JSON/code-block parsers
- keep backend business-system calls mockable in examples, but keep model-owned planning, classification, drafting, evaluation, revising, and final response generation model-driven
- require each model task that matters to have a clear output schema, either through facade-level
output_schemaor node-levelinputs.output_schema - set model-task
inputs.output_formatby result type when the plan is generated or submitted: omit it to let the request readprompt.default_output_format(global defaultjson); usejsonfor compact machine-control data, action arguments, routing flags, standalone booleans/numbers, strict extraction, model judges, and dense all-typed arrays/objects;xml_fieldfor flat string long text/code/HTML/SVG/Markdown/SQL/templates; explicithybridfor long prose/code fields mixed with typed list/object/boolean/number fields when retry latency is acceptable; explicitflat_markdownonly for legacy section-header compatibility; explicityaml_literalonly when a YAML target document is intentionally desired;autoonly when structural schema-driven selection and retry latency are acceptable after the target model has passed representative stability checks. qwen2.5:7b checks found hybrid can omit section headers or echo old scaffold comments into text fields - let
TaskDAGValidatorreject duplicate ids, missing dependencies, cycles, unsupported required task kinds, schema-version mismatches, unsafe side-effect policy, and unknown required handlers before execution - allow unknown optional handlers to be pruned only when they do not affect required semantic outputs, required downstream nodes, approvals, or side-effect policy
- keep generated plan data in execution input or execution state; do not use shared TriggerFlow flow data for per-run DAG state
- use
${INIT...}/${DEPS...}placeholders for small declarative input wiring; for larger transformations, keep the logic in a handler or model task - do not teach planners to use internal resolver entries such as
validateoremitas ordinary task kinds - do not mark ordinary read-only model tasks as network side effects just because they call an LLM provider
- do not add approval gates to analysis, synthesis, drafting, or final response model tasks unless the user explicitly asks for approval
API Shape
Submitted DAG:
from agently import Agently
async def risk_check_handler(context):
return {
"task_id": context.task.id,
"deps": dict(context.dependency_results),
}
task = Agently.create_dynamic_task(
target="review policy",
plan={
"graph_id": "review",
"task_schema_version": "task_dag/v1",
"tasks": [
{"id": "extract", "kind": "local", "binding": "risk_check_handler"},
{
"id": "final",
"kind": "local",
"binding": "risk_check_handler",
"depends_on": ["extract"],
"inputs": {
"source": "${INIT}",
"extract_result": "${DEPS.extract}",
},
},
],
"semantic_outputs": {"final": "final"},
},
handlers={"risk_check_handler": risk_check_handler},
)
snapshot = await task.async_start(timeout=10)
result = snapshot["semantic_outputs"]["final"]
Config-backed submitted DAG:
from agently import Agently
from agently.core import TaskDAG
graph = TaskDAG.from_yaml("plans/review.yaml")
task = Agently.create_dynamic_task(
target="review policy",
plan=graph,
handlers={"risk_check_handler": risk_check_handler},
)
snapshot = await task.async_run(graph_input={"doc": "policy"}, timeout=10)
Use TaskDAG.from_json(...) for JSON/JSON5 files or raw content. Both loaders
support task_dag_key_path="plans.review" for selecting one DAG from a larger
config. Use graph.get_yaml(path) or graph.get_json(path) to export a
normalized graph.
Auto-planned DAG:
from agently import Agently
task = Agently.create_dynamic_task(
target=user_goal,
planner=planner_agent,
model=worker_agent,
output_schema={
"answer": (str, "final user-facing answer", True),
},
)
graph = await task.async_plan(max_retries=3)
task.validate(graph, strict_schema_version=True)
snapshot = await task.async_run(graph_input={"goal": user_goal}, timeout=30)
Lower-level integration:
from agently.builtins.plugins import AgentlyTaskDAGPlanner
from agently.core import DynamicTaskResolver, TaskDAGExecutor, TaskDAGValidator
resolver = DynamicTaskResolver({"risk_check_handler": risk_check_handler})
validator = TaskDAGValidator(resolver)
planner = AgentlyTaskDAGPlanner(validator=validator)
graph = await planner.async_plan(planner_agent, {"target": "review policy"})
validation = validator.validate(graph, strict_schema_version=True)
snapshot = await TaskDAGExecutor(resolver, validator=validator).async_run(graph)
Planner Constraints
Planner plugins and planner prompts must declare:
- task schema version, normally
task_dag/v1 - allowed task kinds and available binding names
- required semantic output leaves
- maximum task count when the scenario has a practical limit
- validation checks that become retryable model-output failures
- forbidden behavior: unstable task ids, dependency results embedded in static task inputs, implicit side effects, hidden action/skill use, and internal resolver task kinds
The facade planner should wire .output(planner.output_schema(), format="json"), required
planner.ensure_keys(), and .validate(planner.validate_output) so structural
problems become model-output validation failures before execution. Compile/run
still revalidates the graph.
Layer ownership:
TaskDAG/TaskDAGNodedata contracts and YAML/JSON graph config belong toagently.types.dataTaskDAGExecutor,TaskDAGValidator, andDynamicTaskResolverbelong to coreAgentlyTaskDAGPlannerowns planner output schema, ensure keys, and planner instructions as a plugin concernAgently.create_dynamic_task(...)andagent.create_dynamic_task(...)are the app-facing facade entrypoints
Anti-Patterns
- do not put Dynamic Task guidance under TriggerFlow as if it were TriggerFlow syntax
- do not hand-roll a scheduler for submitted DAG data
- do not parse model text with
json.loads(...), regex marker splits, or code-block extraction when Agently output schemas can own the shape - do not expose
actionsorskillsto the planner by default - do not hide model-owned classification, drafting, quality checks, or final response generation behind deterministic local substitutes in recommended examples
- do not call low-level
TaskDAGExecutorfrom ordinary app code whenAgently.create_dynamic_task(...)is enough - do not use vague handler names such as
risk_check; userisk_check_handleror another name that describes the callable role - do not duplicate TriggerFlow docs here; explain only that Dynamic Task compiles to TriggerFlow internally
Read Next
references/overview.mdexamples/minimal.py