Loading documentation...
Loading documentation...
Loading documentation...
The bus package provides topic-based event routing with partitioning, pattern matching, and delivery guarantees.
Import Path: github.com/kolosys/nova/bus
The EventBus adds a topic abstraction layer on top of basic event handling. Events are published to topics, and subscribers receive events from topics they're interested in.
┌──────────────┐         ┌──────────────────────────────┐
│  Publisher   │────────▶│            Topic             │
│              │         │  ┌────────┐ ┌────────┐       │
└──────────────┘         │  │ Part 0 │ │ Part 1 │ ...   │
                         │  └───┬────┘ └───┬────┘       │
                         └──────┼──────────┼────────────┘
                                │          │
                         ┌──────▼──────────▼──────┐
                         │      Subscribers       │
                         │  • handler-1           │
                         │  • handler-2           │
                         │  • pattern-subscriber  │
                         └────────────────────────┘
import (
"github.com/kolosys/ion/workerpool"
"github.com/kolosys/nova/bus"
)
pool := workerpool.New(4, 100)
defer pool.Close(ctx)
b := bus.New(bus.Config{
WorkerPool: pool, // Required
DefaultBufferSize: 1000, // Default topic buffer
DefaultPartitions: 4, // Default partition count
DefaultDeliveryMode: bus.AtLeastOnce,// Default delivery guarantee
Name: "main-bus", // For metrics identification
})
defer b.Shutdown(ctx)

| Option | Default | Description |
|---|---|---|
| `WorkerPool` | required | Ion workerpool for event processing |
| `DefaultBufferSize` | 1000 | Buffer size for auto-created topics |
| `DefaultPartitions` | 1 | Partition count for auto-created topics |
| `DefaultDeliveryMode` | AtLeastOnce | Delivery guarantee for auto-created topics |
| `MetricsCollector` | no-op | Custom metrics implementation |
| `EventValidator` | default | Custom event validation |
| `Name` | "bus" | Instance identifier for metrics |
Topics organize events by category. They can be created explicitly or auto-created on first use.
err := b.CreateTopic("orders", bus.TopicConfig{
BufferSize: 2000,
Partitions: 8,
Retention: 24 * time.Hour,
DeliveryMode: bus.ExactlyOnce,
MaxConcurrency: 20,
OrderingKey: func(e shared.Event) string {
return e.Metadata()["customer_id"]
},
})

| Option | Default | Description |
|---|---|---|
| `BufferSize` | 1000 | Events buffered per partition |
| `Partitions` | 1 | Number of parallel processing lanes |
| `Retention` | 24h | How long events are kept (for replay) |
| `DeliveryMode` | AtLeastOnce | Delivery guarantee |
| `MaxConcurrency` | 10 | Concurrent handlers per partition |
| `OrderingKey` | event ID | Function to determine partition |
Topics are created automatically when you publish or subscribe:
// Topic "notifications" is created with default config
b.Subscribe("notifications", listener)
b.Publish(ctx, "notifications", event)

// List all topics
topics := b.Topics()
for _, name := range topics {
fmt.Println(name)
}
// Delete a topic
if err := b.DeleteTopic("old-topic"); err != nil {
log.Printf("Delete failed: %v", err)
}

Nova supports three delivery guarantees:
Fire-and-forget delivery. Events may be lost but are never duplicated.
b.CreateTopic("telemetry", bus.TopicConfig{
DeliveryMode: bus.AtMostOnce,
})

Use for:
Guaranteed delivery with possible duplicates. Failed events are retried.
b.CreateTopic("orders", bus.TopicConfig{
DeliveryMode: bus.AtLeastOnce,
})

Use for:
Guaranteed delivery without duplicates. Highest overhead.
b.CreateTopic("payments", bus.TopicConfig{
DeliveryMode: bus.ExactlyOnce,
})

Use for:
Partitions enable parallel processing while maintaining ordering within a partition.
Event with key "customer-123" ──┐
                                ├──▶ hash("customer-123") % 8 = 3 ──▶ Partition 3
