32. Advanced Concurrency Patterns
🚀 Master Go concurrency! Learn worker pools, fan-out/fan-in, pipelines, select patterns, rate limiting, and semaphores. Build scalable concurrent systems! ✨
What we will learn in this post?
- Worker Pool Pattern
- Fan-Out and Fan-In Patterns
- Pipeline Pattern
- Select Statement Patterns
- Rate Limiting and Throttling
- Semaphore Pattern
Understanding the Worker Pool Pattern 🛠️
The worker pool pattern is a great way to manage tasks in a program. It helps you control how many tasks run at the same time, making your application efficient and responsive. Let's break it down! This pattern is fundamental in production systems handling thousands of concurrent requests, from web servers to data processing pipelines.
What is a Worker Pool? 🤔
A worker pool consists of:
- Workers: The goroutines (or threads/processes in other languages) that do the work.
- Job Queue: This is where tasks wait to be processed.
How It Works 🔄
- Job Distribution: When a task is added to the queue, a worker picks it up and starts working on it.
- Concurrency Control: You can limit the number of workers to control how many tasks run at once.
- Graceful Shutdown: Workers can finish their tasks before stopping, ensuring no work is lost.
- Backpressure Handling: If the queue gets too full, you can pause adding new tasks until some are completed.
Worker Pool Architecture 📊
graph TD
A["π Job Queue"]:::style1 --> B{"βοΈ Dispatcher"}:::style2
B --> C["π· Worker 1"]:::style3
B --> D["π· Worker 2"]:::style3
B --> E["π· Worker 3"]:::style3
B --> F["π· Worker N"]:::style3
C --> G["β
Results Channel"]:::style4
D --> G
E --> G
F --> G
G --> H["π Output"]:::style5
classDef style1 fill:#ff4f81,stroke:#c43e3e,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style2 fill:#6b5bff,stroke:#4a3f6b,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style3 fill:#ffd700,stroke:#d99120,color:#222,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style4 fill:#00bfae,stroke:#005f99,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style5 fill:#ff9800,stroke:#f57c00,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
linkStyle default stroke:#e67e22,stroke-width:3px;
Example Implementation 💻
Here's a simple example in Python:
import queue
import threading
import time

def worker(job_queue):
    while True:
        job = job_queue.get()
        if job is None:
            break
        print(f"Processing {job}")
        time.sleep(1)  # Simulate work
        job_queue.task_done()

job_queue = queue.Queue()
num_workers = 4

# Start workers
threads = []
for _ in range(num_workers):
    t = threading.Thread(target=worker, args=(job_queue,))
    t.start()
    threads.append(t)

# Add jobs
for job in range(10):
    job_queue.put(job)

# Wait for all jobs to finish
job_queue.join()

# Stop workers
for _ in range(num_workers):
    job_queue.put(None)
for t in threads:
    t.join()
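Since this series is about Go, here is the same pattern as a minimal Go sketch. It uses only the standard sync and time packages; the trySubmit helper is our own illustration of backpressure, not a standard API.

package main

import (
    "fmt"
    "sync"
    "time"
)

// trySubmit enqueues a job without blocking and reports whether it fit.
// Returning false when the queue is full is a simple form of backpressure.
func trySubmit(jobs chan<- int, job int) bool {
    select {
    case jobs <- job:
        return true
    default:
        return false
    }
}

func main() {
    jobs := make(chan int, 5) // bounded job queue
    var wg sync.WaitGroup

    // Start a fixed pool of 3 workers.
    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs { // exits when jobs is closed
                fmt.Printf("Worker %d processing job %d\n", id, job)
                time.Sleep(100 * time.Millisecond) // simulate work
            }
        }(w)
    }

    // Submit jobs, backing off when the queue is full.
    for j := 0; j < 10; j++ {
        for !trySubmit(jobs, j) {
            time.Sleep(50 * time.Millisecond)
        }
    }

    close(jobs) // no more jobs: workers drain the queue and exit
    wg.Wait()   // graceful shutdown: all in-flight work completes
}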
Key Benefits 🌟
- Efficiency: Makes better use of resources.
- Scalability: Easy to add more workers.
- Control: Manage how tasks are processed.
Real-World Example: Image Processing Service 🎯
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

type Image struct {
    ID     string
    URL    string
    Format string
}

type ProcessedImage struct {
    ID           string
    ThumbnailURL string
    Error        error
}

// ImageProcessor handles image processing with a worker pool
type ImageProcessor struct {
    workerCount int
    jobs        chan Image
    results     chan ProcessedImage
    wg          sync.WaitGroup
}

func NewImageProcessor(workerCount, bufferSize int) *ImageProcessor {
    return &ImageProcessor{
        workerCount: workerCount,
        jobs:        make(chan Image, bufferSize),
        results:     make(chan ProcessedImage, bufferSize),
    }
}

func (ip *ImageProcessor) Start(ctx context.Context) {
    for i := 0; i < ip.workerCount; i++ {
        ip.wg.Add(1)
        go ip.worker(ctx, i)
    }
}

func (ip *ImageProcessor) worker(ctx context.Context, id int) {
    defer ip.wg.Done()
    for {
        select {
        case <-ctx.Done():
            log.Printf("Worker %d: shutting down\n", id)
            return
        case job, ok := <-ip.jobs:
            if !ok {
                return
            }
            log.Printf("Worker %d: processing image %s\n", id, job.ID)
            // Simulate image processing
            time.Sleep(500 * time.Millisecond)
            ip.results <- ProcessedImage{
                ID:           job.ID,
                ThumbnailURL: fmt.Sprintf("https://cdn.example.com/thumb/%s", job.ID),
                Error:        nil,
            }
        }
    }
}

func (ip *ImageProcessor) Submit(img Image) {
    ip.jobs <- img
}

func (ip *ImageProcessor) Shutdown() {
    close(ip.jobs)
    ip.wg.Wait()
    close(ip.results)
}

func (ip *ImageProcessor) Results() <-chan ProcessedImage {
    return ip.results
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Create processor with 3 workers
    processor := NewImageProcessor(3, 10)
    processor.Start(ctx)

    // Submit images for processing
    images := []Image{
        {ID: "img-1", URL: "https://example.com/photo1.jpg", Format: "jpg"},
        {ID: "img-2", URL: "https://example.com/photo2.png", Format: "png"},
        {ID: "img-3", URL: "https://example.com/photo3.jpg", Format: "jpg"},
        {ID: "img-4", URL: "https://example.com/photo4.jpg", Format: "jpg"},
        {ID: "img-5", URL: "https://example.com/photo5.png", Format: "png"},
    }

    go func() {
        for _, img := range images {
            processor.Submit(img)
        }
        processor.Shutdown()
    }()

    // Collect results
    for result := range processor.Results() {
        if result.Error != nil {
            log.Printf("Error processing %s: %v\n", result.ID, result.Error)
        } else {
            fmt.Printf("✓ Processed %s -> %s\n", result.ID, result.ThumbnailURL)
        }
    }
}
Understanding Fan-Out and Fan-In Patterns
Fan-out and fan-in are essential patterns for distributing work and collecting results in concurrent systems, used extensively in MapReduce frameworks and distributed computing.
Fan-Out and Fan-In Visualization 📊
graph LR
A["📥 Input Stream"]:::style1 --> B["🔀 Fan-Out"]:::style2
B --> C["⚙️ Worker 1"]:::style3
B --> D["⚙️ Worker 2"]:::style3
B --> E["⚙️ Worker 3"]:::style3
C --> F["🔀 Fan-In"]:::style4
D --> F
E --> F
F --> G["📤 Aggregated Output"]:::style5
classDef style1 fill:#ff4f81,stroke:#c43e3e,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style2 fill:#6b5bff,stroke:#4a3f6b,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style3 fill:#ffd700,stroke:#d99120,color:#222,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style4 fill:#00bfae,stroke:#005f99,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style5 fill:#ff9800,stroke:#f57c00,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
linkStyle default stroke:#e67e22,stroke-width:3px;
What is Fan-Out? 🔀
The fan-out pattern allows you to distribute work across multiple goroutines. This is useful for tasks that can be done in parallel.
Example of Fan-Out
package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d is processing\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    wg.Wait()
}
What is Fan-In? 📥
The fan-in pattern collects results from multiple goroutines into a single channel. This helps in aggregating results efficiently.
Example of Fan-In
package main

import (
    "fmt"
)

func square(n int, ch chan int) {
    ch <- n * n
}

func main() {
    ch := make(chan int)
    for i := 1; i <= 5; i++ {
        go square(i, ch)
    }
    for i := 1; i <= 5; i++ {
        fmt.Println(<-ch)
    }
}
Combining Fan-Out and Fan-In 🔄
You can combine both patterns to create a pipeline for parallel processing.
Example of Combined Patterns
package main

import (
    "fmt"
    "sync"
)

func worker(id int, ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    ch <- id * id
}

func main() {
    var wg sync.WaitGroup
    ch := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, ch, &wg)
    }
    go func() {
        wg.Wait()
        close(ch)
    }()
    for result := range ch {
        fmt.Println(result)
    }
}
Real-World Example: Distributed Log Processing 🎯
package main

import (
    "fmt"
    "math/rand"
    "strings"
    "sync"
    "time"
)

type LogEntry struct {
    Timestamp time.Time
    Level     string
    Message   string
}

type ProcessedLog struct {
    Entry     LogEntry
    WordCount int
    Priority  int
}

// Generator: emit a stream of log entries
func logGenerator(count int) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        levels := []string{"INFO", "WARN", "ERROR", "DEBUG"}
        for i := 0; i < count; i++ {
            out <- LogEntry{
                Timestamp: time.Now(),
                Level:     levels[rand.Intn(len(levels))],
                Message:   fmt.Sprintf("Log message %d with some content", i),
            }
            time.Sleep(50 * time.Millisecond)
        }
    }()
    return out
}

// Processing stage (fan-out: several of these read the same input channel)
func processLogs(in <-chan LogEntry) <-chan ProcessedLog {
    out := make(chan ProcessedLog)
    go func() {
        defer close(out)
        for entry := range in {
            // Process log entry
            wordCount := len(strings.Fields(entry.Message))
            priority := 0
            switch entry.Level {
            case "ERROR":
                priority = 3
            case "WARN":
                priority = 2
            case "INFO":
                priority = 1
            }
            out <- ProcessedLog{
                Entry:     entry,
                WordCount: wordCount,
                Priority:  priority,
            }
        }
    }()
    return out
}

// Fan-In: merge multiple processing channels
func merge(channels ...<-chan ProcessedLog) <-chan ProcessedLog {
    out := make(chan ProcessedLog)
    var wg sync.WaitGroup
    output := func(c <-chan ProcessedLog) {
        defer wg.Done()
        for l := range c {
            out <- l
        }
    }
    wg.Add(len(channels))
    for _, ch := range channels {
        go output(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    // Generate logs
    logs := logGenerator(20)

    // Fan-Out: multiple processors reading from the same channel
    processor1 := processLogs(logs)
    processor2 := processLogs(logs)
    processor3 := processLogs(logs)

    // Fan-In: merge results
    merged := merge(processor1, processor2, processor3)

    // Consume results
    for processed := range merged {
        fmt.Printf("[%s] Priority: %d, Words: %d - %s\n",
            processed.Entry.Level,
            processed.Priority,
            processed.WordCount,
            processed.Entry.Message)
    }
}
Building Concurrent Pipelines with Stages 🚀
Creating a data processing pipeline can be fun and efficient! Let's break it down into simple parts: stages, channels, and how they all work together. Pipelines are the backbone of stream processing systems like Apache Kafka and real-time data analytics platforms.
What are Stages and Channels? 🔗
- Stages: Think of these as steps in a recipe. Each stage does a specific job, like filtering or transforming data.
- Channels: These are like pipes that connect the stages. They carry data from one stage to the next.
Pipeline Stages Flow 📊
graph LR
A["🎬 Generator"]:::style1 --> B["⚙️ Transform Stage 1"]:::style2
B --> C["🔧 Transform Stage 2"]:::style3
C --> D["🔍 Filter Stage"]:::style4
D --> E["💾 Sink Stage"]:::style5
classDef style1 fill:#ff4f81,stroke:#c43e3e,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style2 fill:#6b5bff,stroke:#4a3f6b,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style3 fill:#ffd700,stroke:#d99120,color:#222,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style4 fill:#00bfae,stroke:#005f99,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
classDef style5 fill:#ff9800,stroke:#f57c00,color:#fff,font-size:16px,stroke-width:3px,rx:14,shadow:6px;
linkStyle default stroke:#e67e22,stroke-width:3px;
The Generator Pattern 🎬
This pattern helps us create data in a way that can be processed one piece at a time. It's like a conveyor belt in a factory!
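In Go, a generator is typically a function that returns a receive-only channel and feeds it from its own goroutine; a minimal sketch:

// generate yields the numbers 0..n-1 on a channel and closes it when done.
// Closing the channel is how downstream stages learn the stream has ended.
func generate(n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; i < n; i++ {
            out <- i
        }
    }()
    return out
}

// Usage: for v := range generate(5) { fmt.Println(v) }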
Processing Stages and Sink Stage 🛠️
- Processing Stages: Each stage processes data and sends it to the next stage.
- Sink Stage: This is the final stage where we collect the results.
Example Implementation 💻
Here's a simple example in Python:
import asyncio

async def stage_one(channel):
    for i in range(5):
        await channel.put(i)
    await channel.put(None)  # Signal completion

async def stage_two(channel_in, channel_out):
    while True:
        item = await channel_in.get()
        if item is None:
            await channel_out.put(None)  # Signal completion
            break
        await channel_out.put(item * 2)

async def sink(channel):
    while True:
        item = await channel.get()
        if item is None:
            break
        print(f"Processed item: {item}")

async def main():
    channel1 = asyncio.Queue()
    channel2 = asyncio.Queue()
    await asyncio.gather(
        stage_one(channel1),
        stage_two(channel1, channel2),
        sink(channel2)
    )

asyncio.run(main())
Key Points 📌
- Use goroutines for non-blocking operations.
- Close channels properly to avoid hanging processes.
- Each stage can be independently developed and tested.
Real-World Example: ETL Data Pipeline 🎯
package main

import (
    "encoding/json"
    "fmt"
    "strings"
    "time"
)

type RawData struct {
    ID        int
    Content   string
    Timestamp time.Time
}

type CleanedData struct {
    ID      int
    Content string
    Words   int
}

type EnrichedData struct {
    ID          int
    Content     string
    Words       int
    Category    string
    ProcessedAt time.Time
}

// Extract stage: read raw data
func extract(count int) <-chan RawData {
    out := make(chan RawData)
    go func() {
        defer close(out)
        for i := 1; i <= count; i++ {
            out <- RawData{
                ID:        i,
                Content:   fmt.Sprintf(" Raw DATA %d with EXTRA spaces ", i),
                Timestamp: time.Now(),
            }
            time.Sleep(100 * time.Millisecond)
        }
    }()
    return out
}

// Transform stage: clean and normalize data
func transform(in <-chan RawData) <-chan CleanedData {
    out := make(chan CleanedData)
    go func() {
        defer close(out)
        for data := range in {
            cleaned := strings.TrimSpace(data.Content)
            cleaned = strings.ToLower(cleaned)
            wordCount := len(strings.Fields(cleaned))
            out <- CleanedData{
                ID:      data.ID,
                Content: cleaned,
                Words:   wordCount,
            }
        }
    }()
    return out
}

// Enrich stage: add metadata and categorize
func enrich(in <-chan CleanedData) <-chan EnrichedData {
    out := make(chan EnrichedData)
    go func() {
        defer close(out)
        for data := range in {
            category := "short"
            if data.Words > 5 {
                category = "long"
            }
            out <- EnrichedData{
                ID:          data.ID,
                Content:     data.Content,
                Words:       data.Words,
                Category:    category,
                ProcessedAt: time.Now(),
            }
        }
    }()
    return out
}

// Load stage: store enriched data (here, printed as JSON)
func load(in <-chan EnrichedData) {
    for data := range in {
        jsonData, _ := json.MarshalIndent(data, "", "  ")
        fmt.Printf("Loaded to database:\n%s\n\n", jsonData)
    }
}

func main() {
    fmt.Println("Starting ETL Pipeline...")

    // Build the ETL pipeline by chaining stages
    rawData := extract(5)
    cleanedData := transform(rawData)
    enrichedData := enrich(cleanedData)
    load(enrichedData)

    fmt.Println("Pipeline completed!")
}
Advanced Select Patterns in Go
Understanding Select in Go
The select statement in Go is a powerful tool for handling multiple channel operations, and it is crucial for building responsive, non-blocking concurrent systems. Here are some advanced patterns to enhance your Go programming skills! 🚀
1. Timeout Handling ⏳
You can use a time.After channel to implement timeouts. If an operation takes too long, you can handle it gracefully.
select {
case result := <-ch:
    // Handle result
case <-time.After(2 * time.Second):
    // Handle timeout
}
2. Default Case for Non-blocking Operations 🚦
Using a default case allows you to perform non-blocking operations.
select {
case result := <-ch:
    // Handle result
default:
    // Do something else if no data is ready
}
3. Nil Channel Tricks 🎭
Setting a channel variable to nil disables its case in a select: a nil channel never becomes ready, so that case is simply skipped. This is handy for turning cases on and off dynamically.
1
2
3
4
var ch1 chan int // nil channel: its case can never fire
select {
case <-ch1: // Skipped while ch1 is nil
case v := <-ch2: // ch2 is assumed to be an initialized channel
    fmt.Println(v)
}
4. Select with Done Channel for Cancellation ❌
You can use a done channel to signal cancellation.
done := make(chan struct{})

select {
case <-done:
    // Handle cancellation
case result := <-ch:
    // Handle result
}
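In modern Go code the done channel is usually a context.Context's Done channel, which composes with timeouts and deadlines; a sketch along the same lines:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

select {
case <-ctx.Done():
    // Handle cancellation; ctx.Err() reports why
case result := <-ch:
    // Handle result
}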
5. Combining Multiple Channel Operations 🔗
You can combine multiple channels in a single select.
select {
case result1 := <-ch1:
    // Handle result from ch1
case result2 := <-ch2:
    // Handle result from ch2
}
Implementing Rate Limiting in Go 🚦
Rate limiting is essential for controlling how many requests your API can handle. It helps prevent abuse and ensures fair usage. Let's explore how to implement it in Go using the token bucket algorithm and the golang.org/x/time/rate package.
What is Rate Limiting?
Rate limiting restricts the number of requests a user can make in a given time frame. This is crucial for:
- Preventing server overload
- Ensuring fair access for all users
- Protecting against abuse
Using the Token Bucket Algorithm
The token bucket algorithm allows for burst traffic while maintaining an average rate. Here's how it works:
- Tokens are added to a bucket at a fixed rate.
- Each request consumes a token.
- If the bucket is empty, requests are denied until tokens are available.
Implementation Example
Here's a simple example using Go:
package main

import (
    "context"
    "fmt"

    "golang.org/x/time/rate"
)

func main() {
    limiter := rate.NewLimiter(1, 5) // 1 request per second, burst of 5
    for i := 0; i < 10; i++ {
        // Wait blocks until a token is available (or the context ends)
        if err := limiter.Wait(context.Background()); err != nil {
            fmt.Println("Rate limit exceeded:", err)
            continue
        }
        fmt.Println("API request", i)
    }
}
Handling Burst Traffic
With the above setup, you can handle bursts of up to 5 requests at once, while maintaining a steady rate of 1 request per second.
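If you would rather reject excess requests immediately than make callers wait, the limiter also has a non-blocking Allow method (reusing the limiter from the example above):

if limiter.Allow() {
    // Proceed with the request
} else {
    // Over the limit: fail fast, e.g. respond with HTTP 429 Too Many Requests
}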
Using Semaphores in Go 🚦
Semaphores help control access to resources in concurrent programming. Let's explore how to use them in Go!
What is a Semaphore?
A semaphore is a signaling mechanism that limits the number of goroutines accessing a resource.
Buffered Channels as Semaphores
You can use buffered channels to create a simple semaphore:
sem := make(chan struct{}, 3) // Limit to 3 concurrent accesses

for i := 0; i < 10; i++ {
    sem <- struct{}{} // Acquire (blocks when all 3 slots are taken)
    go func(i int) {
        defer func() { <-sem }() // Release
        // Access resource
    }(i)
}
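One caveat with the snippet above: the loop can finish while goroutines are still running. A common trick is to fill the semaphore back to capacity, which blocks until every worker has released its slot:

// Wait for all workers: once we hold every slot, none are still running.
for i := 0; i < cap(sem); i++ {
    sem <- struct{}{}
}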
Weighted Semaphores
Weighted semaphores let different operations consume different amounts of capacity. You can implement one with a custom structure, but the golang.org/x/sync/semaphore package below supports weights directly.
Using golang.org/x/sync/semaphore
This package provides a robust semaphore implementation. Here's how to use it:
import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(3) // Limit to 3
for i := 0; i < 10; i++ {
if err := sem.Acquire(ctx, 1); err != nil {
// Handle error
}
go func(i int) {
defer sem.Release(1) // Release
// Access resource
}(i)
}
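The "weighted" part means a caller can claim more than one unit of capacity at a time, which is how you model operations with different costs (ctx as in the example above):

sem := semaphore.NewWeighted(10) // 10 units of capacity in total

// A heavyweight operation claims 4 units; cheap ones would claim 1.
if err := sem.Acquire(ctx, 4); err == nil {
    defer sem.Release(4)
    // Run the expensive operation
}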
Key Points
- Control Access: Semaphores limit concurrent access to resources.
- Buffered Channels: Simple way to implement semaphores.
- Weighted Semaphores: Allow different access levels.
- golang.org/x/sync/semaphore: A powerful package for semaphores.
Concurrency Patterns Comparison 📊
Choosing the right concurrency pattern is crucial for building efficient systems. Here's a comprehensive comparison:
| Pattern | Use Case | Pros | Cons | Best For |
|---|---|---|---|---|
| Worker Pool | Fixed number of goroutines processing tasks | ✅ Controlled resource usage ✅ Predictable performance ✅ Easy backpressure handling | ❌ Fixed capacity ❌ May underutilize with low load | API servers, batch processing, image processing |
| Fan-Out/Fan-In | Distribute work, collect results | ✅ Parallel processing ✅ High throughput ✅ Load distribution | ❌ Complex coordination ❌ Result ordering challenges | MapReduce, log aggregation, data analysis |
| Pipeline | Sequential data transformation | ✅ Composable stages ✅ Stream processing ✅ Memory efficient | ❌ Sequential bottlenecks ❌ Complex error handling | ETL, data streams, video processing |
| Select Patterns | Multiple channel coordination | ✅ Timeout handling ✅ Cancellation support ✅ Non-blocking ops | ❌ Can be complex ❌ Requires careful design | Event handling, multiplexing, responsive systems |
| Rate Limiting | API throttling, resource protection | ✅ Prevents overload ✅ Fair resource usage ✅ Burst support | ❌ May reject requests ❌ Adds latency | Public APIs, external service calls, DDoS protection |
| Semaphore | Limit concurrent resource access | ✅ Fine-grained control ✅ Weighted access ✅ Flexible limits | ❌ Potential deadlocks ❌ Requires discipline | Connection pools, file handles, memory limits |
When to Use Each Pattern 🎯
Choose Worker Pool when:
- You have a high volume of independent tasks
- Resource usage must be predictable and controlled
- Tasks have similar processing time
Choose Fan-Out/Fan-In when:
- Work can be split into independent parallel operations
- You need to aggregate results from multiple sources
- Maximizing throughput is critical
Choose Pipeline when:
- Data flows through sequential transformation steps
- Each stage can work on different items simultaneously
- Memory efficiency matters (streaming data)
Choose Select Patterns when:
- Coordinating multiple channel operations
- Implementing timeouts or cancellation
- Building responsive, event-driven systems
Choose Rate Limiting when:
- Protecting APIs from abuse or overload
- Ensuring fair resource distribution
- Complying with external service rate limits
Choose Semaphore when:
- Limiting access to scarce resources (DB connections, file handles)
- Different operations require different resource weights
- Fine-grained concurrency control is needed
🧠 Test Your Knowledge
What is the primary purpose of the worker pool pattern in Go?
The worker pool pattern limits the number of concurrent goroutines to control resource usage and prevent overwhelming the system, making it ideal for handling high-volume task processing efficiently.
In the fan-out/fan-in pattern, what does the fan-in stage do?
The fan-in pattern merges multiple output channels from parallel workers into a single channel, aggregating results for downstream processing or final consumption.
Which Go statement is specifically designed for handling multiple channel operations?
The select statement in Go allows a goroutine to wait on multiple channel operations simultaneously, enabling timeout handling, non-blocking operations, and cancellation patterns.
What algorithm is commonly used for rate limiting in Go applications?
The token bucket algorithm is the industry standard for rate limiting, allowing controlled burst traffic while maintaining an average rate. The golang.org/x/time/rate package implements this algorithm.
How can a buffered channel be used as a semaphore in Go?
A buffered channel's capacity acts as a semaphore by limiting how many goroutines can acquire a resource at once. Sending to the channel acquires access, receiving releases it.
🎯 Hands-On Assignment: Build a Concurrent Web Scraper with Advanced Patterns 🚀
📋 Your Mission
Create a production-ready concurrent web scraper that combines worker pools, pipelines, rate limiting, and graceful shutdown. Your scraper will fetch multiple URLs, extract data, process it through a pipeline, and save results, all while respecting rate limits and handling errors gracefully. This mirrors real-world systems used by companies like Google, Amazon, and web crawling services.
🎯 Requirements
- Implement a WebScraper with the worker pool pattern:
  - Configurable number of worker goroutines
  - Job queue for URLs to scrape
  - Results channel for scraped data
  - Graceful shutdown with context cancellation
- Build a 3-stage processing pipeline:
  - Stage 1 (Fetch): Download HTML content from URLs
  - Stage 2 (Parse): Extract titles, links, and metadata
  - Stage 3 (Store): Save to file or simulate database storage
- Implement rate limiting:
  - Use golang.org/x/time/rate for the token bucket
  - Limit to 10 requests per second with a burst of 5
  - Handle rate limit errors gracefully
- Add a semaphore for connection pooling:
  - Limit concurrent HTTP connections to 20
  - Use a buffered channel or golang.org/x/sync/semaphore
- Implement advanced select patterns:
  - Timeout handling (5 seconds per request)
  - Context cancellation for graceful shutdown
  - Non-blocking result collection with a default case
💡 Implementation Hints
- Use net/http with a custom Client and timeout configuration
- Create separate structs: URL, Content, ParsedData, StoredResult
- Use context.WithCancel for coordinated shutdown across all goroutines
- Use sync.WaitGroup to wait for worker completion
- Use rate.NewLimiter(10, 5) for rate limiting
- Handle errors by sending them through an error channel or logging them
- Test with sites like httpbin.org for controlled testing
📝 Example Input/Output
// Example: Concurrent Web Scraper Structure
package main

import (
    "context"
    "fmt"
    "net/http"
    "sync"
    "time"

    "golang.org/x/time/rate"
)

type URL struct {
    Address string
    Depth   int
}

// FetchedContent would carry raw HTML between the fetch and parse stages.
type FetchedContent struct {
    URL     string
    Content string
    Error   error
}

type ParsedData struct {
    URL   string
    Title string
    Links []string
}

type Scraper struct {
    workerCount int
    limiter     *rate.Limiter
    jobs        chan URL
    results     chan ParsedData
    wg          sync.WaitGroup
    client      *http.Client
}

func NewScraper(workers int, rateLimit rate.Limit, burst int) *Scraper {
    return &Scraper{
        workerCount: workers,
        limiter:     rate.NewLimiter(rateLimit, burst),
        jobs:        make(chan URL, 100),
        results:     make(chan ParsedData, 100),
        client: &http.Client{
            Timeout: 5 * time.Second,
        },
    }
}

func (s *Scraper) Start(ctx context.Context) {
    for i := 0; i < s.workerCount; i++ {
        s.wg.Add(1)
        go s.worker(ctx, i)
    }
}

// fetch is a simplified stub so the example compiles; a real scraper would
// GET the address with s.client and parse the HTML for a title and links.
func (s *Scraper) fetch(ctx context.Context, address string) (*ParsedData, error) {
    return &ParsedData{URL: address, Title: "title of " + address}, nil
}

func (s *Scraper) worker(ctx context.Context, id int) {
    defer s.wg.Done()
    for {
        select {
        case <-ctx.Done():
            return
        case job, ok := <-s.jobs:
            if !ok {
                return
            }
            // Rate limiting
            if err := s.limiter.Wait(ctx); err != nil {
                fmt.Printf("Rate limit error: %v\n", err)
                continue
            }
            fmt.Printf("Worker %d: Scraping %s\n", id, job.Address)
            // Fetch with a per-request timeout
            fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
            content, err := s.fetch(fetchCtx, job.Address)
            cancel()
            if err != nil {
                fmt.Printf("Error scraping %s: %v\n", job.Address, err)
                continue
            }
            // Send result
            s.results <- ParsedData{
                URL:   job.Address,
                Title: content.Title,
                Links: content.Links,
            }
        }
    }
}

func (s *Scraper) Shutdown() {
    close(s.jobs)
    s.wg.Wait()
    close(s.results)
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Create scraper: 5 workers, 10 req/sec, burst of 5
    scraper := NewScraper(5, 10, 5)
    scraper.Start(ctx)

    // Submit URLs
    urls := []string{
        "https://example.com",
        "https://example.org",
        "https://httpbin.org/delay/1",
    }
    go func() {
        for _, url := range urls {
            scraper.jobs <- URL{Address: url, Depth: 0}
        }
        scraper.Shutdown()
    }()

    // Collect results
    for result := range scraper.results {
        fmt.Printf("✓ Scraped: %s - Title: %s, Links: %d\n",
            result.URL, result.Title, len(result.Links))
    }
}
🌟 Bonus Challenges
- Level 2: Add retry logic with exponential backoff for failed requests
- Level 3: Implement fan-out/fan-in for parsing with multiple parsers
- Level 4: Add depth-limited crawling by following extracted links
- Level 5: Implement URL deduplication using concurrent-safe map or bloom filter
- Level 6: Add metrics collection (requests/sec, errors, avg latency) with monitoring dashboard
🎓 Learning Goals
- Master the worker pool pattern for controlled concurrency 🎯
- Build multi-stage pipelines for data processing ✨
- Implement production-grade rate limiting 🚦
- Use semaphores for resource pooling 🔒
- Apply advanced select patterns for robust concurrent systems 🛠️
- Handle graceful shutdown and context cancellation ✅
💡 Pro Tip: This architecture powers real-world systems! Google's crawler uses similar patterns, Shopify's background job processor employs worker pools, and Netflix's microservices use pipelines and rate limiting extensively. Companies like Stripe and Uber rely on these concurrency patterns for high-throughput, low-latency systems!
Share Your Solution! 💬
Completed the project? Post your code in the comments below! Show us your Go concurrency mastery! 🚀✨
Conclusion: Master Advanced Concurrency for Production Go Systems 🚀
Advanced concurrency patterns are the foundation of high-performance Go applications, enabling systems to handle thousands of concurrent operations efficiently while maintaining responsiveness and resource control. By mastering worker pools for controlled parallelism, fan-out/fan-in for distributed work, pipelines for stream processing, select patterns for coordination, rate limiting for API protection, and semaphores for resource management, you'll build production-ready concurrent systems that scale elegantly from prototypes to enterprise applications serving millions of users.