Event System
The GOE event system provides a robust, Redis Streams-based publish-subscribe (pub/sub) eventing system for building scalable, distributed applications. It supports advanced features like consumer groups, dead letter queues, delayed messages, and guaranteed message ordering.
Features
- Redis Streams Backend: Uses Redis Streams for high-throughput, persistent message storage
- Consumer Groups: Horizontal scaling with load balancing across multiple consumers
- Dead Letter Queue: Automatic isolation of failed messages with replay capabilities
- Delayed Messages: Schedule messages for future delivery
- Message Ordering: Guaranteed order within a single stream/topic
- At-Least-Once Delivery: Ensures messages are processed at least once
- Stale Consumer Recovery: Automatic recovery from failed consumer instances
- Observability: Built-in metrics and tracing support
Quick Start
1. Enable the Event Module
package main
import (
"go.oease.dev/goe/v2"
"go.oease.dev/goe/v2/contract"
)
func main() {
app := goe.New(goe.Options{
WithEvent: true, // Enable event system
})
app.Run()
}
2. Global Accessors
The event system is available via global accessors anywhere in your application:
// Access event services globally
eventManager := goe.EventManager() // Full event manager
publisher := goe.EventPublisher() // Just publishing
consumer := goe.EventConsumer() // Just consuming
dlq := goe.DeadLetterQueue() // Dead letter queue
// Use in HTTP handlers, services, etc.
func MyHandler(c fiber.Ctx) error {
event := event.NewEvent("user.action", userData)
return goe.EventPublisher().Publish(c.Context(), "user-events", event)
}
3. Basic Event Publishing
// Inject the event publisher
func HandleUserSignup(publisher contract.EventPublisher) func(c fiber.Ctx) error {
return func(c fiber.Ctx) error {
// Create and publish an event
event := event.NewEvent("user.signup", map[string]interface{}{
"userId": 123,
"email": "user@example.com",
"plan": "premium",
})
err := publisher.Publish(c.Context(), "user-events", event)
if err != nil {
return c.Status(500).JSON(fiber.Map{"error": "Failed to publish event"})
}
return c.JSON(fiber.Map{"message": "User created successfully"})
}
}
4. Basic Event Consumption
// Event handler
func HandleUserSignup(ctx context.Context, event contract.Event) error {
// Extract payload
var payload map[string]interface{}
if err := event.UnmarshalEventPayload(event, &payload); err != nil {
return err
}
userID := payload["userId"]
email := payload["email"]
// Process the event
fmt.Printf("Processing user signup: %v, %v\n", userID, email)
// Send welcome email, update analytics, etc.
return nil
}
// Subscribe to events
func SetupEventSubscriptions(consumer contract.EventConsumer) {
handler := event.CreateEventHandler(HandleUserSignup)
// Subscribe to user events with consumer group
consumer.Subscribe(
context.Background(),
"user-events", // topic
"user-signup-processor", // consumer group
handler,
)
}
Configuration
The event system can be configured using environment variables:
Redis Connection
The event system supports flexible Redis connection configuration with multiple formats:
URL Format (Recommended)
# Standard Redis connection
EVENT_REDIS_URL=redis://username:password@localhost:6379/0
# TLS/SSL connection (rediss protocol)
EVENT_REDIS_URL=rediss://username:password@redis.example.com:6380/0
Individual Parameters
# Multiple hosts support
EVENT_REDIS_HOSTS=localhost:6379,redis2:6379
# Authentication
EVENT_REDIS_USERNAME=username
EVENT_REDIS_PASSWORD=password
# Database selection
EVENT_REDIS_DB=0
Legacy Format (Deprecated)
# Still supported for backward compatibility
EVENT_REDIS_ADDR=localhost:6379
EVENT_REDIS_PASSWORD=
EVENT_REDIS_DB=0
Configuration Priority
The event system uses the following priority order:
- URL Format (
EVENT_REDIS_URL
) - Complete connection string - Individual Parameters -
EVENT_REDIS_HOSTS
,EVENT_REDIS_USERNAME
, etc. - Legacy Format -
EVENT_REDIS_ADDR
(for backward compatibility)
Redis URL Examples
# Basic connection
EVENT_REDIS_URL=redis://localhost:6379
# With authentication
EVENT_REDIS_URL=redis://username:password@localhost:6379
# With database selection
EVENT_REDIS_URL=redis://localhost:6379/2
# TLS/SSL connection
EVENT_REDIS_URL=rediss://username:password@redis.example.com:6380/0
Consumer Settings
# Consumer timeout for processing messages
EVENT_CONSUMER_TIMEOUT=30s
# Maximum number of retries before moving to DLQ
EVENT_MAX_RETRIES=3
# Backoff time between retries
EVENT_RETRY_BACKOFF=1s
# Dead letter queue TTL
EVENT_DLQ_TTL=24h
# Timeout for detecting stale consumers
EVENT_STALE_CONSUMER_TIMEOUT=5m
Performance Settings
# Number of messages to process in a batch
EVENT_BATCH_SIZE=10
# Maximum pending messages per consumer
EVENT_MAX_PENDING_MESSAGES=1000
# Minimum idle time before claiming stale messages
EVENT_CLAIM_MIN_IDLE_TIME=1m
# Interval for claiming stale messages
EVENT_CLAIM_INTERVAL=30s
Delayed Queue Settings
# Enable delayed message processing
EVENT_DELAYED_QUEUE_ENABLED=true
# Interval for checking delayed messages
EVENT_DELAYED_QUEUE_CHECK_INTERVAL=1s
Advanced Usage
Delayed Messages
// Publish a message with a 5-minute delay
err := publisher.PublishWithDelay(
ctx,
"notifications",
event.NewEvent("reminder.send", reminderData),
5*time.Minute,
)
Batch Publishing
// Create a batch publisher
batch := event.NewBatchPublisher(publisher, "analytics")
// Add events to batch
batch.AddJSON("user.action", actionData1)
batch.AddJSON("user.action", actionData2)
batch.AddJSON("user.action", actionData3)
// Publish all events in a single operation
err := batch.Publish(ctx)
Typed Event Handlers
type UserSignupEvent struct {
UserID int `json:"userId"`
Email string `json:"email"`
Plan string `json:"plan"`
}
// Create a typed handler
handler := event.CreateTypedEventHandler(
func(ctx context.Context, event contract.Event, payload UserSignupEvent) error {
// Payload is automatically unmarshaled
fmt.Printf("User %d signed up with email %s\n", payload.UserID, payload.Email)
return nil
},
)
Dead Letter Queue Management
// Get the dead letter queue manager
dlq := eventManager.GetDeadLetterQueue()
// Retrieve failed messages
messages, err := dlq.GetMessages(ctx, "user-events", 10)
if err != nil {
log.Printf("Failed to get DLQ messages: %v", err)
return
}
// Replay a message
for _, msg := range messages {
messageID := msg.Headers()["dlq_message_id"]
err := dlq.ReplayMessage(ctx, "user-events", messageID)
if err != nil {
log.Printf("Failed to replay message %s: %v", messageID, err)
}
}
Pattern Matching
// Create a pattern for filtering events
pattern := &event.EventPattern{
Topic: "user-events",
EventName: "user.signup",
Headers: map[string]string{
"source": "web-app",
},
}
// Wrap handler with pattern matching
patternHandler := event.NewPatternHandler(pattern, handler)
Multiple Consumer Groups (Fan-out)
// Multiple consumer groups can process the same messages
// Each group gets a copy of every message
// Analytics consumer group
consumer.Subscribe(ctx, "user-events", "analytics", analyticsHandler)
// Notification consumer group
consumer.Subscribe(ctx, "user-events", "notifications", notificationHandler)
// Audit consumer group
consumer.Subscribe(ctx, "user-events", "audit", auditHandler)
Error Handling
Retry Logic
The event system automatically retries failed messages up to EVENT_MAX_RETRIES
times with exponential backoff. After all retries are exhausted, messages are moved to the dead letter queue.
Dead Letter Queue
Failed messages are automatically moved to a dead letter queue where they can be:
- Inspected for debugging
- Replayed after fixing the underlying issue
- Removed if they're no longer needed
Stale Consumer Recovery
The system automatically detects and recovers from stale consumers:
- Messages from crashed consumers are reclaimed
- Automatic failover to healthy consumers
- Configurable timeout for stale detection
Monitoring
Health Checks
// Check event system health
err := eventManager.Health(ctx)
if err != nil {
log.Printf("Event system unhealthy: %v", err)
}
Statistics
// Get event system statistics
stats, err := eventManager.Stats(ctx)
if err != nil {
log.Printf("Failed to get stats: %v", err)
return
}
// Print topic statistics
for topic, topicStats := range stats.Topics {
fmt.Printf("Topic %s: %d messages\n", topic, topicStats.MessagesCount)
}
// Print consumer group statistics
for group, groupStats := range stats.ConsumerGroups {
fmt.Printf("Group %s: %d pending messages\n", group, groupStats.PendingMessagesCount)
}
Observability Integration
When the observability module is enabled, the event system automatically provides:
- Metrics: Message throughput, processing latency, error rates
- Tracing: Distributed tracing across event publishing and consumption
- Logging: Structured logging with correlation IDs
Best Practices
1. Event Design
- Use meaningful event names with namespacing (e.g.,
user.signup
,order.created
) - Include all necessary data in the event payload
- Add metadata in headers for filtering and routing
- Keep events immutable and backward compatible
2. Consumer Groups
- Use descriptive consumer group names
- Group related consumers together
- Scale consumer groups horizontally for high throughput
- Monitor consumer group lag
3. Error Handling
- Implement proper error handling in event handlers
- Use dead letter queues for poison messages
- Set appropriate retry counts and backoff strategies
- Monitor and alert on DLQ growth
4. Performance
- Use batch publishing for high-volume scenarios
- Configure appropriate batch sizes for your workload
- Monitor Redis memory usage and performance
- Consider partitioning topics for very high volumes
5. Testing
- Use the provided test utilities for integration testing
- Test both happy path and error scenarios
- Mock event publishers in unit tests
- Test consumer group failover scenarios
Common Patterns
Event Sourcing
// Store events as the source of truth
events := []contract.Event{
event.NewEvent("user.created", userData),
event.NewEvent("user.email.verified", emailData),
event.NewEvent("user.plan.upgraded", planData),
}
for _, evt := range events {
publisher.Publish(ctx, "user-events", evt)
}
Saga Pattern
// Coordinate distributed transactions
func HandleOrderPlaced(ctx context.Context, event contract.Event) error {
// Step 1: Reserve inventory
publisher.Publish(ctx, "inventory",
event.NewEvent("inventory.reserve", inventoryData))
// Step 2: Process payment
publisher.Publish(ctx, "payment",
event.NewEvent("payment.charge", paymentData))
return nil
}
CQRS (Command Query Responsibility Segregation)
// Separate read and write models
func HandleUserCommand(ctx context.Context, event contract.Event) error {
// Update write model
writeModel.UpdateUser(userData)
// Publish event for read model updates
publisher.Publish(ctx, "user-projections",
event.NewEvent("user.updated", userData))
return nil
}
Troubleshooting
Common Issues
Messages not being consumed
- Check Redis connection
- Verify consumer group is created
- Check consumer is running and healthy
High DLQ growth
- Check handler error logs
- Verify message format compatibility
- Review retry configuration
Performance issues
- Monitor Redis memory and CPU
- Check batch size configuration
- Review consumer group scaling
Message ordering issues
- Verify single topic usage
- Check consumer group configuration
- Review message publishing order
Debug Commands
# Check Redis streams
redis-cli XINFO STREAM event:stream:user-events
# Check consumer groups
redis-cli XINFO GROUPS event:stream:user-events
# Check pending messages
redis-cli XPENDING event:stream:user-events user-group
# Check DLQ
redis-cli XLEN event:dlq:user-events