Pub/Sub: Publish-Subscribe Architecture

The publish-subscribe pattern fundamentally changes how services communicate. Instead of Service A calling Service B directly (request-response), Service A publishes a message to a topic, and any...

Key Insights

  • Pub/Sub decouples message producers from consumers through an intermediary broker, enabling systems to scale independently and evolve without tight coordination between services.
  • Choosing the right delivery semantics (at-least-once vs. exactly-once) and handling idempotency correctly are the decisions that will make or break your pub/sub implementation in production.
  • Pub/Sub excels at fan-out scenarios and event-driven architectures but introduces complexity around message ordering, debugging, and eventual consistency that you must plan for upfront.

Introduction to Pub/Sub

The publish-subscribe pattern fundamentally changes how services communicate. Instead of Service A calling Service B directly (request-response), Service A publishes a message to a topic, and any interested services subscribe to receive it. The publisher doesn’t know or care who’s listening.

This indirection might seem like unnecessary complexity, but it solves real problems. When your order service needs to notify inventory, shipping, analytics, and email services about a new order, direct calls create a tangled web of dependencies. With pub/sub, the order service publishes once, and each downstream service subscribes independently.

The broker sitting in the middle handles routing, buffering, and delivery. Publishers and subscribers never communicate directly—they only interact with the broker. This architectural boundary enables teams to deploy, scale, and modify their services without coordinating with every other team that touches their data.

Core Components & Terminology

Understanding pub/sub requires clarity on five core concepts:

Publishers produce messages. They don’t know who will receive them or when. A publisher’s only job is to send well-formed messages to the correct topic.

Subscribers consume messages. They register interest in specific topics and process incoming messages. A subscriber might be a single service instance or a consumer group sharing the load.

Topics (or channels) are named destinations for messages. They’re the addressing mechanism—publishers send to topics, subscribers listen to topics.

Messages are the payload. They typically contain structured data (JSON, Protocol Buffers, Avro) plus metadata like timestamps, correlation IDs, and message types.

The broker is the infrastructure that connects everything. It receives messages from publishers, stores them (temporarily or durably), and delivers them to subscribers.

Here’s how these concepts translate to code:

interface Message<T = unknown> {
  id: string;
  topic: string;
  payload: T;
  timestamp: Date;
  metadata: Record<string, string>;
}

interface Topic {
  name: string;
  subscribers: Set<Subscriber>;
}

interface Publisher {
  publish<T>(topic: string, payload: T): Promise<void>;
}

interface Subscriber {
  id: string;
  onMessage<T>(message: Message<T>): Promise<void>;
}

// Example message
const orderCreatedMessage: Message<OrderCreatedEvent> = {
  id: "msg-abc123",
  topic: "orders.created",
  payload: {
    orderId: "order-456",
    customerId: "cust-789",
    total: 99.99,
  },
  timestamp: new Date(),
  metadata: {
    correlationId: "req-xyz",
    source: "order-service",
  },
};

How Pub/Sub Works

The message lifecycle follows a predictable flow:

  1. Publishing: A service creates a message and sends it to the broker, targeting a specific topic.
  2. Routing: The broker determines which subscribers should receive the message based on their subscriptions.
  3. Delivery: The broker sends the message to each matched subscriber.
  4. Acknowledgment: Subscribers confirm successful processing, allowing the broker to mark the message as delivered.

Delivery models vary. Push-based systems send messages to subscribers as they arrive—the broker initiates the transfer. Pull-based systems let subscribers request messages when they’re ready—the subscriber initiates. Kafka uses pull; most cloud pub/sub services use push with webhooks or long-polling.

Here’s a minimal in-memory pub/sub implementation:

from dataclasses import dataclass, field
from typing import Callable, Dict, List, Any
from datetime import datetime
import uuid

@dataclass
class Message:
    topic: str
    payload: Any
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)

MessageHandler = Callable[[Message], None]

