Loading documentation...
Loading documentation...
Loading documentation...
Complete API documentation for the shared package.
Import Path: github.com/kolosys/nova/shared
ErrEventNotFound, ErrInvalidEvent, ErrListenerNotFound, ErrTopicNotFound, ErrSubscriptionNotFound, ErrEmitterClosed, ErrBusClosed, ErrStoreReadOnly, ErrBufferFull, ErrRetryLimitExceeded, ErrTimeout
Common error types for Nova event system
var ErrEventNotFound = errors.New("event not found") // ErrEventNotFound indicates an event was not found in the store
var ErrInvalidEvent = errors.New("invalid event") // ErrInvalidEvent indicates an event failed validation
var ErrListenerNotFound = errors.New("listener not found") // ErrListenerNotFound indicates a listener was not found
var ErrTopicNotFound = errors.New("topic not found") // ErrTopicNotFound indicates a topic was not found
var ErrSubscriptionNotFound = errors.New("subscription not found") // ErrSubscriptionNotFound indicates a subscription was not found
var ErrEmitterClosed = errors.New("emitter is closed") // ErrEmitterClosed indicates the emitter has been closed
var ErrBusClosed = errors.New("bus is closed") // ErrBusClosed indicates the bus has been closed
var ErrStoreReadOnly = errors.New("store is read-only") // ErrStoreReadOnly indicates the store is in read-only mode
var ErrBufferFull = errors.New("buffer is full") // ErrBufferFull indicates a buffer is full and cannot accept more events
var ErrRetryLimitExceeded = errors.New("retry limit exceeded") // ErrRetryLimitExceeded indicates retry attempts have been exhausted
var ErrTimeout = errors.New("operation timed out") // ErrTimeout indicates an operation timed out
DefaultEventValidator
DefaultEventValidator provides basic event validation
var DefaultEventValidator = EventValidatorFunc(func(event Event) error {
if event == nil {
return NewValidationError("event", nil, "event cannot be nil")
}
if event.ID() == "" {
return NewValidationError("id", event.ID(), "event ID cannot be empty")
}
if event.Type() == "" {
return NewValidationError("type", event.Type(), "event type cannot be empty")
}
if event.Timestamp().IsZero() {
return NewValidationError("timestamp", event.Timestamp(), "event timestamp cannot be zero")
}
return nil
})
BaseEvent provides a default implementation of the Event interface
// Create a new BaseEvent
baseevent := BaseEvent{
}
type BaseEvent struct {
}
NewBaseEvent creates a new BaseEvent
func NewBaseEvent(id, eventType string, data any) *BaseEvent
Parameters:
- id (string)
- eventType (string)
- data (any)
Returns:
- *BaseEvent
NewBaseEventWithMetadata creates a new BaseEvent with metadata
func NewBaseEventWithMetadata(id, eventType string, data any, metadata map[string]string) *BaseEventParameters:
id (string)eventType (string)data (any)metadata (map[string]string)Returns:
Data returns the event data
func (*BaseEvent) Data() anyParameters: None
Returns:
GetMetadata gets a metadata value by key
func (*BaseEvent) GetMetadata(key string) (string, bool)Parameters:
key (string)Returns:
ID returns the event ID
func (*BaseEvent) ID() string
Parameters: None
Returns:
- string
Metadata returns the event metadata
func (*BaseEvent) Metadata() map[string]stringParameters: None
Returns:
SetMetadata sets a metadata key-value pair
func (*BaseEvent) SetMetadata(key, value string)Parameters:
key (string)value (string)Returns: None
Timestamp returns the event timestamp
func (*BaseEvent) Timestamp() time.TimeParameters: None
Returns:
Type returns the event type
func (*BaseEvent) Type() stringParameters: None
Returns:
BaseListener provides a basic implementation of Listener
// Create a new BaseListener
baselistener := BaseListener{
}type BaseListener struct {
}NewBaseListener creates a new BaseListener
func NewBaseListener(id string, handler func(event Event) error) *BaseListenerParameters:
id (string)handler (func(event Event) error)Returns:
NewBaseListenerWithErrorHandler creates a new BaseListener with error handling
func NewBaseListenerWithErrorHandler(id string, handler func(event Event) error, errorHandler func(event Event, err error) error) *BaseListenerParameters:
id (string)handler (func(event Event) error)errorHandler (func(event Event, err error) error)Returns:
Handle processes an event
func (*BaseListener) Handle(event Event) errorParameters:
event (Event)Returns:
ID returns the listener ID
func (*BaseListener) ID() stringParameters: None
Returns:
OnError handles errors
func (*BaseListener) OnError(event Event, err error) errorParameters:
event (Event)err (error)Returns:
BaseSubscription provides a default implementation of Subscription
// Create a new BaseSubscription
basesubscription := BaseSubscription{
}type BaseSubscription struct {
}NewBaseSubscription creates a new BaseSubscription
func NewBaseSubscription(id, topic string, listener Listener) *BaseSubscriptionParameters:
id (string)topic (string)listener (Listener)Returns:
NewBaseSubscriptionWithCallback creates a new BaseSubscription with a close callback
func NewBaseSubscriptionWithCallback(id, topic string, listener Listener, onClose func()) *BaseSubscriptionParameters:
id (string)topic (string)listener (Listener)onClose (func())Returns:
Active returns whether this subscription is active
func (*BaseSubscription) Active() boolParameters: None
Returns:
ID returns the subscription ID
func (*BaseSubscription) ID() string
Parameters: None
Returns:
- string
Listener returns the listener
func (*BaseSubscription) Listener() ListenerParameters: None
Returns:
Topic returns the topic
func (*BaseSubscription) Topic() stringParameters: None
Returns:
Unsubscribe removes this subscription
func (*BaseSubscription) Unsubscribe() errorParameters: None
Returns:
Event represents a domain event in the Nova system
// Example implementation of Event
type MyEvent struct {
// Add your fields here
}
func (m MyEvent) ID() string {
// Implement your logic here
return
}
func (m MyEvent) Type() string {
// Implement your logic here
return
}
func (m MyEvent) Timestamp() time.Time {
// Implement your logic here
return
}
func (m MyEvent) Data() any {
// Implement your logic here
return
}
func (m MyEvent) Metadata() map[string]string {
// Implement your logic here
return
}
type Event interface {
ID() string
Type() string
Timestamp() time.Time
Data() any
Metadata() map[string]string
}| Method | Description |
|---|
EventError wraps an error with event context
// Create a new EventError
eventerror := EventError{
Event: NewBaseEvent("evt-1", "example", nil),
Err: errors.New("example"),
}
type EventError struct {
Event Event
Err error
}
| Field | Type | Description |
|---|---|---|
| Event | Event | |
| Err | error | |
NewEventError creates a new EventError
func NewEventError(event Event, err error) *EventErrorParameters:
event (Event)err (error)Returns:
func (*EventError) Error() string
Parameters: None
Returns:
- string
func (*EventError) Unwrap() error
Parameters: None
Returns:
- error
EventValidator validates events before processing
// Example implementation of EventValidator
type MyEventValidator struct {
// Add your fields here
}
func (m MyEventValidator) Validate(param1 Event) error {
// Implement your logic here
return
}
type EventValidator interface {
Validate(event Event) error
}| Method | Description |
|---|
EventValidatorFunc is a function adapter for EventValidator
// Example usage of EventValidatorFunc
var value EventValidatorFunc
// Initialize with appropriate value
type EventValidatorFunc func(event Event) error
Validate implements EventValidator
func (EventValidatorFunc) Validate(event Event) errorParameters:
event (Event)Returns:
Listener represents an event listener
// Example implementation of Listener
type MyListener struct {
// Add your fields here
}
func (m MyListener) ID() string {
// Implement your logic here
return
}
func (m MyListener) Handle(param1 Event) error {
// Implement your logic here
return
}
func (m MyListener) OnError(param1 Event, param2 error) error {
// Implement your logic here
return
}
type Listener interface {
ID() string
Handle(event Event) error
OnError(event Event, err error) error
}| Method | Description |
|---|
ListenerError wraps an error with listener context
// Create a new ListenerError
listenererror := ListenerError{
ListenerID: "example",
Event: NewBaseEvent("evt-1", "example", nil),
Err: errors.New("example"),
}
type ListenerError struct {
ListenerID string
Event Event
Err error
}
| Field | Type | Description |
|---|---|---|
| ListenerID | string | |
| Event | Event | |
| Err | error | |
NewListenerError creates a new ListenerError
func NewListenerError(listenerID string, event Event, err error) *ListenerErrorParameters:
listenerID (string)event (Event)err (error)Returns:
func (*ListenerError) Error() string
Parameters: None
Returns:
- string
func (*ListenerError) Unwrap() errorParameters: None
Returns:
MetricsCollector defines the interface for collecting Nova metrics
// Example implementation of MetricsCollector
type MyMetricsCollector struct {
// Add your fields here
}
func (m MyMetricsCollector) IncEventsEmitted(param1 string, param2 string) {
// Implement your logic here
return
}
func (m MyMetricsCollector) IncEventsProcessed(param1 string, param2 string) {
// Implement your logic here
return
}
func (m MyMetricsCollector) ObserveListenerDuration(param1 string, param2 time.Duration) {
// Implement your logic here
return
}
func (m MyMetricsCollector) ObserveSagaDuration(param1 string, param2 string, param3 time.Duration) {
// Implement your logic here
return
}
func (m MyMetricsCollector) ObserveStoreAppendDuration(param1 time.Duration) {
// Implement your logic here
return
}
func (m MyMetricsCollector) ObserveStoreReadDuration(param1 time.Duration) {
// Implement your logic here
return
}
func (m MyMetricsCollector) SetQueueSize(param1 string, param2 int) {
// Implement your logic here
return
}
func (m MyMetricsCollector) SetActiveListeners(param1 string, param2 int) {
// Implement your logic here
return
}
func (m MyMetricsCollector) IncEventsDropped(param1 string, param2 string) {
// Implement your logic here
return
}
type MetricsCollector interface {
IncEventsEmitted(eventType string, result string)
IncEventsProcessed(listenerID string, result string)
ObserveListenerDuration(listenerID string, duration time.Duration)
ObserveSagaDuration(sagaID string, step string, duration time.Duration)
ObserveStoreAppendDuration(duration time.Duration)
ObserveStoreReadDuration(duration time.Duration)
SetQueueSize(component string, size int)
SetActiveListeners(emitterID string, count int)
IncEventsDropped(component string, reason string)
}| Method | Description |
|---|
Middleware provides hooks for cross-cutting concerns
// Example implementation of Middleware
type MyMiddleware struct {
// Add your fields here
}
func (m MyMiddleware) Before(param1 Event) error {
// Implement your logic here
return
}
func (m MyMiddleware) After(param1 Event, param2 error) error {
// Implement your logic here
return
}
type Middleware interface {
Before(event Event) error
After(event Event, err error) error
}| Method | Description |
|---|
MiddlewareFunc is a function adapter for Middleware
// Create a new MiddlewareFunc
middlewarefunc := MiddlewareFunc{
BeforeFunc: /* value */,
AfterFunc: /* value */,
}type MiddlewareFunc struct {
BeforeFunc func(event Event) error
AfterFunc func(event Event, err error) error
}| Field | Type | Description |
|---|---|---|
| BeforeFunc | func(event Event) error | |
| AfterFunc | func(event Event, err error) error |
After implements Middleware
func (MiddlewareFunc) After(event Event, err error) errorParameters:
event (Event)err (error)Returns:
Before implements Middleware
func (MiddlewareFunc) Before(event Event) errorParameters:
event (Event)Returns:
NoOpMetricsCollector provides a no-op implementation for when metrics are disabled
// Create a new NoOpMetricsCollector
noopmetricscollector := NoOpMetricsCollector{
}
type NoOpMetricsCollector struct {
}
IncEventsDropped does nothing
func (*NoOpMetricsCollector) IncEventsDropped(component string, reason string)
Parameters:
- component (string)
- reason (string)
Returns: None
IncEventsEmitted does nothing
func (*NoOpMetricsCollector) IncEventsEmitted(eventType string, result string)
Parameters:
- eventType (string)
- result (string)
Returns: None
IncEventsProcessed does nothing
func (*NoOpMetricsCollector) IncEventsProcessed(listenerID string, result string)
Parameters:
- listenerID (string)
- result (string)
Returns: None
ObserveListenerDuration does nothing
func (*NoOpMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration)
Parameters:
- listenerID (string)
- duration (time.Duration)
Returns: None
ObserveSagaDuration does nothing
func (*NoOpMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration)
Parameters:
- sagaID (string)
- step (string)
- duration (time.Duration)
Returns: None
ObserveStoreAppendDuration does nothing
func (*NoOpMetricsCollector) ObserveStoreAppendDuration(duration time.Duration)
Parameters:
- duration (time.Duration)
Returns: None
ObserveStoreReadDuration does nothing
func (*NoOpMetricsCollector) ObserveStoreReadDuration(duration time.Duration)
Parameters:
- duration (time.Duration)
Returns: None
SetActiveListeners does nothing
func (*NoOpMetricsCollector) SetActiveListeners(emitterID string, count int)
Parameters:
- emitterID (string)
- count (int)
Returns: None
SetQueueSize does nothing
func (*NoOpMetricsCollector) SetQueueSize(component string, size int)
Parameters:
- component (string)
- size (int)
Returns: None
SimpleMetricsCollector provides a basic in-memory metrics collector for testing
// Create a new SimpleMetricsCollector
simplemetricscollector := SimpleMetricsCollector{
EventsEmitted: 42,
EventsProcessed: 42,
EventsDropped: 42,
ListenerDurations: 42,
SagaDurations: 42,
StoreAppends: 42,
StoreReads: 42,
QueueSizes: map[string]int64{},
ActiveListeners: map[string]int64{},
}
type SimpleMetricsCollector struct {
EventsEmitted int64
EventsProcessed int64
EventsDropped int64
ListenerDurations int64
SagaDurations int64
StoreAppends int64
StoreReads int64
QueueSizes map[string]int64
ActiveListeners map[string]int64
}
| Field | Type | Description |
|---|---|---|
| EventsEmitted | int64 | |
| EventsProcessed | int64 | |
| EventsDropped | int64 | |
| ListenerDurations | int64 | |
| SagaDurations | int64 | |
| StoreAppends | int64 | |
| StoreReads | int64 | |
| QueueSizes | map[string]int64 | |
| ActiveListeners | map[string]int64 | |
NewSimpleMetricsCollector creates a new SimpleMetricsCollector
func NewSimpleMetricsCollector() *SimpleMetricsCollectorParameters: None
Returns:
GetEventsDropped returns the current events dropped count
func (*SimpleMetricsCollector) GetEventsDropped() int64Parameters: None
Returns:
GetEventsEmitted returns the current events emitted count
func (*SimpleMetricsCollector) GetEventsEmitted() int64Parameters: None
Returns:
GetEventsProcessed returns the current events processed count
func (*SimpleMetricsCollector) GetEventsProcessed() int64Parameters: None
Returns:
IncEventsDropped increments dropped events counter
func (*SimpleMetricsCollector) IncEventsDropped(component string, reason string)Parameters:
component (string)reason (string)Returns: None
IncEventsEmitted increments events emitted counter
func (*SimpleMetricsCollector) IncEventsEmitted(eventType string, result string)Parameters:
eventType (string)result (string)Returns: None
IncEventsProcessed increments events processed counter
func (*SimpleMetricsCollector) IncEventsProcessed(listenerID string, result string)Parameters:
listenerID (string)result (string)Returns: None
ObserveListenerDuration records listener processing duration
func (*SimpleMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration)Parameters:
listenerID (string)duration (time.Duration)Returns: None
ObserveSagaDuration records saga step duration
func (*SimpleMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration)Parameters:
sagaID (string)step (string)duration (time.Duration)Returns: None
ObserveStoreAppendDuration records store append duration
func (*SimpleMetricsCollector) ObserveStoreAppendDuration(duration time.Duration)Parameters:
duration (time.Duration)Returns: None
ObserveStoreReadDuration records store read duration
func (*SimpleMetricsCollector) ObserveStoreReadDuration(duration time.Duration)Parameters:
duration (time.Duration)Returns: None
SetActiveListeners sets current active listeners count
func (*SimpleMetricsCollector) SetActiveListeners(emitterID string, count int)Parameters:
emitterID (string)count (int)Returns: None
SetQueueSize sets current queue size
func (*SimpleMetricsCollector) SetQueueSize(component string, size int)Parameters:
component (string)size (int)Returns: None
Subscription represents an active subscription to events
// Example implementation of Subscription
type MySubscription struct {
// Add your fields here
}
func (m MySubscription) ID() string {
// Implement your logic here
return
}
func (m MySubscription) Topic() string {
// Implement your logic here
return
}
func (m MySubscription) Listener() Listener {
// Implement your logic here
return
}
func (m MySubscription) Unsubscribe() error {
// Implement your logic here
return
}
func (m MySubscription) Active() bool {
// Implement your logic here
return
}
type Subscription interface {
ID() string
Topic() string
Listener() Listener
Unsubscribe() error
Active() bool
}| Method | Description |
|---|
ValidationError indicates a validation failure
// Create a new ValidationError
validationerror := ValidationError{
Field: "example",
Value: "example",
Message: "example",
}
type ValidationError struct {
Field string
Value any
Message string
}
| Field | Type | Description |
|---|---|---|
| Field | string | |
| Value | any | |
| Message | string | |
NewValidationError creates a new ValidationError
func NewValidationError(field string, value any, message string) *ValidationErrorParameters:
field (string)value (any)message (string)Returns:
func (*ValidationError) Error() stringParameters: None
Returns:
Complete API documentation for the shared package.
Import Path: github.com/kolosys/nova/shared
ErrEventNotFound, ErrInvalidEvent, ErrListenerNotFound, ErrTopicNotFound, ErrSubscriptionNotFound, ErrEmitterClosed, ErrBusClosed, ErrStoreReadOnly, ErrBufferFull, ErrRetryLimitExceeded, ErrTimeout
Common error types for Nova event system
var ErrEventNotFound = errors.New("event not found") // ErrEventNotFound indicates an event was not found in the store
var ErrInvalidEvent = errors.New("invalid event") // ErrInvalidEvent indicates an event failed validation
var ErrListenerNotFound = errors.New("listener not found") // ErrListenerNotFound indicates a listener was not found
var ErrTopicNotFound = errors.New("topic not found") // ErrTopicNotFound indicates a topic was not found
var ErrSubscriptionNotFound = errors.New("subscription not found") // ErrSubscriptionNotFound indicates a subscription was not found
var ErrEmitterClosed = errors.New("emitter is closed") // ErrEmitterClosed indicates the emitter has been closed
var ErrBusClosed = errors.New("bus is closed") // ErrBusClosed indicates the bus has been closed
var ErrStoreReadOnly = errors.New("store is read-only") // ErrStoreReadOnly indicates the store is in read-only mode
var ErrBufferFull = errors.New("buffer is full") // ErrBufferFull indicates a buffer is full and cannot accept more events
var ErrRetryLimitExceeded = errors.New("retry limit exceeded") // ErrRetryLimitExceeded indicates retry attempts have been exhausted
var ErrTimeout = errors.New("operation timed out") // ErrTimeout indicates an operation timed out
DefaultEventValidator
DefaultEventValidator provides basic event validation
var DefaultEventValidator = EventValidatorFunc(func(event Event) error {
if event == nil {
return NewValidationError("event", nil, "event cannot be nil")
}
if event.ID() == "" {
return NewValidationError("id", event.ID(), "event ID cannot be empty")
}
if event.Type() == "" {
return NewValidationError("type", event.Type(), "event type cannot be empty")
}
if event.Timestamp().IsZero() {
return NewValidationError("timestamp", event.Timestamp(), "event timestamp cannot be zero")
}
return nil
})
BaseEvent provides a default implementation of the Event interface
// Create a new BaseEvent
baseevent := BaseEvent{
}type BaseEvent struct {
}NewBaseEvent creates a new BaseEvent
func NewBaseEvent(id, eventType string, data any) *BaseEventParameters:
id (string)eventType (string)data (any)Returns:
NewBaseEventWithMetadata creates a new BaseEvent with metadata
func NewBaseEventWithMetadata(id, eventType string, data any, metadata map[string]string) *BaseEventParameters:
id (string)eventType (string)data (any)metadata (map[string]string)Returns:
Data returns the event data
func (*BaseEvent) Data() anyParameters: None
Returns:
GetMetadata gets a metadata value by key
func (*BaseEvent) GetMetadata(key string) (string, bool)Parameters:
key (string)Returns:
ID returns the event ID
func (*BaseEvent) ID() string
Parameters: None
Returns:
- string
Metadata returns the event metadata
func (*BaseEvent) Metadata() map[string]stringParameters: None
Returns:
SetMetadata sets a metadata key-value pair
func (*BaseEvent) SetMetadata(key, value string)Parameters:
key (string)value (string)Returns: None
Timestamp returns the event timestamp
func (*BaseEvent) Timestamp() time.TimeParameters: None
Returns:
Type returns the event type
func (*BaseEvent) Type() stringParameters: None
Returns:
BaseListener provides a basic implementation of Listener
// Create a new BaseListener
baselistener := BaseListener{
}type BaseListener struct {
}NewBaseListener creates a new BaseListener
func NewBaseListener(id string, handler func(event Event) error) *BaseListenerParameters:
id (string)handler (func(event Event) error)Returns:
NewBaseListenerWithErrorHandler creates a new BaseListener with error handling
func NewBaseListenerWithErrorHandler(id string, handler func(event Event) error, errorHandler func(event Event, err error) error) *BaseListenerParameters:
id (string)handler (func(event Event) error)errorHandler (func(event Event, err error) error)Returns:
Handle processes an event
func (*BaseListener) Handle(event Event) errorParameters:
event (Event)Returns:
ID returns the listener ID
func (*BaseListener) ID() stringParameters: None
Returns:
OnError handles errors
func (*BaseListener) OnError(event Event, err error) errorParameters:
event (Event)err (error)Returns:
BaseSubscription provides a default implementation of Subscription
// Create a new BaseSubscription
basesubscription := BaseSubscription{
}type BaseSubscription struct {
}NewBaseSubscription creates a new BaseSubscription
func NewBaseSubscription(id, topic string, listener Listener) *BaseSubscriptionParameters:
id (string)topic (string)listener (Listener)Returns:
NewBaseSubscriptionWithCallback creates a new BaseSubscription with a close callback
func NewBaseSubscriptionWithCallback(id, topic string, listener Listener, onClose func()) *BaseSubscriptionParameters:
id (string)topic (string)listener (Listener)onClose (func())Returns:
Active returns whether this subscription is active
func (*BaseSubscription) Active() boolParameters: None
Returns:
ID returns the subscription ID
func (*BaseSubscription) ID() string
Parameters: None
Returns:
- string
Listener returns the listener
func (*BaseSubscription) Listener() ListenerParameters: None
Returns:
Topic returns the topic
func (*BaseSubscription) Topic() stringParameters: None
Returns:
Unsubscribe removes this subscription
func (*BaseSubscription) Unsubscribe() errorParameters: None
Returns:
Event represents a domain event in the Nova system
// Example implementation of Event
type MyEvent struct {
// Add your fields here
}
func (m MyEvent) ID() string {
// Implement your logic here
return
}
func (m MyEvent) Type() string {
// Implement your logic here
return
}
func (m MyEvent) Timestamp() time.Time {
// Implement your logic here
return
}
func (m MyEvent) Data() any {
// Implement your logic here
return
}
func (m MyEvent) Metadata() map[string]string {
// Implement your logic here
return
}
type Event interface {
ID() string
Type() string
Timestamp() time.Time
Data() any
Metadata() map[string]string
}| Method | Description |
|---|
EventError wraps an error with event context
// Create a new EventError
eventerror := EventError{
Event: NewBaseEvent("evt-1", "example", nil),
Err: errors.New("example"),
}
type EventError struct {
Event Event
Err error
}
| Field | Type | Description |
|---|---|---|
| Event | Event | |
| Err | error | |
NewEventError creates a new EventError
func NewEventError(event Event, err error) *EventErrorParameters:
event (Event)err (error)Returns:
func (*EventError) Error() string
Parameters: None
Returns:
- string
func (*EventError) Unwrap() error
Parameters: None
Returns:
- error
EventValidator validates events before processing
// Example implementation of EventValidator
type MyEventValidator struct {
// Add your fields here
}
func (m MyEventValidator) Validate(param1 Event) error {
// Implement your logic here
return
}
type EventValidator interface {
Validate(event Event) error
}| Method | Description |
|---|
EventValidatorFunc is a function adapter for EventValidator
// Example usage of EventValidatorFunc
var value EventValidatorFunc
// Initialize with appropriate value
type EventValidatorFunc func(event Event) error
Validate implements EventValidator
func (EventValidatorFunc) Validate(event Event) errorParameters:
event (Event)Returns:
Listener represents an event listener
// Example implementation of Listener
type MyListener struct {
// Add your fields here
}
func (m MyListener) ID() string {
// Implement your logic here
return
}
func (m MyListener) Handle(param1 Event) error {
// Implement your logic here
return
}
func (m MyListener) OnError(param1 Event, param2 error) error {
// Implement your logic here
return
}
type Listener interface {
ID() string
Handle(event Event) error
OnError(event Event, err error) error
}| Method | Description |
|---|
ListenerError wraps an error with listener context
// Create a new ListenerError
listenererror := ListenerError{
ListenerID: "example",
Event: NewBaseEvent("evt-1", "example", nil),
Err: errors.New("example"),
}
type ListenerError struct {
ListenerID string
Event Event
Err error
}
| Field | Type | Description |
|---|---|---|
| ListenerID | string | |
| Event | Event | |
| Err | error | |
NewListenerError creates a new ListenerError
func NewListenerError(listenerID string, event Event, err error) *ListenerErrorParameters:
listenerID (string)event (Event)err (error)Returns:
func (*ListenerError) Error() string
Parameters: None
Returns:
- string
func (*ListenerError) Unwrap() errorParameters: None
Returns:
MetricsCollector defines the interface for collecting Nova metrics
// Example implementation of MetricsCollector
type MyMetricsCollector struct {
// Add your fields here
}
func (m MyMetricsCollector) IncEventsEmitted(param1 string, param2 string) {
// Implement your logic here
return
}
func (m MyMetricsCollector) IncEventsProcessed(param1 string, param2 string) {
// Implement your logic here
return
}
func (m MyMetricsCollector) ObserveListenerDuration(param1 string, param2 time.Duration) {
// Implement your logic here
return
}
func (m MyMetricsCollector) ObserveSagaDuration(param1 string, param2 string, param3 time.Duration) {
// Implement your logic here
return
}
func (m MyMetricsCollector) ObserveStoreAppendDuration(param1 time.Duration) {
// Implement your logic here
return
}
func (m MyMetricsCollector) ObserveStoreReadDuration(param1 time.Duration) {
// Implement your logic here
return
}
func (m MyMetricsCollector) SetQueueSize(param1 string, param2 int) {
// Implement your logic here
return
}
func (m MyMetricsCollector) SetActiveListeners(param1 string, param2 int) {
// Implement your logic here
return
}
func (m MyMetricsCollector) IncEventsDropped(param1 string, param2 string) {
// Implement your logic here
return
}
type MetricsCollector interface {
IncEventsEmitted(eventType string, result string)
IncEventsProcessed(listenerID string, result string)
ObserveListenerDuration(listenerID string, duration time.Duration)
ObserveSagaDuration(sagaID string, step string, duration time.Duration)
ObserveStoreAppendDuration(duration time.Duration)
ObserveStoreReadDuration(duration time.Duration)
SetQueueSize(component string, size int)
SetActiveListeners(emitterID string, count int)
IncEventsDropped(component string, reason string)
}| Method | Description |
|---|
Middleware provides hooks for cross-cutting concerns
// Example implementation of Middleware
type MyMiddleware struct {
// Add your fields here
}
func (m MyMiddleware) Before(param1 Event) error {
// Implement your logic here
return
}
func (m MyMiddleware) After(param1 Event, param2 error) error {
// Implement your logic here
return
}
type Middleware interface {
Before(event Event) error
After(event Event, err error) error
}| Method | Description |
|---|
MiddlewareFunc is a function adapter for Middleware
// Create a new MiddlewareFunc
middlewarefunc := MiddlewareFunc{
BeforeFunc: /* value */,
AfterFunc: /* value */,
}type MiddlewareFunc struct {
BeforeFunc func(event Event) error
AfterFunc func(event Event, err error) error
}| Field | Type | Description |
|---|---|---|
| BeforeFunc | func(event Event) error | |
| AfterFunc | func(event Event, err error) error |
After implements Middleware
func (MiddlewareFunc) After(event Event, err error) errorParameters:
event (Event)err (error)Returns:
Before implements Middleware
func (MiddlewareFunc) Before(event Event) errorParameters:
event (Event)Returns:
NoOpMetricsCollector provides a no-op implementation for when metrics are disabled
// Create a new NoOpMetricsCollector
noopmetricscollector := NoOpMetricsCollector{
}
type NoOpMetricsCollector struct {
}
IncEventsDropped does nothing
func (*NoOpMetricsCollector) IncEventsDropped(component string, reason string)
Parameters:
- component (string)
- reason (string)
Returns: None
IncEventsEmitted does nothing
func (*NoOpMetricsCollector) IncEventsEmitted(eventType string, result string)
Parameters:
- eventType (string)
- result (string)
Returns: None
IncEventsProcessed does nothing
func (*NoOpMetricsCollector) IncEventsProcessed(listenerID string, result string)
Parameters:
- listenerID (string)
- result (string)
Returns: None
ObserveListenerDuration does nothing
func (*NoOpMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration)
Parameters:
- listenerID (string)
- duration (time.Duration)
Returns: None
ObserveSagaDuration does nothing
func (*NoOpMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration)
Parameters:
- sagaID (string)
- step (string)
- duration (time.Duration)
Returns: None
ObserveStoreAppendDuration does nothing
func (*NoOpMetricsCollector) ObserveStoreAppendDuration(duration time.Duration)
Parameters:
- duration (time.Duration)
Returns: None
ObserveStoreReadDuration does nothing
func (*NoOpMetricsCollector) ObserveStoreReadDuration(duration time.Duration)
Parameters:
- duration (time.Duration)
Returns: None
SetActiveListeners does nothing
func (*NoOpMetricsCollector) SetActiveListeners(emitterID string, count int)
Parameters:
- emitterID (string)
- count (int)
Returns: None
SetQueueSize does nothing
func (*NoOpMetricsCollector) SetQueueSize(component string, size int)
Parameters:
- component (string)
- size (int)
Returns: None
SimpleMetricsCollector provides a basic in-memory metrics collector for testing
// Create a new SimpleMetricsCollector
simplemetricscollector := SimpleMetricsCollector{
EventsEmitted: 42,
EventsProcessed: 42,
EventsDropped: 42,
ListenerDurations: 42,
SagaDurations: 42,
StoreAppends: 42,
StoreReads: 42,
QueueSizes: map[string]int64{},
ActiveListeners: map[string]int64{},
}
type SimpleMetricsCollector struct {
EventsEmitted int64
EventsProcessed int64
EventsDropped int64
ListenerDurations int64
SagaDurations int64
StoreAppends int64
StoreReads int64
QueueSizes map[string]int64
ActiveListeners map[string]int64
}
| Field | Type | Description |
|---|---|---|
| EventsEmitted | int64 | |
| EventsProcessed | int64 | |
| EventsDropped | int64 | |
| ListenerDurations | int64 | |
| SagaDurations | int64 | |
| StoreAppends | int64 | |
| StoreReads | int64 | |
| QueueSizes | map[string]int64 | |
| ActiveListeners | map[string]int64 | |
NewSimpleMetricsCollector creates a new SimpleMetricsCollector
func NewSimpleMetricsCollector() *SimpleMetricsCollector
Parameters: None
Returns: *SimpleMetricsCollector
GetEventsDropped returns the current events dropped count
func (*SimpleMetricsCollector) GetEventsDropped() int64
Parameters: None
Returns: int64
GetEventsEmitted returns the current events emitted count
func (*SimpleMetricsCollector) GetEventsEmitted() int64
Parameters: None
Returns: int64
GetEventsProcessed returns the current events processed count
func (*SimpleMetricsCollector) GetEventsProcessed() int64
Parameters: None
Returns: int64
IncEventsDropped increments dropped events counter
func (*SimpleMetricsCollector) IncEventsDropped(component string, reason string)
Parameters: component (string), reason (string)
Returns: None
IncEventsEmitted increments events emitted counter
func (*SimpleMetricsCollector) IncEventsEmitted(eventType string, result string)
Parameters: eventType (string), result (string)
Returns: None
IncEventsProcessed increments events processed counter
func (*SimpleMetricsCollector) IncEventsProcessed(listenerID string, result string)
Parameters: listenerID (string), result (string)
Returns: None
ObserveListenerDuration records listener processing duration
func (*SimpleMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration)
Parameters: listenerID (string), duration (time.Duration)
Returns: None
ObserveSagaDuration records saga step duration
func (*SimpleMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration)
Parameters: sagaID (string), step (string), duration (time.Duration)
Returns: None
ObserveStoreAppendDuration records store append duration
func (*SimpleMetricsCollector) ObserveStoreAppendDuration(duration time.Duration)
Parameters: duration (time.Duration)
Returns: None
ObserveStoreReadDuration records store read duration
func (*SimpleMetricsCollector) ObserveStoreReadDuration(duration time.Duration)
Parameters: duration (time.Duration)
Returns: None
SetActiveListeners sets current active listeners count
func (*SimpleMetricsCollector) SetActiveListeners(emitterID string, count int)
Parameters: emitterID (string), count (int)
Returns: None
SetQueueSize sets current queue size
func (*SimpleMetricsCollector) SetQueueSize(component string, size int)
Parameters: component (string), size (int)
Returns: None
Subscription represents an active subscription to events
// Example implementation of Subscription
// MySubscription is a skeleton showing the five methods Subscription requires.
// Every method returns its zero value so the skeleton compiles as written.
type MySubscription struct {
	// Add your fields here
}

