kiwi-go-agent
Kiwi Go Agent Development
This skill defines the standards for building AI agents in Go using the golanggraph framework and langchaingo. All agent development MUST follow these patterns.
Framework Overview
| Framework | Purpose | Import |
|---|---|---|
| golanggraph | Graph-based agent workflow engine | `github.com/futurxlab/golanggraph` |
| langchaingo | LLM calls, prompts, tool definitions | `github.com/tmc/langchaingo` |
Agents are directed graphs of **Nodes** connected by **Edges** (DAG-style, with cycles allowed for loops). Each node performs one atomic step: call an LLM, execute tools, validate output, or transform state.
Core Concepts
State
All data flows through `state.State`:

```go
type State struct {
	History  []llms.MessageContent  // Conversation messages (system, human, AI, tool)
	Metadata map[string]interface{} // Custom data shared between nodes
}
```

- `History`: The LLM message history. Append to it; the framework passes it between nodes.
- `Metadata`: A typed key-value map for passing structured data between nodes. Store custom state objects here.
Node Interface
Every node implements `flowcontract.Node`:

```go
type Node interface {
	Name() string
	Run(ctx context.Context, currentState *state.State, streamFunc flowcontract.StreamFunc) error
}
```

- `Name()`: Unique identifier used in edge wiring.
- `Run()`: Executes the node logic. Modifies `currentState` in place.
- `streamFunc`: Optional callback for streaming events to the caller.
State Helper Methods
Helpers on `state.State`:

```go
s.GetLastResponse() string     // Last AI message text
s.GetResumeValue() interface{} // Value passed via ResumeWithValue()
s.IsInterrupted() bool         // Whether flow was interrupted
s.GetThreadID() string         // Current thread ID (for checkpointing)
s.SetInterruptPayload(payload) // Set interrupt payload before Interrupt()
```

Edges
Edges define execution order. Two types:
```go
// Direct edge: always flows From -> To
edge.Edge{From: "node_a", To: "node_b"}

// Conditional edge: ConditionFunc decides which target
edge.Edge{
	From:          "node_a",
	ConditionalTo: []string{"node_b", "node_c", flow.EndNode},
	ConditionFunc: func(ctx context.Context, st state.State) (string, error) {
		// Return one of ConditionalTo values
		return "node_b", nil
	},
}
```
Flow Building
```go
compiledFlow, err := flow.NewFlowBuilder(logger).
	SetName("my_agent").
	SetCheckpointer(checkpointer.NewInMemoryCheckpointer()).
	AddNode(nodeA).
	AddNode(nodeB).
	AddEdge(edge.Edge{From: flow.StartNode, To: nodeA.Name()}).
	AddEdge(edge.Edge{From: nodeA.Name(), To: nodeB.Name()}).
	AddEdge(edge.Edge{From: nodeB.Name(), To: flow.EndNode}).
	Compile()
```
Flow Execution
```go
// Without streaming
finalState, err := compiledFlow.Exec(ctx, initialState, nil)

// With streaming callback
finalState, err := compiledFlow.Exec(ctx, initialState, func(ctx context.Context, event *flowcontract.FlowStreamEvent) error {
	// event.Chunk contains streamed data
	// event.FullState contains current flow state
	return nil
})
```
Agent Patterns
See references/agent-examples.md for complete code examples of each pattern.
Pattern 1: Prebuilt Agent (RECOMMENDED)
Use `agent.NewAgent()` for all standard chat-with-tools agents. It handles the ReAct loop, tool execution, context compression, and response validation internally.

```go
a, err := agent.NewAgent(
	agent.WithName("my_agent"),
	agent.WithModel(llm),
	agent.WithTools([]tools.ITool{searchTool, fileTool}),
	agent.WithMaxToolCalls(10),                        // Prevent infinite loops (default: 10)
	agent.WithContextWindow(20),                       // Auto-compress history, keep last 20 msgs
	agent.WithResponseValidator(validatorFunc),        // Optional: validate LLM output
	agent.WithSubAgent("researcher", researcherAgent), // Optional: delegate to sub-agents
	agent.WithBeforeModelHook(beforeHook),             // Optional: run before LLM call
	agent.WithAfterModelHook(afterHook),               // Optional: run after LLM call
	agent.WithBeforeToolsHook(beforeToolsHook),        // Optional: run before tool execution
	agent.WithAfterToolsHook(afterToolsHook),          // Optional: run after tool execution
	agent.WithLogger(logger),
)
```

