WorkerPool
Production-grade bounded worker pools with context-aware submission, graceful shutdown, and comprehensive observability.
Features
- Bounded Execution: Configurable worker count and queue size for predictable resource usage 
- Context-Aware: All operations respect context cancellation and timeouts 
- Graceful Shutdown: Clean shutdown with - Close()and- Drain()methods
- Panic Recovery: Built-in panic handling with optional custom recovery handlers 
- Observability: Comprehensive metrics, logging, and tracing support 
- Task Wrapping: Optional task instrumentation and middleware support 
- Zero Dependencies: No external dependencies beyond the Go standard library 
Quick Start
Basic Usage
package main
import (
    "context"
    "fmt"
    "time"
    "github.com/kolosys/ion/workerpool"
)
func main() {
    // Create a pool with 4 workers and queue size of 20
    pool := workerpool.New(4, 20, workerpool.WithName("image-processor"))
    defer pool.Close(context.Background())
    // Submit work with context cancellation
    for i := 0; i < 100; i++ {
        taskID := i
        err := pool.Submit(context.Background(), func(ctx context.Context) error {
            // Simulate work
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("Processed task %d\n", taskID)
            return nil
        })
        if err != nil {
            fmt.Printf("Failed to submit task %d: %v\n", taskID, err)
        }
    }
    // Graceful shutdown waits for completion
    pool.Drain(context.Background())
    fmt.Printf("Completed: %d tasks\n", pool.Metrics().Completed)
}Non-Blocking Submission
// TrySubmit returns immediately if the queue is full
err := pool.TrySubmit(func(ctx context.Context) error {
    return processData(ctx)
})
if err != nil {
    // Handle queue full or pool closed
    fmt.Printf("Submission failed: %v\n", err)
}Error Handling and Observability
// Custom logger
logger := &customLogger{}
pool := workerpool.New(2, 5,
    workerpool.WithName("api-processor"),
    workerpool.WithLogger(logger),
    workerpool.WithPanicRecovery(func(r any) {
        log.Printf("Task panicked: %v", r)
    }),
)
// Tasks that return errors are logged automatically
pool.Submit(ctx, func(ctx context.Context) error {
    if rand.Float64() < 0.1 {
        return errors.New("simulated error")
    }
    return nil
})API Reference
Pool Creation
func New(size, queueSize int, opts ...Option) *PoolCreates a new worker pool with the specified worker count and queue capacity.
Parameters:
- size: Number of worker goroutines (0 = GOMAXPROCS)
- queueSize: Maximum queued tasks (0 = unbounded)
- opts: Configuration options
Task Submission
func (p *Pool) Submit(ctx context.Context, task Task) error
func (p *Pool) TrySubmit(task Task) errorSubmit blocks until the task is queued or context is canceled. TrySubmit returns immediately if the queue is full.
Lifecycle Management
func (p *Pool) Close(ctx context.Context) error
func (p *Pool) Drain(ctx context.Context) errorClose immediately stops accepting new tasks and waits for workers to finish. Drain stops accepting new tasks and waits for the queue to empty.
Monitoring
func (p *Pool) Metrics() PoolMetrics
func (p *Pool) IsClosed() bool
func (p *Pool) IsDraining() boolConfiguration Options
Basic Options
workerpool.WithName("my-pool")                    // Set pool name for observability
workerpool.WithBaseContext(ctx)                  // Set base context for all tasks
workerpool.WithDrainTimeout(30*time.Second)      // Default timeout for Drain operationsObservability
workerpool.WithLogger(logger)                    // Custom logger
workerpool.WithMetrics(metrics)                  // Custom metrics recorder
workerpool.WithTracer(tracer)                    // Custom tracerAdvanced Features
workerpool.WithPanicRecovery(func(r any) {       // Custom panic handler
    log.Printf("Panic recovered: %v", r)
})
workerpool.WithTaskWrapper(func(task Task) Task { // Task instrumentation
    return func(ctx context.Context) error {
        start := time.Now()
        err := task(ctx)
        log.Printf("Task took %v", time.Since(start))
        return err
    }
})Metrics
The pool provides comprehensive runtime metrics:
type PoolMetrics struct {
    Size      int    // configured pool size
    Queued    int64  // current queue length
    Running   int64  // currently running tasks
    Completed uint64 // total completed tasks
    Failed    uint64 // total failed tasks
    Panicked  uint64 // total panicked tasks
}Error Handling
The workerpool package defines several error types for different failure scenarios:
- Pool Closed: Task submission to a closed pool 
- Queue Full: Non-blocking submission when queue is full 
- Context Canceled: Task submission canceled by context 
import "github.com/kolosys/ion/workerpool"
err := pool.Submit(ctx, task)
if err != nil {
    var poolErr *workerpool.PoolError
    if errors.As(err, &poolErr) {
        // Handle pool-specific errors
        fmt.Printf("Pool error: %v", poolErr)
    }
}Best Practices
Sizing Guidelines
- Workers: Start with - runtime.GOMAXPROCS(0)and adjust based on workload
- Queue Size: 2-5x worker count for CPU-bound tasks, higher for I/O-bound 
- Task Granularity: Aim for 1-100ms task duration for optimal throughput 
Resource Management
// Always ensure graceful shutdown
defer func() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    if err := pool.Drain(ctx); err != nil {
        log.Printf("Drain timeout: %v", err)
        pool.Close(context.Background())
    }
}()Context Usage
// Use context for task coordination
pool.Submit(ctx, func(taskCtx context.Context) error {
    select {
    case <-taskCtx.Done():
        return taskCtx.Err() // Respect cancellation
    case <-time.After(workDuration):
        return nil
    }
})Examples
- Basic Usage - Simple task processing 
- HTTP Request Processing - API endpoint with worker pool 
- Batch Processing - Large dataset processing 
Performance
Benchmark results on modern hardware:
- Submit: <200ns (uncontended), <1μs (high contention) 
- Throughput: 1M+ tasks/second 
- Memory: 0 allocations in steady state 
- Latency: <1ms p99 under load 
Thread Safety
All Pool methods are safe for concurrent use. Tasks execute concurrently in separate goroutines with proper synchronization.
Contributing
See the main CONTRIBUTING.md for guidelines.
License
Licensed under the MIT License.
Last updated
