build-your-own-openclaw-agent-tutorial

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Build Your Own OpenClaw Agent Tutorial

构建你自己的OpenClaw Agent教程

Skill by ara.so — Hermes Skills collection.
A comprehensive tutorial for building AI agents progressively, from a basic chat loop to a production-ready autonomous agent system. This project walks through 18 steps covering single-agent capabilities, event-driven architecture, multi-agent collaboration, and production features like memory and concurrency control.
ara.so提供的Skill — Hermes Skills合集。
一份循序渐进构建AI Agent的全面教程,从基础聊天循环到可投入生产的自主智能体系统。本项目包含18个步骤,涵盖单智能体能力、事件驱动架构、多智能体协作,以及记忆、并发控制等生产级特性。

What This Tutorial Teaches

本教程涵盖内容

The tutorial is organized into 4 phases:
  1. Phase 1 (Steps 0-6): Single agent with tools, skills, persistence, and web access
  2. Phase 2 (Steps 7-10): Event-driven architecture with multi-platform support
  3. Phase 3 (Steps 11-15): Autonomous agents with routing and collaboration
  4. Phase 4 (Steps 16-17): Production features like concurrency and long-term memory
教程分为4个阶段:
  1. 阶段1(步骤0-6):具备工具、技能、持久化和网络访问能力的单智能体
  2. 阶段2(步骤7-10):支持多平台的事件驱动架构
  3. 阶段3(步骤11-15):具备路由与协作能力的自主智能体
  4. 阶段4(步骤16-17):并发控制、长期记忆等生产级特性

Initial Setup

初始设置

Clone the Repository

克隆仓库

bash
git clone https://github.com/czl9707/build-your-own-openclaw.git
cd build-your-own-openclaw
bash
git clone https://github.com/czl9707/build-your-own-openclaw.git
cd build-your-own-openclaw

Configure API Keys

配置API密钥

bash
undefined
bash
undefined

Copy example config

复制示例配置

cp default_workspace/config.example.yaml default_workspace/config.user.yaml

Edit `default_workspace/config.user.yaml`:

```yaml
llm:
  model: "gpt-4"  # or anthropic/claude-3-5-sonnet-20241022, etc.
  api_key: "${OPENAI_API_KEY}"  # Use environment variable
  # See https://docs.litellm.ai/docs/providers for all providers
cp default_workspace/config.example.yaml default_workspace/config.user.yaml

编辑 `default_workspace/config.user.yaml`:

```yaml
llm:
  model: "gpt-4"  # 或 anthropic/claude-3-5-sonnet-20241022 等
  api_key: "${OPENAI_API_KEY}"  # 使用环境变量
  # 查看所有支持的提供商:https://docs.litellm.ai/docs/providers

Optional: Add additional services

可选:添加额外服务

web: search_api_key: "${SERPER_API_KEY}"
undefined
web: search_api_key: "${SERPER_API_KEY}"
undefined

Install Dependencies (for each step)

安装依赖(每个步骤单独安装)

bash
cd 00-chat-loop  # or any step directory
pip install -r requirements.txt
bash
cd 00-chat-loop  # 或任意步骤目录
pip install -r requirements.txt

Phase 1: Building a Capable Single Agent

阶段1:构建具备核心能力的单智能体

Step 0: Basic Chat Loop

步骤0:基础聊天循环

The foundation - a simple conversation loop with an LLM.
python
undefined
基础模块——一个与LLM交互的简单对话循环。
python
undefined

00-chat-loop/main.py

00-chat-loop/main.py

from litellm import completion
def chat_loop(): messages = []
while True:
    user_input = input("You: ")
    if user_input.lower() in ['/exit', '/quit']:
        break
        
    messages.append({"role": "user", "content": user_input})
    
    response = completion(
        model="gpt-4",
        messages=messages,
        api_key="${OPENAI_API_KEY}"
    )
    
    assistant_message = response.choices[0].message.content
    messages.append({"role": "assistant", "content": assistant_message})
    
    print(f"Assistant: {assistant_message}")
if name == "main": chat_loop()

Run it:
```bash
cd 00-chat-loop
python main.py
from litellm import completion
def chat_loop(): messages = []
while True:
    user_input = input("You: ")
    if user_input.lower() in ['/exit', '/quit']:
        break
        
    messages.append({"role": "user", "content": user_input})
    
    response = completion(
        model="gpt-4",
        messages=messages,
        api_key="${OPENAI_API_KEY}"
    )
    
    assistant_message = response.choices[0].message.content
    messages.append({"role": "assistant", "content": assistant_message})
    
    print(f"Assistant: {assistant_message}")
if name == "main": chat_loop()

运行方式:
```bash
cd 00-chat-loop
python main.py

