csv-wave-pipeline

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Auto Mode

自动模式

When
--yes
or
-y
: Auto-confirm task decomposition, skip interactive validation, use defaults.
当使用
--yes
-y
参数时:自动确认任务分解,跳过交互式验证,使用默认设置。

CSV Wave Pipeline

CSV波次执行管道

Usage

使用方法

bash
$csv-wave-pipeline "Implement user authentication with OAuth, JWT, and 2FA"
$csv-wave-pipeline -c 4 "Refactor payment module with Stripe and PayPal"
$csv-wave-pipeline -y "Build notification system with email and SMS"
$csv-wave-pipeline --continue "auth-20260228"
Flags:
  • -y, --yes
    : Skip all confirmations (auto mode)
  • -c, --concurrency N
    : Max concurrent agents within each wave (default: 4)
  • --continue
    : Resume existing session
Output Directory:
.workflow/.csv-wave/{session-id}/
Core Output:
tasks.csv
(master state) +
results.csv
(final) +
discoveries.ndjson
(shared exploration) +
context.md
(human-readable report)

bash
$csv-wave-pipeline "实现基于OAuth、JWT和2FA的用户认证"
$csv-wave-pipeline -c 4 "使用Stripe和PayPal重构支付模块"
$csv-wave-pipeline -y "构建包含邮件和短信的通知系统"
$csv-wave-pipeline --continue "auth-20260228"
参数:
  • -y, --yes
    : 跳过所有确认步骤(自动模式)
  • -c, --concurrency N
    : 每个波次内的最大并发Agent数量(默认值:4)
  • --continue
    : 恢复现有会话
输出目录:
.workflow/.csv-wave/{session-id}/
核心输出:
tasks.csv
(主状态文件)+
results.csv
(最终结果)+
discoveries.ndjson
(共享探索记录)+
context.md
(人类可读报告)

Overview

概述

Wave-based batch execution using
spawn_agents_on_csv
with cross-wave context propagation. Tasks are grouped into dependency waves; each wave executes concurrently, and its results feed into the next wave.
Core workflow: Decompose → Compute Waves → Execute Wave-by-Wave → Aggregate
┌─────────────────────────────────────────────────────────────────────────┐
│                    CSV BATCH EXECUTION WORKFLOW                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  Phase 1: Requirement → CSV                                              │
│     ├─ Parse requirement into subtasks (3-10 tasks)                      │
│     ├─ Identify dependencies (deps column)                               │
│     ├─ Compute dependency waves (topological sort → depth grouping)      │
│     ├─ Generate tasks.csv with wave column                               │
│     └─ User validates task breakdown (skip if -y)                        │
│                                                                          │
│  Phase 2: Wave Execution Engine                                          │
│     ├─ For each wave (1..N):                                             │
│     │   ├─ Build wave CSV (filter rows for this wave)                    │
│     │   ├─ Inject previous wave findings into prev_context column        │
│     │   ├─ spawn_agents_on_csv(wave CSV)                                 │
│     │   ├─ Collect results, merge into master tasks.csv                  │
│     │   └─ Check: any failed? → skip dependents or retry                 │
│     └─ discoveries.ndjson shared across all waves (append-only)          │
│                                                                          │
│  Phase 3: Results Aggregation                                            │
│     ├─ Export final results.csv                                          │
│     ├─ Generate context.md with all findings                             │
│     ├─ Display summary: completed/failed/skipped per wave                │
│     └─ Offer: view results | retry failed | done                         │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

基于波次的批量执行,使用
spawn_agents_on_csv
并支持跨波次上下文传播。任务会被分组为依赖波次;每个波次并发执行,执行结果会传入下一个波次。
核心工作流: 分解需求 → 计算波次 → 按波次执行 → 结果聚合
┌─────────────────────────────────────────────────────────────────────────┐
│                    CSV批量执行工作流                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  阶段1: 需求 → CSV                                              │
│     ├─ 将需求解析为子任务(3-10个任务)                      │
│     ├─ 识别任务依赖(deps列)                               │
│     ├─ 计算依赖波次(拓扑排序 → 按深度分组)      │
│     ├─ 生成带wave列的tasks.csv                               │
│     └─ 用户验证任务分解结果(若使用-y则跳过)                        │
│                                                                          │
│  阶段2: 波次执行引擎                                          │
│     ├─ 遍历每个波次(1..N):                                             │
│     │   ├─ 构建当前波次的CSV文件(筛选对应波次的任务行)                    │
│     │   ├─ 将上一波次的结果注入prev_context列        │
│     │   ├─ 调用spawn_agents_on_csv(当前波次CSV)                                 │
│     │   ├─ 收集执行结果,合并到主tasks.csv文件                  │
│     │   └─ 检查:是否有任务失败? → 跳过依赖任务或重试                 │
│     └─ discoveries.ndjson在所有波次间共享(仅追加模式)          │
│                                                                          │
│  阶段3: 结果聚合                                            │
│     ├─ 导出最终的results.csv                                          │
│     ├─ 生成包含所有执行结果的context.md文档                             │
│     ├─ 显示执行摘要:每个波次的完成/失败/跳过任务数                │
│     └─ 提供后续操作选项:查看结果 | 重试失败任务 | 结束                         │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

CSV Schema

CSV Schema

tasks.csv (Master State)

tasks.csv(主状态文件)

csv
id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,status,findings,files_modified,tests_passed,acceptance_met,error
"1","Setup auth module","Create auth directory structure and base files","Verify directory exists and base files export expected interfaces","auth/ dir created; index.ts and types.ts export AuthProvider interface","src/auth/**","Follow monorepo module pattern || package.json;src/shared/types.ts","","","","1","","","","","",""
"2","Implement OAuth","Add OAuth provider integration with Google and GitHub","Unit test: mock OAuth callback returns valid token; Integration test: verify redirect URL generation","OAuth login redirects to provider; callback returns JWT; supports Google and GitHub","src/auth/oauth/**","Use passport.js strategy pattern || src/auth/index.ts;docs/oauth-flow.md","Run npm test -- --grep oauth before completion","1","1","2","","","","","",""
"3","Add JWT tokens","Implement JWT generation and validation","Unit test: sign/verify round-trip; Edge test: expired token returns 401","generateToken() returns valid JWT; verifyToken() rejects expired/tampered tokens","src/auth/jwt/**","Use jsonwebtoken library; Set default expiry 1h || src/config/auth.ts","Ensure tsc --noEmit passes","1","1","2","","","","","",""
"4","Setup 2FA","Add TOTP-based 2FA with QR code generation","Unit test: TOTP verify with correct code; Test: QR data URL is valid","QR code generates scannable image; TOTP verification succeeds within time window","src/auth/2fa/**","Use speakeasy + qrcode libraries || src/auth/oauth/strategy.ts;src/auth/jwt/token.ts","Run full test suite: npm test","2;3","1;2;3","3","","","","","",""
Columns:
ColumnPhaseDescription
id
InputUnique task identifier (string)
title
InputShort task title
description
InputDetailed task description — what to implement
test
InputTest cases: what tests to write and how to verify (unit/integration/edge)
acceptance_criteria
InputAcceptance criteria: measurable conditions that define "done"
scope
InputTarget file/directory glob — constrains agent work area, prevents cross-task file conflicts
hints
InputImplementation tips + reference files. Format:
tips text || file1;file2
. Before
||
= how to implement; after
||
= existing files to read before starting. Either part is optional
execution_directives
InputExecution constraints: commands to run for verification, tool restrictions, environment requirements
deps
InputSemicolon-separated dependency task IDs (empty = no deps)
context_from
InputSemicolon-separated task IDs whose findings this task needs
wave
ComputedWave number (computed by topological sort, 1-based)
status
Output
pending
completed
/
failed
/
skipped
findings
OutputKey discoveries or implementation notes (max 500 chars)
files_modified
OutputSemicolon-separated file paths
tests_passed
OutputWhether all defined test cases passed (true/false)
acceptance_met
OutputSummary of which acceptance criteria were met/unmet
error
OutputError message if failed (empty if success)
csv
id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,status,findings,files_modified,tests_passed,acceptance_met,error
"1","Setup auth module","Create auth directory structure and base files","Verify directory exists and base files export expected interfaces","auth/ dir created; index.ts and types.ts export AuthProvider interface","src/auth/**","Follow monorepo module pattern || package.json;src/shared/types.ts","","","","1","","","","","",""
"2","Implement OAuth","Add OAuth provider integration with Google and GitHub","Unit test: mock OAuth callback returns valid token; Integration test: verify redirect URL generation","OAuth login redirects to provider; callback returns JWT; supports Google and GitHub","src/auth/oauth/**","Use passport.js strategy pattern || src/auth/index.ts;docs/oauth-flow.md","Run npm test -- --grep oauth before completion","1","1","2","","","","","",""
"3","Add JWT tokens","Implement JWT generation and validation","Unit test: sign/verify round-trip; Edge test: expired token returns 401","generateToken() returns valid JWT; verifyToken() rejects expired/tampered tokens","src/auth/jwt/**","Use jsonwebtoken library; Set default expiry 1h || src/config/auth.ts","Ensure tsc --noEmit passes","1","1","2","","","","","",""
"4","Setup 2FA","Add TOTP-based 2FA with QR code generation","Unit test: TOTP verify with correct code; Test: QR data URL is valid","QR code generates scannable image; TOTP verification succeeds within time window","src/auth/2fa/**","Use speakeasy + qrcode libraries || src/auth/oauth/strategy.ts;src/auth/jwt/token.ts","Run full test suite: npm test","2;3","1;2;3","3","","","","","",""
列说明:
列名所属阶段描述
id
输入唯一任务标识符(字符串)
title
输入简短任务标题
description
输入详细任务描述 — 需要实现的内容
test
输入测试用例:需要编写的测试及验证方式(单元/集成/边界测试)
acceptance_criteria
输入验收标准:定义任务“完成”的可衡量条件
scope
输入目标文件/目录通配符 — 限制Agent的工作范围,避免跨任务文件冲突
hints
输入实现提示 + 参考文件。格式:
提示文本 || 文件1;文件2
||
之前的内容为实现建议;之后的内容为开始前需要阅读的现有文件。两部分均可省略
execution_directives
输入执行约束:用于验证的命令、工具限制、环境要求
deps
输入分号分隔的依赖任务ID(为空表示无依赖)
context_from
输入分号分隔的任务ID,当前任务需要这些任务的执行结果
wave
计算生成波次编号(通过拓扑排序计算,从1开始)
status
输出
pending
completed
/
failed
/
skipped
findings
输出关键发现或实现说明(最多500字符)
files_modified
输出分号分隔的文件路径
tests_passed
输出是否所有定义的测试用例都通过(true/false)
acceptance_met
输出验收标准达成情况的摘要
error
输出任务失败时的错误信息(成功则为空)

