Observer Pattern in Go: Channel-Based Observers

Key Insights

  • Go’s channels provide a natural implementation of the observer pattern that eliminates tight coupling between subjects and observers while leveraging the language’s built-in concurrency primitives.
  • Buffered channels with non-blocking sends prevent slow observers from blocking the entire notification system, but require careful consideration of backpressure and dropped message handling.
  • Context-based cancellation and proper cleanup patterns are essential to prevent goroutine leaks—the most common pitfall when implementing channel-based observers in production systems.

Introduction to the Observer Pattern

The observer pattern establishes a one-to-many dependency between objects. When a subject changes state, all registered observers receive automatic notification. It’s the backbone of event-driven architectures, GUI frameworks, and reactive systems.

In most languages, you implement observers through interfaces and callback methods. The subject maintains a list of observer interfaces and iterates through them during notification. This works, but it creates coupling and synchronization challenges.

Go offers something better. Channels are first-class citizens designed for communication between concurrent processes. They naturally model the “publish-subscribe” relationship at the heart of the observer pattern. Instead of calling methods on observer objects, subjects send messages through channels. Observers receive these messages in their own goroutines, achieving true decoupling and concurrent processing without explicit thread management.

Traditional vs. Channel-Based Observers

Let’s start with the traditional interface-based approach to understand what we’re improving upon:

type Observer interface {
    OnEvent(event Event)
}

type Subject struct {
    mu        sync.RWMutex
    observers []Observer
}

func (s *Subject) Register(o Observer) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.observers = append(s.observers, o)
}

func (s *Subject) Notify(event Event) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    for _, o := range s.observers {
        o.OnEvent(event) // Blocking call - slow observer blocks everyone
    }
}

This approach has problems. The Notify method blocks until every observer processes the event. A slow observer delays all subsequent notifications. You could spawn goroutines for each call, but then you lose ordering guarantees and complicate error handling.

The channel-based approach inverts the control flow. Observers own their processing loops and pull events from channels at their own pace:

Aspect        Interface-Based                      Channel-Based
Coupling      Observers must implement interface   Observers only need a channel
Blocking      Subject waits for observers          Subject sends and continues
Concurrency   Manual goroutine management          Built into channel semantics
Type Safety   Compile-time via interfaces          Compile-time via typed channels
Backpressure  Implicit (blocking)                  Explicit (buffered channels, drops)

Designing a Channel-Based Observer

Here’s a robust implementation of a channel-based subject:

type Event struct {
    Type    string
    Payload any
    Time    time.Time
}

type Subject struct {
    mu          sync.RWMutex
    subscribers map[string]chan Event
    closed      bool
}

func NewSubject() *Subject {
    return &Subject{
        subscribers: make(map[string]chan Event),
    }
}

func (s *Subject) Subscribe(id string, bufferSize int) (<-chan Event, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.closed {
        return nil, errors.New("subject is closed")
    }

    if _, exists := s.subscribers[id]; exists {
        return nil, fmt.Errorf("subscriber %s already exists", id)
    }

    ch := make(chan Event, bufferSize)
    s.subscribers[id] = ch
    return ch, nil
}

func (s *Subject) Unsubscribe(id string) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if ch, exists := s.subscribers[id]; exists {
        close(ch)
        delete(s.subscribers, id)
    }
}

func (s *Subject) Notify(event Event) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    for id, ch := range s.subscribers {
        select {
        case ch <- event:
            // Sent successfully
        default:
            // Channel full, log or handle dropped event
            log.Printf("dropped event for subscriber %s: buffer full", id)
        }
    }
}

func (s *Subject) Close() {
    s.mu.Lock()
    defer s.mu.Unlock()

    s.closed = true
    for id, ch := range s.subscribers {
        close(ch)
        delete(s.subscribers, id)
    }
}

The key design decisions here: subscribers are identified by string IDs for easy management, channels are returned as receive-only to prevent subscribers from sending, and the Notify method uses non-blocking sends to prevent slow subscribers from affecting others.
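Here is a minimal end-to-end sketch of how a caller might use this subject. The Subject below is a condensed copy of the type above, trimmed to Subscribe and Notify so the example compiles on its own; the "audit" subscriber ID and event types are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// Condensed copy of the article's Subject, trimmed to Subscribe and Notify.
type Event struct {
	Type    string
	Payload any
}

type Subject struct {
	mu          sync.RWMutex
	subscribers map[string]chan Event
}

func NewSubject() *Subject {
	return &Subject{subscribers: make(map[string]chan Event)}
}

func (s *Subject) Subscribe(id string, bufferSize int) <-chan Event {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan Event, bufferSize)
	s.subscribers[id] = ch
	return ch
}

func (s *Subject) Notify(event Event) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, ch := range s.subscribers {
		select {
		case ch <- event:
		default: // buffer full: drop
		}
	}
}

func main() {
	s := NewSubject()
	events := s.Subscribe("audit", 8)

	done := make(chan struct{})
	go func() { // observer owns its processing loop
		defer close(done)
		for i := 0; i < 2; i++ {
			e := <-events
			fmt.Println("received:", e.Type)
		}
	}()

	s.Notify(Event{Type: "created"})
	s.Notify(Event{Type: "paid"})
	<-done
}
```

Note that the subject never learns who is on the other end of the channel: the observer goroutine is wired up entirely by the caller.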

Implementing Asynchronous Notifications

The non-blocking send pattern using select with a default case is crucial for production systems. But sometimes you want more nuanced behavior—perhaps a brief wait before dropping:

func (s *Subject) NotifyWithTimeout(event Event, timeout time.Duration) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    var wg sync.WaitGroup
    for id, ch := range s.subscribers {
        wg.Add(1)
        go func(subscriberID string, subscriberCh chan Event) {
            defer wg.Done()
            select {
            case subscriberCh <- event:
                // Delivered
            case <-time.After(timeout):
                log.Printf("timeout delivering to %s", subscriberID)
            }
        }(id, ch)
    }
    wg.Wait() // Optional: wait for all delivery attempts
}

Buffered versus unbuffered channels represent a fundamental trade-off. Unbuffered channels provide backpressure—the subject knows immediately if an observer can’t keep up. Buffered channels absorb temporary bursts but can mask problems until the buffer fills. For most event systems, I recommend small buffers (10-100 events) with monitoring on buffer utilization.
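One way to watch buffer utilization is to sample len against cap on the subscriber channel. This is a sketch; the utilization helper and the 0.8 alert threshold are illustrative choices, not part of the article's API:

```go
package main

import "fmt"

// Sketch: sampling channel buffer utilization so slow subscribers
// surface in metrics before their buffers overflow.
func utilization(ch chan int) float64 {
	if cap(ch) == 0 {
		return 0 // unbuffered: utilization is not meaningful
	}
	return float64(len(ch)) / float64(cap(ch))
}

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 8; i++ {
		ch <- i // simulate a subscriber falling behind
	}
	u := utilization(ch)
	fmt.Printf("buffer utilization: %.0f%%\n", u*100)
	if u > 0.8 { // illustrative alert threshold
		fmt.Println("ALERT: subscriber falling behind")
	}
}
```

A periodic goroutine could export this reading to whatever metrics system you already run.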

Graceful Shutdown and Resource Cleanup

Goroutine leaks are the silent killer of Go applications. Every observer loop must have a clear termination path. Context cancellation provides the cleanest approach:

type Observer struct {
    id     string
    events <-chan Event
}

func (o *Observer) Run(ctx context.Context, handler func(Event)) {
    for {
        select {
        case <-ctx.Done():
            log.Printf("observer %s shutting down: %v", o.id, ctx.Err())
            // Drain remaining events if needed
            for {
                select {
                case event, ok := <-o.events:
                    if !ok {
                        return
                    }
                    handler(event) // Process remaining buffered events
                default:
                    return
                }
            }
        case event, ok := <-o.events:
            if !ok {
                log.Printf("observer %s: channel closed", o.id)
                return
            }
            handler(event)
        }
    }
}

The drain loop after context cancellation is optional but important for systems where you can’t afford to lose events. It processes any remaining buffered events before the observer terminates.

Practical Example: Real-Time Event Bus

Let’s build a complete event bus for an order processing system:

type OrderEvent struct {
    OrderID   string
    EventType string // "created", "paid", "shipped", "delivered"
    Data      map[string]any
    Timestamp time.Time
}

type EventBus struct {
    mu          sync.RWMutex
    subscribers map[string]*Subscription
    closed      bool
}

type Subscription struct {
    ID       string
    Channel  chan OrderEvent
    Filter   func(OrderEvent) bool
}

func NewEventBus() *EventBus {
    return &EventBus{
        subscribers: make(map[string]*Subscription),
    }
}

func (eb *EventBus) Subscribe(id string, bufferSize int, filter func(OrderEvent) bool) (<-chan OrderEvent, error) {
    eb.mu.Lock()
    defer eb.mu.Unlock()

    if eb.closed {
        return nil, errors.New("event bus is closed")
    }

    if filter == nil {
        filter = func(OrderEvent) bool { return true }
    }

    ch := make(chan OrderEvent, bufferSize)
    eb.subscribers[id] = &Subscription{
        ID:      id,
        Channel: ch,
        Filter:  filter,
    }
    return ch, nil
}

func (eb *EventBus) Publish(event OrderEvent) {
    eb.mu.RLock()
    defer eb.mu.RUnlock()

    for _, sub := range eb.subscribers {
        if !sub.Filter(event) {
            continue
        }
        select {
        case sub.Channel <- event:
        default:
            // Consider metrics here
        }
    }
}

func (eb *EventBus) Unsubscribe(id string) {
    eb.mu.Lock()
    defer eb.mu.Unlock()

    if sub, exists := eb.subscribers[id]; exists {
        close(sub.Channel)
        delete(eb.subscribers, id)
    }
}

func (eb *EventBus) Close() {
    eb.mu.Lock()
    defer eb.mu.Unlock()

    eb.closed = true
    for _, sub := range eb.subscribers {
        close(sub.Channel)
    }
    eb.subscribers = make(map[string]*Subscription)
}

Here’s how consumers use it:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    bus := NewEventBus()
    defer bus.Close()

    // Inventory service only cares about new orders
    inventoryCh, _ := bus.Subscribe("inventory", 100, func(e OrderEvent) bool {
        return e.EventType == "created"
    })

    // Shipping service cares about paid orders
    shippingCh, _ := bus.Subscribe("shipping", 100, func(e OrderEvent) bool {
        return e.EventType == "paid"
    })

    // Analytics wants everything
    analyticsCh, _ := bus.Subscribe("analytics", 1000, nil)

    // Start consumers
    go runConsumer(ctx, "inventory", inventoryCh, handleInventory)
    go runConsumer(ctx, "shipping", shippingCh, handleShipping)
    go runConsumer(ctx, "analytics", analyticsCh, handleAnalytics)

    // Publish events
    bus.Publish(OrderEvent{
        OrderID:   "ORD-123",
        EventType: "created",
        Timestamp: time.Now(),
    })

    // Give consumers a moment to drain before the deferred Close and
    // cancel run; a real service would block on a shutdown signal.
    time.Sleep(100 * time.Millisecond)
}

func runConsumer(ctx context.Context, name string, ch <-chan OrderEvent, handler func(OrderEvent)) {
    for {
        select {
        case <-ctx.Done():
            return
        case event, ok := <-ch:
            if !ok {
                return
            }
            handler(event)
        }
    }
}

Best Practices and Pitfalls

Forgotten unsubscribes cause memory leaks and wasted CPU cycles. Always pair subscriptions with deferred unsubscriptions or use context cancellation to trigger cleanup.
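The subscribe/defer-unsubscribe pairing looks like this. The registry type below is a minimal stand-in for the bus, just to keep the sketch self-contained:

```go
package main

import "fmt"

// Minimal registry standing in for the article's EventBus, only to
// show the subscribe / defer-unsubscribe pairing.
type registry struct {
	subs map[string]chan int
}

func (r *registry) subscribe(id string) <-chan int {
	ch := make(chan int, 1)
	r.subs[id] = ch
	return ch
}

func (r *registry) unsubscribe(id string) {
	if ch, ok := r.subs[id]; ok {
		close(ch)
		delete(r.subs, id)
	}
}

func consume(r *registry) {
	ch := r.subscribe("worker")
	defer r.unsubscribe("worker") // cleanup runs on every return path
	_ = ch                        // ... receive and handle events here ...
}

func main() {
	r := &registry{subs: make(map[string]chan int)}
	consume(r)
	fmt.Println("live subscriptions:", len(r.subs)) // 0: nothing leaked
}
```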

Blocking receivers defeat the purpose of channel-based observers. If your handler does slow work (database writes, HTTP calls), process events in batches or spawn worker pools.

Premature channel closure causes panics. The subject owns the channels and should be the only entity closing them. Never close a channel from the receiving side.

Missing backpressure handling leads to unbounded memory growth. Monitor your buffer utilization. If subscribers consistently fall behind, you need faster consumers or explicit load shedding.

For production systems, add these capabilities:

  • Metrics: Track events published, delivered, and dropped per subscriber
  • Health checks: Expose subscriber lag as a health indicator
  • Replay capability: Consider persistent event logs for crash recovery
  • Testing: Use unbuffered channels in tests to catch race conditions
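Drop accounting, for example, can be as simple as atomic counters around the non-blocking send. The countingSub type and its field names below are illustrative, not part of the article's API:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Sketch of per-subscriber delivery accounting with atomic counters.
type countingSub struct {
	ch        chan int
	delivered atomic.Int64
	dropped   atomic.Int64
}

func (s *countingSub) send(v int) {
	select {
	case s.ch <- v:
		s.delivered.Add(1)
	default:
		s.dropped.Add(1)
	}
}

func main() {
	s := &countingSub{ch: make(chan int, 2)}
	for i := 0; i < 5; i++ {
		s.send(i) // nothing is receiving, so the buffer fills after 2
	}
	fmt.Printf("delivered=%d dropped=%d\n",
		s.delivered.Load(), s.dropped.Load())
}
```

Exposing the dropped counter per subscriber tells you exactly which consumer needs a bigger buffer or a faster handler.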

Channel-based observers align with Go’s philosophy of sharing memory by communicating. They’re not always the right choice—simple synchronous callbacks work fine for in-process, non-concurrent scenarios. But when you need concurrent event processing with clean separation between producers and consumers, channels provide an elegant, idiomatic solution.