Step 1: Adding Tools

步骤1:添加工具

Give your agent function-calling capabilities.
python
undefined
为智能体赋予函数调用能力。
python
undefined

01-tools/tools.py

01-tools/tools.py

def get_current_weather(location: str) -> dict: """Get the current weather for a location.""" # Tool implementation return {"location": location, "temperature": 72, "condition": "sunny"}
def get_current_weather(location: str) -> dict: """Get the current weather for a location.""" # 工具实现 return {"location": location, "temperature": 72, "condition": "sunny"}

Tool schema for LLM

供LLM使用的工具描述schema

weather_tool = { "type": "function", "function": { "name": "get_current_weather", "description": "Get the current weather in a given location", "parameters": { "type": "object", "properties": { "location": { "type": "string", "description": "City name, e.g. San Francisco" } }, "required": ["location"] } } }

Using tools in the chat loop:

```python
import json
from litellm import completion

response = completion(
    model="gpt-4",
    messages=messages,
    tools=[weather_tool],
    tool_choice="auto"
)
weather_tool = { "type": "function", "function": { "name": "get_current_weather", "description": "Get the current weather in a given location", "parameters": { "type": "object", "properties": { "location": { "type": "string", "description": "City name, e.g. San Francisco" } }, "required": ["location"] } } }

在聊天循环中使用工具:

```python
import json
from litellm import completion

response = completion(
    model="gpt-4",
    messages=messages,
    tools=[weather_tool],
    tool_choice="auto"
)

Handle tool calls

处理工具调用

if response.choices[0].message.tool_calls: for tool_call in response.choices[0].message.tool_calls: function_name = tool_call.function.name arguments = json.loads(tool_call.function.arguments)
    # Execute tool
    result = get_current_weather(**arguments)
    
    # Add tool result to messages
    messages.append({
        "role": "tool",
        "tool_call_id": tool_call.id,
        "content": json.dumps(result)
    })
undefined
if response.choices[0].message.tool_calls: for tool_call in response.choices[0].message.tool_calls: function_name = tool_call.function.name arguments = json.loads(tool_call.function.arguments)
    # 执行工具
    result = get_current_weather(**arguments)
    
    # 将工具结果添加到对话历史
    messages.append({
        "role": "tool",
        "tool_call_id": tool_call.id,
        "content": json.dumps(result)
    })
undefined

Step 2: Skills with SKILL.md

步骤2:基于SKILL.md扩展技能

Extend agent capabilities through markdown skill files.
markdown
<!-- skills/web_search.md -->
通过Markdown技能文件扩展智能体能力。
markdown
<!-- skills/web_search.md -->

Web Search Skill

Web搜索技能

You can search the internet using the
search_web
tool.
你可以使用
search_web
工具进行互联网搜索。

When to Use

使用场景

  • User asks for current information
  • Need to verify facts
  • Looking for recent news
  • 用户询问时效性信息
  • 需要验证事实
  • 查询近期新闻

Example

示例

User: "What's the latest news on AI?" You: Let me search for that. [calls search_web("latest AI news")]

Loading skills:

```python
用户: "AI领域的最新消息是什么?" 助手: 让我搜索一下。[调用search_web("latest AI news")]

加载技能:

```python

02-skills/skill_loader.py

02-skills/skill_loader.py

import os
def load_skills(skills_dir="skills"): """Load all .md files from skills directory.""" skills_content = []
for filename in os.listdir(skills_dir):
    if filename.endswith(".md"):
        with open(os.path.join(skills_dir, filename), 'r') as f:
            skills_content.append(f.read())

return "\n\n".join(skills_content)
import os
def load_skills(skills_dir="skills"): """加载技能目录下所有.md文件。""" skills_content = []
for filename in os.listdir(skills_dir):
    if filename.endswith(".md"):
        with open(os.path.join(skills_dir, filename), 'r') as f:
            skills_content.append(f.read())

return "\n\n".join(skills_content)

Add to system prompt

添加到系统提示词

system_prompt = f"""You are a helpful assistant.
system_prompt = f"""你是一个乐于助人的助手。

Your Skills

你的技能

{load_skills()} """
undefined
{load_skills()} """
undefined

Step 3: Conversation Persistence

步骤3:对话持久化

Save conversations to resume later.
python
undefined
保存对话历史以便后续恢复。
python
undefined

03-persistence/session_manager.py

03-persistence/session_manager.py

