Complete API documentation for the workerpool package.
Import Path: github.com/kolosys/ion/workerpool
Package workerpool provides a bounded worker pool with context-aware submission, graceful shutdown, and observability hooks.
Option configures pool behavior.

    // Example usage of Option
    var value Option
    // Initialize with appropriate value

    type Option func(*config)

WithBaseContext sets the base context for the pool. All task contexts will be derived from this context.

    func WithBaseContext(ctx context.Context) Option

Parameters: ctx (context.Context)

Returns: Option

WithDrainTimeout sets the default timeout for Drain operations.

    func WithDrainTimeout(timeout time.Duration) Option

Parameters: timeout (time.Duration)

Returns: Option

WithLogger sets the logger for observability.

    func WithLogger(logger observe.Logger) Option

Parameters: logger (observe.Logger)

Returns: Option

WithMetrics sets the metrics recorder for observability.

    func WithMetrics(metrics observe.Metrics) Option

Parameters: metrics (observe.Metrics)

Returns: Option

WithName sets the pool name for observability and error reporting.

    func WithName(name string) Option

Parameters: name (string)

Returns: Option

WithPanicRecovery sets a custom panic handler for task execution. If not set, panics are recovered and counted in metrics.

    func WithPanicRecovery(handler func(any)) Option

Parameters: handler (func(any))

Returns: Option
WithTaskWrapper sets a function to wrap tasks for instrumentation. The wrapper is applied to every submitted task.

    func WithTaskWrapper(wrapper func(Task) Task) Option

Parameters: wrapper (func(Task) Task)

Returns: Option
WithTracer sets the tracer for observability.

    func WithTracer(tracer observe.Tracer) Option

Parameters: tracer (observe.Tracer)

Returns: Option
Pool represents a bounded worker pool that executes tasks with controlled concurrency and queue management.

    // Create a new Pool with 4 workers and a queue capacity of 16
    pool := New(4, 16)

    type Pool struct {
    }

New creates a new worker pool with the specified size and queue capacity. size determines the number of worker goroutines. queueSize determines the maximum number of queued tasks.

    func New(size, queueSize int, opts ...Option) *Pool

Parameters: size (int), queueSize (int), opts (...Option)

Returns: *Pool

Close immediately stops accepting new tasks and signals all workers to stop. It waits for currently running tasks to complete unless the provided context is canceled or times out. If the context expires, workers are asked to stop via task context cancellation.

    func (*Pool) Close(ctx context.Context) error

Parameters: ctx (context.Context)

Returns: error

Drain prevents new task submissions and waits for the queue to empty and all currently running tasks to complete. Unlike Close, Drain allows queued tasks to continue being processed until the queue is empty.

    func (*Pool) Drain(ctx context.Context) error

Parameters: ctx (context.Context)

Returns: error
GetName returns the name of the pool.

    func (*Pool) GetName() string

Parameters: None

Returns: string

GetQueueSize returns the queue size of the pool.

    func (*Pool) GetQueueSize() int

Parameters: None

Returns: int

GetSize returns the size of the pool.

    func (*Pool) GetSize() int

Parameters: None

Returns: int

IsClosed returns true if the pool has been closed or is in the process of closing.

    func (*Pool) IsClosed() bool

Parameters: None

Returns: bool

IsDraining returns true if the pool is in draining mode (not accepting new tasks but still processing queued tasks).

    func (*Pool) IsDraining() bool

Parameters: None

Returns: bool

Metrics returns a snapshot of the current pool metrics.

    func (*Pool) Metrics() PoolMetrics

Parameters: None

Returns: PoolMetrics

Submit submits a task to the pool for execution. It respects the provided context for cancellation and timeouts. If the context is canceled before the task can be queued, it returns the wrapped context error. If the pool is closed or draining, it returns an appropriate error.

    func (*Pool) Submit(ctx context.Context, task Task) error

Parameters: ctx (context.Context), task (Task)

Returns: error

TrySubmit attempts to submit a task to the pool without blocking. Per its signature it returns an error rather than a bool: nil if the task was successfully queued, and a non-nil error if the queue is full or the pool is closed or draining. It does not respect context cancellation since it returns immediately.

    func (*Pool) TrySubmit(task Task) error

Parameters: task (Task)

Returns: error
PoolError represents workerpool-specific errors with context.

    // Create a new PoolError
    poolerror := PoolError{
        Op:       "example",
        PoolName: "example",
        Err:      errors.New("example"),
    }

    type PoolError struct {
        Op       string
        PoolName string
        Err      error
    }

| Field | Type | Description |
|---|---|---|
| Op | string | operation that failed |
| PoolName | string | name of the pool |
| Err | error | underlying error |

    func (*PoolError) Error() string

Parameters: None

Returns: string

    func (*PoolError) Unwrap() error

Parameters: None

Returns: error
PoolMetrics holds runtime metrics for the pool.

    // Create a new PoolMetrics
    poolmetrics := PoolMetrics{
        Size:      42,
        Queued:    42,
        Running:   42,
        Completed: 42,
        Failed:    42,
        Panicked:  42,
    }

    type PoolMetrics struct {
        Size      int
        Queued    int64
        Running   int64
        Completed uint64
        Failed    uint64
        Panicked  uint64
    }

| Field | Type | Description |
|---|---|---|
| Size | int | configured pool size |
| Queued | int64 | current queue length |
| Running | int64 | currently running tasks |
| Completed | uint64 | total completed tasks |
| Failed | uint64 | total failed tasks |
| Panicked | uint64 | total panicked tasks |
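
As one example of using a metrics snapshot, the fields above are enough to derive worker utilization. The sketch below redeclares the struct locally; `utilization` is a hypothetical helper, not something the package provides.

```go
package main

import "fmt"

// PoolMetrics mirrors the documented struct fields.
type PoolMetrics struct {
	Size      int
	Queued    int64
	Running   int64
	Completed uint64
	Failed    uint64
	Panicked  uint64
}

// utilization is a hypothetical helper: the fraction of workers that are
// currently running a task. It returns 0 for a pool with no workers.
func utilization(m PoolMetrics) float64 {
	if m.Size <= 0 {
		return 0
	}
	return float64(m.Running) / float64(m.Size)
}

func main() {
	m := PoolMetrics{Size: 8, Queued: 3, Running: 6}
	fmt.Printf("%.2f\n", utilization(m)) // prints 0.75
}
```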
Task represents a unit of work to be executed by the worker pool. Tasks receive a context that will be canceled if either the submission context or the pool's base context is canceled.

    // Example usage of Task
    var value Task
    // Initialize with appropriate value

    type Task func(ctx context.Context) error

NewPoolClosedError creates an error indicating the pool is closed.

    func NewPoolClosedError(poolName string) error

Parameters:

| Parameter | Type | Description |
|---|---|---|
| poolName | string | |

Returns:

| Type | Description |
|---|---|
| error | |

Example:

    // Example usage of NewPoolClosedError
    err := NewPoolClosedError("example")

NewQueueFullError creates an error indicating the queue is full.

    func NewQueueFullError(poolName string, queueSize int) error

Parameters:

| Parameter | Type | Description |
|---|---|---|
| poolName | string | |
| queueSize | int | |

Returns:

| Type | Description |
|---|---|
| error | |

Example:

    // Example usage of NewQueueFullError
    err := NewQueueFullError("example", 16)