// ID returns the subscription identifier; empty in this skeleton.
func (m MySubscription) ID() string {
	// Implement your logic here
	return ""
}

// Topic returns the subscribed topic; empty in this skeleton.
func (m MySubscription) Topic() string {
	// Implement your logic here
	return ""
}

// Listener returns the attached Listener; nil in this skeleton.
func (m MySubscription) Listener() Listener {
	// Implement your logic here
	return nil
}

// Unsubscribe returns nil in this skeleton.
func (m MySubscription) Unsubscribe() error {
	// Implement your logic here
	return nil
}

// Active returns false in this skeleton.
func (m MySubscription) Active() bool {
	// Implement your logic here
	return false
}
// Subscription represents an active subscription to events.
// BaseSubscription is the type in this package documented with the
// Active, Listener, Topic and Unsubscribe methods.
type Subscription interface {
ID() string
Topic() string
Listener() Listener
Unsubscribe() error
Active() bool
}
| Method | Description |
|---|---|
ValidationError indicates a validation failure
// Create a new ValidationError
validationerror := ValidationError{
	Field: "example",
	// Value is typed any, so any concrete value works; "any{}" is not a valid literal.
	Value:   "rejected value",
	Message: "example",
}

// ValidationError indicates a validation failure.
type ValidationError struct {
	Field   string
	Value   any
	Message string
}
| Field | Type | Description |
|---|---|---|
| Field | string | |
| Value | any | |
| Message | string | |
NewValidationError creates a new ValidationError
func NewValidationError(field string, value any, message string) *ValidationError
Parameters: field (string), value (any), message (string)
Returns: *ValidationError
func (*ValidationError) Error() string
Parameters: None
Returns: string
// Common error types for the Nova event system.
var (
	// ErrEventNotFound indicates an event was not found in the store.
	ErrEventNotFound = errors.New("event not found")
	// ErrInvalidEvent indicates an event failed validation.
	ErrInvalidEvent = errors.New("invalid event")
	// ErrListenerNotFound indicates a listener was not found.
	ErrListenerNotFound = errors.New("listener not found")
	// ErrTopicNotFound indicates a topic was not found.
	ErrTopicNotFound = errors.New("topic not found")
	// ErrSubscriptionNotFound indicates a subscription was not found.
	ErrSubscriptionNotFound = errors.New("subscription not found")
	// ErrEmitterClosed indicates the emitter has been closed.
	ErrEmitterClosed = errors.New("emitter is closed")
	// ErrBusClosed indicates the bus has been closed.
	ErrBusClosed = errors.New("bus is closed")
	// ErrStoreReadOnly indicates the store is in read-only mode.
	ErrStoreReadOnly = errors.New("store is read-only")
	// ErrBufferFull indicates a buffer is full and cannot accept more events.
	ErrBufferFull = errors.New("buffer is full")
	// ErrRetryLimitExceeded indicates retry attempts have been exhausted.
	ErrRetryLimitExceeded = errors.New("retry limit exceeded")
	// ErrTimeout indicates an operation timed out.
	ErrTimeout = errors.New("operation timed out")
)
// DefaultEventValidator provides basic event validation. The checks run in
// order and the first failure is returned as a ValidationError: nil event,
// empty ID, empty Type, zero Timestamp. A nil error means every check passed.
var DefaultEventValidator = EventValidatorFunc(func(event Event) error {
	switch {
	case event == nil:
		return NewValidationError("event", nil, "event cannot be nil")
	case event.ID() == "":
		return NewValidationError("id", event.ID(), "event ID cannot be empty")
	case event.Type() == "":
		return NewValidationError("type", event.Type(), "event type cannot be empty")
	case event.Timestamp().IsZero():
		return NewValidationError("timestamp", event.Timestamp(), "event timestamp cannot be zero")
	}
	return nil
})
// Create a new BaseEvent
baseevent := BaseEvent{
}type BaseEvent struct {
}func NewBaseEvent(id, eventType string, data any) *BaseEventfunc NewBaseEventWithMetadata(id, eventType string, data any, metadata map[string]string) *BaseEventfunc (*BaseEvent) Data() anyfunc (*BaseEvent) GetMetadata(key string) (string, bool)func (*BaseListener) ID() stringfunc (*BaseEvent) Metadata() map[string]stringfunc (*BaseEvent) SetMetadata(key, value string)func (*BaseEvent) Timestamp() time.Timefunc (*BaseEvent) Type() string// Create a new BaseListener
baselistener := BaseListener{
}
// BaseListener is listed with no exported fields; the two constructors below
// take the handler (and optionally an error handler) as function values.
type BaseListener struct {
}
func NewBaseListener(id string, handler func(event Event) error) *BaseListener
func NewBaseListenerWithErrorHandler(id string, handler func(event Event) error, errorHandler func(event Event, err error) error) *BaseListener
func (*BaseListener) Handle(event Event) error
func (*BaseListener) ID() string
func (*BaseListener) OnError(event Event, err error) error
// Create a new BaseSubscription
basesubscription := BaseSubscription{
}type BaseSubscription struct {
}func NewBaseSubscription(id, topic string, listener Listener) *BaseSubscriptionfunc NewBaseSubscriptionWithCallback(id, topic string, listener Listener, onClose func()) *BaseSubscriptionfunc (*BaseSubscription) Active() boolfunc (*BaseEvent) ID() stringfunc (*BaseSubscription) Listener() Listenerfunc (*BaseSubscription) Topic() stringfunc (*BaseSubscription) Unsubscribe() error// Example implementation of Event
// MyEvent is a skeleton showing the five methods Event requires. Every method
// returns its zero value so the skeleton compiles as written.
type MyEvent struct {
	// Add your fields here
}