import json from datetime import datetime from pathlib import Path
class SessionManager: def init(self, sessions_dir="sessions"): self.sessions_dir = Path(sessions_dir) self.sessions_dir.mkdir(exist_ok=True)
def save_session(self, session_id: str, messages: list):
    """Save conversation history."""
    session_file = self.sessions_dir / f"{session_id}.json"
    data = {
        "session_id": session_id,
        "updated_at": datetime.now().isoformat(),
        "messages": messages
    }
    with open(session_file, 'w') as f:
        json.dump(data, f, indent=2)

def load_session(self, session_id: str) -> list:
    """Load conversation history."""
    session_file = self.sessions_dir / f"{session_id}.json"
    if not session_file.exists():
        return []
    
    with open(session_file, 'r') as f:
        data = json.load(f)
        return data.get("messages", [])

def list_sessions(self) -> list:
    """List all available sessions."""
    return [f.stem for f in self.sessions_dir.glob("*.json")]

Usage:

```python
manager = SessionManager()
import json from datetime import datetime from pathlib import Path
class SessionManager: def init(self, sessions_dir="sessions"): self.sessions_dir = Path(sessions_dir) self.sessions_dir.mkdir(exist_ok=True)
def save_session(self, session_id: str, messages: list):
    """保存对话历史。"""
    session_file = self.sessions_dir / f"{session_id}.json"
    data = {
        "session_id": session_id,
        "updated_at": datetime.now().isoformat(),
        "messages": messages
    }
    with open(session_file, 'w') as f:
        json.dump(data, f, indent=2)

def load_session(self, session_id: str) -> list:
    """加载对话历史。"""
    session_file = self.sessions_dir / f"{session_id}.json"
    if not session_file.exists():
        return []
    
    with open(session_file, 'r') as f:
        data = json.load(f)
        return data.get("messages", [])

def list_sessions(self) -> list:
    """列出所有可用会话。"""
    return [f.stem for f in self.sessions_dir.glob("*.json")]

使用示例:

```python
manager = SessionManager()

Load or create session

加载或创建会话

session_id = "my-conversation" messages = manager.load_session(session_id)
session_id = "my-conversation" messages = manager.load_session(session_id)

After each exchange

每次对话交互后保存

manager.save_session(session_id, messages)
undefined
manager.save_session(session_id, messages)
undefined

Step 4: Slash Commands

步骤4:斜杠命令

Direct user control over agent behavior.
python
undefined
让用户可以直接控制智能体行为。
python
undefined

04-slash-commands/commands.py

04-slash-commands/commands.py

class CommandHandler: def init(self, session_manager): self.session_manager = session_manager self.commands = { '/new': self.new_session, '/load': self.load_session, '/list': self.list_sessions, '/save': self.save_session, '/clear': self.clear_session, '/help': self.show_help }
def handle(self, user_input: str, current_session: str, messages: list):
    """Handle slash commands."""
    parts = user_input.split()
    command = parts[0]
    args = parts[1:] if len(parts) > 1 else []
    
    if command in self.commands:
        return self.commands[command](args, current_session, messages)
    
    return None  # Not a command

def new_session(self, args, current_session, messages):
    new_id = args[0] if args else f"session_{int(time.time())}"
    return {"action": "new_session", "session_id": new_id}

def load_session(self, args, current_session, messages):
    if not args:
        print("Usage: /load <session_id>")
        return {"action": "none"}
    
    loaded = self.session_manager.load_session(args[0])
    return {"action": "load_session", "session_id": args[0], "messages": loaded}
undefined
class CommandHandler: def init(self, session_manager): self.session_manager = session_manager self.commands = { '/new': self.new_session, '/load': self.load_session, '/list': self.list_sessions, '/save': self.save_session, '/clear': self.clear_session, '/help': self.show_help }
def handle(self, user_input: str, current_session: str, messages: list):
    """处理斜杠命令。"""
    parts = user_input.split()
    command = parts[0]
    args = parts[1:] if len(parts) > 1 else []
    
    if command in self.commands:
        return self.commands[command](args, current_session, messages)
    
    return None  # 不是命令

def new_session(self, args, current_session, messages):
    new_id = args[0] if args else f"session_{int(time.time())}"
    return {"action": "new_session", "session_id": new_id}

def load_session(self, args, current_session, messages):
    if not args:
        print("用法: /load <session_id>")
        return {"action": "none"}
    
    loaded = self.session_manager.load_session(args[0])
    return {"action": "load_session", "session_id": args[0], "messages": loaded}
undefined

Step 5: Context Compaction

步骤5:上下文压缩

Manage token limits by summarizing old messages.
python
undefined
通过总结旧消息来管理token限制。
python
undefined

05-compaction/compactor.py

05-compaction/compactor.py