Event with key "customer-123" ──┘
Event with key "customer-456" ─────▶ hash("customer-456") % 8 = 7 ──▶ Partition 7

Events with the same ordering key are processed in order:
b.CreateTopic("orders", bus.TopicConfig{
Partitions: 8,
OrderingKey: func(e shared.Event) string {
// All orders from the same customer go to the same partition
return e.Metadata()["customer_id"]
},
})

The default uses the event ID. Customize based on your ordering needs:
// By geographic region
OrderingKey: func(e shared.Event) string {
return e.Metadata()["region"]
}
// By tenant
OrderingKey: func(e shared.Event) string {
return e.Metadata()["tenant_id"]
}
// Random distribution (no ordering)
OrderingKey: func(e shared.Event) string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}

Subscribe to a specific topic:
listener := shared.NewBaseListener("order-handler", processOrder)
sub := b.Subscribe("orders", listener)
// Unsubscribe when done
defer sub.Unsubscribe()

Subscribe to topics matching a regex pattern:
// Subscribe to all user events
auditListener := shared.NewBaseListener("audit", logEvent)
b.SubscribePattern(`user\..*`, auditListener)
// Subscribe to all events
allListener := shared.NewBaseListener("monitor", monitorEvent)
b.SubscribePattern(".*", allListener)
// Subscribe to specific patterns
orderListener := shared.NewBaseListener("orders", handleOrders)
b.SubscribePattern(`order\.(created|updated)`, orderListener)

Pattern subscriptions:
event := shared.NewBaseEventWithMetadata(
"order-123",
"order.created",
orderData,
map[string]string{
"customer_id": "cust-456",
"region": "us-west",
},
)
if err := b.Publish(ctx, "orders", event); err != nil {
switch {
case errors.Is(err, shared.ErrBufferFull):
// Apply backpressure
case errors.Is(err, shared.ErrBusClosed):
// Bus is shutting down
default:
log.Printf("Publish failed: %v", err)
}
}

| Error | Cause | Resolution |
|---|---|---|
| `ErrBusClosed` | Bus is shutting down | Stop publishing |
| `ErrBufferFull` | Partition buffer is full | Slow down or drop |
| `ValidationError` | Event failed validation | Fix event data |
| Context cancelled | Timeout or cancellation | Retry or abort |
stats := b.Stats()
fmt.Printf("Events published: %d\n", stats.EventsPublished)
fmt.Printf("Events processed: %d\n", stats.EventsProcessed)
fmt.Printf("Active topics: %d\n", stats.ActiveTopics)
fmt.Printf("Active subscribers: %d\n", stats.ActiveSubscribers)
fmt.Printf("Failed events: %d\n", stats.FailedEvents)
fmt.Printf("Queued events: %d\n", stats.QueuedEvents)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := b.Shutdown(ctx); err != nil {
log.Printf("Shutdown timeout: %v", err)
}

During shutdown:
`ErrBusClosed`

Organize topics in a hierarchy and use pattern subscriptions:
// Topics
"user.created"
"user.updated"
"user.deleted"
"order.created"
"order.shipped"
// Subscribe to all user events
b.SubscribePattern(`user\..*`, userHandler)
// Subscribe to all events (for audit)
b.SubscribePattern(".*", auditHandler)

Multiple consumers process the same event independently:
// Each service subscribes to the same topic
b.Subscribe("order.created", inventoryHandler)
b.Subscribe("order.created", notificationHandler)
b.Subscribe("order.created", analyticsHandler)

Filter events in the handler:
listener := shared.NewBaseListener("vip-handler", func(event shared.Event) error {
if event.Metadata()["customer_tier"] != "vip" {
return nil // Skip non-VIP events
}
return processVIPOrder(event)
})

Handle failed events:
deadLetterHandler := shared.NewBaseListener("dead-letter", func(event shared.Event) error {
log.Printf("Dead letter: %s - %s", event.ID(), event.Metadata()["error"])
// Store for manual review
return nil
})
b.Subscribe("dead-letter", deadLetterHandler)
mainHandler := shared.NewBaseListenerWithErrorHandler(
"main",
processEvent,
func(event shared.Event, err error) error {
// Forward to dead letter topic
if be, ok := event.(*shared.BaseEvent); ok {
be.SetMetadata("error", err.Error())
}
return b.Publish(context.Background(), "dead-letter", event)
},
)