// ID returns the event identifier; empty in this skeleton.
func (m MyEvent) ID() string {
	// Implement your logic here
	return ""
}

// Type returns the event type; empty in this skeleton.
func (m MyEvent) Type() string {
	// Implement your logic here
	return ""
}

// Timestamp returns the event time; the zero time.Time in this skeleton.
func (m MyEvent) Timestamp() time.Time {
	// Implement your logic here
	return time.Time{}
}

// Data returns the event payload; nil in this skeleton.
func (m MyEvent) Data() any {
	// Implement your logic here
	return nil
}

// Metadata returns the event metadata; nil in this skeleton.
func (m MyEvent) Metadata() map[string]string {
	// Implement your logic here
	return nil
}
// Event is the interface checked by DefaultEventValidator, which rejects a
// nil Event, an empty ID or Type, and a zero Timestamp.
type Event interface {
ID() string
Type() string
Timestamp() time.Time
Data() any
Metadata() map[string]string
}
// Create a new EventError
eventerror := EventError{
Event: Event{},
Err: error{},
}type EventError struct {
Event Event
Err error
}func NewEventError(event Event, err error) *EventErrorfunc (*ValidationError) Error() stringfunc (*ListenerError) Unwrap() error// Example implementation of EventValidator
// MyEventValidator is a skeleton showing the method EventValidator requires.
type MyEventValidator struct {
	// Add your fields here
}

