sub-agent-orchestrator

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Sub-Agent Orchestrator

子Agent编排器

A workflow orchestration engine for designing and executing complex multi-agent pipelines. Define agent roles, dependencies, and handoff protocols using a YAML-based workflow syntax. Supports sequential chains, parallel fan-out/fan-in, conditional routing, retry logic, timeout handling, and structured result validation.
一款用于设计和执行复杂多Agent流水线的工作流编排引擎。使用基于YAML的工作流语法定义Agent角色、依赖关系和交接协议。支持链式执行、并行分发/聚合、条件路由、重试逻辑、超时处理以及结构化结果验证。

Why This Exists

设计初衷

Agent Army deploys many agents doing the same type of work in parallel (homogeneous parallelism). Agent Swarm does the same for data processing. The Sub-Agent Orchestrator is different: it coordinates agents doing different types of work that depend on each other (heterogeneous pipelines).
Think of it as a workflow engine where each step is an intelligent agent, not a dumb function.
Agent Army部署多个执行同类任务的Agent并行工作(同构并行)。Agent Swarm则针对数据处理场景实现相同模式。而子Agent编排器有所不同:它协调执行不同类型任务且彼此依赖的Agent(异构流水线)。
可以将其视为每个步骤都是智能Agent而非普通函数的工作流引擎。

Comparison

对比

FeatureAgent ArmyAgent SwarmSub-Agent Orchestrator
Agent rolesSame (all do code changes)Same (all do data processing)Different (each has a unique role)
Execution patternParallel fan-outParallel fan-outSequential, parallel, conditional, or mixed
DependenciesFile-level (import graph)None (items are independent)Task-level (output of A feeds input of B)
Use caseBulk code changesBulk data processingComplex workflows with multiple stages
特性Agent ArmyAgent Swarm子Agent编排器
Agent角色相同(均执行代码变更)相同(均执行数据处理)不同(每个Agent拥有唯一角色)
执行模式并行分发并行分发链式、并行、条件或混合模式
依赖关系文件级别(导入图谱)无(任务相互独立)任务级别(Agent A的输出作为Agent B的输入)
适用场景批量代码变更批量数据处理包含多个阶段的复杂工作流

Supported Workflow Patterns

支持的工作流模式

1. Sequential Chain

1. 链式执行

[Agent A] --> [Agent B] --> [Agent C]
Each agent's output becomes the next agent's input. Use when tasks have strict ordering dependencies.
Example: Research a company (A) -> Draft a proposal (B) -> Review and polish (C)
[Agent A] --> [Agent B] --> [Agent C]
每个Agent的输出成为下一个Agent的输入。适用于任务有严格顺序依赖的场景。
示例:调研目标公司(A)-> 起草提案(B)-> 审核润色(C)

2. Parallel Fan-Out / Fan-In

2. 并行分发/聚合

              /--> [Agent B1] --\
[Agent A] ---|                   |--> [Agent D]
              \--> [Agent B2] --/
Agent A produces output. Multiple agents process it in parallel. Agent D aggregates their results.
Example: Analyze a dataset (A) -> Score by 3 criteria in parallel (B1, B2, B3) -> Aggregate scores (D)
              /--> [Agent B1] --\
[Agent A] ---|                   |--> [Agent D]
              \--> [Agent B2] --/
Agent A生成输出,多个Agent并行处理该输出,Agent D聚合所有结果。
示例:分析数据集(A)-> 并行按3个维度打分(B1、B2、B3)-> 聚合分数(D)

3. Conditional Routing

3. 条件路由

                    /--> [Agent B] (if condition X)
[Agent A] -- GATE -|
                    \--> [Agent C] (if condition Y)
Agent A's output is evaluated against conditions. The appropriate downstream agent is invoked based on the result.
Example: Classify a lead (A) -> If hot, draft proposal (B) / If cold, add to nurture (C)
                    /--> [Agent B](满足条件X时)
[Agent A] -- 网关 -|
                    \--> [Agent C](满足条件Y时)
根据Agent A的输出评估条件,结果决定调用对应的下游Agent。
示例:分类销售线索(A)-> 如果是高价值线索,起草提案(B)/ 如果是低价值线索,加入培育池(C)

4. Loop / Iteration

4. 循环迭代

[Agent A] --> [Agent B] --> [Validator] --failed--> [Agent B] (retry)
                                        --passed--> [Done]
A validator agent checks the output. If it fails, the producing agent retries with feedback.
Example: Write a draft (A) -> Review for quality (B) -> If fails quality bar, rewrite with feedback (loop back to A)
[Agent A] --> [Agent B] --> [验证器] --失败--> [Agent B](重试)
                                        --通过--> [完成]
验证器Agent检查输出,若未通过,生成输出的Agent将基于反馈重试。
示例:撰写草稿(A)-> 质量审核(B)-> 若未达质量标准,基于反馈重写(循环回A)

5. Map-Reduce

5. 映射-归约(Map-Reduce)

[Splitter] --> [Agent 1] --\
           --> [Agent 2]  --|--> [Reducer]
           --> [Agent 3] --/
Split input into chunks, process in parallel, reduce to a single output.
Example: Split a long document into sections -> Summarize each section -> Combine into executive summary
[拆分器] --> [Agent 1] --\
           --> [Agent 2]  --|--> [归约器]
           --> [Agent 3] --/
将输入拆分为多个块,并行处理后合并为单一输出。
示例:将长文档拆分为多个章节 -> 总结每个章节 -> 合并为执行摘要