The bus package provides topic-based event routing with partitioning, pattern matching, and delivery guarantees.
Import Path: github.com/kolosys/nova/bus
The EventBus adds a topic abstraction layer on top of basic event handling. Events are published to topics, and subscribers receive events from topics they're interested in.
┌──────────────┐         ┌──────────────────────────────┐
│  Publisher   │────────▶│            Topic             │
│              │         │  ┌────────┐ ┌────────┐       │
└──────────────┘         │  │ Part 0 │ │ Part 1 │ ...   │
                         │  └───┬────┘ └───┬────┘       │
                         └──────┼──────────┼────────────┘
                                │          │
                         ┌──────▼──────────▼──────┐
                         │      Subscribers       │
                         │  • handler-1           │
                         │  • handler-2           │
                         │  • pattern-subscriber  │
                         └────────────────────────┘
import (
"github.com/kolosys/ion/workerpool"
"github.com/kolosys/nova/bus"
)
pool := workerpool.New(4, 100)
defer pool.Close(ctx)
b := bus.New(bus.Config{
WorkerPool: pool, // Required
DefaultBufferSize: 1000, // Default topic buffer
DefaultPartitions: 4, // Default partition count
DefaultDeliveryMode: bus.AtLeastOnce,// Default delivery guarantee
Name: "main-bus", // For metrics identification
})
defer b.Shutdown(ctx)

| Option | Default | Description |
|---|---|---|
| `WorkerPool` | required | Ion workerpool for event processing |
| `DefaultBufferSize` | 1000 | Buffer size for auto-created topics |
| `DefaultPartitions` | 1 | Partition count for auto-created topics |
| `DefaultDeliveryMode` | AtLeastOnce | Delivery guarantee for auto-created topics |
| `MetricsCollector` | no-op | Custom metrics implementation |
| `EventValidator` | default | Custom event validation |
| `Name` | "bus" | Instance identifier for metrics |
Topics organize events by category. They can be created explicitly or auto-created on first use.
err := b.CreateTopic("orders", bus.TopicConfig{
BufferSize: 2000,
Partitions: 8,
Retention: 24 * time.Hour,
DeliveryMode: bus.ExactlyOnce,
MaxConcurrency: 20,
OrderingKey: func(e shared.Event) string {
return e.Metadata()["customer_id"]
},
})

| Option | Default | Description |
|---|---|---|
| `BufferSize` | 1000 | Events buffered per partition |
| `Partitions` | 1 | Number of parallel processing lanes |
| `Retention` | 24h | How long events are kept (for replay) |
| `DeliveryMode` | AtLeastOnce | Delivery guarantee |
| `MaxConcurrency` | 10 | Concurrent handlers per partition |
| `OrderingKey` | event ID | Function to determine partition |
Topics are created automatically when you publish or subscribe:
// Topic "notifications" is created with default config
b.Subscribe("notifications", listener)
b.Publish(ctx, "notifications", event)// List all topics
topics := b.Topics()
for _, name := range topics {
fmt.Println(name)
}
// Delete a topic
if err := b.DeleteTopic("old-topic"); err != nil {
log.Printf("Delete failed: %v", err)
}Nova supports three delivery guarantees:
Fire-and-forget delivery. Events may be lost but are never duplicated.
b.CreateTopic("telemetry", bus.TopicConfig{
DeliveryMode: bus.AtMostOnce,
})Use for:
Guaranteed delivery with possible duplicates. Failed events are retried.
b.CreateTopic("orders", bus.TopicConfig{
DeliveryMode: bus.AtLeastOnce,
})Use for:
Guaranteed delivery without duplicates. Highest overhead.
b.CreateTopic("payments", bus.TopicConfig{
DeliveryMode: bus.ExactlyOnce,
})Use for:
Partitions enable parallel processing while maintaining ordering within a partition.
Event with key "customer-123" ──┐
                                ├──▶ hash("customer-123") % 8 = 3 ──▶ Partition 3