// Validate returns nil for every event in this skeleton; return a non-nil
// error from your own checks to reject an event.
func (m MyEventValidator) Validate(event Event) error {
	// Implement your logic here
	return nil
}
// EventValidator is the single-method validation interface;
// EventValidatorFunc is listed with a Validate method of the same signature.
type EventValidator interface {
Validate(event Event) error
}
// Example usage of EventValidatorFunc
var value EventValidatorFunc
// Initialize with appropriate valuetype EventValidatorFunc func(event Event) errorfunc (EventValidatorFunc) Validate(event Event) error// Example implementation of Listener
// MyListener is a skeleton showing the three methods Listener requires.
type MyListener struct {
	// Add your fields here
}

// ID returns the listener identifier; empty in this skeleton.
func (m MyListener) ID() string {
	// Implement your logic here
	return ""
}

// Handle returns nil for every event in this skeleton.
func (m MyListener) Handle(event Event) error {
	// Implement your logic here
	return nil
}

// OnError returns err unchanged in this skeleton so the failure is not
// silently dropped.
func (m MyListener) OnError(event Event, err error) error {
	// Implement your logic here
	return err
}
// Listener is the handler type accepted by NewBaseSubscription; BaseListener
// is listed with the same three methods.
type Listener interface {
ID() string
Handle(event Event) error
OnError(event Event, err error) error
}
// Create a new ListenerError
listenererror := ListenerError{
ListenerID: "example",
Event: Event{},
Err: error{},
}type ListenerError struct {
ListenerID string
Event Event
Err error
}func NewListenerError(listenerID string, event Event, err error) *ListenerErrorfunc (*ValidationError) Error() stringfunc (*ListenerError) Unwrap() error// Example implementation of MetricsCollector
// MyMetricsCollector is a skeleton showing the nine methods MetricsCollector
// requires. Parameter names follow the interface declaration; every method is
// an empty placeholder.
type MyMetricsCollector struct {
	// Add your fields here
}

