Featured image

Topology and utility patterns route and reshape data flowing through channels. They provide building blocks for combining, splitting, and flattening channel streams.

Verdicts Link to heading

  • Idiomatic - Considered a best practice in Go.
  • Code Smell - Not wrong, but if you are using it, make sure you understand what you are doing.
  • Anti-Pattern - Avoid unless you are an expert and know it is okay to break this rule.

Or Channel Pattern Link to heading

Problem Space Link to heading

The Or Channel pattern combines multiple channels into one that closes as soon as any of the input channels is closed or delivers a value. This is useful for combining multiple cancellation signals or waiting for the first of several events.

Practical Example 1 Link to heading

cmd/or/main.go

package main

import (
    "fmt"
    "time"
)

func or(channels ...<-chan struct{}) <-chan struct{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    
    done := make(chan struct{})
    go func() {
        defer close(done)
        
        select {
        case <-channels[0]:
        case <-channels[1]:
        case <-or(append(channels[2:], done)...):
        }
    }()
    
    return done
}

// after returns a channel that is closed once the duration d has elapsed.
// The wait happens in a background goroutine, so the call itself returns
// immediately.
func after(d time.Duration) <-chan struct{} {
    elapsed := make(chan struct{})
    go func() {
        defer close(elapsed)
        time.Sleep(d)
    }()
    return elapsed
}

// main races three timers through or and prints how long the quickest one
// took to fire.
func main() {
    began := time.Now()

    // The merged channel closes as soon as the shortest timer expires.
    first := or(
        after(2*time.Second),
        after(500*time.Millisecond),
        after(1*time.Second),
    )
    <-first

    fmt.Printf("done after %v\n", time.Since(began)) // ~500ms
}

Practical Example 2 Link to heading

cmd/or/main.go

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "reflect"
    "syscall"
    "time"
)

// orDone returns a channel that is closed as soon as ctx is done or any of
// the given channels is closed or delivers a value.
//
// The set of channels is only known at run time, so the wait is performed
// with reflect.Select in a background goroutine: case 0 is ctx.Done(), and
// the remaining cases follow the order of channels.
func orDone(ctx context.Context, channels ...<-chan struct{}) <-chan struct{} {
    // recvCase builds a receive case for a single channel.
    recvCase := func(ch <-chan struct{}) reflect.SelectCase {
        return reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        }
    }

    merged := make(chan struct{})

    go func() {
        defer close(merged)

        cases := make([]reflect.SelectCase, 0, len(channels)+1)
        cases = append(cases, recvCase(ctx.Done()))
        for _, ch := range channels {
            cases = append(cases, recvCase(ch))
        }

        // Which case fired is irrelevant; any of them means "stop".
        reflect.Select(cases)
    }()

    return merged
}

// signalChannel returns a channel that is closed when the process receives
// the first of the given signals. As with signal.Notify, passing no signals
// subscribes to all incoming signals.
//
// The signal registration is released as soon as that first signal arrives,
// so later signals get their default handling again (for example, a second
// Ctrl+C terminates a process that is stuck during shutdown).
func signalChannel(signals ...os.Signal) <-chan struct{} {
    ch := make(chan struct{})

    // Buffer of one: signal.Notify never blocks when delivering, so an
    // unbuffered channel could miss a signal sent before the receive below.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, signals...)

    go func() {
        <-sigCh
        // Stop relaying before announcing, so the registration is already
        // gone by the time a receiver observes the closed channel.
        signal.Stop(sigCh)
        close(ch)
    }()
    return ch
}

// timeout returns a channel that is closed after the duration d has passed.
// The waiting is done by a background goroutine; the call does not block.
func timeout(d time.Duration) <-chan struct{} {
    expired := make(chan struct{})
    go func() {
        defer close(expired)
        time.Sleep(d)
    }()
    return expired
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // Stop on: context cancel, timeout, or interrupt signal
    done := orDone(ctx,
        timeout(10*time.Second),
        signalChannel(syscall.SIGINT, syscall.SIGTERM),
    )
    
    fmt.Println("running... press Ctrl+C or wait 10s")
    <-done
    fmt.Println("shutting down")
}

Considerations Link to heading

Example 1: Recursive or-channel using select. Simple and doesn’t require reflection. The recursion handles arbitrary numbers of channels by combining them in pairs.

Example 2: Uses reflect.Select for dynamic channel selection. More flexible when combining with context cancellation. Useful for graceful shutdown scenarios.

The or-channel is a building block for cancellation. When you have multiple reasons to stop (timeout, signal, explicit cancel), or-channel unifies them into a single signal.

