Design a Notification Service: Push, Email, SMS

A notification service is the backbone of user communication in modern applications. It's responsible for delivering the right message, through the right channel, at the right time. Get it wrong, and...

Key Insights

  • A notification service must be asynchronous by design—synchronous delivery blocks your application and creates cascading failures when providers experience latency or outages.
  • The adapter pattern is non-negotiable for channel providers; you will switch vendors, and your core business logic shouldn’t know or care about the difference between Twilio and SNS.
  • Idempotency isn’t optional—without it, a retry after a network timeout means your user gets the same “Your order shipped!” email three times.

Introduction & Requirements

A notification service is the backbone of user communication in modern applications. It’s responsible for delivering the right message, through the right channel, at the right time. Get it wrong, and you annoy users with duplicates, miss critical alerts, or burn money on failed deliveries.

Functional requirements:

  • Multi-channel delivery (push, email, SMS)
  • Template management with variable substitution
  • Scheduling for delayed or recurring notifications
  • User preference management (opt-outs, channel preferences)
  • Delivery tracking and status updates

Non-functional requirements:

  • Scalability to handle millions of notifications per day
  • 99.9% delivery reliability (for messages that reach providers)
  • Sub-second latency for time-sensitive notifications
  • At-least-once delivery with idempotency guarantees
  • Provider failover without manual intervention

High-Level Architecture

The architecture follows a standard event-driven pattern with clear separation of concerns. Here’s how the components interact:

┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Client    │────▶│   API Gateway    │────▶│  Notification   │
│  Services   │     │                  │     │    Service      │
└─────────────┘     └──────────────────┘     └────────┬────────┘
                                             ┌────────────────┐
                                             │  Message Queue │
                                             │  (per channel) │
                                             └───────┬────────┘
                           ┌────────────────────────┼────────────────────────┐
                           ▼                        ▼                        ▼
                    ┌─────────────┐          ┌─────────────┐          ┌─────────────┐
                    │ Push Worker │          │Email Worker │          │ SMS Worker  │
                    └──────┬──────┘          └──────┬──────┘          └──────┬──────┘
                           ▼                        ▼                        ▼
                    ┌─────────────┐          ┌─────────────┐          ┌─────────────┐
                    │  FCM/APNs   │          │SendGrid/SES │          │ Twilio/SNS  │
                    └─────────────┘          └─────────────┘          └─────────────┘

The Notification Service validates requests, checks user preferences, renders templates, and enqueues messages. Workers consume from channel-specific queues and handle provider communication.

// Core service interface
type NotificationService interface {
    Send(ctx context.Context, req NotificationRequest) (string, error)
    GetStatus(ctx context.Context, notificationID string) (DeliveryStatus, error)
    Schedule(ctx context.Context, req NotificationRequest, sendAt time.Time) (string, error)
}

type NotificationRequest struct {
    UserID       string                 `json:"user_id"`
    Channels     []Channel              `json:"channels"` // PUSH, EMAIL, SMS
    TemplateID   string                 `json:"template_id"`
    Variables    map[string]interface{} `json:"variables"`
    Priority     Priority               `json:"priority"` // HIGH, NORMAL, LOW
    IdempotencyKey string               `json:"idempotency_key"`
}

type Channel string
const (
    ChannelPush  Channel = "PUSH"
    ChannelEmail Channel = "EMAIL"
    ChannelSMS   Channel = "SMS"
)

Message Queue & Async Processing

Synchronous notification delivery is a recipe for disaster. When Twilio has a 30-second timeout, your API shouldn’t hang. When you need to send 100,000 promotional emails, you shouldn’t block your web servers.

I recommend queue-per-channel over a single queue with routing. Here’s why:

  1. Independent scaling: SMS might need 10 workers during a flash sale, while push needs 2.
  2. Isolation: A slow email provider doesn’t block time-sensitive push notifications.
  3. Priority handling: Each channel queue can have priority lanes.
import json
from dataclasses import dataclass
from typing import Optional
import pika
from tenacity import retry, stop_after_attempt, wait_exponential

@dataclass
class NotificationMessage:
    notification_id: str
    user_id: str
    channel: str
    template_id: str
    variables: dict
    attempt: int = 0
    max_attempts: int = 3