// IncEventsEmitted receives an event type and a result label.
func (m MyMetricsCollector) IncEventsEmitted(eventType string, result string) {
	// Implement your logic here
}

// IncEventsProcessed receives a listener ID and a result label.
func (m MyMetricsCollector) IncEventsProcessed(listenerID string, result string) {
	// Implement your logic here
}

// ObserveListenerDuration receives a listener ID and a duration.
func (m MyMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration) {
	// Implement your logic here
}

// ObserveSagaDuration receives a saga ID, a step name and a duration.
func (m MyMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration) {
	// Implement your logic here
}

// ObserveStoreAppendDuration receives a duration.
func (m MyMetricsCollector) ObserveStoreAppendDuration(duration time.Duration) {
	// Implement your logic here
}

// ObserveStoreReadDuration receives a duration.
func (m MyMetricsCollector) ObserveStoreReadDuration(duration time.Duration) {
	// Implement your logic here
}

// SetQueueSize receives a component name and a size.
func (m MyMetricsCollector) SetQueueSize(component string, size int) {
	// Implement your logic here
}

// SetActiveListeners receives an emitter ID and a count.
func (m MyMetricsCollector) SetActiveListeners(emitterID string, count int) {
	// Implement your logic here
}

// IncEventsDropped receives a component name and a reason.
func (m MyMetricsCollector) IncEventsDropped(component string, reason string) {
	// Implement your logic here
}
// MetricsCollector declares the metrics hooks: three Inc* methods, four
// Observe*Duration methods and two Set* methods. NoOpMetricsCollector and
// SimpleMetricsCollector are the collector types documented alongside it.
type MetricsCollector interface {
IncEventsEmitted(eventType string, result string)
IncEventsProcessed(listenerID string, result string)
ObserveListenerDuration(listenerID string, duration time.Duration)
ObserveSagaDuration(sagaID string, step string, duration time.Duration)
ObserveStoreAppendDuration(duration time.Duration)
ObserveStoreReadDuration(duration time.Duration)
SetQueueSize(component string, size int)
SetActiveListeners(emitterID string, count int)
IncEventsDropped(component string, reason string)
}
// Example implementation of Middleware
// MyMiddleware is a skeleton showing the two methods Middleware requires.
type MyMiddleware struct {
	// Add your fields here
}