Event with key "customer-123" ──┘
Event with key "customer-456" ─────▶ hash("customer-456") % 8 = 7 ──▶ Partition 7

Events with the same ordering key are processed in order:
b.CreateTopic("orders", bus.TopicConfig{
Partitions: 8,
OrderingKey: func(e shared.Event) string {
// All orders from the same customer go to the same partition
return e.Metadata()["customer_id"]
},
})The default uses the event ID. Customize based on your ordering needs:
// By geographic region
OrderingKey: func(e shared.Event) string {
return e.Metadata()["region"]
}
// By tenant
OrderingKey: func(e shared.Event) string {
return e.Metadata()["tenant_id"]
}
// Random distribution (no ordering)
OrderingKey: func(e shared.Event) string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}Subscribe to a specific topic:
listener := shared.NewBaseListener("order-handler", processOrder)
sub := b.Subscribe("orders", listener)
// Unsubscribe when done
defer sub.Unsubscribe()Subscribe to topics matching a regex pattern:
// Subscribe to all user events
auditListener := shared.NewBaseListener("audit", logEvent)
b.SubscribePattern("user\..*", auditListener)
// Subscribe to all events
allListener := shared.NewBaseListener("monitor", monitorEvent)
b.SubscribePattern(".*", allListener)
// Subscribe to specific patterns
orderListener := shared.NewBaseListener("orders", handleOrders)
b.SubscribePattern("order\.(created|updated)", orderListener)Pattern subscriptions:
event := shared.NewBaseEventWithMetadata(
"order-123",
"order.created",
orderData,
map[string]string{
"customer_id": "cust-456",
"region": "us-west",
},
)
if err := b.Publish(ctx, "orders", event); err != nil {
switch {
case errors.Is(err, shared.ErrBufferFull):
// Apply backpressure
case errors.Is(err, shared.ErrBusClosed):
// Bus is shutting down
default:
log.Printf("Publish failed: %v", err)
}
}

| Error | Cause | Resolution |
|---|---|---|
| `ErrBusClosed` | Bus is shutting down | Stop publishing |
| `ErrBufferFull` | Partition buffer is full | Slow down or drop |
| `ValidationError` | Event failed validation | Fix event data |
| Context cancelled | Timeout or cancellation | Retry or abort |
stats := b.Stats()
fmt.Printf("Events published: %d\n", stats.EventsPublished)
fmt.Printf("Events processed: %d\n", stats.EventsProcessed)
fmt.Printf("Active topics: %d\n", stats.ActiveTopics)
fmt.Printf("Active subscribers: %d\n", stats.ActiveSubscribers)
fmt.Printf("Failed events: %d\n", stats.FailedEvents)
fmt.Printf("Queued events: %d\n", stats.QueuedEvents)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := b.Shutdown(ctx); err != nil {
log.Printf("Shutdown timeout: %v", err)
}During shutdown:
ErrBusClosedOrganize topics in a hierarchy and use pattern subscriptions:
// Topics
"user.created"
"user.updated"
"user.deleted"
"order.created"
"order.shipped"
// Subscribe to all user events
b.SubscribePattern("user\..*", userHandler)
// Subscribe to all events (for audit)
b.SubscribePattern(".*", auditHandler)Multiple consumers process the same event independently:
// Each service subscribes to the same topic
b.Subscribe("order.created", inventoryHandler)
b.Subscribe("order.created", notificationHandler)
b.Subscribe("order.created", analyticsHandler)Filter events in the handler:
listener := shared.NewBaseListener("vip-handler", func(event shared.Event) error {
if event.Metadata()["customer_tier"] != "vip" {
return nil // Skip non-VIP events
}
return processVIPOrder(event)
})Handle failed events:
deadLetterHandler := shared.NewBaseListener("dead-letter", func(event shared.Event) error {
log.Printf("Dead letter: %s - %s", event.ID(), event.Metadata()["error"])
// Store for manual review
return nil
})
b.Subscribe("dead-letter", deadLetterHandler)
mainHandler := shared.NewBaseListenerWithErrorHandler(
"main",
processEvent,
func(event shared.Event, err error) error {
// Forward to dead letter topic
if be, ok := event.(*shared.BaseEvent); ok {
be.SetMetadata("error", err.Error())
}
return b.Publish(context.Background(), "dead-letter", event)
},
)
┌──────────────┐         ┌──────────────────────────────┐
│  Publisher   │────────▶│            Topic             │
│              │         │  ┌────────┐ ┌────────┐       │
└──────────────┘         │  │ Part 0 │ │ Part 1 │ ...   │
                         │  └───┬────┘ └───┬────┘       │
                         └──────┼──────────┼────────────┘
                                │          │
                         ┌──────▼──────────▼──────┐
                         │      Subscribers       │
                         │  • handler-1           │
                         │  • handler-2           │
                         │  • pattern-subscriber  │
                         └────────────────────────┘