class NotificationProducer:
    def __init__(self, connection: pika.BlockingConnection):
        self.channel = connection.channel()
        # Declare queues for each notification channel
        for queue in ['notifications.push', 'notifications.email', 'notifications.sms']:
            self.channel.queue_declare(queue=queue, durable=True)
            # Dead letter queue for failed messages
            self.channel.queue_declare(queue=f'{queue}.dlq', durable=True)
    
    def enqueue(self, message: NotificationMessage):
        queue_name = f'notifications.{message.channel.lower()}'
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message.__dict__),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
                headers={'x-attempt': message.attempt}
            )
        )

class NotificationConsumer:
    def __init__(self, channel: str, sender: 'NotificationSender'):
        self.queue_name = f'notifications.{channel}'
        self.sender = sender
    
    def process_message(self, ch, method, properties, body):
        message = NotificationMessage(**json.loads(body))
        
        try:
            self.sender.send(message)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except ProviderTemporaryError as e:
            # Retry with exponential backoff
            if message.attempt < message.max_attempts:
                message.attempt += 1
                self.requeue_with_delay(message, delay_seconds=2 ** message.attempt)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            else:
                # Move to dead letter queue
                self.move_to_dlq(message, str(e))
                ch.basic_ack(delivery_tag=method.delivery_tag)
        except ProviderPermanentError as e:
            # Don't retry - invalid phone number, unsubscribed, etc.
            self.move_to_dlq(message, str(e))
            ch.basic_ack(delivery_tag=method.delivery_tag)

For backpressure, implement consumer prefetch limits. If workers can’t keep up, the queue grows, and you can alert on queue depth. Don’t drop messages—let the queue buffer.

Channel Adapters & Provider Abstraction

Every notification channel needs a provider, and every provider will eventually fail you. Abstract them behind a common interface so you can swap providers, implement failover, and A/B test delivery rates.

from abc import ABC, abstractmethod
from typing import Optional, List
from dataclasses import dataclass

@dataclass
class SendResult:
    success: bool
    provider_message_id: Optional[str]
    error_code: Optional[str]
    error_message: Optional[str]

class NotificationSender(ABC):
    @abstractmethod
    def send(self, recipient: str, content: str, **kwargs) -> SendResult:
        pass
    
    @abstractmethod
    def get_channel(self) -> str:
        pass

# Push implementations
class FCMSender(NotificationSender):
    def __init__(self, credentials_path: str):
        self.client = initialize_firebase(credentials_path)
    
    def send(self, recipient: str, content: str, **kwargs) -> SendResult:
        try:
            response = self.client.send(
                messaging.Message(
                    token=recipient,
                    notification=messaging.Notification(
                        title=kwargs.get('title', ''),
                        body=content
                    ),
                    data=kwargs.get('data', {})
                )
            )
            return SendResult(success=True, provider_message_id=response)
        except messaging.UnregisteredError:
            return SendResult(success=False, error_code='INVALID_TOKEN', 
                            error_message='Device token no longer valid')
    
    def get_channel(self) -> str:
        return 'PUSH'

class TwilioSMSSender(NotificationSender):
    def __init__(self, account_sid: str, auth_token: str, from_number: str):
        self.client = Client(account_sid, auth_token)
        self.from_number = from_number
    
    def send(self, recipient: str, content: str, **kwargs) -> SendResult:
        try:
            message = self.client.messages.create(
                body=content,
                from_=self.from_number,
                to=recipient
            )
            return SendResult(success=True, provider_message_id=message.sid)
        except TwilioRestException as e:
            return SendResult(success=False, error_code=str(e.code),
                            error_message=e.msg)
    
    def get_channel(self) -> str:
        return 'SMS'

# Provider manager with failover
class ProviderManager:
    def __init__(self):
        self.providers: dict[str, List[NotificationSender]] = {}
    
    def register(self, channel: str, sender: NotificationSender, priority: int = 0):
        if channel not in self.providers:
            self.providers[channel] = []
        self.providers[channel].insert(priority, sender)
    
    def send(self, channel: str, recipient: str, content: str, **kwargs) -> SendResult:
        for provider in self.providers.get(channel, []):
            result = provider.send(recipient, content, **kwargs)
            if result.success or result.error_code in ['INVALID_TOKEN', 'UNSUBSCRIBED']:
                return result
            # Try next provider on temporary failures
        return SendResult(success=False, error_code='ALL_PROVIDERS_FAILED')

User Preferences & Template Management

Users have strong opinions about how you contact them. Respect those preferences or face unsubscribes and spam complaints.

from jinja2 import Environment, BaseLoader
from typing import Optional
import json