// Before returns nil for every event in this skeleton.
func (m MyMiddleware) Before(event Event) error {
	// Implement your logic here
	return nil
}

// After returns err unchanged in this skeleton so the failure is not
// silently dropped.
func (m MyMiddleware) After(event Event, err error) error {
	// Implement your logic here
	return err
}
// Middleware declares a Before hook and an After hook; MiddlewareFunc is
// listed with methods of the same signatures.
type Middleware interface {
Before(event Event) error
After(event Event, err error) error
}
// Create a new MiddlewareFunc
middlewarefunc := MiddlewareFunc{
BeforeFunc: /* value */,
AfterFunc: /* value */,
}type MiddlewareFunc struct {
BeforeFunc func(event Event) error
AfterFunc func(event Event, err error) error
}func (MiddlewareFunc) After(event Event, err error) errorfunc (MiddlewareFunc) Before(event Event) error// Create a new NoOpMetricsCollector
noopmetricscollector := NoOpMetricsCollector{
}type NoOpMetricsCollector struct {
}func (*SimpleMetricsCollector) IncEventsDropped(component string, reason string)func (*SimpleMetricsCollector) IncEventsEmitted(eventType string, result string)func (*SimpleMetricsCollector) IncEventsProcessed(listenerID string, result string)func (*SimpleMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration)func (*SimpleMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration)func (*SimpleMetricsCollector) ObserveStoreAppendDuration(duration time.Duration)func (*SimpleMetricsCollector) ObserveStoreReadDuration(duration time.Duration)func (*SimpleMetricsCollector) SetActiveListeners(emitterID string, count int)func (*SimpleMetricsCollector) SetQueueSize(component string, size int)// Create a new SimpleMetricsCollector
simplemetricscollector := SimpleMetricsCollector{
EventsEmitted: 42,
EventsProcessed: 42,
EventsDropped: 42,
ListenerDurations: 42,
SagaDurations: 42,
StoreAppends: 42,
StoreReads: 42,
QueueSizes: map[],
ActiveListeners: map[],
}type SimpleMetricsCollector struct {
EventsEmitted int64
EventsProcessed int64
EventsDropped int64
ListenerDurations int64
SagaDurations int64
StoreAppends int64
StoreReads int64
QueueSizes map[string]int64
ActiveListeners map[string]int64
}func NewSimpleMetricsCollector() *SimpleMetricsCollectorfunc (*SimpleMetricsCollector) GetEventsDropped() int64func (*SimpleMetricsCollector) GetEventsEmitted() int64func (*SimpleMetricsCollector) GetEventsProcessed() int64func (*SimpleMetricsCollector) IncEventsDropped(component string, reason string)func (*SimpleMetricsCollector) IncEventsEmitted(eventType string, result string)func (*SimpleMetricsCollector) IncEventsProcessed(listenerID string, result string)func (*SimpleMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration)func (*SimpleMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration)func (*SimpleMetricsCollector) ObserveStoreAppendDuration(duration time.Duration)func (*SimpleMetricsCollector) ObserveStoreReadDuration(duration time.Duration)func (*SimpleMetricsCollector) SetActiveListeners(emitterID string, count int)func (*SimpleMetricsCollector) SetQueueSize(component string, size int)// Example implementation of Subscription
// MySubscription is a skeleton showing the five methods Subscription requires.
// Every method returns its zero value so the skeleton compiles as written.
type MySubscription struct {
	// Add your fields here
}

