Topology and utility patterns route and reshape data flowing through channels. They provide building blocks for combining, splitting, and flattening channel streams.
Verdicts Link to heading
- Idiomatic - Considered a best practice in Go.
- Code Smell - Not wrong, but if you are using it, make sure you understand what you are doing.
- Anti-Pattern - Avoid unless you are an expert and know it is okay to break this rule.
Or Channel Pattern Link to heading
Problem Space Link to heading
The Or Channel pattern combines multiple channels into one that closes when any of the input channels closes or receives a value. This is useful for combining multiple cancellation signals or waiting for the first of several events.
Practical Example 1 Link to heading
cmd/or/main.go
package main
import (
"fmt"
"time"
)
func or(channels ...<-chan struct{}) <-chan struct{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
done := make(chan struct{})
go func() {
defer close(done)
select {
case <-channels[0]:
case <-channels[1]:
case <-or(append(channels[2:], done)...):
}
}()
return done
}
func after(d time.Duration) <-chan struct{} {
ch := make(chan struct{})
go func() {
time.Sleep(d)
close(ch)
}()
return ch
}
func main() {
start := time.Now()
// First one wins
<-or(
after(2*time.Second),
after(500*time.Millisecond),
after(1*time.Second),
)
fmt.Printf("done after %v\n", time.Since(start)) // ~500ms
}
Practical Example 2 Link to heading
cmd/or/main.go
package main
import (
"context"
"fmt"
"os"
"os/signal"
"reflect"
"syscall"
"time"
)
func orDone(ctx context.Context, channels ...<-chan struct{}) <-chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)
// Build select cases dynamically
cases := make([]reflect.SelectCase, len(channels)+1)
cases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
}
for i, ch := range channels {
cases[i+1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}
}
reflect.Select(cases)
}()
return done
}
func signalChannel(signals ...os.Signal) <-chan struct{} {
ch := make(chan struct{})
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, signals...)
go func() {
<-sigCh
close(ch)
}()
return ch
}
func timeout(d time.Duration) <-chan struct{} {
ch := make(chan struct{})
go func() {
time.Sleep(d)
close(ch)
}()
return ch
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Stop on: context cancel, timeout, or interrupt signal
done := orDone(ctx,
timeout(10*time.Second),
signalChannel(syscall.SIGINT, syscall.SIGTERM),
)
fmt.Println("running... press Ctrl+C or wait 10s")
<-done
fmt.Println("shutting down")
}
Considerations Link to heading
Example 1: Recursive or-channel using select. Simple and doesn’t require reflection. The recursion handles arbitrary numbers of channels by combining them in pairs.
Example 2: Uses reflect.Select for dynamic channel selection. More flexible when combining with context cancellation. Useful for graceful shutdown scenarios.
The or-channel is a building block for cancellation. When you have multiple reasons to stop (timeout, signal, explicit cancel), or-channel unifies them into a single signal.
Verdict Link to heading
- Example 1: Idiomatic
- Example 2: Code Smell (reflect adds complexity; prefer Example 1 when possible)
Tee Channel Pattern Link to heading
Problem Space Link to heading
The Tee Channel pattern splits a single input channel into two output channels. Every value from the input is sent to both outputs. This allows multiple consumers to process the same stream independently.
Practical Example 1 Link to heading
cmd/tee/main.go
package main
import (
"context"
"fmt"
)
func tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T) {
out1 := make(chan T)
out2 := make(chan T)
go func() {
defer close(out1)
defer close(out2)
for v := range in {
// Local copies for select
o1, o2 := out1, out2
// Send to both (order doesn't matter, both must receive)
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
return
case o1 <- v:
o1 = nil // disable this case
case o2 <- v:
o2 = nil // disable this case
}
}
}
}()
return out1, out2
}
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
source := generate(1, 2, 3, 4, 5)
ch1, ch2 := tee(ctx, source)
// Consume both channels concurrently
done := make(chan struct{})
go func() {
for v := range ch1 {
fmt.Println("consumer 1:", v)
}
done <- struct{}{}
}()
go func() {
for v := range ch2 {
fmt.Println("consumer 2:", v)
}
done <- struct{}{}
}()
<-done
<-done
}
Practical Example 2 Link to heading
cmd/tee/main.go
package main
import (
"context"
"fmt"
"sync"
)
type Event struct {
Type string
Data string
}
func teeEvents(ctx context.Context, in <-chan Event) (<-chan Event, <-chan Event) {
logger := make(chan Event)
processor := make(chan Event)
go func() {
defer close(logger)
defer close(processor)
for event := range in {
l, p := logger, processor
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
return
case l <- event:
l = nil
case p <- event:
p = nil
}
}
}
}()
return logger, processor
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events := make(chan Event)
go func() {
defer close(events)
events <- Event{Type: "click", Data: "button-1"}
events <- Event{Type: "submit", Data: "form-1"}
events <- Event{Type: "click", Data: "button-2"}
}()
logCh, processCh := teeEvents(ctx, events)
var wg sync.WaitGroup
wg.Add(2)
// Logger: writes all events to log
go func() {
defer wg.Done()
for e := range logCh {
fmt.Printf("[LOG] %s: %s\n", e.Type, e.Data)
}
}()
// Processor: only handles clicks
go func() {
defer wg.Done()
for e := range processCh {
if e.Type == "click" {
fmt.Printf("[PROCESS] handling click on %s\n", e.Data)
}
}
}()
wg.Wait()
}
Considerations Link to heading
Example 1: Generic tee that splits any channel into two. The select loop with nil channel disabling ensures both outputs receive each value before moving to the next.
Example 2: Practical use case where events go to both a logger and a processor. Each consumer can filter or transform independently.
Both outputs must keep up with the input. If one consumer is slow, it blocks the tee and the other consumer. Use buffered channels or add separate goroutines with buffers if consumers have different speeds.
Verdict Link to heading
- Example 1: Idiomatic
- Example 2: Idiomatic
Bridge Channel Pattern Link to heading
Problem Space Link to heading
The Bridge Channel pattern flattens a channel of channels into a single channel. When you have producers that each return their own channel, bridge combines them into one stream. Values are consumed in order: all values from the first channel, then all from the second, and so on.
Practical Example 1 Link to heading
cmd/bridge/main.go
package main
import (
"context"
"fmt"
)
func bridge[T any](ctx context.Context, chanStream <-chan <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for {
var ch <-chan T
select {
case <-ctx.Done():
return
case c, ok := <-chanStream:
if !ok {
return
}
ch = c
}
// Use select instead of range to respect cancellation
for {
select {
case <-ctx.Done():
return
case v, ok := <-ch:
if !ok {
goto nextChannel
}
select {
case <-ctx.Done():
return
case out <- v:
}
}
}
nextChannel:
}
}()
return out
}
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Channel of channels
chanStream := make(chan (<-chan int))
go func() {
defer close(chanStream)
chanStream <- generate(1, 2, 3)
chanStream <- generate(4, 5, 6)
chanStream <- generate(7, 8, 9)
}()
// Bridge flattens into single stream
for v := range bridge(ctx, chanStream) {
fmt.Println(v) // 1, 2, 3, 4, 5, 6, 7, 8, 9
}
}
Practical Example 2 Link to heading
cmd/bridge/main.go
package main
import (
"context"
"fmt"
"time"
)
type Batch struct {
ID int
Items []string
}
func fetchBatches(ctx context.Context, batchIDs []int) <-chan <-chan string {
chanStream := make(chan (<-chan string))
go func() {
defer close(chanStream)
for _, id := range batchIDs {
// Each batch returns its own channel
batch := make(chan string)
select {
case <-ctx.Done():
return
case chanStream <- batch:
}
// Populate batch in background
go func(batchID int, ch chan string) {
defer close(ch)
// Simulate fetching items for this batch
time.Sleep(50 * time.Millisecond)
for i := 1; i <= 3; i++ {
select {
case <-ctx.Done():
return
case ch <- fmt.Sprintf("batch%d-item%d", batchID, i):
}
}
}(id, batch)
}
}()
return chanStream
}
func bridge[T any](ctx context.Context, chanStream <-chan <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for {
var ch <-chan T
select {
case <-ctx.Done():
return
case c, ok := <-chanStream:
if !ok {
return
}
ch = c
}
for v := range ch {
select {
case <-ctx.Done():
return
case out <- v:
}
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
batches := fetchBatches(ctx, []int{1, 2, 3})
for item := range bridge(ctx, batches) {
fmt.Println(item)
}
}
Considerations Link to heading
Example 1: Generic bridge that flattens a channel of channels. Processes channels sequentially, draining each completely before moving to the next.
Example 2: Practical use case fetching paginated or batched data. Each batch is fetched independently and returns its own channel. Bridge presents them as a continuous stream to the consumer.
Bridge preserves ordering within and across channels. If you need interleaved consumption (values from multiple channels as they arrive), use fan-in instead. Bridge is for sequential flattening.
Verdict Link to heading
- Example 1: Idiomatic
- Example 2: Idiomatic
Done Channel Pattern Link to heading
Problem Space Link to heading
The Done Channel pattern provides a way to signal goroutines to stop. A closed channel broadcasts to all receivers simultaneously, making it ideal for cancellation. This prevents goroutine leaks and enables graceful shutdown.
Practical Example 1 Link to heading
cmd/done/main.go
package main
import (
"fmt"
"time"
)
func worker(id int, done <-chan struct{}) {
for {
select {
case <-done:
fmt.Printf("worker %d: shutting down\n", id)
return
default:
fmt.Printf("worker %d: working\n", id)
time.Sleep(100 * time.Millisecond)
}
}
}
func main() {
done := make(chan struct{})
// Start workers
for i := 1; i <= 3; i++ {
go worker(i, done)
}
// Let them work
time.Sleep(350 * time.Millisecond)
// Signal all workers to stop
close(done)
// Give workers time to shut down
time.Sleep(50 * time.Millisecond)
fmt.Println("all workers stopped")
}
Practical Example 2 Link to heading
cmd/done/main.go
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
func producer(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
i := 0
for {
i++
select {
case <-ctx.Done():
fmt.Println("producer: stopped")
return
case out <- i:
time.Sleep(100 * time.Millisecond)
}
}
}()
return out
}
func consumer(ctx context.Context, in <-chan int) {
for {
select {
case <-ctx.Done():
fmt.Println("consumer: stopped")
return
case v, ok := <-in:
if !ok {
fmt.Println("consumer: channel closed")
return
}
fmt.Println("received:", v)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
// Cancel on interrupt signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
fmt.Println("\nreceived shutdown signal")
cancel()
}()
// Start pipeline
nums := producer(ctx)
go consumer(ctx, nums)
// Wait for cancellation
<-ctx.Done()
// Give goroutines time to clean up
time.Sleep(50 * time.Millisecond)
fmt.Println("shutdown complete")
}
Practical Example 3 Link to heading
cmd/done/main.go
package main
import (
"context"
"fmt"
"sync"
"time"
)
func orDone[T any](ctx context.Context, in <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case v, ok := <-in:
if !ok {
return
}
select {
case out <- v:
case <-ctx.Done():
return
}
}
}
}()
return out
}
func slowProducer() <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= 100; i++ {
out <- i
time.Sleep(50 * time.Millisecond)
}
}()
return out
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
// orDone wraps channel to respect cancellation
nums := slowProducer()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for v := range orDone(ctx, nums) {
fmt.Println("received:", v)
}
fmt.Println("consumer: done")
}()
wg.Wait()
}
Considerations Link to heading
Example 1: Basic done channel using close(). Closing broadcasts to all goroutines simultaneously. Use struct{} for done channels since no data is needed, only the signal.
Example 2: Context-based cancellation is the standard approach. context.Context carries deadlines, cancellation signals, and values across API boundaries. Use it for any goroutine that should be cancellable.
Example 3: The orDone helper wraps any channel to make it respect context cancellation. Useful when consuming channels from code you don’t control. Without this, a blocked receive on a closed-but-not-drained channel can leak goroutines.
Always check ctx.Done() alongside channel operations in select statements. A goroutine that only reads from a channel without checking cancellation will leak if the producer stops sending but doesn’t close the channel.
Verdict Link to heading
- Example 1: Idiomatic (for simple internal coordination)
- Example 2: Idiomatic (prefer context for public APIs)
- Example 3: Idiomatic (essential helper for robust pipelines)