Verdict Link to heading

  • Example 1: Idiomatic
  • Example 2: Code Smell (reflect adds complexity; prefer Example 1 when possible)

Tee Channel Pattern Link to heading

Problem Space Link to heading

The Tee Channel pattern splits a single input channel into two output channels. Every value from the input is sent to both outputs. This allows multiple consumers to process the same stream independently.

Practical Example 1 Link to heading

cmd/tee/main.go

package main

import (
    "context"
    "fmt"
)

// tee duplicates the stream in: every value received from in is delivered to
// both returned channels before the next value is read.
//
// Both outputs are unbuffered, so each consumer must keep reading; a stalled
// consumer stalls the other one too. Both outputs are closed when in is
// closed or when ctx is cancelled. Cancellation is honoured while waiting for
// input as well as while sending, so the goroutine cannot leak on an input
// that goes idle without being closed.
func tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T) {
    out1 := make(chan T)
    out2 := make(chan T)

    go func() {
        defer close(out1)
        defer close(out2)

        for {
            // Receive inside a select (not "range in") so cancellation is
            // observed even when no value is arriving.
            var v T
            select {
            case <-ctx.Done():
                return
            case val, ok := <-in:
                if !ok {
                    return
                }
                v = val
            }

            // Local copies: a send on a nil channel never proceeds, so
            // setting a copy to nil disables that case once it has fired.
            o1, o2 := out1, out2

            // Two rounds: whichever consumer is ready first is served
            // first, then the remaining one.
            for i := 0; i < 2; i++ {
                select {
                case <-ctx.Done():
                    return
                case o1 <- v:
                    o1 = nil
                case o2 <- v:
                    o2 = nil
                }
            }
        }
    }()

    return out1, out2
}

// generate returns an unbuffered channel that yields nums in order and is
// closed after the last value has been sent. Sending happens in a background
// goroutine, so the channel must be drained for that goroutine to finish.
func generate(nums ...int) <-chan int {
    stream := make(chan int)

    emit := func() {
        defer close(stream)
        for idx := 0; idx < len(nums); idx++ {
            stream <- nums[idx]
        }
    }
    go emit()

    return stream
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    source := generate(1, 2, 3, 4, 5)
    ch1, ch2 := tee(ctx, source)
    
    // Consume both channels concurrently
    done := make(chan struct{})
    
    go func() {
        for v := range ch1 {
            fmt.Println("consumer 1:", v)
        }
        done <- struct{}{}
    }()
    
    go func() {
        for v := range ch2 {
            fmt.Println("consumer 2:", v)
        }
        done <- struct{}{}
    }()
    
    <-done
    <-done
}

Practical Example 2 Link to heading

cmd/tee/main.go

package main

import (
    "context"
    "fmt"
    "sync"
)

// Event is a single occurrence flowing through the example pipeline. Type
// names the kind of event (the example uses "click" and "submit") and Data
// carries its payload (for example "button-1").
type Event struct {
    Type, Data string
}

// teeEvents duplicates the event stream in: every event is delivered to both
// returned channels (first the logger stream, second the processor stream)
// before the next event is read.
//
// Both outputs are unbuffered, so a stalled consumer stalls the other one.
// Both outputs are closed when in is closed or when ctx is cancelled.
// Cancellation is honoured while waiting for input as well as while sending,
// so the goroutine cannot leak on an input that goes idle without closing.
func teeEvents(ctx context.Context, in <-chan Event) (<-chan Event, <-chan Event) {
    logger := make(chan Event)
    processor := make(chan Event)

    go func() {
        defer close(logger)
        defer close(processor)

        for {
            // Receive inside a select (not "range in") so cancellation is
            // observed even when no event is arriving.
            var event Event
            select {
            case <-ctx.Done():
                return
            case e, ok := <-in:
                if !ok {
                    return
                }
                event = e
            }

            // Local copies: a send on a nil channel never proceeds, so
            // setting a copy to nil disables that case once it has fired.
            l, p := logger, processor

            for i := 0; i < 2; i++ {
                select {
                case <-ctx.Done():
                    return
                case l <- event:
                    l = nil
                case p <- event:
                    p = nil
                }
            }
        }
    }()

    return logger, processor
}