Per-Wave CSV (Temporary)

单波次临时CSV文件

Each wave generates a temporary
wave-{N}.csv
with an extra
prev_context
column:
csv
id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,prev_context
"2","Implement OAuth","Add OAuth integration","Unit test: mock OAuth callback returns valid token","OAuth login redirects to provider; callback returns JWT","src/auth/oauth/**","Use passport.js strategy pattern || src/auth/index.ts;docs/oauth-flow.md","Run npm test -- --grep oauth","1","1","2","[Task 1] Created auth/ with index.ts and types.ts"
"3","Add JWT tokens","Implement JWT","Unit test: sign/verify round-trip; Edge test: expired token returns 401","generateToken() returns valid JWT; verifyToken() rejects expired/tampered tokens","src/auth/jwt/**","Use jsonwebtoken library; Set default expiry 1h || src/config/auth.ts","Ensure tsc --noEmit passes","1","1","2","[Task 1] Created auth/ with index.ts and types.ts"
The
prev_context
column is built from
context_from
by looking up completed tasks'
findings
in the master CSV.

每个波次会生成一个临时的
wave-{N}.csv
文件,包含额外的
prev_context
列:
csv
id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,prev_context
"2","Implement OAuth","Add OAuth integration","Unit test: mock OAuth callback returns valid token","OAuth login redirects to provider; callback returns JWT","src/auth/oauth/**","Use passport.js strategy pattern || src/auth/index.ts;docs/oauth-flow.md","Run npm test -- --grep oauth","1","1","2","[Task 1] Created auth/ with index.ts and types.ts"
"3","Add JWT tokens","Implement JWT","Unit test: sign/verify round-trip; Edge test: expired token returns 401","generateToken() returns valid JWT; verifyToken() rejects expired/tampered tokens","src/auth/jwt/**","Use jsonwebtoken library; Set default expiry 1h || src/config/auth.ts","Ensure tsc --noEmit passes","1","1","2","[Task 1] Created auth/ with index.ts and types.ts"
prev_context
列的值通过主CSV文件中已完成任务的
findings
字段,结合当前任务的
context_from
列生成。

Output Artifacts

输出产物

FilePurposeLifecycle
tasks.csv
Master state — all tasks with status/findingsUpdated after each wave
wave-{N}.csv
Per-wave input (temporary)Created before wave, deleted after
results.csv
Final export of all task resultsCreated in Phase 3
discoveries.ndjson
Shared exploration board across all agentsAppend-only, carries across waves
context.md
Human-readable execution reportCreated in Phase 3

文件用途生命周期
tasks.csv
主状态文件 — 包含所有任务的状态和执行结果每个波次执行后更新
wave-{N}.csv
单波次输入文件(临时文件)波次执行前创建,执行后删除
results.csv
所有任务结果的最终导出文件阶段3生成
discoveries.ndjson
所有Agent共享的探索记录板仅追加模式,跨波次保留
context.md
人类可读的执行报告阶段3生成

Session Structure

会话结构

.workflow/.csv-wave/{session-id}/
├── tasks.csv                  # Master state (updated per wave)
├── results.csv                # Final results export
├── discoveries.ndjson         # Shared discovery board (all agents)
├── context.md                 # Human-readable report
└── wave-{N}.csv               # Temporary per-wave input (cleaned up)

.workflow/.csv-wave/{session-id}/
├── tasks.csv                  # 主状态文件(每个波次执行后更新)
├── results.csv                # 最终结果导出文件
├── discoveries.ndjson         # 共享探索记录板(所有Agent共享)
├── context.md                 # 人类可读报告
└── wave-{N}.csv               # 临时单波次输入文件(执行后清理)

Implementation

实现细节

Session Initialization

会话初始化

javascript
const getUtc8ISOString = () => new Date(Date.now() + 8 * 60 * 60 * 1000).toISOString()

// Parse flags
const AUTO_YES = $ARGUMENTS.includes('--yes') || $ARGUMENTS.includes('-y')
const continueMode = $ARGUMENTS.includes('--continue')
const concurrencyMatch = $ARGUMENTS.match(/(?:--concurrency|-c)\s+(\d+)/)
const maxConcurrency = concurrencyMatch ? parseInt(concurrencyMatch[1]) : 4

// Clean requirement text (remove flags)
const requirement = $ARGUMENTS
  .replace(/--yes|-y|--continue|--concurrency\s+\d+|-c\s+\d+/g, '')
  .trim()

const slug = requirement.toLowerCase()
  .replace(/[^a-z0-9\u4e00-\u9fa5]+/g, '-')
  .substring(0, 40)
const dateStr = getUtc8ISOString().substring(0, 10).replace(/-/g, '')
const sessionId = `cwp-${slug}-${dateStr}`
const sessionFolder = `.workflow/.csv-wave/${sessionId}`

// Continue mode: find existing session
if (continueMode) {
  const existing = Bash(`ls -t .workflow/.csv-wave/ 2>/dev/null | head -1`).trim()
  if (existing) {
    sessionId = existing
    sessionFolder = `.workflow/.csv-wave/${sessionId}`
    // Read existing tasks.csv, find incomplete waves, resume from there
    const existingCsv = Read(`${sessionFolder}/tasks.csv`)
    // → jump to Phase 2 with remaining waves
  }
}

Bash(`mkdir -p ${sessionFolder}`)

javascript
const getUtc8ISOString = () => new Date(Date.now() + 8 * 60 * 60 * 1000).toISOString()

// 解析命令行参数
const AUTO_YES = $ARGUMENTS.includes('--yes') || $ARGUMENTS.includes('-y')
const continueMode = $ARGUMENTS.includes('--continue')
const concurrencyMatch = $ARGUMENTS.match(/(?:--concurrency|-c)\s+(\d+)/)
const maxConcurrency = concurrencyMatch ? parseInt(concurrencyMatch[1]) : 4

// 清理需求文本(移除命令行参数)
const requirement = $ARGUMENTS
  .replace(/--yes|-y|--continue|--concurrency\s+\d+|-c\s+\d+/g, '')
  .trim()

