pi-taskflow
Lightweight workflow orchestration for the Pi coding agent — declarative multi-phase taskflows with dynamic fan-out, isolated subagent context, resumable runs, and saveable commands.
Package details
Install pi-taskflow from npm and Pi will load the resources declared by the package manifest.
$ pi install npm:pi-taskflow
- Package: pi-taskflow
- Version: 0.0.8
- Published: Jun 6, 2026
- Downloads: not available
- Author: heggria
- License: MIT
- Types: extension, skill
- Size: 198.9 KB
- Dependencies: 0 dependencies · 5 peers
Pi manifest JSON
{
  "extensions": [
    "./extensions/index.ts"
  ],
  "skills": [
    "./skills"
  ]
}
Security note
Pi packages can execute code and influence agent behavior. Review the source before installing third-party packages.
README
Lightweight workflow orchestration for the Pi coding agent.
Orchestrate your Pi subagents. Not by prompting — by declaring.
If you've used the built-in subagent tool's task / tasks / chain, you
already know the shorthand; your runs simply become tracked, resumable, and
saveable as a one-word /tf:<name> command.
pi install npm:pi-taskflow
Fan out one subagent per item, route on results, retry the flaky ones, pause for human approval, cap the spend, and gate the output with an adversarial review — all from one declarative definition. Only the final report reaches your conversation; every intermediate transcript stays in the runtime.
Why
The built-in subagent tool is great for a single delegated task. But when a job needs many coordinated steps, fan-out over dozens of items, cross-checked review, or a repeatable pipeline, you want orchestration — without the intermediate transcripts eating your context window.
pi-taskflow moves the plan into a small declarative definition. The runtime
holds the DAG, the loops, and the intermediate results; your context receives
only the final phase's output.
| | subagent tool | pi-taskflow |
|---|---|---|
| Who drives | the model, turn by turn | the runtime, from a definition |
| Intermediate results | in your context window | in the runtime (not your context) |
| Reusable | re-described each time | saved as /tf:<name> |
| Scale | a few tasks | dynamic map fan-out |
| Resumable | no | yes (cross-session, cached phases skip) |
| Quality gates | no | gate phases with VERDICT: BLOCK / PASS |
| Conditional routing | no | when guards + join: any OR-joins |
| Fault tolerance | no | per-phase retry with backoff |
| Human-in-the-loop | no | approval phases (approve / reject / edit) |
| Cost control | no | run-wide budget (USD / token caps) |
| Composition | no | flow phases run saved sub-flows |
| Progress visibility | opaque while running | live DAG render with timing + cost |
| Ergonomics | inline JSON each time | shorthand (task/tasks/chain) or DSL |
Show me
Describe a pipeline once, then run it from a pi session by name:
/tf:summarize-files dir=src
The runtime fans out one subagent per file, merges the summaries in a reduce
phase, and returns only the final overview. Every intermediate transcript stays
in the runtime — never in your context window. (Full definition in
Quickstart below.)
Quickstart
Shorthand: same effort as subagent, but tracked & resumable
Single task — one agent, one job:
{ "task": "Summarize the architecture of src/", "agent": "explorer" }
Parallel tasks — fire several at once, outputs merge:
{ "tasks": [
{ "task": "Audit auth in src/api", "agent": "analyst" },
{ "task": "Audit input validation in src/api", "agent": "analyst" }
] }
Chain — sequential, each step sees the previous one's output:
{ "chain": [
{ "task": "List the public API of src/lib", "agent": "scout" },
{ "task": "Write docs for:\n{previous.output}", "agent": "writer" }
] }
agent is optional (defaults to the first available agent). Add name to label
the run and enable saving it as a reusable command.
Try it inline — tell the model something like:
Run a chain: first explore the auth flow, then summarize findings.
The model calls the taskflow tool; you get live progress, per-step timing,
token cost, and a run record. Ask to save it and you get /tf:<name>.
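To relate the chain shorthand to phases with explicit dependencies, here is a minimal TypeScript sketch of one plausible expansion. The helper name chainToPhases, the generated ids (step1, step2, ...), and the choice to mark the last step as final are assumptions made for illustration; the runtime's actual expansion is not documented here.

```typescript
// Hypothetical types: only the fields this sketch needs.
interface ShorthandStep { task: string; agent?: string }
interface AgentPhase {
  id: string;
  type: "agent";
  task: string;
  agent?: string;
  dependsOn?: string[];
  final?: boolean;
}

// Assumed expansion: each step becomes an agent phase that waits for the one before it.
function chainToPhases(chain: ShorthandStep[]): AgentPhase[] {
  return chain.map((step, i) => {
    const phase: AgentPhase = { id: `step${i + 1}`, type: "agent", task: step.task };
    if (step.agent !== undefined) phase.agent = step.agent;
    if (i > 0) phase.dependsOn = [`step${i}`]; // sequential edge to the previous step
    if (i === chain.length - 1) phase.final = true; // last step carries the result
    return phase;
  });
}

const chainPhases = chainToPhases([
  { task: "List the public API of src/lib", agent: "scout" },
  { task: "Write docs for:\n{previous.output}", agent: "writer" },
]);
console.log(JSON.stringify(chainPhases, null, 2));
```

Under these assumptions, the second phase depends on the first, which is what lets {previous.output} resolve to the upstream step's text.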
Then go declarative
When your pipeline outgrows the shorthand — when you need dynamic fan-out, intermediate JSON routing, or quality gates — graduate to the full DSL:
{
  "name": "summarize-files",
  "description": "Discover files, summarize each, produce a report",
  "args": { "dir": { "default": "." } },
  "concurrency": 8,
  "phases": [
    { "id": "discover", "type": "agent", "agent": "scout",
      "task": "List source files under {args.dir} (non-recursive).\nOutput ONLY a JSON array [{\"file\":\"\"}]. No prose.",
      "output": "json" },
    { "id": "summarize", "type": "map",
      "over": "{steps.discover.json}", "as": "item",
      "agent": "scout",
      "task": "Read {item.file} and give a one-sentence summary.",
      "dependsOn": ["discover"] },
    { "id": "report", "type": "reduce", "from": ["summarize"],
      "agent": "writer",
      "task": "Combine into a short overview:\n{steps.summarize.output}",
      "dependsOn": ["summarize"], "final": true }
  ]
}
What this does:
- discover: an agent lists every file in the directory and outputs a JSON array.
- summarize: a map fans out, spawning one subagent per file in parallel (throttled to 8 concurrent). Each gets {item.file} bound to its file path.
- report: a reduce merges all summaries into one clean overview.
Intermediate outputs never enter your context. The runtime owns them. You get only the final report back.
Save it once → /tf:summarize-files forever.
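The throttled fan-out described above can be pictured as a concurrency-limited map. This is a sketch of the general technique, not pi-taskflow's implementation: mapWithLimit and the fake worker are hypothetical names, and the timer stands in for a real subagent call.

```typescript
// Hypothetical helper: run `worker` over `items` with at most `limit` calls in flight.
async function mapWithLimit<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0; // index of the next unclaimed item

  // Each lane claims items one at a time until none remain.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const i = next++;
      results[i] = await worker(items[i] as T, i);
    }
  }

  const lanes = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: lanes }, () => lane()));
  return results; // same order as `items`, regardless of completion order
}

// Demo: "summarize" five fake files with a cap of 2 and record the peak in flight.
let inFlight = 0;
let peak = 0;
const demo = mapWithLimit(["a.ts", "b.ts", "c.ts", "d.ts", "e.ts"], 2, async (file) => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 5)); // stand-in for a subagent run
  inFlight--;
  return `summary of ${file}`;
});
demo.then((out) => console.log(out.length, "summaries, peak in flight:", peak));
```

Results are written by index, so a downstream reduce step would see them in input order even when workers finish out of order.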
Route, gate, and guard
Phases also branch, retry, pause for a human, and respect a budget — still declaratively, no scripting:
{
  "name": "triage-and-fix",
  "budget": { "maxUSD": 1.5 },
  "phases": [
    { "id": "triage", "type": "agent", "agent": "analyst", "output": "json",
      "task": "Classify the bug. Output ONLY {\"severity\":\"high\"} or {\"severity\":\"low\"}." },
    { "id": "deep", "when": "{steps.triage.json.severity} == high", "dependsOn": ["triage"],
      "agent": "executor_code", "task": "Root-cause and patch it.",
      "retry": { "max": 2, "backoffMs": 500 } },
    { "id": "quick", "when": "{steps.triage.json.severity} == low", "dependsOn": ["triage"],
      "agent": "executor_fast", "task": "Apply the quick fix." },
    { "id": "approve", "type": "approval", "join": "any", "dependsOn": ["deep", "quick"],
      "task": "Review the fix before it ships." },
    { "id": "ship", "type": "agent", "dependsOn": ["approve"],
      "task": "Open a PR with the change.", "final": true }
  ]
}
- when routes to deep or quick from the triage JSON; the other branch is skipped.
- join: "any" lets approve run as soon as whichever branch fired completes.
- retry re-runs a flaky patch with backoff; budget halts the whole run if it gets too expensive.
- approval pauses for a human (approve / reject / edit) before the final ship.
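The interplay of skipped branches and join can be sketched as a small decision function. The names (DepStatus, decide) are hypothetical, and the rule that a skipped dependency causes an "all" join to be skipped is an assumption inferred from the advice to use join: "any" on merge phases, not documented behavior.

```typescript
type DepStatus = "pending" | "done" | "skipped";
type Join = "all" | "any";
type Decision = "run" | "wait" | "skip";

// Assumed semantics: decide what a phase does given the state of its dependencies.
function decide(deps: DepStatus[], join: Join): Decision {
  if (deps.length === 0) return "run"; // a root phase has nothing to wait for
  const anyDone = deps.some((s) => s === "done");
  const anyPending = deps.some((s) => s === "pending");
  const anySkipped = deps.some((s) => s === "skipped");

  if (join === "any") {
    if (anyDone) return "run"; // OR-join: one completed branch is enough
    return anyPending ? "wait" : "skip"; // every branch was skipped
  }
  // join "all": a skipped branch can never complete (assumption: skip propagates)
  if (anySkipped) return "skip";
  return anyPending ? "wait" : "run";
}

// triage-and-fix with severity "high": deep completed, quick was skipped.
console.log(decide(["done", "skipped"], "any")); // approve with join "any"
console.log(decide(["done", "skipped"], "all")); // the same phase with the default join
```

Under these assumptions the approve phase runs with join: "any" but would be skipped with the default, which is why the merge phase after an if/else pair needs the OR-join.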
Watch it run
This is the live progress render for a real run — the self-improve flow that
writes and verifies its own test suites, caught here mid-block by a quality gate:
⊗ taskflow self-improve 6/7 · blocked · $0.095
✓ discover agent deepseek-v4-flash 10t ↑38k ↓6.7k $0.011
┌ ✓ write-runner-tests agent claude-sonnet-4-6 10t ↑13 ↓6.6k $0.020
├ ✓ write-store-tests agent claude-sonnet-4-6 10t ↑11 ↓10k $0.018
├ ✓ write-agents-tests agent claude-sonnet-4-6 10t ↑28 ↓13k $0.030
└ ✓ fix-stability agent claude-sonnet-4-6 10t ↑13 ↓3.9k $0.012
✓ verify gate BLOCK 3 type errors in test files deepseek-v4-flash
⊘ report reduce skipped · Gate blocked ↳ fix-stability
How to read it — the layout is the DAG:
- Header: ⊗ means the flow is blocked (a gate halted it); 6/7 phases processed, aggregate cost $0.095.
- Status icons: ✓ done, ◐ running, ✗ failed, ⊘ skipped, ○ pending.
- Rail ┌ ├ └: phases in the same DAG layer, running concurrently. The four write-* / fix-stability tasks all fan out from discover. A blank gutter is a single-phase layer.
- ↳: a long (layer-skipping) dependency. report depends on verify (the adjacent layer, implied by position) and fix-stability two layers back, so only that skip edge is annotated.
- Gate: verify emitted VERDICT: BLOCK, so the runtime skipped report and ended the run as blocked, surfacing the reason.
- Detail: per phase, the model, token counts (↑ in, ↓ out), cost, and timing. Fan-out phases also show sub-task progress.
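The notion of a DAG layer used in the render can be made concrete with a short layering pass over dependsOn. This is a generic sketch (Kahn-style topological layering), not the runtime's code; PhaseNode and layerize are hypothetical names. The same pass also shows one way a cycle can be detected.

```typescript
interface PhaseNode { id: string; dependsOn?: string[] }

// Group phases into layers: a phase lands in the first layer after all its dependencies.
function layerize(phases: PhaseNode[]): string[][] {
  const placed = new Set<string>(); // ids already assigned to a layer
  const result: string[][] = [];
  let pending = phases.slice();

  while (pending.length > 0) {
    // Ready means every dependency sits in an earlier layer.
    const ready = pending.filter((p) => (p.dependsOn ?? []).every((d) => placed.has(d)));
    if (ready.length === 0) {
      // Nothing can make progress: a cycle, or a dependency on an unknown id.
      throw new Error("cycle or unknown dependency among: " + pending.map((p) => p.id).join(", "));
    }
    for (const p of ready) placed.add(p.id);
    result.push(ready.map((p) => p.id));
    pending = pending.filter((p) => !placed.has(p.id));
  }
  return result;
}

// The triage-and-fix flow from above, reduced to ids and edges.
const triageLayers = layerize([
  { id: "triage" },
  { id: "deep", dependsOn: ["triage"] },
  { id: "quick", dependsOn: ["triage"] },
  { id: "approve", dependsOn: ["deep", "quick"] },
  { id: "ship", dependsOn: ["approve"] },
]);
console.log(JSON.stringify(triageLayers));
```

Here deep and quick share a layer, which is the situation the rail characters mark in the render.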
Phase types
| type | meaning | required fields |
|---|---|---|
| agent | one subagent runs a single task | task |
| parallel | run branches[] concurrently | branches (array of {task, agent?}) |
| map | fan out over an array: one subagent per item, {item} bound | over, task |
| gate | quality/review step that can halt the flow | task |
| reduce | aggregate from[] phase outputs into one | from, task |
| approval | human-in-the-loop pause: approve / reject / edit before continuing | (none) |
| flow | run a saved sub-flow as one phase (composition/reuse) | use |
Common phase fields
Every phase needs a unique id and a type (defaults to agent). On top of the
per-type fields above:
| Field | Meaning |
|---|---|
| agent | Agent to run (defaults to the first discovered agent) |
| dependsOn | Phase ids this phase waits for; builds the DAG |
| join | "all" (default) waits for every dep; "any" is an OR-join |
| when | Conditional guard: skip unless the expression is truthy |
| retry | { max, backoffMs?, factor? }: retry a failing subagent |
| output | "text" (default) or "json" (exposes {steps.ID.json}) |
| model / thinking / tools | Per-phase overrides for the subagent |
| cwd | Working directory for the subagent |
| concurrency | Fan-out cap for map / parallel (overrides the flow default) |
| final | Marks the result-bearing phase (else the last phase wins) |
| optional | A failure here does not abort the run |
| use / with | (flow) saved sub-flow name + its args |
Flow-level keys: name, description, args, concurrency (default 8),
agentScope, and budget: { maxUSD?, maxTokens? }.
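As a quick check against the two tables above, the required fields per phase type can be encoded as a lookup and used to lint a phase before running it. This is a hypothetical helper (REQUIRED_FIELDS and missingFields are illustrative names), not the package's validator, and it checks only for the presence of fields.

```typescript
// Required per-type fields, copied from the phase types table.
const REQUIRED_FIELDS: Record<string, string[]> = {
  agent: ["task"],
  parallel: ["branches"],
  map: ["over", "task"],
  gate: ["task"],
  reduce: ["from", "task"],
  approval: [],
  flow: ["use"],
};

// Return the names of fields a phase is missing; an empty array means it looks complete.
function missingFields(phase: Record<string, unknown>): string[] {
  const missing: string[] = [];
  const id = phase["id"];
  if (typeof id !== "string" || id === "") missing.push("id");

  const type = String(phase["type"] ?? "agent"); // type defaults to agent
  if (!Object.prototype.hasOwnProperty.call(REQUIRED_FIELDS, type)) {
    missing.push("type"); // unknown phase type
    return missing;
  }
  for (const field of REQUIRED_FIELDS[type] ?? []) {
    if (phase[field] === undefined) missing.push(field);
  }
  return missing;
}

console.log(missingFields({ id: "summarize", type: "map", task: "Read {item.file}" }));
console.log(missingFields({ id: "deep", task: "Root-cause and patch it." }));
```

The first call reports the missing over field; the second passes because a phase without a type is treated as an agent phase.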
Control flow & reliability
- when: skip a phase unless an expression is truthy. Supports {refs}, == != < > <= >=, && || !, parentheses, and quoted strings/numbers, e.g. "when": "{steps.triage.json.route} == deep". Pair with join: "any" on the merge phase to build real if/else routing. Parse errors fail open.
- join: "any" is an OR-join: the phase runs as soon as one dependency completes (default "all" waits for every dep).
- retry: { "max": 2, "backoffMs": 500, "factor": 2 } retries a failing subagent with fixed (factor: 1) or exponential backoff; usage is summed and the attempt count shows as ↻N in the TUI.
- approval: pause for a human (select: Approve / Reject / Edit). Reject halts the flow; Edit injects the typed note as the phase output for downstream steps. Non-interactive runs auto-approve.
- flow: { "type": "flow", "use": "deep-research", "with": { "topic": "{item}" } } runs a saved flow as a phase (recursion is detected and rejected).
- budget: a run-wide { maxUSD, maxTokens } ceiling; once exceeded, pending phases are skipped (and in-flight fan-out stops spawning) and the run is blocked.
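To make the retry settings concrete, the following sketch computes the wait before each retry from a policy object. The formula backoffMs * factor^n, the default factor of 1, and the reading of max as the number of retries after the first attempt are all assumptions; the README does not spell them out. RetryPolicy and retryDelays are hypothetical names.

```typescript
interface RetryPolicy { max: number; backoffMs?: number; factor?: number }

// Assumed schedule: the n-th retry (0-based) waits backoffMs * factor^n milliseconds.
function retryDelays(policy: RetryPolicy): number[] {
  const base = policy.backoffMs ?? 0; // assumption: no delay when backoffMs is omitted
  const factor = policy.factor ?? 1; // assumption: fixed backoff unless a factor is given
  const delays: number[] = [];
  for (let n = 0; n < policy.max; n++) {
    delays.push(base * Math.pow(factor, n));
  }
  return delays;
}

console.log(retryDelays({ max: 2, backoffMs: 500, factor: 2 })); // exponential
console.log(retryDelays({ max: 2, backoffMs: 500 })); // fixed
```

With factor 2 the waits double on each retry; with the factor omitted (or set to 1) every retry waits the same 500 ms.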
output format
- output: "text" (default): the raw subagent output.
- output: "json": the subagent output is parsed as JSON and exposed via {steps.ID.json} / {steps.ID.json.field}. Set this on phases whose output a downstream map or reduce needs to consume as structured data.
There is no output: "file". For file-based output, have the agent write to
disk with a write tool call.
Gate phases (quality control)
A gate runs an agent to review upstream output and can block the rest
of the workflow. End the gate task's instructions by asking the agent to
emit a verdict the runtime can read:
- a final line VERDICT: PASS or VERDICT: BLOCK (also accepts OK, FAIL, STOP, REJECT, HALT; last occurrence wins), or
- JSON like {"continue": false, "reason": "missing auth checks"} / {"verdict": "block", "reason": "..."}.
On BLOCK, downstream phases are skipped and the run ends as blocked with
the reason surfaced. Ambiguous output fails open (treated as PASS) — a gate
never halts the flow by accident.
Review the audit results below. If any endpoint is missing auth, end with
"VERDICT: BLOCK" and a one-line reason; otherwise end with "VERDICT: PASS".
{steps.audit.output}
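The verdict rules above (last marker wins, JSON form accepted, ambiguous output passes) can be sketched as a small parser. This is an illustration under assumptions, not the runtime's parser: parseVerdict is a hypothetical name, OK is assumed to mean pass and FAIL / STOP / REJECT / HALT to mean block, the JSON form is assumed to be the whole output, and no reason is extracted from the marker form.

```typescript
interface Verdict { pass: boolean; reason?: string }

const PASS_WORDS = ["PASS", "OK"];
const BLOCK_WORDS = ["BLOCK", "FAIL", "STOP", "REJECT", "HALT"];

function parseVerdict(output: string): Verdict {
  // Marker form: scan every "VERDICT: <WORD>" and keep the last one.
  const marker = /VERDICT:\s*([A-Za-z]+)/g;
  let last = "";
  let match: RegExpExecArray | null;
  while ((match = marker.exec(output)) !== null) {
    last = (match[1] ?? "").toUpperCase();
  }
  if (BLOCK_WORDS.indexOf(last) >= 0) return { pass: false };
  if (PASS_WORDS.indexOf(last) >= 0) return { pass: true };

  // JSON form (assumption: the entire output is one JSON object).
  try {
    const parsed: unknown = JSON.parse(output.trim());
    if (typeof parsed === "object" && parsed !== null) {
      const obj = parsed as Record<string, unknown>;
      const blocked =
        obj["continue"] === false ||
        BLOCK_WORDS.indexOf(String(obj["verdict"]).toUpperCase()) >= 0;
      if (blocked) {
        const reason = obj["reason"];
        return typeof reason === "string" ? { pass: false, reason } : { pass: false };
      }
    }
  } catch {
    // Not JSON: fall through to the fail-open default.
  }
  return { pass: true }; // ambiguous output fails open
}

console.log(parseVerdict("Draft: VERDICT: PASS\nOn reflection...\nVERDICT: BLOCK"));
console.log(parseVerdict('{"continue": false, "reason": "missing auth checks"}'));
console.log(parseVerdict("Everything looks reasonable."));
```

The third call shows the fail-open default: with no recognizable marker, the sketch treats the gate as passed.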
Interpolation
| placeholder | resolves to |
|---|---|
| {args.X} | invocation argument |
| {steps.ID.output} | a prior phase's text output |
| {steps.ID.json} | prior output parsed as JSON (or {steps.ID.json.field}) |
| {item} / {item.field} | current item inside a map phase |
| {previous.output} | the immediately-upstream phase output |
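The placeholders in the table can be resolved with a dotted-path lookup over a scope object. The sketch below is illustrative only: the scope shape, the interpolate and lookup names, the choice to leave unresolved placeholders untouched, and the JSON serialization of non-string values are all assumptions rather than documented behavior.

```typescript
type Scope = Record<string, unknown>;

// Walk a dotted path such as ["steps", "discover", "json"] through nested objects.
function lookup(scope: Scope, path: string[]): unknown {
  let cur: unknown = scope;
  for (const key of path) {
    if (typeof cur !== "object" || cur === null || !(key in cur)) return undefined;
    cur = (cur as Record<string, unknown>)[key];
  }
  return cur;
}

// Replace {a.b.c} references; literal JSON braces such as {"severity":"high"} do not match.
function interpolate(template: string, scope: Scope): string {
  const ref = /\{([A-Za-z_][\w-]*(?:\.[\w-]+)*)\}/g;
  return template.replace(ref, (whole: string, path: string) => {
    const value = lookup(scope, path.split("."));
    if (value === undefined) return whole; // assumption: unknown refs are left as written
    return typeof value === "string" ? value : JSON.stringify(value);
  });
}

// Hypothetical scope for one item of the summarize-files map phase.
const scope: Scope = {
  args: { dir: "src" },
  steps: { discover: { output: '[{"file":"a.ts"}]', json: [{ file: "a.ts" }] } },
  item: { file: "a.ts" },
};
console.log(interpolate("Read {item.file} under {args.dir}.", scope));
```

Because a reference must start with a letter or underscore, task text that contains literal JSON examples passes through unchanged.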
Commands
Saved flows become CLI shortcuts. All commands work in the pi session:
| Command | What it does |
|---|---|
| /tf list | List all saved flows |
| /tf run <name> [args] | Run a saved flow (e.g. /tf run summarize-files dir=src) |
| /tf show <name> | Print a flow's definition |
| /tf runs | Browse recent run history (interactive TUI) |
| /tf resume <runId> | Continue a paused/failed run; cached phases skip automatically |
| /tf:<name> [args] | Shortcut: runs the flow in one tap |
Tool actions (used by the model): run (inline define or saved name),
save, resume, list.
Storage
.pi/taskflows/<name>.json # project-scoped definitions (commit to share)
~/.pi/agent/taskflows/<name>.json # user-scoped definitions
.pi/taskflows/runs/<runId>.json # run state (resume); gitignore this
Agent discovery scope (set via agentScope in the flow definition):
| value | discovers agents from |
|---|---|
| "user" (default) | ~/.pi/agent/agents/*.md |
| "project" | .pi/agents/*.md (walks up the tree) |
| "both" | user + project; project wins on name collision |
Agents
Taskflow reuses your existing pi agent files (~/.pi/agent/agents/*.md,
.pi/agents/*.md). Reference agents by name in a phase or shorthand.
When running a phase, the runtime extracts the agent's systemPrompt from its
.md frontmatter and passes it via --append-system-prompt (written to a temp
file). Phase-level overrides for model, thinking, and tools are passed as
--model / --thinking / --tools flags to the subagent invocation.
Settings from ~/.pi/agent/settings.json (the subagents.agentOverrides map)
are honored, letting you tweak model, thinking, or tools per agent across all flows.
Examples
Ready-to-read definitions live in examples/:
| File | Demonstrates |
|---|---|
| summarize-files.json | discover → map fan-out → reduce |
| conditional-research.json | when routing + join: any + gate + budget |
| guarded-refactor.json | approval (human-in-the-loop) + retry + gate |
To use one, copy it into .pi/taskflows/<name>.json (or
~/.pi/agent/taskflows/) and it registers as /tf:<name> — or just point the
model at the definition.
Status & limits
- v0.0.6 (control flow & reliability): conditional when guards, join: any OR-joins, declarative retry/backoff, approval (human-in-the-loop) phases, flow (saved sub-flow composition), and run-wide budget caps, on top of the DSL + DAG runtime (agent / parallel / map / gate / reduce), inline + saved flows, cross-session resume, live progress, isolated context. Default concurrency is 8 (set on the flow; per-phase concurrency overrides for that phase).
- A run executes as one streaming tool call (live progress while it runs).
- map requires the upstream phase to emit a JSON array (output: "json").
- Gate verdicts are fail-open: if the agent output contains no recognizable verdict marker (VERDICT: BLOCK/PASS/OK/FAIL/STOP/REJECT/HALT or {continue: false} / {verdict: "block"}), the gate passes. This prevents an accidental missing verdict from blocking your workflow.
What it doesn't do (yet)
- No detached background execution. A run needs the pi session to stay open. True background execution (and event/cron triggers on top of it) is on the roadmap.
- No output: "file". Outputs are text/JSON only. Write files via agent tool calls if needed.
- map requires a JSON array. The over field must resolve to {steps.ID.json} where the upstream phase emitted output: "json". If the source is a plain text list, wrap it in a single-agent phase that outputs JSON.
- Cycles are rejected at validation. The DAG must be acyclic.
Development
npm install
npm run typecheck
npm test # unit tests — no network, no process spawning
# real end-to-end (spawns live subagents; needs model access)
npm run test:e2e
Contributing
Contributions welcome! This is a young project — open an issue or PR on
GitHub. Tests live in test/, the
runtime in extensions/.
License
MIT