// main publishes three events, splits the stream with teeEvents, and runs a
// logger and a click processor side by side until both streams are drained.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    events := make(chan Event)
    go func() {
        defer close(events)
        for _, e := range []Event{
            {Type: "click", Data: "button-1"},
            {Type: "submit", Data: "form-1"},
            {Type: "click", Data: "button-2"},
        } {
            events <- e
        }
    }()

    logCh, processCh := teeEvents(ctx, events)

    var wg sync.WaitGroup
    wg.Add(2)

    // Logger: records every event.
    go func() {
        defer wg.Done()
        for e := range logCh {
            fmt.Printf("[LOG] %s: %s\n", e.Type, e.Data)
        }
    }()

    // Processor: reacts to clicks and skips everything else.
    go func() {
        defer wg.Done()
        for e := range processCh {
            if e.Type != "click" {
                continue
            }
            fmt.Printf("[PROCESS] handling click on %s\n", e.Data)
        }
    }()

    wg.Wait()
}

Considerations Link to heading

Example 1: Generic tee that splits any channel into two. The select loop with nil channel disabling ensures both outputs receive each value before moving to the next.

Example 2: Practical use case where events go to both a logger and a processor. Each consumer can filter or transform independently.

Both outputs must keep up with the input. If one consumer is slow, it blocks the tee and the other consumer. Use buffered channels or add separate goroutines with buffers if consumers have different speeds.

Verdict Link to heading

  • Example 1: Idiomatic
  • Example 2: Idiomatic

Bridge Channel Pattern Link to heading

Problem Space Link to heading

The Bridge Channel pattern flattens a channel of channels into a single channel. When you have producers that each return their own channel, bridge combines them into one stream. Values are consumed in order: all values from the first channel, then all from the second, and so on.

Practical Example 1 Link to heading

cmd/bridge/main.go

package main

import (
    "context"
    "fmt"
)

// bridge flattens a channel of channels into a single channel. Inner
// channels are consumed strictly one after another: each is read until it is
// closed before the next one is taken from chanStream.
//
// The returned channel is closed when chanStream is closed and the current
// inner channel has been drained, or as soon as ctx is cancelled.
func bridge[T any](ctx context.Context, chanStream <-chan <-chan T) <-chan T {
    flat := make(chan T)

    go func() {
        defer close(flat)

    streams:
        for {
            // Wait for the next inner channel.
            var current <-chan T
            select {
            case <-ctx.Done():
                return
            case next, open := <-chanStream:
                if !open {
                    return
                }
                current = next
            }

            // Forward its values; every receive and send is paired with
            // ctx.Done() so cancellation is never missed.
            for {
                select {
                case <-ctx.Done():
                    return
                case item, open := <-current:
                    if !open {
                        continue streams
                    }
                    select {
                    case <-ctx.Done():
                        return
                    case flat <- item:
                    }
                }
            }
        }
    }()

    return flat
}

// generate returns an unbuffered channel that emits nums in order and is
// closed once the last value has been sent. The values are sent from a
// background goroutine, which only finishes if the channel is drained.
func generate(nums ...int) <-chan int {
    values := make(chan int)

    go func() {
        defer close(values)
        for pos := 0; pos < len(nums); pos++ {
            values <- nums[pos]
        }
    }()

    return values
}

// main publishes three generated channels on a channel of channels and
// prints the flattened result produced by bridge.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Channel of channels: one inner channel per group of numbers.
    chanStream := make(chan (<-chan int))
    go func() {
        defer close(chanStream)
        for _, group := range [][]int{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}} {
            chanStream <- generate(group...)
        }
    }()

    // bridge presents the groups as one continuous stream.
    flattened := bridge(ctx, chanStream)
    for v := range flattened {
        fmt.Println(v) // 1, 2, 3, 4, 5, 6, 7, 8, 9
    }
}

Practical Example 2 Link to heading

cmd/bridge/main.go

package main

import (
    "context"
    "fmt"
    "time"
)

// Batch groups a set of string items under a numeric ID.
//
// NOTE(review): none of the example code shown alongside this type references
// it (fetchBatches streams plain strings) — confirm whether it is still needed.
type Batch struct {
    ID    int      // presumably the batch identifier — confirm against callers
    Items []string // presumably the items belonging to this batch — confirm
}

// fetchBatches returns a channel of channels, one inner channel per entry of
// batchIDs, published in the same order.
//
// An inner channel is published first and filled afterwards by its own
// goroutine: after a simulated 50ms fetch delay it yields three items named
// "batch<ID>-item<N>" and is then closed. The outer channel is closed once
// every batch has been published. Publishing and item delivery both stop
// early when ctx is cancelled.
func fetchBatches(ctx context.Context, batchIDs []int) <-chan <-chan string {
    stream := make(chan (<-chan string))

    // fill produces the items of one batch and closes its channel.
    fill := func(batchID int, items chan<- string) {
        defer close(items)

        // Simulate the latency of fetching this batch.
        time.Sleep(50 * time.Millisecond)
        for n := 1; n <= 3; n++ {
            item := fmt.Sprintf("batch%d-item%d", batchID, n)
            select {
            case <-ctx.Done():
                return
            case items <- item:
            }
        }
    }

    go func() {
        defer close(stream)

        for _, id := range batchIDs {
            items := make(chan string)

            // Hand the batch channel to the consumer before filling it.
            select {
            case <-ctx.Done():
                return
            case stream <- items:
            }

            go fill(id, items)
        }
    }()

    return stream
}