from litellm import completion
class MessageCompactor: def init(self, max_messages=20): self.max_messages = max_messages
def compact_if_needed(self, messages: list) -> list:
    """Compact messages if they exceed threshold."""
    if len(messages) <= self.max_messages:
        return messages
    
    # Keep system message and recent messages
    system_msgs = [m for m in messages if m["role"] == "system"]
    recent_msgs = messages[-(self.max_messages - 2):]
    
    # Summarize older messages
    old_msgs = messages[len(system_msgs):-len(recent_msgs)]
    summary = self._summarize_messages(old_msgs)
    
    return system_msgs + [
        {"role": "system", "content": f"Previous conversation summary:\n{summary}"}
    ] + recent_msgs

def _summarize_messages(self, messages: list) -> str:
    """Generate summary of message history."""
    conversation = "\n".join([
        f"{m['role']}: {m['content']}" for m in messages
    ])
    
    response = completion(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"Summarize this conversation concisely:\n\n{conversation}"
        }]
    )
    
    return response.choices[0].message.content
undefined
from litellm import completion
class MessageCompactor: def init(self, max_messages=20): self.max_messages = max_messages
def compact_if_needed(self, messages: list) -> list:
    """如果消息数量超过阈值,压缩上下文。"""
    if len(messages) <= self.max_messages:
        return messages
    
    # 保留系统消息和近期消息
    system_msgs = [m for m in messages if m["role"] == "system"]
    recent_msgs = messages[-(self.max_messages - 2):]
    
    # 总结旧消息
    old_msgs = messages[len(system_msgs):-len(recent_msgs)]
    summary = self._summarize_messages(old_msgs)
    
    return system_msgs + [
        {"role": "system", "content": f"之前对话总结:\n{summary}"}
    ] + recent_msgs

def _summarize_messages(self, messages: list) -> str:
    """生成对话历史摘要。"""
    conversation = "\n".join([
        f"{m['role']}: {m['content']}" for m in messages
    ])
    
    response = completion(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"请简洁总结以下对话:\n\n{conversation}"
        }]
    )
    
    return response.choices[0].message.content
undefined

Step 6: Web Tools

步骤6:网络工具

Give your agent internet access.
python
undefined
为智能体提供互联网访问能力。
python
undefined

06-web-tools/web_tools.py

06-web-tools/web_tools.py

import requests import os
def search_web(query: str, num_results: int = 5) -> list: """Search the web using Serper API.""" api_key = os.getenv("SERPER_API_KEY")
response = requests.post(
    "https://google.serper.dev/search",
    headers={"X-API-KEY": api_key},
    json={"q": query, "num": num_results}
)

results = response.json()
return [
    {
        "title": r.get("title"),
        "link": r.get("link"),
        "snippet": r.get("snippet")
    }
    for r in results.get("organic", [])
]
def fetch_webpage(url: str) -> str: """Fetch and extract text from a webpage.""" from bs4 import BeautifulSoup
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')

# Remove script and style elements
for script in soup(["script", "style"]):
    script.decompose()

return soup.get_text(separator="\n", strip=True)
undefined
import requests import os
def search_web(query: str, num_results: int = 5) -> list: """使用Serper API进行网络搜索。""" api_key = os.getenv("SERPER_API_KEY")
response = requests.post(
    "https://google.serper.dev/search",
    headers={"X-API-KEY": api_key},
    json={"q": query, "num": num_results}
)

results = response.json()
return [
    {
        "title": r.get("title"),
        "link": r.get("link"),
        "snippet": r.get("snippet")
    }
    for r in results.get("organic", [])
]
def fetch_webpage(url: str) -> str: """获取并提取网页文本内容。""" from bs4 import BeautifulSoup
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')

# 移除脚本和样式元素
for script in soup(["script", "style"]):
    script.decompose()

return soup.get_text(separator="\n", strip=True)
undefined

Phase 2: Event-Driven Architecture

阶段2:事件驱动架构

Step 7: Event-Driven Refactor

步骤7:事件驱动重构

Decouple components with an event bus.
python
undefined
通过事件总线解耦组件。
python
undefined

07-event-driven/event_bus.py

07-event-driven/event_bus.py

from typing import Callable, Dict, List from dataclasses import dataclass from enum import Enum
class EventType(Enum): MESSAGE_RECEIVED = "message_received" MESSAGE_SENT = "message_sent" TOOL_CALLED = "tool_called" SESSION_CREATED = "session_created"
@dataclass class Event: type: EventType data: dict source: str
class EventBus: def init(self): self.listeners: Dict[EventType, List[Callable]] = {}
def subscribe(self, event_type: EventType, handler: Callable):
    """Subscribe to an event type."""
    if event_type not in self.listeners:
        self.listeners[event_type] = []
    self.listeners[event_type].append(handler)

