Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use
when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
samber/ro — Reactive Streams for Go
Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
This skill is not exhaustive. Please refer to the library documentation and code examples for more information. Context7 can help as a discoverability platform.
Why samber/ro (Streams vs Slices)
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators.
samber/ro solves this with declarative, chainable stream operators.
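For contrast, here is a minimal stdlib-only sketch of the manual wiring described above: a three-stage filter-and-map pipeline built from goroutines and channels. The helper names (`generate`, `filterEven`, `label`, `collect`) are hypothetical and not part of any library. The sketch has no error propagation or cancellation; adding either would mean more channels and a `select` in every stage.

```go
package main

import "fmt"

// generate emits the integers in [start, end) on a channel, then closes it.
func generate(start, end int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // without this close, downstream stages block forever
		for i := start; i < end; i++ {
			out <- i
		}
	}()
	return out
}

// filterEven forwards only even values to the next stage.
func filterEven(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if v%2 == 0 {
				out <- v
			}
		}
	}()
	return out
}

// label converts each value to a string.
func label(in <-chan int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			out <- fmt.Sprintf("even-%d", v)
		}
	}()
	return out
}

// collect drains the final stage into a slice.
func collect(in <-chan string) []string {
	var res []string
	for s := range in {
		res = append(res, s)
	}
	return res
}

func main() {
	fmt.Println(collect(label(filterEven(generate(0, 5)))))
	// prints: [even-0 even-2 even-4]
}
```

Each stage repeats the same goroutine, close, and range boilerplate, which is the part a chain of stream operators is meant to replace.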
When to use which tool:
| Scenario | Tool | Why |
|---|---|---|
| Transform a slice (map, filter, reduce) | | Finite, synchronous, eager — no stream overhead needed |
| Simple goroutine fan-out with error handling | | Standard lib, lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) | | Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources | | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source | | Hot observables (Share/Subjects) handle multicast natively |
Key differences: lo vs ro
| Aspect | lo | ro |
|---|---|---|
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return per call | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
Installation
```bash
go get github.com/samber/ro
```
Core Concepts
Four building blocks:
- Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
- Observer — a consumer with three callbacks: onNext, onError, onComplete
- Operator — a function that transforms an observable into another observable, chained via Pipe
- Subscription — the connection between observable and observer. Call to block or to cancel
```go
observable := ro.Pipe2(
	ro.RangeWithInterval(0, 5, 1*time.Second),
	ro.Filter(func(x int) bool { return x%2 == 0 }),
	ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
	func(s string) { fmt.Println(s) },    // onNext
	func(err error) { log.Println(err) }, // onError
	func() { fmt.Println("Done!") },      // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:
values, err := ro.Collect(observable)
```
Cold vs Hot Observables
Cold (default): each subscription starts a new independent execution. Safe and predictable, so use it by default.
Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
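The cold/hot distinction above can be modeled in a few lines of plain Go. This is a conceptual, single-goroutine sketch only: `coldSource` and `hotSource` are hypothetical types, not samber/ro's API or implementation, and they ignore errors, completion, and locking.

```go
package main

import "fmt"

// coldSource models a cold observable: every subscribe call re-runs the
// producer from scratch. Hypothetical type, not part of samber/ro.
type coldSource struct {
	runs int // number of independent executions started
}

func (c *coldSource) subscribe(onNext func(int)) {
	c.runs++
	for i := 1; i <= 3; i++ {
		onNext(i)
	}
}

// hotSource models a hot observable: one shared execution, delivered to
// whoever is subscribed at emission time. Hypothetical type.
type hotSource struct {
	subscribers []func(int)
}

func (h *hotSource) subscribe(onNext func(int)) {
	h.subscribers = append(h.subscribers, onNext)
}

func (h *hotSource) emit(v int) {
	for _, s := range h.subscribers {
		s(v)
	}
}

// demo subscribes twice to each kind of source and records what each
// subscriber received.
func demo() (coldA, coldB, hotEarly, hotLate []int, runs int) {
	cold := &coldSource{}
	cold.subscribe(func(v int) { coldA = append(coldA, v) })
	cold.subscribe(func(v int) { coldB = append(coldB, v) })

	hot := &hotSource{}
	hot.subscribe(func(v int) { hotEarly = append(hotEarly, v) })
	hot.emit(1)
	hot.subscribe(func(v int) { hotLate = append(hotLate, v) }) // joins late
	hot.emit(2)
	hot.emit(3)
	return coldA, coldB, hotEarly, hotLate, cold.runs
}

func main() {
	coldA, coldB, hotEarly, hotLate, runs := demo()
	fmt.Println(coldA, coldB, runs) // prints: [1 2 3] [1 2 3] 2
	fmt.Println(hotEarly, hotLate)  // prints: [1 2 3] [2 3]
}
```

Both cold subscribers get the full sequence at the cost of two executions; the late hot subscriber shares the single execution but misses the value emitted before it joined.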
| Convert with | Behavior |
|---|---|
| | Cold → hot with reference counting. Last unsubscribe tears down |
| | Same as Share + buffers last N values for late subscribers |
| | Cold → hot, but waits for explicit call |
| Subjects | Natively hot: call , , directly |
| Subject | Constructor | Replay behavior |
|---|---|---|
| | | None: late subscribers miss past events |
| BehaviorSubject | NewBehaviorSubject[T](initial) | Replays last value to new subscribers |
| ReplaySubject | NewReplaySubject[T](bufferSize) | Replays last N values |
| | | Emits only last value, only on complete |
| UnicastSubject | NewUnicastSubject[T](bufferSize) | Single subscriber only |
For subject details and hot observable patterns, see Subjects Guide.
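The replay column in the subject table can be pictured with a small buffer model. The sketch below is an assumption-laden illustration in plain Go, not samber/ro's subject implementation: `replayModel` and `lateSubscriberSees` are hypothetical names, and the model is single-goroutine with no error or completion handling. A buffer size of 0 corresponds to the "None" row, 1 approximates "replays last value" (without the initial value), and N to "replays last N values".

```go
package main

import "fmt"

// replayModel keeps the last `size` values and replays them to each new
// subscriber before delivering live values. Hypothetical type.
type replayModel struct {
	size        int
	buffer      []int
	subscribers []func(int)
}

// next records the value, trims the buffer to `size`, and notifies
// current subscribers.
func (r *replayModel) next(v int) {
	r.buffer = append(r.buffer, v)
	if len(r.buffer) > r.size {
		r.buffer = r.buffer[len(r.buffer)-r.size:]
	}
	for _, s := range r.subscribers {
		s(v)
	}
}

// subscribe replays buffered values first, then registers for live ones.
func (r *replayModel) subscribe(onNext func(int)) {
	for _, v := range r.buffer {
		onNext(v)
	}
	r.subscribers = append(r.subscribers, onNext)
}

// lateSubscriberSees emits 1..4, subscribes, emits 5, and returns what
// the late subscriber received.
func lateSubscriberSees(size int) []int {
	r := &replayModel{size: size}
	for v := 1; v <= 4; v++ {
		r.next(v)
	}
	var seen []int
	r.subscribe(func(v int) { seen = append(seen, v) })
	r.next(5)
	return seen
}

func main() {
	fmt.Println(lateSubscriberSees(0)) // prints: [5]
	fmt.Println(lateSubscriberSees(1)) // prints: [4 5]
	fmt.Println(lateSubscriberSees(2)) // prints: [3 4 5]
}
```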
Operator Quick Reference
| Category | Key operators | Purpose |
|---|---|---|
| Creation | , , , , , , | Create observables from various sources |
| Transform | , , , , , | Transform or accumulate stream values |
| Filter | , , , , , , , | Selectively emit values |
| Combine | , , –, –, | Merge multiple observables |
| Error | , , , , | Recover from errors |
| Timing | , , , , , | Control emission timing |
| Side effect | /, , , | Observe without altering stream |
| Terminal | , , , | Consume stream into Go types |
Use the typed Pipe variants (for example Pipe2) for compile-time type safety across operator chains. The untyped variant loses type checking.
For the complete operator catalog (150+ operators with signatures), see Operators Guide.
Common Mistakes
| Mistake | Why it fails | Fix |
|---|---|---|
| Using without error handler | Errors are silently dropped, so bugs hide in production | Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks |
| Using untyped instead of / | Loses compile-time type safety, errors surface at runtime | Use , ... for typed operator chains |
| Forgetting on infinite streams | Goroutine leak — the observable runs forever | Use , context cancellation, or explicit |
| Using when cold is sufficient | Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
| Using for finite slice transforms | Stream overhead (goroutines, subscriptions) for a synchronous operation | Use — it's simpler, faster, and purpose-built for slices |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain or in the pipeline |
Best Practices
- Always handle all three events: use NewObserver(onNext, onError, onComplete) rather than an observer that only handles values. Unhandled errors cause silent data loss
- Use ro.Collect for synchronous consumption: when the stream is finite and you need its values as a slice, it blocks until completion and returns the slice and an error
- Prefer typed Pipe functions: they catch type mismatches at compile time. Reserve the untyped variant for dynamic operator chains
- Bound infinite streams — use , , , or context cancellation. Unbounded streams leak goroutines
- Use / for observability — log, trace, or meter emissions without altering the stream. Chain for error monitoring
- Prefer lo for simple transforms: if the data is a finite slice and you need Map/Filter/Reduce, use lo. Reach for ro when data arrives over time, from multiple sources, or needs retry/timeout/backpressure
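To make the "bound infinite streams" practice concrete, here is a stdlib-only sketch of an endless ticker that is stopped through context cancellation. It does not use samber/ro; `ticks` and `takeN` are hypothetical helpers, and the 10ms interval is an arbitrary choice for the demo. The same leak appears in any stream whose producer has no stop signal.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ticks emits an increasing counter on every tick until ctx is cancelled,
// then closes the channel so the producer goroutine can exit.
func ticks(ctx context.Context, interval time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		t := time.NewTicker(interval)
		defer t.Stop()
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				// Guard the send too: the consumer may already be gone.
				select {
				case out <- i:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// takeN reads the first n values, then cancels the producer.
func takeN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var got []int
	for v := range ticks(ctx, 10*time.Millisecond) {
		got = append(got, v)
		if len(got) == n {
			cancel() // without this, the ticker goroutine would run forever
			break
		}
	}
	return got
}

func main() {
	fmt.Println(takeN(3)) // prints: [0 1 2]
}
```

The inner `select` matters: if the consumer stops reading without cancelling, a bare `out <- i` would block the producer indefinitely.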
Plugin Ecosystem
40+ plugins extend ro with domain-specific operators:
| Category | Plugins | Import path prefix |
|---|---|---|
| Encoding | JSON, CSV, Base64, Gob | |
| Network | HTTP, I/O, FSNotify | , , |
| Scheduling | Cron, ICS | , |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops | plugins/observability/... |
| Rate limiting | Native, Ulule | |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template | , , etc. |
| System | Process, Signal | , |
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.
For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.
If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
Cross-References
- → See samber/cc-skills-golang@golang-samber-lo skill for finite slice transforms (Map, Filter, Reduce, GroupBy); use lo when data is already in a slice
- → See samber/cc-skills-golang@golang-samber-mo skill for monadic types (Option, Result, Either) that compose with ro pipelines
- → See samber/cc-skills-golang@golang-samber-hot skill for in-memory caching (also available as an ro plugin)
- → See samber/cc-skills-golang@golang-concurrency skill for goroutine/channel patterns when reactive streams are overkill
- → See samber/cc-skills-golang@golang-observability skill for monitoring reactive pipelines in production