// bridge flattens a channel of channels into a single channel. Inner
// channels are consumed strictly one after another: each is read until it is
// closed before the next one is taken from chanStream.
//
// The returned channel is closed when chanStream is closed and the current
// inner channel has been drained, or as soon as ctx is cancelled. Every
// receive and send is paired with ctx.Done(), so the goroutine also stops
// while waiting on an inner channel that is idle, nil, or never closed.
func bridge[T any](ctx context.Context, chanStream <-chan <-chan T) <-chan T {
    out := make(chan T)

    go func() {
        defer close(out)

    streams:
        for {
            // Wait for the next inner channel.
            var ch <-chan T
            select {
            case <-ctx.Done():
                return
            case c, ok := <-chanStream:
                if !ok {
                    return
                }
                ch = c
            }

            // Receive with select rather than "range ch": ranging would
            // block past cancellation until the inner channel produced a
            // value or was closed.
            for {
                select {
                case <-ctx.Done():
                    return
                case v, ok := <-ch:
                    if !ok {
                        continue streams
                    }
                    select {
                    case <-ctx.Done():
                        return
                    case out <- v:
                    }
                }
            }
        }
    }()

    return out
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    batches := fetchBatches(ctx, []int{1, 2, 3})
    
    for item := range bridge(ctx, batches) {
        fmt.Println(item)
    }
}

Considerations Link to heading

Example 1: Generic bridge that flattens a channel of channels. Processes channels sequentially, draining each completely before moving to the next.

Example 2: Practical use case fetching paginated or batched data. Each batch is fetched independently and returns its own channel. Bridge presents them as a continuous stream to the consumer.

Bridge preserves ordering within and across channels. If you need interleaved consumption (values from multiple channels as they arrive), use fan-in instead. Bridge is for sequential flattening.

Verdict Link to heading

  • Example 1: Idiomatic
  • Example 2: Idiomatic

Done Channel Pattern Link to heading

Problem Space Link to heading

The Done Channel pattern provides a way to signal goroutines to stop. A closed channel broadcasts to all receivers simultaneously, making it ideal for cancellation. This prevents goroutine leaks and enables graceful shutdown.

Practical Example 1 Link to heading

cmd/done/main.go

package main

import (
    "fmt"
    "time"
)

// worker simulates a unit of work every 100ms until done is closed (or
// delivers a value), at which point it reports the shutdown and returns.
// The stop signal is only polled between units of work.
func worker(id int, done <-chan struct{}) {
    for {
        // Non-blocking check of the stop signal.
        select {
        case <-done:
            fmt.Printf("worker %d: shutting down\n", id)
            return
        default:
        }

        fmt.Printf("worker %d: working\n", id)
        time.Sleep(100 * time.Millisecond)
    }
}

// main starts three workers, lets them run for 350ms, signals them to stop
// by closing the shared done channel, and waits until every worker has
// actually returned before reporting completion.
func main() {
    const workers = 3

    done := make(chan struct{})
    stopped := make(chan struct{})

    // Start workers; each reports on stopped once worker has returned.
    for i := 1; i <= workers; i++ {
        go func(id int) {
            defer func() { stopped <- struct{}{} }()
            worker(id, done)
        }(i)
    }

    // Let them work
    time.Sleep(350 * time.Millisecond)

    // Signal all workers to stop; closing broadcasts to every receiver.
    close(done)

    // Wait for each worker instead of sleeping for a fixed time: a worker
    // only polls done between 100ms units of work, so a short fixed sleep
    // could end before the workers had noticed the signal.
    for i := 0; i < workers; i++ {
        <-stopped
    }
    fmt.Println("all workers stopped")
}

Practical Example 2 Link to heading

cmd/done/main.go

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// producer returns a channel that yields 1, 2, 3, ... with a 100ms pause
// after each delivered value. When ctx is cancelled it prints
// "producer: stopped" and closes the channel.
//
// Both the send and the pause are cancellable, so the producer stops
// promptly on cancellation instead of first finishing its pause.
func producer(ctx context.Context) <-chan int {
    out := make(chan int)
    go func() {
        // Deferred calls run last-in first-out: the message is printed
        // first, then the channel is closed.
        defer close(out)
        defer fmt.Println("producer: stopped")

        for i := 1; ; i++ {
            select {
            case <-ctx.Done():
                return
            case out <- i:
            }

            // Pace the stream, but give up the wait on cancellation.
            select {
            case <-ctx.Done():
                return
            case <-time.After(100 * time.Millisecond):
            }
        }
    }()
    return out
}