const slug = requirement.toLowerCase()
  .replace(/[^a-z0-9\u4e00-\u9fa5]+/g, '-')
  .substring(0, 40)
const dateStr = getUtc8ISOString().substring(0, 10).replace(/-/g, '')
const sessionId = `cwp-${slug}-${dateStr}`
const sessionFolder = `.workflow/.csv-wave/${sessionId}`

// 恢复模式:查找现有会话
if (continueMode) {
  const existing = Bash(`ls -t .workflow/.csv-wave/ 2>/dev/null | head -1`).trim()
  if (existing) {
    sessionId = existing
    sessionFolder = `.workflow/.csv-wave/${sessionId}`
    // 读取现有tasks.csv,找到未完成的波次,从该位置恢复执行
    const existingCsv = Read(`${sessionFolder}/tasks.csv`)
    // → 跳转到阶段2,继续执行剩余波次
  }
}

Bash(`mkdir -p ${sessionFolder}`)

Phase 1: Requirement → CSV

阶段1: 需求 → CSV

Objective: Decompose requirement into tasks, compute dependency waves, generate tasks.csv.
Steps:
  1. Decompose Requirement
    javascript
    // Use ccw cli to decompose requirement into subtasks
    Bash({
      command: `ccw cli -p "PURPOSE: Decompose requirement into 3-10 atomic tasks for batch agent execution. Each task must include implementation description, test cases, and acceptance criteria.
TASK: • Parse requirement into independent subtasks • Identify dependencies between tasks (which must complete before others) • Identify context flow (which tasks need previous tasks' findings) • For each task, define concrete test cases (unit/integration/edge) • For each task, define measurable acceptance criteria (what defines 'done') • Each task must be executable by a single agent with file read/write access MODE: analysis CONTEXT: @**/* EXPECTED: JSON object with tasks array. Each task: {id: string, title: string, description: string, test: string, acceptance_criteria: string, scope: string, hints: string, execution_directives: string, deps: string[], context_from: string[]}.
  • description: what to implement (specific enough for an agent to execute independently)
  • test: what tests to write and how to verify (e.g. 'Unit test: X returns Y; Edge test: handles Z')
  • acceptance_criteria: measurable conditions that define done (e.g. 'API returns 200; token expires after 1h')
  • scope: target file/directory glob (e.g. 'src/auth/**') — tasks in same wave MUST have non-overlapping scopes
  • hints: implementation tips + reference files, format '<tips> || <ref_file1>;<ref_file2>' (e.g. 'Use strategy pattern || src/base/Strategy.ts;docs/design.md')
  • execution_directives: commands to run for verification or tool constraints (e.g. 'Run npm test --bail; Ensure tsc passes')
  • deps: task IDs that must complete first
  • context_from: task IDs whose findings are needed CONSTRAINTS: 3-10 tasks | Each task is atomic | No circular deps | test and acceptance_criteria must be concrete and verifiable | Same-wave tasks must have non-overlapping scopes
REQUIREMENT: ${requirement}" --tool gemini --mode analysis --rule planning-breakdown-task-steps`, run_in_background: true }) // Wait for CLI completion via hook callback // Parse JSON from CLI output → decomposedTasks[]

2. **Compute Waves** (Topological Sort → Depth Grouping)

```javascript
function computeWaves(tasks) {
  // Build adjacency: task.deps → predecessors
  const taskMap = new Map(tasks.map(t => [t.id, t]))
  const inDegree = new Map(tasks.map(t => [t.id, 0]))
  const adjList = new Map(tasks.map(t => [t.id, []]))

  for (const task of tasks) {
    for (const dep of task.deps) {
      if (taskMap.has(dep)) {
        adjList.get(dep).push(task.id)
        inDegree.set(task.id, inDegree.get(task.id) + 1)
      }
    }
  }

  // BFS-based topological sort with depth tracking
  const queue = []  // [taskId, depth]
  const waveAssignment = new Map()

  for (const [id, deg] of inDegree) {
    if (deg === 0) {
      queue.push([id, 1])
      waveAssignment.set(id, 1)
    }
  }

  let maxWave = 1
  let idx = 0
  while (idx < queue.length) {
    const [current, depth] = queue[idx++]
    for (const next of adjList.get(current)) {
      const newDeg = inDegree.get(next) - 1
      inDegree.set(next, newDeg)
      const nextDepth = Math.max(waveAssignment.get(next) || 0, depth + 1)
      waveAssignment.set(next, nextDepth)
      if (newDeg === 0) {
        queue.push([next, nextDepth])
        maxWave = Math.max(maxWave, nextDepth)
      }
    }
  }

  // Detect cycles: any task without wave assignment
  for (const task of tasks) {
    if (!waveAssignment.has(task.id)) {
      throw new Error(`Circular dependency detected involving task ${task.id}`)
    }
  }

  return { waveAssignment, maxWave }
}

const { waveAssignment, maxWave } = computeWaves(decomposedTasks)
  1. Generate tasks.csv
    javascript
    const header = 'id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,status,findings,files_modified,tests_passed,acceptance_met,error'
    const rows = decomposedTasks.map(task => {
      const wave = waveAssignment.get(task.id)
      return [
        task.id,
        csvEscape(task.title),
        csvEscape(task.description),
        csvEscape(task.test),
        csvEscape(task.acceptance_criteria),
        csvEscape(task.scope),
        csvEscape(task.hints),
        csvEscape(task.execution_directives),
        task.deps.join(';'),
        task.context_from.join(';'),
        wave,
        'pending',  // status
        '',         // findings
        '',         // files_modified
        '',         // tests_passed
        '',         // acceptance_met
        ''          // error
      ].map(cell => `"${String(cell).replace(/"/g, '""')}"`).join(',')
    })
    
    Write(`${sessionFolder}/tasks.csv`, [header, ...rows].join('\n'))
  2. User Validation (skip if AUTO_YES)
    javascript
    if (!AUTO_YES) {
      // Display task breakdown with wave assignment
      console.log(`\n## Task Breakdown (${decomposedTasks.length} tasks, ${maxWave} waves)\n`)
      for (let w = 1; w <= maxWave; w++) {
        const waveTasks = decomposedTasks.filter(t => waveAssignment.get(t.id) === w)
        console.log(`### Wave ${w} (${waveTasks.length} tasks, concurrent)`)
        waveTasks.forEach(t => console.log(`  - [${t.id}] ${t.title}`))
      }
    
      const answer = AskUserQuestion({
        questions: [{
          question: "Approve task breakdown?",
          header: "Validation",
          multiSelect: false,
          options: [
            { label: "Approve", description: "Proceed with wave execution" },
            { label: "Modify", description: `Edit ${sessionFolder}/tasks.csv manually, then --continue` },
            { label: "Cancel", description: "Abort" }
          ]
        }]
      })  // BLOCKS
    
      if (answer.Validation === "Modify") {
        console.log(`Edit: ${sessionFolder}/tasks.csv\nResume: $csv-wave-pipeline --continue`)
        return
      } else if (answer.Validation === "Cancel") {
        return
      }
    }
Success Criteria:
  • tasks.csv created with valid schema and wave assignments
  • No circular dependencies
  • User approved (or AUTO_YES)

目标: 将需求分解为任务,计算依赖波次,生成tasks.csv文件。
步骤:
  1. 分解需求
    javascript
    // 使用ccw cli将需求分解为子任务
    Bash({
      command: `ccw cli -p "PURPOSE: Decompose requirement into 3-10 atomic tasks for batch agent execution. Each task must include implementation description, test cases, and acceptance criteria.
TASK: • Parse requirement into independent subtasks • Identify dependencies between tasks (which must complete before others) • Identify context flow (which tasks need previous tasks' findings) • For each task, define concrete test cases (unit/integration/edge) • For each task, define measurable acceptance criteria (what defines 'done') • Each task must be executable by a single agent with file read/write access MODE: analysis CONTEXT: @**/* EXPECTED: JSON object with tasks array. Each task: {id: string, title: string, description: string, test: string, acceptance_criteria: string, scope: string, hints: string, execution_directives: string, deps: string[], context_from: string[]}.
  • description: what to implement (specific enough for an agent to execute independently)
  • test: what tests to write and how to verify (e.g. 'Unit test: X returns Y; Edge test: handles Z')
  • acceptance_criteria: measurable conditions that define done (e.g. 'API returns 200; token expires after 1h')
  • scope: target file/directory glob (e.g. 'src/auth/**') — tasks in same wave MUST have non-overlapping scopes
  • hints: implementation tips + reference files, format '<tips> || <ref_file1>;<ref_file2>' (e.g. 'Use strategy pattern || src/base/Strategy.ts;docs/design.md')
  • execution_directives: commands to run for verification or tool constraints (e.g. 'Run npm test --bail; Ensure tsc passes')
  • deps: task IDs that must complete first
  • context_from: task IDs whose findings are needed CONSTRAINTS: 3-10 tasks | Each task is atomic | No circular deps | test and acceptance_criteria must be concrete and verifiable | Same-wave tasks must have non-overlapping scopes
REQUIREMENT: ${requirement}" --tool gemini --mode analysis --rule planning-breakdown-task-steps`, run_in_background: true }) // 通过钩子回调等待CLI执行完成 // 解析CLI输出的JSON → decomposedTasks[]