import (
"github.com/kolosys/ion/workerpool"
"github.com/kolosys/nova/bus"
)
pool := workerpool.New(4, 100)
defer pool.Close(ctx)
b := bus.New(bus.Config{
WorkerPool: pool, // Required
DefaultBufferSize: 1000, // Default topic buffer
DefaultPartitions: 4, // Default partition count
DefaultDeliveryMode: bus.AtLeastOnce,// Default delivery guarantee
Name: "main-bus", // For metrics identification
})
defer b.Shutdown(ctx)err := b.CreateTopic("orders", bus.TopicConfig{
BufferSize: 2000,
Partitions: 8,
Retention: 24 * time.Hour,
DeliveryMode: bus.ExactlyOnce,
MaxConcurrency: 20,
OrderingKey: func(e shared.Event) string {
return e.Metadata()["customer_id"]
},
})// Topic "notifications" is created with default config
b.Subscribe("notifications", listener)
b.Publish(ctx, "notifications", event)// List all topics
topics := b.Topics()
for _, name := range topics {
fmt.Println(name)
}
// Delete a topic
if err := b.DeleteTopic("old-topic"); err != nil {
log.Printf("Delete failed: %v", err)
}b.CreateTopic("telemetry", bus.TopicConfig{
DeliveryMode: bus.AtMostOnce,
})b.CreateTopic("orders", bus.TopicConfig{
DeliveryMode: bus.AtLeastOnce,
})b.CreateTopic("payments", bus.TopicConfig{
DeliveryMode: bus.ExactlyOnce,
})
Event with key "customer-123" ──┐
                                ├──▶ hash("customer-123") % 8 = 3 ──▶ Partition 3
