langchain-orchestration


# LangChain Orchestration Skill

Complete guide for building production-grade LLM applications with LangChain, covering chains, agents, memory, RAG patterns, and advanced orchestration techniques.

## Table of Contents

- Core Concepts
- Chains
- Agents
- Memory Systems
- RAG Patterns

## Core Concepts

### LangChain Expression Language (LCEL)

LCEL is the declarative way to compose chains in LangChain, enabling streaming, async, and parallel execution.

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# Basic LCEL chain
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
output_parser = StrOutputParser()

chain = prompt | llm | output_parser
result = chain.invoke({"topic": "quantum computing"})
```
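Under the hood, the `|` operator works because every Runnable implements Python's `__or__`, producing a new Runnable whose `invoke` feeds each step's output into the next. A minimal plain-Python sketch of that composition pattern (the `Step` class here is a hypothetical stand-in, not LangChain's implementation):

```python
class Step:
    """A toy stand-in for a LangChain Runnable."""
    def __init__(self, fn):
        self.fn = fn

    def invoke(self, x):
        return self.fn(x)

    def __or__(self, other):
        # Composing two steps yields a new step that runs them in sequence
        return Step(lambda x: other.invoke(self.invoke(x)))

# Mirrors: prompt | llm | output_parser
prompt = Step(lambda d: f"Tell me about {d['topic']}")
llm = Step(lambda text: {"content": text.upper()})  # pretend model call
parser = Step(lambda msg: msg["content"])

chain = prompt | llm | parser
print(chain.invoke({"topic": "quantum computing"}))  # TELL ME ABOUT QUANTUM COMPUTING
```

The real pipe operator additionally propagates streaming, batching, and async support through the composed chain.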

### Runnable Interface

Every component in LangChain implements the Runnable interface with standard methods: `invoke`, `stream`, `batch`, and their async counterparts `ainvoke`, `astream`, `abatch`.

```python
chain = prompt | llm | output_parser

# Synchronous invoke
result = chain.invoke({"topic": "AI"})

# Streaming
for chunk in chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)

# Batch processing
results = chain.batch([{"topic": "AI"}, {"topic": "ML"}])

# Async variants (inside an async function)
result = await chain.ainvoke({"topic": "AI"})
```

### RunnablePassthrough

Pass inputs directly through or apply transformations:

```python
from langchain_core.runnables import RunnablePassthrough

# Pass through unchanged
chain = RunnablePassthrough() | llm | output_parser

# With transformation
def add_context(x):
    return {"text": x["input"], "context": "important"}

chain = RunnablePassthrough.assign(processed=add_context) | llm
```

## Chains

### Sequential Chains

Process data through multiple steps sequentially.

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

# Step 1: Generate ideas
idea_prompt = ChatPromptTemplate.from_template(
    "Generate 3 creative ideas for: {topic}"
)
idea_chain = idea_prompt | llm | StrOutputParser()

# Step 2: Evaluate ideas
eval_prompt = ChatPromptTemplate.from_template(
    "Evaluate these ideas and pick the best one:\n{ideas}"
)
eval_chain = eval_prompt | llm | StrOutputParser()

# Combine into a sequential chain
sequential_chain = (
    {"ideas": idea_chain}
    | RunnablePassthrough.assign(evaluation=eval_chain)
)

result = sequential_chain.invoke({"topic": "mobile app"})
```

### Map-Reduce Chains

Process multiple inputs in parallel and combine the results.

```python
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate

# Define parallel processing
summary_prompt = ChatPromptTemplate.from_template(
    "Summarize this text in one sentence: {text}"
)
keywords_prompt = ChatPromptTemplate.from_template(
    "Extract 3 keywords from: {text}"
)
sentiment_prompt = ChatPromptTemplate.from_template(
    "Analyze sentiment (positive/negative/neutral): {text}"
)

# Map: process in parallel
map_chain = RunnableParallel(
    summary=summary_prompt | llm | StrOutputParser(),
    keywords=keywords_prompt | llm | StrOutputParser(),
    sentiment=sentiment_prompt | llm | StrOutputParser()
)

# Reduce: combine results
reduce_prompt = ChatPromptTemplate.from_template(
    """Combine the analysis:
Summary: {summary}
Keywords: {keywords}
Sentiment: {sentiment}

Provide a comprehensive report:"""
)

map_reduce_chain = map_chain | reduce_prompt | llm | StrOutputParser()

result = map_reduce_chain.invoke({
    "text": "LangChain is an amazing framework for building LLM applications."
})
```

### Router Chains

Route inputs to different chains based on conditions.

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# Define specialized chains
technical_prompt = ChatPromptTemplate.from_template(
    "Provide a technical explanation of: {query}"
)
simple_prompt = ChatPromptTemplate.from_template(
    "Explain in simple terms: {query}"
)

technical_chain = technical_prompt | llm | StrOutputParser()
simple_chain = simple_prompt | llm | StrOutputParser()

# Router function: return the chain to run for this input
def route_query(input_dict):
    complexity = input_dict.get("complexity", "simple")
    if complexity == "technical":
        return technical_chain
    return simple_chain

# Create the router chain
router_chain = RunnableLambda(route_query)

# Use the router
result = router_chain.invoke({
    "query": "quantum entanglement",
    "complexity": "technical"
})
```

### Conditional Chains

Execute chains based on conditions.

```python
from langchain_core.runnables import RunnableBranch, RunnablePassthrough

# Define condition-based routing
classification_prompt = ChatPromptTemplate.from_template(
    "Classify this as 'question', 'statement', or 'command': {text}"
)

question_handler = ChatPromptTemplate.from_template(
    "Answer this question: {text}"
) | llm | StrOutputParser()

statement_handler = ChatPromptTemplate.from_template(
    "Acknowledge this statement: {text}"
) | llm | StrOutputParser()

command_handler = ChatPromptTemplate.from_template(
    "Execute this command: {text}"
) | llm | StrOutputParser()

# Create a conditional branch
branch = RunnableBranch(
    (lambda x: "question" in x["type"].lower(), question_handler),
    (lambda x: "statement" in x["type"].lower(), statement_handler),
    command_handler  # default
)

# Full chain with classification
full_chain = (
    {"text": RunnablePassthrough(),
     "type": classification_prompt | llm | StrOutputParser()}
    | branch
)
```
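`RunnableBranch` evaluates its (condition, runnable) pairs in order and runs the first runnable whose condition matches, falling back to the default. A plain-Python sketch of that dispatch logic (illustrative only, not LangChain's implementation):

```python
def run_branch(branches, default, x):
    """Run the first handler whose condition matches, else the default."""
    for condition, handler in branches:
        if condition(x):
            return handler(x)
    return default(x)

handlers = [
    (lambda x: "question" in x["type"].lower(), lambda x: f"Answering: {x['text']}"),
    (lambda x: "statement" in x["type"].lower(), lambda x: f"Acknowledging: {x['text']}"),
]
default = lambda x: f"Executing: {x['text']}"

print(run_branch(handlers, default, {"type": "Question", "text": "What is LCEL?"}))
# Answering: What is LCEL?
```

Because conditions are checked in order, put the most specific ones first; the default handler catches everything else.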

### LLMChain (Legacy)

Traditional chain format, still supported but deprecated in favor of LCEL:

```python
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate(
    input_variables=["product"],
    template="What is a good name for a company that makes {product}?"
)

chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(product="eco-friendly water bottles")
```

### Stuff Documents Chain

Combine documents into a single context:

```python
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.documents import Document

prompt = ChatPromptTemplate.from_template(
    """Answer based on the following context:

<context>
{context}
</context>

Question: {input}"""
)

document_chain = create_stuff_documents_chain(llm, prompt)

docs = [
    Document(page_content="LangChain supports multiple LLM providers."),
    Document(page_content="Chains can be composed using LCEL.")
]

result = document_chain.invoke({
    "input": "What does LangChain support?",
    "context": docs
})
```

## Agents

### ReAct Agents

Reasoning-and-Acting agents that use tools iteratively.

```python
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import Tool
from langchain import hub

# Define tools
def search_tool(query: str) -> str:
    """Search for information"""
    return f"Search results for: {query}"

def calculator_tool(expression: str) -> str:
    """Calculate mathematical expressions"""
    try:
        # Note: eval is unsafe for untrusted input; use a real math parser in production
        return str(eval(expression))
    except Exception:
        return "Invalid expression"

tools = [
    Tool(
        name="Search",
        func=search_tool,
        description="Useful for searching information"
    ),
    Tool(
        name="Calculator",
        func=calculator_tool,
        description="Useful for math calculations"
    )
]

# Create the ReAct agent
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5
)

result = agent_executor.invoke({
    "input": "What is 25 * 4, and then search for that number's significance"
})
```
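The ReAct loop itself is simple: the model emits a thought and an action, the executor runs the named tool, and the observation is appended to the transcript before the next model call, until a final answer appears or `max_iterations` is hit. A stripped-down sketch with a scripted stand-in for the LLM (illustrative only; the real executor parses free-text Thought/Action/Observation output):

```python
def react_loop(model, tools, question, max_iterations=5):
    """Minimal ReAct executor: alternate model steps and tool calls."""
    transcript = f"Question: {question}"
    for _ in range(max_iterations):
        step = model(transcript)            # one dict per step
        if "final" in step:
            return step["final"]
        observation = tools[step["action"]](step["input"])
        transcript += f"\nAction: {step['action']}\nObservation: {observation}"
    return "Stopped: max iterations reached"

# Scripted "model": calls the calculator once, then answers from the observation
def scripted_model(transcript):
    if "Observation" not in transcript:
        return {"action": "Calculator", "input": "25 * 4"}
    return {"final": "25 * 4 = 100"}

tools = {"Calculator": lambda expr: "100"}
print(react_loop(scripted_model, tools, "What is 25 * 4?"))  # 25 * 4 = 100
```

`max_iterations` plays the same safety role as in `AgentExecutor`: it bounds the loop when the model never produces a final answer.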

### LangGraph ReAct Agent

Modern approach using LangGraph for better control:

```python
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver

@tool
def retrieve(query: str) -> str:
    """Retrieve relevant information from the knowledge base"""
    # Your retrieval logic here
    return f"Retrieved information for: {query}"

@tool
def analyze(text: str) -> str:
    """Analyze text and provide insights"""
    return f"Analysis of: {text}"

# Create an agent with memory
memory = MemorySaver()
agent_executor = create_react_agent(
    llm,
    [retrieve, analyze],
    checkpointer=memory
)

# Use with configuration
config = {"configurable": {"thread_id": "abc123"}}
for chunk in agent_executor.stream(
    {"messages": [("user", "Find information about LangChain")]},
    config=config
):
    print(chunk)
```

### Conversational ReAct Agent

Agent with built-in conversation memory:

```python
from langchain.agents import create_conversational_retrieval_agent
from langchain_core.tools import Tool

tools = [
    Tool(
        name="Knowledge Base",
        func=lambda q: f"KB result: {q}",
        description="Search the knowledge base"
    )
]

conversational_agent = create_conversational_retrieval_agent(
    llm,
    tools,
    verbose=True
)

# Maintains conversation context across calls
result1 = conversational_agent.invoke({
    "input": "What is LangChain?"
})
result2 = conversational_agent.invoke({
    "input": "Tell me more about its features"
})
```

### Zero-Shot ReAct Agent

Agent that works without examples:

```python
from langchain.agents import AgentType, initialize_agent, load_tools

# Load pre-built tools
tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    max_iterations=3
)

result = agent.run(
    "What is the population of Tokyo and what is that number divided by 2?"
)
```

### Structured Chat Agent

Agent that uses structured input/output:

```python
from pydantic import BaseModel, Field
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_core.tools import tool
from langchain import hub

# Define tools with structured schemas
class SearchInput(BaseModel):
    query: str = Field(description="The search query")
    max_results: int = Field(default=5, description="Maximum results")

@tool(args_schema=SearchInput)
def structured_search(query: str, max_results: int = 5) -> str:
    """Search with structured parameters"""
    return f"Found {max_results} results for: {query}"

tools = [structured_search]

prompt = hub.pull("hwchase17/structured-chat-agent")
agent = create_structured_chat_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
```

### Tool Calling Agent

Modern agent using native tool calling:

```python
from langchain_core.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b

@tool
def search_database(query: str, limit: int = 10) -> str:
    """Search the database"""
    return f"Found {limit} results for {query}"

# Bind tools to the LLM
llm_with_tools = llm.bind_tools([multiply, search_database])

# Create a simple tool chain: extract the first tool call's args and run the tool
tool_chain = llm_with_tools | (lambda x: x.tool_calls[0]["args"]) | multiply
result = tool_chain.invoke("What's four times 23")
```
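After `bind_tools`, the model's response carries `tool_calls` entries with a tool name and already-parsed arguments; executing them is a name-to-function dispatch. A sketch of that dispatch step with a hand-built response (the `tool_calls` shape mirrors LangChain's `AIMessage.tool_calls`, but the data here is mocked, not a real model output):

```python
# Plain functions standing in for the decorated tools above
tool_registry = {
    "multiply": lambda args: args["a"] * args["b"],
    "search_database": lambda args: f"Found {args.get('limit', 10)} results for {args['query']}",
}

def execute_tool_calls(tool_calls):
    """Dispatch each parsed tool call to the matching function."""
    return [tool_registry[call["name"]](call["args"]) for call in tool_calls]

# Hand-built stand-in for llm_with_tools.invoke("What's four times 23").tool_calls
mock_tool_calls = [{"name": "multiply", "args": {"a": 4, "b": 23}}]
print(execute_tool_calls(mock_tool_calls))  # [92]
```

In a full agent loop, each result would be sent back to the model as a `ToolMessage` so it can compose a final answer.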

## Memory Systems

### ConversationBufferMemory

Store the complete conversation history:

```python
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain

memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}")
])

chain = LLMChain(llm=llm, prompt=prompt, memory=memory)

# Conversation is automatically stored
response1 = chain.run(input="Hi, I'm Alice")
response2 = chain.run(input="What's my name?")  # Will remember Alice
```

### ConversationBufferWindowMemory

Keep only the most recent k interactions:

```python
from langchain.memory import ConversationBufferWindowMemory

memory = ConversationBufferWindowMemory(
    k=5,  # Keep last 5 interactions
    memory_key="chat_history",
    return_messages=True
)

chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
```
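Windowed memory is conceptually a bounded queue of (human, ai) turns: once k interactions are stored, each new exchange evicts the oldest. A plain-Python sketch of that trimming behavior (illustrative only, not the LangChain class):

```python
from collections import deque

class WindowMemory:
    """Keep only the last k interactions, like ConversationBufferWindowMemory."""
    def __init__(self, k):
        self.turns = deque(maxlen=k)  # oldest turns fall off automatically

    def save(self, human, ai):
        self.turns.append((human, ai))

    def history(self):
        return list(self.turns)

mem = WindowMemory(k=2)
for i in range(5):
    mem.save(f"msg {i}", f"reply {i}")

print(mem.history())  # [('msg 3', 'reply 3'), ('msg 4', 'reply 4')]
```

The trade-off versus full buffer memory: bounded token cost per call, at the price of forgetting anything older than the window.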

### ConversationSummaryMemory

Summarize conversation history:

```python
from langchain.memory import ConversationSummaryMemory

memory = ConversationSummaryMemory(
    llm=llm,
    memory_key="chat_history",
    return_messages=True
)

chain = LLMChain(llm=llm, prompt=prompt, memory=memory)

# Long conversations are automatically summarized
for i in range(20):
    chain.run(input=f"Tell me fact {i} about AI")
```

### ConversationSummaryBufferMemory

Hybrid approach: keep recent messages verbatim and summarize older ones:

```python
from langchain.memory import ConversationSummaryBufferMemory

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=100,  # When to trigger summarization
    memory_key="chat_history",
    return_messages=True
)
```

### Vector Store Memory

Semantic search over conversation history:

```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
# Seed with one placeholder text; a FAISS index cannot be built from an empty list
vectorstore = FAISS.from_texts(["placeholder"], embeddings)

memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# Save context
memory.save_context(
    {"input": "My favorite color is blue"},
    {"output": "That's great!"}
)

# Retrieve relevant context
relevant = memory.load_memory_variables({"input": "What's my favorite color?"})
```

### Recall Memories (LangGraph)

Structured memory with save and search:

```python
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())

@tool
def save_recall_memory(memory: str) -> str:
    """Save important information to long-term memory"""
    recall_vector_store.add_texts([memory])
    return f"Saved memory: {memory}"

@tool
def search_recall_memories(query: str) -> str:
    """Search long-term memories"""
    docs = recall_vector_store.similarity_search(query, k=3)
    return "\n".join([doc.page_content for doc in docs])

# Use with an agent
agent = create_react_agent(
    llm,
    [save_recall_memory, search_recall_memories]
)
```

### Custom Memory with LangGraph State

Define custom state for memory:

```python
from typing import List
from langgraph.graph import MessagesState, StateGraph, START, END

class State(MessagesState):
    recall_memories: List[str]

def load_memories(state: State):
    """Load relevant memories before the agent processes input"""
    messages = state["messages"]
    last_message = messages[-1].content if messages else ""

    # Search for relevant memories
    docs = recall_vector_store.similarity_search(last_message, k=3)
    memories = [doc.page_content for doc in docs]

    return {"recall_memories": memories}

# Add to the graph
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_edge(START, "load_memories")
```

## RAG Patterns

### Basic RAG Chain

Fundamental retrieval-augmented generation:

```python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

# Set up the vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
    [
        "LangChain supports multiple LLM providers including OpenAI, Anthropic, and more.",
        "Chains can be composed using LangChain Expression Language (LCEL).",
        "Agents can use tools to interact with external systems."
    ],
    embedding=embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# RAG prompt
template = """Answer the question based only on the following context:

{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Build the RAG chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

result = rag_chain.invoke("What does LangChain support?")
```
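The retriever step in the chain above reduces to "score every stored text against the query and keep the top k" — with embedding similarity doing the scoring in the real version. A plain word-overlap sketch of that top-k selection (illustrative only; FAISS ranks by vector distance, not word overlap):

```python
def retrieve_top_k(query, texts, k=3):
    """Rank stored texts by naive word overlap with the query."""
    q_words = set(query.lower().split())
    scored = [(len(q_words & set(t.lower().split())), t) for t in texts]
    # Stable sort: equal scores keep insertion order
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [t for score, t in scored[:k] if score > 0]

texts = [
    "LangChain supports multiple LLM providers including OpenAI, Anthropic, and more.",
    "Chains can be composed using LangChain Expression Language (LCEL).",
    "Agents can use tools to interact with external systems.",
]
print(retrieve_top_k("What does LangChain support?", texts, k=1))
```

Swapping word overlap for cosine similarity over embedding vectors gives you the semantic matching that makes RAG work on paraphrased questions.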

### RAG with Retrieval Chain

Using the built-in retrieval chain constructor:

```python
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

prompt = ChatPromptTemplate.from_template(
    """Answer based on the context:

<context>
{context}
</context>

Question: {input}"""
)

document_chain = create_stuff_documents_chain(llm, prompt)
retrieval_chain = create_retrieval_chain(retriever, document_chain)

response = retrieval_chain.invoke({
    "input": "What is LCEL?"
})
# Returns: {"input": "...", "context": [...], "answer": "..."}
```

RAG with Chat History

带对话历史的RAG

Conversational RAG with context:
python
from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder

contextualize_prompt = ChatPromptTemplate.from_messages([
    ("system", "Given a chat history and the latest user question, "
               "formulate a standalone question which can be understood "
               "without the chat history."),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}")
])

history_aware_retriever = create_history_aware_retriever(
    llm,
    retriever,
    contextualize_prompt
)

Use in RAG chain


qa_chain = create_retrieval_chain(history_aware_retriever, document_chain)

First question


result1 = qa_chain.invoke({"input": "What is LangChain?", "chat_history": []})

Follow-up with context


result2 = qa_chain.invoke({
    "input": "What are its main features?",
    "chat_history": [
        ("human", "What is LangChain?"),
        ("ai", result1["answer"])
    ]
})

Multi-Query RAG


Generate multiple search queries for better retrieval:
python
from langchain.retrievers.multi_query import MultiQueryRetriever

multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(),
    llm=llm
)

Automatically generates multiple query variations


rag_chain = (
    {"context": multi_query_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
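MultiQueryRetriever takes the unique union of the documents retrieved across the query variants. A minimal pure-Python sketch of that merge step (the `Doc` dataclass stands in for `langchain_core.documents.Document`):

```python
# Sketch: merging results from multiple query variants, deduplicated
# by page content while preserving first-seen order.
from dataclasses import dataclass

@dataclass(frozen=True)
class Doc:
    page_content: str

def unique_union(result_lists):
    """Deduplicate documents across query results, preserving order."""
    seen = set()
    merged = []
    for docs in result_lists:
        for doc in docs:
            if doc.page_content not in seen:
                seen.add(doc.page_content)
                merged.append(doc)
    return merged

results_a = [Doc("LCEL composes chains"), Doc("Agents call tools")]
results_b = [Doc("Agents call tools"), Doc("Retrievers fetch context")]
merged = unique_union([results_a, results_b])
```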

RAG with Reranking


Improve relevance with reranking:
python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank

Setup reranker


compressor = FlashrankRerank()
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

Use in RAG chain


rag_chain = (
    {"context": compression_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

Parent Document Retrieval


Retrieve larger parent documents for full context:
python
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

Storage for parent documents


store = InMemoryStore()

Splitters


child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)

Add documents


parent_retriever.add_documents(documents)

Self-Query Retrieval


Natural language to structured queries:
python
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo

metadata_field_info = [
    AttributeInfo(
        name="source",
        description="The document source",
        type="string",
    ),
    AttributeInfo(
        name="page",
        description="The page number",
        type="integer",
    ),
]

document_content_description = "Technical documentation"

self_query_retriever = SelfQueryRetriever.from_llm(
    llm,
    vectorstore,
    document_content_description,
    metadata_field_info,
)

LLM Integrations


OpenAI Integration


python
from langchain_openai import ChatOpenAI, OpenAI

Chat model


chat_model = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.7,
    max_tokens=500,
    api_key="your-api-key"
)

Completion model


completion_model = OpenAI(
    model="gpt-3.5-turbo-instruct",
    temperature=0.9
)

Anthropic Claude Integration


python
from langchain_anthropic import ChatAnthropic

claude = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    temperature=0,
    max_tokens=1024,
    api_key="your-api-key"
)

HuggingFace Integration


python
from langchain_huggingface import HuggingFaceEndpoint

llm = HuggingFaceEndpoint(
    repo_id="meta-llama/Llama-2-7b-chat-hf",
    huggingfacehub_api_token="your-token",
    task="text-generation",
    temperature=0.7
)

Google Vertex AI Integration


python
from langchain_google_vertexai import ChatVertexAI, VertexAI

Chat model


chat_model = ChatVertexAI(model_name="chat-bison", temperature=0)

Completion model


completion_model = VertexAI(model_name="gemini-1.0-pro-002")

Ollama Local Models


python
from langchain_community.llms import Ollama

llm = Ollama(
    model="llama2",
    temperature=0.8
)

Binding Tools to LLMs


python
from langchain_core.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers together"""
    return a * b

Bind tools to model


llm_with_tools = llm.bind_tools([multiply])

Model will return tool calls


response = llm_with_tools.invoke("What is 3 times 4?")
print(response.tool_calls)
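The model only *proposes* tool calls; your code still has to execute them. A minimal sketch of that dispatch step, assuming tool calls arrive as dicts shaped like `AIMessage.tool_calls` (`name`, `args`, `id`):

```python
# Sketch: executing the tool calls an LLM returns by dispatching
# on the tool name. `fake_calls` mimics the tool_calls structure.
def multiply(a: int, b: int) -> int:
    """Multiply two numbers together"""
    return a * b

TOOLS = {"multiply": multiply}

def run_tool_calls(tool_calls):
    results = {}
    for call in tool_calls:
        tool = TOOLS[call["name"]]
        results[call["id"]] = tool(**call["args"])
    return results

fake_calls = [{"name": "multiply", "args": {"a": 3, "b": 4}, "id": "call_1"}]
outputs = run_tool_calls(fake_calls)
```

In a real loop you would append each result as a `ToolMessage` and re-invoke the model.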

Callbacks & Monitoring


Standard Callbacks


Track chain execution:
python
from langchain_core.callbacks import StdOutCallbackHandler
from langchain.callbacks import get_openai_callback

Standard output callback


callbacks = [StdOutCallbackHandler()]
chain = prompt | llm | StrOutputParser()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": callbacks}
)

OpenAI cost tracking


with get_openai_callback() as cb:
    result = chain.invoke({"topic": "AI"})
    print(f"Total Tokens: {cb.total_tokens}")
    print(f"Total Cost: ${cb.total_cost}")

Custom Callbacks


Create custom callback handlers:
python
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict

class MyCustomCallback(BaseCallbackHandler):
    def on_llm_start(self, serialized: Dict[str, Any], prompts: list[str], **kwargs):
        print(f"LLM started with prompts: {prompts}")

    def on_llm_end(self, response, **kwargs):
        print(f"LLM finished with response: {response}")

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
        print(f"Chain started with inputs: {inputs}")

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
        print(f"Chain ended with outputs: {outputs}")

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs):
        print(f"Tool started with input: {input_str}")

    def on_tool_end(self, output: str, **kwargs):
        print(f"Tool ended with output: {output}")

Use custom callback


custom_callback = MyCustomCallback()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [custom_callback]}
)

Argilla Callback


Track and log to Argilla:
python
from langchain_community.callbacks import ArgillaCallbackHandler
from langchain.agents import initialize_agent, AgentType

argilla_callback = ArgillaCallbackHandler(
    dataset_name="langchain-dataset",
    api_url="http://localhost:6900",
    api_key="your-api-key"
)

callbacks = [argilla_callback]

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    callbacks=callbacks
)

agent.run("Who was the first president of the United States?")

UpTrain Callback


RAG evaluation and monitoring:
python
from langchain_community.callbacks import UpTrainCallbackHandler

uptrain_callback = UpTrainCallbackHandler(
    key_type="uptrain",
    api_key="your-api-key"
)

config = {"callbacks": [uptrain_callback]}

Automatically evaluates context relevance, factual accuracy, completeness


result = rag_chain.invoke("What is LangChain?", config=config)

LangSmith Integration


Production monitoring and debugging:
python
import os

Set environment variables


os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

All chains automatically traced


result = chain.invoke({"topic": "AI"})

View traces at smith.langchain.com



Retrieval Strategies


Vector Store Retrievers


Basic similarity search:
python
from langchain_community.vectorstores import FAISS, Chroma, Pinecone

FAISS


faiss_retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

Maximum Marginal Relevance (MMR)


mmr_retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}
)

Similarity with threshold


threshold_retriever = vectorstore.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.8, "k": 5}
)
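For intuition, MMR trades query relevance against redundancy among already-selected documents via `lambda_mult`. A toy pure-Python sketch, with dot products standing in for real embedding similarity:

```python
# Sketch of Maximum Marginal Relevance: repeatedly pick the candidate
# maximizing  lambda * relevance - (1 - lambda) * redundancy.
def dot(u, v):
    return sum(a * b for a, b in zip(u, v))

def mmr(query_vec, doc_vecs, k=2, lambda_mult=0.5):
    selected = []
    candidates = list(range(len(doc_vecs)))
    while candidates and len(selected) < k:
        def score(i):
            relevance = dot(query_vec, doc_vecs[i])
            redundancy = max((dot(doc_vecs[i], doc_vecs[j]) for j in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(candidates, key=score)
        selected.append(best)
        candidates.remove(best)
    return selected

docs = [[1.0, 0.0], [0.99, 0.01], [0.0, 1.0]]  # first two are near-duplicates
picked = mmr([1.0, 0.2], docs, k=2, lambda_mult=0.5)  # skips the duplicate
```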

Ensemble Retriever


Combine multiple retrievers:
python
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

BM25 for keyword search


bm25_retriever = BM25Retriever.from_texts(texts)
bm25_retriever.k = 5

Combine with vector search


ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_retriever],
    weights=[0.5, 0.5]
)
docs = ensemble_retriever.invoke("LangChain features")
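Under the hood, this style of hybrid search is commonly scored with weighted Reciprocal Rank Fusion. A minimal sketch, assuming the conventional rank constant of 60:

```python
# Sketch of weighted Reciprocal Rank Fusion: each retriever contributes
# weight / (rank + 1 + c) per document; documents ranked well by both
# retrievers rise to the top of the fused ordering.
def weighted_rrf(rankings, weights, c=60):
    """rankings: one ranked list of doc ids per retriever."""
    scores = {}
    for ranked, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranked):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (rank + 1 + c)
    return sorted(scores, key=scores.get, reverse=True)

bm25_ranked = ["d1", "d2", "d3"]
vector_ranked = ["d2", "d4", "d1"]
fused = weighted_rrf([bm25_ranked, vector_ranked], weights=[0.5, 0.5])
```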

Time-Weighted Retriever


Prioritize recent documents:
python
from langchain.retrievers import TimeWeightedVectorStoreRetriever

retriever = TimeWeightedVectorStoreRetriever(
    vectorstore=vectorstore,
    decay_rate=0.01,  # Decay factor for older docs
    k=5
)

Multi-Vector Retriever


Multiple vectors per document:
python
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryByteStore

store = InMemoryByteStore()

retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    byte_store=store,
    id_key="doc_id"
)

Add documents with multiple representations


retriever.add_documents(documents)

Streaming


Stream Chain Output


Stream tokens as they're generated:
python
from langchain_core.output_parsers import StrOutputParser

chain = prompt | llm | StrOutputParser()

Stream method


for chunk in chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)

Stream with Callbacks


Handle streaming events:
python
from langchain_core.callbacks import StreamingStdOutCallbackHandler

streaming_llm = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

chain = prompt | streaming_llm | StrOutputParser()
result = chain.invoke({"topic": "AI"})  # Streams to stdout

Async Streaming


Stream asynchronously:
python
async def stream_async():
    async for chunk in chain.astream({"topic": "AI"}):
        print(chunk, end="", flush=True)

Run async


import asyncio

asyncio.run(stream_async())

Stream Agent Responses


Stream agent execution:
python
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(llm, tools)

for chunk in agent.stream(
    {"messages": [("user", "Search for LangChain information")]},
    stream_mode="values"
):
    chunk["messages"][-1].pretty_print()

Streaming RAG


Stream RAG responses:
python
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | llm
    | StrOutputParser()
)

Stream the response


for chunk in retrieval_chain.stream("What is LangChain?"):
    print(chunk, end="", flush=True)

Error Handling


Retry Logic


Automatic retries on failure:
python
from langchain_core.runnables import RunnableRetry

Add retry to chain


chain_with_retry = (prompt | llm | StrOutputParser()).with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)
result = chain_with_retry.invoke({"topic": "AI"})
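For intuition, `with_retry` implements the classic exponential-backoff-with-jitter loop. A self-contained sketch of that policy (the `sleep` hook is injectable so the example runs instantly):

```python
# Sketch of retry with exponential backoff and jitter: retry on failure,
# doubling the base delay each attempt and adding random jitter, then
# re-raise once the attempt budget is exhausted.
import random

def retry_with_backoff(fn, stop_after_attempt=3, base_delay=1.0, sleep=None):
    sleep = sleep or (lambda s: None)  # injectable for testing
    for attempt in range(1, stop_after_attempt + 1):
        try:
            return fn()
        except Exception:
            if attempt == stop_after_attempt:
                raise
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, 1)
            sleep(delay)

calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = retry_with_backoff(flaky)  # succeeds on the third attempt
```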

Fallback Chains


Use fallback on errors:
python
from langchain_core.runnables import RunnableWithFallbacks

primary_llm = ChatOpenAI(model="gpt-4")
fallback_llm = ChatOpenAI(model="gpt-3.5-turbo")

chain_with_fallback = (prompt | primary_llm).with_fallbacks(
    [prompt | fallback_llm]
)

result = chain_with_fallback.invoke({"topic": "AI"})

Try-Except Patterns


Manual error handling:
python
from langchain_core.exceptions import OutputParserException

try:
    result = chain.invoke({"topic": "AI"})
except OutputParserException as e:
    print(f"Parsing failed: {e}")
    result = chain.invoke({"topic": "AI"})  # Retry
except Exception as e:
    print(f"Chain execution failed: {e}")
    result = None

Timeout Handling


Set execution timeouts:
python
from langchain_core.runnables import RunnableConfig

config = RunnableConfig(timeout=10.0)  # 10 seconds

try:
    result = chain.invoke({"topic": "AI"}, config=config)
except TimeoutError:
    print("Chain execution timed out")

Validation


Validate inputs and outputs:
python
from pydantic import BaseModel, Field, validator

class QueryInput(BaseModel):
    topic: str = Field(..., min_length=1, max_length=100)

    @validator("topic")
    def topic_must_be_valid(cls, v):
        if not v.strip():
            raise ValueError("Topic cannot be empty")
        return v.strip()

Use with chain


def validate_and_invoke(topic: str):
    try:
        validated = QueryInput(topic=topic)
        return chain.invoke({"topic": validated.topic})
    except ValueError as e:
        return f"Validation error: {e}"

Production Best Practices


Environment Configuration


Manage secrets securely:
python
import os
from dotenv import load_dotenv

load_dotenv()

Use environment variables


llm = ChatOpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    model=os.getenv("MODEL_NAME", "gpt-4o-mini")
)

Vector store configuration


VECTOR_STORE_TYPE = os.getenv("VECTOR_STORE", "faiss")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")

Caching


Cache LLM responses:
python
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache

In-memory cache


set_llm_cache(InMemoryCache())

Persistent cache


set_llm_cache(SQLiteCache(database_path=".langchain.db"))

Responses are cached automatically


result1 = llm.invoke("What is AI?")  # Calls API
result2 = llm.invoke("What is AI?")  # Uses cache
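Conceptually, an LLM cache is a lookup keyed on the model and the exact prompt; only misses hit the API. A minimal sketch (the `call_api` callable stands in for a real LLM request):

```python
# Sketch of an LLM response cache: identical (model, prompt) pairs
# skip the API call and return the stored response.
class SimpleLLMCache:
    def __init__(self):
        self._store = {}
        self.misses = 0

    def invoke(self, model, prompt, call_api):
        key = (model, prompt)
        if key not in self._store:
            self.misses += 1                    # cache miss: call the API
            self._store[key] = call_api(prompt)
        return self._store[key]                 # hit or freshly stored

cache = SimpleLLMCache()
fake_api = lambda prompt: f"answer to: {prompt}"
r1 = cache.invoke("gpt-4o-mini", "What is AI?", fake_api)  # miss
r2 = cache.invoke("gpt-4o-mini", "What is AI?", fake_api)  # hit
```

Note the key includes the prompt verbatim, which is why caching helps most with repeated, deterministic (temperature 0) queries.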

Rate Limiting


Control API usage:
python
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(
    requests_per_second=1,
    check_every_n_seconds=0.1,
    max_bucket_size=10
)

llm = ChatOpenAI(rate_limiter=rate_limiter)

Batch Processing


Process multiple inputs efficiently:
python

Batch invoke


inputs = [{"topic": f"Topic {i}"} for i in range(10)]
results = chain.batch(inputs, config={"max_concurrency": 5})

Async batch


async def batch_process():
    results = await chain.abatch(inputs)
    return results
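The `max_concurrency` option caps how many inputs are in flight at once. A sketch of that behavior with an `asyncio.Semaphore` (the `fake_chain` coroutine stands in for a real chain call):

```python
# Sketch of bounded-concurrency batching: a semaphore limits how many
# items are processed concurrently while gather preserves input order.
import asyncio

async def bounded_batch(process, inputs, max_concurrency=5):
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with sem:
            return await process(item)

    return await asyncio.gather(*(run_one(i) for i in inputs))

async def fake_chain(item):
    await asyncio.sleep(0)  # stand-in for an LLM call
    return item["topic"].upper()

inputs = [{"topic": f"topic {i}"} for i in range(10)]
results = asyncio.run(bounded_batch(fake_chain, inputs, max_concurrency=5))
```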

Monitoring and Logging


Production monitoring:
python
import logging
from langchain_core.callbacks import BaseCallbackHandler

Setup logging


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProductionCallback(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        logger.info(f"Chain started: {serialized.get('name', 'unknown')}")

    def on_chain_end(self, outputs, **kwargs):
        logger.info("Chain completed successfully")

    def on_chain_error(self, error, **kwargs):
        logger.error(f"Chain error: {error}")

Use in production


production_callback = ProductionCallback()
config = {"callbacks": [production_callback]}

Testing Chains


Unit test your chains:
python
import pytest
from langchain_core.messages import HumanMessage, AIMessage

def test_basic_chain():
    chain = prompt | llm | StrOutputParser()
    result = chain.invoke({"topic": "testing"})
    assert isinstance(result, str)
    assert len(result) > 0

def test_rag_chain():
    result = rag_chain.invoke("What is LangChain?")
    assert "LangChain" in result
    assert len(result) > 50

@pytest.mark.asyncio
async def test_async_chain():
    result = await chain.ainvoke({"topic": "async"})
    assert isinstance(result, str)

Performance Optimization


Optimize chain execution:
python

Use appropriate chunk sizes for text splitting


from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len
)

Limit retrieval results


retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

Use smaller, faster models where appropriate


fast_llm = ChatOpenAI(model="gpt-4o-mini")

Enable streaming for better UX


streaming_chain = prompt | fast_llm | StrOutputParser()
for chunk in streaming_chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)

Documentation


Document your chains:
python
from langchain_core.runnables import RunnableConfig

class DocumentedChain:
    """
    Production RAG chain for technical documentation.

    Features:
    - Multi-query retrieval for better coverage
    - Reranking for improved relevance
    - Streaming support
    - Error handling with fallbacks

    Usage:
        chain = DocumentedChain()
        result = chain.invoke("Your question here")
    """

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini")
        self.retriever = self._setup_retriever()
        self.chain = self._build_chain()

    def _setup_retriever(self):
        # Setup logic
        pass

    def _build_chain(self):
        # Chain construction
        pass

    def invoke(self, query: str, config: RunnableConfig = None):
        """Execute the chain with error handling"""
        try:
            return self.chain.invoke(query, config=config)
        except Exception as e:
            logger.error(f"Chain execution failed: {e}")
            raise


Summary


This skill covers comprehensive LangChain orchestration patterns:
  • Chains: Sequential, map-reduce, router, conditional chains
  • Agents: ReAct, conversational, zero-shot, structured agents
  • Memory: Buffer, window, summary, vector store memory
  • RAG: Basic, multi-query, reranking, parent document retrieval
  • LLM Integration: OpenAI, Anthropic, HuggingFace, Vertex AI, Ollama
  • Callbacks: Standard, custom, Argilla, UpTrain, LangSmith
  • Retrieval: Vector store, ensemble, time-weighted, multi-vector
  • Streaming: Chain, agent, async streaming
  • Error Handling: Retry, fallback, timeout, validation
  • Production: Configuration, caching, rate limiting, monitoring, testing
For more examples and patterns, see EXAMPLES.md.