Observer Pattern in Go: Channel-Based Observers
Key Insights
- Go’s channels provide a natural implementation of the observer pattern that eliminates tight coupling between subjects and observers while leveraging the language’s built-in concurrency primitives.
- Buffered channels with non-blocking sends prevent slow observers from blocking the entire notification system, but require careful consideration of backpressure and dropped message handling.
- Context-based cancellation and proper cleanup patterns are essential to prevent goroutine leaks—the most common pitfall when implementing channel-based observers in production systems.
Introduction to the Observer Pattern
The observer pattern establishes a one-to-many dependency between objects. When a subject changes state, all registered observers receive automatic notification. It’s the backbone of event-driven architectures, GUI frameworks, and reactive systems.
In most languages, you implement observers through interfaces and callback methods. The subject maintains a list of observer interfaces and iterates through them during notification. This works, but it creates coupling and synchronization challenges.
Go offers something better. Channels are first-class citizens designed for communication between concurrent processes. They naturally model the “publish-subscribe” relationship at the heart of the observer pattern. Instead of calling methods on observer objects, subjects send messages through channels. Observers receive these messages in their own goroutines, achieving true decoupling and concurrent processing without explicit thread management.
Traditional vs. Channel-Based Observers
Let’s start with the traditional interface-based approach to understand what we’re improving upon:
type Observer interface {
	OnEvent(event Event)
}

type Subject struct {
	mu        sync.RWMutex
	observers []Observer
}

func (s *Subject) Register(o Observer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.observers = append(s.observers, o)
}

func (s *Subject) Notify(event Event) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, o := range s.observers {
		o.OnEvent(event) // Blocking call: a slow observer blocks everyone
	}
}
This approach has problems. The Notify method blocks until every observer processes the event. A slow observer delays all subsequent notifications. You could spawn goroutines for each call, but then you lose ordering guarantees and complicate error handling.
The channel-based approach inverts the control flow. Observers own their processing loops and pull events from channels at their own pace:
| Aspect | Interface-Based | Channel-Based |
|---|---|---|
| Coupling | Observers must implement interface | Observers only need a channel |
| Blocking | Subject waits for observers | Subject sends and continues |
| Concurrency | Manual goroutine management | Built into channel semantics |
| Type Safety | Compile-time via interfaces | Compile-time via typed channels |
| Backpressure | Implicit (blocking) | Explicit (buffered channels, drops) |
Designing a Channel-Based Observer
Here’s a robust implementation of a channel-based subject:
type Event struct {
	Type    string
	Payload any
	Time    time.Time
}

type Subject struct {
	mu          sync.RWMutex
	subscribers map[string]chan Event
	closed      bool
}

func NewSubject() *Subject {
	return &Subject{
		subscribers: make(map[string]chan Event),
	}
}

func (s *Subject) Subscribe(id string, bufferSize int) (<-chan Event, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return nil, errors.New("subject is closed")
	}
	if _, exists := s.subscribers[id]; exists {
		return nil, fmt.Errorf("subscriber %s already exists", id)
	}
	ch := make(chan Event, bufferSize)
	s.subscribers[id] = ch
	return ch, nil
}

func (s *Subject) Unsubscribe(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ch, exists := s.subscribers[id]; exists {
		close(ch)
		delete(s.subscribers, id)
	}
}

func (s *Subject) Notify(event Event) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for id, ch := range s.subscribers {
		select {
		case ch <- event:
			// Sent successfully
		default:
			// Channel full, log or handle dropped event
			log.Printf("dropped event for subscriber %s: buffer full", id)
		}
	}
}

func (s *Subject) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	for id, ch := range s.subscribers {
		close(ch)
		delete(s.subscribers, id)
	}
}
The key design decisions here: subscribers are identified by string IDs for easy management, channels are returned as receive-only to prevent subscribers from sending, and the Notify method uses non-blocking sends to prevent slow subscribers from affecting others.
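To make the drop behavior concrete, here is a minimal, self-contained sketch of the same select/default send in isolation. The helper name trySend and the string event type are assumptions made for illustration; they are not part of the Subject type above.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was
// accepted. It mirrors the select/default shape used in Notify.
func trySend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		// Buffer full (or, for an unbuffered channel, no receiver waiting):
		// the event is dropped instead of blocking the sender.
		return false
	}
}

func main() {
	ch := make(chan string, 2) // room for two pending events

	for _, ev := range []string{"created", "paid", "shipped"} {
		fmt.Printf("%s delivered=%v\n", ev, trySend(ch, ev))
	}

	// The consumer still sees the first two events, in order.
	fmt.Println(<-ch, <-ch)
}
```

With a buffer of two and no consumer running, the third send reports delivered=false. That is the trade-off the non-blocking Notify accepts: the publisher never stalls, but a subscriber that falls behind loses events.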
Implementing Asynchronous Notifications
The non-blocking send pattern using select with a default case is crucial for production systems. But sometimes you want more nuanced behavior—perhaps a brief wait before dropping:
func (s *Subject) NotifyWithTimeout(event Event, timeout time.Duration) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var wg sync.WaitGroup
	for id, ch := range s.subscribers {
		wg.Add(1)
		go func(subscriberID string, subscriberCh chan Event) {
			defer wg.Done()
			select {
			case subscriberCh <- event:
				// Delivered
			case <-time.After(timeout):
				log.Printf("timeout delivering to %s", subscriberID)
			}
		}(id, ch)
	}
	// Required: hold the read lock until every delivery attempt finishes.
	// Returning early would let Unsubscribe or Close close a channel while
	// a goroutine is still sending on it, which panics.
	wg.Wait()
}
Buffered versus unbuffered channels represent a fundamental trade-off. An unbuffered channel accepts a send only when the observer is ready to receive. With blocking sends, that gives the subject immediate backpressure; with the non-blocking sends used in Notify, any event that arrives while the observer is busy is dropped. Buffered channels absorb temporary bursts but can mask a slow observer until the buffer fills. For most event systems, I recommend small buffers (10-100 events) with monitoring on buffer utilization.
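One way to monitor buffer utilization is to sample len and cap on the subscriber channel. The sketch below is only an illustration: the utilization helper is a hypothetical name, it uses generics (Go 1.18+), and the value it returns is a point-in-time snapshot that can be stale as soon as it is read.

```go
package main

import "fmt"

// utilization reports how full a buffered channel is, from 0.0 to 1.0.
// len and cap are safe to call while other goroutines send and receive,
// but the result is only a snapshot.
func utilization[T any](ch chan T) float64 {
	if cap(ch) == 0 {
		return 0 // unbuffered channels have no buffer to measure
	}
	return float64(len(ch)) / float64(cap(ch))
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	ch <- 3
	fmt.Printf("%.2f\n", utilization(ch)) // prints 0.75
}
```

A subject could sample this per subscriber on a ticker and export it as a gauge; a value that stays near 1.0 suggests the subscriber is about to start losing events.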
Graceful Shutdown and Resource Cleanup
Goroutine leaks are the silent killer of Go applications. Every observer loop must have a clear termination path. Context cancellation provides the cleanest approach:
type Observer struct {
	id     string
	events <-chan Event
}

func (o *Observer) Run(ctx context.Context, handler func(Event)) {
	for {
		select {
		case <-ctx.Done():
			log.Printf("observer %s shutting down: %v", o.id, ctx.Err())
			// Drain remaining events if needed
			for {
				select {
				case event, ok := <-o.events:
					if !ok {
						return
					}
					handler(event) // Process remaining buffered events
				default:
					return
				}
			}
		case event, ok := <-o.events:
			if !ok {
				log.Printf("observer %s: channel closed", o.id)
				return
			}
			handler(event)
		}
	}
}
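The drain loop after context cancellation is optional but important for systems where you can’t afford to lose events. It processes any remaining buffered events before the observer terminates. It only covers events already sitting in the buffer at cancellation time; events the subject dropped earlier because the buffer was full are not recovered.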
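The drain step can also be pulled out into a small helper, which makes it easier to test on its own. This is a sketch under assumptions: the function name drain and the string event type are illustrative stand-ins, not part of the Observer type above.

```go
package main

import "fmt"

// drain handles every event already buffered in ch without blocking and
// returns how many it processed. It stops as soon as the buffer is empty
// or the channel has been closed and emptied.
func drain(ch <-chan string, handler func(string)) int {
	n := 0
	for {
		select {
		case ev, ok := <-ch:
			if !ok {
				return n // channel closed and fully drained
			}
			handler(ev)
			n++
		default:
			return n // nothing buffered right now
		}
	}
}

func main() {
	ch := make(chan string, 8)
	ch <- "paid"
	ch <- "shipped"

	n := drain(ch, func(ev string) { fmt.Println("late event:", ev) })
	fmt.Println("drained", n)
}
```

Because the default case returns immediately, the helper never waits for new events. If the handler itself is slow, shutdown still takes as long as the remaining buffered work.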
Practical Example: Real-Time Event Bus
Let’s build a complete event bus for an order processing system:
type OrderEvent struct {
	OrderID   string
	EventType string // "created", "paid", "shipped", "delivered"
	Data      map[string]any
	Timestamp time.Time
}

type EventBus struct {
	mu          sync.RWMutex
	subscribers map[string]*Subscription
	closed      bool
}

type Subscription struct {
	ID      string
	Channel chan OrderEvent
	Filter  func(OrderEvent) bool
}

func NewEventBus() *EventBus {
	return &EventBus{
		subscribers: make(map[string]*Subscription),
	}
}

func (eb *EventBus) Subscribe(id string, bufferSize int, filter func(OrderEvent) bool) (<-chan OrderEvent, error) {
	eb.mu.Lock()
	defer eb.mu.Unlock()
	if eb.closed {
		return nil, errors.New("event bus is closed")
	}
	// Reject duplicate IDs: overwriting an entry would orphan the old
	// channel, which would then never be closed.
	if _, exists := eb.subscribers[id]; exists {
		return nil, fmt.Errorf("subscriber %s already exists", id)
	}
	if filter == nil {
		filter = func(OrderEvent) bool { return true }
	}
	ch := make(chan OrderEvent, bufferSize)
	eb.subscribers[id] = &Subscription{
		ID:      id,
		Channel: ch,
		Filter:  filter,
	}
	return ch, nil
}

func (eb *EventBus) Publish(event OrderEvent) {
	eb.mu.RLock()
	defer eb.mu.RUnlock()
	for _, sub := range eb.subscribers {
		if !sub.Filter(event) {
			continue
		}
		select {
		case sub.Channel <- event:
		default:
			// Buffer full: the event is dropped for this subscriber.
			// Consider metrics here
		}
	}
}

func (eb *EventBus) Unsubscribe(id string) {
	eb.mu.Lock()
	defer eb.mu.Unlock()
	if sub, exists := eb.subscribers[id]; exists {
		close(sub.Channel)
		delete(eb.subscribers, id)
	}
}

func (eb *EventBus) Close() {
	eb.mu.Lock()
	defer eb.mu.Unlock()
	eb.closed = true
	for _, sub := range eb.subscribers {
		close(sub.Channel)
	}
	eb.subscribers = make(map[string]*Subscription)
}
Here’s how consumers use it:
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	bus := NewEventBus()

	// Subscribe errors are ignored for brevity; check them in real code.
	// Inventory service only cares about new orders
	inventoryCh, _ := bus.Subscribe("inventory", 100, func(e OrderEvent) bool {
		return e.EventType == "created"
	})
	// Shipping service cares about paid orders
	shippingCh, _ := bus.Subscribe("shipping", 100, func(e OrderEvent) bool {
		return e.EventType == "paid"
	})
	// Analytics wants everything
	analyticsCh, _ := bus.Subscribe("analytics", 1000, nil)

	// Start consumers and track them so main can wait for them.
	// The handle* functions are application-specific and not shown here.
	var wg sync.WaitGroup
	start := func(name string, ch <-chan OrderEvent, handler func(OrderEvent)) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			runConsumer(ctx, name, ch, handler)
		}()
	}
	start("inventory", inventoryCh, handleInventory)
	start("shipping", shippingCh, handleShipping)
	start("analytics", analyticsCh, handleAnalytics)

	// Publish events
	bus.Publish(OrderEvent{
		OrderID:   "ORD-123",
		EventType: "created",
		Timestamp: time.Now(),
	})

	// Close the bus so each consumer drains its buffer and exits, then
	// wait. Otherwise main could return before any event is handled.
	bus.Close()
	wg.Wait()
}

func runConsumer(ctx context.Context, name string, ch <-chan OrderEvent, handler func(OrderEvent)) {
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-ch:
			if !ok {
				return
			}
			handler(event)
		}
	}
}
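The per-service filters in the example are small closures, so they compose easily. As a sketch, the hypothetical helper below builds a filter that accepts any of several event types. The OrderEvent type here is a trimmed-down copy of the one above so that the snippet compiles on its own.

```go
package main

import "fmt"

// OrderEvent is a reduced copy of the article's type, kept minimal so the
// example is self-contained.
type OrderEvent struct {
	OrderID   string
	EventType string
}

// eventTypeIn returns a filter that accepts events whose EventType is one
// of the given values. The set is built once, not on every call.
func eventTypeIn(types ...string) func(OrderEvent) bool {
	allowed := make(map[string]struct{}, len(types))
	for _, t := range types {
		allowed[t] = struct{}{}
	}
	return func(e OrderEvent) bool {
		_, ok := allowed[e.EventType]
		return ok
	}
}

func main() {
	fulfilment := eventTypeIn("paid", "shipped")
	fmt.Println(fulfilment(OrderEvent{OrderID: "ORD-123", EventType: "paid"}))    // true
	fmt.Println(fulfilment(OrderEvent{OrderID: "ORD-123", EventType: "created"})) // false
}
```

A filter built this way can be passed straight to Subscribe in place of the inline closures. Filters run inside Publish while the read lock is held, so they should stay cheap and free of side effects.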
Best Practices and Pitfalls
Forgotten unsubscribes cause memory leaks and wasted CPU cycles. Always pair subscriptions with deferred unsubscriptions or use context cancellation to trigger cleanup.
Blocking receivers defeat the purpose of channel-based observers. If your handler does slow work (database writes, HTTP calls), process events in batches or spawn worker pools.
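Premature channel closure causes panics. Sending on a closed channel panics, and so does closing a channel twice. The subject owns the channels and should be the only entity closing them, under the same lock that guards its sends. Never close a channel from the receiving side.
Missing backpressure handling leads to silent event loss or, if you work around full buffers with unbounded queues or a goroutine per send, to unbounded memory growth. Monitor your buffer utilization. If subscribers consistently fall behind, you need faster consumers or explicit load shedding.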
For production systems, add these capabilities:
- Metrics: Track events published, delivered, and dropped per subscriber
- Health checks: Expose subscriber lag as a health indicator
- Replay capability: Consider persistent event logs for crash recovery
- Testing: Run tests with the race detector (go test -race), and use unbuffered channels in tests to expose hidden timing assumptions
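For the metrics item, a minimal sketch is a pair of atomic counters updated at the point of the non-blocking send. The subscriberStats type and its send method are hypothetical names, and how the counters are exported (logs, an HTTP endpoint, a metrics library) is left open.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// subscriberStats is a hypothetical per-subscriber counter set.
// atomic.Int64 (Go 1.19+) keeps updates safe without holding a lock.
type subscriberStats struct {
	delivered atomic.Int64
	dropped   atomic.Int64
}

// send performs the non-blocking send and records the outcome.
func (s *subscriberStats) send(ch chan<- string, ev string) {
	select {
	case ch <- ev:
		s.delivered.Add(1)
	default:
		s.dropped.Add(1)
	}
}

func main() {
	var stats subscriberStats
	ch := make(chan string, 1)

	stats.send(ch, "created")
	stats.send(ch, "paid") // buffer already full, so this one is dropped

	fmt.Printf("delivered=%d dropped=%d\n", stats.delivered.Load(), stats.dropped.Load())
}
```

A steadily rising dropped count for one subscriber is usually a clearer signal than a log line per dropped event, which can itself become a bottleneck under load.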
Channel-based observers align with Go’s philosophy of sharing memory by communicating. They’re not always the right choice—simple synchronous callbacks work fine for in-process, non-concurrent scenarios. But when you need concurrent event processing with clean separation between producers and consumers, channels provide an elegant, idiomatic solution.