Event with key "customer-123" ──┘
Event with key "customer-456" ─────▶ hash("customer-456") % 8 = 7 ──▶ Partition 7
b.CreateTopic("orders", bus.TopicConfig{
Partitions: 8,
OrderingKey: func(e shared.Event) string {
// All orders from the same customer go to the same partition
return e.Metadata()["customer_id"]
},
})// By geographic region
OrderingKey: func(e shared.Event) string {
return e.Metadata()["region"]
}
// By tenant
OrderingKey: func(e shared.Event) string {
return e.Metadata()["tenant_id"]
}
// Random distribution (no ordering)
OrderingKey: func(e shared.Event) string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}listener := shared.NewBaseListener("order-handler", processOrder)
sub := b.Subscribe("orders", listener)
// Unsubscribe when done
defer sub.Unsubscribe()// Subscribe to all user events
auditListener := shared.NewBaseListener("audit", logEvent)
b.SubscribePattern("user\..*", auditListener)
// Subscribe to all events
allListener := shared.NewBaseListener("monitor", monitorEvent)
b.SubscribePattern(".*", allListener)
// Subscribe to specific patterns
orderListener := shared.NewBaseListener("orders", handleOrders)
b.SubscribePattern("order\.(created|updated)", orderListener)event := shared.NewBaseEventWithMetadata(
"order-123",
"order.created",
orderData,
map[string]string{
"customer_id": "cust-456",
"region": "us-west",
},
)
if err := b.Publish(ctx, "orders", event); err != nil {
switch {
case errors.Is(err, shared.ErrBufferFull):
// Apply backpressure
case errors.Is(err, shared.ErrBusClosed):
// Bus is shutting down
default:
log.Printf("Publish failed: %v", err)
}
}
stats := b.Stats()
fmt.Printf("Events published: %d\n", stats.EventsPublished)
fmt.Printf("Events processed: %d\n", stats.EventsProcessed)
fmt.Printf("Active topics: %d\n", stats.ActiveTopics)
fmt.Printf("Active subscribers: %d\n", stats.ActiveSubscribers)
fmt.Printf("Failed events: %d\n", stats.FailedEvents)
fmt.Printf("Queued events: %d\n", stats.QueuedEvents)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := b.Shutdown(ctx); err != nil {
log.Printf("Shutdown timeout: %v", err)
}// Topics
"user.created"
"user.updated"
"user.deleted"
"order.created"
"order.shipped"
// Subscribe to all user events
b.SubscribePattern("user\..*", userHandler)
// Subscribe to all events (for audit)
b.SubscribePattern(".*", auditHandler)// Each service subscribes to the same topic
b.Subscribe("order.created", inventoryHandler)
b.Subscribe("order.created", notificationHandler)
b.Subscribe("order.created", analyticsHandler)listener := shared.NewBaseListener("vip-handler", func(event shared.Event) error {
if event.Metadata()["customer_tier"] != "vip" {
return nil // Skip non-VIP events
}
return processVIPOrder(event)
})deadLetterHandler := shared.NewBaseListener("dead-letter", func(event shared.Event) error {
log.Printf("Dead letter: %s - %s", event.ID(), event.Metadata()["error"])
// Store for manual review
return nil
})
b.Subscribe("dead-letter", deadLetterHandler)
mainHandler := shared.NewBaseListenerWithErrorHandler(
"main",
processEvent,
func(event shared.Event, err error) error {
// Forward to dead letter topic
if be, ok := event.(*shared.BaseEvent); ok {
be.SetMetadata("error", err.Error())
}
return b.Publish(context.Background(), "dead-letter", event)
},
)
┌──────────────┐         ┌──────────────────────────────┐
│  Publisher   │────────▶│            Topic             │
│              │         │  ┌────────┐ ┌────────┐       │
└──────────────┘         │  │ Part 0 │ │ Part 1 │ ...   │
                         │  └───┬────┘ └───┬────┘       │
                         └──────┼──────────┼────────────┘
                                │          │
                         ┌──────▼──────────▼──────┐
                         │      Subscribers       │
                         │  • handler-1           │
                         │  • handler-2           │
                         │  • pattern-subscriber  │
                         └────────────────────────┘
import (
"github.com/kolosys/ion/workerpool"
"github.com/kolosys/nova/bus"
)
pool := workerpool.New(4, 100)
defer pool.Close(ctx)
b := bus.New(bus.Config{
WorkerPool: pool, // Required
DefaultBufferSize: 1000, // Default topic buffer
DefaultPartitions: 4, // Default partition count
DefaultDeliveryMode: bus.AtLeastOnce,// Default delivery guarantee
Name: "main-bus", // For metrics identification
})
defer b.Shutdown(ctx)err := b.CreateTopic("orders", bus.TopicConfig{
BufferSize: 2000,
Partitions: 8,
Retention: 24 * time.Hour,
DeliveryMode: bus.ExactlyOnce,
MaxConcurrency: 20,
OrderingKey: func(e shared.Event) string {
return e.Metadata()["customer_id"]
},
})// Topic "notifications" is created with default config
b.Subscribe("notifications", listener)
b.Publish(ctx, "notifications", event)// List all topics
topics := b.Topics()
for _, name := range topics {
fmt.Println(name)
}
// Delete a topic
if err := b.DeleteTopic("old-topic"); err != nil {
log.Printf("Delete failed: %v", err)
}b.CreateTopic("telemetry", bus.TopicConfig{
DeliveryMode: bus.AtMostOnce,
})b.CreateTopic("orders", bus.TopicConfig{
DeliveryMode: bus.AtLeastOnce,
})b.CreateTopic("payments", bus.TopicConfig{
DeliveryMode: bus.ExactlyOnce,
})
Event with key "customer-123" ──┐
                                ├──▶ hash("customer-123") % 8 = 3 ──▶ Partition 3
