langgraph-implementation
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseLangGraph Implementation
LangGraph 实现
Core Concepts
核心概念
LangGraph builds stateful, multi-actor agent applications using a graph-based architecture:
- StateGraph: Builder class for defining graphs with shared state
- Nodes: Functions that read state and return partial updates
- Edges: Define execution flow (static or conditional)
- Channels: Internal state management (LastValue, BinaryOperatorAggregate)
- Checkpointer: Persistence for pause/resume capabilities
LangGraph 采用基于图的架构构建有状态的多角色智能体应用:
- StateGraph:用于定义带有共享状态的图的构建器类
- Nodes:读取状态并返回部分更新的函数
- Edges:定义执行流程(静态或条件式)
- Channels:内部状态管理(LastValue、BinaryOperatorAggregate)
- Checkpointer:用于实现暂停/恢复功能的持久化组件
Essential Imports
必要导入
python
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState, add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Send, interrupt, RetryPolicy
from typing import Annotated
from typing_extensions import TypedDict
python
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState, add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Send, interrupt, RetryPolicy
from typing import Annotated
from typing_extensions import TypedDict
State Schema Patterns
状态模式
Basic State with TypedDict
基于TypedDict的基础状态
python
class State(TypedDict):
counter: int # LastValue - stores last value
messages: Annotated[list, operator.add] # Reducer - appends lists
items: Annotated[list, lambda a, b: a + [b] if b else a] # Custom reducer
python
class State(TypedDict):
counter: int # LastValue - stores last value
messages: Annotated[list, operator.add] # Reducer - appends lists
items: Annotated[list, lambda a, b: a + [b] if b else a] # Custom reducer
MessagesState for Chat Applications
聊天应用专用MessagesState
python
from langgraph.graph.message import MessagesState
class State(MessagesState):
# Inherits: messages: Annotated[list[AnyMessage], add_messages]
user_id: str
context: dict
python
from langgraph.graph.message import MessagesState
class State(MessagesState):
# Inherits: messages: Annotated[list[AnyMessage], add_messages]
user_id: str
context: dict
Pydantic State (for validation)
基于Pydantic的状态(用于验证)
python
from pydantic import BaseModel
class State(BaseModel):
messages: Annotated[list, add_messages]
validated_field: str # Pydantic validates on assignment
python
from pydantic import BaseModel
class State(BaseModel):
messages: Annotated[list, add_messages]
validated_field: str # Pydantic validates on assignment
Building Graphs
构建图
Basic Pattern
基础模式
python
builder = StateGraph(State)
python
builder = StateGraph(State)
Add nodes - functions that take state, return partial updates
添加节点 - 接收状态并返回部分更新的函数
builder.add_node("process", process_fn)
builder.add_node("decide", decide_fn)
builder.add_node("process", process_fn)
builder.add_node("decide", decide_fn)
Add edges
添加边
builder.add_edge(START, "process")
builder.add_edge("process", "decide")
builder.add_edge("decide", END)
builder.add_edge(START, "process")
builder.add_edge("process", "decide")
builder.add_edge("decide", END)
Compile
编译
graph = builder.compile()
graph = builder.compile()
Node Function Signature
节点函数签名
python
def my_node(state: State) -> dict:
"""Node receives full state, returns partial update."""
return {"counter": state["counter"] + 1}python
def my_node(state: State) -> dict:
"""Node receives full state, returns partial update."""
return {"counter": state["counter"] + 1}With config access
支持访问配置
def my_node(state: State, config: RunnableConfig) -> dict:
thread_id = config["configurable"]["thread_id"]
return {"result": process(state, thread_id)}
def my_node(state: State, config: RunnableConfig) -> dict:
thread_id = config["configurable"]["thread_id"]
return {"result": process(state, thread_id)}
With Runtime context (v0.6+)
支持Runtime上下文(v0.6+)
def my_node(state: State, runtime: Runtime[Context]) -> dict:
user_id = runtime.context.get("user_id")
return {"result": user_id}
def my_node(state: State, runtime: Runtime[Context]) -> dict:
user_id = runtime.context.get("user_id")
return {"result": user_id}
Conditional Edges
条件边
python
from typing import Literal
def router(state: State) -> Literal["agent", "tools", "__end__"]:
last_msg = state["messages"][-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
return "tools"
return END # or "__end__"
builder.add_conditional_edges("agent", router)python
from typing import Literal
def router(state: State) -> Literal["agent", "tools", "__end__"]:
last_msg = state["messages"][-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
return "tools"
return END # or "__end__"
builder.add_conditional_edges("agent", router)With path_map for visualization
使用path_map优化可视化
builder.add_conditional_edges(
"agent",
router,
path_map={"agent": "agent", "tools": "tools", "end": END}
)
builder.add_conditional_edges(
"agent",
router,
path_map={"agent": "agent", "tools": "tools", "end": END}
)
Command Pattern (Dynamic Routing + State Update)
命令模式(动态路由+状态更新)
python
from langgraph.types import Command
def dynamic_node(state: State) -> Command[Literal["next", "__end__"]]:
if state["should_continue"]:
return Command(goto="next", update={"step": state["step"] + 1})
return Command(goto=END)
python
from langgraph.types import Command
def dynamic_node(state: State) -> Command[Literal["next", "__end__"]]:
if state["should_continue"]:
return Command(goto="next", update={"step": state["step"] + 1})
return Command(goto=END)
Must declare destinations for visualization
必须声明目标节点以支持可视化
builder.add_node("dynamic", dynamic_node, destinations=["next", END])
undefinedbuilder.add_node("dynamic", dynamic_node, destinations=["next", END])
undefinedSend Pattern (Fan-out/Map-Reduce)
发送模式(扇出/Map-Reduce)
python
from langgraph.types import Send
def fan_out(state: State) -> list[Send]:
"""Route to multiple node instances with different inputs."""
return [Send("worker", {"item": item}) for item in state["items"]]
builder.add_conditional_edges(START, fan_out)
builder.add_edge("worker", "aggregate") # Workers convergepython
from langgraph.types import Send
def fan_out(state: State) -> list[Send]:
"""Route to multiple node instances with different inputs."""
return [Send("worker", {"item": item}) for item in state["items"]]
builder.add_conditional_edges(START, fan_out)
builder.add_edge("worker", "aggregate") # 多个Worker汇聚到同一节点Checkpointing
检查点
Enable Persistence
启用持久化
python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver # Development
from langgraph.checkpoint.postgres import PostgresSaver # Production
python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver # 开发环境
from langgraph.checkpoint.postgres import PostgresSaver # 生产环境
In-memory (testing only)
内存存储(仅用于测试)
graph = builder.compile(checkpointer=InMemorySaver())
graph = builder.compile(checkpointer=InMemorySaver())
SQLite (development)
SQLite(开发环境)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
Thread-based invocation
基于线程的调用
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"messages": [...]}, config)
undefinedconfig = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"messages": [...]}, config)
undefinedState Management
状态管理
python
python
Get current state
获取当前状态
state = graph.get_state(config)
state = graph.get_state(config)
Get state history
获取状态历史
for state in graph.get_state_history(config):
print(state.values, state.next)
for state in graph.get_state_history(config):
print(state.values, state.next)
Update state manually
手动更新状态
graph.update_state(config, {"key": "new_value"}, as_node="node_name")
graph.update_state(config, {"key": "new_value"}, as_node="node_name")
Human-in-the-Loop
人机协作
Using interrupt()
使用interrupt()
python
from langgraph.types import interrupt, Command
def review_node(state: State) -> dict:
# Pause and surface value to client
human_input = interrupt({"question": "Please review", "data": state["draft"]})
return {"approved": human_input["approved"]}python
from langgraph.types import interrupt, Command
def review_node(state: State) -> dict:
# 暂停并将内容推送给客户端
human_input = interrupt({"question": "Please review", "data": state["draft"]})
return {"approved": human_input["approved"]}Resume with Command
使用Command恢复执行
graph.invoke(Command(resume={"approved": True}), config)
graph.invoke(Command(resume={"approved": True}), config)
Interrupt Before/After Nodes
在节点执行前后中断
python
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["human_review"], # Pause before node
interrupt_after=["agent"], # Pause after node
)
python
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["human_review"], # 在节点执行前暂停
interrupt_after=["agent"], # 在节点执行后暂停
)
Check pending interrupts
检查待处理的中断
state = graph.get_state(config)
if state.next: # Has pending nodes
# Resume
graph.invoke(None, config)
state = graph.get_state(config)
if state.next: # 存在待执行节点
# 恢复执行
graph.invoke(None, config)
Streaming
流式处理
python
python
Stream modes: "values", "updates", "custom", "messages", "debug"
流式模式:"values", "updates", "custom", "messages", "debug"
Updates only (node outputs)
仅获取更新(节点输出)
for chunk in graph.stream(input, stream_mode="updates"):
print(chunk) # {"node_name": {"key": "value"}}
for chunk in graph.stream(input, stream_mode="updates"):
print(chunk) # {"node_name": {"key": "value"}}
Full state after each step
每步执行后获取完整状态
for chunk in graph.stream(input, stream_mode="values"):
print(chunk)
for chunk in graph.stream(input, stream_mode="values"):
print(chunk)
Multiple modes
同时使用多种模式
for mode, chunk in graph.stream(input, stream_mode=["updates", "messages"]):
if mode == "messages":
print("Token:", chunk)
for mode, chunk in graph.stream(input, stream_mode=["updates", "messages"]):
if mode == "messages":
print("Token:", chunk)
Custom streaming from within nodes
在节点内自定义流式输出
from langgraph.config import get_stream_writer
def my_node(state):
writer = get_stream_writer()
writer({"progress": 0.5}) # Custom event
return {"result": "done"}
from langgraph.config import get_stream_writer
def my_node(state):
writer = get_stream_writer()
writer({"progress": 0.5}) # 自定义事件
return {"result": "done"}
Subgraphs
子图
python
python
Define subgraph
定义子图
sub_builder = StateGraph(SubState)
sub_builder.add_node("step", step_fn)
sub_builder.add_edge(START, "step")
subgraph = sub_builder.compile()
sub_builder = StateGraph(SubState)
sub_builder.add_node("step", step_fn)
sub_builder.add_edge(START, "step")
subgraph = sub_builder.compile()
Use as node in parent
在父图中将子图作为节点使用
parent_builder = StateGraph(ParentState)
parent_builder.add_node("subprocess", subgraph)
parent_builder.add_edge(START, "subprocess")
parent_builder = StateGraph(ParentState)
parent_builder.add_node("subprocess", subgraph)
parent_builder.add_edge(START, "subprocess")
Subgraph checkpointing
子图检查点配置
subgraph = sub_builder.compile(
checkpointer=None, # Inherit from parent (default)
# checkpointer=True, # Use persistent checkpointing
# checkpointer=False, # Disable checkpointing
)
subgraph = sub_builder.compile(
checkpointer=None, # 继承父图的配置(默认)
# checkpointer=True, # 使用持久化检查点
# checkpointer=False, # 禁用检查点
)
Retry and Caching
重试与缓存
python
from langgraph.types import RetryPolicy, CachePolicy
retry = RetryPolicy(
initial_interval=0.5,
backoff_factor=2.0,
max_attempts=3,
retry_on=ValueError, # Or callable: lambda e: isinstance(e, ValueError)
)
cache = CachePolicy(ttl=3600) # Cache for 1 hour
builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)python
from langgraph.types import RetryPolicy, CachePolicy
retry = RetryPolicy(
initial_interval=0.5,
backoff_factor=2.0,
max_attempts=3,
retry_on=ValueError, # 或使用可调用对象:lambda e: isinstance(e, ValueError)
)
cache = CachePolicy(ttl=3600) # 缓存1小时
builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)Prebuilt Components
预构建组件
create_react_agent (moved to langchain.agents in v1.0)
create_react_agent(v1.0版本迁移至langchain.agents)
python
from langgraph.prebuilt import create_react_agent, ToolNode
python
from langgraph.prebuilt import create_react_agent, ToolNode
Simple agent
简单智能体
graph = create_react_agent(
model="anthropic:claude-3-5-sonnet",
tools=[my_tool],
prompt="You are a helpful assistant",
checkpointer=InMemorySaver(),
)
graph = create_react_agent(
model="anthropic:claude-3-5-sonnet",
tools=[my_tool],
prompt="You are a helpful assistant",
checkpointer=InMemorySaver(),
)
Custom tool node
自定义工具节点
tool_node = ToolNode([tool1, tool2])
builder.add_node("tools", tool_node)
tool_node = ToolNode([tool1, tool2])
builder.add_node("tools", tool_node)
Common Patterns
常见模式
Agent Loop
智能体循环
python
def should_continue(state) -> Literal["tools", "__end__"]:
if state["messages"][-1].tool_calls:
return "tools"
return END
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")python
def should_continue(state) -> Literal["tools", "__end__"]:
if state["messages"][-1].tool_calls:
return "tools"
return END
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")Parallel Execution
并行执行
python
python
Multiple nodes execute in parallel when they share the same trigger
多个节点在同一触发器下并行执行
builder.add_edge(START, "node_a")
builder.add_edge(START, "node_b") # Runs parallel with node_a
builder.add_edge(["node_a", "node_b"], "join") # Wait for both
See [PATTERNS.md](PATTERNS.md) for advanced patterns including multi-agent systems, hierarchical graphs, and complex workflows.
builder.add_edge(START, "node_a")
builder.add_edge(START, "node_b") # 与node_a并行运行
builder.add_edge(["node_a", "node_b"], "join") # 等待两个节点都完成
更多高级模式(包括多智能体系统、分层图和复杂工作流)请查看 [PATTERNS.md](PATTERNS.md)。