Loading documentation...
Loading documentation...
Loading documentation...
Import Path: github.com/kolosys/ion/semaphore
Semaphores control access to shared resources with weighted permits and configurable fairness modes. They're essential for managing concurrent access to limited resources like database connections, file handles, or memory.
Semaphores allow a fixed number of concurrent operations. When all permits are acquired, additional requests must wait until permits are released.
┌─────────────────────┐
│ Semaphore │
│ Capacity: 10 │
├─────────────────────┤
│ Available: 7 │
│ Acquired: 3 │
│ Waiting: 2 │
└─────────────────────┘
Semaphores support weighted permits, allowing operations to acquire multiple permits:
sem := semaphore.NewWeighted(10) // Total capacity: 10
// Acquire 1 permit
sem.Acquire(ctx, 1)
// Acquire 3 permits (for larger operations)
sem.Acquire(ctx, 3)
// Release permits
sem.Release(1)
sem.Release(3)
Try to acquire permits without blocking:
if sem.TryAcquire(1) {
// Permit acquired
defer sem.Release(1)
// Use resource
} else {
// No permits available
}
Wait for permits with context support:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
// Context canceled or timeout
return err
}
defer sem.Release(1)
// Use resource
Limit concurrent database connections:
package main
import (
"context"
"database/sql"
"fmt"
"github.com/kolosys/ion/semaphore"
_ "github.com/lib/pq"
)
type PooledDB struct {
db *sql.DB
sem semaphore.Semaphore
}
func NewPooledDB(dsn string, maxConnections int) (*PooledDB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(maxConnections)
return &PooledDB{
db: db,
sem: semaphore.NewWeighted(int64(maxConnections),
semaphore.WithName("db-pool"),
semaphore.WithFairness(semaphore.FIFO),
),
}, nil
}
func (p *PooledDB) Query(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
// Acquire a connection permit
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer p.sem.Release(1)
// Use the connection
return p.db.QueryContext(ctx, query, args...)
}
func (p *PooledDB) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer p.sem.Release(1)
return p.db.ExecContext(ctx, query, args...)
}
func main() {
db, err := NewPooledDB("postgres://...", 10)
if err != nil {
panic(err)
}
ctx := context.Background()
rows, err := db.Query(ctx, "SELECT * FROM users")
if err != nil {
panic(err)
}
defer rows.Close()
}
Control concurrent file operations:
package main
import (
"context"
"fmt"
"os"
"github.com/kolosys/ion/semaphore"
)
type FileProcessor struct {
sem semaphore.Semaphore
}
func NewFileProcessor(maxConcurrent int) *FileProcessor {
return &FileProcessor{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("file-processor"),
),
}
}
func (fp *FileProcessor) ProcessFile(ctx context.Context, path string) error {
// Acquire permit for file operation
if err := fp.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire file permit: %w", err)
}
defer fp.sem.Release(1)
// Process file
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
// Read and process file
fmt.Printf("Processing %s\n", path)
return nil
}
Limit memory-intensive operations:
package main
import (
"context"
"fmt"
"github.com/kolosys/ion/semaphore"
)
type ImageProcessor struct {
sem semaphore.Semaphore
}
func NewImageProcessor(maxConcurrent int) *ImageProcessor {
// Each image processing operation uses significant memory
// Limit concurrent operations to prevent OOM
return &ImageProcessor{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("image-processor"),
),
}
}
func (ip *ImageProcessor) ProcessImage(ctx context.Context, imageData []byte) error {
// Acquire permit (each image uses ~50MB)
// With capacity 4, max memory usage is ~200MB
if err := ip.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire processing permit: %w", err)
}
defer ip.sem.Release(1)
// Process image (memory-intensive)
fmt.Printf("Processing image (%d bytes)\n", len(imageData))
return nil
}
Use weighted permits for operations with different resource requirements:
package main
import (
"context"
"fmt"
"github.com/kolosys/ion/semaphore"
)
type ResourceManager struct {
sem semaphore.Semaphore
}
func NewResourceManager() *ResourceManager {
// Total capacity: 10 units
return &ResourceManager{
sem: semaphore.NewWeighted(10,
semaphore.WithName("resource-manager"),
),
}
}
func (rm *ResourceManager) ProcessSmall(ctx context.Context) error {
// Small operation: 1 unit
if err := rm.sem.Acquire(ctx, 1); err != nil {
return err
}
defer rm.sem.Release(1)
fmt.Println("Processing small operation")
return nil
}
func (rm *ResourceManager) ProcessLarge(ctx context.Context) error {
// Large operation: 5 units
if err := rm.sem.Acquire(ctx, 5); err != nil {
return err
}
defer rm.sem.Release(5)
fmt.Println("Processing large operation")
return nil
}
func main() {
rm := NewResourceManager()
ctx := context.Background()
// Can run 10 small operations concurrently
// Or 2 large operations concurrently
// Or mix: 5 small + 1 large = 10 units
rm.ProcessSmall(ctx)
rm.ProcessLarge(ctx)
}
Use a semaphore to limit concurrent API calls:
package main
import (
"context"
"fmt"
"time"
"github.com/kolosys/ion/semaphore"
)
type APIClient struct {
sem semaphore.Semaphore
}
func NewAPIClient(maxConcurrent int) *APIClient {
return &APIClient{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("api-client"),
semaphore.WithFairness(semaphore.FIFO),
),
}
}
func (c *APIClient) CallAPI(ctx context.Context, endpoint string) error {
// Limit concurrent API calls
if err := c.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire API permit: %w", err)
}
defer c.sem.Release(1)
// Make API call
fmt.Printf("Calling %s\n", endpoint)
time.Sleep(100 * time.Millisecond)
return nil
}
func main() {
client := NewAPIClient(5) // Max 5 concurrent API calls
ctx := context.Background()
// Make multiple API calls - semaphore limits concurrency
for i := 0; i < 10; i++ {
go func(id int) {
if err := client.CallAPI(ctx, fmt.Sprintf("/api/v1/users/%d", id)); err != nil {
fmt.Printf("Error: %v\n", err)
}
}(i)
}
time.Sleep(2 * time.Second)
}
Waiters are processed in order of arrival:
sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.FIFO), // Default
)
Use when:
Most recent waiters are processed first:
sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.LIFO),
)
Use when:
No fairness guarantees, maximum performance:
sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.None),
)
Use when:
sem := semaphore.NewWeighted(10,
semaphore.WithName("my-semaphore"),
semaphore.WithFairness(semaphore.FIFO),
semaphore.WithAcquireTimeout(5*time.Second),
semaphore.WithLogger(myLogger),
semaphore.WithMetrics(myMetrics),
)
Use defer to ensure permits are released.
Problem: Deadlock when all permits are acquired but never released
// Bad: Permit never released
sem.Acquire(ctx, 1)
// ... operation ...
// Forgot to release!
Solution: Always use defer
// Good: Always released
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
Problem: Panic when releasing more permits than acquired
// Bad
sem.Acquire(ctx, 1)
sem.Release(2) // Panic!
Solution: Always release exactly what you acquired
// Good
weight := int64(1)
sem.Acquire(ctx, weight)
defer sem.Release(weight)
Problem: Acquire blocks indefinitely
// Bad: No timeout
sem.Acquire(context.Background(), 1)
Solution: Always use a context with a timeout
// Good: With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
Problem: Too small a capacity causes contention; too large a capacity wastes resources
Solution: Size based on actual resource constraints
// Good: Based on actual database connection limit
sem := semaphore.NewWeighted(int64(db.MaxOpenConns()))
pool := workerpool.New(10, 100)
sem := semaphore.NewWeighted(5) // Limit resource access
pool.Submit(ctx, func(ctx context.Context) error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
// Use limited resource
return nil
})
cb := circuit.New("service")
sem := semaphore.NewWeighted(10)
_, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
if err := sem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer sem.Release(1)
// Protected operation
return operation(ctx)
})
Import Path: github.com/kolosys/ion/semaphore
Semaphores control access to shared resources with weighted permits and configurable fairness modes. They're essential for managing concurrent access to limited resources like database connections, file handles, or memory.
Semaphores allow a fixed number of concurrent operations. When all permits are acquired, additional requests must wait until permits are released.
┌─────────────────────┐
│ Semaphore │
│ Capacity: 10 │
├─────────────────────┤
│ Available: 7 │
│ Acquired: 3 │
│ Waiting: 2 │
└─────────────────────┘
Semaphores support weighted permits, allowing operations to acquire multiple permits:
sem := semaphore.NewWeighted(10) // Total capacity: 10
// Acquire 1 permit
sem.Acquire(ctx, 1)
// Acquire 3 permits (for larger operations)
sem.Acquire(ctx, 3)
// Release permits
sem.Release(1)
sem.Release(3)
Try to acquire permits without blocking:
if sem.TryAcquire(1) {
// Permit acquired
defer sem.Release(1)
// Use resource
} else {
// No permits available
}
Wait for permits with context support:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
// Context canceled or timeout
return err
}
defer sem.Release(1)
// Use resource
Limit concurrent database connections:
package main
import (
"context"
"database/sql"
"fmt"
"github.com/kolosys/ion/semaphore"
_ "github.com/lib/pq"
)
type PooledDB struct {
db *sql.DB
sem semaphore.Semaphore
}
func NewPooledDB(dsn string, maxConnections int) (*PooledDB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(maxConnections)
return &PooledDB{
db: db,
sem: semaphore.NewWeighted(int64(maxConnections),
semaphore.WithName("db-pool"),
semaphore.WithFairness(semaphore.FIFO),
),
}, nil
}
func (p *PooledDB) Query(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
// Acquire a connection permit
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer p.sem.Release(1)
// Use the connection
return p.db.QueryContext(ctx, query, args...)
}
func (p *PooledDB) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer p.sem.Release(1)
return p.db.ExecContext(ctx, query, args...)
}
func main() {
db, err := NewPooledDB("postgres://...", 10)
if err != nil {
panic(err)
}
ctx := context.Background()
rows, err := db.Query(ctx, "SELECT * FROM users")
if err != nil {
panic(err)
}
defer rows.Close()
}
Control concurrent file operations:
package main
import (
"context"
"fmt"
"os"
"github.com/kolosys/ion/semaphore"
)
type FileProcessor struct {
sem semaphore.Semaphore
}
func NewFileProcessor(maxConcurrent int) *FileProcessor {
return &FileProcessor{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("file-processor"),
),
}
}
func (fp *FileProcessor) ProcessFile(ctx context.Context, path string) error {
// Acquire permit for file operation
if err := fp.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire file permit: %w", err)
}
defer fp.sem.Release(1)
// Process file
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
// Read and process file
fmt.Printf("Processing %s\n", path)
return nil
}
Limit memory-intensive operations:
package main
import (
"context"
"fmt"
"github.com/kolosys/ion/semaphore"
)
type ImageProcessor struct {
sem semaphore.Semaphore
}
func NewImageProcessor(maxConcurrent int) *ImageProcessor {
// Each image processing operation uses significant memory
// Limit concurrent operations to prevent OOM
return &ImageProcessor{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("image-processor"),
),
}
}
func (ip *ImageProcessor) ProcessImage(ctx context.Context, imageData []byte) error {
// Acquire permit (each image uses ~50MB)
// With capacity 4, max memory usage is ~200MB
if err := ip.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire processing permit: %w", err)
}
defer ip.sem.Release(1)
// Process image (memory-intensive)
fmt.Printf("Processing image (%d bytes)\n", len(imageData))
return nil
}
Use weighted permits for operations with different resource requirements:
package main
import (
"context"
"fmt"
"github.com/kolosys/ion/semaphore"
)
type ResourceManager struct {
sem semaphore.Semaphore
}
func NewResourceManager() *ResourceManager {
// Total capacity: 10 units
return &ResourceManager{
sem: semaphore.NewWeighted(10,
semaphore.WithName("resource-manager"),
),
}
}
func (rm *ResourceManager) ProcessSmall(ctx context.Context) error {
// Small operation: 1 unit
if err := rm.sem.Acquire(ctx, 1); err != nil {
return err
}
defer rm.sem.Release(1)
fmt.Println("Processing small operation")
return nil
}
func (rm *ResourceManager) ProcessLarge(ctx context.Context) error {
// Large operation: 5 units
if err := rm.sem.Acquire(ctx, 5); err != nil {
return err
}
defer rm.sem.Release(5)
fmt.Println("Processing large operation")
return nil
}
func main() {
rm := NewResourceManager()
ctx := context.Background()
// Can run 10 small operations concurrently
// Or 2 large operations concurrently
// Or mix: 5 small + 1 large = 10 units
rm.ProcessSmall(ctx)
rm.ProcessLarge(ctx)
}
Use a semaphore to limit concurrent API calls:
package main
import (
"context"
"fmt"
"time"
"github.com/kolosys/ion/semaphore"
)
type APIClient struct {
sem semaphore.Semaphore
}
func NewAPIClient(maxConcurrent int) *APIClient {
return &APIClient{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("api-client"),
semaphore.WithFairness(semaphore.FIFO),
),
}
}
func (c *APIClient) CallAPI(ctx context.Context, endpoint string) error {
// Limit concurrent API calls
if err := c.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire API permit: %w", err)
}
defer c.sem.Release(1)
// Make API call
fmt.Printf("Calling %s\n", endpoint)
time.Sleep(100 * time.Millisecond)
return nil
}
func main() {
client := NewAPIClient(5) // Max 5 concurrent API calls
ctx := context.Background()
// Make multiple API calls - semaphore limits concurrency
for i := 0; i < 10; i++ {
go func(id int) {
if err := client.CallAPI(ctx, fmt.Sprintf("/api/v1/users/%d", id)); err != nil {
fmt.Printf("Error: %v\n", err)
}
}(i)
}
time.Sleep(2 * time.Second)
}
Waiters are processed in order of arrival:
sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.FIFO), // Default
)
Use when:
Most recent waiters are processed first:
sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.LIFO),
)
Use when:
No fairness guarantees, maximum performance:
sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.None),
)
Use when:
sem := semaphore.NewWeighted(10,
semaphore.WithName("my-semaphore"),
semaphore.WithFairness(semaphore.FIFO),
semaphore.WithAcquireTimeout(5*time.Second),
semaphore.WithLogger(myLogger),
semaphore.WithMetrics(myMetrics),
)
Use defer to ensure permits are released.
Problem: Deadlock when all permits are acquired but never released
// Bad: Permit never released
sem.Acquire(ctx, 1)
// ... operation ...
// Forgot to release!
Solution: Always use defer
// Good: Always released
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
Problem: Panic when releasing more permits than acquired
// Bad
sem.Acquire(ctx, 1)
sem.Release(2) // Panic!
Solution: Always release exactly what you acquired
// Good
weight := int64(1)
sem.Acquire(ctx, weight)
defer sem.Release(weight)
Problem: Acquire blocks indefinitely
// Bad: No timeout
sem.Acquire(context.Background(), 1)
Solution: Always use a context with a timeout
// Good: With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
Problem: Too small a capacity causes contention; too large a capacity wastes resources
Solution: Size based on actual resource constraints
// Good: Based on actual database connection limit
sem := semaphore.NewWeighted(int64(db.MaxOpenConns()))
pool := workerpool.New(10, 100)
sem := semaphore.NewWeighted(5) // Limit resource access
pool.Submit(ctx, func(ctx context.Context) error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
// Use limited resource
return nil
})
cb := circuit.New("service")
sem := semaphore.NewWeighted(10)
_, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
if err := sem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer sem.Release(1)
// Protected operation
return operation(ctx)
})
┌─────────────────────┐
│ Semaphore │
│ Capacity: 10 │
├─────────────────────┤
│ Available: 7 │
│ Acquired: 3 │
│ Waiting: 2 │
└─────────────────────┘sem := semaphore.NewWeighted(10) // Total capacity: 10
// Acquire 1 permit
sem.Acquire(ctx, 1)
// Acquire 3 permits (for larger operations)
sem.Acquire(ctx, 3)
// Release permits
sem.Release(1)
sem.Release(3)if sem.TryAcquire(1) {
// Permit acquired
defer sem.Release(1)
// Use resource
} else {
// No permits available
}ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
// Context canceled or timeout
return err
}
defer sem.Release(1)
// Use resourcepackage main
import (
"context"
"database/sql"
"fmt"
"github.com/kolosys/ion/semaphore"
_ "github.com/lib/pq"
)
type PooledDB struct {
db *sql.DB
sem semaphore.Semaphore
}
func NewPooledDB(dsn string, maxConnections int) (*PooledDB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(maxConnections)
return &PooledDB{
db: db,
sem: semaphore.NewWeighted(int64(maxConnections),
semaphore.WithName("db-pool"),
semaphore.WithFairness(semaphore.FIFO),
),
}, nil
}
func (p *PooledDB) Query(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
// Acquire a connection permit
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer p.sem.Release(1)
// Use the connection
return p.db.QueryContext(ctx, query, args...)
}
func (p *PooledDB) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer p.sem.Release(1)
return p.db.ExecContext(ctx, query, args...)
}
func main() {
db, err := NewPooledDB("postgres://...", 10)
if err != nil {
panic(err)
}
ctx := context.Background()
rows, err := db.Query(ctx, "SELECT * FROM users")
if err != nil {
panic(err)
}
defer rows.Close()
}package main
import (
"context"
"fmt"
"os"
"github.com/kolosys/ion/semaphore"
)
type FileProcessor struct {
sem semaphore.Semaphore
}
func NewFileProcessor(maxConcurrent int) *FileProcessor {
return &FileProcessor{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("file-processor"),
),
}
}
func (fp *FileProcessor) ProcessFile(ctx context.Context, path string) error {
// Acquire permit for file operation
if err := fp.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire file permit: %w", err)
}
defer fp.sem.Release(1)
// Process file
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
// Read and process file
fmt.Printf("Processing %s\n", path)
return nil
}package main
import (
"context"
"fmt"
"github.com/kolosys/ion/semaphore"
)
type ImageProcessor struct {
sem semaphore.Semaphore
}
func NewImageProcessor(maxConcurrent int) *ImageProcessor {
// Each image processing operation uses significant memory
// Limit concurrent operations to prevent OOM
return &ImageProcessor{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("image-processor"),
),
}
}
func (ip *ImageProcessor) ProcessImage(ctx context.Context, imageData []byte) error {
// Acquire permit (each image uses ~50MB)
// With capacity 4, max memory usage is ~200MB
if err := ip.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire processing permit: %w", err)
}
defer ip.sem.Release(1)
// Process image (memory-intensive)
fmt.Printf("Processing image (%d bytes)\n", len(imageData))
return nil
}package main
import (
"context"
"fmt"
"github.com/kolosys/ion/semaphore"
)
type ResourceManager struct {
sem semaphore.Semaphore
}
func NewResourceManager() *ResourceManager {
// Total capacity: 10 units
return &ResourceManager{
sem: semaphore.NewWeighted(10,
semaphore.WithName("resource-manager"),
),
}
}
func (rm *ResourceManager) ProcessSmall(ctx context.Context) error {
// Small operation: 1 unit
if err := rm.sem.Acquire(ctx, 1); err != nil {
return err
}
defer rm.sem.Release(1)
fmt.Println("Processing small operation")
return nil
}
func (rm *ResourceManager) ProcessLarge(ctx context.Context) error {
// Large operation: 5 units
if err := rm.sem.Acquire(ctx, 5); err != nil {
return err
}
defer rm.sem.Release(5)
fmt.Println("Processing large operation")
return nil
}
func main() {
rm := NewResourceManager()
ctx := context.Background()
// Can run 10 small operations concurrently
// Or 2 large operations concurrently
// Or mix: 5 small + 1 large = 10 units
rm.ProcessSmall(ctx)
rm.ProcessLarge(ctx)
}package main
import (
"context"
"fmt"
"time"
"github.com/kolosys/ion/semaphore"
)
type APIClient struct {
sem semaphore.Semaphore
}
func NewAPIClient(maxConcurrent int) *APIClient {
return &APIClient{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("api-client"),
semaphore.WithFairness(semaphore.FIFO),
),
}
}
func (c *APIClient) CallAPI(ctx context.Context, endpoint string) error {
// Limit concurrent API calls
if err := c.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire API permit: %w", err)
}
defer c.sem.Release(1)
// Make API call
fmt.Printf("Calling %s\n", endpoint)
time.Sleep(100 * time.Millisecond)
return nil
}
func main() {
client := NewAPIClient(5) // Max 5 concurrent API calls
ctx := context.Background()
// Make multiple API calls - semaphore limits concurrency
for i := 0; i < 10; i++ {
go func(id int) {
if err := client.CallAPI(ctx, fmt.Sprintf("/api/v1/users/%d", id)); err != nil {
fmt.Printf("Error: %v\n", err)
}
}(i)
}
time.Sleep(2 * time.Second)
}sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.FIFO), // Default
)sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.LIFO),
)sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.None),
)sem := semaphore.NewWeighted(10,
semaphore.WithName("my-semaphore"),
semaphore.WithFairness(semaphore.FIFO),
semaphore.WithAcquireTimeout(5*time.Second),
semaphore.WithLogger(myLogger),
semaphore.WithMetrics(myMetrics),
)// Bad: Permit never released
sem.Acquire(ctx, 1)
// ... operation ...
// Forgot to release!// Good: Always released
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)// Bad
sem.Acquire(ctx, 1)
sem.Release(2) // Panic!// Good
weight := int64(1)
sem.Acquire(ctx, weight)
defer sem.Release(weight)// Bad: No timeout
sem.Acquire(context.Background(), 1)// Good: With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
return err
}// Good: Based on actual database connection limit
sem := semaphore.NewWeighted(int64(db.MaxOpenConns()))pool := workerpool.New(10, 100)
sem := semaphore.NewWeighted(5) // Limit resource access
pool.Submit(ctx, func(ctx context.Context) error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
// Use limited resource
return nil
})cb := circuit.New("service")
sem := semaphore.NewWeighted(10)
_, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
if err := sem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer sem.Release(1)
// Protected operation
return operation(ctx)
})┌─────────────────────┐
│ Semaphore │
│ Capacity: 10 │
├─────────────────────┤
│ Available: 7 │
│ Acquired: 3 │
│ Waiting: 2 │
└─────────────────────┘sem := semaphore.NewWeighted(10) // Total capacity: 10
// Acquire 1 permit
sem.Acquire(ctx, 1)
// Acquire 3 permits (for larger operations)
sem.Acquire(ctx, 3)
// Release permits
sem.Release(1)
sem.Release(3)if sem.TryAcquire(1) {
// Permit acquired
defer sem.Release(1)
// Use resource
} else {
// No permits available
}ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
// Context canceled or timeout
return err
}
defer sem.Release(1)
// Use resourcepackage main
import (
"context"
"database/sql"
"fmt"
"github.com/kolosys/ion/semaphore"
_ "github.com/lib/pq"
)
type PooledDB struct {
db *sql.DB
sem semaphore.Semaphore
}
func NewPooledDB(dsn string, maxConnections int) (*PooledDB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(maxConnections)
return &PooledDB{
db: db,
sem: semaphore.NewWeighted(int64(maxConnections),
semaphore.WithName("db-pool"),
semaphore.WithFairness(semaphore.FIFO),
),
}, nil
}
func (p *PooledDB) Query(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
// Acquire a connection permit
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer p.sem.Release(1)
// Use the connection
return p.db.QueryContext(ctx, query, args...)
}
func (p *PooledDB) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer p.sem.Release(1)
return p.db.ExecContext(ctx, query, args...)
}
func main() {
db, err := NewPooledDB("postgres://...", 10)
if err != nil {
panic(err)
}
ctx := context.Background()
rows, err := db.Query(ctx, "SELECT * FROM users")
if err != nil {
panic(err)
}
defer rows.Close()
}package main
import (
"context"
"fmt"
"os"
"github.com/kolosys/ion/semaphore"
)
type FileProcessor struct {
sem semaphore.Semaphore
}
func NewFileProcessor(maxConcurrent int) *FileProcessor {
return &FileProcessor{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("file-processor"),
),
}
}
func (fp *FileProcessor) ProcessFile(ctx context.Context, path string) error {
// Acquire permit for file operation
if err := fp.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire file permit: %w", err)
}
defer fp.sem.Release(1)
// Process file
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
// Read and process file
fmt.Printf("Processing %s\n", path)
return nil
}package main
import (
"context"
"fmt"
"github.com/kolosys/ion/semaphore"
)
type ImageProcessor struct {
sem semaphore.Semaphore
}
func NewImageProcessor(maxConcurrent int) *ImageProcessor {
// Each image processing operation uses significant memory
// Limit concurrent operations to prevent OOM
return &ImageProcessor{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("image-processor"),
),
}
}
func (ip *ImageProcessor) ProcessImage(ctx context.Context, imageData []byte) error {
// Acquire permit (each image uses ~50MB)
// With capacity 4, max memory usage is ~200MB
if err := ip.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire processing permit: %w", err)
}
defer ip.sem.Release(1)
// Process image (memory-intensive)
fmt.Printf("Processing image (%d bytes)\n", len(imageData))
return nil
}package main
import (
"context"
"fmt"
"github.com/kolosys/ion/semaphore"
)
type ResourceManager struct {
sem semaphore.Semaphore
}
func NewResourceManager() *ResourceManager {
// Total capacity: 10 units
return &ResourceManager{
sem: semaphore.NewWeighted(10,
semaphore.WithName("resource-manager"),
),
}
}
func (rm *ResourceManager) ProcessSmall(ctx context.Context) error {
// Small operation: 1 unit
if err := rm.sem.Acquire(ctx, 1); err != nil {
return err
}
defer rm.sem.Release(1)
fmt.Println("Processing small operation")
return nil
}
func (rm *ResourceManager) ProcessLarge(ctx context.Context) error {
// Large operation: 5 units
if err := rm.sem.Acquire(ctx, 5); err != nil {
return err
}
defer rm.sem.Release(5)
fmt.Println("Processing large operation")
return nil
}
func main() {
rm := NewResourceManager()
ctx := context.Background()
// Can run 10 small operations concurrently
// Or 2 large operations concurrently
// Or mix: 5 small + 1 large = 10 units
rm.ProcessSmall(ctx)
rm.ProcessLarge(ctx)
}package main
import (
"context"
"fmt"
"time"
"github.com/kolosys/ion/semaphore"
)
type APIClient struct {
sem semaphore.Semaphore
}
func NewAPIClient(maxConcurrent int) *APIClient {
return &APIClient{
sem: semaphore.NewWeighted(int64(maxConcurrent),
semaphore.WithName("api-client"),
semaphore.WithFairness(semaphore.FIFO),
),
}
}
func (c *APIClient) CallAPI(ctx context.Context, endpoint string) error {
// Limit concurrent API calls
if err := c.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire API permit: %w", err)
}
defer c.sem.Release(1)
// Make API call
fmt.Printf("Calling %s\n", endpoint)
time.Sleep(100 * time.Millisecond)
return nil
}
func main() {
client := NewAPIClient(5) // Max 5 concurrent API calls
ctx := context.Background()
// Make multiple API calls - semaphore limits concurrency
for i := 0; i < 10; i++ {
go func(id int) {
if err := client.CallAPI(ctx, fmt.Sprintf("/api/v1/users/%d", id)); err != nil {
fmt.Printf("Error: %v\n", err)
}
}(i)
}
time.Sleep(2 * time.Second)
}sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.FIFO), // Default
)sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.LIFO),
)sem := semaphore.NewWeighted(10,
semaphore.WithFairness(semaphore.None),
)sem := semaphore.NewWeighted(10,
semaphore.WithName("my-semaphore"),
semaphore.WithFairness(semaphore.FIFO),
semaphore.WithAcquireTimeout(5*time.Second),
semaphore.WithLogger(myLogger),
semaphore.WithMetrics(myMetrics),
)// Bad: Permit never released
sem.Acquire(ctx, 1)
// ... operation ...
// Forgot to release!// Good: Always released
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)// Bad
sem.Acquire(ctx, 1)
sem.Release(2) // Panic!// Good
weight := int64(1)
sem.Acquire(ctx, weight)
defer sem.Release(weight)// Bad: No timeout
sem.Acquire(context.Background(), 1)// Good: With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
return err
}// Good: Based on actual database connection limit
sem := semaphore.NewWeighted(int64(db.MaxOpenConns()))pool := workerpool.New(10, 100)
sem := semaphore.NewWeighted(5) // Limit resource access
pool.Submit(ctx, func(ctx context.Context) error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
// Use limited resource
return nil
})cb := circuit.New("service")
sem := semaphore.NewWeighted(10)
_, err := cb.Execute(ctx, func(ctx context.Context) (any, error) {
if err := sem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer sem.Release(1)
// Protected operation
return operation(ctx)
})