Design a Notification Service: Push, Email, SMS
Key Insights
- A notification service must be asynchronous by design—synchronous delivery blocks your application and creates cascading failures when providers experience latency or outages.
- The adapter pattern is non-negotiable for channel providers; you will switch vendors, and your core business logic shouldn’t know or care about the difference between Twilio and SNS.
- Idempotency isn’t optional—without it, a retry after a network timeout means your user gets the same “Your order shipped!” email three times.
Introduction & Requirements
A notification service is the backbone of user communication in modern applications. It’s responsible for delivering the right message, through the right channel, at the right time. Get it wrong, and you annoy users with duplicates, miss critical alerts, or burn money on failed deliveries.
Functional requirements:
- Multi-channel delivery (push, email, SMS)
- Template management with variable substitution
- Scheduling for delayed or recurring notifications
- User preference management (opt-outs, channel preferences)
- Delivery tracking and status updates
Non-functional requirements:
- Scalability to handle millions of notifications per day
- 99.9% delivery reliability (for messages that reach providers)
- Sub-second latency for time-sensitive notifications
- At-least-once delivery with idempotency guarantees
- Provider failover without manual intervention
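To make "millions of notifications per day" concrete, here is a quick back-of-envelope conversion to per-second throughput, which is the number that actually sizes workers and queues. The 10 million/day volume and the 10x peak-to-average burst factor are illustrative assumptions, not figures from a real system.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def required_throughput(daily_volume: int, peak_factor: float) -> tuple[float, float]:
    """Return (average, peak) notifications per second.

    peak_factor is an assumed multiplier for bursts such as
    flash sales or marketing blasts.
    """
    average = daily_volume / SECONDS_PER_DAY
    return average, average * peak_factor


# Illustrative numbers: 10M notifications/day, bursts at 10x the average
avg, peak = required_throughput(10_000_000, peak_factor=10)
print(f"average ~ {avg:.0f}/s, peak ~ {peak:.0f}/s")  # average ~ 116/s, peak ~ 1157/s
```

The average load is modest. The design pressure comes from bursts and from provider rate limits, which is why the architecture below buffers work in queues instead of calling providers inline.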
High-Level Architecture
The architecture follows a standard event-driven pattern with clear separation of concerns. Here’s how the components interact:
```
┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Client    │────▶│   API Gateway    │────▶│  Notification   │
│  Services   │     │                  │     │    Service      │
└─────────────┘     └──────────────────┘     └────────┬────────┘
                                                      │
                                                      ▼
                                              ┌────────────────┐
                                              │ Message Queue  │
                                              │ (per channel)  │
                                              └───────┬────────┘
                             ┌────────────────────────┼────────────────────────┐
                             ▼                        ▼                        ▼
                      ┌─────────────┐          ┌─────────────┐          ┌─────────────┐
                      │ Push Worker │          │Email Worker │          │ SMS Worker  │
                      └──────┬──────┘          └──────┬──────┘          └──────┬──────┘
                             ▼                        ▼                        ▼
                      ┌─────────────┐          ┌─────────────┐          ┌─────────────┐
                      │  FCM/APNs   │          │SendGrid/SES │          │ Twilio/SNS  │
                      └─────────────┘          └─────────────┘          └─────────────┘
```
The Notification Service validates requests, checks user preferences, renders templates, and enqueues messages. Workers consume from channel-specific queues and handle provider communication.
```go
package notification

import (
	"context"
	"time"
)

// Core service interface
type NotificationService interface {
	Send(ctx context.Context, req NotificationRequest) (string, error)
	GetStatus(ctx context.Context, notificationID string) (DeliveryStatus, error)
	Schedule(ctx context.Context, req NotificationRequest, sendAt time.Time) (string, error)
}

type NotificationRequest struct {
	UserID         string                 `json:"user_id"`
	Channels       []Channel              `json:"channels"` // PUSH, EMAIL, SMS
	TemplateID     string                 `json:"template_id"`
	Variables      map[string]interface{} `json:"variables"`
	Priority       Priority               `json:"priority"` // HIGH, NORMAL, LOW
	IdempotencyKey string                 `json:"idempotency_key"`
}

type Channel string
const (
	ChannelPush  Channel = "PUSH"
	ChannelEmail Channel = "EMAIL"
	ChannelSMS   Channel = "SMS"
)

type Priority string
const (
	PriorityHigh   Priority = "HIGH"
	PriorityNormal Priority = "NORMAL"
	PriorityLow    Priority = "LOW"
)

// DeliveryStatus values (e.g. QUEUED, SENT, DELIVERED, FAILED) are illustrative.
type DeliveryStatus string
```
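The request path described above (validate, check preferences, render, enqueue per channel) can be sketched in Python with in-memory stand-ins. Everything here is hypothetical: `SendPipeline`, the dictionaries standing in for the preference and template stores, and `queue.Queue` standing in for a real broker.

```python
import queue
import uuid
from dataclasses import dataclass, field


@dataclass
class SendPipeline:
    """Hypothetical in-memory sketch of the Notification Service request path."""
    preferences: dict  # user_id -> set of enabled channels (stand-in for the preference store)
    templates: dict    # (template_id, channel) -> str.format template (stand-in for the template store)
    # One queue per channel, mirroring the queue-per-channel design
    queues: dict = field(default_factory=lambda: {c: queue.Queue() for c in ("PUSH", "EMAIL", "SMS")})

    def send(self, user_id: str, channels: list, template_id: str, variables: dict) -> str:
        # 1. Validate the request before doing any work
        if not channels:
            raise ValueError("at least one channel is required")
        unknown = set(channels) - set(self.queues)
        if unknown:
            raise ValueError(f"unknown channels: {sorted(unknown)}")
        notification_id = str(uuid.uuid4())
        enabled = self.preferences.get(user_id, set())
        for channel in channels:
            # 2. Respect user preferences: skip channels the user has not enabled
            if channel not in enabled:
                continue
            # 3. Render the channel-specific template
            body = self.templates[(template_id, channel)].format(**variables)
            # 4. Enqueue for the channel's workers; delivery happens asynchronously
            self.queues[channel].put({"notification_id": notification_id,
                                      "user_id": user_id, "body": body})
        return notification_id


pipeline = SendPipeline(
    preferences={"u1": {"PUSH", "EMAIL"}},
    templates={("order_shipped", "PUSH"): "Order {order_id} shipped",
               ("order_shipped", "SMS"): "Order {order_id} shipped"},
)
pipeline.send("u1", ["PUSH", "SMS"], "order_shipped", {"order_id": "A42"})
print(pipeline.queues["PUSH"].get_nowait()["body"])  # Order A42 shipped
print(pipeline.queues["SMS"].qsize())                # 0 (SMS is not enabled for u1)
```

The method returns as soon as messages are enqueued; everything slow (provider calls, retries) happens in the workers.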
Message Queue & Async Processing
Synchronous notification delivery is a recipe for disaster. If a Twilio call takes 30 seconds to time out, your API shouldn't hang for 30 seconds along with it. When you need to send 100,000 promotional emails, you shouldn't tie up your web servers doing it.
I recommend queue-per-channel over a single queue with routing. Here’s why:
- Independent scaling: SMS might need 10 workers during a flash sale, while push needs 2.
- Isolation: A slow email provider doesn’t block time-sensitive push notifications.
- Priority handling: Each channel queue can have priority lanes.
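Priority lanes within one channel can be approximated with a heap keyed on (priority, arrival order), so HIGH drains before NORMAL and LOW while order within a lane stays FIFO. This is an in-process sketch with a hypothetical `PriorityLanes` class; with RabbitMQ you would more likely use a separate queue per lane or the broker's own priority-queue support.

```python
import heapq
import itertools

# Lower number = served first
PRIORITY_ORDER = {"HIGH": 0, "NORMAL": 1, "LOW": 2}


class PriorityLanes:
    """Hypothetical single-channel queue with HIGH/NORMAL/LOW lanes."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker keeps FIFO order within a lane

    def put(self, message: dict, priority: str = "NORMAL") -> None:
        heapq.heappush(self._heap, (PRIORITY_ORDER[priority], next(self._seq), message))

    def get(self) -> dict:
        # Raises IndexError when empty
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


lanes = PriorityLanes()
lanes.put({"id": "promo-1"}, "LOW")
lanes.put({"id": "otp-1"}, "HIGH")
lanes.put({"id": "receipt-1"})
lanes.put({"id": "otp-2"}, "HIGH")
print([lanes.get()["id"] for _ in range(len(lanes))])
# ['otp-1', 'otp-2', 'receipt-1', 'promo-1']
```

Strict priority has a cost: under sustained HIGH traffic, LOW messages can starve, so promotional lanes may need a minimum share of worker time.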
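```python
import json
from dataclasses import asdict, dataclass
from typing import Protocol

import pika

CHANNELS = ('push', 'email', 'sms')


class ProviderTemporaryError(Exception):
    """Retryable failure, e.g. timeout, 5xx, or rate limit."""


class ProviderPermanentError(Exception):
    """Non-retryable failure, e.g. invalid phone number or unsubscribed user."""


@dataclass
class NotificationMessage:
    notification_id: str
    user_id: str
    channel: str
    template_id: str
    variables: dict
    attempt: int = 0
    max_attempts: int = 3


class MessageSender(Protocol):
    # Worker-side sender: raises ProviderTemporaryError / ProviderPermanentError on failure
    def send(self, message: NotificationMessage) -> None: ...


def publish(ch, routing_key: str, message: NotificationMessage,
            expiration=None, extra_headers=None):
    ch.basic_publish(
        exchange='',
        routing_key=routing_key,
        body=json.dumps(asdict(message)),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
            expiration=expiration,  # Per-message TTL in milliseconds (string), or None
            headers={'x-attempt': message.attempt, **(extra_headers or {})},
        ),
    )


class NotificationProducer:
    def __init__(self, connection: pika.BlockingConnection):
        self.channel = connection.channel()
        for name in CHANNELS:
            queue = f'notifications.{name}'
            self.channel.queue_declare(queue=queue, durable=True)
            # Dead letter queue for failed messages
            self.channel.queue_declare(queue=f'{queue}.dlq', durable=True)
            # Delay queue for retries: when a message's TTL expires, RabbitMQ
            # dead-letters it back onto the main queue. Expiry only happens at the
            # head of the queue, so one delay queue per backoff step avoids long
            # delays holding up short ones.
            self.channel.queue_declare(
                queue=f'{queue}.delay', durable=True,
                arguments={'x-dead-letter-exchange': '',
                           'x-dead-letter-routing-key': queue},
            )

    def enqueue(self, message: NotificationMessage):
        publish(self.channel, f'notifications.{message.channel.lower()}', message)


class NotificationConsumer:
    def __init__(self, channel: str, sender: MessageSender, prefetch: int = 10):
        self.queue_name = f'notifications.{channel}'
        self.sender = sender
        self.prefetch = prefetch

    def run(self, connection: pika.BlockingConnection):
        ch = connection.channel()
        # Backpressure: the broker delivers at most `prefetch` unacked messages
        ch.basic_qos(prefetch_count=self.prefetch)
        ch.basic_consume(queue=self.queue_name, on_message_callback=self.process_message)
        ch.start_consuming()

    def process_message(self, ch, method, properties, body):
        message = NotificationMessage(**json.loads(body))
        try:
            self.sender.send(message)
        except ProviderTemporaryError as e:
            if message.attempt < message.max_attempts:
                # Retry with exponential backoff: 2s, 4s, 8s
                message.attempt += 1
                delay_ms = (2 ** message.attempt) * 1000
                publish(ch, f'{self.queue_name}.delay', message, expiration=str(delay_ms))
            else:
                # Retries exhausted: move to dead letter queue
                publish(ch, f'{self.queue_name}.dlq', message, extra_headers={'x-error': str(e)})
        except ProviderPermanentError as e:
            # Don't retry - invalid phone number, unsubscribed, etc.
            publish(ch, f'{self.queue_name}.dlq', message, extra_headers={'x-error': str(e)})
        # Ack only once the message was sent, rescheduled, or dead-lettered
        ch.basic_ack(delivery_tag=method.delivery_tag)
```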
For backpressure, implement consumer prefetch limits. If workers can’t keep up, the queue grows, and you can alert on queue depth. Don’t drop messages—let the queue buffer.
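To see why a prefetch limit produces backpressure rather than data loss, here is a toy in-memory model of a queue that, like RabbitMQ's `basic_qos(prefetch_count=...)`, stops handing a consumer new messages once it holds that many unacknowledged ones. `ToyBroker` is hypothetical and ignores persistence, redelivery, and multiple consumers.

```python
from collections import deque


class ToyBroker:
    """Hypothetical single-consumer queue with a prefetch (unacked) limit."""

    def __init__(self, prefetch: int):
        self.prefetch = prefetch
        self.ready = deque()   # messages waiting in the queue
        self.unacked = set()   # delivery tags held by the consumer
        self._next_tag = 0

    def publish(self, body) -> None:
        self.ready.append(body)

    def deliver(self) -> list:
        """Hand out messages only while the consumer is below its prefetch limit."""
        delivered = []
        while self.ready and len(self.unacked) < self.prefetch:
            self._next_tag += 1
            self.unacked.add(self._next_tag)
            delivered.append((self._next_tag, self.ready.popleft()))
        return delivered

    def ack(self, tag: int) -> None:
        self.unacked.discard(tag)

    @property
    def depth(self) -> int:
        # The number you would alert on
        return len(self.ready)


broker = ToyBroker(prefetch=10)
for i in range(100):
    broker.publish(f"msg-{i}")
batch = broker.deliver()
print(len(batch), broker.depth)  # 10 90  (the rest waits in the queue; nothing is dropped)
print(len(broker.deliver()))     # 0  (no new work until the consumer acks)
broker.ack(batch[0][0])
print(len(broker.deliver()))     # 1
```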
Channel Adapters & Provider Abstraction
Every notification channel needs a provider, and every provider will eventually fail you. Abstract them behind a common interface so you can swap providers, implement failover, and A/B test delivery rates.
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import firebase_admin
from firebase_admin import credentials, exceptions, messaging
from twilio.base.exceptions import TwilioRestException
from twilio.rest import Client


@dataclass
class SendResult:
    success: bool
    provider_message_id: Optional[str] = None
    error_code: Optional[str] = None
    error_message: Optional[str] = None


class NotificationSender(ABC):
    @abstractmethod
    def send(self, recipient: str, content: str, **kwargs) -> SendResult:
        ...

    @abstractmethod
    def get_channel(self) -> str:
        ...


# Push implementation (Firebase Admin SDK)
class FCMSender(NotificationSender):
    def __init__(self, credentials_path: str):
        self.app = firebase_admin.initialize_app(credentials.Certificate(credentials_path))

    def send(self, recipient: str, content: str, **kwargs) -> SendResult:
        message = messaging.Message(
            token=recipient,
            notification=messaging.Notification(title=kwargs.get('title', ''), body=content),
            data=kwargs.get('data', {}),  # FCM data values must be strings
        )
        try:
            message_id = messaging.send(message, app=self.app)
            return SendResult(success=True, provider_message_id=message_id)
        except messaging.UnregisteredError:
            # Permanent: the app was uninstalled or the token is stale
            return SendResult(success=False, error_code='INVALID_TOKEN',
                              error_message='Device token no longer valid')
        except exceptions.FirebaseError as e:
            # Anything else is treated as temporary so the manager can fail over
            return SendResult(success=False, error_code=str(e.code), error_message=str(e))

    def get_channel(self) -> str:
        return 'PUSH'


class TwilioSMSSender(NotificationSender):
    def __init__(self, account_sid: str, auth_token: str, from_number: str):
        self.client = Client(account_sid, auth_token)
        self.from_number = from_number

    def send(self, recipient: str, content: str, **kwargs) -> SendResult:
        try:
            message = self.client.messages.create(
                body=content, from_=self.from_number, to=recipient)
            return SendResult(success=True, provider_message_id=message.sid)
        except TwilioRestException as e:
            return SendResult(success=False, error_code=str(e.code), error_message=e.msg)

    def get_channel(self) -> str:
        return 'SMS'


# Error codes meaning "the recipient is bad", where failing over won't help.
# Extend with provider-specific codes (e.g. Twilio's invalid-number errors).
PERMANENT_ERRORS = {'INVALID_TOKEN', 'UNSUBSCRIBED'}


# Provider manager with failover
class ProviderManager:
    def __init__(self):
        # channel -> [(priority, sender)], lowest priority value tried first
        self.providers: Dict[str, List[Tuple[int, NotificationSender]]] = {}

    def register(self, channel: str, sender: NotificationSender, priority: int = 0):
        entries = self.providers.setdefault(channel, [])
        entries.append((priority, sender))
        # Stable sort: equal priorities keep registration order
        entries.sort(key=lambda entry: entry[0])

    def send(self, channel: str, recipient: str, content: str, **kwargs) -> SendResult:
        last_result = None
        for _, provider in self.providers.get(channel, []):
            result = provider.send(recipient, content, **kwargs)
            if result.success or result.error_code in PERMANENT_ERRORS:
                return result
            last_result = result  # Temporary failure: try the next provider
        return SendResult(
            success=False, error_code='ALL_PROVIDERS_FAILED',
            error_message=last_result.error_message if last_result else 'no providers registered')
```
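The section's opening paragraph also mentions A/B testing delivery rates. One common approach, sketched here under the assumption that each user should stay pinned to the same provider across sends, is to hash the user ID into a bucket and map bucket ranges to providers by weight. `pick_provider`, the salt, and the weights are hypothetical; the result would typically decide which provider `ProviderManager` tries first.

```python
import hashlib


def pick_provider(user_id: str, weights: dict[str, int], salt: str = "sms-ab-1") -> str:
    """Deterministically assign a user to a provider in proportion to weights.

    The same (salt, user_id) always maps to the same provider, so a user's
    messages stay on one arm of the experiment. Changing the salt reshuffles
    assignments for a new experiment.
    """
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("weights must sum to a positive number")
    digest = hashlib.sha256(f"{salt}:{user_id}".encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") % total
    # Walk the cumulative weights until the bucket falls inside a range
    for provider, weight in sorted(weights.items()):
        if bucket < weight:
            return provider
        bucket -= weight
    raise AssertionError("unreachable: bucket is always < total")


weights = {"twilio": 90, "sns": 10}
assignments = [pick_provider(f"user-{i}", weights) for i in range(10_000)]
print(assignments.count("sns"))
```

With a 90/10 split, about a tenth of users should land on `sns`; comparing delivery rates per arm then tells you which provider performs better for your traffic.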
User Preferences & Template Management
Users have strong opinions about how you contact them. Respect those preferences or face unsubscribes and spam complaints.
```python
import json
from typing import Optional, Tuple

from jinja2 import BaseLoader, Environment


class UserPreferenceService:
    """Assumes db_connection.query(sql, params) returns a list of dict rows."""

    def __init__(self, redis_client, db_connection, cache_ttl_seconds: int = 3600):
        self.redis = redis_client
        self.db = db_connection
        self.cache_ttl = cache_ttl_seconds

    def get_preferences(self, user_id: str) -> dict:
        # Check cache first
        cached = self.redis.get(f'prefs:{user_id}')
        if cached:
            return json.loads(cached)
        # Fall back to the database
        rows = self.db.query(
            "SELECT channel, enabled, contact_value FROM user_notification_preferences "
            "WHERE user_id = %s",
            (user_id,),
        )
        result = {row['channel']: {'enabled': row['enabled'], 'contact': row['contact_value']}
                  for row in rows}
        # Delete this key when the user updates preferences; otherwise an
        # opt-out can take up to the TTL to take effect
        self.redis.setex(f'prefs:{user_id}', self.cache_ttl, json.dumps(result))
        return result

    def can_send(self, user_id: str, channel: str) -> Tuple[bool, Optional[str]]:
        """Returns (allowed, contact). Channels with no stored preference count as disabled."""
        channel_pref = self.get_preferences(user_id).get(channel, {})
        if not channel_pref.get('enabled', False):
            return False, None
        return True, channel_pref.get('contact')


class TemplateService:
    def __init__(self, db_connection):
        self.db = db_connection
        self.env = Environment(loader=BaseLoader())
        # Process-local cache of compiled templates; clear it when templates change
        self.cache = {}

    def _load(self, template_id: str, channel: str, locale: str) -> Optional[str]:
        rows = self.db.query(
            """SELECT content FROM notification_templates
               WHERE template_id = %s AND channel = %s AND locale = %s""",
            (template_id, channel, locale),
        )
        return rows[0]['content'] if rows else None

    def render(self, template_id: str, channel: str, variables: dict, locale: str = 'en') -> str:
        cache_key = f'{template_id}:{channel}:{locale}'
        if cache_key not in self.cache:
            # Fall back to the default locale when no localized version exists
            content = (self._load(template_id, channel, locale)
                       or self._load(template_id, channel, 'en'))
            if content is None:
                raise LookupError(f'No template {template_id!r} for channel {channel}')
            self.cache[cache_key] = self.env.from_string(content)
        return self.cache[cache_key].render(**variables)
```
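The per-channel, per-locale lookup with an English fallback can be exercised without a database. This sketch swaps Jinja2 for the standard library's `string.Template` so it runs with no dependencies, and uses a plain dictionary as a hypothetical template store; the fallback rule mirrors `TemplateService.render`.

```python
from string import Template

# Hypothetical store keyed by (template_id, channel, locale)
TEMPLATES = {
    ("order_shipped", "SMS", "en"): "Order $order_id shipped. Track: $url",
    ("order_shipped", "EMAIL", "en"): "Hi $name,\n\nYour order $order_id is on its way.\nTrack it here: $url",
    ("order_shipped", "SMS", "es"): "Pedido $order_id enviado. Seguimiento: $url",
}


def render(template_id: str, channel: str, variables: dict, locale: str = "en") -> str:
    # Try the requested locale first, then fall back to English
    for candidate in (locale, "en"):
        source = TEMPLATES.get((template_id, channel, candidate))
        if source is not None:
            # substitute() raises KeyError on a missing variable, which beats
            # sending a literal "Hi $name" to a real user
            return Template(source).substitute(variables)
    raise LookupError(f"no template {template_id!r} for channel {channel} ({locale} or en)")


variables = {"name": "Ana", "order_id": "A42", "url": "https://example.com/t/A42"}
print(render("order_shipped", "SMS", variables, locale="es"))
# Pedido A42 enviado. Seguimiento: https://example.com/t/A42
print(render("order_shipped", "EMAIL", variables, locale="es").splitlines()[0])
# Hi Ana,   (no Spanish email template, so English is used)
```

Keeping separate templates per channel is what lets the SMS stay short while the email carries a greeting and more detail.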
Reliability & Observability
Idempotency prevents the nightmare scenario where a network timeout triggers a retry and your user receives the same notification twice.
```python
from functools import wraps
from typing import Optional, Tuple


class IdempotencyMiddleware:
    PENDING = 'PENDING'

    def __init__(self, redis_client, ttl_hours: int = 24):
        self.redis = redis_client
        self.ttl_seconds = ttl_hours * 3600

    def check_and_set(self, idempotency_key: str) -> Tuple[bool, Optional[str]]:
        """Returns (is_duplicate, existing_notification_id)."""
        key = f'idempotency:{idempotency_key}'
        # SET NX is atomic: exactly one concurrent caller claims the key
        if self.redis.set(key, self.PENDING, nx=True, ex=self.ttl_seconds):
            return False, None
        existing = self.redis.get(key)
        if isinstance(existing, bytes):
            existing = existing.decode()
        # A duplicate arriving while the first request is still in flight gets no ID yet
        return True, (None if existing == self.PENDING else existing)

    def mark_processed(self, idempotency_key: str, notification_id: str):
        key = f'idempotency:{idempotency_key}'
        self.redis.setex(key, self.ttl_seconds, notification_id)

    def release(self, idempotency_key: str):
        # Let a later retry proceed if the first attempt failed
        self.redis.delete(f'idempotency:{idempotency_key}')


def idempotent(get_key):
    """Decorator for idempotent notification sending.

    Assumes the decorated method's instance has an `idempotency`
    attribute holding an IdempotencyMiddleware.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, request, *args, **kwargs):
            key = get_key(request)
            is_dup, existing_id = self.idempotency.check_and_set(key)
            if is_dup:
                return existing_id  # Existing notification ID (None if still in flight)
            try:
                result = func(self, request, *args, **kwargs)
            except Exception:
                self.idempotency.release(key)
                raise
            self.idempotency.mark_processed(key, result)
            return result
        return wrapper
    return decorator
```
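When a client does not supply `IdempotencyKey`, one option is to derive a key from the request content. The sketch below assumes a hypothetical `derive_idempotency_key` scheme, and uses a dictionary of expiry timestamps to mimic Redis `SET key value NX EX ttl` so it runs without a server. In production the claim must be the single atomic Redis call, not a read followed by a write.

```python
import hashlib
import json
import time


def derive_idempotency_key(user_id: str, template_id: str, variables: dict) -> str:
    """Hypothetical content-based key: identical requests hash identically.

    sort_keys makes the hash independent of dict insertion order.
    """
    payload = json.dumps(
        {"user_id": user_id, "template_id": template_id, "variables": variables},
        sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode()).hexdigest()


class InMemoryClaims:
    """Stand-in for Redis SET NX EX: the first caller wins until the TTL expires."""

    def __init__(self):
        self._expires_at = {}

    def claim(self, key: str, ttl_seconds: float) -> bool:
        now = time.monotonic()
        expiry = self._expires_at.get(key)
        if expiry is not None and expiry > now:
            return False  # already claimed and not yet expired
        self._expires_at[key] = now + ttl_seconds
        return True


claims = InMemoryClaims()
sent = []


def send_once(user_id, template_id, variables):
    key = derive_idempotency_key(user_id, template_id, variables)
    if claims.claim(key, ttl_seconds=24 * 3600):
        sent.append((user_id, template_id))


# A client retry after a timeout, with the variables in a different order
send_once("u1", "order_shipped", {"order_id": "A42", "carrier": "UPS"})
send_once("u1", "order_shipped", {"carrier": "UPS", "order_id": "A42"})
print(len(sent))  # 1
```

Content-derived keys also suppress legitimate repeats of an identical message within the TTL, so a caller-supplied key (or an event ID folded into the hash) is the safer default.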
For observability, track these metrics: queue depth per channel, delivery success rate per provider, end-to-end latency (request to delivery confirmation), and retry rate. Alert on queue depth exceeding 10x normal and success rate dropping below 95%.
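The two alert rules can be expressed as a small evaluation function over per-channel and per-provider counters. The `ProviderStats` shape and `evaluate_alerts` name are hypothetical; in practice the numbers would come from your metrics system (queue depth from the broker, delivery counts from worker logs or provider callbacks). The defaults are the 10x and 95% thresholds suggested above.

```python
from dataclasses import dataclass


@dataclass
class ProviderStats:
    sent: int
    delivered: int

    @property
    def success_rate(self) -> float:
        # No traffic means nothing to alert on
        return self.delivered / self.sent if self.sent else 1.0


def evaluate_alerts(queue_depth: dict[str, int], baseline_depth: dict[str, int],
                    providers: dict[str, ProviderStats],
                    depth_factor: float = 10.0, min_success: float = 0.95) -> list[str]:
    alerts = []
    for channel, depth in sorted(queue_depth.items()):
        # Treat a missing or zero baseline as 1 so any non-empty queue doesn't alert
        baseline = max(baseline_depth.get(channel, 0), 1)
        if depth > depth_factor * baseline:
            alerts.append(f"{channel}: queue depth {depth} > {depth_factor:g}x baseline {baseline}")
    for name, stats in sorted(providers.items()):
        if stats.success_rate < min_success:
            alerts.append(f"{name}: success rate {stats.success_rate:.1%} < {min_success:.0%}")
    return alerts


for alert in evaluate_alerts(
        queue_depth={"push": 120, "sms": 5_000},
        baseline_depth={"push": 100, "sms": 300},
        providers={"twilio": ProviderStats(sent=1_000, delivered=930),
                   "fcm": ProviderStats(sent=50_000, delivered=49_800)}):
    print(alert)
# sms: queue depth 5000 > 10x baseline 300
# twilio: success rate 93.0% < 95%
```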
Scaling Considerations & Trade-offs
Worker scaling: Scale workers independently per channel. SMS typically needs fewer workers (rate-limited by providers) than push (high throughput, low latency). Use horizontal pod autoscaling based on queue depth.
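Scaling on queue depth is essentially the Kubernetes Horizontal Pod Autoscaler rule `desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue)`; with an average-per-worker target on total depth it works out to roughly `ceil(depth / target_per_worker)`. The helper below is a hypothetical sketch of that rule, with an extra cap for the point that adding SMS workers beyond the provider's rate limit buys nothing. All targets and limits in the example are assumptions.

```python
import math
from typing import Optional


def desired_workers(queue_depth: int, target_per_worker: int,
                    min_workers: int, max_workers: int,
                    provider_rps: Optional[float] = None,
                    rps_per_worker: Optional[float] = None) -> int:
    """Hypothetical sizing rule for one channel's worker pool.

    Aims for about `target_per_worker` queued messages per worker, optionally
    capped by how many workers the provider's rate limit can actually feed,
    then clamped to [min_workers, max_workers].
    """
    wanted = math.ceil(queue_depth / target_per_worker)
    if provider_rps is not None and rps_per_worker:
        # Workers beyond the provider's rate limit would only be throttled
        wanted = min(wanted, math.ceil(provider_rps / rps_per_worker))
    return max(min_workers, min(max_workers, wanted))


# Push: deep queue, no tight provider limit assumed
print(desired_workers(12_000, target_per_worker=500, min_workers=2, max_workers=50))  # 24
# SMS: same backlog, but an assumed 100 msg/s provider limit and 20 msg/s per worker
print(desired_workers(12_000, target_per_worker=500, min_workers=1, max_workers=50,
                      provider_rps=100, rps_per_worker=20))  # 5
```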
Database choices: Use PostgreSQL for notification logs if you need complex queries and joins (user notification history). Use Cassandra or DynamoDB if you’re optimizing for write throughput and can query by partition key (notification_id or user_id + time range).
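For the write-optimized option, the "user_id + time range" access pattern usually becomes a partition key plus a sort key. The sketch below models that layout in memory: the partition key combines `user_id` with a month bucket (an assumption, to keep any single partition from growing without bound), and the sort key is `timestamp#notification_id`, so a time-range query is a contiguous slice of each partition. `NotificationLog` and its key formats are hypothetical, not a Cassandra or DynamoDB schema.

```python
import bisect
from collections import defaultdict
from datetime import datetime, timezone


def _sort_ts(ts: datetime) -> str:
    # Fixed-width UTC timestamp so lexicographic order equals time order
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")


class NotificationLog:
    """Hypothetical in-memory model of a partition-key/sort-key notification log."""

    def __init__(self):
        # partition key "user_id#YYYY-MM" -> sorted [(sort_key, notification_id, status)]
        self._partitions = defaultdict(list)

    @staticmethod
    def _partition_key(user_id: str, year: int, month: int) -> str:
        return f"{user_id}#{year:04d}-{month:02d}"

    def write(self, user_id: str, notification_id: str, ts: datetime, status: str) -> None:
        utc = ts.astimezone(timezone.utc)
        key = self._partition_key(user_id, utc.year, utc.month)
        # Sort key "timestamp#notification_id" keeps rows time-ordered and unique
        bisect.insort(self._partitions[key],
                      (f"{_sort_ts(ts)}#{notification_id}", notification_id, status))

    def query(self, user_id: str, start: datetime, end: datetime) -> list[str]:
        """IDs with start <= ts < end, visiting one partition per month in range."""
        lo_key, hi_key = (_sort_ts(start),), (_sort_ts(end),)
        s, e = start.astimezone(timezone.utc), end.astimezone(timezone.utc)
        year, month, results = s.year, s.month, []
        while (year, month) <= (e.year, e.month):
            rows = self._partitions.get(self._partition_key(user_id, year, month), [])
            # A bare timestamp sorts before every "timestamp#id" sharing that prefix
            lo = bisect.bisect_left(rows, lo_key)
            hi = bisect.bisect_left(rows, hi_key)
            results.extend(nid for _, nid, _ in rows[lo:hi])
            year, month = (year + 1, 1) if month == 12 else (year, month + 1)
        return results


utc = timezone.utc
log = NotificationLog()
log.write("u1", "n1", datetime(2024, 1, 31, 23, 0, tzinfo=utc), "DELIVERED")
log.write("u1", "n2", datetime(2024, 2, 1, 9, 0, tzinfo=utc), "FAILED")
log.write("u2", "n3", datetime(2024, 2, 1, 9, 0, tzinfo=utc), "DELIVERED")
print(log.query("u1", datetime(2024, 1, 31, tzinfo=utc), datetime(2024, 2, 2, tzinfo=utc)))
# ['n1', 'n2']
```

The bucket width is a tuning knob: heavy users may need daily buckets, while light users are fine with monthly ones.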
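Cost optimization: Batch email sends where possible; a single SendGrid or SES API request can carry multiple recipients, which cuts request overhead and helps you stay under provider rate limits. In AWS-heavy environments, SNS can be cheaper than Twilio for high-volume SMS, though per-message rates vary by destination country, so compare current pricing. Cache templates and user preferences aggressively.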
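Batching amounts to chunking recipients so each API request carries many messages instead of one. The sketch is provider-agnostic: `send_batch` is a hypothetical callback standing in for the real client call, and the `max_batch` default of 1,000 is an assumption to check against your provider's documented per-request limit.

```python
from typing import Callable, Iterable, Iterator, List


def chunked(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    batch: List[str] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def send_in_batches(recipients: Iterable[str], send_batch: Callable[[List[str]], None],
                    max_batch: int = 1000) -> int:
    """Call send_batch once per chunk; returns the number of API requests made."""
    requests = 0
    for batch in chunked(recipients, max_batch):
        send_batch(batch)
        requests += 1
    return requests


calls = []
n = send_in_batches((f"user{i}@example.com" for i in range(2_500)), calls.append)
print(n, [len(b) for b in calls])  # 3 [1000, 1000, 500]
```

Idempotency still applies per recipient, so a retried batch should skip recipients already recorded as sent.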
Key trade-offs:
- Queue-per-channel adds operational complexity but provides better isolation
- Provider abstraction adds code but saves you during outages
- Idempotency storage costs money but prevents user-facing bugs
Build for the failure modes you’ll inevitably face: provider outages, traffic spikes, and the intern who accidentally triggers a million notifications. Your future self will thank you.