eino-compose

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Orchestration Overview

编排概述

The
github.com/cloudwego/eino/compose
package provides three orchestration APIs:
APITopologyCyclesType Alignment
GraphDirected graphYes (Pregel mode) / No (DAG mode)Whole input/output
ChainLinear sequenceNoWhole input/output
WorkflowDAGNoField-level mapping
*Chain is implemented on top of Graph in Pregel mode but enforces linear topology.
All three compile into
Runnable[I, O]
which exposes Invoke, Stream, Collect, and Transform.
go
import "github.com/cloudwego/eino/compose"
github.com/cloudwego/eino/compose
包提供了三种编排API:
API拓扑结构是否支持循环类型对齐方式
Graph有向图是(Pregel模式)/ 否(DAG模式)整体输入输出
Chain线性序列整体输入输出
WorkflowDAG字段级映射
*Chain基于Pregel模式的Graph实现,但强制使用线性拓扑结构。
三者最终都会编译为
Runnable[I, O]
类型,提供Invoke、Stream、Collect、Transform四种执行方法。
go
import "github.com/cloudwego/eino/compose"

Choosing an API

API选型

  • Chain -- sequential pipeline: prompt -> model -> parser. Simplest API.
  • Graph -- need branching, loops (ReAct agent), or fan-out/fan-in.
  • Workflow -- need field-level mapping between nodes with different struct types; DAG only.
  • Chain -- 适用于顺序流水线:提示词 -> 模型 -> 解析器,是最简单的API。
  • Graph -- 适用于需要分支、循环(ReAct agent)、扇入/扇出的场景。
  • Workflow -- 适用于需要在不同结构体类型的节点之间做字段级映射的场景,仅支持DAG。

Graph Quick Reference

Graph快速参考

go
g := compose.NewGraph[InputType, OutputType]()

// Add nodes
g.AddChatModelNode("model", chatModel)
g.AddChatTemplateNode("tmpl", tmpl)
g.AddToolsNode("tools", toolsNode)
g.AddLambdaNode("fn", compose.InvokableLambda(myFunc))
g.AddPassthroughNode("pass")
g.AddGraphNode("sub", subGraph)

// Connect nodes
g.AddEdge(compose.START, "tmpl")
g.AddEdge("tmpl", "model")
g.AddEdge("model", compose.END)

// Branch (conditional routing)
branch := compose.NewGraphBranch(conditionFn, map[string]bool{"a": true, "b": true})
g.AddBranch("model", branch)

// Compile and run
r, err := g.Compile(ctx)
out, err := r.Invoke(ctx, input)
go
g := compose.NewGraph[InputType, OutputType]()

// Add nodes
g.AddChatModelNode("model", chatModel)
g.AddChatTemplateNode("tmpl", tmpl)
g.AddToolsNode("tools", toolsNode)
g.AddLambdaNode("fn", compose.InvokableLambda(myFunc))
g.AddPassthroughNode("pass")
g.AddGraphNode("sub", subGraph)

// Connect nodes
g.AddEdge(compose.START, "tmpl")
g.AddEdge("tmpl", "model")
g.AddEdge("model", compose.END)

// Branch (conditional routing)
branch := compose.NewGraphBranch(conditionFn, map[string]bool{"a": true, "b": true})
g.AddBranch("model", branch)

// Compile and run
r, err := g.Compile(ctx)
out, err := r.Invoke(ctx, input)

Chain Quick Reference

Chain快速参考

go
chain := compose.NewChain[InputType, OutputType]()
chain.
    AppendChatTemplate(tmpl).
    AppendChatModel(model).
    AppendLambda(compose.InvokableLambda(parseFn))

r, err := chain.Compile(ctx)
out, err := r.Invoke(ctx, input)
Append methods:
AppendChatModel
,
AppendChatTemplate
,
AppendToolsNode
,
AppendLambda
,
AppendGraph
,
AppendParallel
,
AppendBranch
,
AppendPassthrough
,
AppendRetriever
,
AppendEmbedding
,
AppendLoader
,
AppendIndexer
,
AppendDocumentTransformer
.
go
chain := compose.NewChain[InputType, OutputType]()
chain.
    AppendChatTemplate(tmpl).
    AppendChatModel(model).
    AppendLambda(compose.InvokableLambda(parseFn))

