The emitter package provides direct event emission with synchronous and asynchronous processing, middleware support, and concurrency control.
Import Path: `github.com/kolosys/nova/emitter`
The EventEmitter is the simplest way to publish and subscribe to events in Nova. It maps event types to listeners directly, without the topic abstraction of the Bus.
```
┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│    Event     │─────▶│   Emitter    │─────▶│  Listeners   │
│ user.created │      │  middleware  │      │  handler-1   │
└──────────────┘      │  validation  │      │  handler-2   │
                      │ concurrency  │      └──────────────┘
                      └──────────────┘
```

Every emitter requires an Ion workerpool:
```go
import (
    "github.com/kolosys/ion/workerpool"
    "github.com/kolosys/nova/emitter"
)

pool := workerpool.New(4, 100)
defer pool.Close(ctx)

em := emitter.New(emitter.Config{
    WorkerPool:     pool,           // Required
    AsyncMode:      true,           // Use async by default for batches
    BufferSize:     1000,           // Async queue size
    MaxConcurrency: 10,             // Max concurrent handlers per subscription
    Name:           "main-emitter", // For metrics identification
})
defer em.Shutdown(ctx)
```

| Option | Default | Description |
|---|---|---|
| WorkerPool | required | Ion workerpool for async processing |
| AsyncMode | false | Default mode for batch operations |
| BufferSize | 1000 | Size of the async event queue |
| MaxConcurrency | 10 | Max concurrent processing per subscription |
| MetricsCollector | no-op | Custom metrics implementation |
| EventValidator | default | Custom event validation |
| Name | "emitter" | Instance identifier for metrics |
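Most options can be left at their defaults; a minimal setup (sketch) supplies only the required workerpool:

```go
// Minimal configuration: only the required workerpool is set, so AsyncMode,
// BufferSize, MaxConcurrency, and Name fall back to the defaults listed above.
em := emitter.New(emitter.Config{
    WorkerPool: pool,
})
defer em.Shutdown(ctx)
```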
Subscribe listeners to specific event types:
```go
// Create a listener
listener := shared.NewBaseListener("user-handler", func(event shared.Event) error {
    userData := event.Data().(map[string]any)
    fmt.Printf("User: %s\n", userData["name"])
    return nil
})

// Subscribe returns a Subscription
sub := em.Subscribe("user.created", listener)

// Multiple listeners can subscribe to the same event type
auditListener := shared.NewBaseListener("audit-logger", logEvent)
em.Subscribe("user.created", auditListener)

// Unsubscribe when done
defer sub.Unsubscribe()
```

Emit processes the event immediately and blocks until all listeners complete:
```go
event := shared.NewBaseEvent("user-123", "user.created", userData)

if err := em.Emit(ctx, event); err != nil {
    log.Printf("Event processing failed: %v", err)
}
```

Use synchronous emission when the caller needs to know that every listener has finished (or failed) before continuing.
EmitAsync queues the event and returns immediately:
```go
if err := em.EmitAsync(ctx, event); err != nil {
    if errors.Is(err, shared.ErrBufferFull) {
        // Apply backpressure
    }
    log.Printf("Failed to queue event: %v", err)
}
```

Use asynchronous emission when throughput matters more than immediate confirmation that listeners have run.
EmitBatch processes multiple events efficiently:
```go
events := []shared.Event{event1, event2, event3}

if err := em.EmitBatch(ctx, events); err != nil {
    log.Printf("Batch failed at: %v", err)
}
```

The batch respects the AsyncMode configuration.
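As an illustration, batches are usually assembled from domain records first. The sketch below assumes a hypothetical `orders` slice and `order.created` event type:

```go
// Hypothetical example: one event per order record, emitted in a single batch.
events := make([]shared.Event, 0, len(orders))
for _, o := range orders {
    events = append(events, shared.NewBaseEvent(o.ID, "order.created", o))
}
if err := em.EmitBatch(ctx, events); err != nil {
    log.Printf("order batch failed: %v", err)
}
```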
Middleware intercepts events before and after processing:
```go
// Logging middleware
loggingMW := shared.MiddlewareFunc{
    BeforeFunc: func(event shared.Event) error {
        log.Printf("[→] %s (%s)", event.ID(), event.Type())
        return nil
    },
    AfterFunc: func(event shared.Event, err error) error {
        if err != nil {
            log.Printf("[✗] %s failed: %v", event.ID(), err)
        } else {
            log.Printf("[✓] %s completed", event.ID())
        }
        return nil
    },
}

// Tracing middleware
tracingMW := shared.MiddlewareFunc{
    BeforeFunc: func(event shared.Event) error {
        if be, ok := event.(*shared.BaseEvent); ok {
            be.SetMetadata("trace_start", time.Now().Format(time.RFC3339Nano))
        }
        return nil
    },
}

// Validation middleware
validationMW := shared.MiddlewareFunc{
    BeforeFunc: func(event shared.Event) error {
        if event.Data() == nil {
            return errors.New("event data required")
        }
        return nil
    },
}

// Add middleware (order matters - first added runs first)
em.Middleware(tracingMW, loggingMW, validationMW)
```

Middleware execution order:

- `Before` hooks run in order of addition
- `After` hooks run in reverse order (last added runs first)
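To make the ordering concrete, here is a minimal sketch with two numbered middlewares; per the rules above, each event produces the sequence before-1, before-2, listeners, after-2, after-1:

```go
// Illustration only: the hook order follows the rules above.
first := shared.MiddlewareFunc{
    BeforeFunc: func(event shared.Event) error { log.Println("before-1"); return nil },
    AfterFunc:  func(event shared.Event, err error) error { log.Println("after-1"); return nil },
}
second := shared.MiddlewareFunc{
    BeforeFunc: func(event shared.Event) error { log.Println("before-2"); return nil },
    AfterFunc:  func(event shared.Event, err error) error { log.Println("after-2"); return nil },
}
em.Middleware(first, second)
// Per event: before-1, before-2, <listeners>, after-2, after-1
```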
Each subscription has independent concurrency control:

```go
// Configure via emitter
em := emitter.New(emitter.Config{
    WorkerPool:     pool,
    MaxConcurrency: 20, // Each subscription can process 20 events concurrently
})
```

When concurrency is saturated:
Monitor emitter health and performance:

```go
stats := em.Stats()

fmt.Printf("Events emitted: %d\n", stats.EventsEmitted)
fmt.Printf("Events processed: %d\n", stats.EventsProcessed)
fmt.Printf("Active listeners: %d\n", stats.ActiveListeners)
fmt.Printf("Failed events: %d\n", stats.FailedEvents)
fmt.Printf("Queued events: %d\n", stats.QueuedEvents)
fmt.Printf("Middleware errors: %d\n", stats.MiddlewareErrors)
```
Always shut down emitters properly:

```go
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := em.Shutdown(ctx); err != nil {
    log.Printf("Shutdown error: %v", err)
}
```

During shutdown, new emissions are rejected with `ErrEmitterClosed`.
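In a long-running service, shutdown is usually triggered by process signals. A sketch (assuming the `em` instance from earlier and the standard `os/signal` package):

```go
// Sketch: block until SIGINT or SIGTERM, then give the emitter 10 seconds to drain.
// Requires the os, os/signal, and syscall packages in addition to context, time, and log.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh

shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := em.Shutdown(shutdownCtx); err != nil {
    log.Printf("emitter shutdown error: %v", err)
}
```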
Events are validated before processing:

```go
event := shared.NewBaseEvent("", "user.created", nil) // Empty ID

err := em.Emit(ctx, event)
// Returns: EventError containing ValidationError
```

Errors from listeners are wrapped with context:

```go
listener := shared.NewBaseListener("failing", func(event shared.Event) error {
    return errors.New("processing failed")
})

err := em.Emit(ctx, event)
// Returns: ListenerError with listener ID and event context
```

Async emission fails when the buffer is full:
```go
err := em.EmitAsync(ctx, event)
if errors.Is(err, shared.ErrBufferFull) {
    // Apply backpressure: slow down, retry later, or drop
}
```
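One way to apply that backpressure is a bounded retry with a short, growing delay; this is only a sketch, not a prescribed policy:

```go
// Sketch: retry a full buffer up to three times with exponential backoff, then drop.
delay := 10 * time.Millisecond
err := em.EmitAsync(ctx, event)
for attempt := 0; attempt < 3 && errors.Is(err, shared.ErrBufferFull); attempt++ {
    select {
    case <-time.After(delay):
        delay *= 2
        err = em.EmitAsync(ctx, event)
    case <-ctx.Done():
        err = ctx.Err()
    }
}
if err != nil {
    log.Printf("dropping event %s: %v", event.ID(), err)
}
```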
For non-critical events:

```go
go func() {
    _ = em.EmitAsync(ctx, event)
}()
```

For synchronous workflows:
```go
// Request-response: emit an event and wait for a listener to reply on a channel.
// (Assumes this snippet runs inside a function returning (Result, error).)
resultCh := make(chan Result, 1)

listener := shared.NewBaseListener("responder", func(event shared.Event) error {
    result := processEvent(event)
    resultCh <- result
    return nil
})

sub := em.Subscribe(event.Type(), listener)
defer sub.Unsubscribe()

if err := em.Emit(ctx, event); err != nil {
    return nil, err
}

select {
case result := <-resultCh:
    return result, nil
case <-ctx.Done():
    return nil, ctx.Err()
}
```

Multiple listeners process the same event:
```go
for i := 0; i < 3; i++ {
    listener := shared.NewBaseListener(
        fmt.Sprintf("worker-%d", i),
        processEvent,
    )
    em.Subscribe("work.item", listener)
}
```