Parallel execution patterns distribute work across multiple goroutines, enabling concurrent processing of independent tasks to improve throughput and make full use of available CPU and I/O capacity.
Verdicts
- Idiomatic - Considered a best practice in Go.
- Code Smell - Not wrong, but if you are using it, make sure you understand what you are doing.
- Anti-Pattern - Avoid unless you are an expert and know it is okay to break this rule.
Fan-Out Pattern
Problem Space
Fan-Out distributes work from a single source to multiple workers. Each worker processes items independently and concurrently. This increases throughput when you have CPU-bound or I/O-bound tasks that can run in parallel.
Practical Example 1
cmd/fanout/main.go
package main

import (
    "fmt"
    "sync"
    "time"
)

func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func process(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            time.Sleep(50 * time.Millisecond) // simulate work
            out <- n * n
            fmt.Printf("worker %d processed %d\n", id, n)
        }
    }()
    return out
}

func merge(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    source := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // Fan-out to 3 workers (each reads from same source)
    w1 := process(1, source)
    w2 := process(2, source)
    w3 := process(3, source)

    // Merge results back
    for result := range merge(w1, w2, w3) {
        fmt.Println("result:", result)
    }
}
Practical Example 2
cmd/fanout/main.go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID   int
    Data string
}

type Result struct {
    JobID  int
    Output string
}

func fanOut[T, R any](ctx context.Context, in <-chan T, workers int, fn func(T) R) <-chan R {
    out := make(chan R)
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case v, ok := <-in:
                    if !ok {
                        return
                    }
                    select {
                    case out <- fn(v):
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    // Generate jobs
    jobs := make(chan Job)
    go func() {
        defer close(jobs)
        for i := 1; i <= 10; i++ {
            select {
            case <-ctx.Done():
                return
            case jobs <- Job{ID: i, Data: fmt.Sprintf("item-%d", i)}:
            }
        }
    }()

    // Fan-out processing
    results := fanOut(ctx, jobs, 3, func(j Job) Result {
        time.Sleep(100 * time.Millisecond) // simulate work
        return Result{JobID: j.ID, Output: fmt.Sprintf("processed: %s", j.Data)}
    })

    for r := range results {
        fmt.Printf("job %d: %s\n", r.JobID, r.Output)
    }
}
Considerations
Example 1: Multiple workers read from the same channel. Go’s channel semantics ensure each value goes to exactly one worker. Simple and effective for distributing work.
Example 2: Generic fan-out function with context cancellation. Workers process items using a provided function and respect cancellation. More reusable for different job types.
Fan-out works best when tasks are independent and take similar time. If tasks have dependencies or vastly different durations, consider a worker pool with a job queue instead (covered in the next section).
Verdict
- Example 1: Idiomatic
- Example 2: Idiomatic (when reusability and cancellation matter)
Worker Pool Pattern
Problem Space
A worker pool maintains a fixed number of goroutines that process jobs from a shared queue. This bounds concurrency, preventing resource exhaustion while keeping workers busy. Jobs are submitted to a channel and workers pull from it.
Practical Example 1
cmd/pool/main.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID int
}

type Result struct {
    JobID int
    Value int
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        time.Sleep(50 * time.Millisecond) // simulate work
        fmt.Printf("worker %d processing job %d\n", id, job.ID)
        results <- Result{JobID: job.ID, Value: job.ID * 2}
    }
}

func main() {
    jobs := make(chan Job, 100)
    results := make(chan Result, 100)

    // Start workers
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // Submit jobs
    for i := 1; i <= 10; i++ {
        jobs <- Job{ID: i}
    }
    close(jobs)

    // Wait for workers and close results
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    for r := range results {
        fmt.Printf("result: job %d = %d\n", r.JobID, r.Value)
    }
}
Practical Example 2
internal/pool/pool.go
package pool

import (
    "context"
    "sync"
)

type Pool[T, R any] struct {
    workers int
    fn      func(context.Context, T) (R, error)
}

func New[T, R any](workers int, fn func(context.Context, T) (R, error)) *Pool[T, R] {
    return &Pool[T, R]{
        workers: workers,
        fn:      fn,
    }
}

type Result[R any] struct {
    Value R
    Err   error
}

func (p *Pool[T, R]) Process(ctx context.Context, jobs <-chan T) <-chan Result[R] {
    results := make(chan Result[R])
    var wg sync.WaitGroup
    for i := 0; i < p.workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    value, err := p.fn(ctx, job)
                    select {
                    case results <- Result[R]{Value: value, Err: err}:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(results)
    }()
    return results
}
cmd/pool/main.go
package main

import (
    "context"
    "fmt"
    "myapp/internal/pool"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    p := pool.New(3, func(ctx context.Context, id int) (string, error) {
        select {
        case <-ctx.Done():
            return "", ctx.Err()
        case <-time.After(100 * time.Millisecond):
            return fmt.Sprintf("processed-%d", id), nil
        }
    })

    jobs := make(chan int)
    go func() {
        defer close(jobs)
        for i := 1; i <= 10; i++ {
            select {
            case <-ctx.Done():
                return
            case jobs <- i:
            }
        }
    }()

    for result := range p.Process(ctx, jobs) {
        if result.Err != nil {
            fmt.Println("error:", result.Err)
        } else {
            fmt.Println(result.Value)
        }
    }
}
Considerations
Example 1: Basic worker pool with a fixed number of goroutines reading from a job channel. The buffered channels let all jobs be submitted without blocking, and the WaitGroup tracks when every worker has finished.
Example 2: Generic reusable pool with context support. Workers respect cancellation and return results with errors. Useful when you need the same pool pattern across different job types.
Size the pool based on the nature of the work. For CPU-bound tasks, runtime.NumCPU() is a sensible starting point. For I/O-bound tasks, more workers than cores often helps, because each worker spends most of its time waiting. Always bound the pool to prevent unbounded goroutine growth. A rough sizing sketch follows.
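A minimal sizing sketch, assuming an illustrative 4x oversubscription factor for I/O-bound work (the multiplier and the workerCount helper are assumptions for demonstration, not measured guidance):

package main

import (
    "fmt"
    "runtime"
)

// workerCount is a hypothetical helper that picks a pool size from the
// kind of work. The 4x I/O multiplier is an assumed starting point;
// tune it against real latency and resource limits.
func workerCount(ioBound bool) int {
    n := runtime.NumCPU()
    if ioBound {
        return n * 4 // I/O-bound: workers mostly wait, so oversubscribe
    }
    return n // CPU-bound: one worker per core keeps cores busy
}

func main() {
    fmt.Println("cpu-bound pool size:", workerCount(false))
    fmt.Println("io-bound pool size:", workerCount(true))
}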
Verdict
- Example 1: Idiomatic
- Example 2: Idiomatic (for reusable, production code)
Semaphore Pattern
Problem Space
A semaphore limits concurrent access to a resource. Unlike a mutex (which allows one), a semaphore allows N concurrent accesses. This is useful for rate limiting, connection pooling, or bounding parallel operations.
Practical Example 1
cmd/semaphore/main.go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // Semaphore as buffered channel
    sem := make(chan struct{}, 3) // allow 3 concurrent
    var wg sync.WaitGroup

    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            // Acquire
            sem <- struct{}{}
            defer func() { <-sem }() // Release

            fmt.Printf("worker %d: starting\n", id)
            time.Sleep(200 * time.Millisecond)
            fmt.Printf("worker %d: done\n", id)
        }(i)
    }
    wg.Wait()
}
Practical Example 2
cmd/semaphore/main.go
package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/semaphore"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    // Weighted semaphore: 3 slots
    sem := semaphore.NewWeighted(3)

    for i := 1; i <= 10; i++ {
        // Acquire 1 slot (blocks if none available)
        if err := sem.Acquire(ctx, 1); err != nil {
            fmt.Printf("worker %d: failed to acquire: %v\n", i, err)
            continue
        }
        go func(id int) {
            defer sem.Release(1)
            fmt.Printf("worker %d: starting\n", id)
            select {
            case <-ctx.Done():
                fmt.Printf("worker %d: cancelled\n", id)
                return
            case <-time.After(200 * time.Millisecond):
                fmt.Printf("worker %d: done\n", id)
            }
        }(i)
    }

    // Wait for all to complete by acquiring all slots
    if err := sem.Acquire(ctx, 3); err != nil {
        fmt.Println("timeout waiting for workers")
        return
    }
    fmt.Println("all workers finished")
}
Practical Example 3
cmd/semaphore/main.go
package main

import (
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"
)

func fetchWithLimit(urls []string, maxConcurrent int) []string {
    sem := make(chan struct{}, maxConcurrent)
    results := make([]string, len(urls))
    var wg sync.WaitGroup
    client := &http.Client{Timeout: 5 * time.Second}

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            sem <- struct{}{}
            defer func() { <-sem }()

            resp, err := client.Get(u)
            if err != nil {
                results[idx] = fmt.Sprintf("error: %v", err)
                return
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(io.LimitReader(resp.Body, 100))
            results[idx] = string(body)
        }(i, url)
    }
    wg.Wait()
    return results
}

func main() {
    urls := []string{
        "https://example.com",
        "https://example.org",
        "https://example.net",
        "https://www.google.com",
        "https://www.github.com",
    }
    results := fetchWithLimit(urls, 2)
    for i, r := range results {
        fmt.Printf("%d: %.50s...\n", i, r)
    }
}
Considerations
Example 1: Buffered channel as semaphore. Send to acquire, receive to release. Simple and requires no external packages.
Example 2: golang.org/x/sync/semaphore provides a weighted semaphore with context support. Acquire can request multiple slots and respects cancellation. Use it when you need timeout behavior or weighted resources; a weighted-acquire sketch follows below.
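Example 2 above only ever acquires one slot per job. As a minimal sketch of the weighted side, assume large jobs cost two units of capacity and small jobs one (the weights and job names here are invented for illustration):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/semaphore"
)

func main() {
    ctx := context.Background()

    // 3 units of capacity: one large job (weight 2) plus one small job
    // (weight 1) can run together, or three small jobs.
    sem := semaphore.NewWeighted(3)

    var wg sync.WaitGroup
    run := func(name string, weight int64) {
        defer wg.Done()
        if err := sem.Acquire(ctx, weight); err != nil {
            fmt.Println(name, "failed to acquire:", err)
            return
        }
        defer sem.Release(weight)
        fmt.Println(name, "running with weight", weight)
        time.Sleep(100 * time.Millisecond) // simulate work
    }

    for _, job := range []struct {
        name   string
        weight int64
    }{
        {"small-1", 1}, {"large-1", 2}, {"small-2", 1}, {"large-2", 2},
    } {
        wg.Add(1)
        go run(job.name, job.weight)
    }
    wg.Wait()
}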
Example 3: Practical use case limiting concurrent HTTP requests. Prevents overwhelming servers or exhausting local resources like file descriptors.
Semaphores bound concurrency without prescribing structure. Unlike worker pools, where workers are long-lived, semaphores let you spawn goroutines freely while limiting how many run simultaneously; the sketch below puts the two styles side by side.
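To make that structural difference concrete, here is a minimal side-by-side sketch; handle and the items slice are placeholders invented for this comparison:

package main

import (
    "fmt"
    "sync"
)

// handle stands in for whatever per-item work needs doing.
func handle(item int) { fmt.Println("handled", item) }

func main() {
    items := []int{1, 2, 3, 4, 5, 6}

    // Semaphore style: one goroutine per item, at most 3 running at once.
    sem := make(chan struct{}, 3)
    var wg sync.WaitGroup
    for _, item := range items {
        wg.Add(1)
        go func(it int) {
            defer wg.Done()
            sem <- struct{}{}        // acquire
            defer func() { <-sem }() // release
            handle(it)
        }(item)
    }
    wg.Wait()

    // Worker pool style: 3 long-lived workers pulling from a queue.
    queue := make(chan int)
    var pw sync.WaitGroup
    for i := 0; i < 3; i++ {
        pw.Add(1)
        go func() {
            defer pw.Done()
            for it := range queue {
                handle(it)
            }
        }()
    }
    for _, item := range items {
        queue <- item
    }
    close(queue)
    pw.Wait()
}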
Verdict
- Example 1: Idiomatic
- Example 2: Idiomatic (when context cancellation or weighted slots are needed)
- Example 3: Idiomatic