Loading documentation...
Loading documentation...
Loading documentation...
The shared package provides the foundational types and interfaces used throughout Nova. All other packages depend on these core abstractions.
Import Path: github.com/kolosys/nova/shared
Events are the fundamental unit of data in Nova. The Event interface defines what every event must provide:
type Event interface {
ID() string // Unique identifier
Type() string // Event type (e.g., "user.created")
Timestamp() time.Time // When the event occurred
Data() any // Event payload
Metadata() map[string]string // Routing, tracing, and context
}Nova provides BaseEvent as a ready-to-use implementation:
// Create a simple event
event := shared.NewBaseEvent("user-123", "user.created", map[string]any{
"name": "Alice",
"email": "alice@example.com",
})
// Create an event with metadata
event := shared.NewBaseEventWithMetadata(
"order-456",
"order.completed",
orderData,
map[string]string{
"customer_id": "cust-789",
"trace_id": "abc123",
},
)
// Add metadata after creation
event.SetMetadata("partition_key", "us-west-2")
// Read metadata
if traceID, ok := event.GetMetadata("trace_id"); ok {
// Use trace ID
}Implement the Event interface for domain-specific events:
type OrderEvent struct {
id string
orderID string
items []OrderItem
total float64
createdAt time.Time
}
func (e *OrderEvent) ID() string { return e.id }
func (e *OrderEvent) Type() string { return "order.created" }
func (e *OrderEvent) Timestamp() time.Time { return e.createdAt }
func (e *OrderEvent) Data() any { return e }
func (e *OrderEvent) Metadata() map[string]string { return nil }Listeners process events. The interface is intentionally minimal:
type Listener interface {
ID() string // Unique identifier
Handle(event Event) error // Process an event
OnError(event Event, err error) error // Handle errors from Handle()
}Use BaseListener for quick listener creation:
// Simple listener
listener := shared.NewBaseListener("order-processor", func(event shared.Event) error {
order := event.Data().(map[string]any)
fmt.Printf("Processing order: %v
", order)
return nil
})
// Listener with error handler
listener := shared.NewBaseListenerWithErrorHandler(
"payment-processor",
func(event shared.Event) error {
return processPayment(event)
},
func(event shared.Event, err error) error {
log.Printf("Payment failed for %s: %v", event.ID(), err)
return nil // Suppress error propagation
},
)Subscriptions represent active connections between listeners and event sources:
type Subscription interface {
ID() string // Subscription identifier
Topic() string // Topic or event type subscribed to
Listener() Listener // The subscribed listener
Unsubscribe() error // Remove subscription
Active() bool // Check if subscription is active
}Subscriptions are returned by Subscribe methods and can be used to unsubscribe:
sub := emitter.Subscribe("user.created", listener)
// Later, to unsubscribe
if err := sub.Unsubscribe(); err != nil {
log.Printf("Unsubscribe failed: %v", err)
}
// Check if still active
if sub.Active() {
// Still receiving events
}Middleware provides hooks for cross-cutting concerns like logging, tracing, and validation:
type Middleware interface {
Before(event Event) error // Called before processing
After(event Event, err error) error // Called after processing
}Use MiddlewareFunc for inline middleware:
// Logging middleware
loggingMW := shared.MiddlewareFunc{
BeforeFunc: func(event shared.Event) error {
log.Printf("Processing: %s (%s)", event.ID(), event.Type())
return nil
},
AfterFunc: func(event shared.Event, err error) error {
if err != nil {
log.Printf("Failed: %s - %v", event.ID(), err)
}
return nil
},
}
// Validation middleware
validationMW := shared.MiddlewareFunc{
BeforeFunc: func(event shared.Event) error {
if event.Data() == nil {
return errors.New("event data cannot be nil")
}
return nil
},
}
emitter.Middleware(loggingMW, validationMW)Nova validates events before processing. The default validator checks for:
// Default validation
err := shared.DefaultEventValidator.Validate(event)
// Custom validator
customValidator := shared.EventValidatorFunc(func(event shared.Event) error {
if event.Type() != "user.created" && event.Type() != "user.updated" {
return shared.NewValidationError("type", event.Type(), "unsupported event type")
}
return nil
})Nova provides structured error types for debugging and handling:
var (
ErrEventNotFound // Event not found in store
ErrInvalidEvent // Event failed validation
ErrListenerNotFound // Listener not registered
ErrTopicNotFound // Topic does not exist
ErrSubscriptionNotFound // Subscription not found
ErrEmitterClosed // Emitter has been shut down
ErrBusClosed // Bus has been shut down
ErrBufferFull // Buffer cannot accept more events
ErrRetryLimitExceeded // All retry attempts failed
ErrTimeout // Operation timed out
)// EventError includes event context
eventErr := shared.NewEventError(event, originalErr)
fmt.Println(eventErr) // "event error [id=123, type=user.created]: original error"
// ListenerError includes listener context
listenerErr := shared.NewListenerError("my-listener", event, originalErr)
fmt.Println(listenerErr) // "listener error [id=my-listener, event=123]: original error"
// ValidationError includes field details
validErr := shared.NewValidationError("email", "invalid@", "invalid email format")
fmt.Println(validErr) // "validation error [field=email]: invalid email format"
// Unwrap to get original error
if errors.Is(eventErr, originalErr) {
// Handle specific error type
}Nova emits metrics through the MetricsCollector interface:
type MetricsCollector interface {
IncEventsEmitted(eventType, result string)
IncEventsProcessed(listenerID, result string)
ObserveListenerDuration(listenerID string, duration time.Duration)
ObserveStoreAppendDuration(duration time.Duration)
ObserveStoreReadDuration(duration time.Duration)
SetQueueSize(component string, size int)
SetActiveListeners(emitterID string, count int)
IncEventsDropped(component, reason string)
}For testing and development:
metrics := shared.NewSimpleMetricsCollector()
emitter := emitter.New(emitter.Config{
WorkerPool: pool,
MetricsCollector: metrics,
})
// Check metrics
fmt.Printf("Events emitted: %d
", metrics.GetEventsEmitted())
fmt.Printf("Events processed: %d
", metrics.GetEventsProcessed())Used by default when no collector is provided:
// Silently ignores all metrics
var _ MetricsCollector = NoOpMetricsCollector{}Create a custom collector that wraps Prometheus metrics:
type PrometheusMetrics struct {
eventsEmitted *prometheus.CounterVec
eventDuration *prometheus.HistogramVec
// ... other metrics
}
func (p *PrometheusMetrics) IncEventsEmitted(eventType, result string) {
p.eventsEmitted.WithLabelValues(eventType, result).Inc()
}
// Implement remaining interface methods...The shared package follows these principles:
The shared package provides the foundational types and interfaces used throughout Nova. All other packages depend on these core abstractions.
Import Path: github.com/kolosys/nova/shared
Events are the fundamental unit of data in Nova. The Event interface defines what every event must provide:
type Event interface {
ID() string // Unique identifier
Type() string // Event type (e.g., "user.created")
Timestamp() time.Time // When the event occurred
Data() any // Event payload
Metadata() map[string]string // Routing, tracing, and context
}Nova provides BaseEvent as a ready-to-use implementation:
// Create a simple event
event := shared.NewBaseEvent("user-123", "user.created", map[string]any{
"name": "Alice",
"email": "alice@example.com",
})
// Create an event with metadata
event := shared.NewBaseEventWithMetadata(
"order-456",
"order.completed",
orderData,
map[string]string{
"customer_id": "cust-789",
"trace_id": "abc123",
},
)
// Add metadata after creation
event.SetMetadata("partition_key", "us-west-2")
// Read metadata
if traceID, ok := event.GetMetadata("trace_id"); ok {
// Use trace ID
}Implement the Event interface for domain-specific events:
type OrderEvent struct {
id string
orderID string
items []OrderItem
total float64
createdAt time.Time
}
func (e *OrderEvent) ID() string { return e.id }
func (e *OrderEvent) Type() string { return "order.created" }
func (e *OrderEvent) Timestamp() time.Time { return e.createdAt }
func (e *OrderEvent) Data() any { return e }
func (e *OrderEvent) Metadata() map[string]string { return nil }Listeners process events. The interface is intentionally minimal:
type Listener interface {
ID() string // Unique identifier
Handle(event Event) error // Process an event
OnError(event Event, err error) error // Handle errors from Handle()
}Use BaseListener for quick listener creation:
// Simple listener
listener := shared.NewBaseListener("order-processor", func(event shared.Event) error {
order := event.Data().(map[string]any)
fmt.Printf("Processing order: %v
", order)
return nil
})
// Listener with error handler
listener := shared.NewBaseListenerWithErrorHandler(
"payment-processor",
func(event shared.Event) error {
return processPayment(event)
},
func(event shared.Event, err error) error {
log.Printf("Payment failed for %s: %v", event.ID(), err)
return nil // Suppress error propagation
},
)Subscriptions represent active connections between listeners and event sources:
type Subscription interface {
ID() string // Subscription identifier
Topic() string // Topic or event type subscribed to
Listener() Listener // The subscribed listener
Unsubscribe() error // Remove subscription
Active() bool // Check if subscription is active
}Subscriptions are returned by Subscribe methods and can be used to unsubscribe:
sub := emitter.Subscribe("user.created", listener)
// Later, to unsubscribe
if err := sub.Unsubscribe(); err != nil {
log.Printf("Unsubscribe failed: %v", err)
}
// Check if still active
if sub.Active() {
// Still receiving events
}Middleware provides hooks for cross-cutting concerns like logging, tracing, and validation:
type Middleware interface {
Before(event Event) error // Called before processing
After(event Event, err error) error // Called after processing
}Use MiddlewareFunc for inline middleware:
// Logging middleware
loggingMW := shared.MiddlewareFunc{
BeforeFunc: func(event shared.Event) error {
log.Printf("Processing: %s (%s)", event.ID(), event.Type())
return nil
},
AfterFunc: func(event shared.Event, err error) error {
if err != nil {
log.Printf("Failed: %s - %v", event.ID(), err)
}
return nil
},
}
// Validation middleware
validationMW := shared.MiddlewareFunc{
BeforeFunc: func(event shared.Event) error {
if event.Data() == nil {
return errors.New("event data cannot be nil")
}
return nil
},
}
emitter.Middleware(loggingMW, validationMW)Nova validates events before processing. The default validator checks for:
// Default validation
err := shared.DefaultEventValidator.Validate(event)
// Custom validator
customValidator := shared.EventValidatorFunc(func(event shared.Event) error {
if event.Type() != "user.created" && event.Type() != "user.updated" {
return shared.NewValidationError("type", event.Type(), "unsupported event type")
}
return nil
})Nova provides structured error types for debugging and handling:
var (
ErrEventNotFound // Event not found in store
ErrInvalidEvent // Event failed validation
ErrListenerNotFound // Listener not registered
ErrTopicNotFound // Topic does not exist
ErrSubscriptionNotFound // Subscription not found
ErrEmitterClosed // Emitter has been shut down
ErrBusClosed // Bus has been shut down
ErrBufferFull // Buffer cannot accept more events
ErrRetryLimitExceeded // All retry attempts failed
ErrTimeout // Operation timed out
)// EventError includes event context
eventErr := shared.NewEventError(event, originalErr)
fmt.Println(eventErr) // "event error [id=123, type=user.created]: original error"
// ListenerError includes listener context
listenerErr := shared.NewListenerError("my-listener", event, originalErr)
fmt.Println(listenerErr) // "listener error [id=my-listener, event=123]: original error"
// ValidationError includes field details
validErr := shared.NewValidationError("email", "invalid@", "invalid email format")
fmt.Println(validErr) // "validation error [field=email]: invalid email format"
// Unwrap to get original error
if errors.Is(eventErr, originalErr) {
// Handle specific error type
}Nova emits metrics through the MetricsCollector interface:
type MetricsCollector interface {
IncEventsEmitted(eventType, result string)
IncEventsProcessed(listenerID, result string)
ObserveListenerDuration(listenerID string, duration time.Duration)
ObserveStoreAppendDuration(duration time.Duration)
ObserveStoreReadDuration(duration time.Duration)
SetQueueSize(component string, size int)
SetActiveListeners(emitterID string, count int)
IncEventsDropped(component, reason string)
}For testing and development:
metrics := shared.NewSimpleMetricsCollector()
emitter := emitter.New(emitter.Config{
WorkerPool: pool,
MetricsCollector: metrics,
})
// Check metrics
fmt.Printf("Events emitted: %d
", metrics.GetEventsEmitted())
fmt.Printf("Events processed: %d
", metrics.GetEventsProcessed())Used by default when no collector is provided:
// Silently ignores all metrics
var _ MetricsCollector = NoOpMetricsCollector{}Create a custom collector that wraps Prometheus metrics:
type PrometheusMetrics struct {
eventsEmitted *prometheus.CounterVec
eventDuration *prometheus.HistogramVec
// ... other metrics
}
func (p *PrometheusMetrics) IncEventsEmitted(eventType, result string) {
p.eventsEmitted.WithLabelValues(eventType, result).Inc()
}
// Implement remaining interface methods...The shared package follows these principles:
type Event interface {
ID() string // Unique identifier
Type() string // Event type (e.g., "user.created")
Timestamp() time.Time // When the event occurred
Data() any // Event payload
Metadata() map[string]string // Routing, tracing, and context
}// Create a simple event
event := shared.NewBaseEvent("user-123", "user.created", map[string]any{
"name": "Alice",
"email": "alice@example.com",
})
// Create an event with metadata
event := shared.NewBaseEventWithMetadata(
"order-456",
"order.completed",
orderData,
map[string]string{
"customer_id": "cust-789",
"trace_id": "abc123",
},
)
// Add metadata after creation
event.SetMetadata("partition_key", "us-west-2")
// Read metadata
if traceID, ok := event.GetMetadata("trace_id"); ok {
// Use trace ID
}type OrderEvent struct {
id string
orderID string
items []OrderItem
total float64
createdAt time.Time
}
func (e *OrderEvent) ID() string { return e.id }
func (e *OrderEvent) Type() string { return "order.created" }
func (e *OrderEvent) Timestamp() time.Time { return e.createdAt }
func (e *OrderEvent) Data() any { return e }
func (e *OrderEvent) Metadata() map[string]string { return nil }type Listener interface {
ID() string // Unique identifier
Handle(event Event) error // Process an event
OnError(event Event, err error) error // Handle errors from Handle()
}// Simple listener
listener := shared.NewBaseListener("order-processor", func(event shared.Event) error {
order := event.Data().(map[string]any)
fmt.Printf("Processing order: %v
", order)
return nil
})
// Listener with error handler
listener := shared.NewBaseListenerWithErrorHandler(
"payment-processor",
func(event shared.Event) error {
return processPayment(event)
},
func(event shared.Event, err error) error {
log.Printf("Payment failed for %s: %v", event.ID(), err)
return nil // Suppress error propagation
},
)type Subscription interface {
ID() string // Subscription identifier
Topic() string // Topic or event type subscribed to
Listener() Listener // The subscribed listener
Unsubscribe() error // Remove subscription
Active() bool // Check if subscription is active
}sub := emitter.Subscribe("user.created", listener)
// Later, to unsubscribe
if err := sub.Unsubscribe(); err != nil {
log.Printf("Unsubscribe failed: %v", err)
}
// Check if still active
if sub.Active() {
// Still receiving events
}type Middleware interface {
Before(event Event) error // Called before processing
After(event Event, err error) error // Called after processing
}// Logging middleware
loggingMW := shared.MiddlewareFunc{
BeforeFunc: func(event shared.Event) error {
log.Printf("Processing: %s (%s)", event.ID(), event.Type())
return nil
},
AfterFunc: func(event shared.Event, err error) error {
if err != nil {
log.Printf("Failed: %s - %v", event.ID(), err)
}
return nil
},
}
// Validation middleware
validationMW := shared.MiddlewareFunc{
BeforeFunc: func(event shared.Event) error {
if event.Data() == nil {
return errors.New("event data cannot be nil")
}
return nil
},
}
emitter.Middleware(loggingMW, validationMW)// Default validation
err := shared.DefaultEventValidator.Validate(event)
// Custom validator
customValidator := shared.EventValidatorFunc(func(event shared.Event) error {
if event.Type() != "user.created" && event.Type() != "user.updated" {
return shared.NewValidationError("type", event.Type(), "unsupported event type")
}
return nil
})var (
ErrEventNotFound // Event not found in store
ErrInvalidEvent // Event failed validation
ErrListenerNotFound // Listener not registered
ErrTopicNotFound // Topic does not exist
ErrSubscriptionNotFound // Subscription not found
ErrEmitterClosed // Emitter has been shut down
ErrBusClosed // Bus has been shut down
ErrBufferFull // Buffer cannot accept more events
ErrRetryLimitExceeded // All retry attempts failed
ErrTimeout // Operation timed out
)// EventError includes event context
eventErr := shared.NewEventError(event, originalErr)
fmt.Println(eventErr) // "event error [id=123, type=user.created]: original error"
// ListenerError includes listener context
listenerErr := shared.NewListenerError("my-listener", event, originalErr)
fmt.Println(listenerErr) // "listener error [id=my-listener, event=123]: original error"
// ValidationError includes field details
validErr := shared.NewValidationError("email", "invalid@", "invalid email format")
fmt.Println(validErr) // "validation error [field=email]: invalid email format"
// Unwrap to get original error
if errors.Is(eventErr, originalErr) {
// Handle specific error type
}type MetricsCollector interface {
IncEventsEmitted(eventType, result string)
IncEventsProcessed(listenerID, result string)
ObserveListenerDuration(listenerID string, duration time.Duration)
ObserveStoreAppendDuration(duration time.Duration)
ObserveStoreReadDuration(duration time.Duration)
SetQueueSize(component string, size int)
SetActiveListeners(emitterID string, count int)
IncEventsDropped(component, reason string)
}metrics := shared.NewSimpleMetricsCollector()
emitter := emitter.New(emitter.Config{
WorkerPool: pool,
MetricsCollector: metrics,
})
// Check metrics
fmt.Printf("Events emitted: %d
", metrics.GetEventsEmitted())
fmt.Printf("Events processed: %d
", metrics.GetEventsProcessed())// Silently ignores all metrics
var _ MetricsCollector = NoOpMetricsCollector{}type PrometheusMetrics struct {
eventsEmitted *prometheus.CounterVec
eventDuration *prometheus.HistogramVec
// ... other metrics
}
func (p *PrometheusMetrics) IncEventsEmitted(eventType, result string) {
p.eventsEmitted.WithLabelValues(eventType, result).Inc()
}
// Implement remaining interface methods...type Event interface {
ID() string // Unique identifier
Type() string // Event type (e.g., "user.created")
Timestamp() time.Time // When the event occurred
Data() any // Event payload
Metadata() map[string]string // Routing, tracing, and context
}// Create a simple event
event := shared.NewBaseEvent("user-123", "user.created", map[string]any{
"name": "Alice",
"email": "alice@example.com",
})
// Create an event with metadata
event := shared.NewBaseEventWithMetadata(
"order-456",
"order.completed",
orderData,
map[string]string{
"customer_id": "cust-789",
"trace_id": "abc123",
},
)
// Add metadata after creation
event.SetMetadata("partition_key", "us-west-2")
// Read metadata
if traceID, ok := event.GetMetadata("trace_id"); ok {
// Use trace ID
}type OrderEvent struct {
id string
orderID string
items []OrderItem
total float64
createdAt time.Time
}
func (e *OrderEvent) ID() string { return e.id }
func (e *OrderEvent) Type() string { return "order.created" }
func (e *OrderEvent) Timestamp() time.Time { return e.createdAt }
func (e *OrderEvent) Data() any { return e }
func (e *OrderEvent) Metadata() map[string]string { return nil }type Listener interface {
ID() string // Unique identifier
Handle(event Event) error // Process an event
OnError(event Event, err error) error // Handle errors from Handle()
}// Simple listener
listener := shared.NewBaseListener("order-processor", func(event shared.Event) error {
order := event.Data().(map[string]any)
fmt.Printf("Processing order: %v
", order)
return nil
})
// Listener with error handler
listener := shared.NewBaseListenerWithErrorHandler(
"payment-processor",
func(event shared.Event) error {
return processPayment(event)
},
func(event shared.Event, err error) error {
log.Printf("Payment failed for %s: %v", event.ID(), err)
return nil // Suppress error propagation
},
)type Subscription interface {
ID() string // Subscription identifier
Topic() string // Topic or event type subscribed to
Listener() Listener // The subscribed listener
Unsubscribe() error // Remove subscription
Active() bool // Check if subscription is active
}sub := emitter.Subscribe("user.created", listener)
// Later, to unsubscribe
if err := sub.Unsubscribe(); err != nil {
log.Printf("Unsubscribe failed: %v", err)
}
// Check if still active
if sub.Active() {
// Still receiving events
}type Middleware interface {
Before(event Event) error // Called before processing
After(event Event, err error) error // Called after processing
}// Logging middleware
loggingMW := shared.MiddlewareFunc{
BeforeFunc: func(event shared.Event) error {
log.Printf("Processing: %s (%s)", event.ID(), event.Type())
return nil
},
AfterFunc: func(event shared.Event, err error) error {
if err != nil {
log.Printf("Failed: %s - %v", event.ID(), err)
}
return nil
},
}
// Validation middleware
validationMW := shared.MiddlewareFunc{
BeforeFunc: func(event shared.Event) error {
if event.Data() == nil {
return errors.New("event data cannot be nil")
}
return nil
},
}
emitter.Middleware(loggingMW, validationMW)// Default validation
err := shared.DefaultEventValidator.Validate(event)
// Custom validator
customValidator := shared.EventValidatorFunc(func(event shared.Event) error {
if event.Type() != "user.created" && event.Type() != "user.updated" {
return shared.NewValidationError("type", event.Type(), "unsupported event type")
}
return nil
})var (
ErrEventNotFound // Event not found in store
ErrInvalidEvent // Event failed validation
ErrListenerNotFound // Listener not registered
ErrTopicNotFound // Topic does not exist
ErrSubscriptionNotFound // Subscription not found
ErrEmitterClosed // Emitter has been shut down
ErrBusClosed // Bus has been shut down
ErrBufferFull // Buffer cannot accept more events
ErrRetryLimitExceeded // All retry attempts failed
ErrTimeout // Operation timed out
)// EventError includes event context
eventErr := shared.NewEventError(event, originalErr)
fmt.Println(eventErr) // "event error [id=123, type=user.created]: original error"
// ListenerError includes listener context
listenerErr := shared.NewListenerError("my-listener", event, originalErr)
fmt.Println(listenerErr) // "listener error [id=my-listener, event=123]: original error"
// ValidationError includes field details
validErr := shared.NewValidationError("email", "invalid@", "invalid email format")
fmt.Println(validErr) // "validation error [field=email]: invalid email format"
// Unwrap to get original error
if errors.Is(eventErr, originalErr) {
// Handle specific error type
}type MetricsCollector interface {
IncEventsEmitted(eventType, result string)
IncEventsProcessed(listenerID, result string)
ObserveListenerDuration(listenerID string, duration time.Duration)
ObserveStoreAppendDuration(duration time.Duration)
ObserveStoreReadDuration(duration time.Duration)
SetQueueSize(component string, size int)
SetActiveListeners(emitterID string, count int)
IncEventsDropped(component, reason string)
}metrics := shared.NewSimpleMetricsCollector()
emitter := emitter.New(emitter.Config{
WorkerPool: pool,
MetricsCollector: metrics,
})
// Check metrics
fmt.Printf("Events emitted: %d
", metrics.GetEventsEmitted())
fmt.Printf("Events processed: %d
", metrics.GetEventsProcessed())// Silently ignores all metrics
var _ MetricsCollector = NoOpMetricsCollector{}type PrometheusMetrics struct {
eventsEmitted *prometheus.CounterVec
eventDuration *prometheus.HistogramVec
// ... other metrics
}
func (p *PrometheusMetrics) IncEventsEmitted(eventType, result string) {
p.eventsEmitted.WithLabelValues(eventType, result).Inc()
}
// Implement remaining interface methods...