PocketFlow Skill


A comprehensive guide to building LLM applications using PocketFlow - a 100-line minimalist framework for Agents, Task Decomposition, RAG, and more.

When to Use This Skill


Activate this skill when working with:
  • Graph-based LLM workflows - Building complex AI systems with nodes and flows
  • Agentic applications - Creating autonomous agents with dynamic action selection
  • Task decomposition - Breaking down complex LLM tasks into manageable steps
  • RAG systems - Implementing Retrieval Augmented Generation pipelines
  • Batch processing - Handling large inputs or multiple files with LLMs
  • Multi-agent systems - Coordinating multiple AI agents
  • Async workflows - Building I/O-bound LLM applications with concurrency

Core Concepts


Architecture Overview


PocketFlow models LLM workflows as **Graph + Shared Store**:

```python
# Shared Store: central data storage
shared = {"data": {}, "summary": {}, "config": {...}}

# Graph: nodes connected by transitions
node_a >> node_b >> node_c
flow = Flow(start=node_a)
flow.run(shared)
```

The Node: Building Block

节点:基本构建块

Every Node has 3 steps: `prep()` → `exec()` → `post()`

```python
class SummarizeFile(Node):
    def prep(self, shared):
        # Get data from shared store
        return shared["data"]

    def exec(self, prep_res):
        # Process with LLM (retries built-in)
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # Write results back to shared store
        shared["summary"] = exec_res
        return "default"  # Action for flow control
```

**Why 3 steps?** Separation of concerns: data storage and data processing operate separately.
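To see the three-step contract in isolation, here is a toy stand-in (`MiniNode`, `Shout`, and the `run()` driver below are hypothetical, not PocketFlow's API) that runs prep → exec → post in order:

```python
# Toy stand-in for the prep -> exec -> post lifecycle (illustration only;
# PocketFlow's real Node also handles retries and flow wiring).
class MiniNode:
    def prep(self, shared): return None
    def exec(self, prep_res): return None
    def post(self, shared, prep_res, exec_res): return "default"

    def run(self, shared):
        prep_res = self.prep(shared)                   # 1. read from shared store
        exec_res = self.exec(prep_res)                 # 2. pure computation
        return self.post(shared, prep_res, exec_res)   # 3. write back + action

class Shout(MiniNode):
    def prep(self, shared): return shared["data"]
    def exec(self, text): return text.upper()
    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "default"

shared = {"data": "hello"}
action = Shout().run(shared)
print(shared["summary"], action)  # HELLO default
```

Because `exec()` touches only its argument and return value, it can be retried or swapped without affecting how data moves through the shared store.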

The Flow: Orchestration

流:编排

```python
# Simple sequence
load_data >> summarize >> save_result
flow = Flow(start=load_data)
flow.run(shared)

# Branching with actions
review - "approved" >> payment
review - "needs_revision" >> revise
review - "rejected" >> finish
revise >> review  # Loop back
flow = Flow(start=review)
```
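The branching semantics can be sketched with a plain dictionary of `(node, action)` edges — a toy interpreter under assumed semantics, not how `Flow` is actually implemented:

```python
# Toy interpreter for action-driven transitions (not PocketFlow's Flow).
def run_flow(start, edges, shared):
    node = start
    while node is not None:
        action = node(shared)             # each "node" returns an action string
        node = edges.get((node, action))  # follow the matching edge, or stop

def review(shared):
    shared["passes"] = shared.get("passes", 0) + 1
    return "approved" if shared["passes"] > 1 else "needs_revision"

def revise(shared):
    shared["draft"] += " (revised)"
    return "default"

def payment(shared):
    shared["paid"] = True
    return "done"                         # no edge registered -> flow ends

edges = {
    (review, "needs_revision"): revise,
    (revise, "default"): review,          # loop back
    (review, "approved"): payment,
}
shared = {"draft": "v1"}
run_flow(review, edges, shared)
print(shared)  # {'draft': 'v1 (revised)', 'passes': 2, 'paid': True}
```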

Quick Reference


1. Basic Node Pattern


```python
class LoadData(Node):
    def post(self, shared, prep_res, exec_res):
        shared["data"] = "Some text content"
        return None

class Summarize(Node):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        return call_llm(f"Summarize: {prep_res}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "default"
```

Connect and run:

```python
load_data >> summarize
flow = Flow(start=load_data)
flow.run(shared)
```

2. Batch Processing


BatchNode - Process large inputs in chunks:

```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Chunk big file
        content = shared["data"]
        chunk_size = 10000
        return [content[i:i+chunk_size]
                for i in range(0, len(content), chunk_size)]

    def exec(self, chunk):
        # Process each chunk
        return call_llm(f"Summarize: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        # Combine all results
        shared["summary"] = "\n".join(exec_res_list)
        return "default"
```

BatchFlow - Run the flow multiple times with different parameters:

```python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        filenames = list(shared["data"].keys())
        # Return list of parameter dicts
        return [{"filename": fn} for fn in filenames]

class LoadFile(Node):
    def prep(self, shared):
        # Access filename from params
        filename = self.params["filename"]
        return filename
```
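The BatchNode contract — `prep()` returns a list, `exec()` runs once per item, `post()` receives the list of results — can be exercised without the framework or an LLM (`fake_summarize` is a stand-in for `call_llm`):

```python
def chunk(text, size):
    # Same chunking pattern as MapSummaries.prep above
    return [text[i:i+size] for i in range(0, len(text), size)]

def fake_summarize(piece):        # stand-in for call_llm(f"Summarize: ...")
    return piece.upper()

chunks = chunk("abcdefgh", 3)                  # prep: ['abc', 'def', 'gh']
results = [fake_summarize(c) for c in chunks]  # exec: once per chunk
summary = "\n".join(results)                   # post: combine all results
print(summary)
```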

3. Agent Pattern


````python
class DecideAction(Node):
    def exec(self, inputs):
        query, context = inputs
        prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge

Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
        resp = call_llm(prompt)
        yaml_str = resp.split("```yaml")[1].split("```")[0]
        action_data = yaml.safe_load(yaml_str)
        return action_data
````

Build the agent graph:

```python
decide >> search_web
decide - "answer" >> provide_answer
search_web >> decide  # Loop back for more searches
agent_flow = Flow(start=decide)
```
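The `resp.split("```yaml")` indexing above raises an opaque `IndexError` whenever the model omits the fence. A slightly more defensive helper (`extract_fenced` is a hypothetical name, not part of PocketFlow) fails with a clear message instead:

```python
def extract_fenced(resp, lang="yaml"):
    # Pull the body of the first fenced block of `lang` out of an LLM reply.
    fence = "`" * 3              # three backticks, spelled out to avoid nesting
    marker = fence + lang
    if marker not in resp:
        raise ValueError(f"no {lang} block in response")
    body = resp.split(marker, 1)[1]
    return body.split(fence, 1)[0].strip()

fence = "`" * 3
resp = f"Sure!\n{fence}yaml\naction: search\nsearch_term: pocketflow\n{fence}"
print(extract_fenced(resp))
```

A `ValueError` here trips the Node's built-in retry, giving the model another chance to produce a well-formed block.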

4. RAG (Retrieval Augmented Generation)


Stage 1: Offline Indexing

```python
class ChunkDocs(BatchNode):
    def prep(self, shared):
        return shared["files"]

    def exec(self, filepath):
        with open(filepath, "r") as f:
            text = f.read()
        # Chunk by 100 chars
        size = 100
        return [text[i:i+size] for i in range(0, len(text), size)]

    def post(self, shared, prep_res, exec_res_list):
        shared["all_chunks"] = [c for chunks in exec_res_list
                                for c in chunks]

chunk_docs >> embed_docs >> build_index
offline_flow = Flow(start=chunk_docs)
```

Stage 2: Online Query

```python
class RetrieveDocs(Node):
    def exec(self, inputs):
        q_emb, index, chunks = inputs
        I, D = search_index(index, q_emb, top_k=1)
        return chunks[I[0][0]]

embed_query >> retrieve_docs >> generate_answer
online_flow = Flow(start=embed_query)
```
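`search_index` above is left to your vector store (e.g. FAISS). A minimal stand-in over plain Python lists shows the retrieval idea — cosine similarity, top-1:

```python
import math

def cosine(a, b):
    # Cosine similarity between two equal-length vectors
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm

def search_top1(index, q_emb):
    # index: list of (embedding, chunk) pairs; returns the closest chunk
    return max(index, key=lambda pair: cosine(pair[0], q_emb))[1]

index = [
    ([1.0, 0.0], "install with pip"),
    ([0.0, 1.0], "nodes have three steps"),
]
print(search_top1(index, [0.9, 0.1]))  # install with pip
```

A real index replaces the linear scan with approximate nearest-neighbor search, but the interface — query embedding in, closest chunk out — is the same.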

5. Async & Parallel


AsyncNode for I/O-bound operations:

```python
class SummarizeThenVerify(AsyncNode):
    async def prep_async(self, shared):
        doc_text = await read_file_async(shared["doc_path"])
        return doc_text

    async def exec_async(self, prep_res):
        summary = await call_llm_async(f"Summarize: {prep_res}")
        return summary

    async def post_async(self, shared, prep_res, exec_res):
        decision = await gather_user_feedback(exec_res)
        if decision == "approve":
            shared["summary"] = exec_res
        return "default"
```

Must wrap in AsyncFlow:

```python
node = SummarizeThenVerify()
flow = AsyncFlow(start=node)
await flow.run_async(shared)
```

**AsyncParallelBatchNode** - Process multiple items concurrently:

```python
class ParallelSummaries(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        return shared["texts"]  # List of texts

    async def exec_async(self, text):
        # Runs in parallel for each text
        return await call_llm_async(f"Summarize: {text}")

    async def post_async(self, shared, prep_res, exec_res_list):
        shared["summary"] = "\n\n".join(exec_res_list)
        return "default"
```
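The concurrency win is easy to demonstrate with `asyncio` alone (`fake_llm` below simulates `call_llm_async` latency): three 0.1 s calls finish in roughly 0.1 s total, not 0.3 s.

```python
import asyncio
import time

async def fake_llm(text):          # stand-in for call_llm_async
    await asyncio.sleep(0.1)       # simulated network latency
    return text.upper()

async def main():
    texts = ["a", "b", "c"]
    t0 = time.perf_counter()
    # gather launches all three coroutines concurrently
    results = await asyncio.gather(*(fake_llm(t) for t in texts))
    return results, time.perf_counter() - t0

results, elapsed = asyncio.run(main())
print(results)        # ['A', 'B', 'C']
print(elapsed < 0.3)  # well under the 0.3 s a sequential loop would take
```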

6. Workflow (Task Decomposition)


```python
class GenerateOutline(Node):
    def prep(self, shared):
        return shared["topic"]

    def exec(self, topic):
        return call_llm(f"Create outline for: {topic}")

    def post(self, shared, prep_res, exec_res):
        shared["outline"] = exec_res

class WriteSection(Node):
    def prep(self, shared):
        return shared["outline"]

    def exec(self, outline):
        return call_llm(f"Write content: {outline}")

    def post(self, shared, prep_res, exec_res):
        shared["draft"] = exec_res

class ReviewAndRefine(Node):
    def prep(self, shared):
        return shared["draft"]

    def exec(self, draft):
        return call_llm(f"Review and improve: {draft}")
```

Chain the workflow:

```python
outline >> write >> review
workflow = Flow(start=outline)
```

7. Structured Output


````python
import yaml

class SummarizeNode(Node):
    def exec(self, prep_res):
        prompt = f"""
Summarize the following text as YAML, with exactly 3 bullet points

{prep_res}

Output:
```yaml
summary:
  - bullet 1
  - bullet 2
  - bullet 3
```"""
        response = call_llm(prompt)
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)

        # Validate
        assert "summary" in structured_result
        assert isinstance(structured_result["summary"], list)

        return structured_result
````

**Why YAML?** Modern LLMs handle YAML better than JSON (fewer escaping issues).


🍳 Cookbook: Real-World Examples


This skill includes 6 production-ready examples from the official PocketFlow cookbook, plus a complete Python project template.

📂 Location: `assets/examples/` and `assets/template/`

Example 1: Interactive Chat Bot (☆☆☆)


File: `assets/examples/01_chat.py`

A chat bot with conversation history that loops back to itself.

Key pattern: Self-looping node

```python
chat_node = ChatNode()
chat_node - "continue" >> chat_node  # Loop for continuous chat
flow = Flow(start=chat_node)
```

**What it demonstrates:**
- Message history management
- Self-looping nodes
- Graceful exit handling
- User input processing

**Run it:** `python assets/examples/01_chat.py`

---

Example 2: Article Writing Workflow (☆☆☆)


File: `assets/examples/02_workflow.py`

Multi-step content creation: outline → draft → refine.

Sequential pipeline:

```python
outline >> draft >> refine
flow = Flow(start=outline)
```

**What it demonstrates:**
- Task decomposition
- Sequential workflows
- Progressive content generation

**Run it:** `python assets/examples/02_workflow.py "AI Safety"`

---

Example 3: Research Agent (☆☆☆)


File: `assets/examples/03_agent.py`

An agent that decides whether to search or answer.

Branching based on decision:

```python
decide - "search" >> search
decide - "answer" >> answer
search - "continue" >> decide  # Loop back
```

**What it demonstrates:**
- Dynamic action selection
- Branching logic
- Agent decision-making
- Iterative research loops

**Run it:** `python assets/examples/03_agent.py "Nobel Prize 2024"`

---

Example 4: RAG System (☆☆☆)


File: `assets/examples/04_rag.py`

Complete two-stage RAG pipeline with offline indexing and online querying.

```python
# Stage 1: Offline indexing
embed_docs >> build_index
offline_flow = Flow(start=embed_docs)

# Stage 2: Online query
embed_query >> retrieve >> generate
online_flow = Flow(start=embed_query)
```

**What it demonstrates:**
- Document embedding and indexing
- Similarity search
- Context-based generation
- Multi-stage pipelines

**Run it:** `python assets/examples/04_rag.py --"How to install PocketFlow?"`

---

Example 5: Structured Output Parser (☆☆☆)


File: `assets/examples/05_structured_output.py`

A resume parser extracting structured data with YAML.

````python
# Parse YAML from LLM response
yaml_str = response.split("```yaml")[1].split("```")[0]
structured_result = yaml.safe_load(yaml_str)

# Validate structure
assert "name" in structured_result
assert "email" in structured_result
````

**What it demonstrates:**
- Structured LLM responses with YAML
- Schema validation
- Retry logic for parsing
- Data extraction patterns

**Run it:** `python assets/examples/05_structured_output.py`

---

Example 6: Multi-Agent Communication (★☆☆)


File: `assets/examples/06_multi_agent.py`

Two async agents playing a Taboo word game.

```python
# Agents with message queues
shared = {
    "hinter_queue": asyncio.Queue(),
    "guesser_queue": asyncio.Queue()
}

# Run concurrently
await asyncio.gather(
    hinter_flow.run_async(shared),
    guesser_flow.run_async(shared)
)
```

**What it demonstrates:**
- AsyncNode for concurrent operations
- Message queues for inter-agent communication
- Multi-agent coordination
- Game logic with termination

**Run it:** `python assets/examples/06_multi_agent.py`

---

Python Project Template


Location: `assets/template/`

Official best-practice template with a complete project structure.

Files:
  • main.py - Entry point
  • flow.py - Flow definition
  • nodes.py - Node implementations
  • utils.py - LLM wrappers
  • requirements.txt - Dependencies

Quick Start:

```bash
cd assets/template/
pip install -r requirements.txt
# Edit utils.py to add your LLM API key
python main.py
```

**What it demonstrates:**
- Separation of concerns
- Factory pattern for flows
- Clean data flow with shared store
- Configuration best practices

---

Full Cookbook (47 Examples)


The complete cookbook has 47 progressively complex examples on GitHub:

Dummy Level (☆☆☆): Chat, Workflow, Agent, RAG, Map-Reduce, Streaming, Structured Output, Guardrails

Beginner Level (★☆☆): Multi-Agent, Supervisor, Parallel (3x/8x), Thinking (CoT), Memory, MCP, Tracing

Plus 30+ more advanced patterns: FastAPI integration, Code generator, Text-to-SQL, Voice chat, PDF vision, Website chatbot, and more.

Complete guide: See `assets/COOKBOOK_GUIDE.md` for the full index and learning path.


Design Patterns Summary


| Pattern | Use Case | Key Component |
|---|---|---|
| Agent | Dynamic action selection | Action space + context management |
| Workflow | Multi-step task decomposition | Chained nodes |
| RAG | Context-aware answers | Offline indexing + online retrieval |
| Map Reduce | Large input processing | BatchNode with aggregation |
| Multi-Agent | Collaborative agents | Message queues + AsyncNode |
| Structured Output | Typed LLM responses | YAML prompting + validation |

Communication Patterns


Shared Store (Primary)


```python
# Design the data structure first
shared = {
    "user": {
        "id": "user123",
        "context": {
            "weather": {"temp": 72, "condition": "sunny"},
            "location": "San Francisco"
        }
    },
    "results": {}
}
```

**Best Practice:** Separate the data schema from compute logic using the shared store.

Params (For Batch Only)


```python
class SummarizeFile(Node):
    def prep(self, shared):
        # Access node's params
        filename = self.params["filename"]
        return shared["data"].get(filename, "")

# Set params
node = SummarizeFile()
node.set_params({"filename": "report.txt"})
```

Advanced Features


Fault Tolerance


```python
# Automatic retries
my_node = SummarizeFile(max_retries=3, wait=10)

# Graceful fallback
class ResilientNode(Node):
    def exec_fallback(self, prep_res, exc):
        # Return a fallback instead of crashing
        return "There was an error processing your request."
```
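A rough model of what `max_retries`, `wait`, and `exec_fallback` do — assumed semantics for illustration, not PocketFlow's actual implementation:

```python
import time

def run_with_retries(fn, arg, max_retries=3, wait=0.0, fallback=None):
    # Retry fn up to max_retries times, sleeping `wait` seconds between
    # attempts; on the final failure call fallback instead of raising.
    for attempt in range(max_retries):
        try:
            return fn(arg)
        except Exception as exc:
            if attempt == max_retries - 1:
                if fallback is not None:
                    return fallback(arg, exc)
                raise
            time.sleep(wait)

calls = {"n": 0}
def flaky(_):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = run_with_retries(flaky, None)
print(result, calls["n"])  # ok 3
```

This is why utilities called from `exec()` should not swallow exceptions: the exception is the signal that drives the retry loop.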

Nested Flows


```python
# Flows can act as nodes
node_a >> node_b
subflow = Flow(start=node_a)

# Connect to other nodes
subflow >> node_c

# Create parent flow
parent_flow = Flow(start=subflow)
```

Multi-Agent Communication


```python
class AgentNode(AsyncNode):
    async def prep_async(self, _):
        message_queue = self.params["messages"]
        message = await message_queue.get()
        print(f"Agent received: {message}")
        return message

# Create self-loop for continuous listening
agent = AgentNode()
agent >> agent
flow = AsyncFlow(start=agent)
```
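Stripped of the framework, the queue handshake between two agents looks like this (`hinter` and `guesser` are stand-ins; `None` is used as a termination signal):

```python
import asyncio

async def hinter(queue):                 # producer agent
    for hint in ["starts with p", "rhymes with flow"]:
        await queue.put(hint)
    await queue.put(None)                # signal: no more hints

async def guesser(queue, received):      # consumer agent
    while True:
        msg = await queue.get()
        if msg is None:
            break
        received.append(msg)

async def main():
    queue = asyncio.Queue()
    received = []
    # Both agents run concurrently; the queue mediates all communication
    await asyncio.gather(hinter(queue), guesser(queue, received))
    return received

received = asyncio.run(main())
print(received)  # ['starts with p', 'rhymes with flow']
```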

Utility Functions


LLM Wrappers


```python
# OpenAI
def call_llm(prompt):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content

# Anthropic Claude
def call_llm(prompt):
    from anthropic import Anthropic
    client = Anthropic(api_key="YOUR_API_KEY")
    r = client.messages.create(
        model="claude-sonnet-4-0",
        max_tokens=1024,  # required by the Messages API
        messages=[{"role": "user", "content": prompt}]
    )
    return r.content[0].text

# Google Gemini
def call_llm(prompt):
    from google import genai
    client = genai.Client(api_key="GEMINI_API_KEY")
    response = client.models.generate_content(
        model="gemini-2.5-pro",
        contents=prompt
    )
    return response.text
```

Embeddings


```python
# OpenAI
from openai import OpenAI

client = OpenAI(api_key="YOUR_API_KEY")
response = client.embeddings.create(
    model="text-embedding-ada-002",
    input=text
)
embedding = response.data[0].embedding
```

Text Chunking


```python
# Fixed-size chunking
def fixed_size_chunk(text, chunk_size=100):
    return [text[i:i+chunk_size]
            for i in range(0, len(text), chunk_size)]

# Sentence-based chunking
import nltk

def sentence_based_chunk(text, max_sentences=2):
    sentences = nltk.sent_tokenize(text)
    return [" ".join(sentences[i:i+max_sentences])
            for i in range(0, len(sentences), max_sentences)]
```

Agentic Coding Guidelines


IMPORTANT for AI Agents building LLM systems:
  1. Start Simple - Begin with the smallest solution first
  2. Design First - Create high-level design (docs/design.md) before implementation
  3. Manual Testing - Solve example inputs manually to develop intuition
  4. Iterate Frequently - Expect hundreds of iterations on Steps 3-6
  5. Ask Humans - Request feedback and clarification regularly

Recommended Project Structure


```
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│   ├── __init__.py
│   ├── call_llm.py
│   └── search_web.py
├── requirements.txt
└── docs/
    └── design.md
```

Development Workflow


```mermaid
flowchart LR
    start[Start] --> batch[Batch]
    batch --> check[Check]
    check -->|OK| process
    check -->|Error| fix[Fix]
    fix --> check

    subgraph process[Process]
      step1[Step 1] --> step2[Step 2]
    end

    process --> endNode[End]
```

Best Practices


Context Management (Agents)


  • Relevant & Minimal - Retrieve most relevant via RAG, not entire history
  • Avoid "lost in the middle" - LLMs overlook mid-prompt content even with large windows

Action Space Design (Agents)


  • Unambiguous - Avoid overlapping actions (e.g., one `read_database` instead of separate `read_databases` and `read_csvs`)
  • Incremental - Feed 500 lines or 1 page at a time, not all at once
  • Overview-zoom-in - Show structure first (TOC, summary), then details
  • Parameterized - Enable flexible actions with parameters (columns, SQL queries)
  • Backtracking - Allow undo instead of full restart

Error Handling


  • No try/except in utilities - Let Node retry mechanism handle failures
  • Use exec_fallback() - Provide graceful degradation instead of crashes

Performance Tips


  • Batch APIs - Use LLM batch inference for multiple prompts
  • Rate Limiting - Use semaphores to avoid API limits
  • Parallel only for I/O - Python GIL prevents true CPU parallelism
  • Independent tasks - Don't parallelize dependent operations
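For the rate-limiting tip, `asyncio.Semaphore` caps how many calls are in flight at once (`fake_llm` stands in for a real API call; the limit of 3 is an arbitrary example):

```python
import asyncio

async def fake_llm(prompt):              # stand-in for a real API call
    await asyncio.sleep(0.01)
    return f"summary of {prompt}"

async def limited_call(sem, prompt):
    async with sem:                      # at most 3 requests in flight
        return await fake_llm(prompt)

async def main():
    sem = asyncio.Semaphore(3)
    prompts = [f"doc{i}" for i in range(6)]
    return await asyncio.gather(*(limited_call(sem, p) for p in prompts))

results = asyncio.run(main())
print(results[0], len(results))  # summary of doc0 6
```

The same pattern works inside an `AsyncParallelBatchNode`'s `exec_async` to keep parallel fan-out under a provider's rate limit.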

Reference Files


This skill includes comprehensive documentation in `references/core_abstraction.md`:
  • Node - Basic building block with prep/exec/post
  • Flow - Orchestration and graph control
  • Communication - Shared store vs params
  • Batch - BatchNode and BatchFlow patterns
  • Async - AsyncNode for I/O-bound tasks
  • Parallel - AsyncParallelBatchNode/Flow
  • Agent - Dynamic action selection
  • Workflow - Task decomposition chains
  • RAG - Retrieval augmented generation
  • Map Reduce - Large input processing
  • Structured Output - YAML-based schemas
  • Multi-Agents - Inter-agent communication
  • LLM Wrappers - OpenAI, Anthropic, Google, Azure
  • Embeddings - Text embedding APIs
  • Vector Databases - FAISS, Pinecone, Qdrant, etc.
  • Web Search - Google, Bing, DuckDuckGo, Brave
  • Text Chunking - Fixed-size and sentence-based
  • Text-to-Speech - AWS Polly, Google Cloud, Azure, IBM
  • Visualization - Mermaid diagrams and call stacks
  • Agentic Coding - Development workflow guidance

Navigation Guide


For Beginners


  1. Start with Node and Flow basics
  2. Learn Communication (shared store)
  3. Try simple Workflow example
  4. Read Agentic Coding guidelines

For Specific Use Cases


  • Document processing → Batch + Map Reduce
  • Question answering → RAG
  • Dynamic task planning → Agent
  • Multi-step pipelines → Workflow
  • Real-time systems → Async + Parallel
  • Collaborative AI → Multi-Agents

For Advanced Users


  • Nested flows for complex pipelines
  • Custom fault tolerance with exec_fallback
  • Parallel processing with rate limiting
  • Multi-agent communication patterns
  • Custom visualization and debugging tools

Common Pitfalls


❌ Don't use Multi-Agents unless necessary - Start simple!
❌ Don't parallelize dependent operations
❌ Don't add try/except in utility functions called from exec()
❌ Don't use node.run() in production - Always use flow.run()
❌ Don't modify the shared store in exec() - Use prep() and post()

✅ Do design the data schema before implementation
✅ Do use the shared store for data, params for identifiers
✅ Do leverage built-in retry mechanisms
✅ Do validate structured output with assertions
✅ Do start with the simplest solution and iterate

Resources


Framework Philosophy:
  • Minimalist (100 lines of core code)
  • No vendor lock-in (implement your own utilities)
  • Separation of concerns (graph + shared store)
  • Graph-based workflow modeling

This skill was generated from the official PocketFlow documentation. For detailed examples and the complete API reference, see `references/core_abstraction.md`.