def publish(self, event: Event):
    """Publish an event to all subscribers."""
    if event.type in self.listeners:
        for handler in self.listeners[event.type]:
            handler(event)
from typing import Callable, Dict, List from dataclasses import dataclass from enum import Enum
class EventType(Enum): MESSAGE_RECEIVED = "message_received" MESSAGE_SENT = "message_sent" TOOL_CALLED = "tool_called" SESSION_CREATED = "session_created"
@dataclass class Event: type: EventType data: dict source: str
class EventBus: def init(self): self.listeners: Dict[EventType, List[Callable]] = {}
def subscribe(self, event_type: EventType, handler: Callable):
    """订阅事件类型。"""
    if event_type not in self.listeners:
        self.listeners[event_type] = []
    self.listeners[event_type].append(handler)

def publish(self, event: Event):
    """向所有订阅者发布事件。"""
    if event.type in self.listeners:
        for handler in self.listeners[event.type]:
            handler(event)

Usage

使用示例

bus = EventBus()
def on_message(event: Event): print(f"Received: {event.data['content']}")
bus.subscribe(EventType.MESSAGE_RECEIVED, on_message) bus.publish(Event( type=EventType.MESSAGE_RECEIVED, data={"content": "Hello!"}, source="user" ))
undefined
bus = EventBus()
def on_message(event: Event): print(f"收到消息: {event.data['content']}")
bus.subscribe(EventType.MESSAGE_RECEIVED, on_message) bus.publish(Event( type=EventType.MESSAGE_RECEIVED, data={"content": "你好!"}, source="user" ))
undefined

Step 9: Multi-Channel Support

步骤9:多渠道支持

Support Discord, Slack, Telegram, etc.
python
undefined
支持Discord、Slack、Telegram等平台。
python
undefined

09-channels/discord_channel.py

09-channels/discord_channel.py

import discord from event_bus import EventBus, Event, EventType
class DiscordChannel: def init(self, event_bus: EventBus, token: str): self.event_bus = event_bus self.client = discord.Client(intents=discord.Intents.default()) self.token = token
    @self.client.event
    async def on_message(message):
        if message.author == self.client.user:
            return
        
        # Publish to event bus
        self.event_bus.publish(Event(
            type=EventType.MESSAGE_RECEIVED,
            data={
                "content": message.content,
                "channel_id": str(message.channel.id),
                "user_id": str(message.author.id)
            },
            source="discord"
        ))
    
    # Subscribe to outgoing messages
    self.event_bus.subscribe(EventType.MESSAGE_SENT, self.send_message)

async def send_message(self, event: Event):
    """Send message to Discord."""
    if event.data.get("channel") != "discord":
        return
    
    channel = self.client.get_channel(int(event.data["channel_id"]))
    await channel.send(event.data["content"])

def start(self):
    self.client.run(self.token)
undefined
import discord from event_bus import EventBus, Event, EventType
class DiscordChannel: def init(self, event_bus: EventBus, token: str): self.event_bus = event_bus self.client = discord.Client(intents=discord.Intents.default()) self.token = token
    @self.client.event
    async def on_message(message):
        if message.author == self.client.user:
            return
        
        # 发布到事件总线
        self.event_bus.publish(Event(
            type=EventType.MESSAGE_RECEIVED,
            data={
                "content": message.content,
                "channel_id": str(message.channel.id),
                "user_id": str(message.author.id)
            },
            source="discord"
        ))
    
    # 订阅 outgoing 消息事件
    self.event_bus.subscribe(EventType.MESSAGE_SENT, self.send_message)

async def send_message(self, event: Event):
    """发送消息到Discord。"""
    if event.data.get("channel") != "discord":
        return
    
    channel = self.client.get_channel(int(event.data["channel_id"]))
    await channel.send(event.data["content"])

def start(self):
    self.client.run(self.token)
undefined

Phase 3: Multi-Agent Systems

阶段3:多智能体系统

Step 11: Agent Routing

步骤11:智能体路由

Route requests to specialized agents.
python
undefined
将请求路由到专业智能体。
python
undefined

11-multi-agent-routing/router.py

11-multi-agent-routing/router.py

from litellm import completion
class AgentRouter: def init(self, agents: dict): self.agents = agents # {"code": CodeAgent(), "research": ResearchAgent()}
def route(self, user_message: str) -> str:
    """Determine which agent should handle the request."""
    agent_descriptions = "\n".join([
        f"- {name}: {agent.description}"
        for name, agent in self.agents.items()
    ])
    
    response = completion(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"""Which agent should handle this request?
Available agents: {agent_descriptions}
User request: {user_message}
Respond with just the agent name.""" }] )
    agent_name = response.choices[0].message.content.strip()
    return agent_name if agent_name in self.agents else "default"
