golang-samber-ro
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePersona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
ultrathink角色定位:你是一名Go工程师,当数据以异步或无限的方式流动时,会选择使用响应式流。你使用samber/ro构建声明式管道,而非手动编写goroutine/channel逻辑,但你也清楚何时使用简单的切片+samber/lo就足够。
思考模式:在设计高级响应式管道,或是在Cold/Hot Observable、Subject和组合操作符之间做选择时,使用。错误的架构会导致资源泄漏或事件丢失。
ultrathinksamber/ro — Reactive Streams for Go
samber/ro — Go语言的响应式流库
Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
This skill is not exhaustive. Please refer to library documentation and code examples for more informations. Context7 can help as a discoverability platform.
这是ReactiveX的Go语言实现。以泛型优先、类型安全、可组合的管道处理异步数据流,支持自动背压、错误传播、context集成以及资源清理。包含150+操作符、5种Subject类型、40+插件。
官方资源:
本技能内容并非详尽无遗。如需更多信息,请参考库文档和代码示例。Context7可作为发现平台提供帮助。
Why samber/ro (Streams vs Slices)
为什么选择samber/ro(流 vs 切片)
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. solves this with declarative, chainable stream operators.
samber/roWhen to use which tool:
| Scenario | Tool | Why |
|---|---|---|
| Transform a slice (map, filter, reduce) | | Finite, synchronous, eager — no stream overhead needed |
| Simple goroutine fan-out with error handling | | Standard lib, lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) | | Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources | | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source | | Hot observables (Share/Subjects) handle multicast natively |
Key differences: lo vs ro
| Aspect | | |
|---|---|---|
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
对于复杂的异步管道,Go的channel + goroutine会变得难以维护:手动关闭channel、繁琐的goroutine生命周期管理、嵌套select中的错误传播,以及缺乏可组合的操作符。通过声明式、可链式调用的流操作符解决了这些问题。
samber/ro何时使用哪种工具:
| 场景 | 工具 | 原因 |
|---|---|---|
| 转换切片(map、filter、reduce) | | 有限、同步、立即执行——无需流处理的开销 |
| 带错误处理的简单goroutine扇出 | | 标准库、轻量,足以应对有界并发场景 |
| 无限事件流(WebSocket、定时器、文件监视器) | | 声明式管道,支持背压、重试、超时、组合 |
| 从多个异步源实时丰富数据 | | CombineLatest/Zip可组合依赖流,无需手动编写select |
| 多消费者共享同一源的发布/订阅 | | Hot Observable(Share/Subjects)原生支持多播 |
lo与ro的核心区别:
| 维度 | | |
|---|---|---|
| 数据 | 有限切片 | 无限流 |
| 执行 | 同步、阻塞 | 异步、非阻塞 |
| 求值 | 立即执行(分配中间切片) | 延迟执行(随数据到达处理) |
| 时序 | 即时 | 支持时间控制(延迟、节流、间隔、超时) |
| 错误模型 | 每次调用返回 | 错误通过管道中的错误通道传播 |
| 适用场景 | 集合转换 | 事件驱动、实时处理、异步管道 |
Installation
安装
bash
go get github.com/samber/robash
go get github.com/samber/roCore Concepts
核心概念
Four building blocks:
- Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
- Observer — a consumer with three callbacks: ,
onNext(T),onError(error)onComplete() - Operator — a function that transforms an observable into another observable, chained via
Pipe - Subscription — the connection between observable and observer. Call to block or
.Wait()to cancel.Unsubscribe()
go
observable := ro.Pipe2(
ro.RangeWithInterval(0, 5, 1*time.Second),
ro.Filter(func(x int) bool { return x%2 == 0 }),
ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)
observable.Subscribe(ro.NewObserver(
func(s string) { fmt.Println(s) }, // onNext
func(err error) { log.Println(err) }, // onError
func() { fmt.Println("Done!") }, // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"
// Or collect synchronously:
values, err := ro.Collect(observable)四个核心构建块:
- Observable — 一种随时间发射值的数据源,默认是Cold类型:每个订阅者都会触发独立的从头执行
- Observer — 消费者,包含三个回调:、
onNext(T)、onError(error)onComplete() - Operator — 将一个Observable转换为另一个Observable的函数,通过链式调用
Pipe - Subscription — Observable与Observer之间的连接。调用会阻塞,调用
.Wait()可取消连接.Unsubscribe()
go
observable := ro.Pipe2(
ro.RangeWithInterval(0, 5, 1*time.Second),
ro.Filter(func(x int) bool { return x%2 == 0 }),
ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)
observable.Subscribe(ro.NewObserver(
func(s string) { fmt.Println(s) }, // onNext
func(err error) { log.Println(err) }, // onError
func() { fmt.Println("Done!") }, // onComplete
))
// 输出: "even-0", "even-2", "even-4", "Done!"
// 或者同步收集结果:
values, err := ro.Collect(observable)Cold vs Hot Observables
Cold与Hot Observable
Cold (default): each starts a new independent execution. Safe and predictable — use by default.
.Subscribe()Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
| Convert with | Behavior |
|---|---|
| Cold → hot with reference counting. Last unsubscribe tears down |
| Same as Share + buffers last N values for late subscribers |
| Cold → hot, but waits for explicit |
| Subjects | Natively hot — call |
| Subject | Constructor | Replay behavior |
|---|---|---|
| | None — late subscribers miss past events |
| | Replays last value to new subscribers |
| | Replays last N values |
| | Emits only last value, only on complete |
| | Single subscriber only |
For subject details and hot observable patterns, see Subjects Guide.
Cold(默认):每次都会启动一个独立的执行流程。安全且可预测——默认使用该类型。
.Subscribe()Hot:多个订阅者共享同一个执行流程。当数据源开销较大(如WebSocket、数据库轮询),或订阅者需要查看相同事件时使用。
| 转换方式 | 行为 |
|---|---|
| Cold → Hot,带引用计数。最后一个订阅者取消订阅时会销毁流 |
| 与Share相同,同时缓存最后N个值给晚加入的订阅者 |
| Cold → Hot,但需等待显式调用 |
| Subjects | 原生Hot类型——可直接调用 |
| Subject类型 | 构造函数 | 重放行为 |
|---|---|---|
| | 无——晚加入的订阅者会错过之前的事件 |
| | 向新订阅者重放最后一个值 |
| | 重放最后N个值 |
| | 仅在完成时发射最后一个值 |
| | 仅支持单个订阅者 |
如需了解Subject的详细信息和Hot Observable的模式,请查看Subjects指南。
Operator Quick Reference
操作符快速参考
| Category | Key operators | Purpose |
|---|---|---|
| Creation | | Create observables from various sources |
| Transform | | Transform or accumulate stream values |
| Filter | | Selectively emit values |
| Combine | | Merge multiple observables |
| Error | | Recover from errors |
| Timing | | Control emission timing |
| Side effect | | Observe without altering stream |
| Terminal | | Consume stream into Go types |
Use typed , ... for compile-time type safety across operator chains. The untyped uses and loses type checking.
Pipe2Pipe3Pipe25PipeanyFor the complete operator catalog (150+ operators with signatures), see Operators Guide.
| 分类 | 核心操作符 | 用途 |
|---|---|---|
| 创建 | | 从各种源创建Observable |
| 转换 | | 转换或累积流中的值 |
| 过滤 | | 选择性发射值 |
| 组合 | | 合并多个Observable |
| 错误处理 | | 从错误中恢复 |
| 时序控制 | | 控制值的发射时机 |
| 副作用 | | 观察流而不修改其内容 |
| 终端操作 | | 将流转换为Go原生类型 |
使用带类型的, ...可在操作符链中获得编译时类型安全。无类型的使用类型,会丢失类型检查。
Pipe2Pipe3Pipe25Pipeany如需完整的操作符目录(150+带签名的操作符),请查看操作符指南。
Common Mistakes
常见错误
| Mistake | Why it fails | Fix |
|---|---|---|
Using | Errors are silently dropped — bugs hide in production | Use |
Using untyped | Loses compile-time type safety, errors surface at runtime | Use |
Forgetting | Goroutine leak — the observable runs forever | Use |
Using | Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
Using | Stream overhead (goroutines, subscriptions) for a synchronous operation | Use |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain |
| 错误 | 失败原因 | 修复方案 |
|---|---|---|
使用 | 错误会被静默丢弃——生产环境中bug会隐藏 | 使用 |
使用无类型的 | 丢失编译时类型安全,错误仅在运行时暴露 | 使用 |
无限流忘记调用 | Goroutine泄漏——Observable会一直运行 | 使用 |
当Cold Observable足够时使用 | 不必要的复杂度,生命周期更难推理 | 仅当多个消费者需要共享同一流时才使用Hot Observable |
使用 | 同步操作却带来了流处理的开销(goroutine、订阅) | 使用 |
| 未传播用于取消的context | 流会忽略关闭信号,导致终止时资源泄漏 | 在管道中链式调用 |
Best Practices
最佳实践
- Always handle all three events — use , not just
NewObserver(onNext, onError, onComplete). Unhandled errors cause silent data lossOnNext - Use for synchronous consumption — when the stream is finite and you need
Collect(),[]Tblocks until complete and returns the slice + errorCollect - Prefer typed Pipe functions — ,
Pipe2...Pipe3catch type mismatches at compile time. Reserve untypedPipe25for dynamic operator chainsPipe - Bound infinite streams — use ,
Take(n),TakeUntil(signal), or context cancellation. Unbounded streams leak goroutinesTimeout(d) - Use /
Tapfor observability — log, trace, or meter emissions without altering the stream. ChainDofor error monitoringTapOnError - Prefer for simple transforms — if the data is a finite slice and you need Map/Filter/Reduce, use
samber/lo. Reach forlowhen data arrives over time, from multiple sources, or needs retry/timeout/backpressurero
- 始终处理三种事件——使用,不要只使用
NewObserver(onNext, onError, onComplete)。未处理的错误会导致数据静默丢失OnNext - 使用进行同步消费——当流是有限的,且你需要
Collect()时,[]T会阻塞直到流完成,返回切片+错误Collect - 优先使用带类型的Pipe函数——,
Pipe2...Pipe3会在编译时捕获类型不匹配问题。仅在动态操作符链中使用无类型的Pipe25Pipe - 限制无限流的生命周期——使用、
Take(n)、TakeUntil(signal)或context取消。无界流会导致Goroutine泄漏Timeout(d) - 使用/
Tap实现可观察性——记录日志、追踪或计量发射的值,而不修改流。链式调用Do进行错误监控TapOnError - 简单转换优先使用——如果数据是有限切片,且你需要Map/Filter/Reduce,使用
samber/lo。当数据随时间到达、来自多个源,或需要重试/超时/背压时,再使用loro
Plugin Ecosystem
插件生态
40+ plugins extend ro with domain-specific operators:
| Category | Plugins | Import path prefix |
|---|---|---|
| Encoding | JSON, CSV, Base64, Gob | |
| Network | HTTP, I/O, FSNotify | |
| Scheduling | Cron, ICS | |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops | |
| Rate limiting | Native, Ulule | |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template | |
| System | Process, Signal | |
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.
For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.
If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
40+插件扩展了ro的领域特定操作符:
| 分类 | 插件 | 导入路径前缀 |
|---|---|---|
| 编码 | JSON、CSV、Base64、Gob | |
| 网络 | HTTP、I/O、FSNotify | |
| 调度 | Cron、ICS | |
| 可观察性 | Zap、Slog、Zerolog、Logrus、Sentry、Oops | |
| 限流 | 原生、Ulule | |
| 数据 | Bytes、Strings、Sort、Strconv、Regexp、Template | |
| 系统 | 进程、信号 | |
如需完整的插件目录(含导入路径和使用示例),请查看插件生态。
如需了解真实世界的响应式模式(重试+超时、WebSocket扇出、优雅关闭、流组合),请查看模式。
如果你在使用samber/ro时遇到bug或意外行为,请在github.com/samber/ro/issues提交问题。
Cross-References
交叉参考
- → See skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice
samber/cc-skills-golang@golang-samber-lo - → See skill for monadic types (Option, Result, Either) that compose with ro pipelines
samber/cc-skills-golang@golang-samber-mo - → See skill for in-memory caching (also available as an ro plugin)
samber/cc-skills-golang@golang-samber-hot - → See skill for goroutine/channel patterns when reactive streams are overkill
samber/cc-skills-golang@golang-concurrency - → See skill for monitoring reactive pipelines in production
samber/cc-skills-golang@golang-observability
- → 如需有限切片转换(Map、Filter、Reduce、GroupBy),请查看技能——当数据已在切片中时使用lo
samber/cc-skills-golang@golang-samber-lo - → 如需与ro管道组合的单子类型(Option、Result、Either),请查看技能
samber/cc-skills-golang@golang-samber-mo - → 如需内存缓存(也可作为ro插件使用),请查看技能
samber/cc-skills-golang@golang-samber-hot - → 当响应式流过于重量级时,如需goroutine/channel模式,请查看技能
samber/cc-skills-golang@golang-concurrency - → 如需在生产环境中监控响应式管道,请查看技能
samber/cc-skills-golang@golang-observability