pi-taskflow

Lightweight workflow orchestration for the Pi coding agent — declarative multi-phase taskflows with dynamic fan-out, isolated subagent context, resumable runs, and saveable commands.

Install pi-taskflow from npm and Pi will load the resources declared by the package manifest.

$ pi install npm:pi-taskflow
Package: pi-taskflow
Version: 0.0.8
Published: Jun 6, 2026
Downloads: not available
Author: heggria
License: MIT
Types: extension, skill
Size: 198.9 KB
Dependencies: 0 dependencies · 5 peers
Pi manifest JSON
{
  "extensions": [
    "./extensions/index.ts"
  ],
  "skills": [
    "./skills"
  ]
}

Security note

Pi packages can execute code and influence agent behavior. Review the source before installing third-party packages.

README

Lightweight workflow orchestration for the Pi coding agent.

Orchestrate your Pi subagents. Not by prompting — by declaring.

If you've used the built-in subagent tool's task / tasks / chain, you already know the shorthand — your runs just get tracked, resumable, and saveable as a one-word /tf:<name> command.

pi install npm:pi-taskflow

Fan out one subagent per item, route on results, retry the flaky ones, pause for human approval, cap the spend, and gate the output with an adversarial review — all from one declarative definition. Only the final report reaches your conversation; every intermediate transcript stays in the runtime.

Why

The built-in subagent tool is great for a single delegated task. But when a job needs many coordinated steps, fan-out over dozens of items, cross-checked review, or a repeatable pipeline, you want orchestration — without the intermediate transcripts eating your context window.

pi-taskflow moves the plan into a small declarative definition. The runtime holds the DAG, the loops, and the intermediate results; your context receives only the final phase's output.

| | subagent tool | pi-taskflow |
| --- | --- | --- |
| Who drives | the model, turn by turn | the runtime, from a definition |
| Intermediate results | in your context window | in the runtime (not your context) |
| Reusable | re-described each time | saved as /tf:<name> |
| Scale | a few tasks | dynamic map fan-out |
| Resumable | no | yes (cross-session, cached phases skip) |
| Quality gates | no | gate phases with VERDICT: BLOCK / PASS |
| Conditional routing | no | when guards + join: any OR-joins |
| Fault tolerance | no | per-phase retry with backoff |
| Human-in-the-loop | no | approval phases (approve / reject / edit) |
| Cost control | no | run-wide budget (USD / token caps) |
| Composition | no | flow phases run saved sub-flows |
| Progress visibility | opaque while running | live DAG render with timing + cost |
| Ergonomics | inline JSON each time | shorthand (task/tasks/chain) or DSL |

Show me

Describe a pipeline once, then run it from a pi session by name:

/tf:summarize-files dir=src

The runtime fans out one subagent per file, merges the summaries in a reduce phase, and returns only the final overview. Every intermediate transcript stays in the runtime — never in your context window. (Full definition in Quickstart below.)

Quickstart

Shorthand: same effort as subagent, but tracked & resumable

Single task — one agent, one job:

{ "task": "Summarize the architecture of src/", "agent": "explorer" }

Parallel tasks — fire several at once, outputs merge:

{ "tasks": [
  { "task": "Audit auth in src/api",   "agent": "analyst" },
  { "task": "Audit input validation in src/api", "agent": "analyst" }
] }

Chain — sequential, each step sees the previous one's output:

{ "chain": [
  { "task": "List the public API of src/lib", "agent": "scout" },
  { "task": "Write docs for:\n{previous.output}", "agent": "writer" }
] }

agent is optional (defaults to the first available agent). Add name to label the run and enable saving it as a reusable command.
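
The three shorthand forms can be read as one discriminated union. The sketch below shows one way they might expand into phases. Every name here (`Shorthand`, `Phase`, `toPhases`, the generated `step-N` ids) is hypothetical, and the mapping itself (tasks to a parallel phase, chain to agent phases linked by dependsOn) is an assumption based on the phase types documented later, not pi-taskflow's actual implementation.

```typescript
// Hypothetical types: illustrative only, not pi-taskflow's real exports.
interface Step { task: string; agent?: string }
type Shorthand =
  | ({ name?: string } & Step)
  | { name?: string; tasks: Step[] }
  | { name?: string; chain: Step[] };

interface Phase {
  id: string;
  type: "agent" | "parallel";
  task?: string;
  agent?: string;
  branches?: Step[];
  dependsOn?: string[];
  final?: boolean;
}

// Assumed expansion: each shorthand form becomes one or more phases.
function toPhases(input: Shorthand): Phase[] {
  if ("tasks" in input) {
    // Parallel tasks: a single parallel phase carrying every branch.
    return [{ id: "step-1", type: "parallel", branches: input.tasks, final: true }];
  }
  if ("chain" in input) {
    // Chain: one agent phase per step, each waiting on the step before it.
    const steps = input.chain;
    return steps.map((step, i) => {
      const phase: Phase = {
        id: `step-${i + 1}`,
        type: "agent",
        task: step.task,
        final: i === steps.length - 1,
      };
      if (step.agent !== undefined) phase.agent = step.agent;
      if (i > 0) phase.dependsOn = [`step-${i}`];
      return phase;
    });
  }
  // Single task: one agent phase.
  const single: Phase = { id: "step-1", type: "agent", task: input.task, final: true };
  if (input.agent !== undefined) single.agent = input.agent;
  return [single];
}
```

Under these assumptions a two-step chain yields `step-1` and `step-2`, with `step-2` depending on `step-1` and marked final.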

Try it inline — tell the model something like:

Run a chain: first explore the auth flow, then summarize findings.

The model calls the taskflow tool; you get live progress, per-step timing, token cost, and a run record. Ask to save it and you get /tf:<name>.

Then go declarative

When your pipeline outgrows the shorthand — when you need dynamic fan-out, intermediate JSON routing, or quality gates — graduate to the full DSL:

{
  "name": "summarize-files",
  "description": "Discover files, summarize each, produce a report",
  "args": { "dir": { "default": "." } },
  "concurrency": 8,
  "phases": [
    { "id": "discover", "type": "agent", "agent": "scout",
      "task": "List source files under {args.dir} (non-recursive).\nOutput ONLY a JSON array [{\"file\":\"\"}]. No prose.",
      "output": "json" },
    { "id": "summarize", "type": "map",
      "over": "{steps.discover.json}", "as": "item",
      "agent": "scout",
      "task": "Read {item.file} and give a one-sentence summary.",
      "dependsOn": ["discover"] },
    { "id": "report", "type": "reduce", "from": ["summarize"],
      "agent": "writer",
      "task": "Combine into a short overview:\n{steps.summarize.output}",
      "dependsOn": ["summarize"], "final": true }
  ]
}

What this does:

  1. discover — an agent lists every file in the directory and outputs a JSON array.
  2. summarize — a map fans out, spawning one subagent per file in parallel (throttled to 8 concurrent). Each gets {item.file} bound to its file path.
  3. report — a reduce merges all summaries into one clean overview.

Intermediate outputs never enter your context. The runtime owns them. You get only the final report back.

Save it once → /tf:summarize-files forever.
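
The throttling in the summarize step (one subagent per item, at most 8 at a time) is the classic bounded-concurrency map. Here is a minimal sketch of that technique. `mapWithLimit` and its `worker` callback are hypothetical names standing in for whatever the runtime uses to spawn subagents; the real scheduler may work differently.

```typescript
// Hypothetical helper: runs `worker` over every item with at most `limit` in flight.
// Results come back in input order regardless of completion order.
async function mapWithLimit<T, R>(
  items: T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0; // index of the next unclaimed item

  // Each lane repeatedly claims the next item until none remain.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const laneCount = Math.max(1, Math.min(limit, items.length));
  const lanes: Promise<void>[] = [];
  for (let i = 0; i < laneCount; i++) lanes.push(lane());
  await Promise.all(lanes);
  return results;
}
```

With `limit` set to the flow's concurrency, a call such as `mapWithLimit(files, 8, summarizeOne)` (where `summarizeOne` is a placeholder for the subagent invocation) would keep eight summaries running until the list is drained.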

Route, gate, and guard

Phases also branch, retry, pause for a human, and respect a budget — still declaratively, no scripting:

{
  "name": "triage-and-fix",
  "budget": { "maxUSD": 1.5 },
  "phases": [
    { "id": "triage", "type": "agent", "agent": "analyst", "output": "json",
      "task": "Classify the bug. Output ONLY {\"severity\":\"high\"} or {\"severity\":\"low\"}." },
    { "id": "deep",  "when": "{steps.triage.json.severity} == high", "dependsOn": ["triage"],
      "agent": "executor_code", "task": "Root-cause and patch it.",
      "retry": { "max": 2, "backoffMs": 500 } },
    { "id": "quick", "when": "{steps.triage.json.severity} == low",  "dependsOn": ["triage"],
      "agent": "executor_fast", "task": "Apply the quick fix." },
    { "id": "approve", "type": "approval", "join": "any", "dependsOn": ["deep", "quick"],
      "task": "Review the fix before it ships." },
    { "id": "ship", "type": "agent", "dependsOn": ["approve"],
      "task": "Open a PR with the change.", "final": true }
  ]
}
  • when routes to deep or quick from the triage JSON; the other branch is skipped.
  • join: "any" lets approve run as soon as whichever branch fired completes.
  • retry re-runs a flaky patch with backoff; budget halts the whole run if it gets too expensive.
  • approval pauses for a human (approve / reject / edit) before the final ship.
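
To see why the approve phase needs join: "any", it helps to write the readiness rule down. The sketch below is an assumption about how such a rule could look, not the runtime's code: `decide`, `DepState`, and `Decision` are hypothetical names, and the choice to skip an "all" join when any dependency was skipped or failed is inferred from the routing example above.

```typescript
// Phase states as named in the progress legend.
type DepState = "done" | "running" | "failed" | "skipped" | "pending";
type Decision = "run" | "wait" | "skip";

// Hypothetical readiness rule for a phase, given its join mode and dependency states.
function decide(join: "all" | "any", deps: DepState[]): Decision {
  if (deps.length === 0) return "run"; // no dependencies: a root phase
  const settled = (s: DepState): boolean =>
    s === "done" || s === "failed" || s === "skipped";

  if (join === "any") {
    // OR-join: one completed dependency is enough.
    if (deps.some((s) => s === "done")) return "run";
    // Nothing completed and nothing left to wait for.
    return deps.every(settled) ? "skip" : "wait";
  }

  // "all" (assumed): a skipped or failed dependency means this phase cannot run.
  if (deps.some((s) => s === "failed" || s === "skipped")) return "skip";
  return deps.every((s) => s === "done") ? "run" : "wait";
}
```

In triage-and-fix, exactly one of deep and quick is skipped on every run, so under this rule an "all" join on approve would always skip, while "any" runs as soon as the chosen branch is done.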

Watch it run

This is the live progress render for a real run — the self-improve flow that writes and verifies its own test suites, caught here mid-block by a quality gate:

⊗ taskflow self-improve  6/7 · blocked · $0.095
    ✓ discover            agent   deepseek-v4-flash  10t ↑38k ↓6.7k $0.011
  ┌ ✓ write-runner-tests  agent   claude-sonnet-4-6  10t ↑13 ↓6.6k $0.020
  ├ ✓ write-store-tests   agent   claude-sonnet-4-6  10t ↑11 ↓10k $0.018
  ├ ✓ write-agents-tests  agent   claude-sonnet-4-6  10t ↑28 ↓13k $0.030
  └ ✓ fix-stability       agent   claude-sonnet-4-6  10t ↑13 ↓3.9k $0.012
    ✓ verify              gate    BLOCK 3 type errors in test files  deepseek-v4-flash
    ⊘ report              reduce  skipped · Gate blocked  ↳ fix-stability

How to read it — the layout is the DAG:

  • Header ⊗ means the flow is blocked (a gate halted it); 6/7 phases processed, aggregate cost $0.095.
  • Status icons show each phase's state: done (✓), running, failed, skipped (⊘), pending.
  • Rail ┌ ├ └ — phases in the same DAG layer, running concurrently. The four write-*/fix-stability tasks all fan out from discover. A blank gutter is a single-phase layer.
  • ↳ — a long (layer-skipping) dependency. report depends on verify (the adjacent layer, implied by position) and fix-stability two layers back, so only that skip edge is annotated.
  • Gate: verify emitted VERDICT: BLOCK, so the runtime skipped report and ended the run as blocked, surfacing the reason.
  • Detail — per phase: model, token counts (↑ in, ↓ out), cost, and timing. Fan-out phases also show sub-task progress.
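
The layers in the render follow from the dependsOn edges: a phase sits one layer below its deepest dependency. The sketch below computes that layering. `toLayers` and `PhaseRef` are hypothetical names, and the dependency list used for verify in the example is an assumption, since the render only shows its position.

```typescript
interface PhaseRef { id: string; dependsOn?: string[] }

// Hypothetical layering: layer(phase) = 1 + max(layer of each dependency), roots at 0.
function toLayers(phases: PhaseRef[]): string[][] {
  const depth = new Map<string, number>();
  let remaining = phases.slice();

  while (remaining.length > 0) {
    const blocked: PhaseRef[] = [];
    for (const phase of remaining) {
      const deps = phase.dependsOn || [];
      if (deps.every((d) => depth.has(d))) {
        let layer = 0;
        for (const d of deps) layer = Math.max(layer, (depth.get(d) as number) + 1);
        depth.set(phase.id, layer);
      } else {
        blocked.push(phase);
      }
    }
    // No progress in a full pass means a cycle or a dependency on an unknown id.
    if (blocked.length === remaining.length) {
      throw new Error(
        "cycle or unknown dependency among: " + blocked.map((p) => p.id).join(", "),
      );
    }
    remaining = blocked;
  }

  const layers: string[][] = [];
  for (const phase of phases) {
    const layer = depth.get(phase.id) as number;
    if (layers[layer] === undefined) layers[layer] = [];
    layers[layer].push(phase.id);
  }
  return layers;
}

// The self-improve flow from the render (verify's dependencies are assumed).
const writers = ["write-runner-tests", "write-store-tests", "write-agents-tests", "fix-stability"];
const selfImprove: PhaseRef[] = [
  { id: "discover" },
  ...writers.map((id) => ({ id, dependsOn: ["discover"] })),
  { id: "verify", dependsOn: writers },
  { id: "report", dependsOn: ["verify", "fix-stability"] },
];
const selfImproveLayers = toLayers(selfImprove);
// selfImproveLayers has four layers: discover, the four writers, verify, report
```

Because report lands in layer 3 while fix-stability is in layer 1, that edge skips a layer, which is the case the ↳ annotation marks.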

Phase types

| type | meaning | required fields |
| --- | --- | --- |
| agent | one subagent runs a single task | task |
| parallel | run branches[] concurrently | branches (array of {task, agent?}) |
| map | fan out over an array: one subagent per item, {item} bound | over, task |
| gate | quality/review step that can halt the flow | task |
| reduce | aggregate from[] phase outputs into one | from, task |
| approval | human-in-the-loop pause: approve / reject / edit before continuing | |
| flow | run a saved sub-flow as one phase (composition/reuse) | use |

Common phase fields

Every phase needs a unique id and a type (defaults to agent). On top of the per-type fields above:

| Field | Meaning |
| --- | --- |
| agent | Agent to run (defaults to the first discovered agent) |
| dependsOn | Phase ids this phase waits for; builds the DAG |
| join | "all" (default) waits for every dep; "any" is an OR-join |
| when | Conditional guard: skip unless the expression is truthy |
| retry | { max, backoffMs?, factor? }: retry a failing subagent |
| output | "text" (default) or "json" (exposes {steps.ID.json}) |
| model / thinking / tools | Per-phase overrides for the subagent |
| cwd | Working directory for the subagent |
| concurrency | Fan-out cap for map / parallel (overrides the flow default) |
| final | Marks the result-bearing phase (else the last phase wins) |
| optional | A failure here does not abort the run |
| use / with | (flow) saved sub-flow name + its args |

Flow-level keys: name, description, args, concurrency (default 8), agentScope, and budget: { maxUSD?, maxTokens? }.

Control flow & reliability

  • when — skip a phase unless an expression is truthy. Supports {refs}, == != < > <= >=, && || !, parentheses, and quoted strings/numbers, e.g. "when": "{steps.triage.json.route} == deep". Pair with join: "any" on the merge phase to build real if/else routing. Parse errors fail open.
  • join: "any" — an OR-join: the phase runs as soon as one dependency completes (default "all" waits for every dep).
  • retry: { "max": 2, "backoffMs": 500, "factor": 2 } retries a failing subagent with fixed (factor:1) or exponential backoff; usage is summed and the attempt count shows as ↻N in the TUI.
  • approval — pause for a human (select: Approve / Reject / Edit). Reject halts the flow; Edit injects the typed note as the phase output for downstream steps. Non-interactive runs auto-approve.
  • flow: { "type": "flow", "use": "deep-research", "with": { "topic": "{item}" } } runs a saved flow as a phase (recursion is detected and rejected).
  • budget — a run-wide {maxUSD, maxTokens} ceiling; once exceeded, pending phases are skipped (and in-flight fan-out stops spawning) and the run is blocked.
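
The retry fields map onto a simple delay schedule. As a sketch, assuming `max` counts retries (not total attempts), `backoffMs` defaults to 0, and `factor` defaults to 1 (none of which the text above confirms), the wait before each retry could be computed like this. `retryDelays` is a hypothetical helper.

```typescript
interface RetryPolicy { max: number; backoffMs?: number; factor?: number }

// Hypothetical: delay before retry n (0-based) is backoffMs * factor^n.
// factor 1 gives fixed backoff; factor > 1 gives exponential backoff.
function retryDelays(policy: RetryPolicy): number[] {
  const base = policy.backoffMs === undefined ? 0 : policy.backoffMs; // assumed default
  const factor = policy.factor === undefined ? 1 : policy.factor;     // assumed default
  const delays: number[] = [];
  for (let retry = 0; retry < policy.max; retry++) {
    delays.push(base * Math.pow(factor, retry));
  }
  return delays;
}

const exponential = retryDelays({ max: 2, backoffMs: 500, factor: 2 }); // [500, 1000]
const fixed = retryDelays({ max: 2, backoffMs: 500, factor: 1 });       // [500, 500]
```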

output format

  • output: "text" (default) — the raw subagent output.
  • output: "json" — the subagent output is parsed as JSON and exposed via {steps.ID.json} / {steps.ID.json.field}. Set this on phases whose output a downstream map or reduce needs to consume as structured data.

There is no output: "file". For file-based output, have the agent write to disk with a write tool call.

Gate phases (quality control)

A gate runs an agent to review upstream output and can block the rest of the workflow. End the gate task's instructions by asking the agent to emit a verdict the runtime can read:

  • a final line VERDICT: PASS or VERDICT: BLOCK (also accepts OK, FAIL, STOP, REJECT, HALT — last occurrence wins), or
  • JSON like {"continue": false, "reason": "missing auth checks"} / {"verdict": "block", "reason": "..."}.

On BLOCK, downstream phases are skipped and the run ends as blocked with the reason surfaced. Ambiguous output fails open (treated as PASS) — a gate never halts the flow by accident.

Review the audit results below. If any endpoint is missing auth, end with
"VERDICT: BLOCK" and a one-line reason; otherwise end with "VERDICT: PASS".

{steps.audit.output}
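
The verdict rules above (marker keywords, last occurrence wins, two JSON forms, fail-open) can be sketched as a small parser. This is an illustration under assumptions, not the runtime's parser: `parseVerdict` is a hypothetical name, PASS and OK are assumed to pass while the other keywords block, the marker is checked before JSON, and the JSON form is only recognized when the whole output is a JSON object.

```typescript
interface Verdict { pass: boolean; reason?: string }

// Assumed grouping: these keywords block, PASS and OK pass.
const BLOCK_WORDS = ["BLOCK", "FAIL", "STOP", "REJECT", "HALT"];

function parseVerdict(output: string): Verdict {
  // 1. Marker form: scan every "VERDICT: X" and keep the last one.
  const marker = /VERDICT:\s*(PASS|OK|BLOCK|FAIL|STOP|REJECT|HALT)\b/gi;
  let last: string | undefined;
  let match: RegExpExecArray | null;
  while ((match = marker.exec(output)) !== null) {
    last = match[1].toUpperCase();
  }
  if (last !== undefined) {
    return { pass: BLOCK_WORDS.indexOf(last) === -1 };
  }

  // 2. JSON form: {"continue": false, ...} or {"verdict": "block", ...}.
  try {
    const parsed = JSON.parse(output.trim());
    if (parsed !== null && typeof parsed === "object") {
      const blocked =
        parsed.continue === false ||
        String(parsed.verdict).toLowerCase() === "block";
      if (blocked) {
        const verdict: Verdict = { pass: false };
        if (typeof parsed.reason === "string") verdict.reason = parsed.reason;
        return verdict;
      }
    }
  } catch (e) {
    // Not JSON: fall through to the fail-open default.
  }

  // 3. Fail open: no recognizable verdict means PASS.
  return { pass: true };
}
```

The final `return { pass: true }` is the fail-open behavior: a gate prompt that forgets to request a verdict never blocks, which also means a gate only protects you if its task text asks for the marker explicitly.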

Interpolation

| placeholder | resolves to |
| --- | --- |
| {args.X} | invocation argument |
| {steps.ID.output} | a prior phase's text output |
| {steps.ID.json} | prior output parsed as JSON (or {steps.ID.json.field}) |
| {item} / {item.field} | current item inside a map phase |
| {previous.output} | the immediately-upstream phase output |
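
A placeholder resolver for these forms can be sketched as a dotted-path lookup over a context object. `interpolate` and `Context` are hypothetical names. Leaving unresolved placeholders untouched and rendering non-string values with JSON.stringify are assumptions; the actual runtime may handle both cases differently.

```typescript
interface Context {
  args?: Record<string, unknown>;
  steps?: Record<string, { output?: string; json?: unknown }>;
  item?: unknown;
  previous?: { output?: string };
}

// Hypothetical resolver: replaces {a.b.c} with the value at that path in ctx.
// Only identifier-like paths match, so literal JSON such as {"file":""} is left alone.
function interpolate(template: string, ctx: Context): string {
  const placeholder = /\{([A-Za-z_][\w-]*(?:\.[\w-]+)*)\}/g;
  return template.replace(placeholder, (whole: string, path: string): string => {
    let value: unknown = ctx;
    for (const key of path.split(".")) {
      if (value === null || typeof value !== "object") return whole;
      if (!Object.prototype.hasOwnProperty.call(value, key)) return whole;
      value = (value as Record<string, unknown>)[key];
    }
    if (value === undefined) return whole; // assumed: unresolved stays as written
    return typeof value === "string" ? value : JSON.stringify(value);
  });
}
```

For example, with `args.dir` set to "src", the discover task "List source files under {args.dir} (non-recursive)." would resolve to "List source files under src (non-recursive).".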

Commands

Saved flows become command shortcuts. All commands run inside a pi session:

| Command | What it does |
| --- | --- |
| /tf list | List all saved flows |
| /tf run <name> [args] | Run a saved flow (e.g. /tf run summarize-files dir=src) |
| /tf show <name> | Print a flow's definition |
| /tf runs | Browse recent run history (interactive TUI) |
| /tf resume <runId> | Continue a paused/failed run; cached phases skip automatically |
| /tf:<name> [args] | Shortcut: runs the flow in one tap |

Tool actions (used by the model): run (inline define or saved name), save, resume, list.

Storage

.pi/taskflows/<name>.json          # project-scoped definitions (commit to share)
~/.pi/agent/taskflows/<name>.json  # user-scoped definitions
.pi/taskflows/runs/<runId>.json    # run state (resume); gitignore this

Agent discovery scope (set via agentScope in the flow definition):

| value | discovers agents from |
| --- | --- |
| "user" (default) | ~/.pi/agent/agents/*.md |
| "project" | .pi/agents/*.md (walks up the tree) |
| "both" | user + project; project wins on name collision |

Agents

Taskflow reuses your existing pi agent files (~/.pi/agent/agents/*.md, .pi/agents/*.md). Reference agents by name in a phase or shorthand.
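
The agentScope values described under Storage amount to a merge rule over two agent lists. A minimal sketch of that rule follows, assuming the agent files have already been read into memory; `selectAgents` and `AgentDef` are hypothetical names, and file discovery itself is left out.

```typescript
type AgentScope = "user" | "project" | "both";

interface AgentDef {
  name: string;
  source: "user" | "project";
  path: string;
}

// Hypothetical merge: user agents are registered first, project agents second,
// so with "both" a project agent replaces a user agent of the same name.
function selectAgents(scope: AgentScope, user: AgentDef[], project: AgentDef[]): AgentDef[] {
  const byName = new Map<string, AgentDef>();
  if (scope === "user" || scope === "both") {
    user.forEach((agent) => { byName.set(agent.name, agent); });
  }
  if (scope === "project" || scope === "both") {
    project.forEach((agent) => { byName.set(agent.name, agent); });
  }
  const selected: AgentDef[] = [];
  byName.forEach((agent) => { selected.push(agent); });
  return selected;
}
```

With a scout agent defined in both places, "both" would resolve the name scout to the project file, while the default "user" scope would ignore the project copy entirely.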

When running a phase, the runtime extracts the agent's systemPrompt from its .md frontmatter and passes it via --append-system-prompt (written to a temp file). Phase-level overrides for model, thinking, and tools are passed as --model / --thinking / --tools flags to the subagent invocation.

Settings from ~/.pi/agent/settings.json (the subagents.agentOverrides map) are honored, letting you tweak model, thinking, or tools per agent across all flows.

Examples

Ready-to-read definitions live in examples/:

| File | Demonstrates |
| --- | --- |
| summarize-files.json | discover → map fan-out → reduce |
| conditional-research.json | when routing + join: any + gate + budget |
| guarded-refactor.json | approval (human-in-the-loop) + retry + gate |

To use one, copy it into .pi/taskflows/<name>.json (or ~/.pi/agent/taskflows/) and it registers as /tf:<name> — or just point the model at the definition.

Status & limits

  • v0.0.6 — control flow & reliability: conditional when guards, join: any OR-joins, declarative retry/backoff, approval (human-in-the-loop) phases, flow (saved sub-flow composition), and run-wide budget caps — on top of the DSL + DAG runtime (agent/parallel/map/gate/reduce), inline + saved flows, cross-session resume, live progress, isolated context. Default concurrency is 8 (set on the flow; per-phase concurrency overrides for that phase).
  • A run executes as one streaming tool call (live progress while it runs).
  • map requires the upstream phase to emit a JSON array (output: "json").
  • Gate verdicts are fail-open: if the agent output contains no recognizable verdict marker (VERDICT: BLOCK/PASS/OK/FAIL/STOP/REJECT/HALT or {continue: false} / {verdict: "block"}), the gate passes. This prevents an accidental missing verdict from blocking your workflow.

What it doesn't do (yet)

  • No detached background execution. A run needs the pi session to stay open. True background execution (and event/cron triggers on top of it) is on the roadmap.
  • No output: "file". Outputs are text/JSON only. Write files via agent tool calls if needed.
  • map requires a JSON array. The over field must resolve to {steps.ID.json} where the upstream phase emitted output: "json". If the source is a plain text list, wrap it in a single-agent phase that outputs JSON.
  • Cycles are rejected at validation. The DAG must be acyclic.

Development

npm install
npm run typecheck
npm test            # unit tests — no network, no process spawning

# real end-to-end (spawns live subagents; needs model access)
npm run test:e2e

Contributing

Contributions welcome! This is a young project — open an issue or PR on GitHub. Tests live in test/, the runtime in extensions/.

License

MIT