Observer Pattern in Python: Pub/Sub Implementation

Key Insights

  • The Observer pattern decouples event producers from consumers, letting you add new subscribers without modifying the publisher—essential for scalable, maintainable event-driven systems.
  • Python’s dynamic nature enables elegant implementations using protocols, weak references, and decorators that feel more natural than traditional OOP approaches.
  • A well-designed event bus can serve as the backbone for modular applications, but requires careful attention to memory management, thread safety, and avoiding notification cascades.

Introduction to the Observer Pattern

The Observer pattern solves a fundamental problem in software design: how do you notify multiple components about state changes without creating tight coupling between them? The answer is simple—you don’t let the publisher know anything about its subscribers beyond a common interface.

Think of it like a newsletter subscription. The publisher doesn’t need to know whether you’re a student, a CEO, or a bot scraping content. They just need your email address and a way to send you updates. You can subscribe, unsubscribe, and the publisher keeps doing its thing regardless.

This pattern shines in several scenarios:

  • Event-driven systems: User actions, sensor readings, or external API responses that trigger multiple handlers
  • UI updates: Model changes that need to reflect across multiple views
  • Data synchronization: Cache invalidation, real-time dashboards, or distributed system coordination
  • Plugin architectures: Core systems that emit events for extensions to hook into

The key insight is that the Subject (publisher) and Observers (subscribers) can evolve independently. Add a new display for your weather station? No changes to the sensor code required.

Core Components and Structure

The Observer pattern has two primary roles:

Subject (Publisher): Maintains a list of observers and provides methods to add, remove, and notify them. It knows nothing about what observers do with the notifications—only that they implement a specific interface.

Observer (Subscriber): Implements an update interface that the subject calls when state changes. Each observer decides independently how to react.

The relationship flows in one direction: the subject holds references to observers and pushes updates to them. Observers typically don’t hold references back to the subject; instead, they receive the subject as a parameter on each notification and can pull whatever state they need from it.

Here’s the foundational structure:

from abc import ABC, abstractmethod
from typing import Any


class Observer(ABC):
    @abstractmethod
    def update(self, subject: "Subject", *args: Any, **kwargs: Any) -> None:
        """Receive update from subject."""
        pass


class Subject(ABC):
    def __init__(self) -> None:
        self._observers: list[Observer] = []

    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        try:
            self._observers.remove(observer)
        except ValueError:
            pass

    def notify(self, *args: Any, **kwargs: Any) -> None:
        # Iterate over a copy so observers can detach themselves during notification
        for observer in list(self._observers):
            observer.update(self, *args, **kwargs)

This abstract implementation establishes the contract. Concrete subjects inherit from Subject and call notify() when their state changes. Concrete observers inherit from Observer and implement update() to handle those notifications.

Basic Python Implementation

Let’s build a practical example: a weather station that broadcasts readings to multiple display units.

from dataclasses import dataclass


@dataclass
class WeatherData:
    temperature: float
    humidity: float
    pressure: float


class WeatherStation(Subject):
    def __init__(self) -> None:
        super().__init__()
        self._weather_data: WeatherData | None = None

    @property
    def weather_data(self) -> WeatherData | None:
        return self._weather_data

    def set_measurements(
        self, temperature: float, humidity: float, pressure: float
    ) -> None:
        self._weather_data = WeatherData(temperature, humidity, pressure)
        self.notify()


class CurrentConditionsDisplay(Observer):
    def __init__(self, name: str) -> None:
        self.name = name

    def update(self, subject: Subject, *args: Any, **kwargs: Any) -> None:
        if isinstance(subject, WeatherStation) and subject.weather_data:
            data = subject.weather_data
            print(
                f"[{self.name}] Current: {data.temperature}°F, "
                f"{data.humidity}% humidity"
            )


class StatisticsDisplay(Observer):
    def __init__(self) -> None:
        self._temperatures: list[float] = []

    def update(self, subject: Subject, *args: Any, **kwargs: Any) -> None:
        if isinstance(subject, WeatherStation) and subject.weather_data:
            self._temperatures.append(subject.weather_data.temperature)
            avg = sum(self._temperatures) / len(self._temperatures)
            print(
                f"[Statistics] Avg/Max/Min: {avg:.1f}/"
                f"{max(self._temperatures):.1f}/{min(self._temperatures):.1f}"
            )


# Usage
station = WeatherStation()
station.attach(CurrentConditionsDisplay("Living Room"))
station.attach(CurrentConditionsDisplay("Bedroom"))
station.attach(StatisticsDisplay())

station.set_measurements(80, 65, 30.4)
station.set_measurements(82, 70, 29.2)

Each display handles updates independently. The StatisticsDisplay accumulates history while CurrentConditionsDisplay just shows the latest reading. The weather station doesn’t care—it just broadcasts.
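The displays above pull their data from the subject, which is why each one needs an isinstance check. Because notify() forwards its arguments, a subject could instead push the new value directly. The following is a minimal sketch of that variation, not part of the weather station code; PushSubject and TemperatureLog are hypothetical names introduced for illustration, and it also shows detach() in use.

```python
from typing import Any


class PushSubject:
    """Hypothetical subject that pushes new values to its observers."""

    def __init__(self) -> None:
        self._observers: list[Any] = []

    def attach(self, observer: Any) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Any) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, *args: Any, **kwargs: Any) -> None:
        # Copy the list so observers may detach while being notified
        for observer in list(self._observers):
            observer.update(self, *args, **kwargs)


class TemperatureLog:
    """Hypothetical observer that records pushed temperatures."""

    def __init__(self) -> None:
        self.readings: list[float] = []

    def update(self, subject: Any, *args: Any, **kwargs: Any) -> None:
        # The value arrives as a keyword argument, so no isinstance check is needed
        self.readings.append(kwargs["temperature"])


subject = PushSubject()
log = TemperatureLog()
subject.attach(log)
subject.notify(temperature=80.0)

subject.detach(log)
subject.notify(temperature=82.0)  # not recorded: log is no longer attached

print(log.readings)  # prints [80.0]
```

Pushing keeps observers independent of the subject's type, at the cost of an implicit contract about which arguments notify() will carry.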

Pythonic Approaches and Variations

The abstract class approach works, but Python offers more flexible alternatives.

Using Protocols Instead of Abstract Classes

Protocols provide structural subtyping—if it has the right methods, it’s an observer:

from typing import Protocol, Any


class ObserverProtocol(Protocol):
    def update(self, subject: Any, *args: Any, **kwargs: Any) -> None: ...


class FlexibleSubject:
    def __init__(self) -> None:
        self._observers: list[ObserverProtocol] = []

    def attach(self, observer: ObserverProtocol) -> None:
        self._observers.append(observer)

    def notify(self, *args: Any, **kwargs: Any) -> None:
        for observer in self._observers:
            observer.update(self, *args, **kwargs)

Now any object with an update method works—no inheritance required.

Weak References to Prevent Memory Leaks

A common pitfall: observers that should be garbage collected stick around because the subject holds references to them. Weak references solve this:

import weakref
from typing import Callable, Any


class WeakSubject:
    def __init__(self) -> None:
        self._observers: list[weakref.ref[Any]] = []

    def attach(self, observer: Any) -> None:
        self._observers.append(weakref.ref(observer))

    def notify(self, *args: Any, **kwargs: Any) -> None:
        # Clean up dead references and notify live ones
        live_observers = []
        for ref in self._observers:
            observer = ref()
            if observer is not None:
                observer.update(self, *args, **kwargs)
                live_observers.append(ref)
        self._observers = live_observers
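To see the cleanup in action, here is a small self-contained sketch that repeats WeakSubject and then drops one observer. Display and the shared log list are hypothetical names for illustration. CPython usually frees the object as soon as its last strong reference disappears; the gc.collect() call is there as a nudge for other interpreters.

```python
import gc
import weakref
from typing import Any


class WeakSubject:
    def __init__(self) -> None:
        self._observers: list[weakref.ref] = []

    def attach(self, observer: Any) -> None:
        self._observers.append(weakref.ref(observer))

    def notify(self, *args: Any, **kwargs: Any) -> None:
        # Notify live observers and drop references whose target is gone
        live_observers = []
        for ref in self._observers:
            observer = ref()
            if observer is not None:
                observer.update(self, *args, **kwargs)
                live_observers.append(ref)
        self._observers = live_observers


class Display:
    """Hypothetical observer that records its name on every update."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    def update(self, subject: Any, *args: Any, **kwargs: Any) -> None:
        self.log.append(self.name)


log: list[str] = []
subject = WeakSubject()
kitchen = Display("kitchen", log)
hallway = Display("hallway", log)
subject.attach(kitchen)
subject.attach(hallway)

subject.notify()   # both displays are alive
del hallway        # drop the only strong reference
gc.collect()
subject.notify()   # only the kitchen display is notified

print(log)
print(len(subject._observers))
```

One caveat worth keeping in mind: weakref.ref to a bound method dies immediately, because each attribute access creates a new temporary method object. For method callbacks, weakref.WeakMethod is the standard-library tool.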

Decorator-Based Subscription

For a more Pythonic feel, decorators can register handlers automatically:

from typing import Callable, Any
from collections import defaultdict


class EventEmitter:
    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[..., Any]]] = defaultdict(list)

    def on(self, event: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            self._handlers[event].append(func)
            return func
        return decorator

    def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
        for handler in self._handlers[event]:
            handler(*args, **kwargs)


# Usage
emitter = EventEmitter()


@emitter.on("user_created")
def send_welcome_email(user: dict[str, Any]) -> None:
    print(f"Sending welcome email to {user['email']}")


@emitter.on("user_created")
def initialize_preferences(user: dict[str, Any]) -> None:
    print(f"Setting up default preferences for {user['name']}")


emitter.emit("user_created", {"name": "Alice", "email": "alice@example.com"})

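This pattern is particularly clean for event-driven applications where you want handlers registered right where they are defined rather than in centralized registration code. Because the decorator returns the original function unchanged, each handler also remains directly callable, which keeps it easy to unit test.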

Real-World Application: Event Bus System

Let’s build a more sophisticated event bus suitable for a modular application or microservices simulation:

import asyncio
from dataclasses import dataclass, field
from typing import Callable, Any, Awaitable
from collections import defaultdict
from datetime import datetime
import weakref


@dataclass
class Event:
    topic: str
    payload: dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    source: str = "unknown"


Handler = Callable[[Event], Awaitable[None]]


class EventBus:
    def __init__(self) -> None:
        self._handlers: dict[str, list[weakref.ref[Any]]] = defaultdict(list)
        self._middleware: list[Callable[[Event], Event | None]] = []

    def use(self, middleware: Callable[[Event], Event | None]) -> None:
        """Add middleware for event transformation/filtering."""
        self._middleware.append(middleware)

    def subscribe(
        self, topic: str, handler: Handler, weak: bool = True
    ) -> Callable[[], None]:
        """Subscribe to a topic. Returns unsubscribe function."""
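        if weak:
            # A bound method is a temporary object, so a plain weakref.ref to it
            # dies immediately; WeakMethod tracks the underlying instance instead.
            if hasattr(handler, "__self__") and hasattr(handler, "__func__"):
                ref = weakref.WeakMethod(handler)
            else:
                ref = weakref.ref(handler)
        else:
            # Wrap in a closure that holds a strong reference to the handler
            ref = lambda: handler  # noqa: E731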

        self._handlers[topic].append(ref)

        def unsubscribe() -> None:
            try:
                self._handlers[topic].remove(ref)
            except ValueError:
                pass

        return unsubscribe

    async def publish(self, event: Event) -> None:
        """Publish event to all subscribers of the topic."""
        # Run through middleware
        for mw in self._middleware:
            result = mw(event)
            if result is None:
                return  # Middleware filtered out the event
            event = result

        # Gather live handlers
        handlers: list[Handler] = []
        live_refs: list[weakref.ref[Any]] = []

        for ref in self._handlers[event.topic]:
            handler = ref()
            if handler is not None:
                handlers.append(handler)
                live_refs.append(ref)

        self._handlers[event.topic] = live_refs

        # Execute handlers concurrently
        if handlers:
            await asyncio.gather(
                *[h(event) for h in handlers],
                return_exceptions=True
            )


# Example usage
async def main() -> None:
    bus = EventBus()

    # Add logging middleware
    def log_middleware(event: Event) -> Event:
        print(f"[{event.timestamp}] {event.topic}: {event.payload}")
        return event

    bus.use(log_middleware)

    # Define handlers
    async def handle_order_created(event: Event) -> None:
        await asyncio.sleep(0.1)  # Simulate async work
        print(f"Processing order: {event.payload['order_id']}")

    async def handle_inventory_update(event: Event) -> None:
        await asyncio.sleep(0.05)
        print(f"Updating inventory for order: {event.payload['order_id']}")

    # Subscribe
    bus.subscribe("order.created", handle_order_created, weak=False)
    bus.subscribe("order.created", handle_inventory_update, weak=False)

    # Publish
    await bus.publish(Event(
        topic="order.created",
        payload={"order_id": "12345", "items": ["widget", "gadget"]},
        source="order-service"
    ))


asyncio.run(main())

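This implementation supports middleware for cross-cutting concerns such as logging, runs async handlers concurrently, and automatically cleans up dead references. Note that return_exceptions=True keeps a failing handler’s exception from propagating out of publish(), but the result of gather() is discarded here, so handler failures pass silently unless you inspect it.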
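If silent handler failures are a concern, one option is to inspect what asyncio.gather() returns. The sketch below is a stripped-down stand-in for the bus, not a drop-in change to EventBus: the publish() function, the two handlers, and the plain dict event are all hypothetical and exist only to show the result handling.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]


async def ok_handler(event: dict[str, Any]) -> None:
    await asyncio.sleep(0)  # Simulate async work


async def failing_handler(event: dict[str, Any]) -> None:
    raise ValueError("inventory service unavailable")


async def publish(event: dict[str, Any], handlers: list[Handler]) -> list[Exception]:
    """Run handlers concurrently and return the exceptions they raised."""
    results = await asyncio.gather(
        *(h(event) for h in handlers), return_exceptions=True
    )
    errors: list[Exception] = []
    # gather() returns results in the same order as the awaitables passed in
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            print(f"{handler.__name__} failed: {result!r}")
            errors.append(result)
    return errors


errors = asyncio.run(
    publish({"order_id": "12345"}, [ok_handler, failing_handler])
)
print(len(errors))
```

In a real bus you would likely send these to a logger or an error topic rather than print them; the point is only that the failures are available if you keep the return value.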

Testing and Common Pitfalls

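Testing observer-based systems requires isolating components and verifying interactions. The async tests below assume the pytest-asyncio plugin is installed, since it provides the asyncio marker: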

import pytest
from unittest.mock import Mock, AsyncMock


class TestEventBus:
    @pytest.fixture
    def bus(self) -> EventBus:
        return EventBus()

    @pytest.mark.asyncio
    async def test_handler_receives_event(self, bus: EventBus) -> None:
        handler = AsyncMock()
        bus.subscribe("test.topic", handler, weak=False)

        event = Event(topic="test.topic", payload={"key": "value"})
        await bus.publish(event)

        handler.assert_called_once_with(event)

    @pytest.mark.asyncio
    async def test_middleware_can_filter_events(self, bus: EventBus) -> None:
        handler = AsyncMock()
        bus.subscribe("test.topic", handler, weak=False)

        # Middleware that blocks all events
        bus.use(lambda e: None)

        await bus.publish(Event(topic="test.topic", payload={}))

        handler.assert_not_called()

Common pitfalls to avoid:

  1. Notification storms: Observer A updates state that triggers Observer B, which triggers Observer A again. Use flags or queuing to break cycles.

  2. Thread safety: If multiple threads publish or subscribe, protect your observer list with locks or use thread-safe collections.

  3. Exception handling: One failing observer shouldn’t prevent others from receiving notifications. Catch exceptions in your notify loop.

  4. Memory leaks: Always consider whether observers should use weak references, especially for long-lived subjects.
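Pitfalls 2 and 3 can be addressed together. The following is a minimal sketch, assuming plain callables as observers; SafeSubject, broken, and working are hypothetical names, and returning the collected errors from notify() is one design choice among several.

```python
import logging
import threading
from typing import Any, Callable

logger = logging.getLogger(__name__)

Observer = Callable[..., None]


class SafeSubject:
    """Hypothetical subject with a lock and per-observer error handling."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._observers: list[Observer] = []

    def attach(self, observer: Observer) -> None:
        with self._lock:
            if observer not in self._observers:
                self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        with self._lock:
            if observer in self._observers:
                self._observers.remove(observer)

    def notify(self, *args: Any, **kwargs: Any) -> list[Exception]:
        # Take a snapshot under the lock, then call observers outside it so an
        # observer that attaches or detaches during its callback cannot deadlock.
        with self._lock:
            snapshot = list(self._observers)

        errors: list[Exception] = []
        for observer in snapshot:
            try:
                observer(*args, **kwargs)
            except Exception as exc:
                # One failing observer must not stop the others
                logger.exception("Observer %r failed", observer)
                errors.append(exc)
        return errors


received: list[int] = []


def broken(value: int) -> None:
    raise RuntimeError("boom")


def working(value: int) -> None:
    received.append(value)


subject = SafeSubject()
subject.attach(broken)
subject.attach(working)
errors = subject.notify(42)

print(received, len(errors))  # prints [42] 1
```

The lock protects only the observer list. Observers that touch shared state from several publishing threads still need their own synchronization.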

Conclusion and Alternatives

The Observer pattern provides a clean mechanism for loose coupling in event-driven systems. Python’s flexibility lets you implement it in ways that feel natural—from traditional class hierarchies to decorator-based approaches.

For production use, consider established libraries:

  • blinker: Fast, simple signals library used by Flask
  • PyPubSub: Full-featured pub/sub with topic hierarchies
  • RxPY: Reactive extensions for complex event stream processing

When the Observer pattern feels too simple, look at Mediator (centralized coordination) or Event Sourcing (persistent event logs). But for most notification needs, a well-implemented observer system strikes the right balance between simplicity and power.
