# LangChain Orchestration Skill

Complete guide for building production-grade LLM applications with LangChain, covering chains, agents, memory, RAG patterns, and advanced orchestration techniques.
## Table of Contents

- Core Concepts
- Chains
- Agents
- Memory Systems
- RAG Patterns
- LLM Integrations
- Callbacks & Monitoring
- Retrieval Strategies
- Streaming
- Error Handling
- Production Best Practices
## Core Concepts

### LangChain Expression Language (LCEL)

LCEL is the declarative way to compose chains in LangChain, enabling streaming, async, and parallel execution.

```python
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# Basic LCEL chain
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
output_parser = StrOutputParser()

chain = prompt | llm | output_parser
result = chain.invoke({"topic": "quantum computing"})
```
### Runnable Interface

Every component in LangChain implements the Runnable interface with standard methods:

```python
from langchain_core.runnables import RunnablePassthrough

# Key methods: invoke, stream, batch, ainvoke, astream, abatch
chain = prompt | llm | output_parser

# Synchronous invoke
result = chain.invoke({"topic": "AI"})

# Streaming
for chunk in chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)

# Batch processing
results = chain.batch([{"topic": "AI"}, {"topic": "ML"}])

# Async variants
result = await chain.ainvoke({"topic": "AI"})
```
### RunnablePassthrough

Pass inputs directly through or apply transformations:

```python
from langchain_core.runnables import RunnablePassthrough

# Pass through unchanged
chain = RunnablePassthrough() | llm | output_parser

# With transformation
def add_context(x):
    return {"text": x["input"], "context": "important"}

chain = RunnablePassthrough.assign(processed=add_context) | llm
```
## Chains

### Sequential Chains

Process data through multiple steps sequentially.

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

# Step 1: Generate ideas
idea_prompt = ChatPromptTemplate.from_template(
    "Generate 3 creative ideas for: {topic}"
)
idea_chain = idea_prompt | llm | StrOutputParser()

# Step 2: Evaluate ideas
eval_prompt = ChatPromptTemplate.from_template(
    "Evaluate these ideas and pick the best one:\n{ideas}"
)
eval_chain = eval_prompt | llm | StrOutputParser()

# Combine into sequential chain
sequential_chain = (
    {"ideas": idea_chain}
    | RunnablePassthrough.assign(evaluation=eval_chain)
)
result = sequential_chain.invoke({"topic": "mobile app"})
```
### Map-Reduce Chains

Process multiple inputs in parallel and combine results.

```python
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate

# Define parallel processing
summary_prompt = ChatPromptTemplate.from_template(
    "Summarize this text in one sentence: {text}"
)
keywords_prompt = ChatPromptTemplate.from_template(
    "Extract 3 keywords from: {text}"
)
sentiment_prompt = ChatPromptTemplate.from_template(
    "Analyze sentiment (positive/negative/neutral): {text}"
)

# Map: Process in parallel
map_chain = RunnableParallel(
    summary=summary_prompt | llm | StrOutputParser(),
    keywords=keywords_prompt | llm | StrOutputParser(),
    sentiment=sentiment_prompt | llm | StrOutputParser()
)

# Reduce: Combine results
reduce_prompt = ChatPromptTemplate.from_template(
    """Combine the analysis:
Summary: {summary}
Keywords: {keywords}
Sentiment: {sentiment}
Provide a comprehensive report:"""
)

map_reduce_chain = map_chain | reduce_prompt | llm | StrOutputParser()
result = map_reduce_chain.invoke({
    "text": "LangChain is an amazing framework for building LLM applications."
})
```
### Router Chains

Route inputs to different chains based on conditions.

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# Define specialized chains
technical_prompt = ChatPromptTemplate.from_template(
    "Provide a technical explanation of: {query}"
)
simple_prompt = ChatPromptTemplate.from_template(
    "Explain in simple terms: {query}"
)
technical_chain = technical_prompt | llm | StrOutputParser()
simple_chain = simple_prompt | llm | StrOutputParser()

# Router function: returns the chain to run for the given input
def route_query(input_dict):
    complexity = input_dict.get("complexity", "simple")
    if complexity == "technical":
        return technical_chain
    return simple_chain

# Create router chain
router_chain = RunnableLambda(route_query)

# Use the router
result = router_chain.invoke({
    "query": "quantum entanglement",
    "complexity": "technical"
})
```
### Conditional Chains

Execute chains based on conditions.

```python
from operator import itemgetter

from langchain_core.runnables import RunnableBranch

# Define condition-based routing
classification_prompt = ChatPromptTemplate.from_template(
    "Classify this as 'question', 'statement', or 'command': {text}"
)

question_handler = ChatPromptTemplate.from_template(
    "Answer this question: {text}"
) | llm | StrOutputParser()

statement_handler = ChatPromptTemplate.from_template(
    "Acknowledge this statement: {text}"
) | llm | StrOutputParser()

command_handler = ChatPromptTemplate.from_template(
    "Execute this command: {text}"
) | llm | StrOutputParser()

# Create conditional branch
branch = RunnableBranch(
    (lambda x: "question" in x["type"].lower(), question_handler),
    (lambda x: "statement" in x["type"].lower(), statement_handler),
    command_handler  # default
)

# Full chain with classification
# itemgetter("text") keeps just the raw text; a bare RunnablePassthrough()
# would hand the whole input dict to the handler prompts
full_chain = (
    {"text": itemgetter("text"), "type": classification_prompt | llm | StrOutputParser()}
    | branch
)
```
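Invoking with a `text` field runs the classification first, then the matching handler; a quick sketch with an invented input:

```python
# Classified as a question, so question_handler answers it
result = full_chain.invoke({"text": "What time is it in Tokyo?"})
```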
### LLMChain (Legacy)

Traditional chain format, still supported:

```python
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate(
    input_variables=["product"],
    template="What is a good name for a company that makes {product}?"
)
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(product="eco-friendly water bottles")
```

### Stuff Documents Chain
Combine documents into a single context:
```python
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.documents import Document

prompt = ChatPromptTemplate.from_template(
    """Answer based on the following context:
<context>
{context}
</context>
Question: {input}"""
)
document_chain = create_stuff_documents_chain(llm, prompt)

docs = [
    Document(page_content="LangChain supports multiple LLM providers."),
    Document(page_content="Chains can be composed using LCEL.")
]
result = document_chain.invoke({
    "input": "What does LangChain support?",
    "context": docs
})
```

## Agents
### ReAct Agents

Reasoning-and-acting agents that use tools iteratively.

```python
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import Tool
from langchain import hub

# Define tools
def search_tool(query: str) -> str:
    """Search for information"""
    return f"Search results for: {query}"

def calculator_tool(expression: str) -> str:
    """Calculate mathematical expressions"""
    try:
        # NOTE: eval() is unsafe on untrusted input; use a real math parser in production
        return str(eval(expression))
    except Exception:
        return "Invalid expression"

tools = [
    Tool(
        name="Search",
        func=search_tool,
        description="Useful for searching information"
    ),
    Tool(
        name="Calculator",
        func=calculator_tool,
        description="Useful for math calculations"
    )
]

# Create ReAct agent
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5
)

result = agent_executor.invoke({
    "input": "What is 25 * 4, and then search for that number's significance"
})
```
### LangGraph ReAct Agent

Modern approach using LangGraph for better control:

```python
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver

@tool
def retrieve(query: str) -> str:
    """Retrieve relevant information from the knowledge base"""
    # Your retrieval logic here
    return f"Retrieved information for: {query}"

@tool
def analyze(text: str) -> str:
    """Analyze text and provide insights"""
    return f"Analysis of: {text}"

# Create agent with memory
memory = MemorySaver()
agent_executor = create_react_agent(
    llm,
    [retrieve, analyze],
    checkpointer=memory
)

# Use with configuration
config = {"configurable": {"thread_id": "abc123"}}
for chunk in agent_executor.stream(
    {"messages": [("user", "Find information about LangChain")]},
    config=config
):
    print(chunk)
```
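Because MemorySaver keys checkpoints by `thread_id`, a second call on the same thread resumes with the earlier turns in context; a minimal sketch:

```python
# Same thread_id, so the checkpointer restores the previous messages
for chunk in agent_executor.stream(
    {"messages": [("user", "Summarize what you found above")]},
    config=config
):
    print(chunk)
```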
### Conversational ReAct Agent

Agent with built-in conversation memory:

```python
from langchain.agents import create_conversational_retrieval_agent
from langchain_core.tools import Tool

tools = [
    Tool(
        name="Knowledge Base",
        func=lambda q: f"KB result: {q}",
        description="Search the knowledge base"
    )
]

conversational_agent = create_conversational_retrieval_agent(
    llm,
    tools,
    verbose=True
)

# Maintains conversation context
result1 = conversational_agent.invoke({
    "input": "What is LangChain?"
})
result2 = conversational_agent.invoke({
    "input": "Tell me more about its features"
})
```
### Zero-Shot ReAct Agent

Agent that works without examples:

```python
from langchain.agents import AgentType, initialize_agent, load_tools

# Load pre-built tools (serpapi requires a SerpAPI key)
tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    max_iterations=3
)

result = agent.run(
    "What is the population of Tokyo and what is that number divided by 2?"
)
```
### Structured Chat Agent

Agent that uses structured input/output:

```python
from pydantic import BaseModel, Field
from langchain.agents import create_structured_chat_agent

# Define tools with structured schemas
class SearchInput(BaseModel):
    query: str = Field(description="The search query")
    max_results: int = Field(default=5, description="Maximum results")

@tool(args_schema=SearchInput)
def structured_search(query: str, max_results: int = 5) -> str:
    """Search with structured parameters"""
    return f"Found {max_results} results for: {query}"

tools = [structured_search]
prompt = hub.pull("hwchase17/structured-chat-agent")
agent = create_structured_chat_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
```
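A hypothetical invocation (not in the original); the agent fills `query` and `max_results` from the request to satisfy the schema:

```python
result = agent_executor.invoke({
    "input": "Find 3 results about LCEL streaming"
})
```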
### Tool Calling Agent

Modern agent using native tool calling:

```python
from langchain_core.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b

@tool
def search_database(query: str, limit: int = 10) -> str:
    """Search the database"""
    return f"Found {limit} results for {query}"

# Bind tools to LLM
llm_with_tools = llm.bind_tools([multiply, search_database])

# Create simple tool chain
tool_chain = llm_with_tools | (lambda x: x.tool_calls[0]["args"]) | multiply
result = tool_chain.invoke("What's four times 23")
```
## Memory Systems

### ConversationBufferMemory

Store complete conversation history:

```python
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain

memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}")
])

chain = LLMChain(llm=llm, prompt=prompt, memory=memory)

# Conversation is automatically stored
response1 = chain.run(input="Hi, I'm Alice")
response2 = chain.run(input="What's my name?")  # Will remember Alice
```
### ConversationBufferWindowMemory

Keep only the most recent k interactions:

```python
from langchain.memory import ConversationBufferWindowMemory

memory = ConversationBufferWindowMemory(
    k=5,  # Keep last 5 interactions
    memory_key="chat_history",
    return_messages=True
)
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
```

### ConversationSummaryMemory
Summarize conversation history:

```python
from langchain.memory import ConversationSummaryMemory

memory = ConversationSummaryMemory(
    llm=llm,
    memory_key="chat_history",
    return_messages=True
)
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)

# Long conversations are automatically summarized
for i in range(20):
    chain.run(input=f"Tell me fact {i} about AI")
```
### ConversationSummaryBufferMemory

Hybrid approach: recent messages plus a summary of older ones:

```python
from langchain.memory import ConversationSummaryBufferMemory

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=100,  # When to trigger summarization
    memory_key="chat_history",
    return_messages=True
)
```
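A small sketch of the behavior (the exchange below is invented): turns within the token budget stay verbatim, and once the budget is exceeded, older ones are folded into a running summary:

```python
memory.save_context(
    {"input": "Hi, I'm comparing memory classes"},
    {"output": "Happy to help compare them."}
)
# chat_history holds recent messages, preceded by a summary once the limit is hit
print(memory.load_memory_variables({}))
```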
### Vector Store Memory

Semantic search over conversation history:

```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts([], embeddings)
memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# Save context
memory.save_context(
    {"input": "My favorite color is blue"},
    {"output": "That's great!"}
)

# Retrieve relevant context
relevant = memory.load_memory_variables({"input": "What's my favorite color?"})
```
### Recall Memories (LangGraph)

Structured memory with save and search:

```python
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_core.tools import tool

recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())

@tool
def save_recall_memory(memory: str) -> str:
    """Save important information to long-term memory"""
    recall_vector_store.add_texts([memory])
    return f"Saved memory: {memory}"

@tool
def search_recall_memories(query: str) -> str:
    """Search long-term memories"""
    docs = recall_vector_store.similarity_search(query, k=3)
    return "\n".join([doc.page_content for doc in docs])

# Use with agent
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(
    llm,
    [save_recall_memory, search_recall_memories]
)
```
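A hypothetical exchange; the model decides on its own when to call the two memory tools:

```python
# The agent should route this through save_recall_memory
result = agent.invoke(
    {"messages": [("user", "Please remember that I prefer concise answers")]}
)
```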
### Custom Memory with LangGraph State

Define custom state for memory:

```python
from typing import List
from langgraph.graph import MessagesState, StateGraph, START, END

class State(MessagesState):
    recall_memories: List[str]

def load_memories(state: State):
    """Load relevant memories before the agent processes input"""
    messages = state["messages"]
    last_message = messages[-1].content if messages else ""
    # Search for relevant memories
    docs = recall_vector_store.similarity_search(last_message, k=3)
    memories = [doc.page_content for doc in docs]
    return {"recall_memories": memories}

# Add to graph
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_edge(START, "load_memories")
```
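The snippet above only wires the memory loader; a minimal sketch of finishing the graph (the `agent` node below is an assumption, not part of the original):

```python
def agent(state: State):
    # Surface retrieved memories to the model as system context
    memory_context = "\n".join(state["recall_memories"])
    response = llm.invoke(
        [("system", f"Relevant memories:\n{memory_context}")] + state["messages"]
    )
    return {"messages": [response]}

builder.add_node("agent", agent)
builder.add_edge("load_memories", "agent")
builder.add_edge("agent", END)
graph = builder.compile()
```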
## RAG Patterns

### Basic RAG Chain

Fundamental retrieval-augmented generation:

```python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

# Setup vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
    [
        "LangChain supports multiple LLM providers including OpenAI, Anthropic, and more.",
        "Chains can be composed using LangChain Expression Language (LCEL).",
        "Agents can use tools to interact with external systems."
    ],
    embedding=embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# RAG prompt
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Build RAG chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
result = rag_chain.invoke("What does LangChain support?")
```
### RAG with Retrieval Chain

Using the built-in retrieval chain constructor:

```python
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

prompt = ChatPromptTemplate.from_template(
    """Answer based on the context:
<context>
{context}
</context>
Question: {input}"""
)
document_chain = create_stuff_documents_chain(llm, prompt)
retrieval_chain = create_retrieval_chain(retriever, document_chain)

response = retrieval_chain.invoke({
    "input": "What is LCEL?"
})
# Returns: {"input": "...", "context": [...], "answer": "..."}
```
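The answer and the retrieved documents can be read straight off that dict:

```python
print(response["answer"])
for doc in response["context"]:
    print(doc.page_content)
```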
### RAG with Chat History

Conversational RAG with context:

```python
from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder

contextualize_prompt = ChatPromptTemplate.from_messages([
    ("system", "Given a chat history and the latest user question, "
               "formulate a standalone question which can be understood "
               "without the chat history."),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}")
])

history_aware_retriever = create_history_aware_retriever(
    llm,
    retriever,
    contextualize_prompt
)

# Use in RAG chain
qa_chain = create_retrieval_chain(
    history_aware_retriever,
    document_chain
)

# First question
result1 = qa_chain.invoke({
    "input": "What is LangChain?",
    "chat_history": []
})

# Follow-up with context
result2 = qa_chain.invoke({
    "input": "What are its main features?",
    "chat_history": [
        ("human", "What is LangChain?"),
        ("ai", result1["answer"])
    ]
})
```
### Multi-Query RAG

Generate multiple search queries for better retrieval:

```python
from langchain.retrievers.multi_query import MultiQueryRetriever

multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(),
    llm=llm
)

# Automatically generates multiple query variations
rag_chain = (
    {"context": multi_query_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
```
### RAG with Reranking

Improve relevance with reranking:

```python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank

# Setup reranker (requires the flashrank package)
compressor = FlashrankRerank()
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

# Use in RAG chain
rag_chain = (
    {"context": compression_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
```
### Parent Document Retrieval

Retrieve larger parent documents for full context:

```python
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Storage for parent documents
store = InMemoryStore()

# Splitters
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)

parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)

# Add documents (assumes `documents` is a list of loaded Document objects)
parent_retriever.add_documents(documents)
```
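Queries then match against the small child chunks but return their larger parents; a short sketch:

```python
# Child chunks are embedded; full parent documents come back
parent_docs = parent_retriever.invoke("What is LCEL?")
```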
### Self-Query Retrieval

Natural language to structured queries:

```python
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo

metadata_field_info = [
    AttributeInfo(
        name="source",
        description="The document source",
        type="string",
    ),
    AttributeInfo(
        name="page",
        description="The page number",
        type="integer",
    ),
]
document_content_description = "Technical documentation"

self_query_retriever = SelfQueryRetriever.from_llm(
    llm,
    vectorstore,
    document_content_description,
    metadata_field_info,
)
```
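A hypothetical query; the LLM compiles the trailing filter into a structured metadata comparison (this retriever also needs the lark package and a vector store with metadata-filter support):

```python
docs = self_query_retriever.invoke(
    "sections about retrievers from page 2"
)
```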
## LLM Integrations
### OpenAI Integration

```python
from langchain_openai import ChatOpenAI, OpenAI

# Chat model
chat_model = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.7,
    max_tokens=500,
    api_key="your-api-key"
)

# Completion model
completion_model = OpenAI(
    model="gpt-3.5-turbo-instruct",
    temperature=0.9
)
```
### Anthropic Claude Integration

```python
from langchain_anthropic import ChatAnthropic

claude = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    temperature=0,
    max_tokens=1024,
    api_key="your-api-key"
)
```

### HuggingFace Integration
```python
from langchain_huggingface import HuggingFaceEndpoint

llm = HuggingFaceEndpoint(
    repo_id="meta-llama/Llama-2-7b-chat-hf",
    huggingfacehub_api_token="your-token",
    task="text-generation",
    temperature=0.7
)
```

### Google Vertex AI Integration
```python
from langchain_google_vertexai import ChatVertexAI, VertexAI

# Chat model
chat_model = ChatVertexAI(
    model_name="chat-bison",
    temperature=0
)

# Completion model
completion_model = VertexAI(
    model_name="gemini-1.0-pro-002"
)
```
### Ollama Local Models

```python
from langchain_community.llms import Ollama

llm = Ollama(
    model="llama2",
    temperature=0.8
)
```

### Binding Tools to LLMs
```python
from langchain_core.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers together"""
    return a * b

# Bind tools to model
llm_with_tools = llm.bind_tools([multiply])

# Model will return tool calls
response = llm_with_tools.invoke("What is 3 times 4?")
print(response.tool_calls)
```
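Binding only produces the call request; executing it is up to you. A minimal sketch of the round trip for the single-call case:

```python
# Run the requested tool with the model-supplied arguments
call = response.tool_calls[0]
if call["name"] == "multiply":
    tool_result = multiply.invoke(call["args"])
    print(tool_result)  # 12
```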
## Callbacks & Monitoring

### Standard Callbacks

Track chain execution:

```python
from langchain_core.callbacks import StdOutCallbackHandler
from langchain.callbacks import get_openai_callback

# Standard output callback
callbacks = [StdOutCallbackHandler()]
chain = prompt | llm | StrOutputParser()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": callbacks}
)

# OpenAI cost tracking
with get_openai_callback() as cb:
    result = chain.invoke({"topic": "AI"})
    print(f"Total Tokens: {cb.total_tokens}")
    print(f"Total Cost: ${cb.total_cost}")
```
### Custom Callbacks

Create custom callback handlers:

```python
from typing import Any, Dict

from langchain_core.callbacks import BaseCallbackHandler

class MyCustomCallback(BaseCallbackHandler):
    def on_llm_start(self, serialized: Dict[str, Any], prompts: list[str], **kwargs):
        print(f"LLM started with prompts: {prompts}")

    def on_llm_end(self, response, **kwargs):
        print(f"LLM finished with response: {response}")

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
        print(f"Chain started with inputs: {inputs}")

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
        print(f"Chain ended with outputs: {outputs}")

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs):
        print(f"Tool started with input: {input_str}")

    def on_tool_end(self, output: str, **kwargs):
        print(f"Tool ended with output: {output}")

# Use custom callback
custom_callback = MyCustomCallback()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [custom_callback]}
)
```
### Argilla Callback

Track and log runs to Argilla:

```python
from langchain_community.callbacks import ArgillaCallbackHandler

argilla_callback = ArgillaCallbackHandler(
    dataset_name="langchain-dataset",
    api_url="http://localhost:6900",
    api_key="your-api-key"
)
callbacks = [argilla_callback]

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    callbacks=callbacks
)
agent.run("Who was the first president of the United States?")
```

### UpTrain Callback
RAG evaluation and monitoring:

```python
from langchain_community.callbacks import UpTrainCallbackHandler

uptrain_callback = UpTrainCallbackHandler(
    key_type="uptrain",
    api_key="your-api-key"
)
config = {"callbacks": [uptrain_callback]}

# Automatically evaluates context relevance, factual accuracy, completeness
result = rag_chain.invoke("What is LangChain?", config=config)
```
### LangSmith Integration

Production monitoring and debugging:

```python
import os

# Set environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

# All chains are automatically traced
result = chain.invoke({"topic": "AI"})

# View traces at smith.langchain.com
```
## Retrieval Strategies

### Vector Store Retrievers

Basic similarity search:

```python
from langchain_community.vectorstores import FAISS, Chroma, Pinecone

# FAISS
faiss_retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# Maximum Marginal Relevance (MMR)
mmr_retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}
)

# Similarity with threshold
threshold_retriever = vectorstore.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.8, "k": 5}
)
```
### Ensemble Retriever

Combine multiple retrievers:

```python
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

# BM25 for keyword search
bm25_retriever = BM25Retriever.from_texts(texts)
bm25_retriever.k = 5

# Combine with vector search
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_retriever],
    weights=[0.5, 0.5]
)
docs = ensemble_retriever.invoke("LangChain features")
```
### Time-Weighted Retriever

Prioritize recent documents:

```python
from langchain.retrievers import TimeWeightedVectorStoreRetriever

retriever = TimeWeightedVectorStoreRetriever(
    vectorstore=vectorstore,
    decay_rate=0.01,  # Decay factor for older docs
    k=5
)
```
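Documents added through this retriever are stamped with an access time, so recency factors into scoring; a short sketch:

```python
from langchain_core.documents import Document

retriever.add_documents(
    [Document(page_content="LangChain release notes from this week")]
)
docs = retriever.invoke("recent release notes")
```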
### Multi-Vector Retriever
Multiple vectors per document:
```python
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryByteStore

store = InMemoryByteStore()
retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    byte_store=store,
    id_key="doc_id"
)

# Add documents with multiple representations.
# Assumes `documents` (full docs) and `summary_docs` (one summary per doc,
# each carrying metadata={"doc_id": ...}) have been prepared.
doc_ids = [d.metadata["doc_id"] for d in summary_docs]
retriever.vectorstore.add_documents(summary_docs)
retriever.docstore.mset(list(zip(doc_ids, documents)))
```
## Streaming

### Stream Chain Output

Stream tokens as they're generated:

```python
from langchain_core.output_parsers import StrOutputParser

chain = prompt | llm | StrOutputParser()

# Stream method
for chunk in chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)
```
### Stream with Callbacks

Handle streaming events:

```python
from langchain_core.callbacks import StreamingStdOutCallbackHandler

streaming_llm = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)
chain = prompt | streaming_llm | StrOutputParser()
result = chain.invoke({"topic": "AI"})  # Streams to stdout
```

### Async Streaming
Stream asynchronously:

```python
import asyncio

async def stream_async():
    async for chunk in chain.astream({"topic": "AI"}):
        print(chunk, end="", flush=True)

# Run async
asyncio.run(stream_async())
```
### Stream Agent Responses

Stream agent execution:

```python
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(llm, tools)

for chunk in agent.stream(
    {"messages": [("user", "Search for LangChain information")]},
    stream_mode="values"
):
    chunk["messages"][-1].pretty_print()
```

### Streaming RAG
Stream RAG responses:

```python
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | llm
    | StrOutputParser()
)

# Stream the response
for chunk in retrieval_chain.stream("What is LangChain?"):
    print(chunk, end="", flush=True)
```
## Error Handling

### Retry Logic

Automatic retries on failure:

```python
# Add retry to chain
chain_with_retry = (prompt | llm | StrOutputParser()).with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)
result = chain_with_retry.invoke({"topic": "AI"})
```
### Fallback Chains

Use a fallback on errors:

```python
primary_llm = ChatOpenAI(model="gpt-4")
fallback_llm = ChatOpenAI(model="gpt-3.5-turbo")

chain_with_fallback = (prompt | primary_llm).with_fallbacks(
    [prompt | fallback_llm]
)
result = chain_with_fallback.invoke({"topic": "AI"})
```

### Try-Except Patterns
Manual error handling:

```python
from langchain_core.exceptions import OutputParserException

try:
    result = chain.invoke({"topic": "AI"})
except OutputParserException as e:
    print(f"Parsing failed: {e}")
    result = chain.invoke({"topic": "AI"})  # Retry
except Exception as e:
    print(f"Chain execution failed: {e}")
    result = None
```

### Timeout Handling
Set execution timeouts by bounding the async call with asyncio.wait_for:

```python
import asyncio

async def invoke_with_timeout():
    try:
        # Give up if the chain takes longer than 10 seconds
        return await asyncio.wait_for(
            chain.ainvoke({"topic": "AI"}), timeout=10.0
        )
    except asyncio.TimeoutError:
        print("Chain execution timed out")
```

### Validation
Validate inputs and outputs:

```python
from pydantic import BaseModel, Field, validator

class QueryInput(BaseModel):
    topic: str = Field(..., min_length=1, max_length=100)

    @validator("topic")
    def topic_must_be_valid(cls, v):
        if not v.strip():
            raise ValueError("Topic cannot be empty")
        return v.strip()

# Use with chain
def validate_and_invoke(topic: str):
    try:
        validated = QueryInput(topic=topic)
        return chain.invoke({"topic": validated.topic})
    except ValueError as e:
        return f"Validation error: {e}"
```
## Production Best Practices

### Environment Configuration

Manage secrets securely:

```python
import os
from dotenv import load_dotenv

load_dotenv()

# Use environment variables
llm = ChatOpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    model=os.getenv("MODEL_NAME", "gpt-4o-mini")
)

# Vector store configuration
VECTOR_STORE_TYPE = os.getenv("VECTOR_STORE", "faiss")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")
```
### Caching

Cache LLM responses:

```python
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache

# In-memory cache
set_llm_cache(InMemoryCache())

# Persistent cache
set_llm_cache(SQLiteCache(database_path=".langchain.db"))

# Responses are cached automatically
result1 = llm.invoke("What is AI?")  # Calls API
result2 = llm.invoke("What is AI?")  # Uses cache
```
### Rate Limiting

Control API usage:

```python
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(
    requests_per_second=1,
    check_every_n_seconds=0.1,
    max_bucket_size=10
)
llm = ChatOpenAI(rate_limiter=rate_limiter)
```

### Batch Processing
Process multiple inputs efficiently:

```python
# Batch invoke
inputs = [{"topic": f"Topic {i}"} for i in range(10)]
results = chain.batch(inputs, config={"max_concurrency": 5})

# Async batch
async def batch_process():
    results = await chain.abatch(inputs)
    return results
```
### Monitoring and Logging

Production monitoring:

```python
import logging

from langchain_core.callbacks import BaseCallbackHandler

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProductionCallback(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        logger.info(f"Chain started: {serialized.get('name', 'unknown')}")

    def on_chain_end(self, outputs, **kwargs):
        logger.info("Chain completed successfully")

    def on_chain_error(self, error, **kwargs):
        logger.error(f"Chain error: {error}")

# Use in production
production_callback = ProductionCallback()
config = {"callbacks": [production_callback]}
```
### Testing Chains

Unit test your chains:

```python
import pytest
from langchain_core.messages import HumanMessage, AIMessage

def test_basic_chain():
    chain = prompt | llm | StrOutputParser()
    result = chain.invoke({"topic": "testing"})
    assert isinstance(result, str)
    assert len(result) > 0

def test_rag_chain():
    result = rag_chain.invoke("What is LangChain?")
    assert "LangChain" in result
    assert len(result) > 50

@pytest.mark.asyncio
async def test_async_chain():
    result = await chain.ainvoke({"topic": "async"})
    assert isinstance(result, str)
```

### Performance Optimization
Optimize chain execution:

```python
# Use appropriate chunk sizes for text splitting
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len
)

# Limit retrieval results
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# Use smaller, faster models where appropriate
fast_llm = ChatOpenAI(model="gpt-4o-mini")

# Enable streaming for better UX
streaming_llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
streaming_chain = prompt | streaming_llm | StrOutputParser()
```
### Documentation

Document your chains:

```python
from langchain_core.runnables import RunnableConfig

class DocumentedChain:
    """
    Production RAG chain for technical documentation.

    Features:
    - Multi-query retrieval for better coverage
    - Reranking for improved relevance
    - Streaming support
    - Error handling with fallbacks

    Usage:
        chain = DocumentedChain()
        result = chain.invoke("Your question here")
    """

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini")
        self.retriever = self._setup_retriever()
        self.chain = self._build_chain()

    def _setup_retriever(self):
        # Setup logic
        pass

    def _build_chain(self):
        # Chain construction
        pass

    def invoke(self, query: str, config: RunnableConfig = None):
        """Execute the chain with error handling"""
        try:
            return self.chain.invoke(query, config=config)
        except Exception as e:
            logger.error(f"Chain execution failed: {e}")
            raise
```

## Summary
This skill covers comprehensive LangChain orchestration patterns:

- **Chains**: Sequential, map-reduce, router, conditional chains
- **Agents**: ReAct, conversational, zero-shot, structured agents
- **Memory**: Buffer, window, summary, vector store memory
- **RAG**: Basic, multi-query, reranking, parent document retrieval
- **LLM Integration**: OpenAI, Anthropic, HuggingFace, Vertex AI, Ollama
- **Callbacks**: Standard, custom, Argilla, UpTrain, LangSmith
- **Retrieval**: Vector store, ensemble, time-weighted, multi-vector
- **Streaming**: Chain, agent, async streaming
- **Error Handling**: Retry, fallback, timeout, validation
- **Production**: Configuration, caching, rate limiting, monitoring, testing

For more examples and patterns, see EXAMPLES.md.