6. Pipeline with Fallback

6. 带降级策略的流水线

[Agent A] --success--> [Agent B]
           --failure--> [Fallback Agent A'] --success--> [Agent B]
If an agent fails, a fallback agent attempts the same task with a different approach.
[Agent A] --成功--> [Agent B]
           --失败--> [降级Agent A'] --成功--> [Agent B]
若Agent执行失败,降级Agent将采用不同方式尝试完成同一任务。

Workflow Definition Language (YAML)

工作流定义语言(YAML)

Workflows are defined in YAML. The orchestrator parses this definition and executes it step by step.
工作流通过YAML定义,编排器会解析该定义并逐步执行。

Schema

Schema

yaml
workflow:
  name: string                    # Human-readable workflow name
  description: string             # What this workflow accomplishes
  version: string                 # Workflow version (semver)
  timeout: string                 # Global timeout (e.g., "30m", "2h")
  
  # Input variables available to all agents
  inputs:
    - name: string                # Variable name
      type: string                # string | number | boolean | json | file_path
      required: boolean
      default: any                # Default value if not provided
      description: string

  # Agent role definitions
  agents:
    agent_id:                     # Unique identifier (snake_case)
      name: string                # Human-readable name
      role: string                # Brief role description
      prompt: string              # Full prompt template (supports {{variable}} interpolation)
      tools: string[]             # Tools this agent can use (Read, Write, Bash, etc.)
      timeout: string             # Per-agent timeout (overrides global)
      retry:
        max_attempts: number      # Maximum retry attempts (default: 1, no retry)
        backoff: string           # none | linear | exponential
        on_failure: string        # skip | abort | fallback:{agent_id}
      validation:
        schema: object            # JSON schema for expected output
        rules: string[]           # Plain-English validation rules
      
  # Execution steps
  steps:
    - id: string                  # Step identifier
      agent: string               # Agent ID to execute
      type: string                # sequential | parallel | conditional | loop | map
      
      # For sequential steps
      input: string               # "{{inputs.variable}}" or "{{steps.prev_step.output}}"
      
      # For parallel steps
      parallel:
        - agent: string
          input: string
        - agent: string
          input: string
      wait: all | any | N         # Wait for all, any one, or N to complete
      
      # For conditional steps
      condition:
        eval: string              # Expression to evaluate (e.g., "{{steps.classify.output.category}} == 'hot'")
        true: string              # Step ID or agent ID to run if true
        false: string             # Step ID or agent ID to run if false
      
      # For loop steps
      loop:
        agent: string             # Agent to loop
        validator: string         # Validator agent ID
        max_iterations: number    # Safety limit
        feedback_path: string     # Where to get feedback from validator
      
      # For map steps  
      map:
        over: string              # Array to iterate over (e.g., "{{steps.split.output.chunks}}")
        agent: string             # Agent to run for each element
        reduce: string            # Reducer agent ID
      
      # Output handling
      output:
        store_as: string          # Variable name to store result
        format: string            # json | text | markdown
yaml
workflow:
  name: string                    # 易读的工作流名称
  description: string             # 工作流的目标
  version: string                 # 工作流版本(语义化版本)
  timeout: string                 # 全局超时时间(例如:"30m", "2h")
  
  # 所有Agent可使用的输入变量
  inputs:
    - name: string                # 变量名称
      type: string                # string | number | boolean | json | file_path
      required: boolean
      default: any                # 未提供时的默认值
      description: string

  # Agent角色定义
  agents:
    agent_id:                     # 唯一标识符(蛇形命名法)
      name: string                # 易读的名称
      role: string                # 角色简要描述
      prompt: string              # 完整提示模板(支持{{variable}}插值)
      tools: string[]             # Agent可使用的工具(Read、Write、Bash等)
      timeout: string             # 单Agent超时时间(覆盖全局设置)
      retry:
        max_attempts: number      # 最大重试次数(默认:1,即不重试)
        backoff: string           # none | linear | exponential
        on_failure: string        # skip | abort | fallback:{agent_id}
      validation:
        schema: object            # 预期输出的JSON Schema
        rules: string[]           # 自然语言描述的验证规则
      
  # 执行步骤
  steps:
    - id: string                  # 步骤标识符
      agent: string               # 要执行的Agent ID
      type: string                # sequential | parallel | conditional | loop | map
      
      # 链式执行步骤配置
      input: string               # "{{inputs.variable}}" 或 "{{steps.prev_step.output}}"
      
      # 并行执行步骤配置
      parallel:
        - agent: string
          input: string
        - agent: string
          input: string
      wait: all | any | N         # 等待全部、任意一个或N个完成
      
      # 条件路由步骤配置
      condition:
        eval: string              # 要评估的表达式(例如:"{{steps.classify.output.category}} == 'hot'")
        true: string              # 条件为真时执行的步骤ID或Agent ID
        false: string             # 条件为假时执行的步骤ID或Agent ID
      
      # 循环步骤配置
      loop:
        agent: string             # 要循环执行的Agent
        validator: string         # 验证器Agent ID
        max_iterations: number    # 安全限制次数
        feedback_path: string     # 从验证器获取反馈的路径
      
      # 映射步骤配置  
      map:
        over: string              # 要迭代的数组(例如:"{{steps.split.output.chunks}}")
        agent: string             # 每个元素要执行的Agent
        reduce: string            # 归约器Agent ID
      
      # 输出处理
      output:
        store_as: string          # 存储结果的变量名称
        format: string            # json | text | markdown

Example Workflow: Research-to-Proposal Pipeline

示例工作流:调研到提案流水线

yaml
workflow:
  name: research-to-proposal
  description: Research a prospect company and generate a personalized consulting proposal
  version: "1.0.0"
  timeout: "15m"
  
  inputs:
    - name: company_name
      type: string
      required: true
      description: Target company name
    - name: contact_name
      type: string
      required: true
      description: Primary contact at the company
    - name: our_services
      type: string
      required: true
      description: Description of our service offerings
    - name: rough_scope
      type: string
      required: false
      default: ""
      description: Any known scope details

  agents:
    researcher:
      name: Company Researcher
      role: Gather intelligence about the target company
      prompt: |
        You are a business research analyst. Research {{inputs.company_name}} and compile:
        1. Company overview (size, industry, recent news)
        2. Key challenges in their industry
        3. Technology stack (if detectable)
        4. Recent hiring patterns (signals growth areas)
        5. Competitor landscape
        
        Use web search and any available tools. Return structured JSON.
      tools: [Read, Bash, WebSearch]
      timeout: "3m"
      retry:
        max_attempts: 2
        on_failure: skip
      validation:
        rules:
          - "Output must include company_overview field"
          - "Output must include key_challenges array"
    
    pain_identifier:
      name: Pain Point Identifier
      role: Identify specific pain points we can solve
      prompt: |
        Given this research about {{inputs.company_name}}:
        {{steps.research.output}}
        
        And our services:
        {{inputs.our_services}}
        
        Identify the top 3 pain points where our services align with their needs.
        For each pain point, explain:
        1. The problem (from their perspective)
        2. The business impact (cost, time, risk)
        3. How our service addresses it
        
        Return as structured JSON array.
      tools: [Read]
      timeout: "2m"
      validation:
        rules:
          - "Must identify exactly 3 pain points"
          - "Each pain point must have problem, impact, and solution fields"
    
    pricing_analyst:
      name: Pricing Analyst
      role: Develop pricing tiers based on scope
      prompt: |
        Given these pain points for {{inputs.company_name}}:
        {{steps.identify_pains.output}}
        
        Scope details: {{inputs.rough_scope}}
        
        Create 3 pricing tiers:
        1. Starter: Addresses the primary pain point only
        2. Growth: Addresses top 2 pain points with deeper engagement
        3. Enterprise: Full scope, all 3 pain points, ongoing support
        
        For each tier, provide:
        - Name and tagline
        - Scope description
        - Deliverables list
        - Timeline
        - Price range (provide realistic consulting rates)
        - ROI justification
        
        Return as structured JSON.
      tools: [Read]
      timeout: "2m"
    
    proposal_writer:
      name: Proposal Writer
      role: Draft the full proposal document
      prompt: |
        Write a professional consulting proposal for {{inputs.company_name}}, 
        contact: {{inputs.contact_name}}.
        
        Research data: {{steps.research.output}}
        Pain points: {{steps.identify_pains.output}}
        Pricing tiers: {{steps.pricing.output}}
        
        Structure:
        1. Executive Summary (personalized to their situation)
        2. Understanding of Your Challenges (mirror their pain points)
        3. Proposed Approach (our methodology)
        4. Scope and Deliverables (3 tiers)
        5. Timeline
        6. Investment (pricing tiers)
        7. Why Us (differentiators)
        8. Next Steps
        
        Tone: Professional, confident, consultative. Not salesy.
        Length: 2000-3000 words.
        Format: Markdown.
      tools: [Read, Write]
      timeout: "4m"
    
    reviewer:
      name: Quality Reviewer  
      role: Review proposal for quality and accuracy
      prompt: |
        Review this proposal for {{inputs.company_name}}:
        {{steps.draft.output}}
        
        Check for:
        1. Factual accuracy (does it match the research data?)
        2. Personalization (does it feel generic or tailored?)
        3. Pricing reasonableness (are the tiers realistic?)
        4. Professional tone (no typos, no overselling)
        5. Completeness (all 8 sections present?)
        
        Return a review with:
        - overall_score (1-10)
        - passed (boolean, true if score >= 7)
        - feedback (array of specific improvements)
        - critical_issues (array, empty if none)
      tools: [Read]
      timeout: "2m"
      validation:
        rules:
          - "Must include overall_score field"
          - "Must include passed boolean"

  steps:
    - id: research
      agent: researcher
      type: sequential
      input: "{{inputs.company_name}}"
      output:
        store_as: research_data
        format: json

    - id: identify_pains
      agent: pain_identifier
      type: sequential
      input: "{{steps.research.output}}"
      output:
        store_as: pain_points
        format: json

    - id: pricing
      agent: pricing_analyst
      type: sequential
      input: "{{steps.identify_pains.output}}"
      output:
        store_as: pricing_tiers
        format: json

    - id: draft
      agent: proposal_writer
      type: sequential
      input: "all prior context"
      output:
        store_as: proposal_draft
        format: markdown

    - id: review
      type: loop
      loop:
        agent: proposal_writer
        validator: reviewer
        max_iterations: 2
        feedback_path: "{{steps.review.output.feedback}}"
      output:
        store_as: final_proposal
        format: markdown
yaml
workflow:
  name: research-to-proposal
  description: 调研目标客户公司并生成个性化咨询提案
  version: "1.0.0"
  timeout: "15m"
  
  inputs:
    - name: company_name
      type: string
      required: true
      description: 目标公司名称
    - name: contact_name
      type: string
      required: true
      description: 公司主要联系人
    - name: our_services
      type: string
      required: true
      description: 我方服务内容描述
    - name: rough_scope
      type: string
      required: false
      default: ""
      description: 已知的项目范围细节

  agents:
    researcher:
      name: 企业研究员
      role: 收集目标公司的情报
      prompt: |
        你是一名商业研究分析师,请调研{{inputs.company_name}}并整理:
        1. 公司概况(规模、行业、近期动态)
        2. 其行业面临的核心挑战
        3. 技术栈(若可识别)
        4. 近期招聘趋势(反映业务增长方向)
        5. 竞争格局
        
        使用网页搜索及可用工具,返回结构化JSON。
      tools: [Read, Bash, WebSearch]
      timeout: "3m"
      retry:
        max_attempts: 2
        on_failure: skip
      validation:
        rules:
          - "输出必须包含company_overview字段"
          - "输出必须包含key_challenges数组"
    
    pain_identifier:
      name: 痛点识别师
      role: 识别我方可解决的具体痛点
      prompt: |
        基于以下关于{{inputs.company_name}}的调研数据:
        {{steps.research.output}}
        
        以及我方服务内容:
        {{inputs.our_services}}
        
        识别出我方服务与客户需求匹配的Top 3痛点。
        针对每个痛点,说明:
        1. 客户视角下的问题
        2. 业务影响(成本、时间、风险)
        3. 我方服务如何解决该问题
        
        返回结构化JSON数组。
      tools: [Read]
      timeout: "2m"
      validation:
        rules:
          - "必须识别出恰好3个痛点"
          - "每个痛点必须包含problem、impact和solution字段"
    
    pricing_analyst:
      name: 定价分析师
      role: 根据项目范围制定定价层级
      prompt: |
        基于{{inputs.company_name}}的以下痛点:
        {{steps.identify_pains.output}}
        
        项目范围细节:{{inputs.rough_scope}}
        
        创建3个定价层级:
        1. 入门版:仅解决核心痛点
        2. 成长版:解决Top 2痛点,提供深度服务
        3. 企业版:全范围服务,解决所有3个痛点,包含持续支持
        
        每个层级需提供:
        - 名称与标语
        - 服务范围描述
        - 交付物清单
        - 时间线
        - 价格区间(提供真实合理的咨询费率)
        - ROI合理性说明
        
        返回结构化JSON。
      tools: [Read]
      timeout: "2m"
    
    proposal_writer:
      name: 提案撰写师
      role: 起草完整的提案文档
      prompt: |
        为{{inputs.company_name}}撰写专业咨询提案,
        联系人:{{inputs.contact_name}}。
        
        调研数据:{{steps.research.output}}
        痛点信息:{{steps.identify_pains.output}}
        定价层级:{{steps.pricing.output}}
        
        文档结构:
        1. 执行摘要(贴合客户情况)
        2. 对客户挑战的理解(呼应客户痛点)
        3. 建议方案(我方方法论)
        4. 服务范围与交付物(3个层级)
        5. 时间线
        6. 投资成本(定价层级)
        7. 选择我方的理由(差异化优势)
        8. 下一步行动
        
        语气:专业、自信、顾问式,避免过度销售。
        篇幅:2000-3000字。
        格式:Markdown。
      tools: [Read, Write]
      timeout: "4m"
    
    reviewer:
      name: 质量审核师  
      role: 审核提案的质量与准确性
      prompt: |
        审核这份针对{{inputs.company_name}}的提案:
        {{steps.draft.output}}
        
        检查要点:
        1. 事实准确性(是否与调研数据一致?)
        2. 个性化程度(是通用模板还是量身定制?)
        3. 定价合理性(层级设置是否真实可行?)
        4. 专业语气(无错别字、无过度销售表述)
        5. 完整性(是否包含全部8个章节?)
        
        返回审核结果,包含:
        - overall_score(1-10分)
        - passed(布尔值,分数>=7时为true)
        - feedback(具体改进建议数组)
        - critical_issues(严重问题数组,无问题则为空)
      tools: [Read]
      timeout: "2m"
      validation:
        rules:
          - "必须包含overall_score字段"
          - "必须包含passed布尔值"

  steps:
    - id: research
      agent: researcher
      type: sequential
      input: "{{inputs.company_name}}"
      output:
        store_as: research_data
        format: json

    - id: identify_pains
      agent: pain_identifier
      type: sequential
      input: "{{steps.research.output}}"
      output:
        store_as: pain_points
        format: json

    - id: pricing
      agent: pricing_analyst
      type: sequential
      input: "{{steps.identify_pains.output}}"
      output:
        store_as: pricing_tiers
        format: json

    - id: draft
      agent: proposal_writer
      type: sequential
      input: "all prior context"
      output:
        store_as: proposal_draft
        format: markdown

    - id: review
      type: loop
      loop:
        agent: proposal_writer
        validator: reviewer
        max_iterations: 2
        feedback_path: "{{steps.review.output.feedback}}"
      output:
        store_as: final_proposal
        format: markdown

Example Workflow: Multi-Criteria Lead Scoring

示例工作流:多维度销售线索打分

yaml
workflow:
  name: multi-criteria-lead-scoring
  description: Score a lead from 3 different angles in parallel, then aggregate
  version: "1.0.0"
  timeout: "5m"
  
  inputs:
    - name: lead_data
      type: json
      required: true
      description: Lead data object with name, company, title, etc.
    - name: icp_criteria
      type: json
      required: true
      description: Ideal Customer Profile criteria

  agents:
    firmographic_scorer:
      name: Firmographic Scorer
      role: Score based on company attributes
      prompt: |
        Score this lead on firmographic fit (0-100):
        Lead: {{inputs.lead_data}}
        ICP: {{inputs.icp_criteria}}
        
        Evaluate: company size, industry, geography, revenue range.
        Return: { score: number, breakdown: object, reasoning: string }
      tools: [Read]
      timeout: "1m"
    
    technographic_scorer:
      name: Technographic Scorer
      role: Score based on tech stack alignment
      prompt: |
        Score this lead on technographic fit (0-100):
        Lead: {{inputs.lead_data}}
        ICP: {{inputs.icp_criteria}}
        
        Evaluate: current tech stack, tools they use, technical maturity.
        Return: { score: number, breakdown: object, reasoning: string }
      tools: [Read]
      timeout: "1m"
    
    intent_scorer:
      name: Intent Scorer
      role: Score based on buying signals
      prompt: |
        Score this lead on buying intent (0-100):
        Lead: {{inputs.lead_data}}
        
        Evaluate: recent job postings, funding rounds, tech changes, content consumption.
        Return: { score: number, signals: string[], reasoning: string }
      tools: [Read, Bash]
      timeout: "1m"
    
    aggregator:
      name: Score Aggregator
      role: Combine scores into final recommendation
      prompt: |
        Aggregate these parallel scoring results for lead {{inputs.lead_data.name}}:
        
        Firmographic: {{steps.parallel_scoring.outputs.firmographic}}
        Technographic: {{steps.parallel_scoring.outputs.technographic}}
        Intent: {{steps.parallel_scoring.outputs.intent}}
        
        Calculate weighted final score:
        - Firmographic: 40% weight
        - Technographic: 30% weight
        - Intent: 30% weight
        
        Return:
        {
          final_score: number,
          category: "hot" | "warm" | "cold" | "disqualify",
          recommended_action: string,
          summary: string,
          breakdown: { firmographic, technographic, intent }
        }
      tools: [Read]
      timeout: "1m"

  steps:
    - id: parallel_scoring
      type: parallel
      parallel:
        - agent: firmographic_scorer
          input: "{{inputs.lead_data}}"
          output_key: firmographic
        - agent: technographic_scorer
          input: "{{inputs.lead_data}}"
          output_key: technographic
        - agent: intent_scorer
          input: "{{inputs.lead_data}}"
          output_key: intent
      wait: all
      output:
        store_as: parallel_scores
        format: json

    - id: aggregate
      agent: aggregator
      type: sequential
      input: "{{steps.parallel_scoring.outputs}}"
      output:
        store_as: final_score
        format: json
yaml
workflow:
  name: multi-criteria-lead-scoring
  description: 从3个不同维度并行打分,然后聚合结果
  version: "1.0.0"
  timeout: "5m"
  
  inputs:
    - name: lead_data
      type: json
      required: true
      description: 包含姓名、公司、职位等信息的线索数据对象
    - name: icp_criteria
      type: json
      required: true
      description: 理想客户画像(ICP)标准

  agents:
    firmographic_scorer:
      name: 企业属性打分师
      role: 根据公司属性打分
      prompt: |
        对该线索的企业属性匹配度打分(0-100):
        线索:{{inputs.lead_data}}
        ICP:{{inputs.icp_criteria}}
        
        评估维度:公司规模、行业、地域、收入范围。
        返回:{ score: number, breakdown: object, reasoning: string }
      tools: [Read]
      timeout: "1m"
    
    technographic_scorer:
      name: 技术属性打分师
      role: 根据技术栈匹配度打分
      prompt: |
        对该线索的技术属性匹配度打分(0-100):
        线索:{{inputs.lead_data}}
        ICP:{{inputs.icp_criteria}}
        
        评估维度:当前技术栈、使用工具、技术成熟度。
        返回:{ score: number, breakdown: object, reasoning: string }
      tools: [Read]
      timeout: "1m"
    
    intent_scorer:
      name: 购买意向打分师
      role: 根据购买信号打分
      prompt: |
        对该线索的购买意向打分(0-100):
        线索:{{inputs.lead_data}}
        
        评估维度:近期招聘信息、融资轮次、技术变更、内容消费情况。
        返回:{ score: number, signals: string[], reasoning: string }
      tools: [Read, Bash]
      timeout: "1m"
    
    aggregator:
      name: 分数聚合师
      role: 合并分数生成最终建议
      prompt: |
        聚合以下针对线索{{inputs.lead_data.name}}的并行打分结果:
        
        企业属性:{{steps.parallel_scoring.outputs.firmographic}}
        技术属性:{{steps.parallel_scoring.outputs.technographic}}
        购买意向:{{steps.parallel_scoring.outputs.intent}}
        
        计算加权最终分数:
        - 企业属性:40%权重
        - 技术属性:30%权重
        - 购买意向:30%权重
        
        返回:
        {
          final_score: number,
          category: "hot" | "warm" | "cold" | "disqualify",
          recommended_action: string,
          summary: string,
          breakdown: { firmographic, technographic, intent }
        }
      tools: [Read]
      timeout: "1m"

  steps:
    - id: parallel_scoring
      type: parallel
      parallel:
        - agent: firmographic_scorer
          input: "{{inputs.lead_data}}"
          output_key: firmographic
        - agent: technographic_scorer
          input: "{{inputs.lead_data}}"
          output_key: technographic
        - agent: intent_scorer
          input: "{{inputs.lead_data}}"
          output_key: intent
      wait: all
      output:
        store_as: parallel_scores
        format: json

    - id: aggregate
      agent: aggregator
      type: sequential
      input: "{{steps.parallel_scoring.outputs}}"
      output:
        store_as: final_score
        format: json

Execution Engine

执行引擎

The orchestrator follows this execution model when processing a workflow.
编排器处理工作流时遵循以下执行模型。

Parse Phase

解析阶段

  1. Load workflow YAML -- Read and parse the workflow definition
  2. Validate schema -- Ensure all required fields are present, agent IDs are referenced correctly, no circular dependencies in sequential steps
  3. Resolve inputs -- Collect all required inputs from the user, apply defaults for optional inputs
  4. Build execution graph -- Convert steps into a directed acyclic graph (DAG) for execution ordering
  1. 加载工作流YAML -- 读取并解析工作流定义
  2. 验证Schema -- 确保所有必填字段存在,Agent ID引用正确,链式步骤无循环依赖
  3. 解析输入 -- 收集用户提供的所有必填输入,为可选输入应用默认值
  4. 构建执行图谱 -- 将步骤转换为有向无环图(DAG)以确定执行顺序

Execute Phase

执行阶段

For each step in topological order:
按拓扑顺序执行每个步骤:

Sequential Step

链式步骤

1. Resolve input template (replace {{variables}} with actual values)
2. Build agent prompt from agent definition + resolved input
3. Deploy agent using Agent tool
4. Wait for completion
5. Validate output against agent's validation rules
6. Store output in step context
7. Proceed to next step
1. 解析输入模板(将{{variables}}替换为实际值)
2. 结合Agent定义与解析后的输入构建Agent提示词
3. 使用Agent工具部署Agent
4. 等待执行完成
5. 根据Agent的验证规则验证输出
6. 将输出存储到步骤上下文
7. 执行下一步骤

Parallel Step

并行步骤

1. Resolve all input templates
2. Build all agent prompts
3. Deploy ALL agents simultaneously (single message, multiple Agent tool calls)
4. Wait based on wait policy:
   - all: Wait for every agent to complete
   - any: Proceed when first agent completes
   - N: Proceed when N agents complete
5. Collect and validate all outputs
6. Store outputs (keyed by output_key) in step context
7. Proceed to next step
1. 解析所有输入模板
2. 构建所有Agent的提示词
3. 同时部署所有Agent(单条消息,多次Agent工具调用)
4. 根据等待策略等待:
   - all: 等待所有Agent完成
   - any: 第一个Agent完成后继续
   - N: N个Agent完成后继续
5. 收集并验证所有输出
6. 将输出(按output_key分组)存储到步骤上下文
7. 执行下一步骤

Conditional Step

条件路由步骤

1. Resolve the condition expression
2. Evaluate: parse the expression and determine true/false
3. Route to the appropriate agent/step based on the result
4. Execute that branch
5. Store output in step context
6. Proceed to next step
1. 解析条件表达式
2. 评估:解析表达式并判断真假
3. 根据结果路由到对应的Agent/步骤
4. 执行该分支
5. 将输出存储到步骤上下文
6. 执行下一步骤

Loop Step

循环步骤

1. Execute the primary agent
2. Pass output to the validator agent
3. If validator returns passed=true: exit loop, store output
4. If validator returns passed=false:
   a. Extract feedback from validator
   b. Re-execute primary agent with original input + feedback
   c. Repeat from step 2
5. If max_iterations reached: store last output, flag as "max iterations reached"
1. 执行主Agent
2. 将输出传递给验证器Agent
3. 若验证器返回passed=true:退出循环,存储输出
4. 若验证器返回passed=false:
   a. 从验证器提取反馈
   b. 使用原始输入+反馈重新执行主Agent
   c. 回到步骤2重复执行
5. 若达到max_iterations:存储最后一次输出,标记为"已达最大迭代次数"

Map Step

映射步骤

1. Resolve the array to iterate over
2. For each element, deploy an agent instance (parallel, batched if > 20)
3. Collect all results
4. Pass collected results to the reducer agent
5. Store reducer's output in step context
1. 解析要迭代的数组
2. 为每个元素部署一个Agent实例(并行,若超过20个则分批执行)
3. 收集所有结果
4. 将收集到的结果传递给归约器Agent
5. 将归约器的输出存储到步骤上下文

Retry Logic

重试逻辑

When an agent fails:
attempt = 1
while attempt <= max_attempts:
    result = deploy_agent(brief)
    if result.success:
        return result
    attempt += 1
    if backoff == "linear":
        wait(attempt * 5 seconds)  // conceptual, actual implementation uses re-deploy timing
    elif backoff == "exponential":
        wait(2^attempt seconds)

// All retries exhausted
if on_failure == "skip":
    store null output, continue workflow
elif on_failure == "abort":
    stop entire workflow, report failure
elif on_failure starts with "fallback:":
    deploy fallback agent with same input
当Agent执行失败时:
attempt = 1
while attempt <= max_attempts:
    result = deploy_agent(brief)
    if result.success:
        return result
    attempt += 1
    if backoff == "linear":
        wait(attempt * 5 seconds)  // 概念性代码,实际实现使用重新部署计时
    elif backoff == "exponential":
        wait(2^attempt seconds)

// 所有重试次数耗尽
if on_failure == "skip":
    存储空输出,继续工作流
elif on_failure == "abort":
    终止整个工作流,报告失败
elif on_failure以"fallback:"开头:
    使用相同输入部署降级Agent

Timeout Handling

超时处理

Three levels of timeout:
  1. Global timeout -- If the entire workflow exceeds this, abort all running agents and report partial results
  2. Per-agent timeout -- If a single agent exceeds this, trigger its retry/failure policy
  3. Step timeout -- For parallel steps, the timeout applies to the wait policy (e.g., if wait=all and one agent times out, apply that agent's failure policy)
三级超时机制:
  1. 全局超时 -- 若整个工作流超时,终止所有运行中的Agent并返回部分结果
  2. 单Agent超时 -- 若单个Agent超时,触发其重试/失败策略
  3. 步骤超时 -- 针对并行步骤,超时规则适用于等待策略(例如:wait=all时若某个Agent超时,执行该Agent的失败策略)

Result Validation

结果验证

After each agent completes, validate its output:
  1. Schema check -- If a JSON schema is provided, validate the output structure
  2. Rule check -- Evaluate each plain-English rule against the output:
    • "Must include X field" -- Check field presence
    • "Must identify exactly N items" -- Check array length
    • "Score must be between 0 and 100" -- Check value range
  3. Type check -- Verify output format matches the expected format (json, text, markdown)
If validation fails, the output is treated as a failure and triggers the retry/failure policy.
每个Agent执行完成后,验证其输出:
  1. Schema检查 -- 若提供了JSON Schema,验证输出结构
  2. 规则检查 -- 针对输出评估每条自然语言规则:
    • "必须包含X字段" -- 检查字段是否存在
    • "必须识别出恰好N个条目" -- 检查数组长度
    • "分数必须在0-100之间" -- 检查值范围
  3. 类型检查 -- 验证输出格式与预期格式匹配(json、text、markdown)
若验证失败,输出将被视为执行失败并触发重试/失败策略。

Orchestrator Commands

编排器命令

The user can invoke the orchestrator in several ways:
用户可通过多种方式调用编排器:

Run a Workflow File

运行工作流文件

"Run the workflow at ./workflows/lead-scoring.yaml"
  1. Read the YAML file
  2. Prompt the user for any required inputs
  3. Execute the workflow
  4. Return results
"运行./workflows/lead-scoring.yaml路径下的工作流"
  1. 读取YAML文件
  2. 提示用户提供所有必填输入
  3. 执行工作流
  4. 返回结果

Define and Run Inline

内联定义并运行

"Set up a 3-step workflow: first research the company, then identify pain points, then draft an email"
  1. Convert the natural language description into a workflow YAML
  2. Show the YAML to the user for approval
  3. Execute upon confirmation
"创建一个3步工作流:先调研公司,然后识别痛点,最后起草邮件"
  1. 将自然语言描述转换为工作流YAML
  2. 向用户展示YAML并确认
  3. 确认后执行

Dry Run

试运行

"Dry run the proposal workflow for Acme Corp"
  1. Parse and validate the workflow
  2. Resolve all inputs
  3. Show the execution plan (which agents run in what order, expected inputs/outputs)
  4. Do NOT actually deploy any agents
"为Acme Corp试运行提案工作流"
  1. 解析并验证工作流
  2. 解析所有输入
  3. 展示执行计划(哪些Agent按什么顺序运行,预期输入/输出)
  4. 不实际部署任何Agent

Inspect Workflow

检查工作流

"Explain the lead-scoring workflow"
  1. Parse the YAML
  2. Generate a human-readable description of each step
  3. Draw the execution graph (text-based)
  4. List all agents and their roles
"解释线索打分工作流"
  1. 解析YAML
  2. 生成每个步骤的易读描述
  3. 绘制文本格式的执行图谱
  4. 列出所有Agent及其角色

Visual Workflow Representation

可视化工作流展示

When presenting workflows to users, render them as text-based diagrams:
向用户展示工作流时,渲染为文本格式的流程图:

Sequential

链式执行

[1. Researcher] --> [2. Pain Identifier] --> [3. Pricing] --> [4. Writer] --> [5. Reviewer]
[1. 研究员] --> [2. 痛点识别师] --> [3. 定价师] --> [4. 撰写师] --> [5. 审核师]

Parallel Fan-Out/Fan-In

并行分发/聚合

                    /--> [2a. Firmographic Scorer] --\
[1. Prepare Data] -+--> [2b. Technographic Scorer] --+--> [3. Aggregator]
                    \--> [2c. Intent Scorer] --------/
                    /--> [2a. 企业属性打分师] --\
[1. 准备数据] -+--> [2b. 技术属性打分师] --+--> [3. 分数聚合师]
                    \--> [2c. 购买意向打分师] --------/

Conditional

条件路由

                        /--> [3a. Hot Lead Handler] (score >= 80)
[1. Research] --> [2. Score] --+--> [3b. Warm Lead Handler] (40 <= score < 80)
                        \--> [3c. Cold Lead Handler] (score < 40)
                        /--> [3a. 高价值线索处理师](分数>=80)
[1. 调研] --> [2. 打分] --+--> [3b. 中价值线索处理师](40<=分数<80)
                        \--> [3c. 低价值线索处理师](分数<40)

Loop

循环

[1. Writer] --> [2. Reviewer] --PASS--> [3. Deliver]
     ^                |
     |              FAIL
     \--- feedback ---/
     (max 3 iterations)
[1. 撰写师] --> [2. 审核师] --通过--> [3. 交付]
     ^                |
     |              失败
     \--- 反馈 ---/
     (最多3次迭代)

Built-In Workflow Templates

内置工作流模板

Template: Content Pipeline

模板:内容流水线

yaml
undefined
yaml
undefined

Research -> Draft -> Review -> Publish

调研 -> 起草 -> 审核 -> 发布

agents: [researcher, writer, editor, publisher] pattern: sequential with review loop use_when: "Creating any content that needs research and quality review"
undefined
agents: [研究员, 撰写师, 编辑, 发布师] pattern: 带审核循环的链式执行 use_when: "创建需要调研和质量审核的内容"
undefined

Template: Multi-Angle Analysis

模板:多维度分析

yaml
undefined
yaml
undefined

Fan-out to N analysts -> Aggregate

分发至N个分析师 -> 聚合

agents: [N domain analysts, aggregator] pattern: parallel fan-out/fan-in use_when: "Need multiple perspectives on a single topic"
undefined
agents: [N个领域分析师, 聚合师] pattern: 并行分发/聚合 use_when: "需要针对单一主题获取多个视角"
undefined

Template: Qualify-and-Route

模板:分类与路由

yaml
undefined
yaml
undefined

Classify -> Route to specialist

分类 -> 路由至专家

agents: [classifier, specialist_a, specialist_b, specialist_c] pattern: conditional routing use_when: "Different inputs need different handling"
undefined
agents: [分类师, 专家A, 专家B, 专家C] pattern: 条件路由 use_when: "不同输入需要不同处理方式"
undefined

Template: Iterative Refinement

模板:迭代优化

yaml
undefined
yaml
undefined

Generate -> Validate -> Refine (loop) -> Deliver

生成 -> 验证 -> 优化(循环)-> 交付

agents: [generator, validator] pattern: loop with validation use_when: "Output must meet a quality bar and may need multiple passes"
undefined
agents: [生成师, 验证师] pattern: 带验证的循环 use_when: "输出必须达到质量标准,可能需要多次迭代"
undefined

Template: Map-Reduce

模板:映射-归约

yaml
undefined
yaml
undefined

Split -> Process each -> Combine

拆分 -> 处理每个块 -> 合并

agents: [splitter, processor, reducer] pattern: map-reduce use_when: "Large input needs to be chunked, processed, and recombined"
undefined
agents: [拆分器, 处理器, 归约器] pattern: 映射-归约 use_when: "大输入需要拆分、处理后重新合并"
undefined

Error Handling and Edge Cases

错误处理与边缘场景

ScenarioHandling
Required input missingPrompt user before starting
Agent returns empty outputTreat as failure, trigger retry/fallback
Circular dependency in stepsReject workflow at parse time with clear error
Parallel step with one failureApply that agent's failure policy; others continue
Loop exceeds max_iterationsExit loop, store last output, flag in report
Global timeout reachedAbort all running agents, collect partial results, report
Workflow YAML syntax errorReport the error line and suggest correction
Agent references undefined variableReport at parse time, not runtime
Conditional eval is ambiguousDefault to false branch, log warning
Fallback agent also failsTreat as abort for that step
场景处理方式
必填输入缺失启动前提示用户补充
Agent返回空输出视为执行失败,触发重试/降级策略
步骤存在循环依赖解析阶段拒绝工作流并给出明确错误
并行步骤中单个Agent失败执行该Agent的失败策略;其他Agent继续运行
循环超过最大迭代次数退出循环,存储最后一次输出,在报告中标记
全局超时终止所有运行中的Agent,收集部分结果并报告
工作流YAML语法错误报告错误行并给出修正建议
Agent引用未定义变量解析阶段报告错误,而非运行时
条件表达式模糊默认执行假分支,记录警告
降级Agent也执行失败视为该步骤执行失败并终止

Reporting

报告

After workflow completion, present:
undefined
工作流完成后,生成以下报告:
undefined

Workflow Execution Report: [workflow name]

工作流执行报告:[工作流名称]

Execution Summary

执行摘要

  • Status: COMPLETE / PARTIAL / FAILED
  • Total steps: N
  • Steps completed: N
  • Steps failed: N
  • Steps skipped: N
  • Total agents deployed: N
  • Total time: Xm Ys
  • Retries used: N
  • 状态:完成 / 部分完成 / 失败
  • 总步骤数:N
  • 已完成步骤数:N
  • 失败步骤数:N
  • 跳过步骤数:N
  • 已部署Agent总数:N
  • 总耗时:X分Y秒
  • 重试次数:N

Step-by-Step Results

分步结果

StepAgentStatusDurationRetriesOutput Size
1researcherSUCCESS45s02.1KB
2pain_identifierSUCCESS30s01.4KB
3pricingSUCCESS35s11.8KB
4writerSUCCESS90s08.2KB
5reviewerSUCCESS40s00.9KB
步骤Agent状态耗时重试次数输出大小
1researcher成功45秒02.1KB
2pain_identifier成功30秒01.4KB
3pricing成功35秒11.8KB
4writer成功90秒08.2KB
5reviewer成功40秒00.9KB

Final Output

最终输出

[The workflow's final deliverable]
[工作流的最终交付物]

Issues and Warnings

问题与警告

  • [Any retries, fallbacks, or validation warnings]
  • [Agent notes or flags]
undefined
  • [所有重试、降级或验证警告]
  • [Agent备注或标记]
undefined