The prebuilt agent implements `flowcontract.Node`, so it can be used as a node in a larger flow or executed standalone:

```go
// Standalone execution
finalState, err := a.Run(ctx, &initialState, streamFunc)

// Or as a node in a flow
flow.NewFlowBuilder(logger).AddNode(a)...
```

Key: Replaces manual `ChatNode` + `ToolsNode` + `ToolCondition` wiring. Use this unless you need custom control flow.
Pattern 2: Custom Flow with Manual Wiring (Advanced)
Custom nodes for specialized logic + prebuilt `tools.NewTools` + `model.NewModelNode` for manual control. Only use when the prebuilt agent doesn't support your flow requirements.

```
START -> CustomGenNode -> (has tool calls?) -> ToolsNode -> CustomGenNode (loop)
                       -> (no tool calls)  -> ValidationNode -> END
```

Key: Uses `toolcondition.NewToolCondition()` for the conditional edge. Use `model.NewModelNode` instead of the deprecated `chat.NewChatNode`.
toolcondition.NewToolCondition()model.NewModelNodechat.NewChatNode为特殊逻辑创建自定义节点,结合预构建的 + 实现手动控制。仅当预构建Agent不满足工作流需求时使用。
tools.NewToolsmodel.NewModelNodeSTART -> CustomGenNode -> (是否有工具调用?) -> ToolsNode -> CustomGenNode (循环)
-> (无工具调用) -> ValidationNode -> END核心:使用创建条件边。使用替代已弃用的。
toolcondition.NewToolCondition()model.NewModelNodechat.NewChatNodePattern 3: Multi-Agent via Delegation (Prebuilt)
Use `agent.WithSubAgent()` to create agents that can delegate tasks to specialized sub-agents. The framework auto-creates a `delegate_task` tool.

```go
researcher, _ := agent.NewAgent(agent.WithName("researcher"), agent.WithModel(llm), agent.WithTools(researchTools))
writer, _ := agent.NewAgent(agent.WithName("writer"), agent.WithModel(llm))

orchestrator, _ := agent.NewAgent(
	agent.WithName("orchestrator"),
	agent.WithModel(llm),
	agent.WithSubAgent("researcher", researcher),
	agent.WithSubAgent("writer", writer),
)
```

The orchestrator LLM receives a `delegate_task` tool with `agent_name` (enum of registered sub-agents) and `task` parameters. Each sub-agent runs with a fresh state containing just the delegated task.
delegate_taskagent_nametask使用创建可将任务委托给专业子Agent的Agent。框架会自动创建工具。
agent.WithSubAgent()delegate_taskgo
researcher, _ := agent.NewAgent(agent.WithName("researcher"), agent.WithModel(llm), agent.WithTools(researchTools))
writer, _ := agent.NewAgent(agent.WithName("writer"), agent.WithModel(llm))
orchestrator, _ := agent.NewAgent(
agent.WithName("orchestrator"),
agent.WithModel(llm),
agent.WithSubAgent("researcher", researcher),
agent.WithSubAgent("writer", writer),
)编排器LLM会收到一个工具,包含(已注册子Agent的枚举值)和参数。每个子Agent会使用仅包含委托任务的全新状态运行。
delegate_taskagent_nametaskPattern 4: Generation + Validation Loop
LLM generates content, a validation node checks it, and a fix node corrects errors in a loop.