2. **计算波次**(拓扑排序 → 深度分组)

```javascript
function computeWaves(tasks) {
  // 构建邻接表:task.deps → 前置任务
  const taskMap = new Map(tasks.map(t => [t.id, t]))
  const inDegree = new Map(tasks.map(t => [t.id, 0]))
  const adjList = new Map(tasks.map(t => [t.id, []]))

  for (const task of tasks) {
    for (const dep of task.deps) {
      if (taskMap.has(dep)) {
        adjList.get(dep).push(task.id)
        inDegree.set(task.id, inDegree.get(task.id) + 1)
      }
    }
  }

  // 基于BFS的拓扑排序,同时跟踪深度
  const queue = []  // [taskId, depth]
  const waveAssignment = new Map()

  for (const [id, deg] of inDegree) {
    if (deg === 0) {
      queue.push([id, 1])
      waveAssignment.set(id, 1)
    }
  }

  let maxWave = 1
  let idx = 0
  while (idx < queue.length) {
    const [current, depth] = queue[idx++]
    for (const next of adjList.get(current)) {
      const newDeg = inDegree.get(next) - 1
      inDegree.set(next, newDeg)
      const nextDepth = Math.max(waveAssignment.get(next) || 0, depth + 1)
      waveAssignment.set(next, nextDepth)
      if (newDeg === 0) {
        queue.push([next, nextDepth])
        maxWave = Math.max(maxWave, nextDepth)
      }
    }
  }

  // 检测循环依赖:若存在未分配波次的任务则报错
  for (const task of tasks) {
    if (!waveAssignment.has(task.id)) {
      throw new Error(`Circular dependency detected involving task ${task.id}`)
    }
  }

  return { waveAssignment, maxWave }
}

const { waveAssignment, maxWave } = computeWaves(decomposedTasks)
  1. 生成tasks.csv文件
    javascript
    const header = 'id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,status,findings,files_modified,tests_passed,acceptance_met,error'
    const rows = decomposedTasks.map(task => {
      const wave = waveAssignment.get(task.id)
      return [
        task.id,
        csvEscape(task.title),
        csvEscape(task.description),
        csvEscape(task.test),
        csvEscape(task.acceptance_criteria),
        csvEscape(task.scope),
        csvEscape(task.hints),
        csvEscape(task.execution_directives),
        task.deps.join(';'),
        task.context_from.join(';'),
        wave,
        'pending',  // status
        '',         // findings
        '',         // files_modified
        '',         // tests_passed
        '',         // acceptance_met
        ''          // error
      ].map(cell => `"${String(cell).replace(/"/g, '""')}"`).join(',')
    })
    
    Write(`${sessionFolder}/tasks.csv`, [header, ...rows].join('\n'))
  2. 用户验证(若使用AUTO_YES则跳过)
    javascript
    if (!AUTO_YES) {
      // 显示带波次分配的任务分解结果
      console.log(`\n## 任务分解结果(共${decomposedTasks.length}个任务,${maxWave}个波次)\n`)
      for (let w = 1; w <= maxWave; w++) {
        const waveTasks = decomposedTasks.filter(t => waveAssignment.get(t.id) === w)
        console.log(`### 波次 ${w}(共${waveTasks.length}个任务,支持并发执行)`)
        waveTasks.forEach(t => console.log(`  - [${t.id}] ${t.title}`))
      }
    
      const answer = AskUserQuestion({
        questions: [{
          question: "是否批准该任务分解结果?",
          header: "验证步骤",
          multiSelect: false,
          options: [
            { label: "批准", description: "继续执行波次任务" },
            { label: "修改", description: `手动编辑${sessionFolder}/tasks.csv文件,之后使用--continue参数恢复执行` },
            { label: "取消", description: "终止执行" }
          ]
        }]
      })  // 阻塞等待用户输入
    
      if (answer.验证步骤 === "修改") {
        console.log(`编辑路径: ${sessionFolder}/tasks.csv\n恢复执行: $csv-wave-pipeline --continue`)
        return
      } else if (answer.验证步骤 === "取消") {
        return
      }
    }
成功标准:
  • 生成符合Schema且包含波次分配的tasks.csv文件
  • 无循环依赖
  • 用户已批准(或使用了AUTO_YES参数)

Phase 2: Wave Execution Engine

阶段2: 波次执行引擎

