Observer Pattern in Python: Pub/Sub Implementation
Key Insights
- The Observer pattern decouples event producers from consumers, letting you add new subscribers without modifying the publisher—essential for scalable, maintainable event-driven systems.
- Python’s dynamic nature enables elegant implementations using protocols, weak references, and decorators that feel more natural than traditional OOP approaches.
- A well-designed event bus can serve as the backbone for modular applications, but requires careful attention to memory management, thread safety, and avoiding notification cascades.
Introduction to the Observer Pattern
The Observer pattern solves a fundamental problem in software design: how do you notify multiple components about state changes without creating tight coupling between them? The answer is simple—you don’t let the publisher know anything about its subscribers beyond a common interface.
Think of it like a newsletter subscription. The publisher doesn’t need to know whether you’re a student, a CEO, or a bot scraping content. They just need your email address and a way to send you updates. You can subscribe, unsubscribe, and the publisher keeps doing its thing regardless.
This pattern shines in several scenarios:
- Event-driven systems: User actions, sensor readings, or external API responses that trigger multiple handlers
- UI updates: Model changes that need to reflect across multiple views
- Data synchronization: Cache invalidation, real-time dashboards, or distributed system coordination
- Plugin architectures: Core systems that emit events for extensions to hook into
The key insight is that the Subject (publisher) and Observers (subscribers) can evolve independently. Add a new display for your weather station? No changes to the sensor code required.
Core Components and Structure
The Observer pattern has two primary roles:
Subject (Publisher): Maintains a list of observers and provides methods to add, remove, and notify them. It knows nothing about what observers do with the notifications—only that they implement a specific interface.
Observer (Subscriber): Implements an update interface that the subject calls when state changes. Each observer decides independently how to react.
The relationship flows in one direction: the subject holds references to observers and pushes updates to them. Observers typically don’t hold references back to the subject; instead, the subject passes itself as an argument when it calls update(), so an observer can query it during notification.
Here’s the foundational structure:
from abc import ABC, abstractmethod
from typing import Any


class Observer(ABC):
    @abstractmethod
    def update(self, subject: "Subject", *args: Any, **kwargs: Any) -> None:
        """Receive update from subject."""
        pass


class Subject(ABC):
    def __init__(self) -> None:
        self._observers: list[Observer] = []

    def attach(self, observer: Observer) -> None:
        # Ignore duplicates so each observer is notified once per change.
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        # Detaching an observer that was never attached is a no-op.
        try:
            self._observers.remove(observer)
        except ValueError:
            pass

    def notify(self, *args: Any, **kwargs: Any) -> None:
        # Iterate over a copy so an observer can detach itself mid-notification.
        for observer in list(self._observers):
            observer.update(self, *args, **kwargs)
This abstract implementation establishes the contract. Concrete subjects inherit from Subject and call notify() when their state changes. Concrete observers inherit from Observer and implement update() to handle those notifications.
Basic Python Implementation
Let’s build a practical example: a weather station that broadcasts readings to multiple display units.
from dataclasses import dataclass


@dataclass
class WeatherData:
    temperature: float
    humidity: float
    pressure: float


class WeatherStation(Subject):
    def __init__(self) -> None:
        super().__init__()
        self._weather_data: WeatherData | None = None

    @property
    def weather_data(self) -> WeatherData | None:
        return self._weather_data

    def set_measurements(
        self, temperature: float, humidity: float, pressure: float
    ) -> None:
        self._weather_data = WeatherData(temperature, humidity, pressure)
        self.notify()


class CurrentConditionsDisplay(Observer):
    def __init__(self, name: str) -> None:
        self.name = name

    def update(self, subject: Subject, *args: Any, **kwargs: Any) -> None:
        if isinstance(subject, WeatherStation) and subject.weather_data:
            data = subject.weather_data
            print(
                f"[{self.name}] Current: {data.temperature}°F, "
                f"{data.humidity}% humidity"
            )


class StatisticsDisplay(Observer):
    def __init__(self) -> None:
        self._temperatures: list[float] = []

    def update(self, subject: Subject, *args: Any, **kwargs: Any) -> None:
        if isinstance(subject, WeatherStation) and subject.weather_data:
            self._temperatures.append(subject.weather_data.temperature)
            avg = sum(self._temperatures) / len(self._temperatures)
            print(
                f"[Statistics] Avg/Max/Min: {avg:.1f}/"
                f"{max(self._temperatures):.1f}/{min(self._temperatures):.1f}"
            )


# Usage
station = WeatherStation()
station.attach(CurrentConditionsDisplay("Living Room"))
station.attach(CurrentConditionsDisplay("Bedroom"))
station.attach(StatisticsDisplay())
station.set_measurements(80, 65, 30.4)
station.set_measurements(82, 70, 29.2)
Each display handles updates independently. The StatisticsDisplay accumulates history while CurrentConditionsDisplay just shows the latest reading. The weather station doesn’t care—it just broadcasts.
Pythonic Approaches and Variations
The abstract class approach works, but Python offers more flexible alternatives.
Using Protocols Instead of Abstract Classes
Protocols provide structural subtyping—if it has the right methods, it’s an observer:
from typing import Protocol, Any


class ObserverProtocol(Protocol):
    def update(self, subject: Any, *args: Any, **kwargs: Any) -> None: ...


class FlexibleSubject:
    def __init__(self) -> None:
        self._observers: list[ObserverProtocol] = []

    def attach(self, observer: ObserverProtocol) -> None:
        self._observers.append(observer)

    def notify(self, *args: Any, **kwargs: Any) -> None:
        for observer in self._observers:
            observer.update(self, *args, **kwargs)
Now any object with an update method works—no inheritance required.
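As a minimal sketch of that claim, the example below attaches a class that inherits from nothing observer-related. The names `SupportsUpdate`, `AuditLog`, and the cut-down `Subject` are hypothetical stand-ins so the block runs on its own, and `runtime_checkable` is an optional addition that is not part of the code above. It lets `isinstance` confirm that an `update` attribute exists, though it does not validate the signature.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsUpdate(Protocol):
    """Hypothetical protocol name; mirrors ObserverProtocol."""

    def update(self, subject: Any, *args: Any, **kwargs: Any) -> None: ...


class AuditLog:
    """Plain class: no base class, just a matching update() method."""

    def __init__(self) -> None:
        self.entries: list[tuple[Any, ...]] = []

    def update(self, subject: Any, *args: Any, **kwargs: Any) -> None:
        self.entries.append(args)


class Subject:
    """Cut-down stand-in for FlexibleSubject so the sketch is self-contained."""

    def __init__(self) -> None:
        self._observers: list[SupportsUpdate] = []

    def attach(self, observer: SupportsUpdate) -> None:
        # isinstance() against a runtime_checkable protocol only checks that
        # an `update` attribute exists, not that its signature matches.
        if not isinstance(observer, SupportsUpdate):
            raise TypeError("observer must define update()")
        self._observers.append(observer)

    def notify(self, *args: Any, **kwargs: Any) -> None:
        for observer in list(self._observers):
            observer.update(self, *args, **kwargs)


log = AuditLog()
subject = Subject()
subject.attach(log)
subject.notify("temperature", 72.5)
```

A static type checker would accept `AuditLog` here without the decorator; the runtime check is only useful when observers come from code you cannot type-check, such as plugins.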
Weak References to Prevent Memory Leaks
A common pitfall: observers that should be garbage collected stick around because the subject holds references to them. Weak references solve this:
import weakref
from typing import Any


class WeakSubject:
    def __init__(self) -> None:
        self._observers: list[weakref.ref[Any]] = []

    def attach(self, observer: Any) -> None:
        self._observers.append(weakref.ref(observer))

    def notify(self, *args: Any, **kwargs: Any) -> None:
        # Clean up dead references and notify live ones
        live_observers = []
        for ref in self._observers:
            observer = ref()
            if observer is not None:
                observer.update(self, *args, **kwargs)
                live_observers.append(ref)
        self._observers = live_observers
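One caveat applies when the thing being registered is a bound method rather than a whole observer object. The sketch below (the `Display` class and its `on_reading` method are hypothetical) shows that a plain `weakref.ref` to a bound method is dead almost immediately, while the standard library's `weakref.WeakMethod` follows the lifetime of the underlying instance. The `gc.collect()` calls are not needed on CPython, where reference counting frees the objects right away; they are included as an assumption-free way to get the same result on other interpreters.

```python
import gc
import weakref


class Display:
    """Hypothetical observer that records what it receives."""

    def __init__(self) -> None:
        self.seen: list[str] = []

    def on_reading(self, value: str) -> None:
        self.seen.append(value)


display = Display()
seen = display.seen  # keep the list so it can be inspected after cleanup

# A bound method object is created fresh on each attribute access, so a plain
# weak reference to it dies as soon as that temporary object is discarded.
plain_ref = weakref.ref(display.on_reading)
gc.collect()
plain_is_dead = plain_ref() is None

# WeakMethod tracks the instance instead and rebuilds the bound method on
# demand for as long as the instance is alive.
method_ref = weakref.WeakMethod(display.on_reading)
callback = method_ref()
if callback is not None:
    callback("72F")

# Once the last strong reference to the instance is gone, the WeakMethod
# returns None too, which is the cue to drop the subscription.
del callback
del display
gc.collect()
method_is_dead = method_ref() is None
```

`WeakSubject` above is unaffected because it stores references to observer objects, but any variant that accepts callables needs this distinction.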
Decorator-Based Subscription
For a more Pythonic feel, decorators can register handlers automatically:
from typing import Callable, Any
from collections import defaultdict


class EventEmitter:
    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[..., Any]]] = defaultdict(list)

    def on(self, event: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            self._handlers[event].append(func)
            return func

        return decorator

    def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
        for handler in self._handlers[event]:
            handler(*args, **kwargs)


# Usage
emitter = EventEmitter()


@emitter.on("user_created")
def send_welcome_email(user: dict[str, Any]) -> None:
    print(f"Sending welcome email to {user['email']}")


@emitter.on("user_created")
def initialize_preferences(user: dict[str, Any]) -> None:
    print(f"Setting up default preferences for {user['name']}")


emitter.emit("user_created", {"name": "Alice", "email": "alice@example.com"})
This pattern is particularly clean for event-driven applications where you want handlers defined close to their logic rather than centralized registration code.
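Because the decorator returns the function unchanged, the decorated name stays usable as an ordinary function, which also makes it a convenient key for unsubscribing. The following is a rough sketch of one way to add removal; `RemovableEmitter` and its `off()` method are hypothetical and not part of the `EventEmitter` shown above.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[..., Any]


class RemovableEmitter:
    """Hypothetical variant of EventEmitter with an off() method."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, event: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            self._handlers[event].append(func)
            return func  # returned unchanged, so the name still works

        return decorator

    def off(self, event: str, func: Handler) -> None:
        """Remove a handler; ignore one that was never registered."""
        handlers = self._handlers.get(event, [])
        if func in handlers:
            handlers.remove(func)

    def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
        # .get() avoids creating empty entries for events nobody listens to;
        # list() snapshots so a handler may call off() while being dispatched.
        for handler in list(self._handlers.get(event, [])):
            handler(*args, **kwargs)


emitter = RemovableEmitter()
calls: list[str] = []


@emitter.on("user_created")
def record_signup(name: str) -> None:
    calls.append(name)


emitter.emit("user_created", "Alice")
emitter.off("user_created", record_signup)
emitter.emit("user_created", "Bob")  # no handler left, nothing recorded
record_signup("Carol")  # the decorated function is still directly callable
```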
Real-World Application: Event Bus System
Let’s build a more sophisticated event bus suitable for a modular application or microservices simulation:
import asyncio
import inspect
import weakref
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Awaitable, Callable


@dataclass
class Event:
    topic: str
    payload: dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    source: str = "unknown"


Handler = Callable[[Event], Awaitable[None]]


class EventBus:
    def __init__(self) -> None:
        # Each entry is a zero-argument callable that returns the handler,
        # or None once a weakly held handler has been garbage collected.
        self._handlers: dict[str, list[Callable[[], Any]]] = defaultdict(list)
        self._middleware: list[Callable[[Event], Event | None]] = []

    def use(self, middleware: Callable[[Event], Event | None]) -> None:
        """Add middleware for event transformation/filtering."""
        self._middleware.append(middleware)

    def subscribe(
        self, topic: str, handler: Handler, weak: bool = True
    ) -> Callable[[], None]:
        """Subscribe to a topic. Returns unsubscribe function."""
        ref: Callable[[], Any]
        if not weak:
            # Strong subscription: the closure keeps the handler alive.
            ref = lambda: handler  # noqa: E731
        elif inspect.ismethod(handler):
            # A plain weakref to a bound method dies immediately; WeakMethod
            # follows the lifetime of the underlying instance instead.
            ref = weakref.WeakMethod(handler)
        else:
            ref = weakref.ref(handler)
        self._handlers[topic].append(ref)

        def unsubscribe() -> None:
            try:
                self._handlers[topic].remove(ref)
            except ValueError:
                pass

        return unsubscribe

    async def publish(self, event: Event) -> None:
        """Publish event to all subscribers of the topic."""
        # Run through middleware
        for mw in self._middleware:
            result = mw(event)
            if result is None:
                return  # Middleware filtered out the event
            event = result

        # Gather live handlers and drop references that have died
        handlers: list[Handler] = []
        live_refs: list[Callable[[], Any]] = []
        for ref in self._handlers[event.topic]:
            handler = ref()
            if handler is not None:
                handlers.append(handler)
                live_refs.append(ref)
        self._handlers[event.topic] = live_refs

        # Execute handlers concurrently. With return_exceptions=True a failing
        # handler does not cancel the others, but its exception is returned
        # rather than raised, and is discarded here.
        if handlers:
            await asyncio.gather(
                *[h(event) for h in handlers],
                return_exceptions=True
            )


# Example usage
async def main() -> None:
    bus = EventBus()

    # Add logging middleware
    def log_middleware(event: Event) -> Event:
        print(f"[{event.timestamp}] {event.topic}: {event.payload}")
        return event

    bus.use(log_middleware)

    # Define handlers
    async def handle_order_created(event: Event) -> None:
        await asyncio.sleep(0.1)  # Simulate async work
        print(f"Processing order: {event.payload['order_id']}")

    async def handle_inventory_update(event: Event) -> None:
        await asyncio.sleep(0.05)
        print(f"Updating inventory for order: {event.payload['order_id']}")

    # Subscribe
    bus.subscribe("order.created", handle_order_created, weak=False)
    bus.subscribe("order.created", handle_inventory_update, weak=False)

    # Publish
    await bus.publish(Event(
        topic="order.created",
        payload={"order_id": "12345", "items": ["widget", "gadget"]},
        source="order-service"
    ))


asyncio.run(main())
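This implementation provides three things: middleware for cross-cutting concerns such as logging, concurrent execution of async handlers, and automatic cleanup of dead references. One caveat: because asyncio.gather is called with return_exceptions=True, a failing handler does not stop the others, but its exception is returned rather than raised, so publish() as written discards it silently.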
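If silent failures are not acceptable, the results of `asyncio.gather` can be inspected after the fact. The sketch below is one possible approach, not the bus's actual behavior: `dispatch`, the two handlers, and the logger name are hypothetical, and events are plain dicts to keep the block self-contained.

```python
import asyncio
import logging

logger = logging.getLogger("event_bus_sketch")  # hypothetical logger name


async def ok_handler(event: dict) -> None:
    await asyncio.sleep(0)


async def failing_handler(event: dict) -> None:
    raise RuntimeError("inventory service unavailable")


async def dispatch(event: dict, handlers: list) -> list[BaseException]:
    """Run handlers concurrently and return the exceptions they raised."""
    results = await asyncio.gather(
        *(h(event) for h in handlers),
        return_exceptions=True,
    )
    errors: list[BaseException] = []
    # gather() preserves argument order, so results line up with handlers.
    for handler, result in zip(handlers, results):
        if isinstance(result, BaseException):
            logger.error("handler %s failed: %r", handler.__name__, result)
            errors.append(result)
    return errors


errors = asyncio.run(
    dispatch({"order_id": "12345"}, [ok_handler, failing_handler])
)
```

Whether to log, re-raise, or publish a dedicated error event is a design decision that depends on how much the publisher should know about subscriber failures.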
Testing and Common Pitfalls
Testing observer-based systems requires isolating components and verifying interactions:
import pytest
from unittest.mock import AsyncMock

# Assumes the pytest-asyncio plugin is installed for @pytest.mark.asyncio.


class TestEventBus:
    @pytest.fixture
    def bus(self) -> EventBus:
        return EventBus()

    @pytest.mark.asyncio
    async def test_handler_receives_event(self, bus: EventBus) -> None:
        handler = AsyncMock()
        bus.subscribe("test.topic", handler, weak=False)
        event = Event(topic="test.topic", payload={"key": "value"})
        await bus.publish(event)
        handler.assert_called_once_with(event)

    @pytest.mark.asyncio
    async def test_middleware_can_filter_events(self, bus: EventBus) -> None:
        handler = AsyncMock()
        bus.subscribe("test.topic", handler, weak=False)
        # Middleware that blocks all events
        bus.use(lambda e: None)
        await bus.publish(Event(topic="test.topic", payload={}))
        handler.assert_not_called()
Common pitfalls to avoid:
- Notification storms: Observer A updates state that triggers Observer B, which triggers Observer A again. Use flags or queuing to break cycles.
- Thread safety: If multiple threads publish or subscribe, protect your observer list with locks or use thread-safe collections.
- Exception handling: One failing observer shouldn’t prevent others from receiving notifications. Catch exceptions in your notify loop.
- Memory leaks: Always consider whether observers should use weak references, especially for long-lived subjects.
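The first three pitfalls can be addressed together in the notify path. The sketch below is one possible arrangement under stated assumptions, not a definitive implementation: `SafeSubject`, the callback-style observers, and the logger name are all hypothetical. It guards the observer list with a lock, isolates each observer's exceptions, and queues notifications raised during dispatch instead of delivering them recursively.

```python
import logging
import threading
from collections import deque
from typing import Any, Callable

logger = logging.getLogger("safe_subject_sketch")  # hypothetical name

Callback = Callable[[Any], None]


class SafeSubject:
    """Hypothetical subject guarding against three of the pitfalls above."""

    def __init__(self) -> None:
        self._observers: list[Callback] = []
        self._lock = threading.Lock()  # protects the three fields below
        self._pending: deque[Any] = deque()
        self._dispatching = False

    def attach(self, callback: Callback) -> None:
        with self._lock:
            if callback not in self._observers:
                self._observers.append(callback)

    def detach(self, callback: Callback) -> None:
        with self._lock:
            if callback in self._observers:
                self._observers.remove(callback)

    def notify(self, message: Any) -> None:
        with self._lock:
            self._pending.append(message)
            if self._dispatching:
                return  # nested or concurrent call: the active loop drains it
            self._dispatching = True
        try:
            while True:
                with self._lock:
                    if not self._pending:
                        self._dispatching = False
                        return
                    current = self._pending.popleft()
                    snapshot = list(self._observers)
                # Call observers outside the lock so they can safely call
                # attach(), detach(), or notify() themselves.
                for callback in snapshot:
                    try:
                        callback(current)
                    except Exception:
                        logger.exception("observer %r failed", callback)
        except BaseException:
            # e.g. KeyboardInterrupt: release the dispatch flag before leaving
            with self._lock:
                self._dispatching = False
            raise


subject = SafeSubject()
received: list[str] = []


def flaky(message: Any) -> None:
    raise ValueError("boom")


def cascading(message: Any) -> None:
    received.append(message)
    if message == "first":
        subject.notify("second")  # queued, not delivered recursively


subject.attach(flaky)
subject.attach(cascading)
subject.notify("first")
```

Queuing turns recursion into iteration, so a cascade cannot overflow the stack, but a genuine cycle that never stops producing notifications still needs its own termination condition.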
Conclusion and Alternatives
The Observer pattern provides a clean mechanism for loose coupling in event-driven systems. Python’s flexibility lets you implement it in ways that feel natural—from traditional class hierarchies to decorator-based approaches.
For production use, consider established libraries:
- blinker: Fast, simple signals library used by Flask
- PyPubSub: Full-featured pub/sub with topic hierarchies
- RxPY: Reactive extensions for complex event stream processing
When the Observer pattern feels too simple, look at Mediator (centralized coordination) or Event Sourcing (persistent event logs). But for most notification needs, a well-implemented observer system strikes the right balance between simplicity and power.