undefined
from litellm import completion
class AgentRouter: def init(self, agents: dict): self.agents = agents # {"code": CodeAgent(), "research": ResearchAgent()}
def route(self, user_message: str) -> str:
    """确定哪个智能体应该处理请求。"""
    agent_descriptions = "\n".join([
        f"- {name}: {agent.description}"
        for name, agent in self.agents.items()
    ])
    
    response = completion(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"""哪个智能体应该处理这个请求?
可用智能体: {agent_descriptions}
用户请求: {user_message}
仅回复智能体名称。""" }] )
    agent_name = response.choices[0].message.content.strip()
    return agent_name if agent_name in self.agents else "default"
undefined

Step 12: Scheduled Tasks (Cron Heartbeat)

步骤12:定时任务(Cron心跳)

Run agents on a schedule.
python
undefined
按计划运行智能体任务。
python
undefined

12-cron-heartbeat/scheduler.py

12-cron-heartbeat/scheduler.py

from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime
class AgentScheduler: def init(self, event_bus): self.scheduler = BackgroundScheduler() self.event_bus = event_bus
def schedule_task(self, agent_id: str, cron: str, task: callable):
    """Schedule a recurring task."""
    self.scheduler.add_job(
        func=task,
        trigger='cron',
        **self._parse_cron(cron),
        id=f"{agent_id}_{datetime.now().timestamp()}"
    )

def _parse_cron(self, cron: str) -> dict:
    """Parse cron expression to APScheduler format."""
    # "0 9 * * *" -> {"hour": 9, "minute": 0}
    parts = cron.split()
    return {
        "minute": parts[0],
        "hour": parts[1],
        "day": parts[2],
        "month": parts[3],
        "day_of_week": parts[4]
    }

def start(self):
    self.scheduler.start()
from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime
class AgentScheduler: def init(self, event_bus): self.scheduler = BackgroundScheduler() self.event_bus = event_bus
def schedule_task(self, agent_id: str, cron: str, task: callable):
    """调度周期性任务。"""
    self.scheduler.add_job(
        func=task,
        trigger='cron',
        **self._parse_cron(cron),
        id=f"{agent_id}_{datetime.now().timestamp()}"
    )

def _parse_cron(self, cron: str) -> dict:
    """将Cron表达式解析为APScheduler格式。"""
    # "0 9 * * *" -> {"hour": 9, "minute": 0}
    parts = cron.split()
    return {
        "minute": parts[0],
        "hour": parts[1],
        "day": parts[2],
        "month": parts[3],
        "day_of_week": parts[4]
    }

def start(self):
    self.scheduler.start()

Usage

使用示例

