Go Concurrency in Practice: Goroutines, Channels, and the Patterns That Actually Work

Go's concurrency model is one of its most compelling features — and one of its most misused. The go keyword makes spawning a concurrent task trivial. chan makes communication between goroutines clean. But "trivial to start" doesn't mean "trivial to do correctly." Goroutine leaks, data races, and deadlocks are common in Go codebases that treat channels as magic thread-safety sauce.
This guide covers the concurrency patterns that work in production: fan-out/fan-in for parallelism, pipelines for streaming data processing, worker pools for bounded concurrency, context cancellation for clean shutdown, and the sync primitives for the cases where channels are the wrong tool.
The Problem: Concurrency vs Parallelism vs Shared State
Go's model is based on Communicating Sequential Processes (CSP): instead of sharing memory and protecting it with locks, you pass ownership of data through channels. "Do not communicate by sharing memory; share memory by communicating."
This is a principle, not a rule. The standard library's sync package has mutexes and atomic operations precisely because channels aren't always the right tool. The skill is knowing when to use each.
flowchart LR
A[Need concurrent work]
A --> B{Coordinating results\nfrom parallel tasks?}
B -- Yes --> C[Channels: fan-out/fan-in]
B -- No --> D{Protecting shared state?}
D -- Simple counter/flag --> E[sync/atomic]
D -- Complex struct --> F[sync.Mutex + struct]
D -- Read-heavy --> G[sync.RWMutex]
A --> H{Pipeline of\ntransformations?}
H -- Yes --> I[Channel pipeline stages]
A --> J{Bounded concurrency\nworker pool?}
J -- Yes --> K[Buffered channel as semaphore]
style C fill:#22c55e,color:#fff
style I fill:#22c55e,color:#fff
style K fill:#22c55e,color:#fff
How It Works: Goroutines Are Cheap
A goroutine starts with a 2KB stack (versus 1-8MB for OS threads). Go's runtime multiplexes goroutines onto OS threads (GOMAXPROCS, defaulting to CPU count). Creating 10,000 goroutines is reasonable; creating 10,000 OS threads is not.
But "cheap to create" doesn't mean "free." Every goroutine that isn't garbage collected is a resource leak. The most common leak: goroutines blocked forever on a channel nobody writes to.
// GOROUTINE LEAK: nobody ever sends to ch, goroutine blocks forever
func leaky() {
ch := make(chan int)
go func() {
val := <-ch // Blocks forever — goroutine leaks
fmt.Println(val)
}()
// ch is never written to, function returns, but goroutine lives on
}
// FIX: always have a cancellation path
func notLeaky(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done():
return // Clean shutdown when context is cancelled
}
}()
}
Implementation: The Core Patterns
Pattern 1: Fan-Out / Fan-In
Distribute work across N workers, collect results. The standard pattern for parallelizing CPU-bound or I/O-bound tasks:
package main
import (
"context"
"fmt"
"sync"
)
// fanOut sends each input to N concurrent workers
func fanOut[In, Out any](
ctx context.Context,
input <-chan In,
workerCount int,
process func(context.Context, In) (Out, error),
) <-chan Result[Out] {
results := make(chan Result[Out], workerCount)
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range input {
out, err := process(ctx, item)
select {
case results <- Result[Out]{Value: out, Err: err}:
case <-ctx.Done():
return
}
}
}()
}
// Close results channel when all workers finish
go func() {
wg.Wait()
close(results)
}()
return results
}
type Result[T any] struct {
Value T
Err error
}
// Usage: fetch 100 URLs concurrently, 10 at a time
func fetchAll(ctx context.Context, urls []string) []Result[[]byte] {
// Send all URLs into a channel
urlChan := make(chan string, len(urls))
for _, url := range urls {
urlChan <- url
}
close(urlChan)
// Fan out to 10 workers
results := fanOut(ctx, urlChan, 10, func(ctx context.Context, url string) ([]byte, error) {
return httpGet(ctx, url)
})
// Collect all results
var all []Result[[]byte]
for r := range results {
all = append(all, r)
}
return all
}
Pattern 2: Pipeline
Chain transformations where each stage runs concurrently. Data flows through the pipeline like an assembly line:
// Pipeline: read files → parse lines → filter → count words
func pipeline(ctx context.Context, filePaths []string) <-chan WordCount {
// Stage 1: generate file paths
paths := func() <-chan string {
out := make(chan string)
go func() {
defer close(out)
for _, p := range filePaths {
select {
case out <- p:
case <-ctx.Done():
return
}
}
}()
return out
}()
// Stage 2: read files
lines := func(in <-chan string) <-chan string {
out := make(chan string, 100)
go func() {
defer close(out)
for path := range in {
for _, line := range readLines(path) {
select {
case out <- line:
case <-ctx.Done():
return
}
}
}
}()
return out
}(paths)
// Stage 3: count words (fan out to 4 workers for CPU-bound work)
counts := fanOut(ctx, lines, 4, func(_ context.Context, line string) (WordCount, error) {
return countWords(line), nil
})
// Stage 4: aggregate
aggregated := make(chan WordCount)
go func() {
defer close(aggregated)
total := WordCount{}
for r := range counts {
if r.Err == nil {
total.Merge(r.Value)
}
}
aggregated <- total
}()
return aggregated
}
Each stage runs in its own goroutine. Backpressure is natural: if a downstream stage is slow, the upstream channel fills up and the upstream goroutine blocks — no unbounded memory growth.
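The pipeline leans on a few helpers (WordCount, countWords, readLines, Merge) that aren't shown. One plausible sketch, assuming WordCount is a map from word to count:

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// WordCount maps each word to the number of times it was seen.
type WordCount map[string]int

// Merge adds every count from other into the receiver.
func (wc WordCount) Merge(other WordCount) {
	for word, n := range other {
		wc[word] += n
	}
}

// countWords tallies the whitespace-separated words in one line.
func countWords(line string) WordCount {
	wc := WordCount{}
	for _, word := range strings.Fields(line) {
		wc[strings.ToLower(word)]++
	}
	return wc
}

// readLines returns the lines of a file, swallowing errors for brevity
// (a production version should propagate them).
func readLines(path string) []string {
	f, err := os.Open(path)
	if err != nil {
		return nil
	}
	defer f.Close()
	var lines []string
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		lines = append(lines, sc.Text())
	}
	return lines
}

func main() {
	total := WordCount{}
	total.Merge(countWords("the quick brown fox"))
	total.Merge(countWords("the lazy dog"))
	fmt.Println(total["the"]) // 2
}
```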
Pattern 3: Worker Pool with Bounded Concurrency
For tasks where you need to limit concurrency (database connections, external API rate limits):
// semaphore: buffered channel as counting semaphore
type semaphore chan struct{}
func newSemaphore(n int) semaphore {
return make(chan struct{}, n)
}
func (s semaphore) Acquire() { s <- struct{}{} }
func (s semaphore) Release() { <-s }
// BoundedPool runs tasks with at most maxConcurrent running at once
type BoundedPool struct {
sem semaphore
wg sync.WaitGroup
}
func NewBoundedPool(maxConcurrent int) *BoundedPool {
return &BoundedPool{sem: newSemaphore(maxConcurrent)}
}
func (p *BoundedPool) Submit(ctx context.Context, task func()) error {
select {
case p.sem <- struct{}{}:
// Acquired semaphore slot
case <-ctx.Done():
return ctx.Err()
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer p.sem.Release() // Always release slot
task()
}()
return nil
}
func (p *BoundedPool) Wait() { p.wg.Wait() }
// Usage: process 1000 items with max 20 concurrent DB writes
func processItems(ctx context.Context, items []Item) error {
pool := NewBoundedPool(20) // Max 20 concurrent operations
for _, item := range items {
item := item // Capture loop variable
if err := pool.Submit(ctx, func() {
if err := db.Insert(ctx, item); err != nil {
log.Printf("failed to insert %s: %v", item.ID, err)
}
}); err != nil {
return fmt.Errorf("pool submit: %w", err)
}
}
pool.Wait()
return nil
}
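To convince yourself the bound actually holds, track peak concurrency with atomic counters. A self-contained sketch (the pool is reproduced so it compiles standalone):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// BoundedPool reproduced from above so this sketch runs on its own.
type BoundedPool struct {
	sem chan struct{}
	wg  sync.WaitGroup
}

func NewBoundedPool(maxConcurrent int) *BoundedPool {
	return &BoundedPool{sem: make(chan struct{}, maxConcurrent)}
}

func (p *BoundedPool) Submit(ctx context.Context, task func()) error {
	select {
	case p.sem <- struct{}{}: // acquire slot
	case <-ctx.Done():
		return ctx.Err()
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer func() { <-p.sem }() // release slot
		task()
	}()
	return nil
}

func (p *BoundedPool) Wait() { p.wg.Wait() }

// peakConcurrency runs tasks through a pool of size limit and returns
// the highest number of tasks observed running simultaneously.
func peakConcurrency(limit, tasks int) int64 {
	pool := NewBoundedPool(limit)
	var running, peak atomic.Int64
	for i := 0; i < tasks; i++ {
		pool.Submit(context.Background(), func() {
			n := running.Add(1)
			for { // record a new peak if we just exceeded it
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			running.Add(-1)
		})
	}
	pool.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(5, 100) <= 5) // true: never more than 5 at once
}
```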
Pattern 4: Context-Driven Cancellation
Every long-running goroutine should respect context cancellation for clean shutdown. This is not optional in production code:
func processStream(ctx context.Context, events <-chan Event) error {
for {
select {
case event, ok := <-events:
if !ok {
return nil // Channel closed — done
}
if err := handleEvent(ctx, event); err != nil {
return fmt.Errorf("handle event: %w", err)
}
case <-ctx.Done():
// Context cancelled (timeout, shutdown signal)
// Clean up and return
log.Printf("processStream stopping: %v", ctx.Err())
return ctx.Err()
}
}
}
// Graceful shutdown pattern for servers
func main() {
ctx, cancel := context.WithCancel(context.Background())
// Handle OS shutdown signals
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
<-sigChan
log.Println("Shutting down...")
cancel() // Cancel root context — all child contexts propagate
}()
// Start workers with the cancellable context
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
processStream(ctx, eventStream)
}(i)
}
wg.Wait() // Wait for all goroutines to finish after cancel
log.Println("All workers stopped. Exiting.")
}
errgroup: Fan-Out with Error Propagation
The standard library's sync.WaitGroup doesn't propagate errors from goroutines. golang.org/x/sync/errgroup adds error propagation and context cancellation — it's what most production fan-out code should use:
import "golang.org/x/sync/errgroup"
// errgroup: cancel all goroutines on first error
func fetchAllWithErrors(ctx context.Context, urls []string) ([][]byte, error) {
// g.Wait() returns the first non-nil error from any goroutine
g, ctx := errgroup.WithContext(ctx) // ctx is cancelled when any goroutine fails
results := make([][]byte, len(urls))
for i, url := range urls {
i, url := i, url // Capture loop variables
g.Go(func() error {
data, err := httpGet(ctx, url)
if err != nil {
return fmt.Errorf("fetching %s: %w", url, err)
}
results[i] = data
return nil
})
}
// Wait for all goroutines to finish (or for one to fail)
if err := g.Wait(); err != nil {
return nil, err // First error from any goroutine
}
return results, nil
}
// With concurrency limit: errgroup + semaphore
func fetchWithLimit(ctx context.Context, urls []string, maxConcurrent int) ([][]byte, error) {
g, ctx := errgroup.WithContext(ctx)
sem := make(chan struct{}, maxConcurrent)
results := make([][]byte, len(urls))
for i, url := range urls {
i, url := i, url
sem <- struct{}{} // Acquire semaphore (blocks if at limit)
g.Go(func() error {
defer func() { <-sem }() // Release on completion
data, err := httpGet(ctx, url)
if err != nil {
return fmt.Errorf("fetching %s: %w", url, err)
}
results[i] = data
return nil
})
}
return results, g.Wait()
}
errgroup.WithContext gives you a context that's cancelled as soon as any goroutine returns an error; other goroutines that check ctx.Done() stop early, so no work is wasted after a partial failure. Recent versions of errgroup also provide g.SetLimit(n), which caps the number of concurrently running goroutines without a hand-rolled semaphore.
sync.Once for Initialization
For lazy initialization that must happen exactly once — database connection setup, config loading — sync.Once is the correct tool:
var (
dbOnce sync.Once
dbInstance *sql.DB
dbErr error
)
func GetDB() (*sql.DB, error) {
dbOnce.Do(func() {
// This runs exactly once, even with 1000 concurrent calls
dbInstance, dbErr = sql.Open("postgres", os.Getenv("DATABASE_URL"))
if dbErr == nil {
dbErr = dbInstance.Ping()
}
})
return dbInstance, dbErr
}
// After dbOnce.Do fires once, all subsequent calls return immediately
// with the same dbInstance and dbErr — no locking overhead
sync.Once is safe for concurrent use, and after the first call Do is just an atomic load. It's preferable to init() for resources that need explicit configuration, and preferable to a bare mutex for "initialize exactly once" semantics.
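Go 1.21 added sync.OnceValue and sync.OnceValues, which package the same pattern as a memoizing function wrapper. The GetDB example could be written this way (connect here is a stand-in for real database setup, not a stdlib function):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// connect stands in for expensive one-time setup. The counter lets us
// verify it runs exactly once.
var calls int

func connect() (string, error) {
	calls++
	if calls > 1 {
		return "", errors.New("connect called more than once")
	}
	return "db-connection", nil
}

// getDB memoizes connect: the first call runs it; every later call
// returns the same (value, error) pair without re-running it.
var getDB = sync.OnceValues(connect)

func main() {
	for i := 0; i < 3; i++ {
		db, err := getDB()
		fmt.Println(db, err) // db-connection <nil> each time; connect ran once
	}
}
```

Compared to the package-level dbOnce/dbInstance/dbErr trio, OnceValues keeps the cached results private to the closure, which removes the temptation to read them without going through the wrapper.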
When to Use sync.Mutex Instead of Channels
Channels are the right tool for ownership transfer and coordination. Mutexes are the right tool for protecting shared state with many readers or complex update logic.
// Cache: multiple goroutines read, occasionally write — RWMutex is correct
type Cache[K comparable, V any] struct {
mu sync.RWMutex
items map[K]V
}
func (c *Cache[K, V]) Get(key K) (V, bool) {
c.mu.RLock() // Multiple concurrent readers allowed
defer c.mu.RUnlock()
v, ok := c.items[key]
return v, ok
}
func (c *Cache[K, V]) Set(key K, value V) {
c.mu.Lock() // Exclusive write lock
defer c.mu.Unlock()
c.items[key] = value
}
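The Cache's map must be allocated before use (writing to a nil map panics), and the tests later in this guide assume a NewCache constructor. A minimal sketch, with the Cache type reproduced so the snippet compiles standalone:

```go
package main

import (
	"fmt"
	"sync"
)

// Cache reproduced from above so this sketch runs on its own.
type Cache[K comparable, V any] struct {
	mu    sync.RWMutex
	items map[K]V
}

// NewCache allocates the backing map; the constructor is not optional,
// since writing to a nil map panics.
func NewCache[K comparable, V any]() *Cache[K, V] {
	return &Cache[K, V]{items: make(map[K]V)}
}

func (c *Cache[K, V]) Get(key K) (V, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.items[key]
	return v, ok
}

func (c *Cache[K, V]) Set(key K, value V) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = value
}

func main() {
	c := NewCache[string, int]()
	c.Set("hits", 42)
	v, ok := c.Get("hits")
	fmt.Println(v, ok) // 42 true
}
```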
// Counter: atomic operations avoid mutex overhead for simple increments
type RequestCounter struct {
total atomic.Int64
errors atomic.Int64
}
func (c *RequestCounter) RecordSuccess() { c.total.Add(1) }
func (c *RequestCounter) RecordError() { c.total.Add(1); c.errors.Add(1) }
func (c *RequestCounter) ErrorRate() float64 {
total := c.total.Load()
if total == 0 { return 0 }
return float64(c.errors.Load()) / float64(total)
}
Testing Concurrent Code
Concurrency bugs are notoriously hard to reproduce. Four strategies for reliable concurrent tests:
// Strategy 1: Run tests with -race flag always (in CI, locally)
// go test -race ./...
// Strategy 2: Use testing.T.Parallel() to run tests concurrently
// (increases the chance of surfacing races in CI)
func TestCache(t *testing.T) {
t.Parallel()
c := NewCache[string, int]()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
c.Set(fmt.Sprintf("key-%d", i), i)
c.Get(fmt.Sprintf("key-%d", i))
}()
}
wg.Wait()
}
// Strategy 3: Stress testing with -count=100
// Repeated runs aren't deterministic, but they raise the odds of surfacing a race
// go test -race -count=100 -run TestCache ./...
// Strategy 4: goleak for goroutine leak detection
func TestProcessStream(t *testing.T) {
defer goleak.VerifyNone(t) // Fails if goroutines outlive the test
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
events := make(chan Event, 10)
events <- Event{ID: "1"}
close(events)
err := processStream(ctx, events)
require.NoError(t, err)
// goleak checks: are all goroutines spawned during this test done?
}
The -race flag is non-negotiable. Make it part of your CI pipeline's test command. Data races that pass in sequential tests routinely appear in production under load.
When the race detector fires in CI, don't dismiss it. The output shows the exact lines where the unsynchronized access occurred:
==================
WARNING: DATA RACE
Write at 0x00c000126058 by goroutine 8:
main.dataraceExample.func1()
/tmp/main.go:12 +0x40 ← goroutine 8 writes here
Previous write at 0x00c000126058 by goroutine 7:
main.dataraceExample.func1()
/tmp/main.go:12 +0x40 ← goroutine 7 also writes here
Goroutine 8 (running) created at:
main.dataraceExample()
/tmp/main.go:9 +0x78
==================
The race report gives you three things: the conflicting write location, the previous write location, and where each goroutine was created. That is almost always enough to identify the fix immediately.
Detecting and Preventing Data Races
Go ships with a race detector. Run it in tests and staging — it catches races at runtime:
# Run tests with race detector
go test -race ./...

# Build and run a binary with race detector (production: too slow; staging: essential)
go run -race main.go
The race detector typically slows execution by 2-20× and increases memory usage by 5-10×. Run it in staging, not production. But always run it before shipping new concurrent code.
// This has a data race: two goroutines access counter without synchronization
func dataraceExample() {
var counter int
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // DATA RACE: concurrent read-modify-write
}()
}
wg.Wait()
fmt.Println(counter) // Not guaranteed to be 100
}
// Fix: use atomic
func noRaceExample() {
var counter atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Add(1) // Atomic — no race
}()
}
wg.Wait()
fmt.Println(counter.Load()) // Always 100
}
Production Considerations
Goroutine Leak Detection
In production, monitor goroutine counts. A growing goroutine count is a leak:
import "runtime"
func goroutineMetrics() {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
count := runtime.NumGoroutine()
metrics.Gauge("goroutines.active", float64(count))
if count > 10000 {
log.Printf("WARNING: high goroutine count: %d — possible leak", count)
}
}
}
Use goleak in tests to catch leaks before production:
import "go.uber.org/goleak"
func TestFanOut(t *testing.T) {
defer goleak.VerifyNone(t) // Fails the test if goroutines leak
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// ... test fan-out pattern
}
GOMAXPROCS in Containers
By default, GOMAXPROCS equals the number of logical CPUs on the host. In a container with a CPU limit, the runtime therefore schedules more parallelism than the quota allows, and the kernel throttles the container, which shows up as latency spikes. Use automaxprocs:
import _ "go.uber.org/automaxprocs" // Reads cgroup CPU limits, sets GOMAXPROCS correctly
func main() {
// automaxprocs sets GOMAXPROCS to the container's CPU limit
// e.g., if the container has 0.5 CPU, GOMAXPROCS = 1
...
}
The select Statement and Timeouts
select is Go's mechanism for waiting on multiple channel operations simultaneously. It's also how you implement timeouts and cancellation cleanly:
// select: handle whichever channel is ready first
func processWithTimeout(ctx context.Context, work func() (Result, error)) (Result, error) {
resultCh := make(chan Result, 1)
errCh := make(chan error, 1)
go func() {
result, err := work()
if err != nil {
errCh <- err
} else {
resultCh <- result
}
}()
select {
case result := <-resultCh:
return result, nil
case err := <-errCh:
return Result{}, err
case <-ctx.Done():
// Context timed out or was cancelled — work is still running in background
// (which is a goroutine leak unless work also checks ctx.Done())
return Result{}, fmt.Errorf("timed out: %w", ctx.Err())
}
}
// Non-blocking channel operation with default
func tryReceive[T any](ch <-chan T) (T, bool) {
select {
case val := <-ch:
return val, true
default:
var zero T
return zero, false // Channel empty — don't block
}
}
// Rate limiting with time.Ticker
func rateLimitedWorker(ctx context.Context, work <-chan Task, ratePerSecond int) {
ticker := time.NewTicker(time.Second / time.Duration(ratePerSecond))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case task := <-work:
<-ticker.C // Wait for rate limiter tick before processing
go process(task)
}
}
}
select with a default case makes channel operations non-blocking — useful for try-receive patterns and implementing backpressure. Without default, select blocks until one of the channels is ready.
Conclusion
Go's concurrency model is powerful but requires understanding the patterns to avoid the failure modes:
- Always have a cancellation path in goroutines — context or channel close
- Fan-out/fan-in for parallel work; worker pools for bounded concurrency
- Channels for coordination; mutexes for shared state; atomics for counters
- Race detector on every PR — silent data races become production incidents
- Monitor goroutine count — leaks show up as monotonically growing metrics
Go makes concurrent code easy to write. The discipline is making it correct.
The most important shift when learning Go concurrency: think in ownership. A value should have one clear owner at a time. Channels transfer ownership. When you find yourself reaching for a mutex to protect every field of a struct, ask whether a redesign with clearer ownership would eliminate the mutex entirely. Often it does.
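One concrete form of "clear ownership" is the monitor-goroutine pattern: a single goroutine owns the map, and everyone else talks to it over channels. No mutex anywhere. A minimal sketch (a production version would accept a context so the owner goroutine can shut down, per the leak discussion earlier):

```go
package main

import "fmt"

// kvStore: one goroutine owns the map; Get and Set send it requests.
type kvStore struct {
	gets chan getReq
	sets chan setReq
}

type getReq struct {
	key   string
	reply chan int
}

type setReq struct {
	key string
	val int
}

func newKVStore() *kvStore {
	s := &kvStore{
		gets: make(chan getReq),
		sets: make(chan setReq),
	}
	go func() {
		// The map never escapes this goroutine, so no lock is needed.
		data := map[string]int{}
		for {
			select {
			case r := <-s.gets:
				r.reply <- data[r.key]
			case r := <-s.sets:
				data[r.key] = r.val
			}
		}
	}()
	return s
}

func (s *kvStore) Set(key string, val int) { s.sets <- setReq{key, val} }

func (s *kvStore) Get(key string) int {
	reply := make(chan int)
	s.gets <- getReq{key, reply}
	return <-reply
}

func main() {
	s := newKVStore()
	s.Set("answer", 42)
	fmt.Println(s.Get("answer")) // 42
}
```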
Two resources that shaped how Go programmers think about this: Rob Pike's "Concurrency Is Not Parallelism" talk (the visualizations alone are worth watching) and the "Pipelines and cancellation" Go blog post. Both are free, short, and still the best mental models for idiomatic Go concurrency in 2026.
Sources
- "Concurrency in Go" by Katherine Cox-Buday
- Rob Pike: "Concurrency Is Not Parallelism" (GopherCon talk)
- Go blog: Pipelines and cancellation
- go.uber.org/goleak: goroutine leak testing
- go.uber.org/automaxprocs: container-aware GOMAXPROCS