r, err := chain.Compile(ctx)
out, err := r.Invoke(ctx, input)
可用的Append方法包括:
AppendChatModel
AppendChatTemplate
AppendToolsNode
AppendLambda
AppendGraph
AppendParallel
AppendBranch
AppendPassthrough
AppendRetriever
AppendEmbedding
AppendLoader
AppendIndexer
AppendDocumentTransformer

Workflow Quick Reference

Workflow快速参考

go
wf := compose.NewWorkflow[InputStruct, OutputStruct]()

wf.AddLambdaNode("node1", compose.InvokableLambda(fn1)).
    AddInput(compose.START, compose.MapFields("FieldA", "InputField"))

wf.AddLambdaNode("node2", compose.InvokableLambda(fn2)).
    AddInput("node1", compose.ToField("Result"))

wf.End().AddInput("node2")

r, err := wf.Compile(ctx)
Field mapping helpers:
MapFields
,
ToField
,
FromField
,
MapFieldPaths
,
ToFieldPath
,
FromFieldPath
.
go
wf := compose.NewWorkflow[InputStruct, OutputStruct]()

wf.AddLambdaNode("node1", compose.InvokableLambda(fn1)).
    AddInput(compose.START, compose.MapFields("FieldA", "InputField"))

wf.AddLambdaNode("node2", compose.InvokableLambda(fn2)).
    AddInput("node1", compose.ToField("Result"))

wf.End().AddInput("node2")

r, err := wf.Compile(ctx)
字段映射辅助方法包括:
MapFields
ToField
FromField
MapFieldPaths
ToFieldPath
FromFieldPath

Stream Programming

流式编程

Four interaction modes on
Runnable[I, O]
:
ModeInputOutputLambda Constructor
Invoke
I
O
compose.InvokableLambda
Stream
I
*StreamReader[O]
compose.StreamableLambda
Collect
*StreamReader[I]
O
compose.CollectableLambda
Transform
*StreamReader[I]
*StreamReader[O]
compose.TransformableLambda
Framework auto-converts between modes:
  • Invoke call: all internal nodes run in Invoke mode.
  • Stream/Collect/Transform call: all internal nodes run in Transform mode; missing modes are auto-filled.
Stream primitives live in
github.com/cloudwego/eino/schema
:
go
sr, sw := schema.Pipe[T](capacity)
// sw.Send(chunk, nil); sw.Close()
// chunk, err := sr.Recv(); sr.Close()
Runnable[I, O]
支持四种交互模式:
模式输入输出Lambda构造器
Invoke
I
O
compose.InvokableLambda
Stream
I
*StreamReader[O]
compose.StreamableLambda
Collect
*StreamReader[I]
O
compose.CollectableLambda
Transform
*StreamReader[I]
*StreamReader[O]
compose.TransformableLambda
框架会自动在不同模式之间转换:
  • Invoke 调用:所有内部节点以Invoke模式运行。
  • Stream/Collect/Transform 调用:所有内部节点以Transform模式运行;缺失的模式实现会自动填充。
流式原语定义在
github.com/cloudwego/eino/schema
包中:
go
sr, sw := schema.Pipe[T](capacity)
// sw.Send(chunk, nil); sw.Close()
// chunk, err := sr.Recv(); sr.Close()

Compile & Run

编译与运行

go
r, err := g.Compile(ctx,
    compose.WithGraphName("my_graph"),
    compose.WithNodeTriggerMode(compose.AllPredecessor), // DAG mode
)

// Non-streaming
out, err := r.Invoke(ctx, input)

// Streaming
stream, err := r.Stream(ctx, input)
defer stream.Close()
for {
    chunk, err := stream.Recv()
    if err == io.EOF { break }
    if err != nil { return err }
    process(chunk)
}
go
r, err := g.Compile(ctx,
    compose.WithGraphName("my_graph"),
    compose.WithNodeTriggerMode(compose.AllPredecessor), // DAG mode
)

// 非流式执行
out, err := r.Invoke(ctx, input)

// 流式执行
stream, err := r.Stream(ctx, input)
defer stream.Close()
for {
    chunk, err := stream.Recv()
    if err == io.EOF { break }
    if err != nil { return err }
    process(chunk)
}

