Redis Caching Patterns: Cache-Aside, Write-Through

Key Insights

  • Cache-aside offers better resilience and memory efficiency by loading data on-demand, making it ideal for read-heavy workloads where not all data needs caching
  • Write-through guarantees cache consistency by synchronously updating both cache and database, but introduces write latency and should be reserved for data requiring strict coherence
  • Cache stampede protection through distributed locking is essential for cache-aside patterns in high-traffic scenarios to prevent database overload during cache misses

Introduction to Caching Patterns

Redis caching can reduce database load by 60-90% and improve response times from hundreds of milliseconds to single-digit milliseconds. But throwing Redis in front of your database without a coherent strategy often creates more problems than it solves—stale data, cache inconsistencies, and mysterious production issues.

The two fundamental caching patterns are cache-aside and write-through. Cache-aside puts your application in control of cache population, loading data lazily when requested. Write-through makes the cache the primary write target, synchronously updating both cache and database. Your choice between them fundamentally affects your system’s consistency guarantees, performance characteristics, and operational complexity.

Choose cache-aside for read-heavy workloads where eventual consistency is acceptable and you want to cache only frequently accessed data. Choose write-through when you need strong consistency guarantees and can tolerate the additional write latency.

Cache-Aside Pattern (Lazy Loading)

Cache-aside is the most common caching pattern. The application code explicitly manages the cache, checking it first on reads and populating it on misses. This creates a lazy loading behavior where only requested data enters the cache.

The read flow works like this: check cache → on miss, query database → populate cache with TTL → return data. On cache hits, you skip the database entirely. The application bears full responsibility for cache coherence.

Here’s a practical implementation for fetching user profiles:

import redis
import json
import psycopg2
from typing import Optional, Dict

class UserProfileCache:
    def __init__(self, redis_client: redis.Redis, db_conn):
        self.redis = redis_client
        self.db = db_conn
        self.ttl = 3600  # 1 hour
    
    def get_user_profile(self, user_id: int) -> Optional[Dict]:
        cache_key = f"user:profile:{user_id}"
        
        # Check cache first
        cached_data = self.redis.get(cache_key)
        if cached_data:
            return json.loads(cached_data)
        
        # Cache miss - query database
        cursor = self.db.cursor()
        cursor.execute(
            "SELECT id, username, email, created_at FROM users WHERE id = %s",
            (user_id,)
        )
        row = cursor.fetchone()
        cursor.close()
        
        if not row:
            return None
        
        # Build profile dict
        profile = {
            'id': row[0],
            'username': row[1],
            'email': row[2],
            'created_at': row[3].isoformat()
        }
        
        # Populate cache with TTL
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps(profile)
        )
        
        return profile

This pattern shines for read-heavy workloads. You only cache what’s actually requested, making efficient use of Redis memory. If Redis goes down, your application continues functioning—just slower. The database remains the source of truth.

The downside is cache staleness. When data updates in the database, the cache doesn’t know until the TTL expires. For many applications, this eventual consistency is perfectly acceptable.
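A common companion to cache-aside is explicit invalidation: when the application writes to the database, it also deletes the cached key, so the next read repopulates it fresh instead of waiting for the TTL. A minimal sketch; the `FakeRedis` stub stands in for a real `redis.Redis` client purely for illustration, and `invalidate_user_profile` is an illustrative helper name:

```python
import json

class FakeRedis:
    """In-memory stand-in for redis.Redis, for illustration only."""
    def __init__(self):
        self.store = {}

    def get(self, key):
        return self.store.get(key)

    def setex(self, key, ttl, value):
        # TTL is ignored in this stub
        self.store[key] = value

    def delete(self, key):
        self.store.pop(key, None)

def invalidate_user_profile(redis_client, user_id: int) -> None:
    """Call after a successful database UPDATE: drop the stale cache
    entry so the next get_user_profile() repopulates it."""
    redis_client.delete(f"user:profile:{user_id}")

# Usage: after committing an update to the users table
r = FakeRedis()
r.setex("user:profile:42", 3600, json.dumps({"id": 42, "username": "old_name"}))
invalidate_user_profile(r, 42)
print(r.get("user:profile:42"))  # None: next read falls through to the database
```

Delete-after-write narrows the staleness window to the gap between the database commit and the delete, and it pairs naturally with the TTL as a backstop for any invalidations you miss.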

Write-Through Pattern

Write-through flips the responsibility. Instead of lazily populating the cache, you proactively update it on every write. The application writes to both cache and database synchronously, maintaining consistency between them.

The write flow: receive update → write to database → write to cache → commit → return success. Both writes must succeed, or you roll back. This guarantees that cached data always reflects the database state.

Here’s write-through for updating user settings:

import redis
import json
import psycopg2
from typing import Dict, Optional

class UserSettingsCache:
    def __init__(self, redis_client: redis.Redis, db_conn):
        self.redis = redis_client
        self.db = db_conn
        self.ttl = 86400  # 24 hours
    
    def update_settings(self, user_id: int, settings: Dict) -> bool:
        cache_key = f"user:settings:{user_id}"
        
        try:
            # Start database transaction
            cursor = self.db.cursor()
            
            # Update database first
            cursor.execute(
                """
                INSERT INTO user_settings (user_id, theme, notifications, language)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (user_id) DO UPDATE
                SET theme = EXCLUDED.theme,
                    notifications = EXCLUDED.notifications,
                    language = EXCLUDED.language,
                    updated_at = NOW()
                """,
                (user_id, settings['theme'], settings['notifications'], settings['language'])
            )
            
            # Write to cache
            self.redis.setex(
                cache_key,
                self.ttl,
                json.dumps(settings)
            )
            
            # Commit transaction
            self.db.commit()
            cursor.close()
            return True
            
        except Exception as e:
            # Rollback on any failure
            self.db.rollback()
            # Invalidate cache to be safe
            self.redis.delete(cache_key)
            raise e
    
    def get_settings(self, user_id: int) -> Optional[Dict]:
        cache_key = f"user:settings:{user_id}"
        
        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Fallback to database
        cursor = self.db.cursor()
        cursor.execute(
            "SELECT theme, notifications, language FROM user_settings WHERE user_id = %s",
            (user_id,)
        )
        row = cursor.fetchone()
        cursor.close()
        
        if row:
            settings = {
                'theme': row[0],
                'notifications': row[1],
                'language': row[2]
            }
            self.redis.setex(cache_key, self.ttl, json.dumps(settings))
            return settings
        
        return None

Write-through guarantees consistency but doubles your write latency. Every update waits for both Redis and the database. For write-heavy workloads, this overhead becomes significant.

Comparing the Two Patterns

| Aspect | Cache-Aside | Write-Through |
|---|---|---|
| Consistency | Eventual (TTL-dependent) | Strong (always synchronized) |
| Read Performance | Fast on hit, slow on miss | Fast (cache always populated) |
| Write Performance | Fast (database only) | Slower (dual writes) |
| Cache Staleness | Possible until TTL expires | Never stale |
| Memory Efficiency | High (only requested data) | Lower (all written data cached) |
| Complexity | Low | Medium (transaction handling) |
| Failure Mode | Degrades gracefully | Write failures affect both systems |
| Best For | Read-heavy, eventual consistency OK | Strong consistency requirements |

Use cache-aside for product catalogs, user profiles, and content feeds where slightly stale data is acceptable. Use write-through for user sessions, shopping carts, and real-time inventory where consistency matters more than write performance.

Handling Common Edge Cases

Cache stampede is the most critical edge case in cache-aside patterns. When a popular cache entry expires, multiple concurrent requests simultaneously query the database, causing a spike that can overwhelm it.

Prevent this with distributed locking:

import redis
import json
import time
from typing import Optional, Dict

class StampedeProtectedCache:
    def __init__(self, redis_client: redis.Redis, db_conn):
        self.redis = redis_client
        self.db = db_conn
        self.ttl = 3600
        self.lock_ttl = 10  # Lock expires after 10 seconds
    
    def get_product(self, product_id: int) -> Optional[Dict]:
        cache_key = f"product:{product_id}"
        lock_key = f"lock:product:{product_id}"
        
        # Try cache first
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Acquire lock to prevent stampede
        lock_acquired = self.redis.set(
            lock_key,
            "1",
            nx=True,  # Only set if doesn't exist
            ex=self.lock_ttl
        )
        
        if lock_acquired:
            try:
                # Double-check cache (another request may have populated it between our miss and acquiring the lock)
                cached = self.redis.get(cache_key)
                if cached:
                    return json.loads(cached)
                
                # Query database
                cursor = self.db.cursor()
                cursor.execute(
                    "SELECT id, name, price, stock FROM products WHERE id = %s",
                    (product_id,)
                )
                row = cursor.fetchone()
                cursor.close()
                
                if row:
                    product = {
                        'id': row[0],
                        'name': row[1],
                        'price': float(row[2]),
                        'stock': row[3]
                    }
                    self.redis.setex(cache_key, self.ttl, json.dumps(product))
                    return product
                
                return None
                
            finally:
                # Always release lock
                self.redis.delete(lock_key)
        else:
            # Another request is loading the data, wait and retry
            time.sleep(0.1)
            cached = self.redis.get(cache_key)
            if cached:
                return json.loads(cached)
            
            # If still not cached, query database directly
            return self._query_database(product_id)
    
    def _query_database(self, product_id: int) -> Optional[Dict]:
        cursor = self.db.cursor()
        cursor.execute(
            "SELECT id, name, price, stock FROM products WHERE id = %s",
            (product_id,)
        )
        row = cursor.fetchone()
        cursor.close()
        
        if row:
            return {
                'id': row[0],
                'name': row[1],
                'price': float(row[2]),
                'stock': row[3]
            }
        return None

This approach ensures only one request queries the database when a cache entry expires, while others wait briefly for the cache to be populated.
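One refinement worth making to the unconditional `delete` in the `finally` block: if a slow worker holds the lock past its 10-second expiry and another worker re-acquires it, the first worker's cleanup deletes a lock it no longer owns. The usual fix is to store a unique token at acquisition and delete only when the token still matches. A minimal sketch, with an in-memory stub in place of a real `redis.Redis` client; `acquire_lock`, `release_lock`, and `FakeRedis` are illustrative names:

```python
import uuid

class FakeRedis:
    """In-memory stand-in for redis.Redis (expiry ignored), for illustration only."""
    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False, ex=None):
        # Mimics redis-py: SET NX returns None when the key already exists
        if nx and key in self.store:
            return None
        self.store[key] = value
        return True

    def get(self, key):
        return self.store.get(key)

    def delete(self, key):
        self.store.pop(key, None)

def acquire_lock(r, lock_key: str, ttl: int = 10):
    """Acquire the lock with a unique token identifying this owner."""
    token = str(uuid.uuid4())
    return token if r.set(lock_key, token, nx=True, ex=ttl) else None

def release_lock(r, lock_key: str, token: str) -> bool:
    """Release only if the stored token is still ours."""
    if r.get(lock_key) == token:
        r.delete(lock_key)
        return True
    return False
```

Against a real Redis, the check-and-delete in `release_lock` should run as a single Lua script via EVAL so the GET and DEL cannot interleave with another client's acquire.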

Real-World Implementation Considerations

Monitor your cache hit rate religiously. A healthy cache-aside implementation should maintain 80-95% hit rates for frequently accessed data. Below 70% suggests your TTL is too short or your access patterns don’t benefit from caching.

Here’s a metrics wrapper for tracking cache performance:

import redis
import json
import time
import logging
from typing import Optional, Callable, Any

class MonitoredCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        self.hits = 0
        self.misses = 0
    
    def get_with_metrics(
        self,
        cache_key: str,
        fetch_fn: Callable[[], Any],
        ttl: int = 3600
    ) -> Optional[Any]:
        start_time = time.time()
        
        # Check cache
        cached = self.redis.get(cache_key)
        
        if cached is not None:
            self.hits += 1
            latency = (time.time() - start_time) * 1000
            self.logger.info(
                f"Cache HIT: {cache_key} (latency: {latency:.2f}ms, "
                f"hit_rate: {self.hit_rate():.2%})"
            )
            # Deserialize: redis.get returns raw bytes
            return json.loads(cached)
        
        # Cache miss - fetch data
        self.misses += 1
        data = fetch_fn()
        
        if data is not None:
            # Serialize before storing so dicts and lists round-trip
            self.redis.setex(cache_key, ttl, json.dumps(data))
        
        latency = (time.time() - start_time) * 1000
        self.logger.info(
            f"Cache MISS: {cache_key} (latency: {latency:.2f}ms, "
            f"hit_rate: {self.hit_rate():.2%})"
        )
        
        return data
    
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

Set appropriate TTLs based on your data volatility: user profiles might cache for hours, while real-time prices need seconds. Set Redis's maxmemory-policy to allkeys-lru so the least recently used keys are evicted automatically when memory fills up.
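In redis.conf, that eviction setup is two directives (the 2gb limit here is illustrative; size it for your workload):

```
maxmemory 2gb
maxmemory-policy allkeys-lru
```

The same settings can be applied to a running instance with CONFIG SET, without a restart.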

Conclusion and Best Practices

Cache-aside and write-through serve different needs. Cache-aside excels at read-heavy workloads where eventual consistency is acceptable. Write-through guarantees consistency but at the cost of write performance.

For production deployments: implement cache stampede protection for high-traffic keys, monitor hit rates and eviction rates, set appropriate TTLs based on data volatility, and always handle Redis failures gracefully by falling back to the database. Use cache-aside as your default pattern—it’s simpler and more resilient. Only reach for write-through when you have specific consistency requirements that justify the added complexity.

Most importantly, remember that caching is an optimization. Start with a working system, measure where bottlenecks exist, then apply the appropriate caching pattern. Premature caching creates as many problems as premature optimization.