class UserPreferenceService:
    def __init__(self, redis_client, db_connection):
        self.redis = redis_client
        self.db = db_connection
    
    def get_preferences(self, user_id: str) -> dict:
        # Check cache first
        cached = self.redis.get(f'prefs:{user_id}')
        if cached:
            return json.loads(cached)
        
        # Fall back to database
        prefs = self.db.query(
            "SELECT channel, enabled, contact_value FROM user_notification_preferences WHERE user_id = %s",
            (user_id,)
        )
        result = {row['channel']: {'enabled': row['enabled'], 'contact': row['contact_value']} 
                  for row in prefs}
        
        self.redis.setex(f'prefs:{user_id}', 3600, json.dumps(result))
        return result
    
    def can_send(self, user_id: str, channel: str) -> tuple[bool, Optional[str]]:
        prefs = self.get_preferences(user_id)
        channel_pref = prefs.get(channel, {})
        if not channel_pref.get('enabled', False):
            return False, None
        return True, channel_pref.get('contact')

class TemplateService:
    def __init__(self, db_connection):
        self.db = db_connection
        self.env = Environment(loader=BaseLoader())
        self.cache = {}
    
    def render(self, template_id: str, channel: str, variables: dict, locale: str = 'en') -> str:
        cache_key = f'{template_id}:{channel}:{locale}'
        
        if cache_key not in self.cache:
            template_row = self.db.query(
                """SELECT content FROM notification_templates 
                   WHERE template_id = %s AND channel = %s AND locale = %s""",
                (template_id, channel, locale)
            )
            if not template_row:
                # Fall back to default locale
                template_row = self.db.query(
                    """SELECT content FROM notification_templates 
                       WHERE template_id = %s AND channel = %s AND locale = 'en'""",
                    (template_id, channel)
                )
            self.cache[cache_key] = self.env.from_string(template_row[0]['content'])
        
        return self.cache[cache_key].render(**variables)

Reliability & Observability

Idempotency prevents the nightmare scenario where a network timeout causes a retry, and your user gets charged twice or receives duplicate notifications.

import hashlib
from functools import wraps
from datetime import datetime, timedelta

class IdempotencyMiddleware:
    def __init__(self, redis_client, ttl_hours: int = 24):
        self.redis = redis_client
        self.ttl = timedelta(hours=ttl_hours)
    
    def check_and_set(self, idempotency_key: str) -> tuple[bool, Optional[str]]:
        """Returns (is_duplicate, existing_notification_id)"""
        key = f'idempotency:{idempotency_key}'
        
        # Try to set with NX (only if not exists)
        existing = self.redis.get(key)
        if existing:
            return True, existing.decode()
        
        return False, None
    
    def mark_processed(self, idempotency_key: str, notification_id: str):
        key = f'idempotency:{idempotency_key}'
        self.redis.setex(key, int(self.ttl.total_seconds()), notification_id)

def idempotent(get_key):
    """Decorator for idempotent notification sending"""
    def decorator(func):
        @wraps(func)
        def wrapper(self, request: NotificationRequest, *args, **kwargs):
            key = get_key(request)
            is_dup, existing_id = self.idempotency.check_and_set(key)
            
            if is_dup:
                return existing_id  # Return existing notification ID
            
            result = func(self, request, *args, **kwargs)
            self.idempotency.mark_processed(key, result)
            return result
        return wrapper
    return decorator

For observability, track these metrics: queue depth per channel, delivery success rate per provider, end-to-end latency (request to delivery confirmation), and retry rate. Alert on queue depth exceeding 10x normal and success rate dropping below 95%.

Scaling Considerations & Trade-offs

Worker scaling: Scale workers independently per channel. SMS typically needs fewer workers (rate-limited by providers) than push (high throughput, low latency). Use horizontal pod autoscaling based on queue depth.

Database choices: Use PostgreSQL for notification logs if you need complex queries and joins (user notification history). Use Cassandra or DynamoDB if you’re optimizing for write throughput and can query by partition key (notification_id or user_id + time range).

Cost optimization: Batch email sends where possible (SendGrid charges per API call). Use SNS for SMS in AWS-heavy environments (cheaper than Twilio for high volume). Cache templates and user preferences aggressively.

Key trade-offs:

  • Queue-per-channel adds operational complexity but provides better isolation
  • Provider abstraction adds code but saves you during outages
  • Idempotency storage costs money but prevents user-facing bugs

Build for the failure modes you’ll inevitably face: provider outages, traffic spikes, and the intern who accidentally triggers a million notifications. Your future self will thank you.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.