State Graph

状态图

Share state across nodes within a single request:
go
g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *MyState {
    return &MyState{}
}))

g.AddLambdaNode("node", lambda,
    compose.WithStatePreHandler(func(ctx context.Context, in string, state *MyState) (string, error) {
        // read/write state before node executes
        return in, nil
    }),
    compose.WithStatePostHandler(func(ctx context.Context, out string, state *MyState) (string, error) {
        // read/write state after node executes
        return out, nil
    }),
)
在单个请求的多个节点之间共享状态:
go
g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *MyState {
    return &MyState{}
}))

g.AddLambdaNode("node", lambda,
    compose.WithStatePreHandler(func(ctx context.Context, in string, state *MyState) (string, error) {
        // 在节点执行前读写状态
        return in, nil
    }),
    compose.WithStatePostHandler(func(ctx context.Context, out string, state *MyState) (string, error) {
        // 在节点执行后读写状态
        return out, nil
    }),
)

Instructions to Agent

给Agent的使用指引

When helping users build orchestration:
  1. Default to Graph for most use cases. Use Chain only for simple linear pipelines. Use Workflow when field-level mapping between different struct types is needed.
  2. Always show the Compile step --
    g.Compile(ctx)
    returns
    Runnable[I,O]
    .
  3. Always close StreamReaders -- use
    defer sr.Close()
    immediately after obtaining a stream.
  4. Upstream output type must match downstream input type (or use
    WithInputKey
    /
    WithOutputKey
    for map conversion).
  5. For cyclic graphs (e.g., ReAct agent), use default Pregel mode (
    AnyPredecessor
    ). For DAGs, set
    AllPredecessor
    .
  6. Use
    compose.WithCallbacks(handler)
    to inject logging/tracing at runtime.
  7. Use
    compose.WithCheckPointStore(store)
    with interrupt nodes for pause/resume workflows.
在帮助用户构建编排逻辑时,请遵循以下规则:
  1. 大多数场景默认使用Graph;仅简单线性流水线使用Chain;需要不同结构体间字段级映射时使用Workflow。
  2. 始终展示Compile步骤 --
    g.Compile(ctx)
    会返回
    Runnable[I,O]
    实例。
  3. 始终要关闭StreamReaders -- 获取流后立即使用
    defer sr.Close()
  4. 上游输出类型必须匹配下游输入类型(也可使用
    WithInputKey
    /
    WithOutputKey
    做Map类型转换)。
  5. 对于循环图(例如ReAct agent),使用默认的Pregel模式(
    AnyPredecessor
    );对于DAG,设置
    AllPredecessor
  6. 使用
    compose.WithCallbacks(handler)
    在运行时注入日志/链路追踪能力。
  7. 结合
    compose.WithCheckPointStore(store)
    和中断节点实现工作流的暂停/恢复。

Reference Files

参考文件

Read these files on-demand for detailed API, examples, and advanced usage:
  • reference/graph.md -- Full Graph API, branches, state graph, cyclic graph, complete ReAct example
  • reference/chain.md -- Chain API, when to use, parallel/branch in chain
  • reference/workflow.md -- Workflow API, field-level mapping helpers, constraints
  • reference/stream.md -- StreamReader/Writer, Pipe/Copy/Merge, lambda constructors, auto-conversion rules
  • reference/callback.md -- Callback timings, handler registration, trigger rules, tracing example
  • reference/call-option.md -- Per-request CallOption, component-type options, node targeting
  • reference/checkpoint-and-state.md -- CheckPointStore, interrupt/resume, state management
按需读取以下文件获取详细API、示例和高级用法:
  • reference/graph.md -- 完整Graph API、分支、状态图、循环图、完整ReAct示例
  • reference/chain.md -- Chain API、适用场景、Chain中的并行/分支实现
  • reference/workflow.md -- Workflow API、字段映射辅助方法、使用约束
  • reference/stream.md -- StreamReader/Writer、Pipe/Copy/Merge、Lambda构造器、自动转换规则
  • reference/callback.md -- 回调时机、处理器注册、触发规则、链路追踪示例
  • reference/call-option.md -- 单请求CallOption、组件级选项、节点定向配置
  • reference/checkpoint-and-state.md -- CheckPointStore、中断/恢复、状态管理