```
START -> GenerationNode -> ValidationNode -> (has errors?) -> FixNode -> ValidationNode (loop)
                                          -> (valid?)     -> END
```
Prebuilt Components
| Component | Import | Purpose |
|---|---|---|
| `agent.NewAgent` | `github.com/futurxlab/golanggraph/prebuilt/agent` | RECOMMENDED: All-in-one ReAct agent with tools, hooks, validation, delegation |
| `model.NewModelNode` | `github.com/futurxlab/golanggraph/prebuilt/node/model` | Generic LLM model node (replaces deprecated `chat.NewChatNode`) |
| `tools.NewTools` | `github.com/futurxlab/golanggraph/prebuilt/node/tools` | Executes tool calls from history |
| `toolcondition.NewToolCondition` | `github.com/futurxlab/golanggraph/prebuilt/edge/toolcondition` | Conditional edge: routes to tools or next node |
| | | Creates langchaingo LLM instance |
| `checkpointer.NewInMemoryCheckpointer` | `github.com/futurxlab/golanggraph/checkpointer` | In-memory state checkpointing |
| `checkpointer.NewRedisCheckpointer` | `github.com/futurxlab/golanggraph/checkpointer` | Redis-backed state checkpointing (for production) |
Tool Implementation
Tools implement `tools.ITool`:

```go
type ITool interface {
	Tools(ctx context.Context) []llms.Tool                                          // Return tool definitions
	Run(ctx context.Context, toolCall llms.ToolCall) (llms.ToolCallResponse, error) // Execute a single tool call
}
```

The `Run` method receives a single `llms.ToolCall` and returns a `llms.ToolCallResponse`. The framework's `ToolsNode` handles iterating over tool calls, matching them to the correct tool, and appending responses to history. Tools no longer need to manage state or history directly.

See references/tools-implementation.md for full tool patterns.
Tool Definition
```go
func (t *MyTool) Tools(ctx context.Context) []llms.Tool {
	return []llms.Tool{{
		Type: "function",
		Function: &llms.FunctionDefinition{
			Name:        "my_tool",
			Description: "What this tool does",
			Parameters: map[string]any{
				"type": "object",
				"properties": map[string]any{
					"query": map[string]any{"type": "string", "description": "Search query"},
				},
				"required": []string{"query"},
			},
		},
	}}
}
```
Tool Execution
```go
func (t *MyTool) Run(ctx context.Context, toolCall llms.ToolCall) (llms.ToolCallResponse, error) {
	// Parse arguments from the tool call
	var args struct {
		Query string `json:"query"`
	}
	if err := json.Unmarshal([]byte(toolCall.FunctionCall.Arguments), &args); err != nil {
		return llms.ToolCallResponse{}, xerror.Wrap(err)
	}

	// Execute tool logic
	result, err := t.execute(ctx, args.Query)
	if err != nil {
		// Return the error as a tool response (don't fail the agent)
		return llms.ToolCallResponse{
			ToolCallID: toolCall.ID,
			Name:       toolCall.FunctionCall.Name,
			Content:    "Error: " + err.Error(),
		}, nil
	}

	return llms.ToolCallResponse{
		ToolCallID: toolCall.ID,
		Name:       toolCall.FunctionCall.Name,
		Content:    result,
	}, nil
}
```
LLM Call Pattern
All LLM calls use `llms.Model.GenerateContent`:

```go
completion, err := llm.GenerateContent(
	ctx,
	messages,                        // []llms.MessageContent
	llms.WithTemperature(0.7),       // Creativity control
	llms.WithTools(toolDefinitions), // Optional tool definitions
	llms.WithMaxTokens(4096),        // Optional max tokens
)
if err != nil { return xerror.Wrap(err) }

choice := completion.Choices[0]
// choice.Content = text response
// choice.ToolCalls = tool calls (if any)
```
Model Override
To use a specific model for a node:

```go
ctx = context.WithValue(ctx, utils.OverrideModelKey, config.SpecificModel)
completion, err := llm.GenerateContent(ctx, messages, ...)
```
Prompt Management
Embedded Templates
Use `//go:embed` for prompt files:

```go
//go:embed prompt.txt
var promptTemplate string
```
Template Formatting
Use `prompts.NewPromptTemplate` from langchaingo:

```go
tmpl := prompts.NewPromptTemplate(promptTemplate, []string{"var1", "var2"})
formatted, err := tmpl.Format(map[string]any{"var1": "value1", "var2": "value2"})
```
State Management via Metadata
Pattern: Custom State in Metadata
```go
const MetadataKeyMyState = "my_agent_state"

type MyAgentState struct {
	CurrentStep int
	Results     []Result
	// Keep separate histories for multi-agent
	AgentAHistory []llms.MessageContent `json:"-"` // Exclude from serialization
}

func getState(st *state.State) *MyAgentState {
	if st.Metadata == nil { st.Metadata = make(map[string]interface{}) }
	if v, ok := st.Metadata[MetadataKeyMyState]; ok {
		if s, ok := v.(*MyAgentState); ok { return s }
	}
	s := &MyAgentState{}
	st.Metadata[MetadataKeyMyState] = s
	return s
}
```
Multi-Agent History Isolation
In multi-agent flows, keep each agent's history in Metadata (NOT in `state.History`). The shared `state.History` is used for tool call routing only.

```go
type MultiAgentState struct {
	DirectorHistory  []llms.MessageContent            // Director's conversation
	CharacterHistory map[string][]llms.MessageContent // Per-character conversations
}
```
LLM Best Practices
See references/llm-best-practices.md for detailed patterns and examples.
Context Compression (History Trimming)
**Prebuilt Agent**: Use `agent.WithContextWindow(N)` to enable automatic context compression. The agent's built-in `contextCompressHook` preserves system messages and keeps the last N non-system messages. Manual trimming is only needed for custom flows.

Trim conversation history to stay within context limits while preserving critical messages.

Sliding Window Pattern (REQUIRED for long-running agents with custom flows):

```go
func (s *MyState) TrimHistory() {
	const maxLen = 10
	const keepRecent = 5
	if len(s.History) <= maxLen { return }
	preserved := []llms.MessageContent{s.History[0]} // Keep system prompt
	// Optionally keep important anchors (e.g., task definition)
	startIdx := len(s.History) - keepRecent
	preserved = append(preserved, s.History[startIdx:]...)
	s.History = preserved
}
```

Rules:
- ALWAYS keep the system prompt (index 0)
- Keep the last N messages for recency
- Optionally preserve anchor messages (task headers, key context)
- Call `TrimHistory()` before each LLM invocation in loop-based agents
Tool Response Trimming
Truncate large tool responses in history to save context window:

```go
const maxContentChars = 500
const maxResultsInResponse = 4

// In tool execution:
if len([]rune(content)) > maxContentChars {
	content = string([]rune(content)[:maxContentChars]) + "...[content truncated]"
}

// Store full results in Metadata for other nodes
currentState.Metadata["full_results"] = fullResults
```

Also trim old tool responses retroactively when history grows:
```go
func trimToolResponsesInHistory(history []llms.MessageContent) {
	for i, msg := range history {
		if msg.Role != llms.ChatMessageTypeTool { continue }
		for j, part := range msg.Parts {
			resp, ok := part.(llms.ToolCallResponse)
			if !ok { continue }
			if len([]rune(resp.Content)) > maxToolResponseChars {
				resp.Content = string([]rune(resp.Content)[:maxToolResponseChars]) + "\n...[truncated]"
				msg.Parts[j] = resp
			}
		}
		history[i] = msg
	}
}
```
Prompt Caching
Structure prompts so static content comes first (cacheable) and dynamic content comes last:

```
[SYSTEM MESSAGE - Static, cacheable]
- Role definition
- Rules and constraints
- Output format specification
- Few-shot examples

[HUMAN MESSAGE - Dynamic, per-request]
- Current task/input
- Context-specific instructions
```

