Featured image

Pipeline patterns process data sequentially through stages. Each stage receives input from the previous stage, transforms it, and passes it to the next. Channels connect the stages, allowing each to run concurrently.

Verdicts Link to heading

  • Idiomatic - Considered a best practice in Go.
  • Code Smell - Not wrong, but if you are using it, make sure you understand what you are doing.
  • Anti-Pattern - Avoid unless you are an expert and know it is okay to break this rule.

Pipeline Design Pattern Link to heading

Problem Space Link to heading

A pipeline breaks a process into discrete stages connected by channels. Each stage is a goroutine that receives values from an input channel, performs work, and sends results to an output channel. This enables concurrent processing where each stage works independently.

Practical Example 1 Link to heading

cmd/pipeline/main.go

package main

import "fmt"

// generate is the source stage of the pipeline. It sends each of nums, in
// order, on the returned channel from a new goroutine and closes the channel
// once the last value has been sent.
func generate(nums ...int) <-chan int {
    ch := make(chan int)
    emit := func() {
        // Closing tells the downstream range loop that no more values follow.
        defer close(ch)
        for i := 0; i < len(nums); i++ {
            ch <- nums[i]
        }
    }
    go emit()
    return ch
}

// square is a pipeline stage. It receives ints from in until in is closed,
// sends the square of each one on the returned channel, and then closes that
// channel.
func square(in <-chan int) <-chan int {
    squares := make(chan int)
    go func() {
        defer close(squares)
        for {
            v, ok := <-in
            if !ok {
                // Upstream is exhausted; the deferred close ends this stage.
                return
            }
            squares <- v * v
        }
    }()
    return squares
}

// double is a pipeline stage. It receives ints from in until in is closed,
// sends each value multiplied by two on the returned channel, and then closes
// that channel.
func double(in <-chan int) <-chan int {
    doubled := make(chan int)
    forward := func() {
        defer close(doubled)
        for v := range in {
            doubled <- v * 2
        }
    }
    go forward()
    return doubled
}

func main() {
    // Build pipeline: generate -> square -> double
    nums := generate(1, 2, 3, 4, 5)
    squared := square(nums)
    doubled := double(squared)
    
    // Consume results
    for result := range doubled {
        fmt.Println(result)
    }
}

Practical Example 2 Link to heading

internal/pipeline/pipeline.go

package pipeline

// Stage is one step of a pipeline: a function that takes a receive-only
// channel of T as its input and returns a receive-only channel of T as its
// output.
type Stage[T any] func(<-chan T) <-chan T

// Run chains stages onto source in the order given: the first stage reads
// from source and every later stage reads from the channel returned by the
// stage before it. It returns the channel of the last stage, or source itself
// when no stages are passed.
func Run[T any](source <-chan T, stages ...Stage[T]) <-chan T {
    out := source
    for i := range stages {
        out = stages[i](out)
    }
    return out
}

// Map returns a Stage that applies fn to every value it receives and sends the
// result downstream. The stage runs in its own goroutine and closes its output
// channel once its input channel is closed.
func Map[T any](fn func(T) T) Stage[T] {
    stage := func(in <-chan T) <-chan T {
        mapped := make(chan T)
        go func() {
            defer close(mapped)
            for {
                v, ok := <-in
                if !ok {
                    return
                }
                mapped <- fn(v)
            }
        }()
        return mapped
    }
    return stage
}

// Filter returns a Stage that forwards only the values for which predicate
// returns true and drops the rest. The stage runs in its own goroutine and
// closes its output channel once its input channel is closed.
func Filter[T any](predicate func(T) bool) Stage[T] {
    return func(in <-chan T) <-chan T {
        kept := make(chan T)
        go func() {
            defer close(kept)
            for v := range in {
                if !predicate(v) {
                    continue
                }
                kept <- v
            }
        }()
        return kept
    }
}

cmd/pipeline/main.go

package main

import (
    "fmt"
    "myapp/internal/pipeline"
)

// generate feeds the pipeline: it sends each of nums, in order, on the
// returned channel from a new goroutine and closes the channel afterwards.
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func(values []int) {
        defer close(out)
        for idx := range values {
            out <- values[idx]
        }
    }(nums)
    return out
}