// ID returns the subscription identifier; empty in this skeleton.
func (m MySubscription) ID() string {
	// Implement your logic here
	return ""
}

// Topic returns the subscribed topic; empty in this skeleton.
func (m MySubscription) Topic() string {
	// Implement your logic here
	return ""
}

// Listener returns the attached Listener; nil in this skeleton.
func (m MySubscription) Listener() Listener {
	// Implement your logic here
	return nil
}

// Unsubscribe returns nil in this skeleton.
func (m MySubscription) Unsubscribe() error {
	// Implement your logic here
	return nil
}

// Active returns false in this skeleton.
func (m MySubscription) Active() bool {
	// Implement your logic here
	return false
}
// Subscription is the interface for a subscription value; BaseSubscription is
// listed with Active, Listener, Topic and Unsubscribe methods.
type Subscription interface {
ID() string
Topic() string
Listener() Listener
Unsubscribe() error
Active() bool
}
// Create a new ValidationError
validationerror := ValidationError{
Field: "example",
Value: any{},
Message: "example",
}type ValidationError struct {
Field string
Value any
Message string
}func NewValidationError(field string, value any, message string) *ValidationErrorfunc (*ValidationError) Error() stringvar ErrEventNotFound = errors.New("event not found") // ErrEventNotFound indicates an event was not found in the store
// Common error types for the Nova event system (continued).
var (
	// ErrInvalidEvent indicates an event failed validation.
	ErrInvalidEvent = errors.New("invalid event")
	// ErrListenerNotFound indicates a listener was not found.
	ErrListenerNotFound = errors.New("listener not found")
	// ErrTopicNotFound indicates a topic was not found.
	ErrTopicNotFound = errors.New("topic not found")
	// ErrSubscriptionNotFound indicates a subscription was not found.
	ErrSubscriptionNotFound = errors.New("subscription not found")
	// ErrEmitterClosed indicates the emitter has been closed.
	ErrEmitterClosed = errors.New("emitter is closed")
	// ErrBusClosed indicates the bus has been closed.
	ErrBusClosed = errors.New("bus is closed")
	// ErrStoreReadOnly indicates the store is in read-only mode.
	ErrStoreReadOnly = errors.New("store is read-only")
	// ErrBufferFull indicates a buffer is full and cannot accept more events.
	ErrBufferFull = errors.New("buffer is full")
	// ErrRetryLimitExceeded indicates retry attempts have been exhausted.
	ErrRetryLimitExceeded = errors.New("retry limit exceeded")
	// ErrTimeout indicates an operation timed out.
	ErrTimeout = errors.New("operation timed out")
)
// DefaultEventValidator provides basic event validation. The checks run in
// order and the first failure is returned as a ValidationError: nil event,
// empty ID, empty Type, zero Timestamp. A nil error means every check passed.
var DefaultEventValidator = EventValidatorFunc(func(event Event) error {
	switch {
	case event == nil:
		return NewValidationError("event", nil, "event cannot be nil")
	case event.ID() == "":
		return NewValidationError("id", event.ID(), "event ID cannot be empty")
	case event.Type() == "":
		return NewValidationError("type", event.Type(), "event type cannot be empty")
	case event.Timestamp().IsZero():
		return NewValidationError("timestamp", event.Timestamp(), "event timestamp cannot be zero")
	}
	return nil
})
// Create a new BaseEvent
baseevent := BaseEvent{
}type BaseEvent struct {
}func NewBaseEvent(id, eventType string, data any) *BaseEventfunc NewBaseEventWithMetadata(id, eventType string, data any, metadata map[string]string) *BaseEventfunc (*BaseEvent) Data() anyfunc (*BaseEvent) GetMetadata(key string) (string, bool)func (*BaseListener) ID() stringfunc (*BaseEvent) Metadata() map[string]stringfunc (*BaseEvent) SetMetadata(key, value string)func (*BaseEvent) Timestamp() time.Timefunc (*BaseEvent) Type() string// Create a new BaseListener
baselistener := BaseListener{
}
// BaseListener is listed with no exported fields; the two constructors below
// take the handler (and optionally an error handler) as function values.
type BaseListener struct {
}
func NewBaseListener(id string, handler func(event Event) error) *BaseListener
func NewBaseListenerWithErrorHandler(id string, handler func(event Event) error, errorHandler func(event Event, err error) error) *BaseListener
func (*BaseListener) Handle(event Event) error
func (*BaseListener) ID() string
func (*BaseListener) OnError(event Event, err error) error
// Create a new BaseSubscription
basesubscription := BaseSubscription{
}type BaseSubscription struct {
}func NewBaseSubscription(id, topic string, listener Listener) *BaseSubscriptionfunc NewBaseSubscriptionWithCallback(id, topic string, listener Listener, onClose func()) *BaseSubscriptionfunc (*BaseSubscription) Active() boolfunc (*BaseEvent) ID() stringfunc (*BaseSubscription) Listener() Listenerfunc (*BaseSubscription) Topic() stringfunc (*BaseSubscription) Unsubscribe() error// Example implementation of Event
// MyEvent is a skeleton showing the five methods Event requires. Every method
// returns its zero value so the skeleton compiles as written.
type MyEvent struct {
	// Add your fields here
}

// ID returns the event identifier; empty in this skeleton.
func (m MyEvent) ID() string {
	// Implement your logic here
	return ""
}

// Type returns the event type; empty in this skeleton.
func (m MyEvent) Type() string {
	// Implement your logic here
	return ""
}

// Timestamp returns the event time; the zero time.Time in this skeleton.
func (m MyEvent) Timestamp() time.Time {
	// Implement your logic here
	return time.Time{}
}

// Data returns the event payload; nil in this skeleton.
func (m MyEvent) Data() any {
	// Implement your logic here
	return nil
}

// Metadata returns the event metadata; nil in this skeleton.
func (m MyEvent) Metadata() map[string]string {
	// Implement your logic here
	return nil
}
// Event is the interface checked by DefaultEventValidator, which rejects a
// nil Event, an empty ID or Type, and a zero Timestamp.
type Event interface {
ID() string
Type() string
Timestamp() time.Time
Data() any
Metadata() map[string]string
}
// Create a new EventError
eventerror := EventError{
Event: Event{},
Err: error{},
}type EventError struct {
Event Event
Err error
}func NewEventError(event Event, err error) *EventErrorfunc (*ValidationError) Error() stringfunc (*ListenerError) Unwrap() error// Example implementation of EventValidator
// MyEventValidator is a skeleton showing the method EventValidator requires.
type MyEventValidator struct {
	// Add your fields here
}

// Validate returns nil for every event in this skeleton; return a non-nil
// error from your own checks to reject an event.
func (m MyEventValidator) Validate(event Event) error {
	// Implement your logic here
	return nil
}
// EventValidator is the single-method validation interface;
// EventValidatorFunc is listed with a Validate method of the same signature.
type EventValidator interface {
Validate(event Event) error
}
// Example usage of EventValidatorFunc
var value EventValidatorFunc
// Initialize with appropriate valuetype EventValidatorFunc func(event Event) errorfunc (EventValidatorFunc) Validate(event Event) error// Example implementation of Listener
// MyListener is a skeleton showing the three methods Listener requires.
type MyListener struct {
	// Add your fields here
}

// ID returns the listener identifier; empty in this skeleton.
func (m MyListener) ID() string {
	// Implement your logic here
	return ""
}

