# PocketFlow Skill

A comprehensive guide to building LLM applications using PocketFlow - a 100-line minimalist framework for Agents, task decomposition, RAG, and more.
## When to Use This Skill

Activate this skill when working with:

- **Graph-based LLM workflows** - Building complex AI systems with nodes and flows
- **Agentic applications** - Creating autonomous agents with dynamic action selection
- **Task decomposition** - Breaking complex LLM tasks into manageable steps
- **RAG systems** - Implementing Retrieval Augmented Generation pipelines
- **Batch processing** - Handling large inputs or multiple files with LLMs
- **Multi-agent systems** - Coordinating multiple AI agents
- **Async workflows** - Building I/O-bound LLM applications with concurrency
## Core Concepts

### Architecture Overview
PocketFlow models LLM workflows as a Graph plus a Shared Store:

```python
# Shared Store: central data storage
shared = {
    "data": {},
    "summary": {},
    "config": {...}
}

# Graph: nodes connected by transitions
node_a >> node_b >> node_c
flow = Flow(start=node_a)
flow.run(shared)
```
### The Node: Building Block
Every Node has 3 steps: `prep()` → `exec()` → `post()`

```python
class SummarizeFile(Node):
    def prep(self, shared):
        # Get data from shared store
        return shared["data"]

    def exec(self, prep_res):
        # Process with LLM (retries built-in)
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # Write results back to shared store
        shared["summary"] = exec_res
        return "default"  # Action for flow control
```

**Why 3 steps?** Separation of concerns: data storage and processing stay separate.
### The Flow: Orchestration
```python
# Simple sequence
load_data >> summarize >> save_result
flow = Flow(start=load_data)
flow.run(shared)

# Branching with actions
review - "approved" >> payment
review - "needs_revision" >> revise
review - "rejected" >> finish
revise >> review  # Loop back
flow = Flow(start=review)
```
## Quick Reference

### 1. Basic Node Pattern
```python
class LoadData(Node):
    def post(self, shared, prep_res, exec_res):
        shared["data"] = "Some text content"
        return None

class Summarize(Node):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        return call_llm(f"Summarize: {prep_res}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "default"

# Connect and run
load_data >> summarize
flow = Flow(start=load_data)
flow.run(shared)
```
### 2. Batch Processing
**BatchNode** - Process large inputs in chunks:

```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Chunk the big input
        content = shared["data"]
        chunk_size = 10000
        return [content[i:i+chunk_size]
                for i in range(0, len(content), chunk_size)]

    def exec(self, chunk):
        # Process each chunk
        return call_llm(f"Summarize: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        # Combine all results
        shared["summary"] = "\n".join(exec_res_list)
        return "default"
```

**BatchFlow** - Run a flow multiple times with different parameters:

```python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        filenames = list(shared["data"].keys())
        # Return a list of parameter dicts
        return [{"filename": fn} for fn in filenames]

class LoadFile(Node):
    def prep(self, shared):
        # Access filename from params
        filename = self.params["filename"]
        return filename
```

### 3. Agent Pattern
````python
import yaml

class DecideAction(Node):
    def exec(self, inputs):
        query, context = inputs
        prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
        resp = call_llm(prompt)
        yaml_str = resp.split("```yaml")[1].split("```")[0]
        action_data = yaml.safe_load(yaml_str)
        return action_data

# Build the agent graph
decide >> search_web
decide - "answer" >> provide_answer
search_web >> decide  # Loop back for more searches
agent_flow = Flow(start=decide)
````
### 4. RAG (Retrieval Augmented Generation)
**Stage 1: Offline Indexing**

```python
class ChunkDocs(BatchNode):
    def prep(self, shared):
        return shared["files"]

    def exec(self, filepath):
        with open(filepath, "r") as f:
            text = f.read()
        # Chunk by 100 chars
        size = 100
        return [text[i:i+size] for i in range(0, len(text), size)]

    def post(self, shared, prep_res, exec_res_list):
        shared["all_chunks"] = [c for chunks in exec_res_list
                                for c in chunks]

chunk_docs >> embed_docs >> build_index
offline_flow = Flow(start=chunk_docs)
```

**Stage 2: Online Query**

```python
class RetrieveDocs(Node):
    def exec(self, inputs):
        q_emb, index, chunks = inputs
        I, D = search_index(index, q_emb, top_k=1)
        return chunks[I[0][0]]

embed_query >> retrieve_docs >> generate_answer
online_flow = Flow(start=embed_query)
```

### 5. Async & Parallel
**AsyncNode** for I/O-bound operations:

```python
class SummarizeThenVerify(AsyncNode):
    async def prep_async(self, shared):
        doc_text = await read_file_async(shared["doc_path"])
        return doc_text

    async def exec_async(self, prep_res):
        summary = await call_llm_async(f"Summarize: {prep_res}")
        return summary

    async def post_async(self, shared, prep_res, exec_res):
        decision = await gather_user_feedback(exec_res)
        if decision == "approve":
            shared["summary"] = exec_res
        return "default"

# Must wrap in AsyncFlow
node = SummarizeThenVerify()
flow = AsyncFlow(start=node)
await flow.run_async(shared)
```

**AsyncParallelBatchNode** - Process multiple items concurrently:

```python
class ParallelSummaries(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        return shared["texts"]  # List of texts

    async def exec_async(self, text):
        # Runs in parallel for each text
        return await call_llm_async(f"Summarize: {text}")

    async def post_async(self, shared, prep_res, exec_res_list):
        shared["summary"] = "\n\n".join(exec_res_list)
        return "default"
```

### 6. Workflow (Task Decomposition)
```python
class GenerateOutline(Node):
    def prep(self, shared):
        return shared["topic"]

    def exec(self, topic):
        return call_llm(f"Create outline for: {topic}")

    def post(self, shared, prep_res, exec_res):
        shared["outline"] = exec_res

class WriteSection(Node):
    def exec(self, outline):
        return call_llm(f"Write content: {outline}")

    def post(self, shared, prep_res, exec_res):
        shared["draft"] = exec_res

class ReviewAndRefine(Node):
    def exec(self, draft):
        return call_llm(f"Review and improve: {draft}")

# Chain the workflow
outline >> write >> review
workflow = Flow(start=outline)
```

### 7. Structured Output
````python
import yaml

class SummarizeNode(Node):
    def exec(self, prep_res):
        prompt = f"""
Summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Output:
```yaml
summary:
  - bullet 1
  - bullet 2
  - bullet 3
```"""
        response = call_llm(prompt)
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)
        # Validate
        assert "summary" in structured_result
        assert isinstance(structured_result["summary"], list)
        return structured_result
````

**Why YAML?** Modern LLMs handle YAML better than JSON (fewer escaping issues).
## 🍳 Cookbook: Real-World Examples
This skill includes 6 production-ready examples from the official PocketFlow cookbook, plus a complete Python project template.

📂 **Location:** `assets/examples/` and `assets/template/`

### Example 1: Interactive Chat Bot (☆☆☆)
**File:** `assets/examples/01_chat.py`

A chat bot with conversation history that loops back to itself.

```python
# Key pattern: self-looping node
chat_node = ChatNode()
chat_node - "continue" >> chat_node  # Loop for continuous chat
flow = Flow(start=chat_node)
```

**What it demonstrates:**
- Message history management
- Self-looping nodes
- Graceful exit handling
- User input processing

**Run it:** `python assets/examples/01_chat.py`

---

### Example 2: Article Writing Workflow (☆☆☆)
**File:** `assets/examples/02_workflow.py`

Multi-step content creation: outline → draft → refine.

```python
# Sequential pipeline
outline >> draft >> refine
flow = Flow(start=outline)
```

**What it demonstrates:**
- Task decomposition
- Sequential workflows
- Progressive content generation

**Run it:** `python assets/examples/02_workflow.py "AI Safety"`

---

### Example 3: Research Agent (☆☆☆)
**File:** `assets/examples/03_agent.py`

An agent that decides whether to search or answer.

```python
# Branching based on decision
decide - "search" >> search
decide - "answer" >> answer
search - "continue" >> decide  # Loop back
```

**What it demonstrates:**
- Dynamic action selection
- Branching logic
- Agent decision-making
- Iterative research loops

**Run it:** `python assets/examples/03_agent.py "Nobel Prize 2024"`

---

### Example 4: RAG System (☆☆☆)
**File:** `assets/examples/04_rag.py`

A complete two-stage RAG pipeline with offline indexing and online querying.

```python
# Stage 1: Offline indexing
embed_docs >> build_index
offline_flow = Flow(start=embed_docs)

# Stage 2: Online query
embed_query >> retrieve >> generate
online_flow = Flow(start=embed_query)
```

**What it demonstrates:**
- Document embedding and indexing
- Similarity search
- Context-based generation
- Multi-stage pipelines

**Run it:** `python assets/examples/04_rag.py --"How to install PocketFlow?"`

---

### Example 5: Structured Output Parser (☆☆☆)
**File:** `assets/examples/05_structured_output.py`

A resume parser that extracts structured data with YAML.

````python
# Parse YAML from the LLM response
yaml_str = response.split("```yaml")[1].split("```")[0]
structured_result = yaml.safe_load(yaml_str)

# Validate structure
assert "name" in structured_result
assert "email" in structured_result
````

**What it demonstrates:**
- Structured LLM responses with YAML
- Schema validation
- Retry logic for parsing
- Data extraction patterns

**Run it:** `python assets/examples/05_structured_output.py`

---

### Example 6: Multi-Agent Communication (★☆☆)
**File:** `assets/examples/06_multi_agent.py`

Two async agents playing the Taboo word game.

```python
# Agents with message queues
shared = {
    "hinter_queue": asyncio.Queue(),
    "guesser_queue": asyncio.Queue()
}

# Run concurrently
await asyncio.gather(
    hinter_flow.run_async(shared),
    guesser_flow.run_async(shared)
)
```

**What it demonstrates:**
- AsyncNode for concurrent operations
- Message queues for inter-agent communication
- Multi-agent coordination
- Game logic with termination

**Run it:** `python assets/examples/06_multi_agent.py`

---

### Python Project Template
**Location:** `assets/template/`

Official best-practice template with a complete project structure.

**Files:**
- `main.py` - Entry point
- `flow.py` - Flow definition
- `nodes.py` - Node implementations
- `utils.py` - LLM wrappers
- `requirements.txt` - Dependencies

**Quick Start:**

```bash
cd assets/template/
pip install -r requirements.txt
# Edit utils.py to add your LLM API key
python main.py
```

**What it demonstrates:**
- Separation of concerns
- Factory pattern for flows
- Clean data flow with the shared store
- Configuration best practices

---

### Full Cookbook (47 Examples)
The complete cookbook has 47 progressively complex examples on GitHub:

**Dummy Level (☆☆☆):**
Chat, Workflow, Agent, RAG, Map-Reduce, Streaming, Structured Output, Guardrails

**Beginner Level (★☆☆):**
Multi-Agent, Supervisor, Parallel (3x/8x), Thinking (CoT), Memory, MCP, Tracing

**Plus 30+ more advanced patterns:**
FastAPI integration, Code generator, Text-to-SQL, Voice chat, PDF vision, Website chatbot, and more.

**Complete guide:** See `assets/COOKBOOK_GUIDE.md` for the full index and learning path.

## Design Patterns Summary
| Pattern | Use Case | Key Component |
|---|---|---|
| Agent | Dynamic action selection | Action space + context management |
| Workflow | Multi-step task decomposition | Chained nodes |
| RAG | Context-aware answers | Offline indexing + online retrieval |
| Map Reduce | Large input processing | BatchNode with aggregation |
| Multi-Agent | Collaborative agents | Message queues + AsyncNode |
| Structured Output | Typed LLM responses | YAML prompting + validation |
## Communication Patterns

### Shared Store (Primary)
```python
# Design the data structure first
shared = {
    "user": {
        "id": "user123",
        "context": {
            "weather": {"temp": 72, "condition": "sunny"},
            "location": "San Francisco"
        }
    },
    "results": {}
}
```

**Best Practice:** Separate the data schema from compute logic using the shared store.

### Params (For Batch Only)
```python
class SummarizeFile(Node):
    def prep(self, shared):
        # Access the node's params
        filename = self.params["filename"]
        return shared["data"].get(filename, "")

# Set params
node = SummarizeFile()
node.set_params({"filename": "report.txt"})
```
## Advanced Features

### Fault Tolerance
```python
# Automatic retries
my_node = SummarizeFile(max_retries=3, wait=10)

# Graceful fallback
class ResilientNode(Node):
    def exec_fallback(self, prep_res, exc):
        # Return a fallback instead of crashing
        return "There was an error processing your request."
```
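The retry-then-fallback behavior can be sketched in plain Python. This is a simplified illustration of the semantics described above, not PocketFlow's actual implementation; `run_with_retries`, `flaky`, and the fallback lambda are illustrative names.

```python
import time

def run_with_retries(exec_fn, fallback_fn, prep_res, max_retries=3, wait=0.0):
    # Try exec_fn up to max_retries times; on the final failure,
    # return the fallback result instead of raising.
    for attempt in range(max_retries):
        try:
            return exec_fn(prep_res)
        except Exception as exc:
            if attempt == max_retries - 1:
                return fallback_fn(prep_res, exc)
            time.sleep(wait)  # back off before the next attempt

calls = []
def flaky(prep_res):
    # Stand-in for an LLM call that always times out
    calls.append(prep_res)
    raise RuntimeError("LLM timeout")

result = run_with_retries(flaky, lambda p, e: "fallback answer", "query", max_retries=2)
```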
### Nested Flows
```python
# Flows can act as nodes
node_a >> node_b
subflow = Flow(start=node_a)

# Connect to other nodes
subflow >> node_c

# Create the parent flow
parent_flow = Flow(start=subflow)
```

### Multi-Agent Communication
```python
class AgentNode(AsyncNode):
    async def prep_async(self, _):
        message_queue = self.params["messages"]
        message = await message_queue.get()
        print(f"Agent received: {message}")
        return message

# Create a self-loop for continuous listening
agent = AgentNode()
agent >> agent
flow = AsyncFlow(start=agent)
```
## Utility Functions

### LLM Wrappers
```python
# OpenAI
def call_llm(prompt):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content

# Anthropic Claude
def call_llm(prompt):
    from anthropic import Anthropic
    client = Anthropic(api_key="YOUR_API_KEY")
    r = client.messages.create(
        model="claude-sonnet-4-0",
        max_tokens=1024,  # required by the Messages API
        messages=[{"role": "user", "content": prompt}]
    )
    return r.content[0].text

# Google Gemini
def call_llm(prompt):
    from google import genai
    client = genai.Client(api_key="GEMINI_API_KEY")
    response = client.models.generate_content(
        model="gemini-2.5-pro",
        contents=prompt
    )
    return response.text
```
### Embeddings
```python
# OpenAI
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY")
response = client.embeddings.create(
    model="text-embedding-ada-002",
    input=text
)
embedding = response.data[0].embedding
```
### Text Chunking
```python
# Fixed-size chunking
def fixed_size_chunk(text, chunk_size=100):
    return [text[i:i+chunk_size]
            for i in range(0, len(text), chunk_size)]

# Sentence-based chunking
import nltk

def sentence_based_chunk(text, max_sentences=2):
    sentences = nltk.sent_tokenize(text)
    return [" ".join(sentences[i:i+max_sentences])
            for i in range(0, len(sentences), max_sentences)]
```
## Agentic Coding Guidelines
**IMPORTANT** for AI agents building LLM systems:

- **Start Simple** - Begin with the smallest solution first
- **Design First** - Create a high-level design (docs/design.md) before implementation
- **Manual Testing** - Solve example inputs by hand to develop intuition
- **Iterate Frequently** - Expect hundreds of iterations on Steps 3-6
- **Ask Humans** - Request feedback and clarification regularly
### Recommended Project Structure
```
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│   ├── __init__.py
│   ├── call_llm.py
│   └── search_web.py
├── requirements.txt
└── docs/
    └── design.md
```

### Development Workflow
```mermaid
flowchart LR
    start[Start] --> batch[Batch]
    batch --> check[Check]
    check -->|OK| process
    check -->|Error| fix[Fix]
    fix --> check
    subgraph process[Process]
        step1[Step 1] --> step2[Step 2]
    end
    process --> endNode[End]
```

## Best Practices
### Context Management (Agents)

- **Relevant & Minimal** - Retrieve the most relevant context via RAG, not the entire history
- **Avoid "lost in the middle"** - LLMs overlook mid-prompt content even with large context windows
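A minimal sketch of "relevant and minimal" context selection: score prior messages against the query and keep only the top k. The `embed` function here is a toy bag-of-letters stand-in for a real embedding API, and all names are illustrative.

```python
import math

def embed(text):
    # Toy embedding: count of each letter a-z (a real system
    # would call an embedding API here)
    vec = [0.0] * 26
    for ch in text.lower():
        if 'a' <= ch <= 'z':
            vec[ord(ch) - ord('a')] += 1.0
    return vec

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

def top_k(query, history, k=2):
    # Keep only the k messages most similar to the query
    q = embed(query)
    scored = sorted(history, key=lambda m: cosine(q, embed(m)), reverse=True)
    return scored[:k]

context = top_k("deploy the flow",
                ["how to deploy flows", "lunch menu", "flow deployment steps"])
```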
### Action Space Design (Agents)

- **Unambiguous** - Avoid overlapping actions (e.g., one `read_database` instead of separate `read_databases` and `read_csvs`)
- **Incremental** - Feed 500 lines or 1 page at a time, not everything at once
- **Overview-zoom-in** - Show structure first (TOC, summary), then details
- **Parameterized** - Enable flexible actions with parameters (columns, SQL queries)
- **Backtracking** - Allow undo instead of a full restart
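The "unambiguous" and "parameterized" points combine naturally: expose one action with parameters and dispatch on the action name the LLM returns. `read_database` and its fake rows below are hypothetical stand-ins for a real data source.

```python
# One parameterized action instead of overlapping variants
def read_database(table=None, columns=None, limit=500):
    # Stand-in for a real DB read; returns at most `limit` rows
    # (incremental access rather than the whole table at once)
    rows = [{"id": i, "value": i * 2} for i in range(limit)]
    if columns:
        rows = [{c: r[c] for c in columns} for r in rows]
    return rows

ACTIONS = {"read_database": read_database}

def dispatch(action):
    # `action` is what the LLM returns, e.g. {"name": ..., "params": {...}}
    return ACTIONS[action["name"]](**action["params"])

rows = dispatch({"name": "read_database",
                 "params": {"columns": ["id"], "limit": 3}})
```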
### Error Handling

- **No try/except in utilities** - Let the Node retry mechanism handle failures
- **Use exec_fallback()** - Provide graceful degradation instead of crashes
### Performance Tips

- **Batch APIs** - Use LLM batch inference for multiple prompts
- **Rate Limiting** - Use semaphores to avoid hitting API limits
- **Parallel only for I/O** - The Python GIL prevents true CPU parallelism
- **Independent tasks** - Don't parallelize dependent operations
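The rate-limiting tip can be sketched with an `asyncio.Semaphore`: at most `max_concurrency` LLM calls are in flight at once. `call_llm_async` here is a stub standing in for a real async LLM wrapper.

```python
import asyncio

async def call_llm_async(prompt):
    # Stub for a real async LLM call; simulates network I/O
    await asyncio.sleep(0.01)
    return f"summary of: {prompt[:20]}"

async def bounded_call(sem, prompt):
    async with sem:  # at most N calls in flight at once
        return await call_llm_async(prompt)

async def summarize_all(texts, max_concurrency=3):
    sem = asyncio.Semaphore(max_concurrency)
    # gather preserves input order even though calls overlap
    return await asyncio.gather(*(bounded_call(sem, t) for t in texts))

results = asyncio.run(summarize_all([f"text {i}" for i in range(10)]))
```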
## Reference Files

This skill includes comprehensive documentation in `references/core_abstraction.md`:

- **Node** - Basic building block with prep/exec/post
- **Flow** - Orchestration and graph control
- **Communication** - Shared store vs params
- **Batch** - BatchNode and BatchFlow patterns
- **Async** - AsyncNode for I/O-bound tasks
- **Parallel** - AsyncParallelBatchNode/Flow
- **Agent** - Dynamic action selection
- **Workflow** - Task decomposition chains
- **RAG** - Retrieval augmented generation
- **Map Reduce** - Large input processing
- **Structured Output** - YAML-based schemas
- **Multi-Agents** - Inter-agent communication
- **LLM Wrappers** - OpenAI, Anthropic, Google, Azure
- **Embeddings** - Text embedding APIs
- **Vector Databases** - FAISS, Pinecone, Qdrant, etc.
- **Web Search** - Google, Bing, DuckDuckGo, Brave
- **Text Chunking** - Fixed-size and sentence-based
- **Text-to-Speech** - AWS Polly, Google Cloud, Azure, IBM
- **Visualization** - Mermaid diagrams and call stacks
- **Agentic Coding** - Development workflow guidance
## Navigation Guide
### For Beginners

- Start with Node and Flow basics
- Learn Communication (shared store)
- Try a simple Workflow example
- Read the Agentic Coding guidelines
### For Specific Use Cases

- Document processing → Batch + Map Reduce
- Question answering → RAG
- Dynamic task planning → Agent
- Multi-step pipelines → Workflow
- Real-time systems → Async + Parallel
- Collaborative AI → Multi-Agents
### For Advanced Users

- Nested flows for complex pipelines
- Custom fault tolerance with exec_fallback
- Parallel processing with rate limiting
- Multi-agent communication patterns
- Custom visualization and debugging tools
## Common Pitfalls

❌ Don't use Multi-Agents unless necessary - start simple!
❌ Don't parallelize dependent operations
❌ Don't add try/except in utility functions called from exec()
❌ Don't use node.run() in production - always use flow.run()
❌ Don't modify the shared store in exec() - use prep() and post()

✅ Do design the data schema before implementation
✅ Do use the shared store for data, params for identifiers
✅ Do leverage the built-in retry mechanisms
✅ Do validate structured output with assertions
✅ Do start with the simplest solution and iterate
## Resources

**Official Docs:** https://the-pocket.github.io/PocketFlow/

**Framework Philosophy:**
- Minimalist (100 lines of core code)
- No vendor lock-in (implement your own utilities)
- Separation of concerns (graph + shared store)
- Graph-based workflow modeling

*This skill was generated from the official PocketFlow documentation. For detailed examples and the complete API reference, see `references/core_abstraction.md`.*