Objective: Execute tasks wave-by-wave via
spawn_agents_on_csv
. Each wave sees previous waves' results.
Steps:
  1. Wave Loop
    javascript
    const failedIds = new Set()
    const skippedIds = new Set()
    
    for (let wave = 1; wave <= maxWave; wave++) {
      console.log(`\n## Wave ${wave}/${maxWave}\n`)
    
      // 1. Read current master CSV
      const masterCsv = parseCsv(Read(`${sessionFolder}/tasks.csv`))
    
      // 2. Filter tasks for this wave
      const waveTasks = masterCsv.filter(row => parseInt(row.wave) === wave)
    
      // 3. Skip tasks whose deps failed
      const executableTasks = []
      for (const task of waveTasks) {
        const deps = task.deps.split(';').filter(Boolean)
        if (deps.some(d => failedIds.has(d) || skippedIds.has(d))) {
          skippedIds.add(task.id)
          // Update master CSV: mark as skipped
          updateMasterCsvRow(sessionFolder, task.id, {
            status: 'skipped',
            error: 'Dependency failed or skipped'
          })
          console.log(`  [${task.id}] ${task.title} → SKIPPED (dependency failed)`)
          continue
        }
        executableTasks.push(task)
      }
    
      if (executableTasks.length === 0) {
        console.log(`  No executable tasks in wave ${wave}`)
        continue
      }
    
      // 4. Build prev_context for each task
      for (const task of executableTasks) {
        const contextIds = task.context_from.split(';').filter(Boolean)
        const prevFindings = contextIds
          .map(id => {
            const prevRow = masterCsv.find(r => r.id === id)
            if (prevRow && prevRow.status === 'completed' && prevRow.findings) {
              return `[Task ${id}: ${prevRow.title}] ${prevRow.findings}`
            }
            return null
          })
          .filter(Boolean)
          .join('\n')
        task.prev_context = prevFindings || 'No previous context available'
      }
    
      // 5. Write wave CSV
      const waveHeader = 'id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,prev_context'
      const waveRows = executableTasks.map(t =>
        [t.id, t.title, t.description, t.test, t.acceptance_criteria, t.scope, t.hints, t.execution_directives, t.deps, t.context_from, t.wave, t.prev_context]
          .map(cell => `"${String(cell).replace(/"/g, '""')}"`)
          .join(',')
      )
      Write(`${sessionFolder}/wave-${wave}.csv`, [waveHeader, ...waveRows].join('\n'))
    
      // 6. Execute wave
      console.log(`  Executing ${executableTasks.length} tasks (concurrency: ${maxConcurrency})...`)
    
      const waveResult = spawn_agents_on_csv({
        csv_path: `${sessionFolder}/wave-${wave}.csv`,
        id_column: "id",
        instruction: buildInstructionTemplate(sessionFolder, wave),
        max_concurrency: maxConcurrency,
        max_runtime_seconds: 600,
        output_csv_path: `${sessionFolder}/wave-${wave}-results.csv`,
        output_schema: {
          type: "object",
          properties: {
            id: { type: "string" },
            status: { type: "string", enum: ["completed", "failed"] },
            findings: { type: "string" },
            files_modified: { type: "array", items: { type: "string" } },
            tests_passed: { type: "boolean" },
            acceptance_met: { type: "string" },
            error: { type: "string" }
          },
          required: ["id", "status", "findings", "tests_passed"]
        }
      })
      // ↑ Blocks until all agents in this wave complete
    
      // 7. Merge results into master CSV
      const waveResults = parseCsv(Read(`${sessionFolder}/wave-${wave}-results.csv`))
      for (const result of waveResults) {
        updateMasterCsvRow(sessionFolder, result.id, {
          status: result.status,
          findings: result.findings || '',
          files_modified: (result.files_modified || []).join(';'),
          tests_passed: String(result.tests_passed ?? ''),
          acceptance_met: result.acceptance_met || '',
          error: result.error || ''
        })
    
        if (result.status === 'failed') {
          failedIds.add(result.id)
          console.log(`  [${result.id}] ${result.title} → FAILED: ${result.error}`)
        } else {
          console.log(`  [${result.id}] ${result.title} → COMPLETED`)
        }
      }
    
      // 8. Cleanup temporary wave CSV
      Bash(`rm -f "${sessionFolder}/wave-${wave}.csv"`)
    
      console.log(`  Wave ${wave} done: ${waveResults.filter(r => r.status === 'completed').length} completed, ${waveResults.filter(r => r.status === 'failed').length} failed`)
    }
  2. Instruction Template Builder
    javascript
    function buildInstructionTemplate(sessionFolder, wave) {
      return `
目标: 通过
spawn_agents_on_csv
按波次执行任务。每个波次可获取上一波次的执行结果。
步骤:
  1. 波次循环执行
    javascript
    const failedIds = new Set()
    const skippedIds = new Set()
    
    for (let wave = 1; wave <= maxWave; wave++) {
      console.log(`\n## 执行波次 ${wave}/${maxWave}\n`)
    
      // 1. 读取当前主CSV文件
      const masterCsv = parseCsv(Read(`${sessionFolder}/tasks.csv`))
    
      // 2. 筛选当前波次的任务
      const waveTasks = masterCsv.filter(row => parseInt(row.wave) === wave)
    
      // 3. 跳过依赖任务已失败的任务
      const executableTasks = []
      for (const task of waveTasks) {
        const deps = task.deps.split(';').filter(Boolean)
        if (deps.some(d => failedIds.has(d) || skippedIds.has(d))) {
          skippedIds.add(task.id)
          // 更新主CSV文件:标记为已跳过
          updateMasterCsvRow(sessionFolder, task.id, {
            status: 'skipped',
            error: '依赖任务失败或已跳过'
          })
          console.log(`  [${task.id}] ${task.title} → 已跳过(依赖任务失败)`)
          continue
        }
        executableTasks.push(task)
      }
    
      if (executableTasks.length === 0) {
        console.log(`  当前波次${wave}无可执行任务`)
        continue
      }
    
      // 4. 为每个任务构建prev_context
      for (const task of executableTasks) {
        const contextIds = task.context_from.split(';').filter(Boolean)
        const prevFindings = contextIds
          .map(id => {
            const prevRow = masterCsv.find(r => r.id === id)
            if (prevRow && prevRow.status === 'completed' && prevRow.findings) {
              return `[任务 ${id}: ${prevRow.title}] ${prevRow.findings}`
            }
            return null
          })
          .filter(Boolean)
          .join('\n')
        task.prev_context = prevFindings || '无前置上下文可用'
      }
    
      // 5. 写入当前波次的CSV文件
      const waveHeader = 'id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,prev_context'
      const waveRows = executableTasks.map(t =>
        [t.id, t.title, t.description, t.test, t.acceptance_criteria, t.scope, t.hints, t.execution_directives, t.deps, t.context_from, t.wave, t.prev_context]
          .map(cell => `"${String(cell).replace(/"/g, '""')}"`)
          .join(',')
      )
      Write(`${sessionFolder}/wave-${wave}.csv`, [waveHeader, ...waveRows].join('\n'))
    
      // 6. 执行当前波次
      console.log(`  执行${executableTasks.length}个任务(并发数: ${maxConcurrency})...`)
    
      const waveResult = spawn_agents_on_csv({
        csv_path: `${sessionFolder}/wave-${wave}.csv`,
        id_column: "id",
        instruction: buildInstructionTemplate(sessionFolder, wave),
        max_concurrency: maxConcurrency,
        max_runtime_seconds: 600,
        output_csv_path: `${sessionFolder}/wave-${wave}-results.csv`,
        output_schema: {
          type: "object",
          properties: {
            id: { type: "string" },
            status: { type: "string", enum: ["completed", "failed"] },
            findings: { type: "string" },
            files_modified: { type: "array", items: { type: "string" } },
            tests_passed: { type: "boolean" },
            acceptance_met: { type: "string" },
            error: { type: "string" }
          },
          required: ["id", "status", "findings", "tests_passed"]
        }
      })
      // ↑ 阻塞直到当前波次所有Agent执行完成
    
      // 7. 将执行结果合并到主CSV文件
      const waveResults = parseCsv(Read(`${sessionFolder}/wave-${wave}-results.csv`))
      for (const result of waveResults) {
        updateMasterCsvRow(sessionFolder, result.id, {
          status: result.status,
          findings: result.findings || '',
          files_modified: (result.files_modified || []).join(';'),
          tests_passed: String(result.tests_passed ?? ''),
          acceptance_met: result.acceptance_met || '',
          error: result.error || ''
        })
    
        if (result.status === 'failed') {
          failedIds.add(result.id)
          console.log(`  [${result.id}] ${result.title} → 执行失败: ${result.error}`)
        } else {
          console.log(`  [${result.id}] ${result.title} → 执行完成`)
        }
      }
    
      // 8. 清理临时波次CSV文件
      Bash(`rm -f "${sessionFolder}/wave-${wave}.csv"`)
    
      console.log(`  波次${wave}执行完成: ${waveResults.filter(r => r.status === 'completed').length}个任务完成,${waveResults.filter(r => r.status === 'failed').length}个任务失败`)
    }
  2. 指令模板构建器
    javascript
    function buildInstructionTemplate(sessionFolder, wave) {
      return `

TASK ASSIGNMENT

任务分配

MANDATORY FIRST STEPS

必须优先执行的步骤

  1. Read shared discoveries: ${sessionFolder}/discoveries.ndjson (if exists, skip if not)
  2. Read project context: .workflow/project-tech.json (if exists)

  1. 读取共享探索记录: ${sessionFolder}/discoveries.ndjson(若存在,不存在则跳过)
  2. 读取项目上下文: .workflow/project-tech.json(若存在)

Your Task

你的任务

Task ID: {id} Title: {title} Description: {description} Scope: {scope}
任务ID: {id} 标题: {title} 描述: {description} 工作范围: {scope}

Implementation Hints & Reference Files

实现提示与参考文件

{hints}
Format: `<tips> || <ref_file1>;<ref_file2>`. Read ALL reference files (after ||) before starting implementation. Apply tips (before ||) as implementation guidance.
{hints}
格式说明: `<提示内容> || <参考文件1>;<参考文件2>`。开始实现前请阅读`||`之后列出的所有参考文件。将`||`之前的内容作为实现指导。

Execution Directives

执行约束

{execution_directives}
Commands to run for verification, tool restrictions, or environment requirements. Follow these constraints during and after implementation.
{execution_directives}
用于验证的命令、工具限制或环境要求。在实现过程及完成后请遵循这些约束。

Test Cases

测试用例

{test}
{test}

Acceptance Criteria

验收标准

{acceptance_criteria}
{acceptance_criteria}

Previous Tasks' Findings (Context)

前置任务执行结果(上下文)

{prev_context}

{prev_context}

Execution Protocol

执行协议

  1. Read references: Parse {hints} — read all files listed after `||` to understand existing patterns
  2. Read discoveries: Load ${sessionFolder}/discoveries.ndjson for shared exploration findings
  3. Use context: Apply previous tasks' findings from prev_context above
  4. Stay in scope: ONLY create/modify files within {scope} — do NOT touch files outside this boundary
  5. Apply hints: Follow implementation tips from {hints} (before `||`)
  6. Execute: Implement the task as described
  7. Write tests: Implement the test cases defined above
  8. Run directives: Execute commands from {execution_directives} to verify your work
  9. Verify acceptance: Ensure all acceptance criteria are met before reporting completion
  10. Share discoveries: Append exploration findings to shared board: ```bash echo '{"ts":"<ISO8601>","worker":"{id}","type":"<type>","data":{...}}' >> ${sessionFolder}/discoveries.ndjson ```
  11. Report result: Return JSON via report_agent_job_result
  1. 读取参考文件: 解析{hints} — 阅读`||`之后列出的所有文件,了解现有代码模式
  2. 读取探索记录: 加载${sessionFolder}/discoveries.ndjson获取共享探索结果
  3. 使用上下文: 应用上述prev_context中的前置任务执行结果
  4. 遵守工作范围: 仅在{scope}范围内创建/修改文件 — 请勿触碰该范围外的文件
  5. 遵循实现提示: 按照{hints}中`||`之前的实现建议执行
  6. 执行任务: 按照描述完成任务实现
  7. 编写测试: 实现上述定义的测试用例
  8. 执行验证命令: 运行{execution_directives}中的命令验证你的工作成果
  9. 验证验收标准: 在报告完成前确保所有验收标准均已满足
  10. 共享探索结果: 将探索发现追加到共享记录板: ```bash echo '{"ts":"<ISO8601>","worker":"{id}","type":"<type>","data":{...}}' >> ${sessionFolder}/discoveries.ndjson ```
  11. 报告结果: 通过report_agent_job_result返回JSON格式的结果

Discovery Types to Share

可共享的探索记录类型

  • `code_pattern`: {name, file, description} — reusable patterns found
  • `integration_point`: {file, description, exports[]} — module connection points
  • `convention`: {naming, imports, formatting} — code style conventions
  • `blocker`: {issue, severity, impact} — blocking issues encountered

  • `code_pattern`: {name, file, description} — 可复用的代码模式
  • `integration_point`: {file, description, exports[]} — 模块连接点
  • `convention`: {naming, imports, formatting} — 代码风格约定
  • `blocker`: {issue, severity, impact} — 遇到的阻塞问题

Output (report_agent_job_result)

输出格式(通过report_agent_job_result返回)

Return JSON: { "id": "{id}", "status": "completed" | "failed", "findings": "Key discoveries and implementation notes (max 500 chars)", "files_modified": ["path1", "path2"], "tests_passed": true | false, "acceptance_met": "Summary of which acceptance criteria were met/unmet", "error": "" }
IMPORTANT: Set status to "completed" ONLY if:
  • All test cases pass
  • All acceptance criteria are met Otherwise set status to "failed" with details in error field. ` }
    undefined
  1. Master CSV Update Helper
    javascript
    function updateMasterCsvRow(sessionFolder, taskId, updates) {
      const csvPath = `${sessionFolder}/tasks.csv`
      const content = Read(csvPath)
      const lines = content.split('\n')
      const header = lines[0].split(',')
    
      for (let i = 1; i < lines.length; i++) {
        const cells = parseCsvLine(lines[i])
        if (cells[0] === taskId || cells[0] === `"${taskId}"`) {
          // Update specified columns
          for (const [col, val] of Object.entries(updates)) {
            const colIdx = header.indexOf(col)
            if (colIdx >= 0) {
              cells[colIdx] = `"${String(val).replace(/"/g, '""')}"`
            }
          }
          lines[i] = cells.join(',')
          break
        }
      }
    
      Write(csvPath, lines.join('\n'))
    }