scheduler = AgentScheduler(event_bus) scheduler.schedule_task( "morning_brief", "0 9 * * *", # 9 AM daily lambda: send_daily_briefing() ) scheduler.start()
undefined
scheduler = AgentScheduler(event_bus) scheduler.schedule_task( "morning_brief", "0 9 * * *", # 每天上午9点 lambda: send_daily_briefing() ) scheduler.start()
undefined

Step 15: Agent Dispatch

步骤15:智能体调度

Agents collaborate to complete tasks.
python
undefined
智能体协作完成任务。
python
undefined

15-agent-dispatch/dispatcher.py

15-agent-dispatch/dispatcher.py

class AgentDispatcher: def init(self, agents: dict, event_bus): self.agents = agents self.event_bus = event_bus
async def dispatch_task(self, task: str, requesting_agent: str):
    """Dispatch a task to another agent."""
    # Determine best agent for subtask
    target_agent = self._select_agent(task)
    
    # Execute subtask
    result = await self.agents[target_agent].execute(task)
    
    # Return result to requesting agent
    self.event_bus.publish(Event(
        type=EventType.TASK_COMPLETED,
        data={
            "task": task,
            "result": result,
            "requester": requesting_agent
        },
        source=target_agent
    ))
    
    return result
undefined
class AgentDispatcher: def init(self, agents: dict, event_bus): self.agents = agents self.event_bus = event_bus
async def dispatch_task(self, task: str, requesting_agent: str):
    """将任务调度给其他智能体。"""
    # 确定处理子任务的最佳智能体
    target_agent = self._select_agent(task)
    
    # 执行子任务
    result = await self.agents[target_agent].execute(task)
    
    # 将结果返回给请求智能体
    self.event_bus.publish(Event(
        type=EventType.TASK_COMPLETED,
        data={
            "task": task,
            "result": result,
            "requester": requesting_agent
        },
        source=target_agent
    ))
    
    return result
undefined

Phase 4: Production Features

阶段4:生产级特性

Step 16: Concurrency Control

步骤16:并发控制

Prevent race conditions with multiple agents.
python
undefined
防止多智能体场景下的竞态条件。
python
undefined

16-concurrency-control/lock_manager.py

16-concurrency-control/lock_manager.py

import asyncio from contextlib import asynccontextmanager
class LockManager: def init(self): self.locks = {}
@asynccontextmanager
async def acquire(self, resource_id: str):
    """Acquire lock for a resource."""
    if resource_id not in self.locks:
        self.locks[resource_id] = asyncio.Lock()
    
    async with self.locks[resource_id]:
        yield
import asyncio from contextlib import asynccontextmanager
class LockManager: def init(self): self.locks = {}
@asynccontextmanager
async def acquire(self, resource_id: str):
    """获取资源锁。"""
    if resource_id not in self.locks:
        self.locks[resource_id] = asyncio.Lock()
    
    async with self.locks[resource_id]:
        yield

Usage

使用示例

lock_manager = LockManager()
async def process_session(session_id: str): async with lock_manager.acquire(f"session:{session_id}"): # Only one agent can access this session at a time messages = load_session(session_id) # ... process ... save_session(session_id, messages)
undefined
lock_manager = LockManager()
async def process_session(session_id: str): async with lock_manager.acquire(f"session:{session_id}"): # 同一时间只有一个智能体可以访问该会话 messages = load_session(session_id) # ... 处理逻辑 ... save_session(session_id, messages)
undefined

Step 17: Long-Term Memory

步骤17:长期记忆

Store and retrieve relevant memories.
python
undefined
存储和检索相关记忆。
python
undefined

17-memory/memory_store.py

17-memory/memory_store.py

from chromadb import Client from chromadb.config import Settings
class MemoryStore: def init(self, collection_name="agent_memory"): self.client = Client(Settings(persist_directory="./memory_db")) self.collection = self.client.get_or_create_collection(collection_name)
def store_memory(self, content: str, metadata: dict):
    """Store a memory with embeddings."""
    self.collection.add(
        documents=[content],
        metadatas=[metadata],
        ids=[f"mem_{metadata.get('timestamp', 0)}"]
    )

def recall(self, query: str, n_results: int = 5) -> list:
    """Retrieve relevant memories."""
    results = self.collection.query(
        query_texts=[query],
        n_results=n_results
    )
    
    return [
        {
            "content": doc,
            "metadata": meta
        }
        for doc, meta in zip(
            results['documents'][0],
            results['metadatas'][0]
        )
    ]
from chromadb import Client from chromadb.config import Settings
class MemoryStore: def init(self, collection_name="agent_memory"): self.client = Client(Settings(persist_directory="./memory_db")) self.collection = self.client.get_or_create_collection(collection_name)
def store_memory(self, content: str, metadata: dict):
    """存储带嵌入向量的记忆。"""
    self.collection.add(
        documents=[content],
        metadatas=[metadata],
        ids=[f"mem_{metadata.get('timestamp', 0)}"]
    )

def recall(self, query: str, n_results: int = 5) -> list:
    """检索相关记忆。"""
    results = self.collection.query(
        query_texts=[query],
        n_results=n_results
    )
    
    return [
        {
            "content": doc,
            "metadata": meta
        }
        for doc, meta in zip(
            results['documents'][0],
            results['metadatas'][0]
        )
    ]

Usage

使用示例

memory = MemoryStore()
memory = MemoryStore()

Store memory

存储记忆

memory.store_memory( "User prefers Python over JavaScript", {"user_id": "user123", "timestamp": 1234567890} )
memory.store_memory( "用户偏好Python而非JavaScript", {"user_id": "user123", "timestamp": 1234567890} )

Recall relevant memories

检索相关记忆

relevant = memory.recall("What languages does the user like?")
undefined
relevant = memory.recall("用户喜欢什么编程语言?")
undefined

Common Patterns

通用模式

Full Agent Implementation

完整智能体实现

python
undefined
python
undefined

Complete agent with all features

集成所有特性的完整智能体

class Agent: def init(self, config_path="config.user.yaml"): self.config = self.load_config(config_path) self.event_bus = EventBus() self.session_manager = SessionManager() self.memory = MemoryStore() self.tools = self.load_tools() self.skills = self.load_skills()
async def process_message(self, message: str, session_id: str):
    # Load session with lock
    async with lock_manager.acquire(f"session:{session_id}"):
        messages = self.session_manager.load_session(session_id)
        
        # Recall relevant memories
        memories = self.memory.recall(message)
        context = self.build_context(memories)
        
        # Add user message
        messages.append({"role": "user", "content": message})
        
        # Compact if needed
        messages = self.compactor.compact_if_needed(messages)
        
        # Get response with tools
        response = await completion(
            model=self.config['llm']['model'],
            messages=[{"role": "system", "content": context}] + messages,
            tools=self.tools
        )
        
        # Handle tool calls
        while response.choices[0].message.tool_calls:
            # Execute tools and continue
            pass
        
        # Store memory of interaction
        self.memory.store_memory(
            f"User: {message}\nAssistant: {response_text}",
            {"session_id": session_id, "timestamp": time.time()}
        )
        
        # Save session
        self.session_manager.save_session(session_id, messages)
        
        return response_text
undefined
class Agent: def init(self, config_path="config.user.yaml"): self.config = self.load_config(config_path) self.event_bus = EventBus() self.session_manager = SessionManager() self.memory = MemoryStore() self.tools = self.load_tools() self.skills = self.load_skills()
async def process_message(self, message: str, session_id: str):
    # 加锁加载会话
    async with lock_manager.acquire(f"session:{session_id}"):
        messages = self.session_manager.load_session(session_id)
        
        # 检索相关记忆
        memories = self.memory.recall(message)
        context = self.build_context(memories)
        
        # 添加用户消息
        messages.append({"role": "user", "content": message})
        
        # 必要时压缩上下文
        messages = self.compactor.compact_if_needed(messages)
        
        # 调用LLM并使用工具
        response = await completion(
            model=self.config['llm']['model'],
            messages=[{"role": "system", "content": context}] + messages,
            tools=self.tools
        )
        
        # 处理工具调用
        while response.choices[0].message.tool_calls:
            # 执行工具并继续对话
            pass
        
        # 存储本次交互的记忆
        self.memory.store_memory(
            f"用户: {message}\n助手: {response_text}",
            {"session_id": session_id, "timestamp": time.time()}
        )
        
        # 保存会话
        self.session_manager.save_session(session_id, messages)
        
        return response_text
undefined

Troubleshooting

故障排查

API Key Issues

API密钥问题

bash
undefined
bash
undefined

Check environment variables

检查环境变量

echo $OPENAI_API_KEY
echo $OPENAI_API_KEY

Verify config.user.yaml uses correct env var syntax

验证config.user.yaml使用正确的环境变量语法

✓ Correct: api_key: "${OPENAI_API_KEY}"

✓ 正确格式: api_key: "${OPENAI_API_KEY}"

✗ Wrong: api_key: "$OPENAI_API_KEY" or api_key: "sk-..."

✗ 错误格式: api_key: "$OPENAI_API_KEY" 或 api_key: "sk-..."

undefined
undefined

LiteLLM Provider Errors

LiteLLM提供商错误

python
undefined
python
undefined

Test your LLM config

测试你的LLM配置

from litellm import completion
response = completion( model="gpt-4", # or your model messages=[{"role": "user", "content": "test"}], api_key="${OPENAI_API_KEY}" ) print(response.choices[0].message.content)
undefined
from litellm import completion
response = completion( model="gpt-4", # 或你的模型 messages=[{"role": "user", "content": "test"}], api_key="${OPENAI_API_KEY}" ) print(response.choices[0].message.content)
undefined

Session Persistence Issues

会话持久化问题

python
undefined
python
undefined

Verify sessions directory exists and is writable

验证会话目录存在且可写

import os sessions_dir = "sessions" os.makedirs(sessions_dir, exist_ok=True) print(f"Sessions dir: {os.path.abspath(sessions_dir)}")
undefined
import os sessions_dir = "sessions" os.makedirs(sessions_dir, exist_ok=True) print(f"会话目录: {os.path.abspath(sessions_dir)}")
undefined

Memory/ChromaDB Issues

记忆/ChromaDB问题

bash
undefined
bash
undefined

Install ChromaDB dependencies

安装ChromaDB依赖

pip install chromadb
pip install chromadb

Clear memory database if corrupted

如果数据库损坏,清空记忆数据库

rm -rf ./memory_db
undefined
rm -rf ./memory_db
undefined

Next Steps

后续步骤

  1. Start with Step 0 and progress sequentially
  2. Each step builds on previous ones
  3. Reference the example project (pickle-bot) for a complete implementation
  4. Customize each step for your specific use case
  5. Mix and match features based on your needs
  1. 步骤0开始,循序渐进学习
  2. 每个步骤都基于前序步骤构建
  3. 参考示例项目(pickle-bot)获取完整实现
  4. 根据你的特定需求定制每个步骤
  5. 根据需要混合搭配不同特性

Resources

参考资源