Rules:
- The system prompt MUST be identical across calls for caching to work
- Do NOT embed dynamic data (timestamps, user IDs) in the system prompt
- Put changing context in the LAST human message
- Anthropic: cache_control breakpoints at system message boundaries
- OpenAI: automatic prefix caching on identical message prefixes
Attention Raising in Long Context
Use structural markers to ensure the LLM focuses on critical instructions:

```go
// XML tags for structure
prompt := `<rules>
CRITICAL: You MUST respond in valid JSON format.
</rules>
<context>
... long context here ...
</context>
<task>
Generate the output based on the rules above.
</task>`

// Ephemeral reminders (appended to messages but NOT saved to history)
messagesForCall := append(history, llms.MessageContent{
	Role:  llms.ChatMessageTypeHuman,
	Parts: []llms.ContentPart{llms.TextContent{Text: "REMINDER: Respond in valid JSON only."}},
})
```

Rules:
- Place the MOST IMPORTANT instructions at the START and END of the prompt (primacy/recency effect)
- Use XML tags (`<rules>`, `<context>`, `<output>`) for structural boundaries
- Add ephemeral reminders at the end of message lists (not saved to history)
- Use `IMPORTANT:`, `CRITICAL:`, `MUST` for emphasis on key constraints
- Repeat key format instructions near the end of long prompts
MCP (Model Context Protocol)
MCP provides a standard protocol for LLM-tool communication. In Go agents, MCP is implemented through the tool interface:

- Tools define their schema via JSON Schema parameters (matching MCP tool definitions)
- Tool results are returned as `ToolCallResponse` messages (matching MCP tool results)
- The agent framework handles the MCP message flow: LLM -> tool call -> tool result -> LLM

For MCP server integration, wrap external MCP servers as `tools.ITool` implementations that proxy calls to the MCP server.
Human-in-the-Loop (Interrupt / Resume)
The framework supports interrupting agent execution to request human input, then resuming with the provided value.
Interrupting from a Node
Use `flowcontract.Interrupt()` inside a node's `Run()` to pause execution:

```go
func (n *ApprovalNode) Run(ctx context.Context, currentState *state.State, _ flowcontract.StreamFunc) error {
	// Prepare payload describing what approval is needed
	currentState.SetInterruptPayload(map[string]any{
		"question": "Do you approve this action?",
		"details":  actionDetails,
	})
	return flowcontract.Interrupt(currentState.Metadata["interrupt_payload"])
}
```
Checking for Interrupts
The caller checks whether the flow was interrupted:

```go
finalState, err := compiledFlow.Exec(ctx, initialState, streamFunc)
if interruptErr, ok := flowcontract.IsInterrupt(err); ok {
	// Flow paused: interruptErr.Payload contains the interrupt payload.
	// Present to user, collect input, then resume.
}
```
Resuming with a Value
Resume the flow from where it was interrupted by providing the human's response:

```go
finalState, err := compiledFlow.ResumeWithValue(ctx, threadID, userResponse, streamFunc)
```

Inside the node, access the resume value via `state.GetResumeValue()`:

```go
func (n *ApprovalNode) Run(ctx context.Context, currentState *state.State, _ flowcontract.StreamFunc) error {
	// Check if we're resuming from an interrupt
	if resumeValue := currentState.GetResumeValue(); resumeValue != nil {
		approval := resumeValue.(string)
		if approval == "approved" {
			// Proceed with the action
			return nil
		}
		// Handle rejection
		return xerror.New("action rejected by user")
	}

	// First visit: interrupt for approval
	currentState.SetInterruptPayload(map[string]any{"question": "Approve?"})
	return flowcontract.Interrupt(currentState.Metadata["interrupt_payload"])
}
```
Requirements
- A `Checkpointer` MUST be set on the flow for interrupt/resume to work (state is persisted between calls)
- Use `checkpointer.NewRedisCheckpointer()` for production, `checkpointer.NewInMemoryCheckpointer()` for development
- `threadID` identifies the conversation thread and is used by the checkpointer to restore state
LangChain Chains (Non-Agent)
For simple single-shot LLM tasks (no loops, no tools), use langchaingo chains:

```go
type MyChain struct {
	llm    llms.Model
	memory schema.Memory
}

func (c *MyChain) Call(ctx context.Context, inputs map[string]any, options ...chains.ChainCallOption) (map[string]any, error) {
	// Format prompt from template
	// Call LLM
	// Parse and return output
}

func (c *MyChain) GetMemory() schema.Memory { return c.memory }
func (c *MyChain) GetInputKeys() []string   { return []string{"input_key"} }
func (c *MyChain) GetOutputKeys() []string  { return []string{"output_key"} }
```

See references/langchain-chains.md for examples.
Flow Orchestration from Domain/Application
Agents are constructed in domain/application services and executed via `flow.Exec`:

```go
// In domain service constructor
overviewAgent, err := overviewagent.NewOverviewAgent(
	overviewagent.WithLogger(logger),
	overviewagent.WithLLM(llm),
	overviewagent.WithConfig(config.LLM),
)

// In domain service method
initialState := state.State{
	Metadata: map[string]interface{}{
		overviewagent.MetadataKeyState: myState,
	},
	History: []llms.MessageContent{},
}
finalState, err := s.overviewAgent.Exec(ctx, initialState, streamCallback)

// Extract results from finalState.Metadata
result := finalState.Metadata[overviewagent.MetadataKeyState].(*MyState)
```
Agent Construction Pattern (Functional Options)
Prefer `agent.NewAgent()` for standard agents. The functional options pattern below is for custom flows or wrapping the prebuilt agent in a domain-specific factory.

Every agent factory follows the functional options pattern:

```go
type Opt struct {
	logger logger.ILogger
	llm    llms.Model
	config *config.LLMConfig
}

type Option func(*Opt)

func WithLogger(l logger.ILogger) Option    { return func(o *Opt) { o.logger = l } }
func WithLLM(l llms.Model) Option           { return func(o *Opt) { o.llm = l } }
func WithConfig(c *config.LLMConfig) Option { return func(o *Opt) { o.config = c } }

func NewMyAgent(options ...Option) (*flow.Flow, error) {
	opts := &Opt{}
	for _, o := range options { o(opts) }

	// Validate required options
	if opts.logger == nil { return nil, xerror.New("logger is required") }
	if opts.llm == nil { return nil, xerror.New("llm is required") }

	// Build and compile flow
	return flow.NewFlowBuilder(opts.logger).
		SetName("my_agent").
		// ... AddNode, AddEdge ...
		Compile()
}
```
Key Import Paths
```go
import (
	// golanggraph core
	"github.com/futurxlab/golanggraph/flow"                  // flow.NewFlowBuilder, flow.Flow, flow.StartNode, flow.EndNode
	"github.com/futurxlab/golanggraph/edge"                  // edge.Edge
	"github.com/futurxlab/golanggraph/state"                 // state.State
	"github.com/futurxlab/golanggraph/checkpointer"          // checkpointer.NewInMemoryCheckpointer(), NewRedisCheckpointer()
	flowcontract "github.com/futurxlab/golanggraph/contract" // StreamFunc, FlowStreamEvent, Node, Interrupt(), IsInterrupt()

	// kiwi-lib (shared utilities, moved from golanggraph)
	"github.com/Yet-Another-AI-Project/kiwi-lib/logger" // logger.ILogger
	"github.com/Yet-Another-AI-Project/kiwi-lib/xerror" // xerror.Wrap, xerror.New

	// golanggraph prebuilt
	"github.com/futurxlab/golanggraph/prebuilt/agent"              // agent.NewAgent (RECOMMENDED)
	"github.com/futurxlab/golanggraph/prebuilt/node/model"         // model.NewModelNode (replaces chat.NewChatNode)
	"github.com/futurxlab/golanggraph/prebuilt/node/tools"         // tools.NewTools, tools.ITool
	"github.com/futurxlab/golanggraph/prebuilt/edge/toolcondition" // toolcondition.NewToolCondition

	// langchaingo
	"github.com/tmc/langchaingo/llms"    // llms.Model, MessageContent, Tool, ToolCall
	"github.com/tmc/langchaingo/prompts" // prompts.NewPromptTemplate
	"github.com/tmc/langchaingo/chains"  // chains (for chain pattern)
	"github.com/tmc/langchaingo/memory"  // memory.NewSimple()
	"github.com/tmc/langchaingo/schema"  // schema.Memory
)
```
import (
// golanggraph核心
"github.com/futurxlab/golanggraph/flow" // flow.NewFlowBuilder, flow.Flow, flow.StartNode, flow.EndNode
"github.com/futurxlab/golanggraph/edge" // edge.Edge
"github.com/futurxlab/golanggraph/state" // state.State
"github.com/futurxlab/golanggraph/checkpointer" // checkpointer.NewInMemoryCheckpointer(), NewRedisCheckpointer()
flowcontract "github.com/futurxlab/golanggraph/contract" // StreamFunc, FlowStreamEvent, Node, Interrupt(), IsInterrupt()
// kiwi-lib(共享工具类 —— 从golanggraph迁移)
"github.com/Yet-Another-AI-Project/kiwi-lib/logger" // logger.ILogger
"github.com/Yet-Another-AI-Project/kiwi-lib/xerror" // xerror.Wrap, xerror.New
// golanggraph预构建组件
"github.com/futurxlab/golanggraph/prebuilt/agent" // agent.NewAgent(推荐)
"github.com/futurxlab/golanggraph/prebuilt/node/model" // model.NewModelNode(替代chat.NewChatNode)
"github.com/futurxlab/golanggraph/prebuilt/node/tools" // tools.NewTools, tools.ITool
"github.com/futurxlab/golanggraph/prebuilt/edge/toolcondition" // toolcondition.NewToolCondition
// langchaingo
"github.com/tmc/langchaingo/llms" // llms.Model, MessageContent, Tool, ToolCall
"github.com/tmc/langchaingo/prompts" // prompts.NewPromptTemplate
"github.com/tmc/langchaingo/chains" // chains(链式模式)
"github.com/tmc/langchaingo/memory" // memory.NewSimple()
"github.com/tmc/langchaingo/schema" // schema.Memory
)Shared Utilities
## Shared Utilities
```go
// agent/utils/tool_utils.go
func HasToolCalls(choice *llms.ContentChoice) bool
func HasToolCallsInHistory(st state.State) bool
func CreateToolCallMessage(choice *llms.ContentChoice) llms.MessageContent

// Reusable format reminders
var JsonReminder = llms.MessageContent{
	Role:  llms.ChatMessageTypeHuman,
	Parts: []llms.ContentPart{llms.TextContent{Text: "IMPORTANT: Respond ONLY with valid JSON."}},
}
```
## Coding Checklist
Before submitting agent code, verify:
- Prefer `agent.NewAgent()` for standard chat-with-tools agents over manual wiring
- All custom nodes implement `Name()` and `Run()` from `flowcontract.Node`
- Agent factory uses the functional options pattern with validation
- All `return err` use `xerror.Wrap(err)` (never raw errors) — import from `kiwi-lib/xerror`
- Flow starts with `flow.StartNode` and ends with `flow.EndNode`
- Long-running agents have history trimming (or use `WithContextWindow()` for prebuilt agents)
- Tool responses are truncated in history; full results stored in Metadata
- Tool `Run()` uses the new signature: `Run(ctx, llms.ToolCall) (llms.ToolCallResponse, error)`
- Prompts use `//go:embed` for templates
- Static prompt content is in system messages (cacheable)
- Dynamic content is in the last human message
- Conditional edges list ALL possible targets in `ConditionalTo`
- MaxToolCalls is set (via `agent.WithMaxToolCalls()` or a hook) to prevent infinite tool loops
- Streaming events use `streamFunc` with proper nil checks
- State is saved to Metadata after mutations in nodes
- Human-in-the-Loop flows have a `Checkpointer` set
- `logger` and `xerror` are imported from `kiwi-lib`, NOT from `golanggraph`