32. Advanced Concurrency Patterns

🚀 Master Go concurrency! Learn worker pools, fan-out/fan-in, pipelines, select patterns, rate limiting, and semaphores. Build scalable concurrent systems! ✨

What we will learn in this post?

  • 👉 Worker Pool Pattern
  • 👉 Fan-Out and Fan-In Patterns
  • 👉 Pipeline Pattern
  • 👉 Select Statement Patterns
  • 👉 Rate Limiting and Throttling
  • 👉 Semaphore Pattern

Understanding the Worker Pool Pattern 🛠️

The worker pool pattern is a great way to manage tasks in a program. It helps you control how many tasks run at the same time, making your application efficient and responsive. Let’s break it down! This pattern is fundamental in production systems handling thousands of concurrent requests, from web servers to data processing pipelines.

What is a Worker Pool? 🤔

A worker pool consists of:

  • Workers: These are the threads or processes that do the work.
  • Job Queue: This is where tasks wait to be processed.

How It Works 🔄

  1. Job Distribution: When a task is added to the queue, a worker picks it up and starts working on it.
  2. Concurrency Control: You can limit the number of workers to control how many tasks run at once.
  3. Graceful Shutdown: Workers can finish their tasks before stopping, ensuring no work is lost.
  4. Backpressure Handling: If the queue gets too full, you can pause or reject new submissions until workers catch up (see the sketch below).
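
Point 4 deserves a concrete shape: a bounded job queue plus a non-blocking send gives you a natural backpressure signal. A minimal Go sketch (the submit helper is illustrative, not part of any library):

// submit tries to enqueue a job without blocking. A false return
// means the queue is full, so the caller can slow down, retry
// later, or reject the request (backpressure).
func submit(jobs chan<- int, job int) bool {
    select {
    case jobs <- job:
        return true
    default:
        return false
    }
}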

Worker Pool Architecture 📊

graph TD
    A["πŸ“‹ Job Queue"]:::style1 --> B{"βš™οΈ Dispatcher"}:::style2
    B --> C["πŸ‘· Worker 1"]:::style3
    B --> D["πŸ‘· Worker 2"]:::style3
    B --> E["πŸ‘· Worker 3"]:::style3
    B --> F["πŸ‘· Worker N"]:::style3
    C --> G["βœ… Results Channel"]:::style4
    D --> G
    E --> G
    F --> G
    G --> H["πŸ“Š Output"]:::style5

    classDef style1 fill:#ff4f81,stroke:#c43e3e,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style2 fill:#6b5bff,stroke:#4a3f6b,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style3 fill:#ffd700,stroke:#d99120,color:#222,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style4 fill:#00bfae,stroke:#005f99,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style5 fill:#ff9800,stroke:#f57c00,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;

    linkStyle default stroke:#e67e22,stroke-width:3px;

Example Implementation 💻

The concept is language-agnostic; here’s a simple illustration in Python, followed by a Go sketch of the same idea:

import queue
import threading
import time

def worker(job_queue):
    while True:
        job = job_queue.get()
        if job is None:
            break
        print(f"Processing {job}")
        time.sleep(1)  # Simulate work
        job_queue.task_done()

job_queue = queue.Queue()
num_workers = 4

# Start workers
threads = []
for _ in range(num_workers):
    t = threading.Thread(target=worker, args=(job_queue,))
    t.start()
    threads.append(t)

# Add jobs
for job in range(10):
    job_queue.put(job)

# Wait for all jobs to finish
job_queue.join()

# Stop workers
for _ in range(num_workers):
    job_queue.put(None)
for t in threads:
    t.join()
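
And here is the same worker pool in Go, the language this series targets. This is a minimal sketch under simplifying assumptions (jobs are plain ints, work is simulated with a sleep):

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs { // loop ends when jobs is closed
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(100 * time.Millisecond) // simulate work
    }
}

func main() {
    jobs := make(chan int, 10)
    var wg sync.WaitGroup

    // Start 4 workers
    for i := 1; i <= 4; i++ {
        wg.Add(1)
        go worker(i, jobs, &wg)
    }

    // Add jobs, then close the channel to signal no more work
    for j := 0; j < 10; j++ {
        jobs <- j
    }
    close(jobs)

    wg.Wait() // graceful shutdown: workers drain the queue first
}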

Key Benefits 🌟

  • Efficiency: Makes better use of resources.
  • Scalability: Easy to add more workers.
  • Control: Manage how tasks are processed.

Real-World Example: Image Processing Service 🎯

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

type Image struct {
	ID       string
	URL      string
	Format   string
}

type ProcessedImage struct {
	ID          string
	ThumbnailURL string
	Error       error
}

// ImageProcessor handles image processing with worker pool
type ImageProcessor struct {
	workerCount int
	jobs        chan Image
	results     chan ProcessedImage
	wg          sync.WaitGroup
}

func NewImageProcessor(workerCount, bufferSize int) *ImageProcessor {
	return &ImageProcessor{
		workerCount: workerCount,
		jobs:        make(chan Image, bufferSize),
		results:     make(chan ProcessedImage, bufferSize),
	}
}

func (ip *ImageProcessor) Start(ctx context.Context) {
	for i := 0; i < ip.workerCount; i++ {
		ip.wg.Add(1)
		go ip.worker(ctx, i)
	}
}

func (ip *ImageProcessor) worker(ctx context.Context, id int) {
	defer ip.wg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Printf("Worker %d: shutting down\n", id)
			return
		case job, ok := <-ip.jobs:
			if !ok {
				return
			}
			log.Printf("Worker %d: processing image %s\n", id, job.ID)
			
			// Simulate image processing
			time.Sleep(500 * time.Millisecond)
			
			ip.results <- ProcessedImage{
				ID:           job.ID,
				ThumbnailURL: fmt.Sprintf("https://cdn.example.com/thumb/%s", job.ID),
				Error:        nil,
			}
		}
	}
}

func (ip *ImageProcessor) Submit(img Image) {
	ip.jobs <- img
}

func (ip *ImageProcessor) Shutdown() {
	close(ip.jobs)
	ip.wg.Wait()
	close(ip.results)
}

func (ip *ImageProcessor) Results() <-chan ProcessedImage {
	return ip.results
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Create processor with 3 workers
	processor := NewImageProcessor(3, 10)
	processor.Start(ctx)

	// Submit images for processing
	images := []Image{
		{ID: "img-1", URL: "https://example.com/photo1.jpg", Format: "jpg"},
		{ID: "img-2", URL: "https://example.com/photo2.png", Format: "png"},
		{ID: "img-3", URL: "https://example.com/photo3.jpg", Format: "jpg"},
		{ID: "img-4", URL: "https://example.com/photo4.jpg", Format: "jpg"},
		{ID: "img-5", URL: "https://example.com/photo5.png", Format: "png"},
	}

	go func() {
		for _, img := range images {
			processor.Submit(img)
		}
		processor.Shutdown()
	}()

	// Collect results
	for result := range processor.Results() {
		if result.Error != nil {
			log.Printf("Error processing %s: %v\n", result.ID, result.Error)
		} else {
			fmt.Printf("βœ“ Processed %s -> %s\n", result.ID, result.ThumbnailURL)
		}
	}
}

Understanding Fan-Out and Fan-In Patterns

Fan-out and fan-in are essential patterns for distributing work and collecting results in concurrent systems, used extensively in MapReduce frameworks and distributed computing.

Fan-Out and Fan-In Visualization 📊

graph LR
    A["πŸ“₯ Input Stream"]:::style1 --> B["🌟 Fan-Out"]:::style2
    B --> C["βš™οΈ Worker 1"]:::style3
    B --> D["βš™οΈ Worker 2"]:::style3
    B --> E["βš™οΈ Worker 3"]:::style3
    C --> F["πŸ“Š Fan-In"]:::style4
    D --> F
    E --> F
    F --> G["πŸ“€ Aggregated Output"]:::style5

    classDef style1 fill:#ff4f81,stroke:#c43e3e,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style2 fill:#6b5bff,stroke:#4a3f6b,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style3 fill:#ffd700,stroke:#d99120,color:#222,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style4 fill:#00bfae,stroke:#005f99,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style5 fill:#ff9800,stroke:#f57c00,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;

    linkStyle default stroke:#e67e22,stroke-width:3px;

What is Fan-Out? 🌟

The fan-out pattern allows you to distribute work across multiple goroutines. This is useful for tasks that can be done in parallel.

Example of Fan-Out

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d is processing\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    wg.Wait()
}

What is Fan-In? 📥

The fan-in pattern collects results from multiple goroutines into a single channel. This helps in aggregating results efficiently.

Example of Fan-In

package main

import (
    "fmt"
)

func square(n int, ch chan int) {
    ch <- n * n
}

func main() {
    ch := make(chan int)
    for i := 1; i <= 5; i++ {
        go square(i, ch)
    }
    
    for i := 1; i <= 5; i++ {
        fmt.Println(<-ch)
    }
}

Combining Fan-Out and Fan-In 🔄

You can combine both patterns to create a pipeline for parallel processing.

Example of Combined Patterns

package main

import (
    "fmt"
    "sync"
)

func worker(id int, ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    ch <- id * id
}

func main() {
    var wg sync.WaitGroup
    ch := make(chan int, 5)

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, ch, &wg)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()

    for result := range ch {
        fmt.Println(result)
    }
}

Real-World Example: Distributed Log Processing 🎯

package main

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"
)

type LogEntry struct {
	Timestamp time.Time
	Level     string
	Message   string
}

type ProcessedLog struct {
	Entry     LogEntry
	WordCount int
	Priority  int
}

// Fan-Out: Distribute logs to multiple processors
func logGenerator(count int) <-chan LogEntry {
	outs := make(chan LogEntry)
	go func() {
		defer close(outs)
		levels := []string{"INFO", "WARN", "ERROR", "DEBUG"}
		for i := 0; i < count; i++ {
			outs <- LogEntry{
				Timestamp: time.Now(),
				Level:     levels[rand.Intn(len(levels))],
				Message:   fmt.Sprintf("Log message %d with some content", i),
			}
			time.Sleep(50 * time.Millisecond)
		}
	}()
	return outs
}

// Processing stage
func processLogs(in <-chan LogEntry) <-chan ProcessedLog {
	out := make(chan ProcessedLog)
	go func() {
		defer close(out)
		for entry := range in {
			// Process log entry
			wordCount := len(strings.Fields(entry.Message))
			priority := 0
			switch entry.Level {
			case "ERROR":
				priority = 3
			case "WARN":
				priority = 2
			case "INFO":
				priority = 1
			}
			
			out <- ProcessedLog{
				Entry:     entry,
				WordCount: wordCount,
				Priority:  priority,
			}
		}
	}()
	return out
}

// Fan-In: Merge multiple processing channels
func merge(channels ...<-chan ProcessedLog) <-chan ProcessedLog {
	out := make(chan ProcessedLog)
	var wg sync.WaitGroup

	output := func(c <-chan ProcessedLog) {
		defer wg.Done()
		for log := range c {
			out <- log
		}
	}

	wg.Add(len(channels))
	for _, ch := range channels {
		go output(ch)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	// Generate logs
	logs := logGenerator(20)

	// Fan-Out: Create multiple processors
	processor1 := processLogs(logs)
	processor2 := processLogs(logs)
	processor3 := processLogs(logs)

	// Fan-In: Merge results
	merged := merge(processor1, processor2, processor3)

	// Consume results
	for processed := range merged {
		fmt.Printf("[%s] Priority: %d, Words: %d - %s\n",
			processed.Entry.Level,
			processed.Priority,
			processed.WordCount,
			processed.Entry.Message)
	}
}

Building Concurrent Pipelines with Stages 🚀

Creating a data processing pipeline can be fun and efficient! Let’s break it down into simple parts: stages, channels, and how they all work together. Pipelines are the backbone of stream processing systems like Apache Kafka and real-time data analytics platforms.

What are Stages and Channels? 🌟

  • Stages: Think of these as steps in a recipe. Each stage does a specific job, like filtering or transforming data.
  • Channels: These are like pipes that connect the stages. They carry data from one stage to the next.

Pipeline Stages Flow 📊

graph LR
    A["🎬 Generator"]:::style1 --> B["βš™οΈ Transform Stage 1"]:::style2
    B --> C["πŸ”§ Transform Stage 2"]:::style3
    C --> D["πŸ” Filter Stage"]:::style4
    D --> E["πŸ“Š Sink Stage"]:::style5

    classDef style1 fill:#ff4f81,stroke:#c43e3e,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style2 fill:#6b5bff,stroke:#4a3f6b,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style3 fill:#ffd700,stroke:#d99120,color:#222,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style4 fill:#00bfae,stroke:#005f99,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
    classDef style5 fill:#ff9800,stroke:#f57c00,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;

    linkStyle default stroke:#e67e22,stroke-width:3px;

The Generator Pattern 🔄

This pattern helps us create data in a way that can be processed one piece at a time. It’s like a conveyor belt in a factory!
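
In Go, a generator is simply a function that returns a receive-only channel and feeds it from its own goroutine. A minimal sketch:

package main

import "fmt"

// generate returns a receive-only channel that yields n integers,
// closing the channel when done so consumers can range over it.
func generate(n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; i < n; i++ {
            out <- i
        }
    }()
    return out
}

func main() {
    for v := range generate(5) {
        fmt.Println(v)
    }
}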

Processing Stages and Sink Stage 🛠️

  • Processing Stages: Each stage processes data and sends it to the next stage.
  • Sink Stage: This is the final stage where we collect the results.

Example Implementation 💻

Here’s the same idea illustrated with Python’s asyncio queues:

import asyncio

async def stage_one(channel):
    for i in range(5):
        await channel.put(i)
    await channel.put(None)  # Signal completion

async def stage_two(channel_in, channel_out):
    while True:
        item = await channel_in.get()
        if item is None:
            await channel_out.put(None)  # Signal completion
            break
        await channel_out.put(item * 2)

async def sink(channel):
    while True:
        item = await channel.get()
        if item is None:
            break
        print(f"Processed item: {item}")

async def main():
    channel1 = asyncio.Queue()
    channel2 = asyncio.Queue()

    await asyncio.gather(
        stage_one(channel1),
        stage_two(channel1, channel2),
        sink(channel2)
    )

asyncio.run(main())

Key Points 📌

  • Use goroutines for non-blocking operations.
  • Close channels properly to avoid hanging processes.
  • Each stage can be independently developed and tested.

Real-World Example: ETL Data Pipeline 🎯

package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

type RawData struct {
	ID        int
	Content   string
	Timestamp time.Time
}

type CleanedData struct {
	ID      int
	Content string
	Words   int
}

type EnrichedData struct {
	ID          int
	Content     string
	Words       int
	Category    string
	ProcessedAt time.Time
}

// Extract stage: Read raw data
func extract(count int) <-chan RawData {
	out := make(chan RawData)
	go func() {
		defer close(out)
		for i := 1; i <= count; i++ {
			out <- RawData{
				ID:        i,
				Content:   fmt.Sprintf("  Raw DATA %d with EXTRA spaces  ", i),
				Timestamp: time.Now(),
			}
			time.Sleep(100 * time.Millisecond)
		}
	}()
	return out
}

// Transform stage: Clean and normalize data
func transform(in <-chan RawData) <-chan CleanedData {
	out := make(chan CleanedData)
	go func() {
		defer close(out)
		for data := range in {
			cleaned := strings.TrimSpace(data.Content)
			cleaned = strings.ToLower(cleaned)
			wordCount := len(strings.Fields(cleaned))
			
			out <- CleanedData{
				ID:      data.ID,
				Content: cleaned,
				Words:   wordCount,
			}
		}
	}()
	return out
}

// Enrich stage: Add metadata and categorize
func enrich(in <-chan CleanedData) <-chan EnrichedData {
	out := make(chan EnrichedData)
	go func() {
		defer close(out)
		for data := range in {
			category := "short"
			if data.Words > 5 {
				category = "long"
			}
			
			out <- EnrichedData{
				ID:          data.ID,
				Content:     data.Content,
				Words:       data.Words,
				Category:    category,
				ProcessedAt: time.Now(),
			}
		}
	}()
	return out
}

// Load stage: Store enriched data
func load(in <-chan EnrichedData) {
	for data := range in {
		jsonData, _ := json.MarshalIndent(data, "", "  ")
		fmt.Printf("Loaded to database:\n%s\n\n", jsonData)
	}
}

func main() {
	fmt.Println("Starting ETL Pipeline...\n")

	// Build ETL pipeline
	rawData := extract(5)
	cleanedData := transform(rawData)
	enrichedData := enrich(cleanedData)
	load(enrichedData)

	fmt.Println("Pipeline completed!")
}

Advanced Select Patterns in Go

Understanding Select in Go

The select statement in Go is a powerful tool for handling multiple channel operations. Here are some advanced patterns to enhance your Go programming skills! 🚀 The select statement is crucial for building responsive, non-blocking concurrent systems.

1. Timeout Handling ⏳

You can use a time.After channel to implement timeouts. If an operation takes too long, you can handle it gracefully.

select {
case result := <-ch:
    // Handle result
case <-time.After(2 * time.Second):
    // Handle timeout
}

2. Default Case for Non-blocking Operations 🚦

Using a default case allows you to perform non-blocking operations.

select {
case result := <-ch:
    // Handle result
default:
    // Do something else if no data is ready
}

3. Nil Channel Tricks 🌀

You can use a nil channel to skip a case in select.

var ch chan int // nil channel: its case can never proceed
ready := make(chan struct{})
close(ready)
select {
case <-ch: // skipped: receiving from a nil channel blocks forever
case <-ready:
    // this case runs
}

4. Select with Done Channel for Cancellation ❌

You can use a done channel to signal cancellation.

done := make(chan struct{})
select {
case <-done:
    // Handle cancellation
case result := <-ch:
    // Handle result
}

5. Combining Multiple Channel Operations 🔗

You can combine multiple channels in a single select.

select {
case result1 := <-ch1:
    // Handle result from ch1
case result2 := <-ch2:
    // Handle result from ch2
}
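
To see how these pieces compose, here is a small runnable sketch (channel names and timings are illustrative) that combines multiplexing, a done channel, a timeout, and the nil-channel trick:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    done := make(chan struct{})

    go func() { time.Sleep(100 * time.Millisecond); ch1 <- "from ch1" }()
    go func() { time.Sleep(200 * time.Millisecond); ch2 <- "from ch2" }()
    go func() { time.Sleep(300 * time.Millisecond); close(done) }()

    for {
        select {
        case msg := <-ch1:
            fmt.Println(msg)
            ch1 = nil // nil the channel so this case is skipped from now on
        case msg := <-ch2:
            fmt.Println(msg)
            ch2 = nil
        case <-done:
            fmt.Println("cancelled")
            return
        case <-time.After(1 * time.Second):
            fmt.Println("timeout")
            return
        }
    }
}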

Implementing Rate Limiting in Go 🚦

Rate limiting is essential for controlling how many requests your API can handle. It helps prevent abuse and ensures fair usage. Let’s explore how to implement it in Go using the token bucket algorithm and the golang.org/x/time/rate package.

What is Rate Limiting?

Rate limiting restricts the number of requests a user can make in a given time frame. This is crucial for:

  • Preventing server overload
  • Ensuring fair access for all users
  • Protecting against abuse

Using the Token Bucket Algorithm

The token bucket algorithm allows for burst traffic while maintaining an average rate. Here’s how it works:

  • Tokens are added to a bucket at a fixed rate.
  • Each request consumes a token.
  • If the bucket is empty, requests are denied until tokens are available.
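
To make the mechanics concrete, here is a hand-rolled, unoptimized token bucket sketch; production code should prefer golang.org/x/time/rate, shown next:

package main

import (
    "fmt"
    "sync"
    "time"
)

type TokenBucket struct {
    mu     sync.Mutex
    tokens int
    max    int
}

// NewTokenBucket refills one token per interval, up to max.
// The refill goroutine runs for the program's lifetime; fine for a sketch.
func NewTokenBucket(max int, interval time.Duration) *TokenBucket {
    tb := &TokenBucket{tokens: max, max: max}
    go func() {
        for range time.Tick(interval) {
            tb.mu.Lock()
            if tb.tokens < tb.max {
                tb.tokens++
            }
            tb.mu.Unlock()
        }
    }()
    return tb
}

// Allow consumes a token if one is available.
func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    if tb.tokens > 0 {
        tb.tokens--
        return true
    }
    return false
}

func main() {
    tb := NewTokenBucket(5, time.Second)
    for i := 0; i < 8; i++ {
        fmt.Printf("request %d allowed: %v\n", i, tb.Allow())
    }
}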

Implementation Example

Here’s a simple example using Go:

package main

import (
    "context"
    "fmt"

    "golang.org/x/time/rate"
)

func main() {
    limiter := rate.NewLimiter(1, 5) // refill 1 token per second, bucket holds 5

    for i := 0; i < 10; i++ {
        // Wait blocks until a token is available (or the context is canceled)
        if err := limiter.Wait(context.Background()); err != nil {
            fmt.Println("limiter error:", err)
            continue
        }
        fmt.Println("API request", i)
    }
}

Handling Burst Traffic

With the above setup, you can handle bursts of up to 5 requests at once, while maintaining a steady rate of 1 request per second.
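
You can observe the burst behavior with the non-blocking Allow method; a brief sketch under the same limiter configuration:

limiter := rate.NewLimiter(1, 5) // 1 token/sec, bucket of 5

// The first 5 calls succeed immediately (the burst); later calls
// succeed only as tokens refill at 1 per second.
for i := 0; i < 7; i++ {
    fmt.Printf("request %d allowed: %v\n", i, limiter.Allow())
}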

Using Semaphores in Go 🚦

Semaphores help control access to resources in concurrent programming. Let’s explore how to use them in Go!

What is a Semaphore?

A semaphore is a signaling mechanism that limits the number of goroutines accessing a resource.

Buffered Channels as Semaphores

You can use buffered channels to create a simple semaphore:

var wg sync.WaitGroup
sem := make(chan struct{}, 3) // limit to 3 concurrent accesses

for i := 0; i < 10; i++ {
    wg.Add(1)
    sem <- struct{}{} // acquire a slot (blocks while 3 are in use)
    go func(i int) {
        defer wg.Done()
        defer func() { <-sem }() // release the slot
        // access the shared resource here
    }(i)
}
wg.Wait() // don't exit before the goroutines finish

Weighted Semaphores

Weighted semaphores let different operations reserve different amounts of a shared capacity, so a heavy task can acquire more units than a light one. You can roll your own, but the golang.org/x/sync/semaphore package below already provides this.

Using golang.org/x/sync/semaphore

This package provides a robust semaphore implementation. Here’s how to use it:

import "golang.org/x/sync/semaphore"

sem := semaphore.NewWeighted(3) // Limit to 3

for i := 0; i < 10; i++ {
    if err := sem.Acquire(ctx, 1); err != nil {
        // Handle error
    }
    go func(i int) {
        defer sem.Release(1) // Release
        // Access resource
    }(i)
}

Key Points

  • Control Access: Semaphores limit concurrent access to resources.
  • Buffered Channels: Simple way to implement semaphores.
  • Weighted Semaphores: Allow different access levels.
  • golang.org/x/sync/semaphore: A powerful package for semaphores.

Concurrency Patterns Comparison 📊

Choosing the right concurrency pattern is crucial for building efficient systems. Here’s a comprehensive comparison:

Worker Pool
  • Use case: Fixed number of goroutines processing tasks
  • Pros: ✅ Controlled resource usage ✅ Predictable performance ✅ Easy backpressure handling
  • Cons: ❌ Fixed capacity ❌ May underutilize with low load
  • Best for: API servers, batch processing, image processing

Fan-Out/Fan-In
  • Use case: Distribute work, collect results
  • Pros: ✅ Parallel processing ✅ High throughput ✅ Load distribution
  • Cons: ❌ Complex coordination ❌ Result ordering challenges
  • Best for: MapReduce, log aggregation, data analysis

Pipeline
  • Use case: Sequential data transformation
  • Pros: ✅ Composable stages ✅ Stream processing ✅ Memory efficient
  • Cons: ❌ Sequential bottlenecks ❌ Complex error handling
  • Best for: ETL, data streams, video processing

Select Patterns
  • Use case: Multiple channel coordination
  • Pros: ✅ Timeout handling ✅ Cancellation support ✅ Non-blocking ops
  • Cons: ❌ Can be complex ❌ Requires careful design
  • Best for: Event handling, multiplexing, responsive systems

Rate Limiting
  • Use case: API throttling, resource protection
  • Pros: ✅ Prevents overload ✅ Fair resource usage ✅ Burst support
  • Cons: ❌ May reject requests ❌ Adds latency
  • Best for: Public APIs, external service calls, DDoS protection

Semaphore
  • Use case: Limit concurrent resource access
  • Pros: ✅ Fine-grained control ✅ Weighted access ✅ Flexible limits
  • Cons: ❌ Potential deadlocks ❌ Requires discipline
  • Best for: Connection pools, file handles, memory limits

When to Use Each Pattern 🎯

Choose Worker Pool when:

  • You have a high volume of independent tasks
  • Resource usage must be predictable and controlled
  • Tasks have similar processing time

Choose Fan-Out/Fan-In when:

  • Work can be split into independent parallel operations
  • You need to aggregate results from multiple sources
  • Maximizing throughput is critical

Choose Pipeline when:

  • Data flows through sequential transformation steps
  • Each stage can work on different items simultaneously
  • Memory efficiency matters (streaming data)

Choose Select Patterns when:

  • Coordinating multiple channel operations
  • Implementing timeouts or cancellation
  • Building responsive, event-driven systems

Choose Rate Limiting when:

  • Protecting APIs from abuse or overload
  • Ensuring fair resource distribution
  • Complying with external service rate limits

Choose Semaphore when:

  • Limiting access to scarce resources (DB connections, file handles)
  • Different operations require different resource weights
  • Fine-grained concurrency control is needed

🧠 Test Your Knowledge

What is the primary purpose of the worker pool pattern in Go?
Explanation

The worker pool pattern limits the number of concurrent goroutines to control resource usage and prevent overwhelming the system, making it ideal for handling high-volume task processing efficiently.

In the fan-out/fan-in pattern, what does the fan-in stage do?
Explanation

The fan-in pattern merges multiple output channels from parallel workers into a single channel, aggregating results for downstream processing or final consumption.

Which Go statement is specifically designed for handling multiple channel operations?
Explanation

The select statement in Go allows a goroutine to wait on multiple channel operations simultaneously, enabling timeout handling, non-blocking operations, and cancellation patterns.

What algorithm is commonly used for rate limiting in Go applications?
Explanation

The token bucket algorithm is the industry standard for rate limiting, allowing controlled burst traffic while maintaining an average rate. The golang.org/x/time/rate package implements this algorithm.

How can a buffered channel be used as a semaphore in Go?
Explanation

A buffered channel's capacity acts as a semaphore by limiting how many goroutines can acquire a resource at once. Sending to the channel acquires access, receiving releases it.


🎯 Hands-On Assignment: Build a Concurrent Web Scraper with Advanced Patterns 🚀

πŸ“ Your Mission

Create a production-ready concurrent web scraper that combines worker pools, pipelines, rate limiting, and graceful shutdown. Your scraper will fetch multiple URLs, extract data, process it through a pipeline, and save results, all while respecting rate limits and handling errors gracefully. This mirrors real-world systems used by companies like Google, Amazon, and web crawling services.

🎯 Requirements

  1. Implement a WebScraper with worker pool pattern:
    • Configurable number of worker goroutines
    • Job queue for URLs to scrape
    • Results channel for scraped data
    • Graceful shutdown with context cancellation
  2. Build a 3-stage processing pipeline:
    • Stage 1 (Fetch): Download HTML content from URLs
    • Stage 2 (Parse): Extract titles, links, and metadata
    • Stage 3 (Store): Save to file or simulate database storage
  3. Implement rate limiting:
    • Use golang.org/x/time/rate for token bucket
    • Limit to 10 requests per second with burst of 5
    • Handle rate limit errors gracefully
  4. Add semaphore for connection pooling:
    • Limit concurrent HTTP connections to 20
    • Use buffered channel or golang.org/x/sync/semaphore
  5. Implement advanced select patterns:
    • Timeout handling (5 seconds per request)
    • Context cancellation for graceful shutdown
    • Non-blocking result collection with default case

💡 Implementation Hints

  1. Use net/http with custom Client and timeout configuration
  2. Create separate structs: URL, Content, ParsedData, StoredResult
  3. Use context.WithCancel for coordinated shutdown across all goroutines
  4. Implement sync.WaitGroup to wait for worker completion
  5. Use rate.NewLimiter(10, 5) for rate limiting
  6. Handle errors by sending them through error channel or logging
  7. Test with sites like httpbin.org for controlled testing

🚀 Example Input/Output

// Example: Concurrent Web Scraper Structure
package main

import (
    "context"
    "fmt"
    "golang.org/x/time/rate"
    "net/http"
    "sync"
    "time"
)

type URL struct {
    Address string
    Depth   int
}

type FetchedContent struct {
    URL     string
    Content string
    Error   error
}

type ParsedData struct {
    URL   string
    Title string
    Links []string
}

type Scraper struct {
    workerCount int
    limiter     *rate.Limiter
    jobs        chan URL
    results     chan ParsedData
    wg          sync.WaitGroup
    client      *http.Client
}

func NewScraper(workers int, rateLimit rate.Limit, burst int) *Scraper {
    return &Scraper{
        workerCount: workers,
        limiter:     rate.NewLimiter(rateLimit, burst),
        jobs:        make(chan URL, 100),
        results:     make(chan ParsedData, 100),
        client: &http.Client{
            Timeout: 5 * time.Second,
        },
    }
}

func (s *Scraper) Start(ctx context.Context) {
    for i := 0; i < s.workerCount; i++ {
        s.wg.Add(1)
        go s.worker(ctx, i)
    }
}

func (s *Scraper) worker(ctx context.Context, id int) {
    defer s.wg.Done()
    for {
        select {
        case <-ctx.Done():
            return
        case job, ok := <-s.jobs:
            if !ok {
                return
            }
            
            // Rate limiting
            if err := s.limiter.Wait(ctx); err != nil {
                fmt.Printf("Rate limit error: %v\\n", err)
                continue
            }
            
            // Fetch and parse (simplified)
            fmt.Printf("Worker %d: Scraping %s\\n", id, job.Address)
            
            // Simulate scraping with timeout
            fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
            content, err := s.fetch(fetchCtx, job.Address)
            cancel()
            
            if err != nil {
                fmt.Printf("Error scraping %s: %v\\n", job.Address, err)
                continue
            }
            
            // Send result
            s.results <- ParsedData{
                URL:   job.Address,
                Title: content.Title,
                Links: content.Links,
            }
        }
    }
}

func (s *Scraper) Shutdown() {
    close(s.jobs)
    s.wg.Wait()
    close(s.results)
}

// fetch is a stub so the skeleton compiles; for the assignment, replace
// it with a real HTTP GET via s.client (honoring ctx) plus HTML parsing.
func (s *Scraper) fetch(ctx context.Context, address string) (ParsedData, error) {
    _ = ctx // the real implementation should respect ctx for timeouts
    return ParsedData{URL: address, Title: "stub title"}, nil
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    // Create scraper: 5 workers, 10 req/sec, burst of 5
    scraper := NewScraper(5, 10, 5)
    scraper.Start(ctx)
    
    // Submit URLs
    urls := []string{
        "https://example.com",
        "https://example.org",
        "https://httpbin.org/delay/1",
    }
    
    go func() {
        for _, url := range urls {
            scraper.jobs <- URL{Address: url, Depth: 0}
        }
        scraper.Shutdown()
    }()
    
    // Collect results
    for result := range scraper.results {
        fmt.Printf("βœ“ Scraped: %s - Title: %s, Links: %d\\n", 
            result.URL, result.Title, len(result.Links))
    }
}

πŸ† Bonus Challenges

  • Level 2: Add retry logic with exponential backoff for failed requests
  • Level 3: Implement fan-out/fan-in for parsing with multiple parsers
  • Level 4: Add depth-limited crawling by following extracted links
  • Level 5: Implement URL deduplication using concurrent-safe map or bloom filter
  • Level 6: Add metrics collection (requests/sec, errors, avg latency) with monitoring dashboard

📚 Learning Goals

  • Master worker pool pattern for controlled concurrency 🎯
  • Build multi-stage pipelines for data processing ✨
  • Implement production-grade rate limiting 🔄
  • Use semaphores for resource pooling 🔗
  • Apply advanced select patterns for robust concurrent systems 🛠️
  • Handle graceful shutdown and context cancellation 🔒

💡 Pro Tip: This architecture powers real-world systems! Google's crawler uses similar patterns, Shopify's background job processor employs worker pools, and Netflix's microservices use pipelines and rate limiting extensively. Companies like Stripe and Uber rely on these concurrency patterns for high-throughput, low-latency systems!

Share Your Solution! 💬

Completed the project? Post your code in the comments below! Show us your Go concurrency mastery! 🚀✨


Conclusion: Master Advanced Concurrency for Production Go Systems 🎓

Advanced concurrency patterns are the foundation of high-performance Go applications, enabling systems to handle thousands of concurrent operations efficiently while maintaining responsiveness and resource control. By mastering worker pools for controlled parallelism, fan-out/fan-in for distributed work, pipelines for stream processing, select patterns for coordination, rate limiting for API protection, and semaphores for resource management, you’ll build production-ready concurrent systems that scale elegantly from prototypes to enterprise applications serving millions of users.

This post is licensed under CC BY 4.0 by the author.