graphiti
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseGraphiti & Zep Knowledge Graph Skill
Graphiti & Zep 知识图谱技能
Expert assistance for building AI agents with temporal knowledge graphs using Graphiti and Zep's context engineering platform for dynamic, personalized agent memory and context assembly.
为使用Graphiti和Zep上下文工程平台构建具备动态、个性化Agent记忆与上下文组装能力的AI Agent提供专业协助。
When to Use This Skill
何时使用该技能
This skill should be used when:
- Building AI agents that need persistent memory across conversations
- Implementing knowledge graphs for agent applications
- Creating personalized user experiences with context awareness
- Managing evolving relationships and historical context
- Integrating enterprise data with agent systems
- Building Graph RAG (Retrieval-Augmented Generation) applications
- Tracking user preferences, traits, and interaction history
- Querying temporal data with point-in-time accuracy
- Reducing hallucinations through structured context
- Implementing multi-user agent systems with individual memory
- Working with Zep's context engineering platform
- Integrating with LangGraph, CrewAI, Autogen, or other agent frameworks
在以下场景中应使用本技能:
- 构建需要跨对话持久化记忆的AI Agent
- 为Agent应用实现知识图谱
- 创建具备上下文感知能力的个性化用户体验
- 管理不断演变的关系与历史上下文
- 将企业数据与Agent系统集成
- 构建Graph RAG(检索增强生成)应用
- 跟踪用户偏好、特征与交互历史
- 以时间点精度查询时序数据
- 通过结构化上下文减少幻觉
- 实现具备独立记忆的多用户Agent系统
- 使用Zep的上下文工程平台
- 与LangGraph、CrewAI、Autogen或其他Agent框架集成
Overview
概述
Graphiti - Temporal Knowledge Graph Framework
Graphiti - 时序知识图谱框架
Graphiti is an open-source framework for constructing and querying time-aware knowledge graphs optimized for AI agents in dynamic environments.
Key Characteristics:
- Temporal: Bi-temporal data model tracking both event occurrence and ingestion times
- Incremental: Real-time data integration without batch reprocessing
- Hybrid Search: Combines semantic embeddings, BM25 keyword matching, and graph traversal
- Low Latency: Sub-second query performance without LLM-dependent summarization
- Flexible: Customizable entity definitions via Pydantic models
- Scalable: Enterprise-scale parallel processing
Graphiti是一个开源框架,专为动态环境中的AI Agent构建和查询时间感知型知识图谱而优化。
核心特性:
- 时序性:双时态数据模型,同时跟踪事件发生时间和数据摄入时间
- 增量式:无需批量重处理的实时数据集成
- 混合搜索:结合语义嵌入、BM25关键词匹配和图遍历
- 低延迟:无需依赖LLM总结的亚秒级查询性能
- 灵活性:通过Pydantic模型自定义实体定义
- 可扩展性:企业级并行处理能力
Zep - Context Engineering Platform
Zep - 上下文工程平台
Zep is a production-ready platform built on Graphiti that provides:
- Agent memory management
- Graph RAG optimization
- User and conversation management
- Developer dashboards and monitoring
- Enterprise features (RBAC, BYOK, BYOLLM)
- Integration with major agent frameworks
Core Value Proposition:
"Assembles personalized context—including user preferences, traits, and business data—for reliable agent applications"
Zep是基于Graphiti构建的生产级平台,提供:
- Agent记忆管理
- Graph RAG优化
- 用户与对话管理
- 开发者仪表盘与监控
- 企业级功能(RBAC、BYOK、BYOLLM)
- 与主流Agent框架的集成
核心价值主张:
"为可靠的Agent应用组装个性化上下文——包括用户偏好、特征和业务数据"
Core Concepts
核心概念
1. Temporal Knowledge Graphs
1. 时序知识图谱
Graphiti creates time-stamped relationship networks that:
- Maintain historical context
- Track changes over time
- Support point-in-time queries
- Preserve relationship evolution
Bi-Temporal Model:
- Valid Time: When an event actually occurred
- Transaction Time: When the data was ingested into the system
Graphiti创建带时间戳的关系网络,能够:
- 维护历史上下文
- 跟踪随时间的变化
- 支持时间点查询
- 保留关系演变过程
双时态模型:
- 有效时间:事件实际发生的时间
- 事务时间:数据被摄入系统的时间
2. Entities and Relations
2. 实体与关系
Entities: Objects or concepts in your knowledge graph
- Users, products, concepts, locations, etc.
- Customizable via Pydantic models
- Can have attributes and properties
- Time-stamped for historical tracking
Relations: Connections between entities
- Typed relationships (e.g., "likes", "purchased", "mentioned")
- Directional (from entity A to entity B)
- Can have weights, properties, and metadata
- Temporal tracking of relationship changes
实体:知识图谱中的对象或概念
- 用户、产品、概念、地点等
- 通过Pydantic模型自定义
- 可包含属性和特性
- 带时间戳以支持历史跟踪
关系:实体之间的连接
- 类型化关系(如“喜欢”、“购买”、“提及”)
- 有向关系(从实体A到实体B)
- 可包含权重、属性和元数据
- 对关系变化进行时序跟踪
3. Facts and Summaries
3. 事实与摘要
Facts: Extracted information from conversations and data
- Automatically extracted from chat history
- Derived from structured/unstructured business data
- Form the basis of the knowledge graph
- Time-stamped and attributed to sources
Summaries: Condensed representations of interactions
- Generated as conversations unfold
- Capture key points and themes
- Enable efficient context retrieval
- Maintain temporal coherence
事实:从对话和数据中提取的信息
- 从聊天记录中自动提取
- 从结构化/非结构化业务数据中派生
- 构成知识图谱的基础
- 带时间戳并归属到来源
摘要:交互内容的浓缩表示
- 随对话展开自动生成
- 捕捉关键点和主题
- 实现高效的上下文检索
- 保持时序一致性
4. Hybrid Search
4. 混合搜索
Graphiti uses three complementary search methods:
Semantic Search:
- Vector embeddings for conceptual similarity
- Finds semantically related entities and facts
Keyword Search (BM25):
- Traditional text matching
- Precise term-based retrieval
Graph Traversal:
- Follow relationships between entities
- Discover connected information
- Navigate the knowledge network
Graphiti使用三种互补的搜索方法:
语义搜索:
- 使用向量嵌入实现概念相似度匹配
- 查找语义相关的实体和事实
关键词搜索(BM25):
- 传统文本匹配
- 精确的基于术语的检索
图遍历:
- 沿实体间的关系进行遍历
- 发现关联信息
- 导航知识网络
Installation
安装
Graphiti (Open Source)
Graphiti(开源)
bash
undefinedbash
undefinedInstall from PyPI
从PyPI安装
pip install graphiti-core
pip install graphiti-core
Or from source
或从源码安装
git clone https://github.com/getzep/graphiti.git
cd graphiti
pip install -e .
undefinedgit clone https://github.com/getzep/graphiti.git
cd graphiti
pip install -e .
undefinedZep (Platform)
Zep(平台)
bash
undefinedbash
undefinedPython SDK
Python SDK
pip install zep-cloud
pip install zep-cloud
TypeScript SDK
TypeScript SDK
npm install @getzep/zep-cloud
npm install @getzep/zep-cloud
Go SDK
Go SDK
go get github.com/getzep/zep-go
undefinedgo get github.com/getzep/zep-go
undefinedDatabase Requirements
数据库要求
Graphiti supports multiple graph database backends:
Neo4j (Recommended)
bash
undefinedGraphiti支持多种图数据库后端:
Neo4j(推荐)
bash
undefinedDocker
Docker
docker run -d
--name neo4j
-p 7474:7474 -p 7687:7687
-e NEO4J_AUTH=neo4j/password
neo4j:5.26
--name neo4j
-p 7474:7474 -p 7687:7687
-e NEO4J_AUTH=neo4j/password
neo4j:5.26
**FalkorDB**
```bash
docker run -d \
--name falkordb \
-p 6379:6379 \
falkordb/falkordb:1.1.2Kuzu
bash
pip install kuzuAmazon Neptune (Enterprise)
- Use with OpenSearch Serverless
- See AWS documentation for setup
docker run -d
--name neo4j
-p 7474:7474 -p 7687:7687
-e NEO4J_AUTH=neo4j/password
neo4j:5.26
--name neo4j
-p 7474:7474 -p 7687:7687
-e NEO4J_AUTH=neo4j/password
neo4j:5.26
**FalkorDB**
```bash
docker run -d \
--name falkordb \
-p 6379:6379 \
falkordb/falkordb:1.1.2Kuzu
bash
pip install kuzuAmazon Neptune(企业级)
- 与OpenSearch Serverless配合使用
- 请参阅AWS文档进行设置
LLM Configuration
LLM配置
Graphiti works with various LLM providers:
python
undefinedGraphiti可与多种LLM提供商配合使用:
python
undefinedOpenAI (default)
OpenAI(默认)
import os
os.environ["OPENAI_API_KEY"] = "your-key"
import os
os.environ["OPENAI_API_KEY"] = "your-key"
Anthropic Claude
Anthropic Claude
os.environ["ANTHROPIC_API_KEY"] = "your-key"
os.environ["ANTHROPIC_API_KEY"] = "your-key"
Google Gemini
Google Gemini
os.environ["GOOGLE_API_KEY"] = "your-key"
os.environ["GOOGLE_API_KEY"] = "your-key"
Azure OpenAI
Azure OpenAI
os.environ["AZURE_OPENAI_API_KEY"] = "your-key"
os.environ["AZURE_OPENAI_ENDPOINT"] = "your-endpoint"
undefinedos.environ["AZURE_OPENAI_API_KEY"] = "your-key"
os.environ["AZURE_OPENAI_ENDPOINT"] = "your-endpoint"
undefinedQuick Start with Graphiti
Graphiti快速入门
Basic Setup
基础设置
python
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeTypepython
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeTypeInitialize Graphiti with Neo4j
使用Neo4j初始化Graphiti
graphiti = Graphiti(
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password"
)
graphiti = Graphiti(
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password"
)
Initialize the graph
初始化图谱
await graphiti.build_indices_and_constraints()
undefinedawait graphiti.build_indices_and_constraints()
undefinedAdding Episodes (Messages/Events)
添加事件(消息/事件)
python
undefinedpython
undefinedAdd a conversation message
添加对话消息
await graphiti.add_episode(
name="User Message",
episode_body="I love hiking in the mountains, especially in Colorado.",
source_description="User conversation",
reference_time=datetime.now(),
episode_type=EpisodeType.message
)
await graphiti.add_episode(
name="User Message",
episode_body="I love hiking in the mountains, especially in Colorado.",
source_description="User conversation",
reference_time=datetime.now(),
episode_type=EpisodeType.message
)
Add business data
添加业务数据
await graphiti.add_episode(
name="Purchase Event",
episode_body="User purchased hiking boots size 10",
source_description="E-commerce transaction",
reference_time=datetime.now(),
episode_type=EpisodeType.json
)
await graphiti.add_episode(
name="Purchase Event",
episode_body="User purchased hiking boots size 10",
source_description="E-commerce transaction",
reference_time=datetime.now(),
episode_type=EpisodeType.json
)
Add document/file content
添加文档/文件内容
await graphiti.add_episode(
name="User Profile",
episode_body="John is a 35-year-old software engineer based in Denver",
source_description="User profile document",
reference_time=datetime.now(),
episode_type=EpisodeType.text
)
undefinedawait graphiti.add_episode(
name="User Profile",
episode_body="John is a 35-year-old software engineer based in Denver",
source_description="User profile document",
reference_time=datetime.now(),
episode_type=EpisodeType.text
)
undefinedQuerying the Graph
查询图谱
python
undefinedpython
undefinedSearch for relevant context
搜索相关上下文
results = await graphiti.search(
query="What outdoor activities does the user enjoy?",
num_results=10
)
for result in results:
print(f"Entity: {result.name}")
print(f"Content: {result.content}")
print(f"Relevance: {result.score}")
print("---")
results = await graphiti.search(
query="What outdoor activities does the user enjoy?",
num_results=10
)
for result in results:
print(f"Entity: {result.name}")
print(f"Content: {result.content}")
print(f"Relevance: {result.score}")
print("---")
Point-in-time query (historical data)
时间点查询(历史数据)
from datetime import datetime, timedelta
past_time = datetime.now() - timedelta(days=30)
historical_results = await graphiti.search(
query="User preferences",
reference_time=past_time,
num_results=5
)
undefinedfrom datetime import datetime, timedelta
past_time = datetime.now() - timedelta(days=30)
historical_results = await graphiti.search(
query="User preferences",
reference_time=past_time,
num_results=5
)
undefinedCustom Entity Types
自定义实体类型
python
from pydantic import BaseModel, Field
from typing import Optional
class Product(BaseModel):
"""Custom product entity"""
name: str = Field(description="Product name")
category: str = Field(description="Product category")
price: Optional[float] = Field(description="Product price")
in_stock: bool = Field(default=True, description="Availability status")
class Customer(BaseModel):
"""Custom customer entity"""
name: str = Field(description="Customer name")
email: str = Field(description="Customer email")
tier: str = Field(description="Customer tier: basic, premium, enterprise")
lifetime_value: Optional[float] = Field(description="Total customer value")python
from pydantic import BaseModel, Field
from typing import Optional
class Product(BaseModel):
"""自定义产品实体"""
name: str = Field(description="Product name")
category: str = Field(description="Product category")
price: Optional[float] = Field(description="Product price")
in_stock: bool = Field(default=True, description="Availability status")
class Customer(BaseModel):
"""自定义客户实体"""
name: str = Field(description="Customer name")
email: str = Field(description="Customer email")
tier: str = Field(description="Customer tier: basic, premium, enterprise")
lifetime_value: Optional[float] = Field(description="Total customer value")Register custom entity types
注册自定义实体类型
graphiti.register_entity_type(Product)
graphiti.register_entity_type(Customer)
graphiti.register_entity_type(Product)
graphiti.register_entity_type(Customer)
Add episodes with custom entities
添加包含自定义实体的事件
await graphiti.add_episode(
name="Product Purchase",
episode_body="Customer john@example.com purchased Premium Hiking Boots for $150",
source_description="Transaction log",
reference_time=datetime.now(),
entity_types=[Customer, Product]
)
undefinedawait graphiti.add_episode(
name="Product Purchase",
episode_body="Customer john@example.com purchased Premium Hiking Boots for $150",
source_description="Transaction log",
reference_time=datetime.now(),
entity_types=[Customer, Product]
)
undefinedWorking with Zep Platform
使用Zep平台
Python SDK
Python SDK
python
from zep_cloud.client import AsyncZep
from zep_cloud import Messagepython
from zep_cloud.client import AsyncZep
from zep_cloud import MessageInitialize Zep client
初始化Zep客户端
zep = AsyncZep(api_key="your-api-key")
zep = AsyncZep(api_key="your-api-key")
Create a user
创建用户
user = await zep.user.add(
user_id="user_123",
email="user@example.com",
first_name="John",
last_name="Doe",
metadata={"plan": "premium"}
)
user = await zep.user.add(
user_id="user_123",
email="user@example.com",
first_name="John",
last_name="Doe",
metadata={"plan": "premium"}
)
Create a session (conversation thread)
创建会话(对话线程)
session = await zep.memory.add_session(
session_id="session_456",
user_id="user_123",
metadata={"channel": "web"}
)
session = await zep.memory.add_session(
session_id="session_456",
user_id="user_123",
metadata={"channel": "web"}
)
Add messages to build memory
添加消息以构建记忆
await zep.memory.add_memory(
session_id="session_456",
messages=[
Message(
role="user",
content="I'm planning a hiking trip to Colorado next month"
),
Message(
role="assistant",
content="That sounds great! What areas are you considering?"
)
]
)
await zep.memory.add_memory(
session_id="session_456",
messages=[
Message(
role="user",
content="I'm planning a hiking trip to Colorado next month"
),
Message(
role="assistant",
content="That sounds great! What areas are you considering?"
)
]
)
Retrieve relevant context
检索相关上下文
memory = await zep.memory.get_memory(
session_id="session_456"
)
memory = await zep.memory.get_memory(
session_id="session_456"
)
Get facts from the knowledge graph
从知识图谱中获取事实
facts = memory.facts
facts = memory.facts
Get user summary
获取用户摘要
summary = memory.summary
summary = memory.summary
Search across user's graph
跨用户图谱搜索
search_results = await zep.memory.search_memory(
user_id="user_123",
text="outdoor activities and preferences",
search_scope="facts"
)
undefinedsearch_results = await zep.memory.search_memory(
user_id="user_123",
text="outdoor activities and preferences",
search_scope="facts"
)
undefinedTypeScript SDK
TypeScript SDK
typescript
import { ZepClient } from "@getzep/zep-cloud";
// Initialize client
const zep = new ZepClient({ apiKey: "your-api-key" });
// Create user
const user = await zep.user.add({
userId: "user_123",
email: "user@example.com",
firstName: "John",
lastName: "Doe"
});
// Add messages
await zep.memory.addMemory("session_456", {
messages: [
{ role: "user", content: "I love mountain biking" },
{ role: "assistant", content: "Great! Where do you usually ride?" }
]
});
// Get memory with context
const memory = await zep.memory.getMemory("session_456");
console.log("Facts:", memory.facts);
console.log("Summary:", memory.summary);
// Search user's knowledge graph
const results = await zep.memory.searchMemory({
userId: "user_123",
text: "outdoor hobbies",
searchScope: "facts"
});typescript
import { ZepClient } from "@getzep/zep-cloud";
// 初始化客户端
const zep = new ZepClient({ apiKey: "your-api-key" });
// 创建用户
const user = await zep.user.add({
userId: "user_123",
email: "user@example.com",
firstName: "John",
lastName: "Doe"
});
// 添加消息
await zep.memory.addMemory("session_456", {
messages: [
{ role: "user", content: "I love mountain biking" },
{ role: "assistant", content: "Great! Where do you usually ride?" }
]
});
// 获取带上下文的记忆
const memory = await zep.memory.getMemory("session_456");
console.log("Facts:", memory.facts);
console.log("Summary:", memory.summary);
// 搜索用户知识图谱
const results = await zep.memory.searchMemory({
userId: "user_123",
text: "outdoor hobbies",
searchScope: "facts"
});Advanced Features
高级功能
1. Batch Data Ingestion
1. 批量数据摄入
python
undefinedpython
undefinedAdd multiple episodes efficiently
高效添加多个事件
episodes = [
{
"name": f"Message {i}",
"episode_body": f"Content {i}",
"source_description": "Batch import",
"reference_time": datetime.now(),
"episode_type": EpisodeType.message
}
for i in range(100)
]
episodes = [
{
"name": f"Message {i}",
"episode_body": f"Content {i}",
"source_description": "Batch import",
"reference_time": datetime.now(),
"episode_type": EpisodeType.message
}
for i in range(100)
]
Process in parallel
并行处理
for episode in episodes:
await graphiti.add_episode(**episode)
undefinedfor episode in episodes:
await graphiti.add_episode(**episode)
undefined2. Custom Ontologies
2. 自定义本体
python
undefinedpython
undefinedDefine domain-specific relationships
定义领域特定关系
ontology = {
"entities": ["Customer", "Product", "Order", "Support Ticket"],
"relations": [
{"type": "purchased", "from": "Customer", "to": "Product"},
{"type": "contains", "from": "Order", "to": "Product"},
{"type": "created", "from": "Customer", "to": "Support Ticket"},
{"type": "related_to", "from": "Support Ticket", "to": "Product"}
]
}
ontology = {
"entities": ["Customer", "Product", "Order", "Support Ticket"],
"relations": [
{"type": "purchased", "from": "Customer", "to": "Product"},
{"type": "contains", "from": "Order", "to": "Product"},
{"type": "created", "from": "Customer", "to": "Support Ticket"},
{"type": "related_to", "from": "Support Ticket", "to": "Product"}
]
}
Use ontology in episode processing
在事件处理中使用本体
await graphiti.add_episode(
name="Customer Support",
episode_body="Customer had issue with hiking boots, opened ticket",
source_description="Support system",
reference_time=datetime.now(),
ontology=ontology
)
undefinedawait graphiti.add_episode(
name="Customer Support",
episode_body="Customer had issue with hiking boots, opened ticket",
source_description="Support system",
reference_time=datetime.now(),
ontology=ontology
)
undefined3. Graph Traversal Queries
3. 图遍历查询
python
undefinedpython
undefinedFind connected entities
查找关联实体
from graphiti_core.search import GraphTraversal
from graphiti_core.search import GraphTraversal
Find all products a user has purchased
查找用户购买的所有产品
traversal = GraphTraversal(
start_entity="user_123",
relationship_types=["purchased"],
max_depth=2
)
related_products = await graphiti.traverse(traversal)
traversal = GraphTraversal(
start_entity="user_123",
relationship_types=["purchased"],
max_depth=2
)
related_products = await graphiti.traverse(traversal)
Find similar users based on preferences
根据偏好查找相似用户
similar_users = await graphiti.find_similar_entities(
entity_id="user_123",
entity_type="Customer",
similarity_threshold=0.7
)
undefinedsimilar_users = await graphiti.find_similar_entities(
entity_id="user_123",
entity_type="Customer",
similarity_threshold=0.7
)
undefined4. Context Assembly
4. 上下文组装
python
undefinedpython
undefinedAssemble comprehensive context for agent
为Agent组装全面上下文
context = await zep.memory.get_memory(
session_id="session_456",
memory_type="perpetual" # Includes full user graph
)
context = await zep.memory.get_memory(
session_id="session_456",
memory_type="perpetual" # 包含完整用户图谱
)
Build agent prompt with context
构建带上下文的Agent提示词
system_prompt = f"""
You are a helpful assistant. Here's what you know about the user:
User Summary: {context.summary}
Recent Facts:
{chr(10).join(f"- {fact}" for fact in context.facts[:5])}
User Preferences:
{chr(10).join(f"- {pref}" for pref in context.relevant_facts)}
"""
system_prompt = f"""
You are a helpful assistant. Here's what you know about the user:
User Summary: {context.summary}
Recent Facts:
{chr(10).join(f"- {fact}" for fact in context.facts[:5])}
User Preferences:
{chr(10).join(f"- {pref}" for pref in context.relevant_facts)}
"""
Use in your agent framework
在Agent框架中使用
from langchain.chat_models import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
response = llm.invoke([
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Recommend some activities for my trip"}
])
undefinedfrom langchain.chat_models import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
response = llm.invoke([
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Recommend some activities for my trip"}
])
undefined5. Temporal Queries
5. 时序查询
python
undefinedpython
undefinedQuery what was known at a specific time
查询特定时间点的已知信息
from datetime import datetime, timedelta
from datetime import datetime, timedelta
What did we know 7 days ago?
7天前我们知道什么?
past_date = datetime.now() - timedelta(days=7)
past_context = await graphiti.search(
query="user preferences",
reference_time=past_date
)
past_date = datetime.now() - timedelta(days=7)
past_context = await graphiti.search(
query="user preferences",
reference_time=past_date
)
Track how knowledge evolved
跟踪知识演变
current_context = await graphiti.search(
query="user preferences",
reference_time=datetime.now()
)
current_context = await graphiti.search(
query="user preferences",
reference_time=datetime.now()
)
Compare past vs current
比较过去与当前
print("7 days ago:", past_context)
print("Current:", current_context)
undefinedprint("7 days ago:", past_context)
print("Current:", current_context)
undefinedIntegration with Agent Frameworks
与Agent框架集成
LangGraph Integration
LangGraph集成
python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from zep_cloud.client import AsyncZeppython
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from zep_cloud.client import AsyncZepInitialize
初始化
zep = AsyncZep(api_key="your-key")
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
zep = AsyncZep(api_key="your-key")
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
Define agent state
定义Agent状态
class AgentState(TypedDict):
messages: list
user_id: str
session_id: str
context: str
class AgentState(TypedDict):
messages: list
user_id: str
session_id: str
context: str
Context retrieval node
上下文检索节点
async def get_context(state: AgentState):
memory = await zep.memory.get_memory(state["session_id"])
context = f"Summary: {memory.summary}\nFacts: {memory.facts}"
return {"context": context}
async def get_context(state: AgentState):
memory = await zep.memory.get_memory(state["session_id"])
context = f"Summary: {memory.summary}\nFacts: {memory.facts}"
return {"context": context}
LLM node with context
带上下文的LLM节点
async def call_llm(state: AgentState):
system_msg = f"Context: {state['context']}"
messages = [{"role": "system", "content": system_msg}] + state["messages"]
response = await llm.ainvoke(messages)
return {"messages": state["messages"] + [response]}
async def call_llm(state: AgentState):
system_msg = f"Context: {state['context']}"
messages = [{"role": "system", "content": system_msg}] + state["messages"]
response = await llm.ainvoke(messages)
return {"messages": state["messages"] + [response]}
Build graph
构建图谱
workflow = StateGraph(AgentState)
workflow.add_node("get_context", get_context)
workflow.add_node("call_llm", call_llm)
workflow.add_edge("get_context", "call_llm")
workflow.add_edge("call_llm", END)
workflow.set_entry_point("get_context")
app = workflow.compile()
workflow = StateGraph(AgentState)
workflow.add_node("get_context", get_context)
workflow.add_node("call_llm", call_llm)
workflow.add_edge("get_context", "call_llm")
workflow.add_edge("call_llm", END)
workflow.set_entry_point("get_context")
app = workflow.compile()
Run agent
运行Agent
result = await app.ainvoke({
"messages": [{"role": "user", "content": "Plan my trip"}],
"user_id": "user_123",
"session_id": "session_456",
"context": ""
})
undefinedresult = await app.ainvoke({
"messages": [{"role": "user", "content": "Plan my trip"}],
"user_id": "user_123",
"session_id": "session_456",
"context": ""
})
undefinedCrewAI Integration
CrewAI集成
python
from crewai import Agent, Task, Crew
from zep_cloud.client import AsyncZeppython
from crewai import Agent, Task, Crew
from zep_cloud.client import AsyncZepInitialize Zep
初始化Zep
zep = AsyncZep(api_key="your-key")
zep = AsyncZep(api_key="your-key")
Custom tool for memory retrieval
用于记忆检索的自定义工具
from crewai_tools import tool
@tool("get_user_context")
async def get_user_context(user_id: str) -> str:
"""Retrieve user context from Zep knowledge graph"""
results = await zep.memory.search_memory(
user_id=user_id,
text="preferences and history",
search_scope="facts"
)
return "\n".join([r.content for r in results])
from crewai_tools import tool
@tool("get_user_context")
async def get_user_context(user_id: str) -> str:
"""从Zep知识图谱中检索用户上下文"""
results = await zep.memory.search_memory(
user_id=user_id,
text="preferences and history",
search_scope="facts"
)
return "\n".join([r.content for r in results])
Create agent with memory
创建带记忆的Agent
planning_agent = Agent(
role="Travel Planner",
goal="Create personalized travel plans",
backstory="Expert at creating customized itineraries",
tools=[get_user_context],
verbose=True
)
planning_agent = Agent(
role="Travel Planner",
goal="Create personalized travel plans",
backstory="Expert at creating customized itineraries",
tools=[get_user_context],
verbose=True
)
Define task
定义任务
task = Task(
description="Plan a hiking trip for user_123 based on their preferences",
agent=planning_agent,
expected_output="Detailed 3-day itinerary"
)
task = Task(
description="Plan a hiking trip for user_123 based on their preferences",
agent=planning_agent,
expected_output="Detailed 3-day itinerary"
)
Create crew
创建团队
crew = Crew(
agents=[planning_agent],
tasks=[task]
)
crew = Crew(
agents=[planning_agent],
tasks=[task]
)
Execute with context
结合上下文执行
result = crew.kickoff()
undefinedresult = crew.kickoff()
undefinedUse Cases and Examples
用例与示例
1. E-commerce Personalization
1. 电商个性化
python
undefinedpython
undefinedTrack customer behavior
跟踪客户行为
await graphiti.add_episode(
name="Product View",
episode_body="Customer viewed Premium Hiking Boots, spent 5 minutes",
source_description="Analytics",
reference_time=datetime.now()
)
await graphiti.add_episode(
name="Cart Addition",
episode_body="Customer added Premium Hiking Boots size 10 to cart",
source_description="E-commerce",
reference_time=datetime.now()
)
await graphiti.add_episode(
name="Product View",
episode_body="Customer viewed Premium Hiking Boots, spent 5 minutes",
source_description="Analytics",
reference_time=datetime.now()
)
await graphiti.add_episode(
name="Cart Addition",
episode_body="Customer added Premium Hiking Boots size 10 to cart",
source_description="E-commerce",
reference_time=datetime.now()
)
Retrieve personalized recommendations
检索个性化推荐
context = await graphiti.search(
query="Products customer is interested in",
num_results=5
)
context = await graphiti.search(
query="Products customer is interested in",
num_results=5
)
Use for recommendation agent
用于推荐Agent
recommendations = llm.invoke(f"""
Based on this context: {context}
Recommend 3 products the customer might like.
""")
undefinedrecommendations = llm.invoke(f"""
Based on this context: {context}
Recommend 3 products the customer might like.
""")
undefined2. Customer Support Agent
2. 客户支持Agent
python
undefinedpython
undefinedBuild comprehensive support context
构建全面的支持上下文
support_context = await zep.memory.get_memory(session_id="support_789")
support_context = await zep.memory.get_memory(session_id="support_789")
Include:
包含:
- Past issues and resolutions
- 过往问题与解决方案
- Product ownership
- 产品所有权
- Communication preferences
- 沟通偏好
- Account history
- 账户历史
system_prompt = f"""
You are a customer support agent.
Customer History:
{support_context.summary}
Previous Issues:
{support_context.facts}
Provide helpful, personalized support.
"""
system_prompt = f"""
You are a customer support agent.
Customer History:
{support_context.summary}
Previous Issues:
{support_context.facts}
Provide helpful, personalized support.
"""
Support agent with full context
带完整上下文的支持Agent
support_agent = Agent(
model="claude-3-5-sonnet-20241022",
system=system_prompt
)
undefinedsupport_agent = Agent(
model="claude-3-5-sonnet-20241022",
system=system_prompt
)
undefined3. Healthcare Assistant (HIPAA-Compliant)
3. 医疗助理(符合HIPAA标准)
python
undefinedpython
undefinedTrack patient information securely
安全跟踪患者信息
await graphiti.add_episode(
name="Patient Visit",
episode_body="Patient reported back pain, prescribed physical therapy",
source_description="EMR System",
reference_time=datetime.now(),
metadata={"confidential": True, "hipaa_compliant": True}
)
await graphiti.add_episode(
name="Patient Visit",
episode_body="Patient reported back pain, prescribed physical therapy",
source_description="EMR System",
reference_time=datetime.now(),
metadata={"confidential": True, "hipaa_compliant": True}
)
Retrieve medical history for treatment
检索治疗所需的病史
medical_history = await graphiti.search(
query="patient symptoms and treatments",
filters={"metadata.confidential": True}
)
medical_history = await graphiti.search(
query="patient symptoms and treatments",
filters={"metadata.confidential": True}
)
Use Zep's enterprise features for compliance
使用Zep的企业级功能确保合规
- RBAC for access control
- RBAC访问控制
- Audit logging
- 审计日志
- Data encryption
- 数据加密
- BYOK (Bring Your Own Key)
- BYOK(自带密钥)
undefinedundefined4. Multi-User Chat Application
4. 多用户聊天应用
python
undefinedpython
undefinedTrack individual user contexts in group chat
在群聊中跟踪单个用户上下文
users = ["user_1", "user_2", "user_3"]
for user_id in users:
# Each user has their own knowledge graph
await zep.user.add(user_id=user_id)
users = ["user_1", "user_2", "user_3"]
for user_id in users:
# 每个用户都有自己的知识图谱
await zep.user.add(user_id=user_id)
Add messages with user attribution
添加带用户归属的消息
await zep.memory.add_memory(
session_id="group_chat_123",
messages=[
Message(role="user", content="I prefer morning meetings",
metadata={"user_id": "user_1"}),
Message(role="user", content="I'm usually free after 2pm",
metadata={"user_id": "user_2"})
]
)
await zep.memory.add_memory(
session_id="group_chat_123",
messages=[
Message(role="user", content="I prefer morning meetings",
metadata={"user_id": "user_1"}),
Message(role="user", content="I'm usually free after 2pm",
metadata={"user_id": "user_2"})
]
)
Retrieve context for each user
检索每个用户的上下文
for user_id in users:
user_context = await zep.memory.search_memory(
user_id=user_id,
text="scheduling preferences"
)
print(f"{user_id} preferences: {user_context}")
undefinedfor user_id in users:
user_context = await zep.memory.search_memory(
user_id=user_id,
text="scheduling preferences"
)
print(f"{user_id} preferences: {user_context}")
undefinedBest Practices
最佳实践
1. Efficient Data Ingestion
1. 高效数据摄入
python
undefinedpython
undefinedBatch episodes for better performance
批量处理事件以提升性能
async def batch_ingest(episodes, batch_size=50):
for i in range(0, len(episodes), batch_size):
batch = episodes[i:i + batch_size]
await asyncio.gather(*[
graphiti.add_episode(**ep) for ep in batch
])
undefinedasync def batch_ingest(episodes, batch_size=50):
for i in range(0, len(episodes), batch_size):
batch = episodes[i:i + batch_size]
await asyncio.gather(*[
graphiti.add_episode(**ep) for ep in batch
])
undefined2. Context Window Management
2. 上下文窗口管理
python
undefinedpython
undefinedLimit context to most relevant information
将上下文限制为最相关的信息
def assemble_context(memory, max_facts=10, max_tokens=4000):
facts = memory.facts[:max_facts]
context = f"Summary: {memory.summary}\n\nKey Facts:\n"
for fact in facts:
context += f"- {fact}\n"
if len(context) > max_tokens:
break
return context[:max_tokens]undefineddef assemble_context(memory, max_facts=10, max_tokens=4000):
facts = memory.facts[:max_facts]
context = f"Summary: {memory.summary}\n\nKey Facts:\n"
for fact in facts:
context += f"- {fact}\n"
if len(context) > max_tokens:
break
return context[:max_tokens]undefined3. Entity Resolution
3. 实体解析
python
undefinedpython
undefinedHandle entity variations and aliases
处理实体变体与别名
entity_aliases = {
"Colorado": ["CO", "Colorful Colorado", "Centennial State"],
"hiking": ["trekking", "backpacking", "trail walking"]
}
entity_aliases = {
"Colorado": ["CO", "Colorful Colorado", "Centennial State"],
"hiking": ["trekking", "backpacking", "trail walking"]
}
Normalize entities before ingestion
在摄入前标准化实体
def normalize_episode(text):
for canonical, aliases in entity_aliases.items():
for alias in aliases:
text = text.replace(alias, canonical)
return text
undefineddef normalize_episode(text):
for canonical, aliases in entity_aliases.items():
for alias in aliases:
text = text.replace(alias, canonical)
return text
undefined4. Privacy and Security
4. 隐私与安全
python
undefinedpython
undefinedImplement data retention policies
实施数据保留策略
from datetime import timedelta
async def cleanup_old_data(user_id, retention_days=90):
cutoff_date = datetime.now() - timedelta(days=retention_days)
# Delete old episodes
await graphiti.delete_episodes(
user_id=user_id,
before_date=cutoff_date
)from datetime import timedelta
async def cleanup_old_data(user_id, retention_days=90):
cutoff_date = datetime.now() - timedelta(days=retention_days)
# 删除旧事件
await graphiti.delete_episodes(
user_id=user_id,
before_date=cutoff_date
)Redact sensitive information
编辑敏感信息
def redact_pii(text):
import re
# Redact email addresses
text = re.sub(r'\b[\w.-]+@[\w.-]+.\w+\b', '[EMAIL]', text)
# Redact phone numbers
text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
return text
undefineddef redact_pii(text):
import re
# 编辑电子邮件地址
text = re.sub(r'\b[\w.-]+@[\w.-]+.\w+\b', '[EMAIL]', text)
# 编辑电话号码
text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
return text
undefined5. Monitoring and Debugging
5. 监控与调试
python
undefinedpython
undefinedLog graph operations
记录图谱操作
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("graphiti")
async def add_episode_with_logging(graphiti, **kwargs):
logger.info(f"Adding episode: {kwargs['name']}")
try:
result = await graphiti.add_episode(**kwargs)
logger.info(f"Successfully added episode: {result}")
return result
except Exception as e:
logger.error(f"Failed to add episode: {e}")
raise
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("graphiti")
async def add_episode_with_logging(graphiti, **kwargs):
logger.info(f"Adding episode: {kwargs['name']}")
try:
result = await graphiti.add_episode(**kwargs)
logger.info(f"Successfully added episode: {result}")
return result
except Exception as e:
logger.error(f"Failed to add episode: {e}")
raise
Track performance metrics
跟踪性能指标
import time
async def search_with_metrics(graphiti, query):
start = time.time()
results = await graphiti.search(query)
latency = time.time() - start
logger.info(f"Search latency: {latency:.3f}s, Results: {len(results)}")
return results
undefinedimport time
async def search_with_metrics(graphiti, query):
start = time.time()
results = await graphiti.search(query)
latency = time.time() - start
logger.info(f"Search latency: {latency:.3f}s, Results: {len(results)}")
return results
undefinedTroubleshooting
故障排除
Common Issues
常见问题
"Connection to graph database failed"
"连接图数据库失败"
python
undefinedpython
undefinedVerify database is running
验证数据库是否运行
For Neo4j:
对于Neo4j:
docker ps | grep neo4j
docker ps | grep neo4j
Test connection
测试连接
from neo4j import GraphDatabase
driver = GraphDatabase.driver(
"bolt://localhost:7687",
auth=("neo4j", "password")
)
with driver.session() as session:
result = session.run("RETURN 1 as num")
print(result.single()["num"]) # Should print 1
undefinedfrom neo4j import GraphDatabase
driver = GraphDatabase.driver(
"bolt://localhost:7687",
auth=("neo4j", "password")
)
with driver.session() as session:
result = session.run("RETURN 1 as num")
print(result.single()["num"]) # 应输出1
undefined"Rate limit exceeded"
"超出速率限制"
python
undefinedpython
undefinedImplement exponential backoff
实现指数退避
import asyncio
async def add_episode_with_retry(graphiti, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
return await graphiti.add_episode(**kwargs)
except RateLimitError:
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
raise Exception("Max retries exceeded")
undefinedimport asyncio
async def add_episode_with_retry(graphiti, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
return await graphiti.add_episode(**kwargs)
except RateLimitError:
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
raise Exception("Max retries exceeded")
undefined"Search returns no results"
"搜索无结果返回"
python
undefinedpython
undefinedDebug search configuration
调试搜索配置
results = await graphiti.search(
query="user preferences",
num_results=20, # Increase number
search_config={
"semantic_weight": 0.5, # Adjust weights
"bm25_weight": 0.3,
"graph_weight": 0.2
}
)
results = await graphiti.search(
query="user preferences",
num_results=20, # 增加结果数量
search_config={
"semantic_weight": 0.5, # 调整权重
"bm25_weight": 0.3,
"graph_weight": 0.2
}
)
Check if data exists
检查数据是否存在
all_nodes = await graphiti.get_all_nodes(limit=10)
print(f"Total nodes in graph: {len(all_nodes)}")
undefinedall_nodes = await graphiti.get_all_nodes(limit=10)
print(f"图谱中的总节点数: {len(all_nodes)}")
undefinedConfiguration Options
配置选项
Graphiti Configuration
Graphiti配置
python
from graphiti_core import Graphiti
from graphiti_core.llm import OpenAIClient
graphiti = Graphiti(
# Database
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password",
# LLM
llm_client=OpenAIClient(
model="gpt-4o",
temperature=0.7
),
# Search configuration
search_config={
"semantic_weight": 0.5,
"bm25_weight": 0.3,
"graph_weight": 0.2
},
# Processing
max_workers=4, # Parallel processing
batch_size=50, # Batch size for ingestion
# Embeddings
embedding_model="text-embedding-3-small",
embedding_dim=1536
)python
from graphiti_core import Graphiti
from graphiti_core.llm import OpenAIClient
graphiti = Graphiti(
# 数据库
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password",
# LLM
llm_client=OpenAIClient(
model="gpt-4o",
temperature=0.7
),
# 搜索配置
search_config={
"semantic_weight": 0.5,
"bm25_weight": 0.3,
"graph_weight": 0.2
},
# 处理
max_workers=4, # 并行处理
batch_size=50, # 摄入批次大小
# 嵌入
embedding_model="text-embedding-3-small",
embedding_dim=1536
)Zep Platform Configuration
Zep平台配置
python
from zep_cloud.client import AsyncZep
zep = AsyncZep(
api_key="your-api-key",
# Optional: Custom endpoint
base_url="https://api.getzep.com",
# Timeout settings
timeout=30.0,
# Retry configuration
max_retries=3
)python
from zep_cloud.client import AsyncZep
zep = AsyncZep(
api_key="your-api-key",
# 可选:自定义端点
base_url="https://api.getzep.com",
# 超时设置
timeout=30.0,
# 重试配置
max_retries=3
)Performance Optimization
性能优化
1. Indexing Strategy
1. 索引策略
python
undefinedpython
undefinedCreate optimal indices
创建最优索引
await graphiti.build_indices_and_constraints()
await graphiti.build_indices_and_constraints()
Custom indices for frequently queried properties
为频繁查询的属性创建自定义索引
await graphiti.create_index("User", "email")
await graphiti.create_index("Product", "sku")
undefinedawait graphiti.create_index("User", "email")
await graphiti.create_index("Product", "sku")
undefined2. Query Optimization
2. 查询优化
python
undefinedpython
undefinedUse filters to narrow search scope
使用过滤器缩小搜索范围
results = await graphiti.search(
query="hiking products",
filters={
"entity_type": "Product",
"category": "outdoor",
"price_range": {"min": 50, "max": 200}
},
num_results=5
)
results = await graphiti.search(
query="hiking products",
filters={
"entity_type": "Product",
"category": "outdoor",
"price_range": {"min": 50, "max": 200}
},
num_results=5
)
Limit traversal depth for graph queries
限制图查询的遍历深度
results = await graphiti.traverse(
start_node="user_123",
max_depth=2, # Limit depth
relationship_types=["purchased", "viewed"]
)
undefinedresults = await graphiti.traverse(
start_node="user_123",
max_depth=2, # 限制深度
relationship_types=["purchased", "viewed"]
)
undefined3. Caching
3. 缓存
python
from functools import lru_cache
import hashlibpython
from functools import lru_cache
import hashlibCache frequent searches
缓存频繁搜索
@lru_cache(maxsize=100)
def cache_key(query, user_id):
return hashlib.md5(f"{query}:{user_id}".encode()).hexdigest()
async def cached_search(graphiti, query, user_id):
key = cache_key(query, user_id)
# Implement your caching logic
return await graphiti.search(query)
undefined@lru_cache(maxsize=100)
def cache_key(query, user_id):
return hashlib.md5(f"{query}:{user_id}".encode()).hexdigest()
async def cached_search(graphiti, query, user_id):
key = cache_key(query, user_id)
# 实现你的缓存逻辑
return await graphiti.search(query)
undefinedResources
资源
Documentation
文档
- Graphiti GitHub: https://github.com/getzep/graphiti
- Zep Documentation: https://help.getzep.com/
- Zep Concepts: https://help.getzep.com/concepts
- SDK Reference: https://help.getzep.com/sdk-reference
- Graphiti GitHub: https://github.com/getzep/graphiti
- Zep 文档: https://help.getzep.com/
- Zep 概念: https://help.getzep.com/concepts
- SDK 参考: https://help.getzep.com/sdk-reference
Supported Databases
支持的数据库
- Neo4j 5.26+
- FalkorDB 1.1.2+
- Kuzu 0.11.2+
- Amazon Neptune with OpenSearch Serverless
- Neo4j 5.26+
- FalkorDB 1.1.2+
- Kuzu 0.11.2+
- Amazon Neptune(搭配OpenSearch Serverless)
Supported LLM Providers
支持的LLM提供商
- OpenAI (GPT-4, GPT-4o, GPT-3.5)
- Anthropic (Claude 3.5 Sonnet, Claude 3 Opus/Sonnet/Haiku)
- Google (Gemini Pro, Gemini Flash)
- Groq
- Azure OpenAI
- OpenAI(GPT-4、GPT-4o、GPT-3.5)
- Anthropic(Claude 3.5 Sonnet、Claude 3 Opus/Sonnet/Haiku)
- Google(Gemini Pro、Gemini Flash)
- Groq
- Azure OpenAI
Integration Partners
集成合作伙伴
- LangGraph
- LangChain
- CrewAI
- Microsoft Autogen
- NVIDIA NeMo
- LiveKit (voice)
- LangGraph
- LangChain
- CrewAI
- Microsoft Autogen
- NVIDIA NeMo
- LiveKit(语音)
License
许可证
Graphiti is open-source. Zep offers both open-source and commercial licensing options.
Note: This skill provides comprehensive guidance for building AI agents with temporal knowledge graphs using Graphiti and Zep. Always ensure proper data handling, privacy compliance, and security measures for production applications.
Graphiti是开源软件。Zep提供开源和商业两种许可证选项。
注意: 本技能为使用Graphiti和Zep构建具备时序知识图谱的AI Agent提供全面指导。在生产应用中,请始终确保采取适当的数据处理、隐私合规和安全措施。