Success Criteria:
  • All waves executed in order
  • Each wave's results merged into master CSV before next wave starts
  • Dependent tasks skipped when predecessor failed
  • discoveries.ndjson accumulated across all waves

返回JSON格式: { "id": "{id}", "status": "completed" | "failed", "findings": "关键发现与实现说明(最多500字符)", "files_modified": ["path1", "path2"], "tests_passed": true | false, "acceptance_met": "验收标准达成情况摘要", "error": "" }
重要提示: 仅当以下条件满足时,才将status设置为"completed":
  • 所有测试用例均通过
  • 所有验收标准均已满足 否则将status设置为"failed",并在error字段中填写详细信息。 ` }
    undefined
  1. 主CSV文件更新工具函数
    javascript
    function updateMasterCsvRow(sessionFolder, taskId, updates) {
      const csvPath = `${sessionFolder}/tasks.csv`
      const content = Read(csvPath)
      const lines = content.split('\n')
      const header = lines[0].split(',')
    
      for (let i = 1; i < lines.length; i++) {
        const cells = parseCsvLine(lines[i])
        if (cells[0] === taskId || cells[0] === `"${taskId}"`) {
          // 更新指定列
          for (const [col, val] of Object.entries(updates)) {
            const colIdx = header.indexOf(col)
            if (colIdx >= 0) {
              cells[colIdx] = `"${String(val).replace(/"/g, '""')}"`
            }
          }
          lines[i] = cells.join(',')
          break
        }
      }
    
      Write(csvPath, lines.join('\n'))
    }
成功标准:
  • 所有波次按顺序执行
  • 每个波次的执行结果在进入下一波次前已合并到主CSV文件
  • 当前驱任务失败时,依赖任务会被跳过
  • discoveries.ndjson在所有波次执行过程中持续累积

Phase 3: Results Aggregation

阶段3: 结果聚合

Objective: Generate final results and human-readable report.
Steps:
  1. Export results.csv
    javascript
    const masterCsv = Read(`${sessionFolder}/tasks.csv`)
    // results.csv = master CSV (already has all results populated)
    Write(`${sessionFolder}/results.csv`, masterCsv)
  2. Generate context.md
    javascript
    const tasks = parseCsv(masterCsv)
    const completed = tasks.filter(t => t.status === 'completed')
    const failed = tasks.filter(t => t.status === 'failed')
    const skipped = tasks.filter(t => t.status === 'skipped')
    
    const contextContent = `# CSV Batch Execution Report
Session: ${sessionId} Requirement: ${requirement} Completed: ${getUtc8ISOString()} Waves: ${maxWave} | Concurrency: ${maxConcurrency}

目标: 生成最终结果文件和人类可读报告。
步骤:
  1. 导出results.csv文件
    javascript
    const masterCsv = Read(`${sessionFolder}/tasks.csv`)
    // results.csv = 主CSV文件(已包含所有执行结果)
    Write(`${sessionFolder}/results.csv`, masterCsv)
  2. 生成context.md文档
    javascript
    const tasks = parseCsv(masterCsv)
    const completed = tasks.filter(t => t.status === 'completed')
    const failed = tasks.filter(t => t.status === 'failed')
    const skipped = tasks.filter(t => t.status === 'skipped')
    
    const contextContent = `# CSV批量执行报告
会话ID: ${sessionId} 需求: ${requirement} 完成时间: ${getUtc8ISOString()} 波次数量: ${maxWave} | 并发数: ${maxConcurrency}

Summary

执行摘要

MetricCount
Total Tasks${tasks.length}
Completed${completed.length}
Failed${failed.length}
Skipped${skipped.length}
Waves${maxWave}

指标数量
总任务数${tasks.length}
已完成${completed.length}
执行失败${failed.length}
已跳过${skipped.length}
波次数量${maxWave}

Wave Execution

波次执行详情

${Array.from({ length: maxWave }, (_, i) => i + 1).map(w => { const waveTasks = tasks.filter(t => parseInt(t.wave) === w) return
### Wave ${w} ${waveTasks.map(t => 
- [${t.id}] ${t.title}: ${t.status}${t.tests_passed ? ' ✓tests' : ''}${t.error ? ' — ' + t.error : ''} ${t.findings ? 'Findings: ' + t.findings : ''}
).join('\n')}
}).join('\n\n')}

${Array.from({ length: maxWave }, (_, i) => i + 1).map(w => { const waveTasks = tasks.filter(t => parseInt(t.wave) === w) return
### 波次 ${w} ${waveTasks.map(t => 
- [${t.id}] ${t.title}: ${t.status}${t.tests_passed ? ' ✓测试通过' : ''}${t.error ? ' — ' + t.error : ''} ${t.findings ? '执行发现: ' + t.findings : ''}
).join('\n')}
}).join('\n\n')}

Task Details

任务详情

${tasks.map(t => `### ${t.id}: ${t.title}
FieldValue
Status${t.status}
Wave${t.wave}
Scope${t.scope
Dependencies${t.deps
Context From${t.context_from
Tests Passed${t.tests_passed
Acceptance Met${t.acceptance_met
Error${t.error
Description: ${t.description}
Test Cases: ${t.test || 'N/A'}
Acceptance Criteria: ${t.acceptance_criteria || 'N/A'}
Hints: ${t.hints || 'N/A'}
Execution Directives: ${t.execution_directives || 'N/A'}
Findings: ${t.findings || 'N/A'}
Files Modified: ${t.files_modified || 'none'} `).join('\n---\n')}