// consumer prints every value received from in. It returns when ctx is
// cancelled (printing "consumer: stopped") or when in is closed (printing
// "consumer: channel closed").
func consumer(ctx context.Context, in <-chan int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("consumer: stopped")
            return
        case value, open := <-in:
            if open {
                fmt.Println("received:", value)
                continue
            }
            fmt.Println("consumer: channel closed")
            return
        }
    }
}

// main runs a producer/consumer pipeline until SIGINT or SIGTERM arrives,
// then cancels the shared context and waits for both stages to finish
// before reporting completion.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Cancel on interrupt signal
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    defer signal.Stop(sigCh)
    go func() {
        <-sigCh
        fmt.Println("\nreceived shutdown signal")
        cancel()
    }()

    // Start pipeline; consumerDone is closed once consumer has returned.
    nums := producer(ctx)
    consumerDone := make(chan struct{})
    go func() {
        defer close(consumerDone)
        consumer(ctx, nums)
    }()

    // Wait for cancellation
    <-ctx.Done()

    // Wait for the stages themselves rather than sleeping for a fixed time,
    // so "shutdown complete" is only printed after both have stopped.
    <-consumerDone

    // The producer closes nums when it exits; ranging until the close waits
    // for it. Values it may still send while shutting down are discarded.
    for range nums {
    }

    fmt.Println("shutdown complete")
}

Practical Example 3 Link to heading

cmd/done/main.go

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// orDone wraps in so that consuming it respects cancellation: values from in
// are forwarded unchanged and in order to the returned channel, which is
// closed when in is closed or when ctx is cancelled, whichever comes first.
func orDone[T any](ctx context.Context, in <-chan T) <-chan T {
    relay := make(chan T)

    go func() {
        defer close(relay)

        cancelled := ctx.Done()
        for {
            // Wait for the next value, unless cancelled first.
            var item T
            select {
            case <-cancelled:
                return
            case received, open := <-in:
                if !open {
                    return
                }
                item = received
            }

            // Hand it on, again giving up if cancelled while the consumer
            // is not ready.
            select {
            case <-cancelled:
                return
            case relay <- item:
            }
        }
    }()

    return relay
}

// slowProducer returns an unbuffered channel that yields the numbers 1
// through 100, pausing 50ms after each delivered value, and is closed after
// the last one. It takes no context: the sending goroutine blocks on each
// send until a receiver takes the value.
func slowProducer() <-chan int {
    const (
        last  = 100
        pause = 50 * time.Millisecond
    )

    stream := make(chan int)
    go func() {
        defer close(stream)
        for next := 1; next <= last; next++ {
            stream <- next
            time.Sleep(pause)
        }
    }()
    return stream
}

// main consumes slowProducer through orDone under a 200ms timeout, printing
// each received value and a final message once the wrapped stream closes.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
    defer cancel()

    // The raw stream knows nothing about ctx; orDone adds that below.
    source := slowProducer()

    var wg sync.WaitGroup
    wg.Add(1)

    go func() {
        defer wg.Done()

        // The range ends when orDone closes its output, which here happens
        // once the timeout expires.
        guarded := orDone(ctx, source)
        for v := range guarded {
            fmt.Println("received:", v)
        }
        fmt.Println("consumer: done")
    }()

    wg.Wait()
}

Considerations Link to heading

Example 1: Basic done channel using close(). Closing broadcasts to all goroutines simultaneously. Use struct{} for done channels since no data is needed, only the signal.

Example 2: Context-based cancellation is the standard approach. context.Context carries deadlines, cancellation signals, and values across API boundaries. Use it for any goroutine that should be cancellable.

Example 3: The orDone helper wraps any channel to make it respect context cancellation. Useful when consuming channels from code you don’t control. Without this, a goroutine blocked receiving from a channel that is never closed (and never sent to again) is leaked, because nothing will ever wake it up.

Always check ctx.Done() alongside channel operations in select statements. A goroutine that only reads from a channel without checking cancellation will leak if the producer stops sending but doesn’t close the channel.

Verdict Link to heading

  • Example 1: Idiomatic (for simple internal coordination)
  • Example 2: Idiomatic (prefer context for public APIs)
  • Example 3: Idiomatic (essential helper for robust pipelines)