Event with key "customer-123" ──┘
Event with key "customer-456" ─────▶ hash("customer-456") % 8 = 7 ──▶ Partition 7
b.CreateTopic("orders", bus.TopicConfig{
Partitions: 8,
OrderingKey: func(e shared.Event) string {
// All orders from the same customer go to the same partition
return e.Metadata()["customer_id"]
},
})// By geographic region
OrderingKey: func(e shared.Event) string {
return e.Metadata()["region"]
}
// By tenant
OrderingKey: func(e shared.Event) string {
return e.Metadata()["tenant_id"]
}
// Random distribution (no ordering)
OrderingKey: func(e shared.Event) string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}listener := shared.NewBaseListener("order-handler", processOrder)
sub := b.Subscribe("orders", listener)
// Unsubscribe when done
defer sub.Unsubscribe()// Subscribe to all user events
auditListener := shared.NewBaseListener("audit", logEvent)
b.SubscribePattern("user\..*", auditListener)
// Subscribe to all events
allListener := shared.NewBaseListener("monitor", monitorEvent)
b.SubscribePattern(".*", allListener)
// Subscribe to specific patterns
orderListener := shared.NewBaseListener("orders", handleOrders)
b.SubscribePattern("order\.(created|updated)", orderListener)event := shared.NewBaseEventWithMetadata(
"order-123",
"order.created",
orderData,
map[string]string{
"customer_id": "cust-456",
"region": "us-west",
},
)
if err := b.Publish(ctx, "orders", event); err != nil {
switch {
case errors.Is(err, shared.ErrBufferFull):
// Apply backpressure
case errors.Is(err, shared.ErrBusClosed):
// Bus is shutting down
default:
log.Printf("Publish failed: %v", err)
}
}
stats := b.Stats()
fmt.Printf("Events published: %d\n", stats.EventsPublished)
fmt.Printf("Events processed: %d\n", stats.EventsProcessed)
fmt.Printf("Active topics: %d\n", stats.ActiveTopics)
fmt.Printf("Active subscribers: %d\n", stats.ActiveSubscribers)
fmt.Printf("Failed events: %d\n", stats.FailedEvents)
fmt.Printf("Queued events: %d\n", stats.QueuedEvents)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := b.Shutdown(ctx); err != nil {
log.Printf("Shutdown timeout: %v", err)
}// Topics
"user.created"
"user.updated"
"user.deleted"
"order.created"
"order.shipped"
// Subscribe to all user events
b.SubscribePattern("user\..*", userHandler)
// Subscribe to all events (for audit)
b.SubscribePattern(".*", auditHandler)// Each service subscribes to the same topic
b.Subscribe("order.created", inventoryHandler)
b.Subscribe("order.created", notificationHandler)
b.Subscribe("order.created", analyticsHandler)listener := shared.NewBaseListener("vip-handler", func(event shared.Event) error {
if event.Metadata()["customer_tier"] != "vip" {
return nil // Skip non-VIP events
}
return processVIPOrder(event)
})deadLetterHandler := shared.NewBaseListener("dead-letter", func(event shared.Event) error {
log.Printf("Dead letter: %s - %s", event.ID(), event.Metadata()["error"])
// Store for manual review
return nil
})
b.Subscribe("dead-letter", deadLetterHandler)
mainHandler := shared.NewBaseListenerWithErrorHandler(
"main",
processEvent,
func(event shared.Event, err error) error {
// Forward to dead letter topic
if be, ok := event.(*shared.BaseEvent); ok {
be.SetMetadata("error", err.Error())
}
return b.Publish(context.Background(), "dead-letter", event)
},
)