langgraph-human-in-the-loop
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
Chinese<overview>
LangGraph's human-in-the-loop patterns let you pause graph execution, surface data to users, and resume with their input:
- — pauses execution, surfaces a value to the caller
interrupt(value) - — resumes execution, providing the value back to
Command(resume=value)interrupt() - Checkpointer — required to save state while paused
- Thread ID — required to identify which paused execution to resume </overview>
<overview>
LangGraph的人机协同循环模式允许你暂停图执行、向用户展示数据,并根据用户输入恢复执行:
- — 暂停执行,向调用者展示指定值
interrupt(value) - — 恢复执行,将值传回给
Command(resume=value)interrupt() - Checkpointer — 暂停时保存状态的必备组件
- Thread ID — 用于标识要恢复的暂停执行任务 </overview>
Requirements
必要条件
Three things are required for interrupts to work:
- Checkpointer — compile with (dev) or
checkpointer=InMemorySaver()(prod)PostgresSaver - Thread ID — pass to every
{"configurable": {"thread_id": "..."}}/invokecallstream - JSON-serializable payload — the value passed to must be JSON-serializable
interrupt()
要使中断功能正常工作,需要满足三个条件:
- Checkpointer — 编译时需配置(开发环境)或
checkpointer=InMemorySaver()(生产环境)PostgresSaver - Thread ID — 每次调用/
invoke时都需传入stream{"configurable": {"thread_id": "..."}} - 可JSON序列化的负载 — 传入的值必须支持JSON序列化
interrupt()
Basic Interrupt + Resume
基础中断与恢复
interrupt(value)__interrupt__Command(resume=value)interrupt()Critical: when the graph resumes, the node restarts from the beginning — all code before re-runs.
<ex-basic-interrupt-resume>
<python>
Pause execution for human review and resume with Command.
```python
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
interrupt()class State(TypedDict):
approved: bool
def approval_node(state: State):
# Pause and ask for approval
approved = interrupt("Do you approve this action?")
# When resumed, Command(resume=...) returns that value here
return {"approved": approved}
checkpointer = InMemorySaver()
graph = (
StateGraph(State)
.add_node("approval", approval_node)
.add_edge(START, "approval")
.add_edge("approval", END)
.compile(checkpointer=checkpointer)
)
config = {"configurable": {"thread_id": "thread-1"}}
interrupt(value)__interrupt__Command(resume=value)interrupt()重点注意:当图恢复执行时,节点会从起始位置重新运行——之前的所有代码都会再次执行。
<ex-basic-interrupt-resume>
<python>
暂停执行以等待人工审核,通过Command恢复执行。
```python
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
interrupt()class State(TypedDict):
approved: bool
def approval_node(state: State):
# Pause and ask for approval
approved = interrupt("Do you approve this action?")
# When resumed, Command(resume=...) returns that value here
return {"approved": approved}
checkpointer = InMemorySaver()
graph = (
StateGraph(State)
.add_node("approval", approval_node)
.add_edge(START, "approval")
.add_edge("approval", END)
.compile(checkpointer=checkpointer)
)
config = {"configurable": {"thread_id": "thread-1"}}
Initial run — hits interrupt and pauses
Initial run — hits interrupt and pauses
result = graph.invoke({"approved": False}, config)
print(result["interrupt"])
result = graph.invoke({"approved": False}, config)
print(result["interrupt"])
[Interrupt(value='Do you approve this action?')]
[Interrupt(value='Do you approve this action?')]
Resume with the human's response
Resume with the human's response
result = graph.invoke(Command(resume=True), config)
print(result["approved"]) # True
</python>
<typescript>
Pause execution for human review and resume with Command.
```typescript
import { interrupt, Command, MemorySaver, StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
approved: z.boolean().default(false),
});
const approvalNode = async (state: typeof State.State) => {
// Pause and ask for approval
const approved = interrupt("Do you approve this action?");
// When resumed, Command({ resume }) returns that value here
return { approved };
};
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("approval", approvalNode)
.addEdge(START, "approval")
.addEdge("approval", END)
.compile({ checkpointer });
const config = { configurable: { thread_id: "thread-1" } };
// Initial run — hits interrupt and pauses
let result = await graph.invoke({ approved: false }, config);
console.log(result.__interrupt__);
// [{ value: 'Do you approve this action?', ... }]
// Resume with the human's response
result = await graph.invoke(new Command({ resume: true }), config);
console.log(result.approved); // trueresult = graph.invoke(Command(resume=True), config)
print(result["approved"]) # True
</python>
<typescript>
暂停执行以等待人工审核,通过Command恢复执行。
```typescript
import { interrupt, Command, MemorySaver, StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
approved: z.boolean().default(false),
});
const approvalNode = async (state: typeof State.State) => {
// Pause and ask for approval
const approved = interrupt("Do you approve this action?");
// When resumed, Command({ resume }) returns that value here
return { approved };
};
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("approval", approvalNode)
.addEdge(START, "approval")
.addEdge("approval", END)
.compile({ checkpointer });
const config = { configurable: { thread_id: "thread-1" } };
// Initial run — hits interrupt and pauses
let result = await graph.invoke({ approved: false }, config);
console.log(result.__interrupt__);
// [{ value: 'Do you approve this action?', ... }]
// Resume with the human's response
result = await graph.invoke(new Command({ resume: true }), config);
console.log(result.approved); // trueApproval Workflow
审批工作流
A common pattern: interrupt to show a draft, then route based on the human's decision.
<ex-approval-workflow>
<python>
Interrupt for human review, then route to send or end based on the decision.
```python
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from typing import Literal
from typing_extensions import TypedDict
class EmailAgentState(TypedDict):
email_content: str
draft_response: str
classification: dict
def human_review(state: EmailAgentState) -> Command[Literal["send_reply", "end"]]:
"""Pause for human review using interrupt and route based on decision."""
classification = state.get("classification", {})
# interrupt() must come first — any code before it will re-run on resume
human_decision = interrupt({
"email_id": state.get("email_content", ""),
"draft_response": state.get("draft_response", ""),
"urgency": classification.get("urgency"),
"action": "Please review and approve/edit this response"
})
# Process the human's decision
if human_decision.get("approved"):
return Command(
update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))},
goto="send_reply"
)
else:
# Rejection — human will handle directly
return Command(update={}, goto=END)</python>
<typescript>
Interrupt for human review, then route to send or end based on the decision.
```typescript
import { interrupt, Command, END, GraphNode } from "@langchain/langgraph";
const humanReview: GraphNode<typeof EmailAgentState> = async (state) => {
const classification = state.classification!;
// interrupt() must come first — any code before it will re-run on resume
const humanDecision = interrupt({
emailId: state.emailContent,
draftResponse: state.responseText,
urgency: classification.urgency,
action: "Please review and approve/edit this response",
});
// Process the human's decision
if (humanDecision.approved) {
return new Command({
update: { responseText: humanDecision.editedResponse || state.responseText },
goto: "sendReply",
});
} else {
return new Command({ update: {}, goto: END });
}
};常见模式:中断执行以展示草稿,然后根据人工决策进行路由。
<ex-approval-workflow>
<python>
中断执行等待人工审核,然后根据决策路由至发送或结束节点。
```python
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from typing import Literal
from typing_extensions import TypedDict
class EmailAgentState(TypedDict):
email_content: str
draft_response: str
classification: dict
def human_review(state: EmailAgentState) -> Command[Literal["send_reply", "end"]]:
"""Pause for human review using interrupt and route based on decision."""
classification = state.get("classification", {})
# interrupt() must come first — any code before it will re-run on resume
human_decision = interrupt({
"email_id": state.get("email_content", ""),
"draft_response": state.get("draft_response", ""),
"urgency": classification.get("urgency"),
"action": "Please review and approve/edit this response"
})
# Process the human's decision
if human_decision.get("approved"):
return Command(
update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))},
goto="send_reply"
)
else:
# Rejection — human will handle directly
return Command(update={}, goto=END)</python>
<typescript>
中断执行等待人工审核,然后根据决策路由至发送或结束节点。
```typescript
import { interrupt, Command, END, GraphNode } from "@langchain/langgraph";
const humanReview: GraphNode<typeof EmailAgentState> = async (state) => {
const classification = state.classification!;
// interrupt() must come first — any code before it will re-run on resume
const humanDecision = interrupt({
emailId: state.emailContent,
draftResponse: state.responseText,
urgency: classification.urgency,
action: "Please review and approve/edit this response",
});
// Process the human's decision
if (humanDecision.approved) {
return new Command({
update: { responseText: humanDecision.editedResponse || state.responseText },
goto: "sendReply",
});
} else {
return new Command({ update: {}, goto: END });
}
};Validation Loop
验证循环
Use in a loop to validate human input and re-prompt if invalid.
<ex-validation-loop>
<python>
Validate human input in a loop, re-prompting until valid.
```python
from langgraph.types import interrupt
interrupt()def get_age_node(state):
prompt = "What is your age?"
while True:
answer = interrupt(prompt)
# Validate the input
if isinstance(answer, int) and answer > 0:
break
else:
# Invalid input — ask again with a more specific prompt
prompt = f"'{answer}' is not a valid age. Please enter a positive number."
return {"age": answer}
Each `Command(resume=...)` call provides the next answer. If invalid, the loop re-interrupts with a clearer message.
```python
config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config)在循环中使用验证人工输入,若输入无效则重新提示。
<ex-validation-loop>
<python>
在循环中验证人工输入,若无效则重新提示。
```python
from langgraph.types import interrupt
interrupt()def get_age_node(state):
prompt = "What is your age?"
while True:
answer = interrupt(prompt)
# Validate the input
if isinstance(answer, int) and answer > 0:
break
else:
# Invalid input — ask again with a more specific prompt
prompt = f"'{answer}' is not a valid age. Please enter a positive number."
return {"age": answer}
每次调用`Command(resume=...)`都会传入下一个答案。若输入无效,循环会再次中断并展示更明确的提示。
```python
config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config)interrupt: "What is your age?"
interrupt: "What is your age?"
retry = graph.invoke(Command(resume="thirty"), config)
retry = graph.invoke(Command(resume="thirty"), config)
interrupt: "'thirty' is not a valid age..."
interrupt: "'thirty' is not a valid age..."
final = graph.invoke(Command(resume=30), config)
print(final["age"]) # 30
</python>
<typescript>
Validate human input in a loop, re-prompting until valid.
```typescript
import { interrupt } from "@langchain/langgraph";
const getAgeNode = (state: typeof State.State) => {
let prompt = "What is your age?";
while (true) {
const answer = interrupt(prompt);
// Validate the input
if (typeof answer === "number" && answer > 0) {
return { age: answer };
} else {
// Invalid input — ask again with a more specific prompt
prompt = `'${answer}' is not a valid age. Please enter a positive number.`;
}
}
};final = graph.invoke(Command(resume=30), config)
print(final["age"]) # 30
</python>
<typescript>
在循环中验证人工输入,若无效则重新提示。
```typescript
import { interrupt } from "@langchain/langgraph";
const getAgeNode = (state: typeof State.State) => {
let prompt = "What is your age?";
while (true) {
const answer = interrupt(prompt);
// Validate the input
if (typeof answer === "number" && answer > 0) {
return { age: answer };
} else {
// Invalid input — ask again with a more specific prompt
prompt = `'${answer}' is not a valid age. Please enter a positive number.`;
}
}
};Multiple Interrupts
多中断场景
When parallel branches each call , resume all of them in a single invocation by mapping each interrupt ID to its resume value.
<ex-multiple-interrupts>
<python>
Resume multiple parallel interrupts by mapping interrupt IDs to values.
```python
from typing import Annotated, TypedDict
import operator
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt
interrupt()class State(TypedDict):
vals: Annotated[list[str], operator.add]
def node_a(state):
answer = interrupt("question_a")
return {"vals": [f"a:{answer}"]}
def node_b(state):
answer = interrupt("question_b")
return {"vals": [f"b:{answer}"]}
graph = (
StateGraph(State)
.add_node("a", node_a)
.add_node("b", node_b)
.add_edge(START, "a")
.add_edge(START, "b")
.add_edge("a", END)
.add_edge("b", END)
.compile(checkpointer=InMemorySaver())
)
config = {"configurable": {"thread_id": "1"}}
当并行分支各自调用时,可通过映射每个中断ID到对应恢复值,在一次调用中恢复所有中断。
<ex-multiple-interrupts>
<python>
通过中断ID与值的映射,一次性恢复多个并行中断。
```python
from typing import Annotated, TypedDict
import operator
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt
interrupt()class State(TypedDict):
vals: Annotated[list[str], operator.add]
def node_a(state):
answer = interrupt("question_a")
return {"vals": [f"a:{answer}"]}
def node_b(state):
answer = interrupt("question_b")
return {"vals": [f"b:{answer}"]}
graph = (
StateGraph(State)
.add_node("a", node_a)
.add_node("b", node_b)
.add_edge(START, "a")
.add_edge(START, "b")
.add_edge("a", END)
.add_edge("b", END)
.compile(checkpointer=InMemorySaver())
)
config = {"configurable": {"thread_id": "1"}}
Both parallel nodes hit interrupt() and pause
Both parallel nodes hit interrupt() and pause
result = graph.invoke({"vals": []}, config)
result = graph.invoke({"vals": []}, config)
result["interrupt"] contains both Interrupt objects with IDs
result["interrupt"] contains both Interrupt objects with IDs
Resume all pending interrupts at once using a map of id -> value
Resume all pending interrupts at once using a map of id -> value
resume_map = {
i.id: f"answer for {i.value}"
for i in result["interrupt"]
}
result = graph.invoke(Command(resume=resume_map), config)
resume_map = {
i.id: f"answer for {i.value}"
for i in result["interrupt"]
}
result = graph.invoke(Command(resume=resume_map), config)
result["vals"] = ["a:answer for question_a", "b:answer for question_b"]
result["vals"] = ["a:answer for question_a", "b:answer for question_b"]
</python>
<typescript>
Resume multiple parallel interrupts by mapping interrupt IDs to values.
```typescript
import { Command, END, MemorySaver, START, StateGraph, interrupt, isInterrupted, INTERRUPT, Annotation } from "@langchain/langgraph";
const State = Annotation.Root({
vals: Annotation<string[]>({
reducer: (left, right) => left.concat(Array.isArray(right) ? right : [right]),
default: () => [],
}),
});
function nodeA(_state: typeof State.State) {
const answer = interrupt("question_a") as string;
return { vals: [`a:${answer}`] };
}
function nodeB(_state: typeof State.State) {
const answer = interrupt("question_b") as string;
return { vals: [`b:${answer}`] };
}
const graph = new StateGraph(State)
.addNode("a", nodeA)
.addNode("b", nodeB)
.addEdge(START, "a")
.addEdge(START, "b")
.addEdge("a", END)
.addEdge("b", END)
.compile({ checkpointer: new MemorySaver() });
const config = { configurable: { thread_id: "1" } };
const interruptedResult = await graph.invoke({ vals: [] }, config);
// Resume all pending interrupts at once
const resumeMap: Record<string, string> = {};
if (isInterrupted(interruptedResult)) {
for (const i of interruptedResult[INTERRUPT]) {
if (i.id != null) {
resumeMap[i.id] = `answer for ${i.value}`;
}
}
}
const result = await graph.invoke(new Command({ resume: resumeMap }), config);
// result.vals = ["a:answer for question_a", "b:answer for question_b"]User-fixable errors use to pause and collect missing data — that's the pattern covered by this skill. For the full 4-tier error handling strategy (RetryPolicy, Command error loops, etc.), see the fundamentals skill.
interrupt()</python>
<typescript>
通过中断ID与值的映射,一次性恢复多个并行中断。
```typescript
import { Command, END, MemorySaver, START, StateGraph, interrupt, isInterrupted, INTERRUPT, Annotation } from "@langchain/langgraph";
const State = Annotation.Root({
vals: Annotation<string[]>({
reducer: (left, right) => left.concat(Array.isArray(right) ? right : [right]),
default: () => [],
}),
});
function nodeA(_state: typeof State.State) {
const answer = interrupt("question_a") as string;
return { vals: [`a:${answer}`] };
}
function nodeB(_state: typeof State.State) {
const answer = interrupt("question_b") as string;
return { vals: [`b:${answer}`] };
}
const graph = new StateGraph(State)
.addNode("a", nodeA)
.addNode("b", nodeB)
.addEdge(START, "a")
.addEdge(START, "b")
.addEdge("a", END)
.addEdge("b", END)
.compile({ checkpointer: new MemorySaver() });
const config = { configurable: { thread_id: "1" } };
const interruptedResult = await graph.invoke({ vals: [] }, config);
// Resume all pending interrupts at once
const resumeMap: Record<string, string> = {};
if (isInterrupted(interruptedResult)) {
for (const i of interruptedResult[INTERRUPT]) {
if (i.id != null) {
resumeMap[i.id] = `answer for ${i.value}`;
}
}
}
const result = await graph.invoke(new Command({ resume: resumeMap }), config);
// result.vals = ["a:answer for question_a", "b:answer for question_b"]用户可修复的错误可通过暂停执行并收集缺失数据——这正是本Skill所覆盖的模式。如需了解完整的四层错误处理策略(RetryPolicy、Command错误循环等),请查看基础Skill文档。
interrupt()Side Effects Before Interrupt Must Be Idempotent
中断前的副作用操作必须具备幂等性
When the graph resumes, the node restarts from the beginning — ALL code before re-runs. In subgraphs, BOTH the parent node and the subgraph node re-execute.
<idempotency-rules>
interrupt()Do:
- Use upsert (not insert) operations before
interrupt() - Use check-before-create patterns
- Place side effects after when possible
interrupt() - Separate side effects into their own nodes
Don't:
- Create new records before — duplicates on each resume
interrupt() - Append to lists before — duplicate entries on each resume
interrupt()
当图恢复执行时,节点会从起始位置重新运行——之前的所有代码都会再次执行。在子图中,父节点和子图节点都会重新执行。
<idempotency-rules>
interrupt()推荐做法:
- 在前使用**更新插入(upsert)**操作而非插入(insert)
interrupt() - 采用先检查再创建的模式
- 尽可能将副作用操作放在之后
interrupt() - 将副作用操作拆分到独立节点中
禁止做法:
- 在前创建新记录——恢复时会产生重复数据
interrupt() - 在前向列表追加内容——恢复时会产生重复条目
interrupt()
GOOD: Upsert is idempotent — safe before interrupt
GOOD: Upsert is idempotent — safe before interrupt
def node_a(state: State):
db.upsert_user(user_id=state["user_id"], status="pending_approval")
approved = interrupt("Approve this change?")
return {"approved": approved}
def node_a(state: State):
db.upsert_user(user_id=state["user_id"], status="pending_approval")
approved = interrupt("Approve this change?")
return {"approved": approved}
GOOD: Side effect AFTER interrupt — only runs once
GOOD: Side effect AFTER interrupt — only runs once
def node_a(state: State):
approved = interrupt("Approve this change?")
if approved:
db.create_audit_log(user_id=state["user_id"], action="approved")
return {"approved": approved}
def node_a(state: State):
approved = interrupt("Approve this change?")
if approved:
db.create_audit_log(user_id=state["user_id"], action="approved")
return {"approved": approved}
BAD: Insert creates duplicates on each resume!
BAD: Insert creates duplicates on each resume!
def node_a(state: State):
audit_id = db.create_audit_log({ # Runs again on resume!
"user_id": state["user_id"],
"action": "pending_approval",
})
approved = interrupt("Approve this change?")
return {"approved": approved}
</python>
<typescript>
Idempotent operations before interrupt vs non-idempotent (wrong).
```typescript
// GOOD: Upsert is idempotent — safe before interrupt
const nodeA = async (state: typeof State.State) => {
await db.upsertUser({ userId: state.userId, status: "pending_approval" });
const approved = interrupt("Approve this change?");
return { approved };
};
// GOOD: Side effect AFTER interrupt — only runs once
const nodeA = async (state: typeof State.State) => {
const approved = interrupt("Approve this change?");
if (approved) {
await db.createAuditLog({ userId: state.userId, action: "approved" });
}
return { approved };
};
// BAD: Insert creates duplicates on each resume!
const nodeA = async (state: typeof State.State) => {
await db.createAuditLog({ // Runs again on resume!
userId: state.userId,
action: "pending_approval",
});
const approved = interrupt("Approve this change?");
return { approved };
};def node_a(state: State):
audit_id = db.create_audit_log({ # Runs again on resume!
"user_id": state["user_id"],
"action": "pending_approval",
})
approved = interrupt("Approve this change?")
return {"approved": approved}
</python>
<typescript>
中断前的幂等操作与非幂等操作(错误示例)对比。
```typescript
// GOOD: Upsert is idempotent — safe before interrupt
const nodeA = async (state: typeof State.State) => {
await db.upsertUser({ userId: state.userId, status: "pending_approval" });
const approved = interrupt("Approve this change?");
return { approved };
};
// GOOD: Side effect AFTER interrupt — only runs once
const nodeA = async (state: typeof State.State) => {
const approved = interrupt("Approve this change?");
if (approved) {
await db.createAuditLog({ userId: state.userId, action: "approved" });
}
return { approved };
};
// BAD: Insert creates duplicates on each resume!
const nodeA = async (state: typeof State.State) => {
await db.createAuditLog({ // Runs again on resume!
userId: state.userId,
action: "pending_approval",
});
const approved = interrupt("Approve this change?");
return { approved };
};Subgraph re-execution on resume
子图恢复时的重新执行
When a subgraph contains an , resuming re-executes BOTH the parent node (that invoked the subgraph) AND the subgraph node (that called ):
<python>
```python
def node_in_parent_graph(state: State):
some_code() # <-- Re-executes on resume
subgraph_result = subgraph.invoke(some_input)
# ...
interrupt()interrupt()def node_in_subgraph(state: State):
some_other_code() # <-- Also re-executes on resume
result = interrupt("What's your name?")
# ...
</python>
<typescript>
```typescript
async function nodeInParentGraph(state: State) {
someCode(); // <-- Re-executes on resume
const subgraphResult = await subgraph.invoke(someInput);
// ...
}
async function nodeInSubgraph(state: State) {
someOtherCode(); // <-- Also re-executes on resume
const result = interrupt("What's your name?");
// ...
}当子图包含时,恢复执行会同时重新执行调用子图的父节点和调用的子图节点:
<python>
```python
def node_in_parent_graph(state: State):
some_code() # <-- Re-executes on resume
subgraph_result = subgraph.invoke(some_input)
# ...
interrupt()interrupt()def node_in_subgraph(state: State):
some_other_code() # <-- Also re-executes on resume
result = interrupt("What's your name?")
# ...
</python>
<typescript>
```typescript
async function nodeInParentGraph(state: State) {
someCode(); // <-- Re-executes on resume
const subgraphResult = await subgraph.invoke(someInput);
// ...
}
async function nodeInSubgraph(state: State) {
someOtherCode(); // <-- Also re-executes on resume
const result = interrupt("What's your name?");
// ...
}Command(resume) Warning
Command(resume) 警告
Command(resume=...)invoke()stream()Command(update=...)Command(resume=...)invoke()stream()Command(update=...)Fixes
常见问题修复
<fix-checkpointer-required-for-interrupts>
<python>
Checkpointer required for interrupt functionality.
```python
<fix-checkpointer-required-for-interrupts>
<python>
中断功能必须配置Checkpointer。
```python
WRONG
WRONG
graph = builder.compile()
graph = builder.compile()
CORRECT
CORRECT
graph = builder.compile(checkpointer=InMemorySaver())
</python>
<typescript>
Checkpointer required for interrupt functionality.
```typescript
// WRONG
const graph = builder.compile();
// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });graph = builder.compile(checkpointer=InMemorySaver())
</python>
<typescript>
中断功能必须配置Checkpointer。
```typescript
// WRONG
const graph = builder.compile();
// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });WRONG
WRONG
graph.invoke({"resume_data": "approve"}, config)
graph.invoke({"resume_data": "approve"}, config)
CORRECT
CORRECT
graph.invoke(Command(resume="approve"), config)
</python>
<typescript>
Use Command to resume from an interrupt (regular object restarts graph).
```typescript
// WRONG
await graph.invoke({ resumeData: "approve" }, config);
// CORRECT
await graph.invoke(new Command({ resume: "approve" }), config);graph.invoke(Command(resume="approve"), config)
</python>
<typescript>
使用Command恢复中断(普通对象会重启图)。
```typescript
// WRONG
await graph.invoke({ resumeData: "approve" }, config);
// CORRECT
await graph.invoke(new Command({ resume: "approve" }), config);What You Should NOT Do
禁止操作
- Use interrupts without a checkpointer — will fail
- Resume without the same thread_id — creates a new thread instead of resuming
- Pass as invoke input — graph appears stuck (use plain dict)
Command(update=...) - Perform non-idempotent side effects before — creates duplicates on resume
interrupt() - Assume code before only runs once — it re-runs every resume
interrupt()
- 不配置Checkpointer就使用中断——会执行失败
- 恢复时使用不同的thread_id——会创建新线程而非恢复原有任务
- 将作为invoke输入——图会看似卡住(请使用普通字典)
Command(update=...) - 在前执行非幂等副作用操作——恢复时会产生重复数据
interrupt() - 假设之前的代码仅执行一次——每次恢复都会重新运行
interrupt()