class PubSubBroker:
    def __init__(self):
        self._subscriptions: Dict[str, List[MessageHandler]] = {}
    
    def subscribe(self, topic: str, handler: MessageHandler) -> None:
        if topic not in self._subscriptions:
            self._subscriptions[topic] = []
        self._subscriptions[topic].append(handler)
        print(f"Subscribed to '{topic}'")
    
    def publish(self, topic: str, payload: Any) -> None:
        message = Message(topic=topic, payload=payload)
        handlers = self._subscriptions.get(topic, [])
        
        print(f"Publishing to '{topic}': {len(handlers)} subscribers")
        for handler in handlers:
            try:
                handler(message)
            except Exception as e:
                print(f"Handler failed: {e}")
    
    def unsubscribe(self, topic: str, handler: MessageHandler) -> None:
        if topic in self._subscriptions:
            self._subscriptions[topic].remove(handler)

# Usage
broker = PubSubBroker()

def inventory_handler(msg: Message):
    print(f"Inventory updating for order: {msg.payload['order_id']}")

def email_handler(msg: Message):
    print(f"Sending confirmation email for: {msg.payload['order_id']}")

broker.subscribe("orders.created", inventory_handler)
broker.subscribe("orders.created", email_handler)

broker.publish("orders.created", {"order_id": "12345", "total": 99.99})

Common Patterns & Variations

Fan-out is the classic pub/sub pattern: one message goes to many subscribers. When an order is created, inventory, shipping, billing, and analytics all receive the same event independently.

Fan-in aggregates messages from multiple publishers into a single processing pipeline. Multiple microservices publish audit logs to a central topic; a single logging service consumes them all.

Topic hierarchies add flexibility through wildcard subscriptions. Instead of subscribing to each order event type separately, you subscribe to orders.* and receive all order-related events:

import re
from typing import Dict, List, Set

class HierarchicalPubSub:
    def __init__(self):
        self._exact_subscriptions: Dict[str, List[MessageHandler]] = {}
        self._wildcard_subscriptions: Dict[str, List[MessageHandler]] = {}
    
    def subscribe(self, pattern: str, handler: MessageHandler) -> None:
        if '*' in pattern or '#' in pattern:
            self._wildcard_subscriptions.setdefault(pattern, []).append(handler)
        else:
            self._exact_subscriptions.setdefault(pattern, []).append(handler)
    
    def _matches_pattern(self, topic: str, pattern: str) -> bool:
        # Convert pattern to regex
        # '*' matches one segment, '#' matches zero or more segments
        regex_pattern = pattern.replace('.', r'\.')
        regex_pattern = regex_pattern.replace('*', r'[^.]+')
        regex_pattern = regex_pattern.replace('#', r'.*')
        return bool(re.fullmatch(regex_pattern, topic))
    
    def _get_handlers(self, topic: str) -> List[MessageHandler]:
        handlers = list(self._exact_subscriptions.get(topic, []))
        
        for pattern, pattern_handlers in self._wildcard_subscriptions.items():
            if self._matches_pattern(topic, pattern):
                handlers.extend(pattern_handlers)
        
        return handlers
    
    def publish(self, topic: str, payload: Any) -> None:
        message = Message(topic=topic, payload=payload)
        for handler in self._get_handlers(topic):
            handler(message)

# Usage
broker = HierarchicalPubSub()

def order_audit(msg: Message):
    print(f"Audit log: {msg.topic} - {msg.payload}")

broker.subscribe("orders.*", order_audit)  # Matches orders.created, orders.cancelled
broker.publish("orders.created", {"order_id": "123"})
broker.publish("orders.cancelled", {"order_id": "456"})

Delivery semantics matter enormously:

  • At-most-once: Fire and forget. Fast but messages can be lost.
  • At-least-once: Retry until acknowledged. Messages may be delivered multiple times.
  • Exactly-once: Guaranteed single delivery. Expensive and complex to implement correctly.

Most systems use at-least-once delivery, which means your subscribers must handle duplicates gracefully.

Real-World Implementations

Redis Pub/Sub is simple and fast but ephemeral—messages are lost if no subscriber is listening. Good for real-time notifications where missing a message is acceptable.

