Loading...
Loading...
Eino orchestration with Graph, Chain, and Workflow. Use when a user needs to build multi-step pipelines, compose components into executable graphs, handle streaming between nodes, use branching or parallel execution, manage state with checkpoints, or understand the Runnable abstraction. Covers Graph (directed graph with cycles), Chain (linear sequential), and Workflow (DAG with field mapping).
npx skill4agent add cloudwego/eino-ext eino-composegithub.com/cloudwego/eino/compose| API | Topology | Cycles | Type Alignment |
|---|---|---|---|
| Graph | Directed graph | Yes (Pregel mode) / No (DAG mode) | Whole input/output |
| Chain | Linear sequence | No | Whole input/output |
| Workflow | DAG | No | Field-level mapping |
Runnable[I, O]import "github.com/cloudwego/eino/compose"g := compose.NewGraph[InputType, OutputType]()
// Add nodes
g.AddChatModelNode("model", chatModel)
g.AddChatTemplateNode("tmpl", tmpl)
g.AddToolsNode("tools", toolsNode)
g.AddLambdaNode("fn", compose.InvokableLambda(myFunc))
g.AddPassthroughNode("pass")
g.AddGraphNode("sub", subGraph)
// Connect nodes
g.AddEdge(compose.START, "tmpl")
g.AddEdge("tmpl", "model")
g.AddEdge("model", compose.END)
// Branch (conditional routing)
branch := compose.NewGraphBranch(conditionFn, map[string]bool{"a": true, "b": true})
g.AddBranch("model", branch)
// Compile and run
r, err := g.Compile(ctx)
out, err := r.Invoke(ctx, input)chain := compose.NewChain[InputType, OutputType]()
chain.
AppendChatTemplate(tmpl).
AppendChatModel(model).
AppendLambda(compose.InvokableLambda(parseFn))
r, err := chain.Compile(ctx)
out, err := r.Invoke(ctx, input)AppendChatModelAppendChatTemplateAppendToolsNodeAppendLambdaAppendGraphAppendParallelAppendBranchAppendPassthroughAppendRetrieverAppendEmbeddingAppendLoaderAppendIndexerAppendDocumentTransformerwf := compose.NewWorkflow[InputStruct, OutputStruct]()
wf.AddLambdaNode("node1", compose.InvokableLambda(fn1)).
AddInput(compose.START, compose.MapFields("FieldA", "InputField"))
wf.AddLambdaNode("node2", compose.InvokableLambda(fn2)).
AddInput("node1", compose.ToField("Result"))
wf.End().AddInput("node2")
r, err := wf.Compile(ctx)MapFieldsToFieldFromFieldMapFieldPathsToFieldPathFromFieldPathRunnable[I, O]| Mode | Input | Output | Lambda Constructor |
|---|---|---|---|
| Invoke | | | |
| Stream | | | |
| Collect | | | |
| Transform | | | |
github.com/cloudwego/eino/schemasr, sw := schema.Pipe[T](capacity)
// sw.Send(chunk, nil); sw.Close()
// chunk, err := sr.Recv(); sr.Close()r, err := g.Compile(ctx,
compose.WithGraphName("my_graph"),
compose.WithNodeTriggerMode(compose.AllPredecessor), // DAG mode
)
// Non-streaming
out, err := r.Invoke(ctx, input)
// Streaming
stream, err := r.Stream(ctx, input)
defer stream.Close()
for {
chunk, err := stream.Recv()
if err == io.EOF { break }
if err != nil { return err }
process(chunk)
}g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *MyState {
return &MyState{}
}))
g.AddLambdaNode("node", lambda,
compose.WithStatePreHandler(func(ctx context.Context, in string, state *MyState) (string, error) {
// read/write state before node executes
return in, nil
}),
compose.WithStatePostHandler(func(ctx context.Context, out string, state *MyState) (string, error) {
// read/write state after node executes
return out, nil
}),
)g.Compile(ctx)Runnable[I,O]defer sr.Close()WithInputKeyWithOutputKeyAnyPredecessorAllPredecessorcompose.WithCallbacks(handler)compose.WithCheckPointStore(store)