${tasks.map(t => `### ${t.id}: ${t.title}
字段
状态${t.status}
所属波次${t.wave}
工作范围${t.scope
依赖任务${t.deps
上下文来源${t.context_from
测试是否通过${t.tests_passed
验收标准达成情况${t.acceptance_met
错误信息${t.error
任务描述: ${t.description}
测试用例: ${t.test || 'N/A'}
验收标准: ${t.acceptance_criteria || 'N/A'}
实现提示: ${t.hints || 'N/A'}
执行约束: ${t.execution_directives || 'N/A'}
执行发现: ${t.findings || 'N/A'}
修改的文件: ${t.files_modified || '无'} `).join('\n---\n')}

All Modified Files

所有修改的文件

${[...new Set(tasks.flatMap(t => (t.files_modified || '').split(';')).filter(Boolean))].map(f => '- ' + f).join('\n') || 'None'} `
Write(
${sessionFolder}/context.md
, contextContent)

3. **Display Summary**

```javascript
console.log(`
${[...new Set(tasks.flatMap(t => (t.files_modified || '').split(';')).filter(Boolean))].map(f => '- ' + f).join('\n') || '无'} `
Write(
${sessionFolder}/context.md
, contextContent)

3. **显示执行摘要**

```javascript
console.log(`

Execution Complete

执行完成

  • Session: ${sessionId}
  • Waves: ${maxWave}
  • Completed: ${completed.length}/${tasks.length}
  • Failed: ${failed.length}
  • Skipped: ${skipped.length}
Results: ${sessionFolder}/results.csv Report: ${sessionFolder}/context.md Discoveries: ${sessionFolder}/discoveries.ndjson `)

4. **Offer Next Steps** (skip if AUTO_YES)

```javascript
if (!AUTO_YES && failed.length > 0) {
  const answer = AskUserQuestion({
    questions: [{
      question: `${failed.length} tasks failed. Next action?`,
      header: "Next Step",
      multiSelect: false,
      options: [
        { label: "Retry Failed", description: `Re-execute ${failed.length} failed tasks with updated context` },
        { label: "View Report", description: "Display context.md" },
        { label: "Done", description: "Complete session" }
      ]
    }]
  })  // BLOCKS

  if (answer['Next Step'] === "Retry Failed") {
    // Reset failed tasks to pending, re-run Phase 2 for their waves
    for (const task of failed) {
      updateMasterCsvRow(sessionFolder, task.id, { status: 'pending', error: '' })
    }
    // Also reset skipped tasks whose deps are now retrying
    for (const task of skipped) {
      updateMasterCsvRow(sessionFolder, task.id, { status: 'pending', error: '' })
    }
    // Re-execute Phase 2 (loop will skip already-completed tasks)
    // → goto Phase 2
  } else if (answer['Next Step'] === "View Report") {
    console.log(Read(`${sessionFolder}/context.md`))
  }
}
Success Criteria:
  • results.csv exported
  • context.md generated
  • Summary displayed to user

  • 会话ID: ${sessionId}
  • 波次数量: ${maxWave}
  • 完成率: ${completed.length}/${tasks.length}
  • 失败任务数: ${failed.length}
  • 跳过任务数: ${skipped.length}
结果文件: ${sessionFolder}/results.csv 报告文档: ${sessionFolder}/context.md 探索记录: ${sessionFolder}/discoveries.ndjson `)

4. **提供后续操作选项**(若使用AUTO_YES则跳过)

```javascript
if (!AUTO_YES && failed.length > 0) {
  const answer = AskUserQuestion({
    questions: [{
      question: `有${failed.length}个任务执行失败,下一步操作?`,
      header: "后续操作",
      multiSelect: false,
      options: [
        { label: "重试失败任务", description: `结合更新后的上下文重新执行${failed.length}个失败任务` },
        { label: "查看报告", description: "显示context.md文档内容" },
        { label: "结束", description: "完成会话" }
      ]
    }]
  })  // 阻塞等待用户输入

  if (answer['后续操作'] === "重试失败任务") {
    // 将失败任务重置为pending状态,重新执行其所属波次的阶段2逻辑
    for (const task of failed) {
      updateMasterCsvRow(sessionFolder, task.id, { status: 'pending', error: '' })
    }
    // 同时重置因依赖失败而被跳过的任务状态
    for (const task of skipped) {
      updateMasterCsvRow(sessionFolder, task.id, { status: 'pending', error: '' })
    }
    // 重新执行阶段2(循环会跳过已完成的任务)
    // → 跳转到阶段2
  } else if (answer['后续操作'] === "查看报告") {
    console.log(Read(`${sessionFolder}/context.md`))
  }
}
成功标准:
  • 已导出results.csv文件
  • 已生成context.md文档
  • 已向用户显示执行摘要

Shared Discovery Board Protocol

共享探索记录板协议

All agents across all waves share
discoveries.ndjson
. This eliminates redundant codebase exploration.
Lifecycle:
  • Created by the first agent to write a discovery
  • Carries over across waves — never cleared
  • Agents append via
    echo '...' >> discoveries.ndjson
Format: NDJSON, each line is a self-contained JSON:
jsonl
{"ts":"2026-02-28T10:00:00+08:00","worker":"1","type":"code_pattern","data":{"name":"repository-pattern","file":"src/repos/Base.ts","description":"Abstract CRUD repository"}}
{"ts":"2026-02-28T10:01:00+08:00","worker":"2","type":"integration_point","data":{"file":"src/auth/index.ts","description":"Auth module entry","exports":["authenticate","authorize"]}}
Discovery Types:
typeDedup KeyDescription
code_pattern
data.name
Reusable code pattern found
integration_point
data.file
Module connection point
convention
singletonCode style conventions
blocker
data.issue
Blocking issue encountered
tech_stack
singletonProject technology stack
test_command
singletonTest commands discovered
Protocol Rules:
  1. Read board before own exploration → skip covered areas
  2. Write discoveries immediately via
    echo >>
    → don't batch
  3. Deduplicate — check existing entries; skip if same type + dedup key exists
  4. Append-only — never modify or delete existing lines

所有波次的Agent共享
discoveries.ndjson
文件。这可以消除重复的代码库探索工作。
生命周期:
  • 由第一个写入探索记录的Agent创建
  • 跨波次保留 — 永不清空
  • Agent通过
    echo '...' >>
    命令即时追加记录 — 不要批量写入
格式: NDJSON格式,每行是一个独立的JSON对象:
jsonl
{"ts":"2026-02-28T10:00:00+08:00","worker":"1","type":"code_pattern","data":{"name":"repository-pattern","file":"src/repos/Base.ts","description":"抽象CRUD仓库模式"}}
{"ts":"2026-02-28T10:01:00+08:00","worker":"2","type":"integration_point","data":{"file":"src/auth/index.ts","description":"认证模块入口","exports":["authenticate","authorize"]}}
探索记录类型:
类型去重键描述
code_pattern
data.name
发现的可复用代码模式
integration_point
data.file
模块连接点
convention
单例代码风格约定
blocker
data.issue
遇到的阻塞问题
tech_stack
单例项目技术栈
test_command
单例发现的测试命令
协议规则:
  1. 在开始自身探索前先读取记录板 → 跳过已覆盖的内容
  2. 通过
    echo >>
    命令即时写入探索记录 → 不要批量操作
  3. 去重 — 检查现有记录;若相同类型+去重键的记录已存在则跳过
  4. 仅追加模式 — 永不修改或删除现有行

Wave Computation Details

波次计算细节

Algorithm

算法

Kahn's BFS topological sort with depth tracking:
Input:  tasks[] with deps[]
Output: waveAssignment (taskId → wave number)

1. Build in-degree map and adjacency list from deps
2. Enqueue all tasks with in-degree 0 at wave 1
3. BFS: for each dequeued task at wave W:
   - For each dependent task D:
     - Decrement D's in-degree
     - D.wave = max(D.wave, W + 1)
     - If D's in-degree reaches 0, enqueue D
4. Any task without wave assignment → circular dependency error
带深度跟踪的Kahn's BFS拓扑排序:
输入: 带deps[]的tasks[]数组
输出: waveAssignment(taskId → 波次编号)

1. 基于依赖关系构建入度映射和邻接表
2. 将所有入度为0的任务加入队列,分配波次为1
3. BFS遍历:对于每个出队的波次W的任务:
   - 遍历其所有依赖任务D:
     - 减少D的入度
     - D.wave = max(D.wave, W + 1)
     - 若D的入度变为0,将其加入队列
4. 若存在未分配波次的任务 → 抛出循环依赖错误

Wave Properties

波次特性

  • Wave 1: No dependencies — all tasks in wave 1 are fully independent
  • Wave N: All dependencies are in waves 1..(N-1) — guaranteed completed before wave N starts
  • Within a wave: Tasks are independent of each other → safe for concurrent execution
  • 波次1: 无依赖 — 波次1中的所有任务完全独立
  • 波次N: 所有依赖任务均在波次1..(N-1)中 — 确保在波次N开始前已全部完成
  • 同一波次内: 任务之间相互独立 → 支持安全并发执行

Example

示例

Task A (no deps)      → Wave 1
Task B (no deps)      → Wave 1
Task C (deps: A)      → Wave 2
Task D (deps: A, B)   → Wave 2
Task E (deps: C, D)   → Wave 3

Execution:
  Wave 1: [A, B]  ← concurrent
  Wave 2: [C, D]  ← concurrent, sees A+B findings
  Wave 3: [E]     ← sees A+B+C+D findings

任务A(无依赖)      → 波次1
任务B(无依赖)      → 波次1
任务C(依赖A)      → 波次2
任务D(依赖A、B)   → 波次2
任务E(依赖C、D)   → 波次3

执行流程:
  波次1: [A, B]  ← 并发执行
  波次2: [C, D]  ← 并发执行,可获取A+B的执行结果
  波次3: [E]     ← 可获取A+B+C+D的执行结果

Context Propagation Flow

上下文传播流程

Wave 1 agents:
  ├─ Execute tasks (no prev_context)
  ├─ Write findings to report_agent_job_result
  └─ Append discoveries to discoveries.ndjson

        ↓ merge results into master CSV

Wave 2 agents:
  ├─ Read discoveries.ndjson (exploration sharing)
  ├─ Read prev_context column (wave 1 findings from context_from)
  ├─ Execute tasks with full upstream context
  ├─ Write findings to report_agent_job_result
  └─ Append new discoveries to discoveries.ndjson

        ↓ merge results into master CSV

Wave 3 agents:
  ├─ Read discoveries.ndjson (accumulated from waves 1+2)
  ├─ Read prev_context column (wave 1+2 findings from context_from)
  ├─ Execute tasks
  └─ ...
Two context channels:
  1. CSV findings (structured):
    context_from
    column →
    prev_context
    injection — task-specific directed context
  2. NDJSON discoveries (broadcast):
    discoveries.ndjson
    — general exploration findings available to all

波次1的Agent:
  ├─ 执行任务(无prev_context)
  ├─ 将执行结果写入report_agent_job_result
  └─ 将探索发现追加到discoveries.ndjson

        ↓ 将结果合并到主CSV文件

波次2的Agent:
  ├─ 读取discoveries.ndjson(共享探索结果)
  ├─ 读取prev_context列(来自context_from的波次1执行结果)
  ├─ 结合完整的上游上下文执行任务
  ├─ 将执行结果写入report_agent_job_result
  └─ 将新的探索发现追加到discoveries.ndjson

        ↓ 将结果合并到主CSV文件

波次3的Agent:
  ├─ 读取discoveries.ndjson(累积了波次1+2的探索结果)
  ├─ 读取prev_context列(来自context_from的波次1+2执行结果)
  ├─ 执行任务
  └─ ...
两种上下文通道:
  1. CSV结构化结果(定向):
    context_from
    列 →
    prev_context
    注入 — 任务特定的定向上下文
  2. NDJSON探索记录(广播):
    discoveries.ndjson
    — 所有Agent均可获取的通用探索结果

Error Handling

错误处理

ErrorResolution
Circular dependencyDetect in wave computation, abort with error message
Agent timeoutMark as failed in results, continue with wave
Agent failedMark as failed, skip dependent tasks in later waves
All agents in wave failedLog error, offer retry or abort
CSV parse errorValidate CSV format before execution, show line number
discoveries.ndjson corruptIgnore malformed lines, continue with valid entries
Continue mode: no session foundList available sessions, prompt user to select

错误类型解决方式
循环依赖在波次计算阶段检测到,抛出错误信息并终止执行
Agent超时在结果中标记为失败,继续执行当前波次其他任务
Agent执行失败标记为失败,后续波次中跳过其依赖任务
波次内所有Agent执行失败记录错误信息,提供重试或终止选项
CSV解析错误执行前验证CSV格式,显示错误行号
discoveries.ndjson损坏忽略格式错误的行,继续处理有效条目
恢复模式:未找到会话列出可用会话,提示用户选择

Core Rules

核心规则

  1. Start Immediately: First action is session initialization, then Phase 1
  2. Wave Order is Sacred: Never execute wave N before wave N-1 completes and results are merged
  3. CSV is Source of Truth: Master tasks.csv holds all state — always read before wave, always write after
  4. Context Propagation: prev_context built from master CSV, not from memory
  5. Discovery Board is Append-Only: Never clear, modify, or recreate discoveries.ndjson
  6. Skip on Failure: If a dependency failed, skip the dependent task (don't attempt)
  7. Cleanup Temp Files: Remove wave-{N}.csv after results are merged
  8. DO NOT STOP: Continuous execution until all waves complete or all remaining tasks are skipped

  1. 立即启动: 第一个操作是会话初始化,然后进入阶段1
  2. 波次顺序不可打破: 绝对不能在波次N-1完成并合并结果前执行波次N
  3. CSV是唯一可信源: 主tasks.csv文件存储所有状态 — 执行波次前必须读取,执行后必须写入
  4. 上下文传播规则: prev_context必须从主CSV文件构建,而非内存
  5. 探索记录板仅追加: 永不清空、修改或重建discoveries.ndjson
  6. 失败则跳过: 若依赖任务失败,跳过该依赖任务(不尝试执行)
  7. 清理临时文件: 结果合并后删除wave-{N}.csv文件
  8. 持续执行: 持续执行直到所有波次完成或剩余任务均被跳过

Best Practices

最佳实践

  1. Task Granularity: 3-10 tasks optimal; too many = overhead, too few = no parallelism benefit
  2. Minimize Cross-Wave Deps: More tasks in wave 1 = more parallelism
  3. Specific Descriptions: Agent sees only its CSV row + prev_context — make description self-contained
  4. Context From ≠ Deps:
    deps
    = execution order constraint;
    context_from
    = information flow. A task can have
    context_from
    without
    deps
    (it just reads previous findings but doesn't require them to be done first in its wave)
  5. Concurrency Tuning:
    -c 1
    for serial execution (maximum context sharing);
    -c 8
    for I/O-bound tasks

  1. 任务粒度: 3-10个任务为最优;任务过多会增加开销,任务过少则无法发挥并行执行的优势
  2. 减少跨波次依赖: 波次1中的任务越多,并行执行的效率越高
  3. 描述具体化: Agent仅能看到其CSV行和prev_context — 确保任务描述足够独立完整
  4. Context From ≠ Deps:
    deps
    = 执行顺序约束;
    context_from
    = 信息流。任务可以在没有
    deps
    的情况下设置
    context_from
    (仅需要读取前置任务的执行结果,但不需要在自身波次前完成)
  5. 并发数调优:
    -c 1
    用于串行执行(最大化上下文共享);
    -c 8
    用于I/O密集型任务

Usage Recommendations

使用建议

ScenarioRecommended Approach
Independent parallel tasks (no deps)
$csv-wave-pipeline -c 8
— single wave, max parallelism
Linear pipeline (A→B→C)
$csv-wave-pipeline -c 1
— 3 waves, serial, full context
Diamond dependency (A→B,C→D)
$csv-wave-pipeline
— 3 waves, B+C concurrent in wave 2
Complex requirement, unclear tasksUse
$roadmap-with-file
first for planning, then feed issues here
Single complex taskUse
$workflow-lite-plan
instead
场景推荐方案
独立并行任务(无依赖)
$csv-wave-pipeline -c 8
— 单波次,最大并发数
线性流水线(A→B→C)
$csv-wave-pipeline -c 1
— 3个波次,串行执行,完整上下文共享
菱形依赖(A→B,C→D)
$csv-wave-pipeline
— 3个波次,B+C在波次2并发执行
复杂需求,任务不明确先使用
$roadmap-with-file
进行规划,再将需求传入本工具
单个复杂任务改用
$workflow-lite-plan
工具