LangGraph Streaming


Real-time updates and progress tracking for LangGraph workflows.

5 Stream Modes


```python
# Available modes
for mode, chunk in graph.stream(
    inputs,
    stream_mode=["values", "updates", "messages", "custom", "debug"],
):
    print(f"[{mode}] {chunk}")
```

| Mode | Purpose | Use Case |
|------|---------|----------|
| **values** | Full state after each step | Debugging, state inspection |
| **updates** | State deltas after each step | Efficient UI updates |
| **messages** | LLM tokens + metadata | Chat interfaces, typing indicators |
| **custom** | User-defined events | Progress bars, status updates |
| **debug** | Maximum information | Development, troubleshooting |
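The difference between `values` and `updates` chunks can be sketched in plain Python (no LangGraph required; the node name, state keys, and simple dict-merge semantics here are illustrative — real merges depend on your state's reducers):

```python
# Hypothetical illustration of values vs. updates chunk shapes.

def simulate_step(state: dict, node_name: str, delta: dict):
    """Apply one node's state delta and return both chunk shapes."""
    new_state = {**state, **delta}
    values_chunk = new_state            # "values": full state after the step
    updates_chunk = {node_name: delta}  # "updates": only what the node returned
    return values_chunk, updates_chunk

values_chunk, updates_chunk = simulate_step(
    {"topic": "AI safety", "draft": None},
    "writer",
    {"draft": "First draft..."},
)
print(values_chunk)   # full state: both topic and draft
print(updates_chunk)  # keyed by node: {"writer": {"draft": "First draft..."}}
```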

Custom Events with StreamWriter


```python
from langgraph.config import get_stream_writer

def node_with_progress(state: State):
    """Emit custom progress events."""
    writer = get_stream_writer()

    results = []
    for i, item in enumerate(state["items"]):
        writer({
            "type": "progress",
            "current": i + 1,
            "total": len(state["items"]),
            "status": f"Processing {item}"
        })
        results.append(process(item))

    writer({"type": "complete", "message": "All items processed"})
    return {"results": results}
```

Consume custom events

```python
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    if mode == "custom":
        if chunk.get("type") == "progress":
            print(f"Progress: {chunk['current']}/{chunk['total']}")
    elif mode == "updates":
        print(f"State updated: {list(chunk.keys())}")
```
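Custom progress events like these are plain dicts, so rendering helpers can be written and tested without a graph. A minimal sketch (the function name and bar style are made up):

```python
# Hypothetical helper: render a text progress bar from a custom
# {"type": "progress", "current": ..., "total": ...} event.

def render_progress(event: dict, width: int = 20) -> str:
    """Format a progress event as a fixed-width bar plus a counter."""
    filled = int(width * event["current"] / event["total"])
    bar = "#" * filled + "-" * (width - filled)
    return f"[{bar}] {event['current']}/{event['total']}"

print(render_progress({"type": "progress", "current": 5, "total": 10}))
# → [##########----------] 5/10
```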

LLM Token Streaming


Stream tokens from LLM calls

```python
for message_chunk, metadata in graph.stream(
    {"topic": "AI safety"},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="", flush=True)
```

Filter by node

```python
for msg, meta in graph.stream(inputs, stream_mode="messages"):
    if meta["langgraph_node"] == "writer_agent":
        print(msg.content, end="")
```

Filter by tags

```python
model = init_chat_model("claude-sonnet-4-20250514", tags=["main_response"])

for msg, meta in graph.stream(inputs, stream_mode="messages"):
    if "main_response" in meta.get("tags", []):
        print(msg.content, end="")
```

Subgraph Streaming


Enable subgraph visibility

```python
for namespace, chunk in graph.stream(
    inputs,
    subgraphs=True,
    stream_mode="updates",
):
    # namespace shows graph hierarchy: (), ("child",), ("child", "grandchild")
    print(f"[{'/'.join(namespace) or 'root'}] {chunk}")
```
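The namespace-to-path formatting used above can be pulled into a small helper and unit-tested on its own (pure Python; the function name is illustrative):

```python
# Sketch: turn a subgraph namespace tuple into a readable path label.

def format_namespace(namespace: tuple) -> str:
    """() -> "root"; ("child", "grandchild") -> "child/grandchild"."""
    return "/".join(namespace) or "root"

print(format_namespace(()))                       # root
print(format_namespace(("child", "grandchild")))  # child/grandchild
```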

Multiple Modes Simultaneously


Combine modes for comprehensive feedback

```python
async for mode, chunk in graph.astream(
    inputs,
    stream_mode=["updates", "custom", "messages"],
):
    match mode:
        case "updates":
            update_ui_state(chunk)
        case "custom":
            show_progress(chunk)
        case "messages":
            append_to_chat(chunk)
```

Non-LangChain LLM Streaming


```python
def call_custom_llm(state: State):
    """Stream from arbitrary LLM APIs."""
    writer = get_stream_writer()

    chunks = []
    for chunk in your_streaming_client.generate(state["prompt"]):
        writer({"type": "llm_token", "content": chunk.text})
        chunks.append(chunk.text)

    return {"response": "".join(chunks)}
```

FastAPI SSE Integration


```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/stream")
async def stream_workflow(request: WorkflowRequest):
    async def event_generator():
        async for mode, chunk in graph.astream(
            request.inputs,
            stream_mode=["updates", "custom"]
        ):
            yield f"data: {json.dumps({'mode': mode, 'data': chunk})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
```
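The SSE framing used in the endpoint can be isolated into a helper and tested without FastAPI: each event is a `data: <json>` line followed by a blank line. A sketch (the function name is made up):

```python
import json

# Sketch of the Server-Sent Events framing from the endpoint above.

def to_sse(mode: str, chunk) -> str:
    """Serialize one (mode, chunk) pair as an SSE frame."""
    return f"data: {json.dumps({'mode': mode, 'data': chunk})}\n\n"

frame = to_sse("custom", {"type": "progress", "current": 1, "total": 3})
print(frame, end="")  # data: {"mode": "custom", "data": {...}} plus blank line
```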

Python < 3.11 Async


Manual config propagation required

```python
async def call_model(state: State, config: RunnableConfig):
    response = await model.ainvoke(state["messages"], config)
    return {"messages": [response]}
```

Explicit writer injection

```python
async def node_with_custom_stream(state: State, writer: StreamWriter):
    writer({"status": "processing"})
    result = await process_async(state)
    return {"result": result}
```

Key Decisions


| Decision | Recommendation |
|----------|----------------|
| Mode selection | Use `["updates", "custom"]` for most UIs |
| Token streaming | Use `messages` mode with node filtering |
| Progress tracking | Use custom mode with `get_stream_writer()` |
| Subgraph visibility | Enable `subgraphs=True` for complex workflows |

Common Mistakes


  • Forgetting the `stream_mode` parameter (defaults to `values` only)
  • Not handling async properly in Python < 3.11
  • Missing `flush=True` on print for real-time display
  • Not filtering messages by node/tags (noisy output)

Evaluations


See references/evaluations.md for test cases.

Related Skills


  • langgraph-subgraphs - Stream updates from nested graphs
  • langgraph-human-in-loop - Stream status while awaiting human
  • langgraph-supervisor - Stream agent progress in supervisor workflows
  • langgraph-parallel - Stream from parallel execution branches
  • langgraph-tools - Stream tool execution progress
  • api-design-framework - SSE endpoint design patterns

Capability Details


stream-modes

Keywords: stream mode, values, updates, messages, custom, debug

Solves:
  • Configure streaming output format
  • Choose appropriate mode for use case
  • Combine multiple stream modes

custom-events

Keywords: custom event, progress, status, stream writer, get_stream_writer

Solves:
  • Emit custom progress events
  • Track workflow status
  • Implement progress bars

token-streaming

Keywords: token, LLM stream, chat, typing indicator, messages mode

Solves:
  • Stream LLM tokens in real-time
  • Build chat interfaces
  • Show typing indicators

subgraph-streaming

Keywords: subgraph, nested, hierarchy, namespace

Solves:
  • Stream from nested graphs
  • Track subgraph progress
  • Debug complex workflows