// main runs the numbers 1 through 10 through a generic pipeline that keeps the
// even values, squares them, adds one, and prints each result.
func main() {
    isEven := func(n int) bool { return n%2 == 0 }
    squareOf := func(n int) int { return n * n }
    addOne := func(n int) int { return n + 1 }

    source := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // Stages run in the order they are listed here.
    result := pipeline.Run(source,
        pipeline.Filter(isEven),
        pipeline.Map(squareOf),
        pipeline.Map(addOne),
    )

    for v := range result {
        fmt.Println(v) // 5, 17, 37, 65, 101
    }
}

Considerations Link to heading

Example 1: Each stage is a standalone function that returns a channel. Stages compose by passing the output of one as input to the next. Simple and explicit.

Example 2: Generic pipeline with composable stages. Map and Filter return stage functions that can be chained. More reusable but adds abstraction.

Pipelines work best when stages are I/O bound or have varying processing times. The examples above use unbuffered channels, so each send blocks until the next stage is ready to receive; giving the channels a buffer lets stages work at their own pace. For CPU-bound work with uniform processing, the channel overhead may not be worth it.

Verdict Link to heading

  • Example 1: Idiomatic
  • Example 2: Idiomatic (when reusability matters)

Channel Generator Function Pattern Link to heading

Problem Space Link to heading

A generator function returns a receive-only channel and spawns a goroutine to produce values. The caller consumes values without knowing how they’re generated. This encapsulates the production logic and provides a clean interface.

Practical Example 1 Link to heading

cmd/generator/main.go

package main

import "fmt"

// counter returns a channel that yields the integers 1 through max in
// ascending order and is then closed. Nothing is sent when max is below 1.
func counter(max int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        next := 1
        for next <= max {
            out <- next
            next++
        }
    }()
    return out
}

// fibonacci returns a channel that yields the first n Fibonacci numbers,
// starting at 0, and is then closed. Nothing is sent when n is below 1.
func fibonacci(n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        prev, curr := 0, 1
        for remaining := n; remaining > 0; remaining-- {
            out <- prev
            prev, curr = curr, prev+curr
        }
    }()
    return out
}

// main prints the output of both generators: the integers 1 through 5, then
// the first ten Fibonacci numbers, each under its own heading.
func main() {
    fmt.Println("counting:")
    counts := counter(5)
    for value := range counts {
        fmt.Println(value)
    }

    fmt.Println("\nfibonacci:")
    fibs := fibonacci(10)
    for value := range fibs {
        fmt.Println(value)
    }
}

Practical Example 2 Link to heading

cmd/generator/main.go

package main

import (
    "bufio"
    "context"
    "errors"
    "fmt"
    "os"
)

// readLines streams the lines of the file at path on the first returned
// channel from a new goroutine.
//
// The second channel reports why reading stopped abnormally: the error from
// opening the file, ctx.Err() when ctx is done before all lines were sent, or
// the scanner's error. It carries at most one value and has a buffer of one,
// so the goroutine never blocks on it. Both channels are closed when the
// goroutine exits; a clean read to end of file sends no error.
func readLines(ctx context.Context, path string) (<-chan string, <-chan error) {
    lineCh := make(chan string)
    errCh := make(chan error, 1)

    // scan sends every line of the file on lineCh and returns the reason it
    // stopped, or nil at end of file.
    scan := func() error {
        file, err := os.Open(path)
        if err != nil {
            return err
        }
        defer file.Close()

        sc := bufio.NewScanner(file)
        for sc.Scan() {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case lineCh <- sc.Text():
            }
        }
        return sc.Err()
    }

    go func() {
        // Deferred calls run in reverse order: errCh is closed before lineCh.
        defer close(lineCh)
        defer close(errCh)

        if err := scan(); err != nil {
            errCh <- err
        }
    }()

    return lineCh, errCh
}

// main prints the first five lines of go.mod, cancels the reader, and then
// reports any error other than the cancellation it triggered itself.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    lines, errc := readLines(ctx, "go.mod")

    count := 0
    for line := range lines {
        fmt.Println(line)
        count++
        if count >= 5 {
            cancel() // stop reading early
            break
        }
    }

    // errors.Is still recognises the cancellation if the error is ever
    // wrapped on its way here; a plain != comparison would not.
    if err := <-errc; err != nil && !errors.Is(err, context.Canceled) {
        fmt.Println("error:", err)
    }
}

Considerations Link to heading

Example 1: Simple generators that produce a sequence and close the channel when done. The consumer uses range to read until the channel closes.

Example 2: Generator with context cancellation and error handling. Returns two channels: one for values, one for errors. The caller can cancel early, and the generator cleans up properly.

Always close the channel in the generator, never in the consumer. Use buffered error channels (size 1) so the goroutine can exit even if nobody reads the error.