// Handle returns nil for every event in this skeleton.
func (m MyListener) Handle(event Event) error {
	// Implement your logic here
	return nil
}

// OnError returns err unchanged in this skeleton so the failure is not
// silently dropped.
func (m MyListener) OnError(event Event, err error) error {
	// Implement your logic here
	return err
}
// Listener is the handler type accepted by NewBaseSubscription; BaseListener
// is listed with the same three methods.
type Listener interface {
ID() string
Handle(event Event) error
OnError(event Event, err error) error
}
// Create a new ListenerError
listenererror := ListenerError{
ListenerID: "example",
Event: Event{},
Err: error{},
}type ListenerError struct {
ListenerID string
Event Event
Err error
}func NewListenerError(listenerID string, event Event, err error) *ListenerErrorfunc (*ValidationError) Error() stringfunc (*ListenerError) Unwrap() error// Example implementation of MetricsCollector
// MyMetricsCollector is a skeleton showing the nine methods MetricsCollector
// requires. Parameter names follow the interface declaration; every method is
// an empty placeholder.
type MyMetricsCollector struct {
	// Add your fields here
}

// IncEventsEmitted receives an event type and a result label.
func (m MyMetricsCollector) IncEventsEmitted(eventType string, result string) {
	// Implement your logic here
}

// IncEventsProcessed receives a listener ID and a result label.
func (m MyMetricsCollector) IncEventsProcessed(listenerID string, result string) {
	// Implement your logic here
}

// ObserveListenerDuration receives a listener ID and a duration.
func (m MyMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration) {
	// Implement your logic here
}

// ObserveSagaDuration receives a saga ID, a step name and a duration.
func (m MyMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration) {
	// Implement your logic here
}

// ObserveStoreAppendDuration receives a duration.
func (m MyMetricsCollector) ObserveStoreAppendDuration(duration time.Duration) {
	// Implement your logic here
}

// ObserveStoreReadDuration receives a duration.
func (m MyMetricsCollector) ObserveStoreReadDuration(duration time.Duration) {
	// Implement your logic here
}

// SetQueueSize receives a component name and a size.
func (m MyMetricsCollector) SetQueueSize(component string, size int) {
	// Implement your logic here
}

// SetActiveListeners receives an emitter ID and a count.
func (m MyMetricsCollector) SetActiveListeners(emitterID string, count int) {
	// Implement your logic here
}

// IncEventsDropped receives a component name and a reason.
func (m MyMetricsCollector) IncEventsDropped(component string, reason string) {
	// Implement your logic here
}
// MetricsCollector declares the metrics hooks: three Inc* methods, four
// Observe*Duration methods and two Set* methods. NoOpMetricsCollector and
// SimpleMetricsCollector are the collector types documented alongside it.
type MetricsCollector interface {
IncEventsEmitted(eventType string, result string)
IncEventsProcessed(listenerID string, result string)
ObserveListenerDuration(listenerID string, duration time.Duration)
ObserveSagaDuration(sagaID string, step string, duration time.Duration)
ObserveStoreAppendDuration(duration time.Duration)
ObserveStoreReadDuration(duration time.Duration)
SetQueueSize(component string, size int)
SetActiveListeners(emitterID string, count int)
IncEventsDropped(component string, reason string)
}
// Example implementation of Middleware
// MyMiddleware is a skeleton showing the two methods Middleware requires.
type MyMiddleware struct {
	// Add your fields here
}

// Before returns nil for every event in this skeleton.
func (m MyMiddleware) Before(event Event) error {
	// Implement your logic here
	return nil
}

// After returns err unchanged in this skeleton so the failure is not
// silently dropped.
func (m MyMiddleware) After(event Event, err error) error {
	// Implement your logic here
	return err
}
// Middleware declares a Before hook and an After hook; MiddlewareFunc is
// listed with methods of the same signatures.
type Middleware interface {
Before(event Event) error
After(event Event, err error) error
}
// Create a new MiddlewareFunc
middlewarefunc := MiddlewareFunc{
BeforeFunc: /* value */,
AfterFunc: /* value */,
}type MiddlewareFunc struct {
BeforeFunc func(event Event) error
AfterFunc func(event Event, err error) error
}func (MiddlewareFunc) After(event Event, err error) errorfunc (MiddlewareFunc) Before(event Event) error// Create a new NoOpMetricsCollector
noopmetricscollector := NoOpMetricsCollector{
}type NoOpMetricsCollector struct {
}func (*SimpleMetricsCollector) IncEventsDropped(component string, reason string)func (*SimpleMetricsCollector) IncEventsEmitted(eventType string, result string)func (*SimpleMetricsCollector) IncEventsProcessed(listenerID string, result string)func (*SimpleMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration)func (*SimpleMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration)func (*SimpleMetricsCollector) ObserveStoreAppendDuration(duration time.Duration)func (*SimpleMetricsCollector) ObserveStoreReadDuration(duration time.Duration)func (*SimpleMetricsCollector) SetActiveListeners(emitterID string, count int)func (*SimpleMetricsCollector) SetQueueSize(component string, size int)// Create a new SimpleMetricsCollector
simplemetricscollector := SimpleMetricsCollector{
EventsEmitted: 42,
EventsProcessed: 42,
EventsDropped: 42,
ListenerDurations: 42,
SagaDurations: 42,
StoreAppends: 42,
StoreReads: 42,
QueueSizes: map[],
ActiveListeners: map[],
}type SimpleMetricsCollector struct {
EventsEmitted int64
EventsProcessed int64
EventsDropped int64
ListenerDurations int64
SagaDurations int64
StoreAppends int64
StoreReads int64
QueueSizes map[string]int64
ActiveListeners map[string]int64
}func NewSimpleMetricsCollector() *SimpleMetricsCollectorfunc (*SimpleMetricsCollector) GetEventsDropped() int64func (*SimpleMetricsCollector) GetEventsEmitted() int64func (*SimpleMetricsCollector) GetEventsProcessed() int64func (*SimpleMetricsCollector) IncEventsDropped(component string, reason string)func (*SimpleMetricsCollector) IncEventsEmitted(eventType string, result string)func (*SimpleMetricsCollector) IncEventsProcessed(listenerID string, result string)func (*SimpleMetricsCollector) ObserveListenerDuration(listenerID string, duration time.Duration)func (*SimpleMetricsCollector) ObserveSagaDuration(sagaID string, step string, duration time.Duration)func (*SimpleMetricsCollector) ObserveStoreAppendDuration(duration time.Duration)func (*SimpleMetricsCollector) ObserveStoreReadDuration(duration time.Duration)func (*SimpleMetricsCollector) SetActiveListeners(emitterID string, count int)func (*SimpleMetricsCollector) SetQueueSize(component string, size int)// Example implementation of Subscription
// MySubscription is a skeleton showing the five methods Subscription requires.
// Every method returns its zero value so the skeleton compiles as written.
type MySubscription struct {
	// Add your fields here
}

// ID returns the subscription identifier; empty in this skeleton.
func (m MySubscription) ID() string {
	// Implement your logic here
	return ""
}

// Topic returns the subscribed topic; empty in this skeleton.
func (m MySubscription) Topic() string {
	// Implement your logic here
	return ""
}

// Listener returns the attached Listener; nil in this skeleton.
func (m MySubscription) Listener() Listener {
	// Implement your logic here
	return nil
}

// Unsubscribe returns nil in this skeleton.
func (m MySubscription) Unsubscribe() error {
	// Implement your logic here
	return nil
}

// Active returns false in this skeleton.
func (m MySubscription) Active() bool {
	// Implement your logic here
	return false
}
// Subscription is the interface for a subscription value; BaseSubscription is
// listed with Active, Listener, Topic and Unsubscribe methods.
type Subscription interface {
ID() string
Topic() string
Listener() Listener
Unsubscribe() error
Active() bool
}
// Create a new ValidationError
validationerror := ValidationError{
Field: "example",
Value: any{},
Message: "example",
}type ValidationError struct {
Field string
Value any
Message string
}func NewValidationError(field string, value any, message string) *ValidationErrorfunc (*ValidationError) Error() string