# Build Multi-Agent Systems
Guide the user through building multiple AI agents that collaborate — a supervisor delegates tasks, specialists handle their domains, and results flow back. Uses DSPy for each agent's reasoning and LangGraph for orchestration, handoff, and parallel execution.
## Step 1: Identify the agents
Ask the user:
- What's the overall task? (research a topic, handle support, create content, analyze data?)
- What specialist roles do you need? (researcher, writer, reviewer, analyst, etc.)
- How do agents hand off work? (supervisor routes, chain passes forward, parallel fan-out?)
- Do any agents need tools? (search, database, APIs, code execution?)
### Common multi-agent patterns
| Pattern | How it works | Good for |
|---|---|---|
| Supervisor | Central agent routes tasks to specialists | Support triage, research coordination |
| Chain | Agent A → Agent B → Agent C in sequence | Content pipelines (write → edit → review) |
| Parallel | Multiple agents work simultaneously, merge results | Research (search multiple sources at once) |
| Hierarchical | Supervisor → sub-supervisors → specialists | Complex organizations with many agents |
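As a rough mental model in plain Python (hypothetical stand-in functions, no frameworks), the chain pattern is sequential composition, while the parallel pattern is a fan-out followed by a merge:

```python
# Hypothetical stage functions standing in for agents.
def write(task: str) -> str:
    return f"draft({task})"

def edit(draft: str) -> str:
    return f"edit({draft})"

def review(edited: str) -> str:
    return f"review({edited})"

# Chain: Agent A -> Agent B -> Agent C in sequence.
def chain(task: str) -> str:
    return review(edit(write(task)))

# Parallel: fan the same task out to several workers, then merge.
def parallel(task: str, workers) -> str:
    partials = [w(task) for w in workers]  # could run concurrently
    return " | ".join(partials)

print(chain("post"))                    # review(edit(draft(post)))
print(parallel("post", [write, edit]))  # draft(post) | edit(post)
```

The supervisor and hierarchical patterns replace the fixed call order with a routing decision made at runtime, which is what Step 3 builds.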
## Step 2: Build each agent as a DSPy module
Each agent gets its own signature, reasoning strategy, and (optionally) tools.
### Simple agent — just a DSPy module
```python
import dspy

class ResearchSummary(dspy.Signature):
    """Research the topic and provide a detailed summary with key findings."""
    topic: str = dspy.InputField()
    sources: list[str] = dspy.InputField(desc="Search results or documents to analyze")
    summary: str = dspy.OutputField(desc="Detailed research summary")
    key_findings: list[str] = dspy.OutputField(desc="Top 3-5 key findings")

class ResearchAgent(dspy.Module):
    def __init__(self, retriever):
        super().__init__()
        self.retriever = retriever
        self.analyze = dspy.ChainOfThought(ResearchSummary)

    def forward(self, topic):
        sources = self.retriever(topic).passages
        return self.analyze(topic=topic, sources=sources)
```
### Agent with tools — use ReAct
```python
def search_web(query: str) -> str:
    """Search the web for current information."""
    # your search implementation
    return results

def query_database(sql: str) -> str:
    """Query the analytics database."""
    # your database implementation
    return results

class DataAnalyst(dspy.Module):
    def __init__(self):
        super().__init__()
        self.agent = dspy.ReAct(
            "question, context -> analysis, recommendation",
            tools=[search_web, query_database],
            max_iters=5,
        )

    def forward(self, question, context=""):
        return self.agent(question=question, context=context)
```
### Agent with LangChain tools
Convert pre-built LangChain tools for use in DSPy agents:
```python
from langchain_community.tools import DuckDuckGoSearchRun

search_tool = dspy.Tool.from_langchain(DuckDuckGoSearchRun())

class WebResearcher(dspy.Module):
    def __init__(self):
        super().__init__()
        self.agent = dspy.ReAct(
            "question -> findings",
            tools=[search_tool],
            max_iters=5,
        )

    def forward(self, question):
        return self.agent(question=question)
```
## Step 3: Add a supervisor (LangGraph)
The supervisor decides which agent to call next based on the current state.
### Define the shared state
```python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class TeamState(TypedDict):
    task: str                                      # the overall task
    messages: Annotated[list[dict], operator.add]  # communication log
    current_agent: str                             # who's working now
    results: dict                                  # collected results from agents
    status: str                                    # "in_progress", "done", "needs_review"
```
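A note on `Annotated[list[dict], operator.add]`: LangGraph merges each node's returned partial update into the state, and for fields annotated with a reducer it combines the old and new values with that function instead of overwriting. For lists, `operator.add` is concatenation, so every node's messages are appended to the log. A minimal sketch of the merge semantics in plain Python:

```python
import operator

# Simulate two node updates being reduced into the messages field.
log: list[dict] = []
supervisor_update = [{"role": "supervisor", "content": "@researcher: find sources"}]
researcher_update = [{"role": "researcher", "content": "found 3 sources"}]

log = operator.add(log, supervisor_update)  # append, don't overwrite
log = operator.add(log, researcher_update)

print([m["role"] for m in log])  # ['supervisor', 'researcher']
```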
### Build the supervisor
```python
class RouteTask(dspy.Signature):
    """Decide which specialist agent should handle the next step."""
    task: str = dspy.InputField(desc="The overall task")
    completed_work: str = dspy.InputField(desc="Work completed so far")
    available_agents: list[str] = dspy.InputField()
    next_agent: str = dspy.OutputField(desc="Which agent to call next")
    sub_task: str = dspy.OutputField(desc="Specific instruction for that agent")
    is_complete: bool = dspy.OutputField(desc="Whether the overall task is done")

supervisor_module = dspy.ChainOfThought(RouteTask)

def supervisor(state: TeamState) -> dict:
    completed = "\n".join(
        f"{k}: {v}" for k, v in state["results"].items()
    )
    result = supervisor_module(
        task=state["task"],
        completed_work=completed or "Nothing yet",
        available_agents=["researcher", "writer", "reviewer"],
    )
    if result.is_complete:
        return {"status": "done", "current_agent": "none"}
    return {
        "current_agent": result.next_agent,
        "messages": [{"role": "supervisor", "content": f"@{result.next_agent}: {result.sub_task}"}],
    }
```
### Wire up the agents as graph nodes
```python
# WriteContent and ReviewContent are dspy.Signature classes defined elsewhere,
# analogous to ResearchSummary above.
researcher = ResearchAgent(retriever=my_retriever)
writer_module = dspy.ChainOfThought(WriteContent)
reviewer_module = dspy.ChainOfThought(ReviewContent)

def researcher_node(state: TeamState) -> dict:
    task_msg = state["messages"][-1]["content"]
    result = researcher(topic=task_msg)
    return {
        "results": {**state["results"], "research": result.summary},
        "messages": [{"role": "researcher", "content": result.summary}],
    }

def writer_node(state: TeamState) -> dict:
    result = writer_module(
        task=state["task"],
        research=state["results"].get("research", ""),
    )
    return {
        "results": {**state["results"], "draft": result.output},
        "messages": [{"role": "writer", "content": result.output}],
    }

def reviewer_node(state: TeamState) -> dict:
    result = reviewer_module(
        draft=state["results"].get("draft", ""),
        task=state["task"],
    )
    return {
        "results": {**state["results"], "review": result.feedback},
        "messages": [{"role": "reviewer", "content": result.feedback}],
    }
```
### Build the graph
```python
graph = StateGraph(TeamState)

# Add nodes
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_node)
graph.add_node("writer", writer_node)
graph.add_node("reviewer", reviewer_node)

# Supervisor decides who goes next
graph.add_edge(START, "supervisor")

def route_to_agent(state: TeamState) -> str:
    if state["status"] == "done":
        return "done"
    return state["current_agent"]

graph.add_conditional_edges(
    "supervisor",
    route_to_agent,
    {
        "researcher": "researcher",
        "writer": "writer",
        "reviewer": "reviewer",
        "done": END,
    },
)

# All agents report back to supervisor
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("reviewer", "supervisor")

app = graph.compile()
```

### Run it
```python
result = app.invoke({
    "task": "Write a blog post about the benefits of remote work",
    "messages": [],
    "current_agent": "",
    "results": {},
    "status": "in_progress",
})

# Supervisor routes: researcher → writer → reviewer → done
print(result["results"]["draft"])
```

## Step 4: Agent handoff pattern
When one agent passes work directly to another (no supervisor).
### Shared context via state
```python
# module_a, module_b, and module_c are DSPy modules defined elsewhere.
class HandoffState(TypedDict):
    task: str
    context: Annotated[list[str], operator.add]  # accumulated context
    output: str

def agent_a(state: HandoffState) -> dict:
    result = module_a(task=state["task"])
    return {"context": [f"Agent A found: {result.output}"]}

def agent_b(state: HandoffState) -> dict:
    full_context = "\n".join(state["context"])
    result = module_b(task=state["task"], context=full_context)
    return {"context": [f"Agent B added: {result.output}"]}

def agent_c(state: HandoffState) -> dict:
    full_context = "\n".join(state["context"])
    result = module_c(task=state["task"], context=full_context)
    return {"output": result.output}

graph = StateGraph(HandoffState)
graph.add_node("a", agent_a)
graph.add_node("b", agent_b)
graph.add_node("c", agent_c)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)
```
### Conditional handoff
Route to different specialists based on intermediate results:
```python
def route_after_classify(state) -> str:
    if state["category"] == "billing":
        return "billing_specialist"
    elif state["category"] == "technical":
        return "tech_specialist"
    return "general_agent"

graph.add_conditional_edges("classifier", route_after_classify, {
    "billing_specialist": "billing",
    "tech_specialist": "tech",
    "general_agent": "general",
})
```
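Because the routing function is plain Python over the state dict, it can be unit-tested without compiling a graph (a quick sketch using the same `category` values as above):

```python
def route_after_classify(state) -> str:
    if state["category"] == "billing":
        return "billing_specialist"
    elif state["category"] == "technical":
        return "tech_specialist"
    return "general_agent"

# Exercise each branch before wiring it into the graph.
assert route_after_classify({"category": "billing"}) == "billing_specialist"
assert route_after_classify({"category": "technical"}) == "tech_specialist"
assert route_after_classify({"category": "other"}) == "general_agent"
print("routing ok")
```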
## Step 5: Parallel agents
Fan out to multiple agents simultaneously and merge results.
```python
from langgraph.constants import Send

class ParallelState(TypedDict):
    task: str
    subtasks: list[str]
    results: Annotated[list[dict], operator.add]
    final_output: str

def split_task(state: ParallelState) -> list:
    """Fan out subtasks to worker agents."""
    return [Send("worker", {"task": state["task"], "subtask": st}) for st in state["subtasks"]]

def worker(state: dict) -> dict:
    """Each worker handles one subtask."""
    worker_module = dspy.ChainOfThought("task, subtask -> result")
    result = worker_module(task=state["task"], subtask=state["subtask"])
    return {"results": [{"subtask": state["subtask"], "result": result.result}]}

def merge_results(state: ParallelState) -> dict:
    """Combine all worker results into a final output."""
    merger = dspy.ChainOfThought("task, partial_results -> final_output")
    partial = "\n".join(f"- {r['subtask']}: {r['result']}" for r in state["results"])
    result = merger(task=state["task"], partial_results=partial)
    return {"final_output": result.final_output}

graph = StateGraph(ParallelState)
graph.add_node("worker", worker)
graph.add_node("merge", merge_results)
graph.add_conditional_edges(START, split_task)
graph.add_edge("worker", "merge")
graph.add_edge("merge", END)
```
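Outside the graph, the same fan-out/merge shape can be sketched with a thread pool and a trivial stand-in for the worker module (hypothetical `fake_worker`; in the real system, LangGraph runs the `Send()` branches concurrently for you):

```python
from concurrent.futures import ThreadPoolExecutor

def fake_worker(subtask: str) -> dict:
    # Stand-in for the DSPy worker module; just echoes the subtask.
    return {"subtask": subtask, "result": f"done: {subtask}"}

subtasks = ["search arxiv", "search news", "search blogs"]
with ThreadPoolExecutor() as pool:
    results = list(pool.map(fake_worker, subtasks))  # order is preserved

merged = "\n".join(f"- {r['subtask']}: {r['result']}" for r in results)
print(merged)
```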
## Step 6: Human-in-the-loop
Pause before agents take critical actions.
```python
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()

# Interrupt before any agent that takes external actions
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["execute_action", "send_email", "update_database"],
)
config = {"configurable": {"thread_id": "task-001"}}

# Run until interrupt
result = app.invoke(input_state, config)
# -> Pauses before "execute_action" node

# Human reviews the proposed action in result state
print(result["proposed_action"])

# If approved, resume from checkpoint
result = app.invoke(None, config)
```

## Step 7: Optimize the team
### Per-agent metrics
Optimize each agent's prompts independently first:
```python
# JudgeResearch is a dspy.Signature defined elsewhere that judges relevance.
def researcher_metric(example, prediction, trace=None):
    """Are the research findings relevant and complete?"""
    judge = dspy.Predict(JudgeResearch)
    return judge(topic=example.topic, findings=prediction.summary).is_good

optimizer = dspy.MIPROv2(metric=researcher_metric, auto="light")
optimized_researcher = optimizer.compile(researcher, trainset=research_trainset)
```
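Metrics don't have to call an LLM judge. A cheap deterministic metric has the same `(example, prediction, trace)` shape and is handy for smoke-testing the optimizer loop. A sketch with hypothetical dict-shaped inputs (real DSPy examples and predictions use attribute access):

```python
def keyword_overlap_metric(example, prediction, trace=None):
    """Fraction of expected keywords that appear in the predicted summary."""
    expected = set(example["keywords"])
    summary = prediction["summary"].lower()
    found = {k for k in expected if k.lower() in summary}
    return len(found) / max(len(expected), 1)

score = keyword_overlap_metric(
    {"keywords": ["remote", "productivity"]},
    {"summary": "Remote work boosts productivity for many teams."},
)
print(score)  # 1.0
```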
### End-to-end team metric
Then optimize all agents together with a team-level metric:
```python
def team_metric(example, prediction, trace=None):
    """Is the final output high quality?"""
    judge = dspy.Predict(JudgeOutput)
    return judge(
        task=example.task,
        expected=example.output,
        actual=prediction.final_output,
    ).is_good

# Create a module that wraps the full team
class TeamModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.supervisor = supervisor_module
        self.researcher = optimized_researcher
        self.writer = writer_module
        self.reviewer = reviewer_module

    def forward(self, task):
        # Run the LangGraph app
        result = app.invoke({"task": task, "messages": [], "current_agent": "", "results": {}, "status": "in_progress"})
        return dspy.Prediction(final_output=result["results"].get("draft", ""))

optimizer = dspy.MIPROv2(metric=team_metric, auto="medium")
optimized_team = optimizer.compile(TeamModule(), trainset=team_trainset)
```

## Key patterns
- One DSPy module per agent — each agent has its own signature, tools, and reasoning strategy
- LangGraph orchestrates, DSPy reasons — LangGraph handles routing and state; DSPy handles what each agent actually thinks
- Supervisor pattern for dynamic routing — when you don't know the order of agents in advance
- Chain pattern for fixed pipelines — when agents always run in the same order (write → edit → review)
- Use `Send()` for parallel work — fan out to multiple agents simultaneously, merge results after
- Shared state is your communication bus — agents read from and write to the LangGraph state
- Optimize bottom-up — tune individual agents first, then optimize the full team end-to-end
- Interrupt before side effects — use `interrupt_before` so humans approve actions with real-world consequences
## Additional resources
- For worked examples (research team, support escalation), see examples.md
- For the LangChain/LangGraph API reference, see docs/langchain-langgraph-reference.md
- Need a single agent with tools? Start with /ai-taking-actions
- Building a stateless pipeline instead? Use /ai-building-pipelines
- Need the agents to hold conversations? Use /ai-building-chatbots
- Next: /ai-improving-accuracy to measure and improve your agents