Apache Kafka is a distributed log, not traditional pub/sub. Messages persist and can be replayed. Consumer groups enable parallel processing. Excellent for high-throughput event streaming but operationally complex.

RabbitMQ offers flexible routing with exchanges and queues. Supports multiple messaging patterns beyond pub/sub. Good middle ground for many use cases.

AWS SNS/SQS combines pub/sub (SNS) with durable queuing (SQS). SNS fans out to multiple SQS queues, giving each subscriber independent, durable message storage.

Google Cloud Pub/Sub provides fully managed pub/sub with at-least-once delivery, automatic scaling, and global availability.

Here’s a practical example using Redis:

import redis
import json
import threading

class RedisPubSub:
    def __init__(self, host='localhost', port=6379):
        self._redis = redis.Redis(host=host, port=port, decode_responses=True)
        self._pubsub = self._redis.pubsub()
    
    def publish(self, topic: str, payload: dict) -> None:
        message = json.dumps(payload)
        self._redis.publish(topic, message)
    
    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        def message_handler(message):
            if message['type'] == 'message':
                payload = json.loads(message['data'])
                handler(payload)
        
        self._pubsub.subscribe(**{topic: message_handler})
        
        # Run listener in background thread
        thread = threading.Thread(target=self._pubsub.run_in_thread, daemon=True)
        thread.start()

# Usage
pubsub = RedisPubSub()

def handle_order(payload):
    print(f"Processing order: {payload}")

pubsub.subscribe("orders", handle_order)
pubsub.publish("orders", {"order_id": "789", "status": "created"})

When to Use (and When Not To)

Pub/sub shines in these scenarios:

  • Event-driven architectures: Services react to events rather than being directly invoked.
  • Microservices decoupling: Teams deploy independently without coordinating API changes.
  • Real-time notifications: Push updates to many clients simultaneously.
  • Audit logging: Capture events for compliance without slowing down primary operations.

Avoid pub/sub when:

  • Strict ordering is required: Most pub/sub systems don’t guarantee order across partitions.
  • You need synchronous responses: Pub/sub is inherently asynchronous.
  • Simple point-to-point communication: Direct calls are simpler when you have one producer and one consumer.
  • Transactions span multiple services: Eventual consistency is baked into the model.

Production Considerations

Dead letter queues catch messages that fail processing repeatedly. Without them, poison messages can block your entire pipeline. Configure a DLQ for every subscription and monitor it.

Idempotency is non-negotiable with at-least-once delivery. Use message IDs to deduplicate:

class IdempotentHandler:
    def __init__(self, redis_client: redis.Redis, ttl_seconds: int = 3600):
        self._redis = redis_client
        self._ttl = ttl_seconds
    
    def handle_once(self, message_id: str, handler: Callable[[], None]) -> bool:
        """Execute handler only if message_id hasn't been processed."""
        key = f"processed:{message_id}"
        
        # SET NX returns True only if key didn't exist
        if self._redis.set(key, "1", nx=True, ex=self._ttl):
            try:
                handler()
                return True
            except Exception:
                # Delete key so message can be retried
                self._redis.delete(key)
                raise
        
        print(f"Skipping duplicate message: {message_id}")
        return False

# Usage
idempotent = IdempotentHandler(redis.Redis())

def process_order(msg: Message):
    def do_work():
        print(f"Processing {msg.payload['order_id']}")
    
    idempotent.handle_once(msg.id, do_work)

Monitoring must cover message rates, consumer lag, error rates, and DLQ depth. Alert on growing lag—it means consumers can’t keep up.

Schema evolution becomes critical as your system grows. Use a schema registry (Confluent Schema Registry, AWS Glue) to manage message format changes without breaking consumers.

Pub/sub is powerful infrastructure, but it shifts complexity rather than eliminating it. Plan for failure modes, build in observability from day one, and choose delivery semantics deliberately. The decoupling benefits are real—but only if you handle the distributed systems challenges that come with them.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.