langgraph-architecture

LangGraph Architecture Decisions

When to Use LangGraph

Use LangGraph When You Need:

  • Stateful conversations - Multi-turn interactions with memory
  • Human-in-the-loop - Approval gates, corrections, interventions
  • Complex control flow - Loops, branches, conditional routing
  • Multi-agent coordination - Multiple LLMs working together
  • Persistence - Resume from checkpoints, time travel debugging
  • Streaming - Real-time token streaming, progress updates
  • Reliability - Retries, error recovery, durability guarantees

Consider Alternatives When:

| Scenario | Alternative | Why |
| --- | --- | --- |
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |

State Schema Decisions

TypedDict vs Pydantic

| TypedDict | Pydantic |
| --- | --- |
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |

Recommendation: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
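
To make the "no validation overhead" trade-off concrete, here is a minimal stdlib-only sketch. `ChatState` and `validate` are hypothetical names introduced for illustration; `validate` is a hand-rolled stand-in for the kind of check a validating model such as Pydantic performs, not Pydantic's actual behavior.

```python
from typing import TypedDict


class ChatState(TypedDict):
    # Hypothetical schema used only for this illustration.
    context: str
    turn: int


def validate(state: dict) -> ChatState:
    """Hand-rolled check standing in for what a validating model does."""
    if not isinstance(state.get("context"), str):
        raise TypeError("context must be a str")
    if not isinstance(state.get("turn"), int):
        raise TypeError("turn must be an int")
    return ChatState(context=state["context"], turn=state["turn"])


# A TypedDict is a plain dict at runtime, so the wrong type is accepted silently.
unchecked = ChatState(context="hello", turn="three")

# The explicit check catches the same mistake, at the cost of extra work per update.
try:
    validate(unchecked)
    rejected = False
except TypeError:
    rejected = True
```

If bad values reaching a node would be expensive to debug, that is a reasonable signal to pay for validation; otherwise the lighter schema is usually enough.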

Reducer Selection

| Use Case | Reducer | Example |
| --- | --- | --- |
| Chat messages | `add_messages` | Handles IDs, `RemoveMessage` |
| Simple append | `operator.add` | `Annotated[list, operator.add]` |
| Keep latest | None (LastValue) | `field: str` |
| Custom merge | Lambda | `Annotated[list, lambda a, b: ...]` |
| Overwrite list | `Overwrite` | Bypass reducer |
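
The difference between these reducers is easiest to see by applying one update under each rule. The sketch below is plain Python that mimics the semantics in the table; `apply_update` and `merge_unique` are hypothetical helpers written for this example and are not LangGraph's implementation.

```python
import operator
from typing import Any, Callable, Optional

Reducer = Optional[Callable[[Any, Any], Any]]


def apply_update(state: dict, update: dict, reducers: dict[str, Reducer]) -> dict:
    """Merge a node's partial update into state, one key at a time."""
    merged = dict(state)
    for key, new_value in update.items():
        reducer = reducers.get(key)
        if reducer is None or key not in merged:
            merged[key] = new_value  # no reducer: keep the latest value
        else:
            merged[key] = reducer(merged[key], new_value)
    return merged


def merge_unique(left: list, right: list) -> list:
    """Custom merge: append only items not already present."""
    return left + [item for item in right if item not in left]


reducers = {"log": operator.add, "tags": merge_unique, "status": None}
state = {"log": ["start"], "tags": ["a"], "status": "new"}
state = apply_update(
    state,
    {"log": ["step 1"], "tags": ["a", "b"], "status": "running"},
    reducers,
)
```

Here `log` grows by concatenation, `tags` is merged without duplicates, and `status` is simply replaced.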

State Size Considerations

```python
from typing import Annotated, TypedDict

from langgraph.graph.message import add_messages
from langgraph.store.base import BaseStore

# SMALL STATE (< 1MB) - Put in state
class State(TypedDict):
    messages: Annotated[list, add_messages]
    context: str

# LARGE DATA - Use Store
class State(TypedDict):
    messages: Annotated[list, add_messages]
    document_ref: str  # Reference to store

def node(state, *, store: BaseStore):
    # `namespace` is a tuple of strings defined elsewhere in your application
    doc = store.get(namespace, state["document_ref"])
    # Process without bloating checkpoints
```
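
A rough way to see why references keep checkpoints small is to compare the serialized size of both layouts. This is a stdlib-only sketch: `document_store`, `save_document` and `checkpoint_size` are hypothetical stand-ins, and JSON length is only an approximation of what a real checkpointer would write.

```python
import json

# Hypothetical in-memory stand-in for a store; a real one would be durable.
document_store: dict[str, str] = {}


def save_document(doc_id: str, text: str) -> str:
    """Put the large payload in the store and return a small reference."""
    document_store[doc_id] = text
    return doc_id


def checkpoint_size(state: dict) -> int:
    """Approximate a checkpoint's size as the length of its JSON encoding."""
    return len(json.dumps(state))


large_text = "lorem ipsum " * 10_000  # 120,000 characters

inline_state = {"messages": [], "document": large_text}
ref_state = {"messages": [], "document_ref": save_document("doc-1", large_text)}

inline_size = checkpoint_size(inline_state)
ref_size = checkpoint_size(ref_state)
```

Because state is saved at every step, the inline layout pays the full document size repeatedly, while the reference layout pays it once in the store.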

Graph Structure Decisions

Single Graph vs Subgraphs

Single Graph when:
  • All nodes share the same state schema
  • Simple linear or branching flow
  • < 10 nodes
Subgraphs when:
  • Different state schemas needed
  • Reusable components across graphs
  • Team separation of concerns
  • Complex hierarchical workflows

Conditional Edges vs Command

| Conditional Edges | Command |
| --- | --- |
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |

```python
from typing import Literal

from langgraph.types import Command

# Conditional Edge - when routing is the focus
def router(state) -> Literal["a", "b"]:
    # `condition` stands in for any check on the state
    return "a" if condition else "b"

builder.add_conditional_edges("node", router)

# Command - when combining routing with updates
def node(state) -> Command:
    return Command(goto="next", update={"step": state["step"] + 1})
```

Static vs Dynamic Routing

Static Edges (`add_edge`):
  • Fixed flow known at build time
  • Clearer graph visualization
  • Easier to reason about
Dynamic Routing (`add_conditional_edges`, `Command`, `Send`):
  • Runtime decisions based on state
  • Agent-driven navigation
  • Fan-out patterns

Persistence Strategy

Checkpointer Selection

| Checkpointer | Use Case | Characteristics |
| --- | --- | --- |
| `InMemorySaver` | Testing only | Lost on restart |
| `SqliteSaver` | Development | Single file, local |
| `PostgresSaver` | Production | Scalable, concurrent |
| Custom | Special needs | Implement `BaseCheckpointSaver` |

Checkpointing Scope

```python
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)

# Subgraph options: pass exactly one of these values
subgraph = sub_builder.compile(
    checkpointer=None,     # Inherit from parent
    # checkpointer=True,   # Independent checkpointing
    # checkpointer=False,  # No checkpointing (runs atomically)
)
```

When to Disable Checkpointing

  • Short-lived subgraphs that should be atomic
  • Subgraphs with incompatible state schemas
  • Performance-critical paths without need for resume

Multi-Agent Architecture

Supervisor Pattern

Best for:
  • Clear hierarchy
  • Centralized decision making
  • Different agent specializations
          ┌─────────────┐
          │  Supervisor │
          └──────┬──────┘
    ┌────────┬───┴───┬────────┐
    ▼        ▼       ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──────┘ └──────┘ └──────┘ └──────┘
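
The control flow behind the diagram can be sketched without any framework: one function makes every routing decision and the specialists only do their own work. All names here (`research`, `write`, `supervisor`, `run`) are hypothetical, and real agents would call an LLM instead of returning canned strings.

```python
from typing import Callable


# Hypothetical specialists; each one only knows its own job.
def research(task: str) -> str:
    return f"notes on {task}"


def write(task: str) -> str:
    return f"draft about {task}"


AGENTS: dict[str, Callable[[str], str]] = {"research": research, "write": write}
OUTPUT_KEY = {"research": "notes", "write": "draft"}


def supervisor(state: dict) -> str:
    """Central decision point: pick the next agent, or finish."""
    if "notes" not in state:
        return "research"
    if "draft" not in state:
        return "write"
    return "done"


def run(task: str) -> dict:
    state: dict = {"task": task, "trace": []}
    # Control always returns to the supervisor after an agent finishes.
    while (choice := supervisor(state)) != "done":
        state[OUTPUT_KEY[choice]] = AGENTS[choice](task)
        state["trace"].append(choice)
    return state


result = run("graph databases")
```

In a graph, `supervisor` would typically be a node that routes to agent nodes, each of which has an edge back to it.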

Peer-to-Peer Pattern

Best for:
  • Collaborative agents
  • No clear hierarchy
  • Flexible communication
┌──────┐     ┌──────┐
│Agent1│◄───►│Agent2│
└──┬───┘     └───┬──┘
   │             │
   ▼             ▼
┌──────┐     ┌──────┐
│Agent3│◄───►│Agent4│
└──────┘     └──────┘

Handoff Pattern

Best for:
  • Sequential specialization
  • Clear stage transitions
  • Different capabilities per stage
┌────────┐    ┌────────┐    ┌────────┐
│Research│───►│Planning│───►│Execute │
└────────┘    └────────┘    └────────┘

Streaming Strategy

Stream Mode Selection

| Mode | Use Case | Data |
| --- | --- | --- |
| `updates` | UI updates | Node outputs only |
| `values` | State inspection | Full state each step |
| `messages` | Chat UX | LLM tokens |
| `custom` | Progress/logs | Your data via `StreamWriter` |
| `debug` | Debugging | Tasks + checkpoints |

Subgraph Streaming

```python
# Stream from subgraphs
async def stream_with_subgraphs(graph, input):
    async for chunk in graph.astream(
        input,
        stream_mode="updates",
        subgraphs=True,  # Include subgraph events
    ):
        namespace, data = chunk  # namespace indicates depth
```
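
One way to use the namespace is to indent log lines by nesting depth. The helper below is a sketch that assumes each chunk is a `(namespace, data)` pair with a tuple namespace that is empty for the parent graph; `describe` is a hypothetical helper and the sample namespace strings are made up.

```python
def describe(chunk: tuple[tuple[str, ...], dict]) -> str:
    """Render one (namespace, data) chunk as an indented log line."""
    namespace, data = chunk
    depth = len(namespace)  # () is the parent graph; each entry is one level down
    nodes = ", ".join(data)  # in "updates" mode the keys are node names
    return f"{'  ' * depth}[depth {depth}] {nodes}"


# Hypothetical chunks; the namespace strings are invented for illustration.
sample = [
    ((), {"planner": {"step": 1}}),
    (("worker:abc123",), {"search": {"hits": 3}}),
    (("worker:abc123", "inner:def456"), {"rank": {"top": 1}}),
]
lines = [describe(chunk) for chunk in sample]
```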

Human-in-the-Loop Design

Interrupt Placement

| Strategy | Use Case |
| --- | --- |
| `interrupt_before` | Approval before action |
| `interrupt_after` | Review after completion |
| `interrupt()` in node | Dynamic, contextual pauses |

Resume Patterns

```python
from langgraph.types import Command

# Simple resume (same thread)
graph.invoke(None, config)

# Resume with value
graph.invoke(Command(resume="approved"), config)

# Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)

# Modify state and resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
```

Error Handling Strategy

Retry Configuration

```python
from langgraph.types import RetryPolicy

# Per-node retry
# APIError and RateLimitError stand in for your LLM client's exception types
RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_interval=60.0,
    max_attempts=3,
    retry_on=lambda e: isinstance(e, (APIError, TimeoutError)),
)

# Multiple policies (first match wins)
builder.add_node(
    "node",
    fn,
    retry_policy=[
        RetryPolicy(retry_on=RateLimitError, max_attempts=5),
        RetryPolicy(retry_on=Exception, max_attempts=2),
    ],
)
```
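
To judge whether a policy is too aggressive or too slow, it helps to work out the waits it implies. The sketch below assumes plain exponential backoff capped at `max_interval`, with `max_attempts` counting the first call and jitter ignored; `backoff_schedule` is a hypothetical helper, not part of LangGraph.

```python
def backoff_schedule(
    initial_interval: float,
    backoff_factor: float,
    max_interval: float,
    max_attempts: int,
) -> list[float]:
    """Return the wait before each retry, ignoring jitter."""
    waits = []
    interval = initial_interval
    # max_attempts counts the first call, so there are max_attempts - 1 retries.
    for _ in range(max_attempts - 1):
        waits.append(min(interval, max_interval))
        interval *= backoff_factor
    return waits


schedule = backoff_schedule(0.5, 2.0, 60.0, 3)
long_schedule = backoff_schedule(0.5, 2.0, 60.0, 10)
```

With the values shown above, three attempts add at most 1.5 seconds of waiting, while ten attempts reach the 60-second cap on the last two retries.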

Fallback Patterns

```python
from typing import Literal

from langgraph.graph import END

def node_with_fallback(state):
    try:
        return primary_operation(state)
    except PrimaryError:
        return fallback_operation(state)

# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
    if state.get("error") and state.get("attempts", 0) < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    return END  # END is the string "__end__"
```
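
The routing rule is easier to trust once it has been driven through a few failure counts. The following plain-Python sketch replays it outside any graph: `flaky` and `run` are hypothetical, and the router returns the string `"end"` rather than LangGraph's `END` constant so the example stays stdlib-only.

```python
def route_on_error(state: dict) -> str:
    if state.get("error") and state.get("attempts", 0) < 3:
        return "retry"
    if state.get("error"):
        return "fallback"
    return "end"


def flaky(state: dict, fail_times: int) -> dict:
    """Hypothetical node that fails on its first `fail_times` calls."""
    attempts = state.get("attempts", 0) + 1
    error = "boom" if attempts <= fail_times else None
    result = None if error else "primary"
    return {**state, "attempts": attempts, "error": error, "result": result}


def run(fail_times: int) -> dict:
    """Call the node, then follow the router until it says to stop."""
    state = flaky({}, fail_times)
    while True:
        route = route_on_error(state)
        if route == "retry":
            state = flaky(state, fail_times)
        elif route == "fallback":
            return {**state, "error": None, "result": "fallback"}
        else:
            return state


recovered = run(fail_times=1)
gave_up = run(fail_times=5)
```

Note that the router only works if the failing node records `error` and increments `attempts` in state instead of raising.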

Scaling Considerations

Horizontal Scaling

  • Use PostgresSaver for shared state
  • Consider LangGraph Platform for managed infrastructure
  • Use stores for large data outside checkpoints

Performance Optimization

  1. Minimize state size - Use references for large data
  2. Parallel nodes - Fan out when possible
  3. Cache expensive operations - Use CachePolicy
  4. Async everywhere - Use ainvoke, astream

Resource Limits

```python
from typing import TypedDict

from langgraph.managed import RemainingSteps

# Set recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)

# Track remaining steps in state
class State(TypedDict):
    remaining_steps: RemainingSteps

def check_budget(state):
    if state["remaining_steps"] < 5:
        return "wrap_up"
    return "continue"
```
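
The point of checking the budget is to stop deliberately before the hard limit is hit. This stdlib-only sketch simulates that with a simple loop; `run` is hypothetical, `limit` plays the role of the recursion limit, and the counter is decremented by hand here rather than managed by the framework.

```python
def check_budget(state: dict) -> str:
    """Same routing rule as above: wrap up when fewer than 5 steps remain."""
    return "wrap_up" if state["remaining_steps"] < 5 else "continue"


def run(limit: int, work_needed: int) -> dict:
    """Drive a loop with a step budget; `limit` plays the role of recursion_limit."""
    state = {"remaining_steps": limit, "done": 0, "finished_cleanly": False}
    while state["remaining_steps"] > 0:
        if state["done"] >= work_needed or check_budget(state) == "wrap_up":
            state["finished_cleanly"] = True  # summarize and stop early
            break
        state["done"] += 1
        state["remaining_steps"] -= 1
    return state


short = run(limit=50, work_needed=10)
capped = run(limit=8, work_needed=100)
```

In the capped run the loop exits with steps to spare, which leaves room for a final summarizing node instead of an abrupt failure at the limit.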

Decision Checklist

Before implementing:
  1. Is LangGraph the right tool? (vs simpler alternatives)
  2. State schema defined with appropriate reducers?
  3. Persistence strategy chosen? (dev vs prod checkpointer)
  4. Streaming needs identified?
  5. Human-in-the-loop points defined?
  6. Error handling and retry strategy?
  7. Multi-agent coordination pattern? (if applicable)
  8. Resource limits configured?