Loading documentation...
Loading documentation...
Loading documentation...
This guide covers best practices for using Ion effectively in production systems.
All Ion components support context.Context for cancellation and timeouts. Always use it:
// Good: With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // release the context's resources when the function returns, even if the timeout never fires
result, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
return operation(ctx) // forward the ctx that Execute passes to the callback
})Why: Context provides cancellation, timeouts, and request-scoped values essential for production systems.
Always check and handle errors returned by Ion components:
// Good: Explicit error handling
if err := pool.Submit(ctx, task); err != nil {
if workerpool.IsQueueFull(err) {
// Handle queue full
return handleQueueFull()
}
// %w wraps err so callers can still inspect it with errors.Is / errors.As
return fmt.Errorf("failed to submit task: %w", err)
}Why: Errors provide important information about component state and failures.
Always use defer for releasing resources:
// Good: Always released
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// Release is deferred only after Acquire succeeded, with the same weight (1)
defer sem.Release(1)Why: Ensures resources are always released, even if errors occur.
Always configure observability in production:
// Good: Observability configured
obs := observe.New().
WithLogger(myLogger).
WithMetrics(myMetrics).
WithTracer(myTracer)
// Attach the logger/metrics/tracer bundle to the breaker
cb := circuit.New("service", circuit.WithObservability(obs))Why: Observability is essential for monitoring, debugging, and understanding system behavior.
Choose failure thresholds that balance sensitivity against false positives:
// For critical services: higher threshold
cb := circuit.New("payment-service",
circuit.WithFailureThreshold(10), // More tolerant
circuit.WithRecoveryTimeout(60*time.Second),
)
// For non-critical services: lower threshold
cb := circuit.New("analytics-service",
circuit.WithFailureThreshold(3), // More sensitive
circuit.WithRecoveryTimeout(10*time.Second),
)Distinguish between transient and permanent failures:
cb := circuit.New("http-client",
circuit.WithFailurePredicate(func(err error) bool {
// Only count 5xx errors and timeouts as failures
// 4xx errors (client errors) should not trip the circuit
if err == nil {
return false
}
// Check HTTP status code or error type
return isServerError(err) || isTimeout(err)
}),
)Log state changes for debugging:
cb := circuit.New("service",
circuit.WithStateChangeCallback(func(from, to circuit.State) {
logger.Info("circuit state changed",
"name", "service",
"from", from,
"to", to,
)
}),
)Don't expose circuit breaker internals to users:
result, err := cb.Execute(ctx, fn)
if err != nil {
if circuit.IsCircuitOpen(err) {
// User-friendly error
return errors.New("service temporarily unavailable, please try again later")
}
return err
}Set rates based on actual usage patterns:
// Analyze your traffic patterns first
// Then set rates accordingly
limiter := ratelimit.NewTokenBucket(
ratelimit.PerSecond(100), // Based on actual capacity
200, // Allow 2x burst
)Always use context with timeouts for WaitN:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := limiter.WaitN(ctx, 1); err != nil {
// Use errors.Is, not ==: it also matches a DeadlineExceeded that was wrapped
if errors.Is(err, context.DeadlineExceeded) {
// Handle timeout, e.g. shed load or ask the caller to retry later
return fmt.Errorf("timed out waiting for rate limiter: %w", err)
}
return err
}// Token bucket for API client
apiLimiter := ratelimit.NewTokenBucket(ratelimit.PerSecond(10), 20)
// Leaky bucket for job processing
jobLimiter := ratelimit.NewLeakyBucket(ratelimit.PerSecond(5), 50)Provide clear error messages:
if !limiter.AllowN(time.Now(), 1) {
return errors.New("rate limit exceeded, please try again later")
}Use defer to ensure permits are always released:
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1) // Always releasedBase capacity on actual resource constraints:
// Based on database connection limit (for *sql.DB: Stats().MaxOpenConnections;
// it is 0 when unlimited, so pick an explicit capacity in that case)
connSem := semaphore.NewWeighted(int64(db.Stats().MaxOpenConnections))
// Based on memory constraints
memSem := semaphore.NewWeighted(calculateMemoryCapacity())
// FIFO for database connections (fairness important)
dbSem := semaphore.NewWeighted(10, semaphore.WithFairness(semaphore.FIFO))
// None for high-performance operations
perfSem := semaphore.NewWeighted(100, semaphore.WithFairness(semaphore.None))Use weighted permits for operations with different resource needs:
sem := semaphore.NewWeighted(10)
// Small operation: 1 unit
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// ... perform the small operation ...
sem.Release(1)
// Large operation: 5 units; always release the same weight that was acquired
if err := sem.Acquire(ctx, 5); err != nil {
return err
}
defer sem.Release(5)
Balance workers and queue size:
// Too few workers: underutilized
pool := workerpool.New(1, 100) // Bad
// Too many workers: resource contention
pool := workerpool.New(1000, 10) // Bad
// Balanced: based on workload
pool := workerpool.New(10, 100) // GoodUse defer to ensure pools are closed:
pool := workerpool.New(4, 20)
defer pool.Close(context.Background())Check for queue full errors:
if err := pool.Submit(ctx, task); err != nil {
if workerpool.IsQueueFull(err) {
// Handle: retry, reject, or backpressure
return handleQueueFull()
}
return err
}Always use panic recovery in production:
pool := workerpool.New(4, 20,
workerpool.WithPanicRecovery(func(r any) {
logger.Error("panic recovered", "panic", r)
// Report to error tracking service
}),
)Always check context in tasks:
pool.Submit(ctx, func(taskCtx context.Context) error {
select {
case <-taskCtx.Done():
return taskCtx.Err()
default:
// Process task
}
return nil
})Pass key-value pairs for better log analysis:
logger.Info("circuit state changed",
// Arguments after the message alternate key, value
"name", "payment-service",
"from", "Closed",
"to", "Open",
"failures", 5,
)Use consistent naming conventions:
// Good: Consistent prefix and naming
metrics.Inc("ion_circuit_requests_total", "name", "payment-service")
metrics.Inc("ion_circuit_requests_failed", "name", "payment-service")
metrics.Histogram("ion_circuit_request_duration", duration, "name", "payment-service")Ensure context propagates through all operations:
result, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
// Context propagates to nested operations
// (the callback's ctx is passed on to service.Call, not an outer one)
return service.Call(ctx, req)
})Track important metrics:
Check for specific error types:
if err := pool.Submit(ctx, task); err != nil {
// A tagless switch runs the first case whose condition is true
switch {
case workerpool.IsQueueFull(err):
// Handle queue full
case workerpool.IsPoolClosed(err):
// Handle pool closed
default:
// Handle other errors
}
}Add context to errors:
if err := sem.Acquire(ctx, 1); err != nil {
// %w keeps the original error inspectable with errors.Is / errors.As
return fmt.Errorf("failed to acquire semaphore permit: %w", err)
}Provide user-friendly error messages:
result, err := cb.Execute(ctx, fn)
if err != nil {
if circuit.IsCircuitOpen(err) {
// User-friendly error
return errors.New("service temporarily unavailable")
}
// Log internal error
logger.Error("circuit breaker error", "error", err)
// Callers only see a generic message; the details are in the log above
return errors.New("operation failed")
}Ion uses functional options for flexible configuration:
// Each With* option sets one aspect of the breaker's configuration
cb := circuit.New("service",
circuit.WithFailureThreshold(5),
circuit.WithRecoveryTimeout(30*time.Second),
circuit.WithLogger(myLogger),
circuit.WithMetrics(myMetrics),
)Create helpers for common configurations:
// NewPaymentServiceCircuit returns a circuit breaker for "payment-service"
// with fixed threshold, recovery-timeout, and half-open settings.
func NewPaymentServiceCircuit() circuit.CircuitBreaker {
return circuit.New("payment-service",
circuit.WithFailureThreshold(5),
circuit.WithRecoveryTimeout(30*time.Second),
circuit.WithHalfOpenMaxRequests(2),
circuit.WithHalfOpenSuccessThreshold(1),
)
}Validate configuration before use:
// Start from DefaultConfig, override only what differs, then validate
config := circuit.DefaultConfig()
config.FailureThreshold = 5
if err := config.Validate(); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}Always test error handling:
func TestCircuitBreaker_OpenState(t *testing.T) {
// The test has no incoming request, so a background context drives both calls
ctx := context.Background()
cb := circuit.New("test", circuit.WithFailureThreshold(1))
// Trip the circuit
cb.Execute(ctx, func(ctx context.Context) (any, error) {
return nil, errors.New("failure")
})
// Circuit should be open
_, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
return "success", nil
})
if !circuit.IsCircuitOpen(err) {
t.Errorf("expected circuit to be open, got error: %v", err)
}
}Test context cancellation:
func TestWorkerPool_ContextCancellation(t *testing.T) {
pool := workerpool.New(1, 10)
defer pool.Close(context.Background())
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
err := pool.Submit(ctx, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
return nil
}
})
if err == nil {
t.Error("expected context cancellation error")
}
}Use test clocks for time-dependent tests:
func TestRateLimiter_WaitN(t *testing.T) {
clock := &testclock.FakeClock{}
limiter := ratelimit.NewTokenBucket(
ratelimit.PerSecond(1),
1,
ratelimit.WithClock(clock),
)
// A burst of 1 admits one request immediately and rejects the next
if !limiter.AllowN(clock.Now(), 1) {
t.Fatal("expected first request to be allowed")
}
if limiter.AllowN(clock.Now(), 1) {
t.Fatal("expected second request to be rate limited")
}
// Test time-dependent behavior: advancing the fake clock refills a token without sleeping
clock.Advance(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := limiter.WaitN(ctx, 1); err != nil {
t.Fatalf("WaitN after refill: %v", err)
}
}Reuse components when possible:
// Good: Reuse circuit breaker
var paymentCB circuit.CircuitBreaker
func init() {
paymentCB = circuit.New("payment-service")
}Don't over-provision workers:
// Good: Based on actual needs
pool := workerpool.New(runtime.NumCPU(), 100)Track performance metrics:
start := time.Now()
result, err := cb.Execute(ctx, fn)
duration := time.Since(start)
metrics.Histogram("operation_duration", duration.Seconds())Don't expose component internals in errors:
// Bad: Exposes internal state
return fmt.Errorf("circuit breaker error: %v", err)
// Good: User-friendly error
return errors.New("service temporarily unavailable")Validate inputs before using components:
if capacity <= 0 {
return fmt.Errorf("invalid capacity: %d", capacity)
}
sem := semaphore.NewWeighted(int64(capacity))This guide covers best practices for using Ion effectively in production systems.
All Ion components support context.Context for cancellation and timeouts. Always use it:
// Good: With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // release the context's resources when the function returns, even if the timeout never fires
result, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
return operation(ctx) // forward the ctx that Execute passes to the callback
})Why: Context provides cancellation, timeouts, and request-scoped values essential for production systems.
Always check and handle errors returned by Ion components:
// Good: Explicit error handling
if err := pool.Submit(ctx, task); err != nil {
if workerpool.IsQueueFull(err) {
// Handle queue full
return handleQueueFull()
}
// %w wraps err so callers can still inspect it with errors.Is / errors.As
return fmt.Errorf("failed to submit task: %w", err)
}Why: Errors provide important information about component state and failures.
Always use defer for releasing resources:
// Good: Always released
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// Release is deferred only after Acquire succeeded, with the same weight (1)
defer sem.Release(1)Why: Ensures resources are always released, even if errors occur.
Always configure observability in production:
// Good: Observability configured
obs := observe.New().
WithLogger(myLogger).
WithMetrics(myMetrics).
WithTracer(myTracer)
// Attach the logger/metrics/tracer bundle to the breaker
cb := circuit.New("service", circuit.WithObservability(obs))Why: Observability is essential for monitoring, debugging, and understanding system behavior.
Choose failure thresholds that balance sensitivity against false positives:
// For critical services: higher threshold
cb := circuit.New("payment-service",
circuit.WithFailureThreshold(10), // More tolerant
circuit.WithRecoveryTimeout(60*time.Second),
)
// For non-critical services: lower threshold
cb := circuit.New("analytics-service",
circuit.WithFailureThreshold(3), // More sensitive
circuit.WithRecoveryTimeout(10*time.Second),
)Distinguish between transient and permanent failures:
cb := circuit.New("http-client",
circuit.WithFailurePredicate(func(err error) bool {
// Only count 5xx errors and timeouts as failures
// 4xx errors (client errors) should not trip the circuit
if err == nil {
return false
}
// Check HTTP status code or error type
return isServerError(err) || isTimeout(err)
}),
)Log state changes for debugging:
cb := circuit.New("service",
circuit.WithStateChangeCallback(func(from, to circuit.State) {
logger.Info("circuit state changed",
"name", "service",
"from", from,
"to", to,
)
}),
)Don't expose circuit breaker internals to users:
result, err := cb.Execute(ctx, fn)
if err != nil {
if circuit.IsCircuitOpen(err) {
// User-friendly error
return errors.New("service temporarily unavailable, please try again later")
}
return err
}Set rates based on actual usage patterns:
// Analyze your traffic patterns first
// Then set rates accordingly
limiter := ratelimit.NewTokenBucket(
ratelimit.PerSecond(100), // Based on actual capacity
200, // Allow 2x burst
)Always use context with timeouts for WaitN:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := limiter.WaitN(ctx, 1); err != nil {
// Use errors.Is, not ==: it also matches a DeadlineExceeded that was wrapped
if errors.Is(err, context.DeadlineExceeded) {
// Handle timeout, e.g. shed load or ask the caller to retry later
return fmt.Errorf("timed out waiting for rate limiter: %w", err)
}
return err
}// Token bucket for API client
apiLimiter := ratelimit.NewTokenBucket(ratelimit.PerSecond(10), 20)
// Leaky bucket for job processing
jobLimiter := ratelimit.NewLeakyBucket(ratelimit.PerSecond(5), 50)Provide clear error messages:
if !limiter.AllowN(time.Now(), 1) {
return errors.New("rate limit exceeded, please try again later")
}Use defer to ensure permits are always released:
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1) // Always releasedBase capacity on actual resource constraints:
// Based on database connection limit (for *sql.DB: Stats().MaxOpenConnections;
// it is 0 when unlimited, so pick an explicit capacity in that case)
connSem := semaphore.NewWeighted(int64(db.Stats().MaxOpenConnections))
// Based on memory constraints
memSem := semaphore.NewWeighted(calculateMemoryCapacity())
// FIFO for database connections (fairness important)
dbSem := semaphore.NewWeighted(10, semaphore.WithFairness(semaphore.FIFO))
// None for high-performance operations
perfSem := semaphore.NewWeighted(100, semaphore.WithFairness(semaphore.None))Use weighted permits for operations with different resource needs:
sem := semaphore.NewWeighted(10)
// Small operation: 1 unit
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// ... perform the small operation ...
sem.Release(1)
// Large operation: 5 units; always release the same weight that was acquired
if err := sem.Acquire(ctx, 5); err != nil {
return err
}
defer sem.Release(5)
Balance workers and queue size:
// Too few workers: underutilized
pool := workerpool.New(1, 100) // Bad
// Too many workers: resource contention
pool := workerpool.New(1000, 10) // Bad
// Balanced: based on workload
pool := workerpool.New(10, 100) // GoodUse defer to ensure pools are closed:
pool := workerpool.New(4, 20)
defer pool.Close(context.Background())Check for queue full errors:
if err := pool.Submit(ctx, task); err != nil {
if workerpool.IsQueueFull(err) {
// Handle: retry, reject, or backpressure
return handleQueueFull()
}
return err
}Always use panic recovery in production:
pool := workerpool.New(4, 20,
workerpool.WithPanicRecovery(func(r any) {
logger.Error("panic recovered", "panic", r)
// Report to error tracking service
}),
)Always check context in tasks:
pool.Submit(ctx, func(taskCtx context.Context) error {
select {
case <-taskCtx.Done():
return taskCtx.Err()
default:
// Process task
}
return nil
})Pass key-value pairs for better log analysis:
logger.Info("circuit state changed",
// Arguments after the message alternate key, value
"name", "payment-service",
"from", "Closed",
"to", "Open",
"failures", 5,
)Use consistent naming conventions:
// Good: Consistent prefix and naming
metrics.Inc("ion_circuit_requests_total", "name", "payment-service")
metrics.Inc("ion_circuit_requests_failed", "name", "payment-service")
metrics.Histogram("ion_circuit_request_duration", duration, "name", "payment-service")Ensure context propagates through all operations:
result, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
// Context propagates to nested operations
// (the callback's ctx is passed on to service.Call, not an outer one)
return service.Call(ctx, req)
})Track important metrics:
Check for specific error types:
if err := pool.Submit(ctx, task); err != nil {
// A tagless switch runs the first case whose condition is true
switch {
case workerpool.IsQueueFull(err):
// Handle queue full
case workerpool.IsPoolClosed(err):
// Handle pool closed
default:
// Handle other errors
}
}Add context to errors:
if err := sem.Acquire(ctx, 1); err != nil {
// %w keeps the original error inspectable with errors.Is / errors.As
return fmt.Errorf("failed to acquire semaphore permit: %w", err)
}Provide user-friendly error messages:
result, err := cb.Execute(ctx, fn)
if err != nil {
if circuit.IsCircuitOpen(err) {
// User-friendly error
return errors.New("service temporarily unavailable")
}
// Log internal error
logger.Error("circuit breaker error", "error", err)
// Callers only see a generic message; the details are in the log above
return errors.New("operation failed")
}Ion uses functional options for flexible configuration:
// Each With* option sets one aspect of the breaker's configuration
cb := circuit.New("service",
circuit.WithFailureThreshold(5),
circuit.WithRecoveryTimeout(30*time.Second),
circuit.WithLogger(myLogger),
circuit.WithMetrics(myMetrics),
)Create helpers for common configurations:
// NewPaymentServiceCircuit returns a circuit breaker for "payment-service"
// with fixed threshold, recovery-timeout, and half-open settings.
func NewPaymentServiceCircuit() circuit.CircuitBreaker {
return circuit.New("payment-service",
circuit.WithFailureThreshold(5),
circuit.WithRecoveryTimeout(30*time.Second),
circuit.WithHalfOpenMaxRequests(2),
circuit.WithHalfOpenSuccessThreshold(1),
)
}Validate configuration before use:
// Start from DefaultConfig, override only what differs, then validate
config := circuit.DefaultConfig()
config.FailureThreshold = 5
if err := config.Validate(); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}Always test error handling:
func TestCircuitBreaker_OpenState(t *testing.T) {
// The test has no incoming request, so a background context drives both calls
ctx := context.Background()
cb := circuit.New("test", circuit.WithFailureThreshold(1))
// Trip the circuit
cb.Execute(ctx, func(ctx context.Context) (any, error) {
return nil, errors.New("failure")
})
// Circuit should be open
_, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
return "success", nil
})
if !circuit.IsCircuitOpen(err) {
t.Errorf("expected circuit to be open, got error: %v", err)
}
}Test context cancellation:
func TestWorkerPool_ContextCancellation(t *testing.T) {
pool := workerpool.New(1, 10)
defer pool.Close(context.Background())
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
err := pool.Submit(ctx, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
return nil
}
})
if err == nil {
t.Error("expected context cancellation error")
}
}Use test clocks for time-dependent tests:
func TestRateLimiter_WaitN(t *testing.T) {
clock := &testclock.FakeClock{}
limiter := ratelimit.NewTokenBucket(
ratelimit.PerSecond(1),
1,
ratelimit.WithClock(clock),
)
// A burst of 1 admits one request immediately and rejects the next
if !limiter.AllowN(clock.Now(), 1) {
t.Fatal("expected first request to be allowed")
}
if limiter.AllowN(clock.Now(), 1) {
t.Fatal("expected second request to be rate limited")
}
// Test time-dependent behavior: advancing the fake clock refills a token without sleeping
clock.Advance(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := limiter.WaitN(ctx, 1); err != nil {
t.Fatalf("WaitN after refill: %v", err)
}
}Reuse components when possible:
// Good: Reuse circuit breaker
var paymentCB circuit.CircuitBreaker
func init() {
paymentCB = circuit.New("payment-service")
}Don't over-provision workers:
// Good: Based on actual needs
pool := workerpool.New(runtime.NumCPU(), 100)Track performance metrics:
start := time.Now()
result, err := cb.Execute(ctx, fn)
duration := time.Since(start)
metrics.Histogram("operation_duration", duration.Seconds())Don't expose component internals in errors:
// Bad: Exposes internal state
return fmt.Errorf("circuit breaker error: %v", err)
// Good: User-friendly error
return errors.New("service temporarily unavailable")Validate inputs before using components:
if capacity <= 0 {
return fmt.Errorf("invalid capacity: %d", capacity)
}
sem := semaphore.NewWeighted(int64(capacity))// Good: With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // release the context's resources when the function returns, even if the timeout never fires
result, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
return operation(ctx) // forward the ctx that Execute passes to the callback
})// Good: Explicit error handling
if err := pool.Submit(ctx, task); err != nil {
if workerpool.IsQueueFull(err) {
// Handle queue full
return handleQueueFull()
}
// %w wraps err so callers can still inspect it with errors.Is / errors.As
return fmt.Errorf("failed to submit task: %w", err)
}// Good: Always released
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// Release is deferred only after Acquire succeeded, with the same weight (1)
defer sem.Release(1)// Good: Observability configured
obs := observe.New().
WithLogger(myLogger).
WithMetrics(myMetrics).
WithTracer(myTracer)
// Attach the logger/metrics/tracer bundle to the breaker
cb := circuit.New("service", circuit.WithObservability(obs))// For critical services: higher threshold
cb := circuit.New("payment-service",
circuit.WithFailureThreshold(10), // More tolerant
circuit.WithRecoveryTimeout(60*time.Second),
)
// For non-critical services: lower threshold
cb := circuit.New("analytics-service",
circuit.WithFailureThreshold(3), // More sensitive
circuit.WithRecoveryTimeout(10*time.Second),
)cb := circuit.New("http-client",
circuit.WithFailurePredicate(func(err error) bool {
// Only count 5xx errors and timeouts as failures
// 4xx errors (client errors) should not trip the circuit
if err == nil {
return false
}
// Check HTTP status code or error type
return isServerError(err) || isTimeout(err)
}),
)cb := circuit.New("service",
circuit.WithStateChangeCallback(func(from, to circuit.State) {
logger.Info("circuit state changed",
"name", "service",
"from", from,
"to", to,
)
}),
)result, err := cb.Execute(ctx, fn)
if err != nil {
if circuit.IsCircuitOpen(err) {
// User-friendly error
return errors.New("service temporarily unavailable, please try again later")
}
return err
}// Analyze your traffic patterns first
// Then set rates accordingly
limiter := ratelimit.NewTokenBucket(
ratelimit.PerSecond(100), // Based on actual capacity
200, // Allow 2x burst
)ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := limiter.WaitN(ctx, 1); err != nil {
// Use errors.Is, not ==: it also matches a DeadlineExceeded that was wrapped
if errors.Is(err, context.DeadlineExceeded) {
// Handle timeout, e.g. shed load or ask the caller to retry later
return fmt.Errorf("timed out waiting for rate limiter: %w", err)
}
return err
}// Token bucket for API client
apiLimiter := ratelimit.NewTokenBucket(ratelimit.PerSecond(10), 20)
// Leaky bucket for job processing
jobLimiter := ratelimit.NewLeakyBucket(ratelimit.PerSecond(5), 50)if !limiter.AllowN(time.Now(), 1) {
return errors.New("rate limit exceeded, please try again later")
}if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1) // Always released
// Based on database connection limit (for *sql.DB: Stats().MaxOpenConnections;
// it is 0 when unlimited, so pick an explicit capacity in that case)
connSem := semaphore.NewWeighted(int64(db.Stats().MaxOpenConnections))
// Based on memory constraints
memSem := semaphore.NewWeighted(calculateMemoryCapacity())
// FIFO for database connections (fairness important)
dbSem := semaphore.NewWeighted(10, semaphore.WithFairness(semaphore.FIFO))
// None for high-performance operations
perfSem := semaphore.NewWeighted(100, semaphore.WithFairness(semaphore.None))sem := semaphore.NewWeighted(10)
// Small operation: 1 unit
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// ... perform the small operation ...
sem.Release(1)
// Large operation: 5 units; always release the same weight that was acquired
if err := sem.Acquire(ctx, 5); err != nil {
return err
}
defer sem.Release(5)
// Too few workers: underutilized
pool := workerpool.New(1, 100) // Bad
// Too many workers: resource contention
pool := workerpool.New(1000, 10) // Bad
// Balanced: based on workload
pool := workerpool.New(10, 100) // Goodpool := workerpool.New(4, 20)
defer pool.Close(context.Background())if err := pool.Submit(ctx, task); err != nil {
if workerpool.IsQueueFull(err) {
// Handle: retry, reject, or backpressure
return handleQueueFull()
}
return err
}pool := workerpool.New(4, 20,
workerpool.WithPanicRecovery(func(r any) {
logger.Error("panic recovered", "panic", r)
// Report to error tracking service
}),
)pool.Submit(ctx, func(taskCtx context.Context) error {
select {
case <-taskCtx.Done():
return taskCtx.Err()
default:
// Process task
}
return nil
})logger.Info("circuit state changed",
"name", "payment-service",
"from", "Closed",
"to", "Open",
"failures", 5,
)// Good: Consistent prefix and naming
// All three series share the ion_circuit_ prefix and the same "name" argument pair
metrics.Inc("ion_circuit_requests_total", "name", "payment-service")
metrics.Inc("ion_circuit_requests_failed", "name", "payment-service")
metrics.Histogram("ion_circuit_request_duration", duration, "name", "payment-service")result, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
// Context propagates to nested operations
// (the callback's ctx is passed on to service.Call, not an outer one)
return service.Call(ctx, req)
})if err := pool.Submit(ctx, task); err != nil {
// A tagless switch runs the first case whose condition is true
switch {
case workerpool.IsQueueFull(err):
// Handle queue full
case workerpool.IsPoolClosed(err):
// Handle pool closed
default:
// Handle other errors
}
}if err := sem.Acquire(ctx, 1); err != nil {
// %w keeps the original error inspectable with errors.Is / errors.As
return fmt.Errorf("failed to acquire semaphore permit: %w", err)
}result, err := cb.Execute(ctx, fn)
if err != nil {
if circuit.IsCircuitOpen(err) {
// User-friendly error
return errors.New("service temporarily unavailable")
}
// Log internal error
logger.Error("circuit breaker error", "error", err)
// Callers only see a generic message; the details are in the log above
return errors.New("operation failed")
}cb := circuit.New("service",
circuit.WithFailureThreshold(5),
circuit.WithRecoveryTimeout(30*time.Second),
circuit.WithLogger(myLogger),
circuit.WithMetrics(myMetrics),
)func NewPaymentServiceCircuit() circuit.CircuitBreaker {
// Centralizes the "payment-service" breaker settings so every caller gets the same configuration
return circuit.New("payment-service",
circuit.WithFailureThreshold(5),
circuit.WithRecoveryTimeout(30*time.Second),
circuit.WithHalfOpenMaxRequests(2),
circuit.WithHalfOpenSuccessThreshold(1),
)
}config := circuit.DefaultConfig()
// Override only what differs from the defaults, then validate before use
config.FailureThreshold = 5
if err := config.Validate(); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}func TestCircuitBreaker_OpenState(t *testing.T) {
cb := circuit.New("test", circuit.WithFailureThreshold(1))
// Trip the circuit
cb.Execute(ctx, func(ctx context.Context) (any, error) {
return nil, errors.New("failure")
})
// Circuit should be open
_, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
return "success", nil
})
if !circuit.IsCircuitOpen(err) {
t.Error("expected circuit to be open")
}
}func TestWorkerPool_ContextCancellation(t *testing.T) {
pool := workerpool.New(1, 10)
defer pool.Close(context.Background())
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
err := pool.Submit(ctx, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
return nil
}
})
if err == nil {
t.Error("expected context cancellation error")
}
}func TestRateLimiter_WaitN(t *testing.T) {
clock := &testclock.FakeClock{}
limiter := ratelimit.NewTokenBucket(
ratelimit.PerSecond(1),
1,
ratelimit.WithClock(clock),
)
// Test time-dependent behavior
clock.Advance(time.Second)
}// Good: Reuse circuit breaker
// Created once at package initialization and shared by every caller; a
// package-level initializer avoids a side-effecting init function.
var paymentCB circuit.CircuitBreaker = circuit.New("payment-service")
// Good: Based on actual needs
pool := workerpool.New(runtime.NumCPU(), 100)start := time.Now()
result, err := cb.Execute(ctx, fn)
duration := time.Since(start)
// Record latency for failed calls too, before acting on err
metrics.Histogram("operation_duration", duration.Seconds())
if err != nil {
return nil, err
}
return result, nil
// Bad: Exposes internal state
return fmt.Errorf("circuit breaker error: %v", err)
// Good: User-friendly error
return errors.New("service temporarily unavailable")if capacity <= 0 {
return fmt.Errorf("invalid capacity: %d", capacity)
}
sem := semaphore.NewWeighted(int64(capacity))// Good: With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // release the context's resources when the function returns, even if the timeout never fires
result, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
return operation(ctx) // forward the ctx that Execute passes to the callback
})// Good: Explicit error handling
if err := pool.Submit(ctx, task); err != nil {
if workerpool.IsQueueFull(err) {
// Handle queue full
return handleQueueFull()
}
// %w wraps err so callers can still inspect it with errors.Is / errors.As
return fmt.Errorf("failed to submit task: %w", err)
}// Good: Always released
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// Release is deferred only after Acquire succeeded, with the same weight (1)
defer sem.Release(1)// Good: Observability configured
obs := observe.New().
WithLogger(myLogger).
WithMetrics(myMetrics).
WithTracer(myTracer)
// Attach the logger/metrics/tracer bundle to the breaker
cb := circuit.New("service", circuit.WithObservability(obs))// For critical services: higher threshold
cb := circuit.New("payment-service",
circuit.WithFailureThreshold(10), // More tolerant
circuit.WithRecoveryTimeout(60*time.Second),
)
// For non-critical services: lower threshold
cb := circuit.New("analytics-service",
circuit.WithFailureThreshold(3), // More sensitive
circuit.WithRecoveryTimeout(10*time.Second),
)cb := circuit.New("http-client",
circuit.WithFailurePredicate(func(err error) bool {
// Only count 5xx errors and timeouts as failures
// 4xx errors (client errors) should not trip the circuit
if err == nil {
return false
}
// Check HTTP status code or error type
return isServerError(err) || isTimeout(err)
}),
)cb := circuit.New("service",
circuit.WithStateChangeCallback(func(from, to circuit.State) {
logger.Info("circuit state changed",
"name", "service",
"from", from,
"to", to,
)
}),
)result, err := cb.Execute(ctx, fn)
if err != nil {
if circuit.IsCircuitOpen(err) {
// User-friendly error
return errors.New("service temporarily unavailable, please try again later")
}
return err
}// Analyze your traffic patterns first
// Then set rates accordingly
limiter := ratelimit.NewTokenBucket(
ratelimit.PerSecond(100), // Based on actual capacity
200, // Allow 2x burst
)ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := limiter.WaitN(ctx, 1); err != nil {
// Use errors.Is, not ==: it also matches a DeadlineExceeded that was wrapped
if errors.Is(err, context.DeadlineExceeded) {
// Handle timeout, e.g. shed load or ask the caller to retry later
return fmt.Errorf("timed out waiting for rate limiter: %w", err)
}
return err
}// Token bucket for API client
apiLimiter := ratelimit.NewTokenBucket(ratelimit.PerSecond(10), 20)
// Leaky bucket for job processing
jobLimiter := ratelimit.NewLeakyBucket(ratelimit.PerSecond(5), 50)if !limiter.AllowN(time.Now(), 1) {
return errors.New("rate limit exceeded, please try again later")
}if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1) // Always released
// Based on database connection limit (for *sql.DB: Stats().MaxOpenConnections;
// it is 0 when unlimited, so pick an explicit capacity in that case)
connSem := semaphore.NewWeighted(int64(db.Stats().MaxOpenConnections))
// Based on memory constraints
memSem := semaphore.NewWeighted(calculateMemoryCapacity())
// FIFO for database connections (fairness important)
dbSem := semaphore.NewWeighted(10, semaphore.WithFairness(semaphore.FIFO))
// None for high-performance operations
perfSem := semaphore.NewWeighted(100, semaphore.WithFairness(semaphore.None))sem := semaphore.NewWeighted(10)
// Small operation: 1 unit
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// ... perform the small operation ...
sem.Release(1)
// Large operation: 5 units; always release the same weight that was acquired
if err := sem.Acquire(ctx, 5); err != nil {
return err
}
defer sem.Release(5)
// Too few workers: underutilized
pool := workerpool.New(1, 100) // Bad
// Too many workers: resource contention
pool := workerpool.New(1000, 10) // Bad
// Balanced: based on workload
pool := workerpool.New(10, 100) // Goodpool := workerpool.New(4, 20)
defer pool.Close(context.Background())if err := pool.Submit(ctx, task); err != nil {
if workerpool.IsQueueFull(err) {
// Handle: retry, reject, or backpressure
return handleQueueFull()
}
return err
}pool := workerpool.New(4, 20,
workerpool.WithPanicRecovery(func(r any) {
logger.Error("panic recovered", "panic", r)
// Report to error tracking service
}),
)pool.Submit(ctx, func(taskCtx context.Context) error {
select {
case <-taskCtx.Done():
return taskCtx.Err()
default:
// Process task
}
return nil
})logger.Info("circuit state changed",
"name", "payment-service",
"from", "Closed",
"to", "Open",
"failures", 5,
)// Good: Consistent prefix and naming
// All three series share the ion_circuit_ prefix and the same "name" argument pair
metrics.Inc("ion_circuit_requests_total", "name", "payment-service")
metrics.Inc("ion_circuit_requests_failed", "name", "payment-service")
metrics.Histogram("ion_circuit_request_duration", duration, "name", "payment-service")result, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
// Context propagates to nested operations
// (the callback's ctx is passed on to service.Call, not an outer one)
return service.Call(ctx, req)
})if err := pool.Submit(ctx, task); err != nil {
// A tagless switch runs the first case whose condition is true
switch {
case workerpool.IsQueueFull(err):
// Handle queue full
case workerpool.IsPoolClosed(err):
// Handle pool closed
default:
// Handle other errors
}
}if err := sem.Acquire(ctx, 1); err != nil {
// %w keeps the original error inspectable with errors.Is / errors.As
return fmt.Errorf("failed to acquire semaphore permit: %w", err)
}result, err := cb.Execute(ctx, fn)
if err != nil {
if circuit.IsCircuitOpen(err) {
// User-friendly error
return errors.New("service temporarily unavailable")
}
// Log internal error
logger.Error("circuit breaker error", "error", err)
// Callers only see a generic message; the details are in the log above
return errors.New("operation failed")
}cb := circuit.New("service",
circuit.WithFailureThreshold(5),
circuit.WithRecoveryTimeout(30*time.Second),
circuit.WithLogger(myLogger),
circuit.WithMetrics(myMetrics),
)func NewPaymentServiceCircuit() circuit.CircuitBreaker {
// Centralizes the "payment-service" breaker settings so every caller gets the same configuration
return circuit.New("payment-service",
circuit.WithFailureThreshold(5),
circuit.WithRecoveryTimeout(30*time.Second),
circuit.WithHalfOpenMaxRequests(2),
circuit.WithHalfOpenSuccessThreshold(1),
)
}config := circuit.DefaultConfig()
// Override only what differs from the defaults, then validate before use
config.FailureThreshold = 5
if err := config.Validate(); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}func TestCircuitBreaker_OpenState(t *testing.T) {
cb := circuit.New("test", circuit.WithFailureThreshold(1))
// Trip the circuit
cb.Execute(ctx, func(ctx context.Context) (any, error) {
return nil, errors.New("failure")
})
// Circuit should be open
_, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
return "success", nil
})
if !circuit.IsCircuitOpen(err) {
t.Error("expected circuit to be open")
}
}func TestWorkerPool_ContextCancellation(t *testing.T) {
pool := workerpool.New(1, 10)
defer pool.Close(context.Background())
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
err := pool.Submit(ctx, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
return nil
}
})
if err == nil {
t.Error("expected context cancellation error")
}
}func TestRateLimiter_WaitN(t *testing.T) {
clock := &testclock.FakeClock{}
limiter := ratelimit.NewTokenBucket(
ratelimit.PerSecond(1),
1,
ratelimit.WithClock(clock),
)
// Test time-dependent behavior
clock.Advance(time.Second)
}// Good: Reuse circuit breaker
// Created once at package initialization and shared by every caller; a
// package-level initializer avoids a side-effecting init function.
var paymentCB circuit.CircuitBreaker = circuit.New("payment-service")
// Good: Based on actual needs
pool := workerpool.New(runtime.NumCPU(), 100)start := time.Now()
result, err := cb.Execute(ctx, fn)
duration := time.Since(start)
// Record latency for failed calls too, before acting on err
metrics.Histogram("operation_duration", duration.Seconds())
if err != nil {
return nil, err
}
return result, nil
// Bad: Exposes internal state
return fmt.Errorf("circuit breaker error: %v", err)
// Good: User-friendly error
return errors.New("service temporarily unavailable")if capacity <= 0 {
return fmt.Errorf("invalid capacity: %d", capacity)
}
sem := semaphore.NewWeighted(int64(capacity))