langgraph-architecture
LangGraph Architecture Decisions
When to Use LangGraph
Use LangGraph When You Need:
- Stateful conversations - Multi-turn interactions with memory
- Human-in-the-loop - Approval gates, corrections, interventions
- Complex control flow - Loops, branches, conditional routing
- Multi-agent coordination - Multiple LLMs working together
- Persistence - Resume from checkpoints, time travel debugging
- Streaming - Real-time token streaming, progress updates
- Reliability - Retries, error recovery, durability guarantees
Consider Alternatives When:
| Scenario | Alternative | Why |
|---|---|---|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |
State Schema Decisions
TypedDict vs Pydantic
| TypedDict | Pydantic |
|---|---|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |
Recommendation: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
Reducer Selection
| Use Case | Reducer | Example |
|---|---|---|
| Chat messages | `add_messages` | Handles IDs, RemoveMessage |
| Simple append | `operator.add` | `Annotated[list, operator.add]` |
| Keep latest | None (LastValue) | `field: str` |
| Custom merge | Lambda | `Annotated[list, lambda a, b: ...]` |
| Overwrite list | | Bypass reducer |
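To make the table concrete, here is a minimal plain-Python sketch of how a reducer combines a key's existing value with a node's update. `apply_update` and `merge_unique` are hypothetical helpers written for this illustration; they mimic the idea and are not LangGraph's implementation.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


def merge_unique(old: list, new: list) -> list:
    """Custom merge: append only items that are not already present."""
    return old + [item for item in new if item not in old]


class State(TypedDict):
    log: Annotated[list, operator.add]   # simple append
    tags: Annotated[list, merge_unique]  # custom merge
    status: str                          # no reducer: keep the latest value


def apply_update(state: dict, update: dict) -> dict:
    """Hypothetical helper: merge `update` into `state` key by key."""
    hints = get_type_hints(State, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in state:
            merged[key] = metadata[0](state[key], value)  # reducer(old, new)
        else:
            merged[key] = value  # last value wins
    return merged


state = {"log": ["start"], "tags": ["a"], "status": "running"}
state = apply_update(state, {"log": ["step 1"], "tags": ["a", "b"], "status": "done"})
print(state)
```

The annotated keys accumulate across updates, while the plain `status` key is simply replaced.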
State Size Considerations
```python
from typing import Annotated, TypedDict

from langgraph.graph.message import add_messages
from langgraph.store.base import BaseStore

# SMALL STATE (< 1MB) - Put in state
class State(TypedDict):
    messages: Annotated[list, add_messages]
    context: str

# LARGE DATA - Use Store (alternative definition of State)
class State(TypedDict):
    messages: Annotated[list, add_messages]
    document_ref: str  # Reference to store

namespace = ("documents",)  # hypothetical namespace; use your own

def node(state, *, store: BaseStore):
    doc = store.get(namespace, state["document_ref"])
    # Process without bloating checkpoints
```
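A rough way to see why references keep checkpoints small is to compare the serialized size of both layouts. This is a sketch under assumptions: the `store` dict and `save_document` helper are hypothetical stand-ins for a real store, and JSON length is only a proxy for checkpoint size.

```python
import json

store = {}  # hypothetical stand-in for a key-value store outside the checkpoint


def save_document(key: str, text: str) -> str:
    """Put the large payload in the store and return the key to keep in state."""
    store[key] = text
    return key


big_text = "lorem ipsum " * 100_000  # about 1.2 MB of text

inline_state = {"messages": [], "document": big_text}
ref_state = {"messages": [], "document_ref": save_document("doc-1", big_text)}

inline_size = len(json.dumps(inline_state))
ref_size = len(json.dumps(ref_state))
print(inline_size, ref_size)
```

Every checkpoint of `inline_state` would carry the full document again, while `ref_state` only carries the short key.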
Graph Structure Decisions
Single Graph vs Subgraphs
Single Graph when:
- All nodes share the same state schema
- Simple linear or branching flow
- Fewer than 10 nodes
Subgraphs when:
- Different state schemas needed
- Reusable components across graphs
- Team separation of concerns
- Complex hierarchical workflows
Conditional Edges vs Command
| Conditional Edges | Command |
|---|---|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |
```python
from typing import Literal

from langgraph.types import Command

# Conditional Edge - when routing is the focus
# (`condition` stands in for any check on `state`; `builder` is your StateGraph)
def router(state) -> Literal["a", "b"]:
    return "a" if condition else "b"

builder.add_conditional_edges("node", router)

# Command - when combining routing with updates
# (annotating the destination lets the graph be drawn with this edge)
def node(state) -> Command[Literal["next"]]:
    return Command(goto="next", update={"step": state["step"] + 1})
```
Static vs Dynamic Routing
Static Edges (`add_edge`):
- Fixed flow known at build time
- Clearer graph visualization
- Easier to reason about
Dynamic Routing (`add_conditional_edges`, `Command`, `Send`):
- Runtime decisions based on state
- Agent-driven navigation
- Fan-out patterns
Persistence Strategy
Checkpointer Selection
| Checkpointer | Use Case | Characteristics |
|---|---|---|
| `InMemorySaver` | Testing only | Lost on restart |
| `SqliteSaver` | Development | Single file, local |
| `PostgresSaver` | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |
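The trade-offs in the table are easier to picture with a toy model of what a checkpointer does: store one snapshot per thread and step, and hand back the latest one on resume. The `TinyCheckpointer` class below is a hypothetical illustration built on the standard `sqlite3` module; it does not implement the BaseCheckpointSaver interface.

```python
import json
import sqlite3


class TinyCheckpointer:
    """Hypothetical stand-in for a checkpointer: one JSON snapshot per step."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "thread_id TEXT, step INTEGER, state TEXT, "
            "PRIMARY KEY (thread_id, step))"
        )

    def put(self, thread_id: str, step: int, state: dict) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, step, json.dumps(state)),
        )
        self.conn.commit()

    def latest(self, thread_id: str):
        row = self.conn.execute(
            "SELECT step, state FROM checkpoints WHERE thread_id = ? "
            "ORDER BY step DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
        return None if row is None else (row[0], json.loads(row[1]))


saver = TinyCheckpointer()  # ":memory:" vanishes on exit, like a test-only saver
saver.put("thread-1", 0, {"messages": ["hi"]})
saver.put("thread-1", 1, {"messages": ["hi", "hello"]})
resumed = saver.latest("thread-1")
print(resumed)
```

Passing a file path instead of `":memory:"` gives the single-file, local behaviour of the development row; a shared database server is what makes the production row safe for several workers.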
Checkpointing Scope
```python
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)

# Subgraph options (pick one)
subgraph = sub_builder.compile(checkpointer=None)   # Inherit from parent
subgraph = sub_builder.compile(checkpointer=True)   # Independent checkpointing
subgraph = sub_builder.compile(checkpointer=False)  # No checkpointing (runs atomically)
```
When to Disable Checkpointing
- Short-lived subgraphs that should be atomic
- Subgraphs with incompatible state schemas
- Performance-critical paths without need for resume
Multi-Agent Architecture
Supervisor Pattern
Best for:
- Clear hierarchy
- Centralized decision making
- Different agent specializations
```
          ┌─────────────┐
          │  Supervisor │
          └──────┬──────┘
    ┌────────┬───┴───┬────────┐
    ▼        ▼       ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──────┘ └──────┘ └──────┘ └──────┘
```
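The supervisor pattern boils down to one central function that picks a specialist and delegates to it. The sketch below shows that shape in plain Python; the agent functions and the keyword rule are hypothetical placeholders for LLM-backed nodes.

```python
from typing import Callable, Dict


# Hypothetical worker agents: each takes a task string and returns a result.
def research_agent(task: str) -> str:
    return f"research notes for: {task}"


def writing_agent(task: str) -> str:
    return f"draft for: {task}"


AGENTS: Dict[str, Callable[[str], str]] = {
    "research": research_agent,
    "writing": writing_agent,
}


def supervisor(task: str) -> str:
    """Centralized decision: name the specialist that should handle the task.

    A real supervisor would usually ask an LLM; a keyword rule stands in here.
    """
    return "research" if "find" in task.lower() else "writing"


def run(task: str) -> str:
    chosen = supervisor(task)
    return AGENTS[chosen](task)


result = run("Find recent papers on retrieval")
print(result)
```

Because all routing lives in `supervisor`, adding a specialist means registering it in `AGENTS` and extending one decision function.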
Peer-to-Peer Pattern
Best for:
- Collaborative agents
- No clear hierarchy
- Flexible communication
```
┌──────┐     ┌──────┐
│Agent1│◄───►│Agent2│
└──┬───┘     └───┬──┘
   │             │
   ▼             ▼
┌──────┐     ┌──────┐
│Agent3│◄───►│Agent4│
└──────┘     └──────┘
```
Handoff Pattern
Best for:
- Sequential specialization
- Clear stage transitions
- Different capabilities per stage
```
┌────────┐    ┌────────┐    ┌────────┐
│Research│───►│Planning│───►│Execute │
└────────┘    └────────┘    └────────┘
```
Streaming Strategy
Stream Mode Selection
| Mode | Use Case | Data |
|---|---|---|
| `updates` | UI updates | Node outputs only |
| `values` | State inspection | Full state each step |
| `messages` | Chat UX | LLM tokens |
| `custom` | Progress/logs | Your data via StreamWriter |
| `debug` | Debugging | Tasks + checkpoints |
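The difference between "node outputs only" and "full state each step" can be sketched with a tiny sequential runner. The `stream` function below is a hypothetical illustration, not LangGraph's runtime, and the real runtime may emit additional chunks.

```python
from typing import Callable, Iterator, List, Tuple

Node = Callable[[dict], dict]


def stream(nodes: List[Tuple[str, Node]], state: dict, mode: str) -> Iterator[dict]:
    """Hypothetical mini-runner: run nodes in order and yield one chunk per step."""
    state = dict(state)
    for name, fn in nodes:
        output = fn(state)
        state.update(output)
        if mode == "updates":
            yield {name: output}  # only what this node returned
        elif mode == "values":
            yield dict(state)     # full state after this step
        else:
            raise ValueError(f"unsupported mode: {mode}")


nodes = [
    ("plan", lambda s: {"plan": "search"}),
    ("act", lambda s: {"result": s["plan"] + " done"}),
]

updates = list(stream(nodes, {"question": "q"}, "updates"))
values = list(stream(nodes, {"question": "q"}, "values"))
print(updates)
print(values)
```

The update chunks stay small and are keyed by node name, which suits incremental UI rendering; the value chunks grow with the state, which suits inspection.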
Subgraph Streaming
```python
# Stream from subgraphs
async def stream_with_subgraphs(graph, inputs):
    async for chunk in graph.astream(
        inputs,
        stream_mode="updates",
        subgraphs=True,  # Include subgraph events
    ):
        # namespace is a tuple path: () for the parent graph, longer when nested
        namespace, data = chunk
```
Human-in-the-Loop Design
Interrupt Placement
| Strategy | Use Case |
|---|---|
| `interrupt_before` | Approval before action |
| `interrupt_after` | Review after completion |
| `interrupt()` in node | Dynamic, contextual pauses |
Resume Patterns
```python
from langgraph.types import Command

# `config` must point at the thread being resumed,
# e.g. {"configurable": {"thread_id": "1"}}

# Simple resume (same thread)
graph.invoke(None, config)

# Resume with value
graph.invoke(Command(resume="approved"), config)

# Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)

# Modify state and resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
```
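As a mental model for "resume with value", a Python generator behaves much like a paused node: it hands a payload to the caller and waits for a value to come back. This is only an analogy with hypothetical helper names; a LangGraph graph restores from a checkpoint and, as far as I understand, re-runs the interrupted node from its start instead of continuing mid-function.

```python
def approval_flow(action: str):
    """Hypothetical node body written as a generator.

    `yield` plays the role of an interrupt: it hands a payload to the caller
    and pauses until a resume value is sent back in.
    """
    decision = yield {"question": f"Approve '{action}'?"}
    if decision == "approved":
        return f"executed {action}"
    return f"skipped {action}"


def run_until_interrupt(flow):
    return next(flow)  # runs up to the first yield


def resume(flow, value):
    try:
        flow.send(value)
    except StopIteration as done:
        return done.value
    raise RuntimeError("flow paused again; another resume is needed")


flow = approval_flow("delete records")
payload = run_until_interrupt(flow)
outcome = resume(flow, "approved")
print(payload, outcome)
```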
Error Handling Strategy
Retry Configuration
```python
from langgraph.types import RetryPolicy

# APIError and RateLimitError come from your LLM provider's SDK

# Per-node retry
RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_interval=60.0,
    max_attempts=3,
    retry_on=lambda e: isinstance(e, (APIError, TimeoutError))
)

# Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[
    RetryPolicy(retry_on=RateLimitError, max_attempts=5),
    RetryPolicy(retry_on=Exception, max_attempts=2),
])
```
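To see what these numbers mean in practice, the sketch below computes the waits an exponential backoff would produce. It assumes that `max_attempts` counts the first attempt and that the wait before retry n is `min(max_interval, initial_interval * backoff_factor ** (n - 1))`; jitter is ignored, and `backoff_schedule` is a hypothetical helper, not a library function.

```python
def backoff_schedule(
    initial_interval: float,
    backoff_factor: float,
    max_interval: float,
    max_attempts: int,
) -> list:
    """Waits in seconds between attempts, ignoring jitter (assumed formula)."""
    retries = max_attempts - 1  # the first attempt is not a retry
    return [
        min(max_interval, initial_interval * backoff_factor ** n)
        for n in range(retries)
    ]


schedule = backoff_schedule(0.5, 2.0, 60.0, 3)
capped = backoff_schedule(0.5, 2.0, 60.0, 10)
print(schedule)
print(capped)
```

With the values from the per-node example the node would wait 0.5 s and then 1.0 s before giving up; the cap only starts to matter after several more attempts.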
Fallback Patterns
```python
from typing import Literal

from langgraph.graph import END

# primary_operation, fallback_operation and PrimaryError are placeholders
def node_with_fallback(state):
    try:
        return primary_operation(state)
    except PrimaryError:
        return fallback_operation(state)

# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
    if state.get("error") and state["attempts"] < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    return END  # END is the string "__end__"
```
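The routing logic can be exercised without a graph by driving it with a small loop. In this sketch `END` is a local stand-in so the code runs without LangGraph, and `run_with_routing` is a hypothetical driver that plays the role of the graph following the router's decision.

```python
from typing import Callable

END = "__end__"  # stand-in for LangGraph's END sentinel in this sketch


def route_on_error(state: dict) -> str:
    if state.get("error") and state["attempts"] < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    return END


def run_with_routing(operation: Callable[[], str]) -> dict:
    """Hypothetical driver: call `operation`, then follow the router's decision."""
    state = {"attempts": 0, "error": None, "result": None}
    while True:
        state["attempts"] += 1
        try:
            state["result"] = operation()
            state["error"] = None
        except RuntimeError as exc:
            state["error"] = str(exc)
        decision = route_on_error(state)
        if decision == "fallback":
            state["result"] = "fallback answer"
            state["error"] = None
            return state
        if decision == END:
            return state
        # decision == "retry": go around the loop again


def always_fails() -> str:
    raise RuntimeError("service unavailable")


final = run_with_routing(always_fails)
print(final)
```

Keeping `attempts` and `error` in state is what lets the router stay a pure function of state, which is also what a conditional edge expects.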
Scaling Considerations
Horizontal Scaling
- Use PostgresSaver for shared state
- Consider LangGraph Platform for managed infrastructure
- Use stores for large data outside checkpoints
Performance Optimization
- Minimize state size - Use references for large data
- Parallel nodes - Fan out when possible
- Cache expensive operations - Use CachePolicy
- Async everywhere - Use ainvoke, astream
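The "parallel nodes" and "async everywhere" points combine naturally: independent I/O-bound branches can run concurrently and be joined afterwards. The sketch below shows the idea with plain `asyncio`; `fetch` is a hypothetical branch whose sleep stands in for a network call.

```python
import asyncio


async def fetch(source: str) -> str:
    """Hypothetical I/O-bound branch; the sleep stands in for a network call."""
    await asyncio.sleep(0.1)
    return f"data from {source}"


async def fan_out(sources: list) -> list:
    # Run all branches concurrently; gather returns results in input order.
    return await asyncio.gather(*(fetch(s) for s in sources))


results = asyncio.run(fan_out(["a", "b", "c"]))
print(results)
```

Run sequentially, the three branches would take roughly the sum of their waits; run concurrently, the total is close to the slowest single branch.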
Resource Limits
```python
from typing import TypedDict

from langgraph.managed import RemainingSteps

# Set recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)

# Track remaining steps in state
class State(TypedDict):
    remaining_steps: RemainingSteps

def check_budget(state):
    if state["remaining_steps"] < 5:
        return "wrap_up"
    return "continue"
```
Decision Checklist
Before implementing:
- Is LangGraph the right tool? (vs simpler alternatives)
- State schema defined with appropriate reducers?
- Persistence strategy chosen? (dev vs prod checkpointer)
- Streaming needs identified?
- Human-in-the-loop points defined?
- Error handling and retry strategy?
- Multi-agent coordination pattern? (if applicable)
- Resource limits configured?