# LangGraph Streaming
Real-time updates and progress tracking for LangGraph workflows.
## 5 Stream Modes

```python
# Available modes
for mode, chunk in graph.stream(inputs, stream_mode=["values", "updates", "messages", "custom", "debug"]):
    print(f"[{mode}] {chunk}")
```

| Mode | Purpose | Use Case |
|------|---------|----------|
| **values** | Full state after each step | Debugging, state inspection |
| **updates** | State deltas after each step | Efficient UI updates |
| **messages** | LLM tokens + metadata | Chat interfaces, typing indicators |
| **custom** | User-defined events | Progress bars, status updates |
| **debug** | Maximum information | Development, troubleshooting |
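When `stream_mode` is given as a list, as above, each yielded event is a `(mode, chunk)` pair rather than a bare chunk. A minimal stand-alone sketch of routing such pairs — the stream itself is stubbed with hypothetical payloads here, since only the event shape matters:

```python
# Stand-in for graph.stream(..., stream_mode=[...]) — yields (mode, chunk)
# pairs the way LangGraph does when stream_mode is a list. Payloads are
# hypothetical.
def fake_stream():
    yield ("updates", {"writer_agent": {"draft": "..."}})
    yield ("custom", {"type": "progress", "current": 1, "total": 2})
    yield ("custom", {"type": "progress", "current": 2, "total": 2})
    yield ("values", {"draft": "...", "items": ["a", "b"]})

def count_by_mode(events):
    """Route each (mode, chunk) pair and tally how many events per mode."""
    counts = {}
    for mode, chunk in events:
        counts[mode] = counts.get(mode, 0) + 1
    return counts

print(count_by_mode(fake_stream()))  # {'updates': 1, 'custom': 2, 'values': 1}
```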
## Custom Events with StreamWriter

```python
from langgraph.config import get_stream_writer

def node_with_progress(state: State):
    """Emit custom progress events."""
    writer = get_stream_writer()
    results = []
    for i, item in enumerate(state["items"]):
        writer({
            "type": "progress",
            "current": i + 1,
            "total": len(state["items"]),
            "status": f"Processing {item}"
        })
        results.append(process(item))
    writer({"type": "complete", "message": "All items processed"})
    return {"results": results}

# Consume custom events
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    if mode == "custom":
        if chunk.get("type") == "progress":
            print(f"Progress: {chunk['current']}/{chunk['total']}")
    elif mode == "updates":
        print(f"State updated: {list(chunk.keys())}")
```
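Progress events like the ones above can drive a text progress bar. A small sketch — the event shape matches the `writer(...)` payloads, while the rendering itself is just one possible choice:

```python
def render_progress(event, width=20):
    """Render a {'type': 'progress', 'current': ..., 'total': ...} custom
    event as a fixed-width text progress bar."""
    filled = int(width * event["current"] / event["total"])
    return f"[{'#' * filled}{'-' * (width - filled)}] {event['current']}/{event['total']}"

print(render_progress({"type": "progress", "current": 1, "total": 4}, width=8))  # [##------] 1/4
```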
## LLM Token Streaming
```python
# Stream tokens from LLM calls
for message_chunk, metadata in graph.stream(
    {"topic": "AI safety"},
    stream_mode="messages"
):
    if message_chunk.content:
        print(message_chunk.content, end="", flush=True)

# Filter by node
for msg, meta in graph.stream(inputs, stream_mode="messages"):
    if meta["langgraph_node"] == "writer_agent":
        print(msg.content, end="")

# Filter by tags
model = init_chat_model("claude-sonnet-4-20250514", tags=["main_response"])
for msg, meta in graph.stream(inputs, stream_mode="messages"):
    if "main_response" in meta.get("tags", []):
        print(msg.content, end="")
```
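Streamed chunks usually need to be accumulated into the full response as well as printed. A stdlib-only sketch of the accumulation step, with chunk contents as plain strings for illustration:

```python
def accumulate_tokens(contents):
    """Join streamed token contents into the full response, skipping empty
    chunks (e.g. chunks that carry only metadata or tool-call deltas)."""
    return "".join(c for c in contents if c)

print(accumulate_tokens(["AI ", "", "safety ", "matters"]))  # AI safety matters
```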
## Subgraph Streaming

```python
# Enable subgraph visibility
for namespace, chunk in graph.stream(
    inputs,
    subgraphs=True,
    stream_mode="updates"
):
    # namespace shows graph hierarchy: (), ("child",), ("child", "grandchild")
    print(f"[{'/'.join(namespace) or 'root'}] {chunk}")
```
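The namespace labelling used above can be checked in isolation. A sketch with hypothetical events shaped like the `(namespace, chunk)` pairs that `subgraphs=True` yields:

```python
def label(namespace):
    """Turn a namespace tuple into a readable hierarchy label."""
    return "/".join(namespace) or "root"

# Hypothetical (namespace, chunk) pairs at three nesting depths.
events = [
    ((), {"step": 1}),
    (("child",), {"step": 2}),
    (("child", "grandchild"), {"step": 3}),
]
print([label(ns) for ns, _ in events])  # ['root', 'child', 'child/grandchild']
```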
## Multiple Modes Simultaneously

```python
# Combine modes for comprehensive feedback
async for mode, chunk in graph.astream(
    inputs,
    stream_mode=["updates", "custom", "messages"]
):
    match mode:
        case "updates":
            update_ui_state(chunk)
        case "custom":
            show_progress(chunk)
        case "messages":
            append_to_chat(chunk)
```
## Non-LangChain LLM Streaming

```python
def call_custom_llm(state: State):
    """Stream from arbitrary LLM APIs."""
    writer = get_stream_writer()
    chunks = []
    for chunk in your_streaming_client.generate(state["prompt"]):
        writer({"type": "llm_token", "content": chunk.text})
        chunks.append(chunk.text)
    return {"response": "".join(chunks)}
```
## FastAPI SSE Integration

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/stream")
async def stream_workflow(request: WorkflowRequest):
    async def event_generator():
        async for mode, chunk in graph.astream(
            request.inputs,
            stream_mode=["updates", "custom"]
        ):
            yield f"data: {json.dumps({'mode': mode, 'data': chunk})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
```
## Python < 3.11 Async

```python
# Manual config propagation required
async def call_model(state: State, config: RunnableConfig):
    response = await model.ainvoke(state["messages"], config)
    return {"messages": [response]}

# Explicit writer injection
async def node_with_custom_stream(state: State, writer: StreamWriter):
    writer({"status": "processing"})
    result = await process_async(state)
    return {"result": result}
```
## Key Decisions

| Decision | Recommendation |
|---|---|
| Mode selection | Use `updates` for most UIs |
| Token streaming | Use `messages` mode |
| Progress tracking | Use custom mode with `get_stream_writer` |
| Subgraph visibility | Enable `subgraphs=True` for complex workflows |

## Common Mistakes

- Forgetting the `stream_mode` parameter (defaults to `values` only)
- Not handling async properly in Python < 3.11
- Missing `flush=True` on print for real-time display
- Not filtering messages by node/tags (noisy output)
## Evaluations

See references/evaluations.md for test cases.
## Related Skills

- `langgraph-subgraphs` - Stream updates from nested graphs
- `langgraph-human-in-loop` - Stream status while awaiting human
- `langgraph-supervisor` - Stream agent progress in supervisor workflows
- `langgraph-parallel` - Stream from parallel execution branches
- `langgraph-tools` - Stream tool execution progress
- `api-design-framework` - SSE endpoint design patterns
## Capability Details

### stream-modes

Keywords: stream mode, values, updates, messages, custom, debug

Solves:
- Configure streaming output format
- Choose appropriate mode for use case
- Combine multiple stream modes

### custom-events

Keywords: custom event, progress, status, stream writer, get_stream_writer

Solves:
- Emit custom progress events
- Track workflow status
- Implement progress bars

### token-streaming

Keywords: token, LLM stream, chat, typing indicator, messages mode

Solves:
- Stream LLM tokens in real-time
- Build chat interfaces
- Show typing indicators

### subgraph-streaming

Keywords: subgraph, nested, hierarchy, namespace

Solves:
- Stream from nested graphs
- Track subgraph progress
- Debug complex workflows