Verdict Link to heading

  • Example 1: Idiomatic
  • Example 2: Idiomatic (when cancellation and error handling are needed)

Fan-In Pattern Link to heading

Problem Space Link to heading

Fan-In merges multiple input channels into a single output channel. This is useful when you have parallel producers and want to consume their results through a single channel. The output order depends on which producer sends first.

Practical Example 1 Link to heading

cmd/fanin/main.go

package main

import (
    "fmt"
    "time"
    "sync"
)

// producer returns a channel that yields count messages of the form
// "<name>: <i>" for i = 1..count, sleeping for interval before each one, and
// is then closed.
func producer(name string, interval time.Duration, count int) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for sent := 0; sent < count; sent++ {
            time.Sleep(interval)
            out <- fmt.Sprintf("%s: %d", name, sent+1)
        }
    }()
    return out
}

// fanIn merges every value received from channels into the single returned
// channel. Each input is forwarded by its own goroutine, so values from
// different inputs arrive in no particular order. The returned channel is
// closed once all inputs have been closed and drained.
func fanIn(channels ...<-chan string) <-chan string {
    merged := make(chan string)

    var pending sync.WaitGroup
    pending.Add(len(channels))

    forward := func(src <-chan string) {
        defer pending.Done()
        for msg := range src {
            merged <- msg
        }
    }
    for _, src := range channels {
        go forward(src)
    }

    // Close the output only after every forwarder has finished, so no
    // forwarder can send on a closed channel.
    go func() {
        pending.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    fast := producer("fast", 50*time.Millisecond, 5)
    slow := producer("slow", 150*time.Millisecond, 3)
    
    merged := fanIn(fast, slow)
    
    for msg := range merged {
        fmt.Println(msg)
    }
}

Practical Example 2 Link to heading

cmd/fanin/main.go

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// producer returns a channel that yields "<name>: <i>" for i = 1, 2, 3, ...
// until ctx is done. It waits for interval before each message, and the wait
// for the next message starts only after the previous one has been received.
// The channel is closed when the goroutine exits.
func producer(ctx context.Context, name string, interval time.Duration) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)

        // One timer is reused for every iteration instead of calling
        // time.After in the loop, which would allocate a new timer per
        // message. The deferred Stop releases it on cancellation.
        timer := time.NewTimer(interval)
        defer timer.Stop()

        for i := 1; ; i++ {
            select {
            case <-ctx.Done():
                return
            case <-timer.C:
            }

            // The send is also guarded by ctx so that a consumer that has
            // stopped reading cannot block this goroutine forever.
            select {
            case out <- fmt.Sprintf("%s: %d", name, i):
            case <-ctx.Done():
                return
            }

            // The timer has fired and its channel was drained above, so it
            // can be reset directly.
            timer.Reset(interval)
        }
    }()
    return out
}

// fanIn merges every value received from channels into the single returned
// channel until ctx is done. Each input is forwarded by its own goroutine, so
// values from different inputs arrive in no particular order. A forwarder
// stops when its input is closed or when ctx is done, and the returned channel
// is closed once every forwarder has stopped.
func fanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
    merged := make(chan T)

    var wg sync.WaitGroup
    wg.Add(len(channels))

    forward := func(src <-chan T) {
        defer wg.Done()
        done := ctx.Done()
        for {
            var next T
            select {
            case <-done:
                return
            case v, open := <-src:
                if !open {
                    return
                }
                next = v
            }

            // Guard the send as well, so a consumer that has stopped reading
            // cannot keep this goroutine alive after cancellation.
            select {
            case merged <- next:
            case <-done:
                return
            }
        }
    }
    for _, src := range channels {
        go forward(src)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    
    fast := producer(ctx, "fast", 50*time.Millisecond)
    slow := producer(ctx, "slow", 150*time.Millisecond)
    
    merged := fanIn(ctx, fast, slow)
    
    for msg := range merged {
        fmt.Println(msg)
    }
    
    fmt.Println("done")
}

Considerations Link to heading

Example 1: Simple fan-in using a WaitGroup to track when all input channels close. Each input gets its own goroutine that forwards values to the output.

Example 2: Fan-in with context cancellation. Producers run indefinitely until cancelled. The fan-in respects cancellation at every select to avoid goroutine leaks.

Fan-in does not preserve order between channels. If you need ordered results, you’ll need to add sequencing or timestamps to your values.

Verdict Link to heading

  • Example 1: Idiomatic
  • Example 2: Idiomatic (for long-running or cancellable pipelines)