ai-building-pipelines

Build a Multi-Step AI Pipeline

Guide the user through breaking a complex AI task into multiple steps that feed into each other. One prompt can't do everything — compound AI systems dramatically outperform single calls by decomposing problems.

Step 1: Understand the pipeline

Ask the user:
  1. What's the end-to-end task? (e.g., "read a support ticket, classify it, draft a response")
  2. What are the natural stages? (classification, retrieval, generation, verification?)
  3. Does any step need special tools? (search, database, calculator?)
  4. Does data flow linearly, or do steps branch/loop?
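Before reaching for a framework, it can help to sanity-check the answers as plain functions. A minimal sketch of the support-ticket example, with stubs standing in for the model calls (the stage logic here is illustrative, not DSPy):

```python
# Illustrative sketch: the three stages of the support-ticket task as plain
# functions. The stub bodies stand in for real LLM / retrieval calls.

def classify(ticket: str) -> str:
    # Stub: a real pipeline would call a model here
    return "billing" if "invoice" in ticket.lower() else "general"

def retrieve(query: str) -> list[str]:
    # Stub: a real pipeline would search a knowledge base
    return [f"Help article about {query}"]

def draft(ticket: str, category: str, context: list[str]) -> str:
    # Stub: a real pipeline would generate a response from the context
    return f"[{category}] Thanks for reaching out. ({len(context)} article(s) consulted.)"

def run_pipeline(ticket: str) -> str:
    category = classify(ticket)               # stage 1: classify
    docs = retrieve(category + " " + ticket)  # stage 2: retrieve
    return draft(ticket, category, docs)      # stage 3: draft

```

If the data flow is awkward to express this way (branches, loops, shared state), that is a signal the pipeline needs one of the patterns in Step 3.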

Step 2: Design the stages

The core pattern — compose DSPy modules

Every stage is a DSPy module. Wire them together in `forward()`:

```python
import dspy

class SupportPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.classify = dspy.ChainOfThought(ClassifyTicket)
        self.retrieve = dspy.Retrieve(k=3)
        self.draft = dspy.ChainOfThought(DraftResponse)

    def forward(self, ticket):
        # Stage 1: Classify
        classification = self.classify(ticket=ticket)

        # Stage 2: Retrieve relevant docs
        docs = self.retrieve(classification.category + " " + ticket).passages

        # Stage 3: Draft response using classification + docs
        return self.draft(
            ticket=ticket,
            category=classification.category,
            context=docs,
        )
```

Each stage has its own signature:

```python
from typing import Literal

CATEGORIES = ["billing", "technical", "account", "general"]

class ClassifyTicket(dspy.Signature):
    """Classify the support ticket."""
    ticket: str = dspy.InputField()
    category: Literal[tuple(CATEGORIES)] = dspy.OutputField()

class DraftResponse(dspy.Signature):
    """Draft a helpful response to the support ticket."""
    ticket: str = dspy.InputField()
    category: str = dspy.InputField()
    context: list[str] = dspy.InputField(desc="Relevant help articles")
    response: str = dspy.OutputField(desc="Professional support response")
```

Step 3: Common pipeline patterns

Classify → Route → Specialize

Different categories get different handling:

```python
class RoutedPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.classify = dspy.ChainOfThought(ClassifyInput)
        self.handlers = {
            "simple": dspy.Predict(SimpleAnswer),
            "complex": dspy.ChainOfThought(DetailedAnswer),
            "research": dspy.ChainOfThought(ResearchAnswer),
        }

    def forward(self, question):
        category = self.classify(question=question).category
        handler = self.handlers.get(category, self.handlers["simple"])
        return handler(question=question)
```

Generate → Verify → Refine

Generate a first draft, check it, then improve:

```python
class GenerateAndRefine(dspy.Module):
    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought(GenerateDraft)
        self.verify = dspy.ChainOfThought(CheckQuality)
        self.refine = dspy.ChainOfThought(ImproveDraft)

    def forward(self, task):
        # Stage 1: Generate
        draft = self.generate(task=task)

        # Stage 2: Verify
        check = self.verify(task=task, draft=draft.output)

        # Stage 3: Refine if needed
        if not check.is_good:
            refined = self.refine(
                task=task,
                draft=draft.output,
                feedback=check.feedback,
            )
            return refined

        return draft
```

Ensemble — ask multiple times, pick the best

Generate several candidates and select the best one (the pattern behind AlphaCode and Medprompt):

```python
class EnsemblePipeline(dspy.Module):
    def __init__(self, num_candidates=5):
        super().__init__()
        self.generators = [dspy.ChainOfThought(GenerateAnswer) for _ in range(num_candidates)]
        self.judge = dspy.ChainOfThought(PickBestAnswer)

    def forward(self, question):
        # Stage 1: Generate multiple candidates
        candidates = []
        for gen in self.generators:
            result = gen(question=question)
            candidates.append(result.answer)

        # Stage 2: Pick the best
        return self.judge(
            question=question,
            candidates=candidates,
        )

class PickBestAnswer(dspy.Signature):
    """Pick the best answer from the candidates."""
    question: str = dspy.InputField()
    candidates: list[str] = dspy.InputField(desc="Multiple answer candidates")
    best_answer: str = dspy.OutputField(desc="The most accurate and complete answer")
    reasoning: str = dspy.OutputField(desc="Why this answer was chosen")
```
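When the answers are short and discrete (a label, a number), a judge model is optional: a plain majority vote over the candidates (the idea behind self-consistency) often suffices. A framework-free sketch, not DSPy API:

```python
from collections import Counter

def majority_vote(candidates: list[str]) -> str:
    """Pick the most common answer; ties resolve to the earliest candidate."""
    normalized = [c.strip().lower() for c in candidates]
    counts = Counter(normalized)
    winner, _ = counts.most_common(1)[0]
    # Return the original-cased form of the winning answer
    return next(c for c in candidates if c.strip().lower() == winner)
```

This only works when candidates are comparable after normalization; for free-form prose, keep the judge stage.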

Parallel fan-out → merge

Process different aspects independently, then combine:

```python
class ParallelAnalysis(dspy.Module):
    def __init__(self):
        super().__init__()
        self.sentiment = dspy.ChainOfThought(AnalyzeSentiment)
        self.topics = dspy.ChainOfThought(ExtractTopics)
        self.entities = dspy.ChainOfThought(ExtractEntities)
        self.summarize = dspy.ChainOfThought(CombineAnalysis)

    def forward(self, text):
        # Fan out — run in parallel (DSPy can parallelize these)
        sent = self.sentiment(text=text)
        topics = self.topics(text=text)
        entities = self.entities(text=text)

        # Merge results
        return self.summarize(
            text=text,
            sentiment=sent.sentiment,
            topics=topics.topics,
            entities=entities.entities,
        )
```
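Since each fan-out stage is typically an I/O-bound LLM call, a thread pool is one straightforward way to actually run them concurrently. The sketch below uses stub functions (hypothetical, not DSPy modules) to show the shape:

```python
from concurrent.futures import ThreadPoolExecutor

# Stubs standing in for the three analysis stages (real ones would call an LLM)
def analyze_sentiment(text: str) -> str:
    return "positive" if "great" in text.lower() else "neutral"

def extract_topics(text: str) -> list[str]:
    return [w for w in ("price", "support") if w in text.lower()]

def extract_entities(text: str) -> list[str]:
    return [w for w in text.split() if w.istitle()]

def parallel_analysis(text: str) -> dict:
    # Fan out: each analyzer runs in its own thread (useful when each one
    # is an I/O-bound LLM call), then the results are merged into one dict.
    with ThreadPoolExecutor(max_workers=3) as pool:
        sent = pool.submit(analyze_sentiment, text)
        topics = pool.submit(extract_topics, text)
        entities = pool.submit(extract_entities, text)
        return {
            "sentiment": sent.result(),
            "topics": topics.result(),
            "entities": entities.result(),
        }
```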

Loop — iterative refinement

Keep improving until a condition is met:

```python
class IterativeRefiner(dspy.Module):
    def __init__(self, max_iterations=3):
        super().__init__()
        self.generate = dspy.ChainOfThought(GenerateDraft)
        self.evaluate = dspy.ChainOfThought(EvaluateDraft)
        self.improve = dspy.ChainOfThought(ImproveDraft)
        self.max_iterations = max_iterations

    def forward(self, task):
        draft = self.generate(task=task)

        for i in range(self.max_iterations):
            evaluation = self.evaluate(task=task, draft=draft.output)
            if evaluation.score >= 0.9:
                break
            draft = self.improve(
                task=task,
                draft=draft.output,
                feedback=evaluation.feedback,
            )

        return draft
```

Step 4: Use different models per stage

Not every stage needs the same model. Use cheap models for simple steps:

```python
expensive_lm = dspy.LM("openai/gpt-4o")
cheap_lm = dspy.LM("openai/gpt-4o-mini")

pipeline = SupportPipeline()

# Cheap model for classification (simple task)
pipeline.classify.lm = cheap_lm

# Expensive model for drafting (needs quality)
pipeline.draft.lm = expensive_lm
```

See `/ai-cutting-costs` for more cost optimization strategies.
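A quick back-of-the-envelope calculation shows why this matters. The prices and token counts below are hypothetical, purely for illustration:

```python
# Back-of-the-envelope cost comparison. Prices are HYPOTHETICAL
# ($ per 1M input tokens), not real provider pricing.
EXPENSIVE_PRICE = 2.50
CHEAP_PRICE = 0.15

def monthly_cost(tickets: int, classify_tokens: int, draft_tokens: int,
                 classify_price: float, draft_price: float) -> float:
    """Input-token cost of running classify + draft over a month of tickets."""
    per_ticket = (classify_tokens * classify_price +
                  draft_tokens * draft_price) / 1_000_000
    return tickets * per_ticket

# All-expensive vs. cheap classifier + expensive drafter,
# assuming 100k tickets/month, ~500 classify tokens, ~2k draft tokens
all_big = monthly_cost(100_000, 500, 2_000, EXPENSIVE_PRICE, EXPENSIVE_PRICE)
mixed = monthly_cost(100_000, 500, 2_000, CHEAP_PRICE, EXPENSIVE_PRICE)
```

The savings scale with how much of the pipeline's token volume flows through the simple stages.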

Step 5: Test and optimize the full pipeline

The beauty of DSPy pipelines: you optimize the whole thing end-to-end, not each step separately.

```python
def pipeline_metric(example, prediction, trace=None):
    # Score the final output quality
    return prediction.response.lower().strip() == example.response.lower().strip()

# Optimizes prompts for ALL stages together
optimizer = dspy.MIPROv2(metric=pipeline_metric, auto="medium")
optimized = optimizer.compile(pipeline, trainset=trainset)
```
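Exact string match is a strict bar for free-text responses. A softer option is token-level F1 (a common QA evaluation metric), which could be wrapped into a pipeline metric the same way; this is a framework-free sketch, not part of DSPy:

```python
def token_f1(prediction: str, gold: str) -> float:
    """Token-level F1 between a predicted and a reference response."""
    pred_tokens = prediction.lower().split()
    gold_tokens = gold.lower().split()
    if not pred_tokens or not gold_tokens:
        return float(pred_tokens == gold_tokens)
    # Count overlapping tokens, respecting multiplicity
    common = 0
    gold_remaining = gold_tokens.copy()
    for tok in pred_tokens:
        if tok in gold_remaining:
            gold_remaining.remove(tok)
            common += 1
    if common == 0:
        return 0.0
    precision = common / len(pred_tokens)
    recall = common / len(gold_tokens)
    return 2 * precision * recall / (precision + recall)
```

A thresholded version (e.g. `token_f1(...) >= 0.7`) gives the optimizer a boolean signal while tolerating paraphrase.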

Key patterns

  • Decompose the problem — if a task has distinct phases (understand, retrieve, generate, verify), make each one a module
  • Each stage gets its own signature — clear inputs and outputs make the pipeline debuggable
  • Wire in `forward()` — the `forward` method is your orchestration logic
  • Optimize end-to-end — DSPy optimizers tune all stages together to maximize the final metric
  • Debug stage by stage — use `dspy.inspect_history()` to see what each step did
  • Assign models per stage — cheap models for simple tasks, expensive for complex ones

When to use LangGraph instead

DSPy pipelines are great for stateless, linear-ish flows. But some problems need more:

| If your pipeline... | Use |
| --- | --- |
| Steps run in a fixed order | DSPy pipeline (this skill) |
| Steps branch based on results | DSPy pipeline with `if/else` in `forward()` |
| Needs cycles (retry loops, agent loops) | LangGraph `StateGraph` with DSPy modules as nodes |
| Needs persistent state across calls | LangGraph with checkpointing |
| Needs human approval mid-pipeline | LangGraph `interrupt_before` |
| Coordinates multiple independent agents | LangGraph supervisor pattern |

Quick example: DSPy module as a LangGraph node

```python
import dspy
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class PipelineState(TypedDict):
    input_text: str
    category: str
    output: str

# DSPy modules
classifier = dspy.ChainOfThought("text -> category")
generator = dspy.ChainOfThought("text, category -> output")

# Wrap as LangGraph nodes
def classify_node(state: PipelineState) -> dict:
    result = classifier(text=state["input_text"])
    return {"category": result.category}

def generate_node(state: PipelineState) -> dict:
    result = generator(text=state["input_text"], category=state["category"])
    return {"output": result.output}

# Build graph
graph = StateGraph(PipelineState)
graph.add_node("classify", classify_node)
graph.add_node("generate", generate_node)
graph.add_edge(START, "classify")
graph.add_edge("classify", "generate")
graph.add_edge("generate", END)
app = graph.compile()
```

This gives you LangGraph's state management and routing with DSPy's optimizable prompts. For more, see `/ai-building-chatbots` (stateful conversations) and `/ai-coordinating-agents` (multi-agent systems). For the full LangGraph API reference, see [`docs/langchain-langgraph-reference.md`](../../docs/langchain-langgraph-reference.md).

Additional resources

  • Use `/ai-checking-outputs` to add verification and guardrails between stages
  • Use `/ai-cutting-costs` to assign different models per stage
  • Not sure what stages your pipeline needs? Use `/ai-decomposing-tasks` to identify where to split
  • For content generation pipelines, see `/ai-writing-content`. For complex reasoning, see `/ai-reasoning`
  • Next: `/ai-improving-accuracy` to measure and improve your pipeline