ai-building-pipelines
Build a Multi-Step AI Pipeline
Guide the user through breaking a complex AI task into multiple steps that feed into each other. One prompt can't do everything — compound AI systems dramatically outperform single calls by decomposing problems.
Step 1: Understand the pipeline
步骤1:理解流水线
Ask the user:
- What's the end-to-end task? (e.g., "read a support ticket, classify it, draft a response")
- What are the natural stages? (classification, retrieval, generation, verification?)
- Does any step need special tools? (search, database, calculator?)
- Does data flow linearly, or do steps branch/loop?
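Before reaching for any framework, it can help to sketch the answers to these questions as plain functions: each stage is a function, and the pipeline is their composition. A toy sketch with stub functions standing in for LLM and retrieval calls (all names here are illustrative, not part of any API):

```python
def classify(ticket: str) -> str:
    # Stub: a real pipeline would call an LLM here
    return "billing" if "invoice" in ticket.lower() else "general"

def retrieve(query: str) -> list[str]:
    # Stub: a real pipeline would hit a search index here
    return [f"doc about {query}"]

def draft(ticket: str, category: str, context: list[str]) -> str:
    # Stub: a real pipeline would call an LLM here
    return f"[{category}] reply to: {ticket} (using {len(context)} docs)"

def pipeline(ticket: str) -> str:
    category = classify(ticket)               # stage 1: classify
    docs = retrieve(category + " " + ticket)  # stage 2: retrieve
    return draft(ticket, category, docs)      # stage 3: draft

print(pipeline("My invoice is wrong"))
```

If you can draw this picture, the DSPy version below is the same shape with modules in place of the stubs.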
Step 2: Design the stages
The core pattern — compose DSPy modules
Every stage is a DSPy module. Wire them together in `forward()`:

```python
import dspy

class SupportPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.classify = dspy.ChainOfThought(ClassifyTicket)
        self.retrieve = dspy.Retrieve(k=3)
        self.draft = dspy.ChainOfThought(DraftResponse)

    def forward(self, ticket):
        # Stage 1: Classify
        classification = self.classify(ticket=ticket)
        # Stage 2: Retrieve relevant docs
        docs = self.retrieve(classification.category + " " + ticket).passages
        # Stage 3: Draft response using classification + docs
        return self.draft(
            ticket=ticket,
            category=classification.category,
            context=docs,
        )
```
Each stage has its own signature:
```python
from typing import Literal

CATEGORIES = ["billing", "technical", "account", "general"]

class ClassifyTicket(dspy.Signature):
    """Classify the support ticket."""

    ticket: str = dspy.InputField()
    category: Literal[tuple(CATEGORIES)] = dspy.OutputField()

class DraftResponse(dspy.Signature):
    """Draft a helpful response to the support ticket."""

    ticket: str = dspy.InputField()
    category: str = dspy.InputField()
    context: list[str] = dspy.InputField(desc="Relevant help articles")
    response: str = dspy.OutputField(desc="Professional support response")
```
Step 3: Common pipeline patterns
Classify → Route → Specialize
Different categories get different handling:
```python
class RoutedPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.classify = dspy.ChainOfThought(ClassifyInput)
        self.handlers = {
            "simple": dspy.Predict(SimpleAnswer),
            "complex": dspy.ChainOfThought(DetailedAnswer),
            "research": dspy.ChainOfThought(ResearchAnswer),
        }

    def forward(self, question):
        category = self.classify(question=question).category
        handler = self.handlers.get(category, self.handlers["simple"])
        return handler(question=question)
```
Generate → Verify → Refine
Generate a first draft, check it, then improve:
```python
class GenerateAndRefine(dspy.Module):
    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought(GenerateDraft)
        self.verify = dspy.ChainOfThought(CheckQuality)
        self.refine = dspy.ChainOfThought(ImproveDraft)

    def forward(self, task):
        # Stage 1: Generate
        draft = self.generate(task=task)
        # Stage 2: Verify
        check = self.verify(task=task, draft=draft.output)
        # Stage 3: Refine if needed
        if not check.is_good:
            refined = self.refine(
                task=task,
                draft=draft.output,
                feedback=check.feedback,
            )
            return refined
        return draft
```
Ensemble — ask multiple times, pick the best
Generate several candidates and select the best one (the pattern behind AlphaCode and Medprompt):
```python
class EnsemblePipeline(dspy.Module):
    def __init__(self, num_candidates=5):
        super().__init__()
        self.generators = [dspy.ChainOfThought(GenerateAnswer) for _ in range(num_candidates)]
        self.judge = dspy.ChainOfThought(PickBestAnswer)

    def forward(self, question):
        # Stage 1: Generate multiple candidates
        candidates = []
        for gen in self.generators:
            result = gen(question=question)
            candidates.append(result.answer)
        # Stage 2: Pick the best
        return self.judge(
            question=question,
            candidates=candidates,
        )

class PickBestAnswer(dspy.Signature):
    """Pick the best answer from the candidates."""

    question: str = dspy.InputField()
    candidates: list[str] = dspy.InputField(desc="Multiple answer candidates")
    best_answer: str = dspy.OutputField(desc="The most accurate and complete answer")
    reasoning: str = dspy.OutputField(desc="Why this answer was chosen")
```
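When answers are short and directly comparable, an LLM judge isn't the only option for stage 2: plain majority voting over the candidates (the self-consistency trick) is cheaper and deterministic. A framework-free sketch:

```python
from collections import Counter

def majority_vote(candidates: list[str]) -> str:
    # Normalize so trivial formatting differences don't split the vote
    normalized = [c.strip().lower() for c in candidates]
    winner, _count = Counter(normalized).most_common(1)[0]
    return winner

print(majority_vote(["42", " 42", "41", "42 ", "40"]))  # → 42
```

Voting only works when candidates can be normalized into comparable form; for free-form drafts, keep the judge.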
Parallel fan-out → merge
Process different aspects independently, then combine:
```python
class ParallelAnalysis(dspy.Module):
    def __init__(self):
        super().__init__()
        self.sentiment = dspy.ChainOfThought(AnalyzeSentiment)
        self.topics = dspy.ChainOfThought(ExtractTopics)
        self.entities = dspy.ChainOfThought(ExtractEntities)
        self.summarize = dspy.ChainOfThought(CombineAnalysis)

    def forward(self, text):
        # Fan out — run in parallel (DSPy can parallelize these)
        sent = self.sentiment(text=text)
        topics = self.topics(text=text)
        entities = self.entities(text=text)
        # Merge results
        return self.summarize(
            text=text,
            sentiment=sent.sentiment,
            topics=topics.topics,
            entities=entities.entities,
        )
```
Loop — iterative refinement
Keep improving until a condition is met:
```python
class IterativeRefiner(dspy.Module):
    def __init__(self, max_iterations=3):
        super().__init__()
        self.generate = dspy.ChainOfThought(GenerateDraft)
        self.evaluate = dspy.ChainOfThought(EvaluateDraft)
        self.improve = dspy.ChainOfThought(ImproveDraft)
        self.max_iterations = max_iterations

    def forward(self, task):
        draft = self.generate(task=task)
        for _ in range(self.max_iterations):
            evaluation = self.evaluate(task=task, draft=draft.output)
            if evaluation.score >= 0.9:
                break
            draft = self.improve(
                task=task,
                draft=draft.output,
                feedback=evaluation.feedback,
            )
        return draft
```
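The control flow here is ordinary Python, so you can verify the loop logic without a model in it. A stub version where "improving" just appends detail and the evaluator scores by length (purely illustrative; a real scorer and improver would be LLM calls):

```python
def evaluate(draft: str) -> float:
    # Stub scorer: longer drafts score higher, capped at 1.0
    return min(len(draft) / 40, 1.0)

def improve(draft: str) -> str:
    # Stub improver: a real pipeline would call an LLM with feedback
    return draft + " (expanded with more detail)"

def refine(draft: str, max_iterations: int = 3, threshold: float = 0.9) -> str:
    for _ in range(max_iterations):
        if evaluate(draft) >= threshold:
            break  # good enough, stop early
        draft = improve(draft)
    return draft

print(refine("Short answer."))
```

The `max_iterations` cap matters: without it, a stubborn evaluator turns the loop into an unbounded spend.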
Step 4: Use different models per stage
Not every stage needs the same model. Use cheap models for simple steps:
```python
expensive_lm = dspy.LM("openai/gpt-4o")
cheap_lm = dspy.LM("openai/gpt-4o-mini")

pipeline = SupportPipeline()

# Cheap model for classification (simple task)
pipeline.classify.lm = cheap_lm

# Expensive model for drafting (needs quality)
pipeline.draft.lm = expensive_lm
```
See `/ai-cutting-costs` for more cost optimization strategies.
Step 5: Test and optimize the full pipeline
The beauty of DSPy pipelines: you optimize the whole thing end-to-end, not each step separately.
```python
def pipeline_metric(example, prediction, trace=None):
    # Score the final output quality
    return prediction.response.lower().strip() == example.response.lower().strip()
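Exact string match is a blunt metric for free-form responses: one rephrased word scores zero, which starves the optimizer of signal. A common softer alternative is token-level F1 overlap, sketched here in plain Python (whether it suits your pipeline depends on the task; the function names are illustrative):

```python
def token_f1(prediction: str, reference: str) -> float:
    pred = prediction.lower().split()
    ref = reference.lower().split()
    if not pred or not ref:
        return 0.0
    common = 0
    ref_pool = list(ref)
    for tok in pred:
        if tok in ref_pool:
            ref_pool.remove(tok)  # count each reference token at most once
            common += 1
    if common == 0:
        return 0.0
    precision = common / len(pred)
    recall = common / len(ref)
    return 2 * precision * recall / (precision + recall)

def soft_pipeline_metric(example, prediction, trace=None):
    # Graded score instead of all-or-nothing exact match
    return token_f1(prediction.response, example.response)
```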
```python
# Optimizes prompts for ALL stages together
optimizer = dspy.MIPROv2(metric=pipeline_metric, auto="medium")
optimized = optimizer.compile(pipeline, trainset=trainset)
```
Key patterns
- Decompose the problem — if a task has distinct phases (understand, retrieve, generate, verify), make each one a module
- Each stage gets its own signature — clear inputs and outputs make the pipeline debuggable
- Wire in `forward()` — the `forward()` method is your orchestration logic
- Optimize end-to-end — DSPy optimizers tune all stages together to maximize the final metric
- Debug stage by stage — use `dspy.inspect_history()` to see what each step did
- Assign models per stage — cheap models for simple tasks, expensive for complex ones
When to use LangGraph instead
DSPy pipelines are great for stateless, linear-ish flows. But some problems need more:
| If your pipeline... | Use |
|---|---|
| Steps run in a fixed order | DSPy pipeline (this skill) |
| Steps branch based on results | DSPy pipeline with routing in `forward()` |
| Needs cycles (retry loops, agent loops) | LangGraph |
| Needs persistent state across calls | LangGraph with checkpointing |
| Needs human approval mid-pipeline | LangGraph |
| Coordinates multiple independent agents | LangGraph supervisor pattern |
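The "branch based on results" row just means ordinary Python conditionals inside `forward()`; no graph library is needed as long as the branching is acyclic. A framework-free sketch of that shape (stub handlers, illustrative names):

```python
def classify(question: str) -> str:
    # Stub classifier: a real pipeline would call an LLM here
    return "research" if "compare" in question.lower() else "simple"

def simple_handler(question: str) -> str:
    return f"short answer to: {question}"

def research_handler(question: str) -> str:
    return f"sourced, detailed answer to: {question}"

def forward(question: str) -> str:
    # Branch on an earlier stage's result -- fine in a plain pipeline,
    # because control only ever flows forward
    if classify(question) == "research":
        return research_handler(question)
    return simple_handler(question)
```

The moment an edge needs to point backwards (retry, re-plan), reach for LangGraph instead.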
Quick example: DSPy module as a LangGraph node
```python
import dspy
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class PipelineState(TypedDict):
    input_text: str
    category: str
    output: str

# DSPy modules
classifier = dspy.ChainOfThought("text -> category")
generator = dspy.ChainOfThought("text, category -> output")

# Wrap as LangGraph nodes
def classify_node(state: PipelineState) -> dict:
    result = classifier(text=state["input_text"])
    return {"category": result.category}

def generate_node(state: PipelineState) -> dict:
    result = generator(text=state["input_text"], category=state["category"])
    return {"output": result.output}

# Build graph
graph = StateGraph(PipelineState)
graph.add_node("classify", classify_node)
graph.add_node("generate", generate_node)
graph.add_edge(START, "classify")
graph.add_edge("classify", "generate")
graph.add_edge("generate", END)
app = graph.compile()
```
This gives you LangGraph's state management and routing with DSPy's optimizable prompts. For more, see `/ai-building-chatbots` (stateful conversations) and `/ai-coordinating-agents` (multi-agent systems). For the full LangGraph API reference, see [`docs/langchain-langgraph-reference.md`](../../docs/langchain-langgraph-reference.md).
Additional resources
- Use `/ai-checking-outputs` to add verification and guardrails between stages
- Use `/ai-cutting-costs` to assign different models per stage
- Not sure what stages your pipeline needs? Use `/ai-decomposing-tasks` to identify where to split
- For content generation pipelines, see `/ai-writing-content`. For complex reasoning, see `/ai-reasoning`
- Next: `/ai-improving-accuracy` to measure and improve your pipeline