Redis Streams: Event Streaming Data Structure

Key Insights

  • Redis Streams provides a log-based data structure for event streaming with consumer groups, auto-generated entry IDs, and persistence, which makes it a better fit than Pub/Sub when messages must survive consumer downtime
  • Consumer groups enable multiple consumers to process messages in parallel with automatic load balancing, message acknowledgment, and pending message tracking for fault tolerance
  • Streams support ID-range (and therefore time-range) queries, trimming by length or by minimum ID, and claim commands for taking over messages from dead consumers, which covers the main operational needs of event-driven architectures

Understanding Redis Streams Architecture

Redis Streams implements an append-only log structure where each entry contains a unique ID and field-value pairs. Unlike Redis Pub/Sub, which delivers messages to active subscribers only, Streams persist messages and allow multiple consumers to read them independently.

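The stream ID format follows <millisecondsTime>-<sequenceNumber>. Redis generates it automatically from the server clock, and the sequence number distinguishes entries added within the same millisecond, so IDs are always ordered. You can also specify custom IDs, but each one must be strictly greater than the ID of the last entry in the stream; otherwise XADD returns an error.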

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Add entries to a stream
order_id_1 = r.xadd('orders', {
    'order_id': '12345',
    'customer_id': 'C001',
    'amount': '299.99',
    'status': 'pending'
})

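# Auto-generated ID based on the server clock, e.g. 1704067200000-0
print(f"Added entry with ID: {order_id_1}")

# Add with explicit ID. It must be greater than the last ID in the stream,
# so derive it from the ID just returned instead of hardcoding a timestamp.
ms, seq = order_id_1.split('-')
order_id_2 = r.xadd('orders', {
    'order_id': '12346',
    'customer_id': 'C002',
    'amount': '149.50',
    'status': 'confirmed'
}, id=f"{ms}-{int(seq) + 1}")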
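
Because the ID encodes a timestamp and a sequence number, it can be unpacked with plain string handling. The helpers below are a minimal sketch (the function names are made up for this article, not part of redis-py) and assume IDs arrive as str, as they do with decode_responses=True. Comparing parsed tuples matters because comparing the raw strings would sort "…-10" before "…-9".

```python
from datetime import datetime, timezone


def parse_stream_id(stream_id):
    """Split '<ms>-<seq>' into a tuple of ints that compares in stream order."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


def stream_id_to_datetime(stream_id):
    """Return the UTC time encoded in the millisecond part of an ID."""
    ms, _ = parse_stream_id(stream_id)
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def next_id(stream_id):
    """Smallest ID strictly greater than the given one."""
    ms, seq = parse_stream_id(stream_id)
    return f"{ms}-{seq + 1}"


print(parse_stream_id("1704067200000-0"))        # (1704067200000, 0)
print(stream_id_to_datetime("1704067200000-0"))  # 2024-01-01 00:00:00+00:00
print(next_id("1704067200000-0"))                # 1704067200000-1
```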

Reading Stream Data

Redis provides multiple reading strategies: reading by range, reading from a specific ID, or blocking reads for real-time processing.

# Read all entries from the beginning
entries = r.xrange('orders', '-', '+')
for entry_id, data in entries:
    print(f"{entry_id}: {data}")

# Read entries within a time range
start_id = '1704067200000-0'
end_id = '1704067300000-0'
time_range_entries = r.xrange('orders', start_id, end_id, count=10)

# Blocking read loop
# '0-0' replays the stream from the beginning; use '$' to receive only
# entries added after the first call
last_id = '0-0'
while True:
    # Block for 5 seconds waiting for new entries
    response = r.xread({'orders': last_id}, block=5000, count=10)
    
    if response:
        for stream_name, messages in response:
            for message_id, data in messages:
                print(f"Processing: {message_id} -> {data}")
                last_id = message_id
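
For large streams, reading everything with a single XRANGE call can return more data than you want in memory. One way to page through the range is sketched below. It assumes `client` is a redis-py client created with decode_responses=True (so IDs are str) and a server running Redis 6.2 or newer, where a "(" prefix makes a range bound exclusive. The `iter_stream` name is hypothetical.

```python
def iter_stream(client, stream, page_size=100, start="-"):
    """Yield (entry_id, fields) for every entry, fetching one page at a time.

    `client` is assumed to expose redis-py's xrange(name, min, max, count).
    """
    lower = start
    while True:
        page = client.xrange(stream, min=lower, max="+", count=page_size)
        if not page:
            return
        for entry_id, fields in page:
            yield entry_id, fields
        # Resume strictly after the last entry of this page
        # ("(" = exclusive bound, Redis 6.2+)
        lower = f"({page[-1][0]}"
```

Usage would look like `for entry_id, data in iter_stream(r, 'orders', page_size=500): ...`. On servers older than 6.2, the same effect needs the next ID computed by hand, since the exclusive prefix is not available.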

Implementing Consumer Groups

Consumer groups enable distributed processing where multiple consumers work together to process messages. Each message is delivered to only one consumer in the group, providing automatic load balancing.

# Create a consumer group
try:
    r.xgroup_create('orders', 'order-processors', id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
    if "BUSYGROUP" not in str(e):
        raise

# Consumer 1 reading from the group
def process_consumer(consumer_name):
    while True:
        # Read up to 10 messages, block for 2 seconds
        messages = r.xreadgroup(
            'order-processors',
            consumer_name,
            {'orders': '>'},  # '>' means undelivered messages
            count=10,
            block=2000
        )
        
        if not messages:
            continue
            
        for stream_name, stream_messages in messages:
            for message_id, data in stream_messages:
                try:
                    # Process the order
                    process_order(data)
                    
                    # Acknowledge successful processing
                    r.xack('orders', 'order-processors', message_id)
                    
                except Exception as e:
                    print(f"Error processing {message_id}: {e}")
                    # Message remains in pending state for retry

def process_order(data):
    print(f"Processing order {data['order_id']}")
    # Business logic here
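
The loop above only asks for '>' (never-delivered messages), so after a restart a consumer will not see messages it had already received but not acknowledged. Passing an explicit ID instead of '>' makes XREADGROUP return that consumer's own pending history. The sketch below shows one way to drain it on startup. It assumes a redis-py client using the default RESP2 response shape with decode_responses=True; `replay_own_pending` and `handler` are hypothetical names.

```python
def replay_own_pending(client, stream, group, consumer, handler, count=10):
    """Reprocess entries already delivered to `consumer` but never acknowledged.

    Returns the number of entries that were handled and acknowledged.
    """
    last_id = "0"
    handled = 0
    while True:
        # An explicit ID (not '>') returns this consumer's pending entries
        # with IDs greater than last_id
        response = client.xreadgroup(group, consumer, {stream: last_id}, count=count)
        # Assumed shape: [[stream, [(id, fields), ...]]]
        entries = response[0][1] if response else []
        if not entries:
            return handled
        for message_id, fields in entries:
            last_id = message_id  # advance even on failure so the loop ends
            try:
                handler(fields)
            except Exception as exc:
                print(f"Still failing {message_id}: {exc}")
                continue
            client.xack(stream, group, message_id)
            handled += 1
```

Calling `replay_own_pending(r, 'orders', 'order-processors', consumer_name, process_order)` before entering the main loop keeps a restarted worker from leaving its earlier deliveries stuck until another consumer claims them.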

Handling Pending Messages and Dead Consumers

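When a consumer reads a message but fails to acknowledge it, the message stays in the group's Pending Entries List (PEL). Unacknowledged messages remain there until some consumer acknowledges them, so another consumer can take over the work of one that crashed. This gives at-least-once delivery: a message may be processed more than once, so handlers should tolerate duplicates.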

# Check pending messages for the consumer group
pending_info = r.xpending('orders', 'order-processors')
print(f"Pending count: {pending_info['pending']}")
print(f"Min ID: {pending_info['min']}")
print(f"Max ID: {pending_info['max']}")

# Get detailed pending message information
pending_messages = r.xpending_range(
    'orders',
    'order-processors',
    min='-',
    max='+',
    count=10
)

for msg in pending_messages:
    message_id = msg['message_id']
    consumer = msg['consumer']
    time_since_delivered = msg['time_since_delivered']
    delivery_count = msg['times_delivered']
    
    # If message pending for > 60 seconds, claim it
    if time_since_delivered > 60000:
        claimed = r.xclaim(
            'orders',
            'order-processors',
            'recovery-consumer',
            min_idle_time=60000,
            message_ids=[message_id]
        )
        
        for claimed_id, claimed_data in claimed:
            print(f"Claimed message {claimed_id} from {consumer}")
            # Reprocess the message
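
The `times_delivered` counter read above is also useful for spotting "poison" messages that fail every time they are retried. A common approach, sketched here under stated assumptions, is to copy such entries to a separate dead-letter stream and acknowledge them. The `<stream>:dead` naming, the `max_deliveries` threshold, and the function name are illustrative choices, not Redis features. The copy and the acknowledgment are separate commands, so this is not atomic.

```python
def dead_letter_poison_messages(client, stream, group, max_deliveries=5,
                                dead_stream=None, count=100):
    """Move entries delivered more than `max_deliveries` times to a side stream.

    `client` is assumed to be a redis-py client with decode_responses=True.
    Returns the list of message IDs that were dead-lettered.
    """
    dead_stream = dead_stream or f"{stream}:dead"
    moved = []
    pending = client.xpending_range(stream, group, min="-", max="+", count=count)
    for msg in pending:
        if msg["times_delivered"] <= max_deliveries:
            continue
        message_id = msg["message_id"]
        # Fetch the original fields; the entry may already have been trimmed
        found = client.xrange(stream, min=message_id, max=message_id)
        if found:
            _, fields = found[0]
            client.xadd(dead_stream, {**fields, "original_id": message_id})
        # Acknowledge so the group stops redelivering it
        client.xack(stream, group, message_id)
        moved.append(message_id)
    return moved
```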

Stream Trimming and Memory Management

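Streams grow indefinitely unless explicitly trimmed. Redis can trim by maximum length (MAXLEN) or by minimum ID (MINID, Redis 6.2+), either exactly or approximately. Approximate trimming is cheaper because Redis only removes whole internal nodes, so the stream may temporarily hold slightly more entries than the limit.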

# Trim to maximum length (approximate)
r.xtrim('orders', maxlen=10000, approximate=True)

# Trim to maximum length (exact)
r.xtrim('orders', maxlen=10000, approximate=False)

# Trim by time (Redis 6.2+) - keep last 24 hours
import time
min_id = int((time.time() - 86400) * 1000)
r.xtrim('orders', minid=f"{min_id}-0", approximate=True)

# Add with automatic trimming
r.xadd(
    'orders',
    {'order_id': '12347', 'amount': '99.99'},
    maxlen=10000,
    approximate=True
)
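
Trimming does not check consumer groups: an entry that is still pending, or that a group has not read yet, is removed like any other. If that matters for your workload, one option is to cap the MINID threshold at the oldest ID any group still needs. The sketch below assumes a redis-py client with decode_responses=True and the `name` and `last-delivered-id` keys that redis-py returns for XINFO GROUPS; `safe_minid` is a hypothetical helper.

```python
def _id_key(stream_id):
    """Turn '<ms>-<seq>' into a tuple that sorts in stream order."""
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)


def safe_minid(client, stream, cutoff_id):
    """Return a MINID no greater than cutoff_id that keeps unfinished work.

    For each consumer group, entries at or after the oldest pending ID and
    entries from the group's last-delivered ID onward are preserved.
    """
    candidates = [cutoff_id]
    for group in client.xinfo_groups(stream):
        candidates.append(group["last-delivered-id"])
        oldest_pending = client.xpending(stream, group["name"])["min"]
        if oldest_pending is not None:
            candidates.append(oldest_pending)
    return min(candidates, key=_id_key)
```

It could then be combined with the time-based trim above, for example `r.xtrim('orders', minid=safe_minid(r, 'orders', f"{min_id}-0"), approximate=True)`. The trade-off is that a stalled group now holds back trimming, so stream length should still be monitored.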

Production Pattern: Event Processing Pipeline

Here’s a complete example implementing a robust event processing system with error handling and monitoring.

import redis
import json
import logging
from typing import Dict, Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class StreamProcessor:
    def __init__(self, redis_client, stream_name, group_name, consumer_name):
        self.redis = redis_client
        self.stream_name = stream_name
        self.group_name = group_name
        self.consumer_name = consumer_name
        self.handlers: Dict[str, Callable] = {}
        
        # Create consumer group if not exists
        try:
            self.redis.xgroup_create(
                stream_name, group_name, id='0', mkstream=True
            )
        except redis.exceptions.ResponseError as e:
            # Ignore only "group already exists"; re-raise anything else
            if "BUSYGROUP" not in str(e):
                raise
    
    def register_handler(self, event_type: str, handler: Callable):
        """Register a handler for specific event type"""
        self.handlers[event_type] = handler
    
    def process_pending(self, min_idle_time=300000):
        """Claim and process messages pending for > min_idle_time ms"""
        pending = self.redis.xpending_range(
            self.stream_name,
            self.group_name,
            min='-',
            max='+',
            count=100
        )
        
        for msg in pending:
            if msg['time_since_delivered'] > min_idle_time:
                claimed = self.redis.xclaim(
                    self.stream_name,
                    self.group_name,
                    self.consumer_name,
                    min_idle_time=min_idle_time,
                    message_ids=[msg['message_id']]
                )
                
                for msg_id, data in claimed:
                    self._process_message(msg_id, data)
    
    def _process_message(self, message_id, data):
        """Process individual message with error handling"""
        event_type = data.get('event_type', 'unknown')
        handler = self.handlers.get(event_type)
        
        if not handler:
            logger.warning(f"No handler for event type: {event_type}")
            self.redis.xack(self.stream_name, self.group_name, message_id)
            return
        
        try:
            handler(data)
            self.redis.xack(self.stream_name, self.group_name, message_id)
            logger.info(f"Processed {message_id}")
        except Exception as e:
            logger.error(f"Error processing {message_id}: {e}")
            # Message stays pending for retry
    
    def run(self, block_ms=5000):
        """Main processing loop"""
        logger.info(f"Starting consumer {self.consumer_name}")
        
        while True:
            # Process any pending messages first
            self.process_pending()
            
            # Read new messages
            messages = self.redis.xreadgroup(
                self.group_name,
                self.consumer_name,
                {self.stream_name: '>'},
                count=10,
                block=block_ms
            )
            
            if messages:
                for _, stream_messages in messages:
                    for message_id, data in stream_messages:
                        self._process_message(message_id, data)

# Usage
r = redis.Redis(decode_responses=True)
processor = StreamProcessor(r, 'events', 'event-processors', 'worker-1')

# Register handlers
processor.register_handler('order.created', lambda data: print(f"New order: {data}"))
processor.register_handler('order.shipped', lambda data: print(f"Shipped: {data}"))

# Start processing
processor.run()

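This implementation recovers from consumer failures by reclaiming messages that have been pending for too long, and it acknowledges a message only after its handler succeeds, which gives at-least-once processing. A message whose handler always fails is retried indefinitely, so production deployments usually add a delivery-count limit. The pattern scales horizontally by adding more consumers to the same group, each with a unique consumer name; Redis hands each new message to whichever consumer in the group reads next.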
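
The processor dispatches on an `event_type` field, so producers need to write entries in that shape. Stream fields are flat string pairs, so nested data has to be encoded somehow. The sketch below JSON-encodes it into a single `payload` field. The field name, the `maxlen` default, and both function names are assumptions made for illustration; `client` is assumed to be a redis-py client.

```python
import json


def publish_event(client, stream, event_type, payload, maxlen=100_000):
    """Append one event in the shape the processor's handlers expect.

    The nested payload is JSON-encoded into a single 'payload' field,
    and the stream is capped approximately at `maxlen` entries.
    """
    fields = {
        "event_type": event_type,
        "payload": json.dumps(payload, separators=(",", ":"), sort_keys=True),
    }
    return client.xadd(stream, fields, maxlen=maxlen, approximate=True)


def decode_event(fields):
    """Inverse of publish_event: return (event_type, payload dict)."""
    return fields["event_type"], json.loads(fields["payload"])
```

A producer would call `publish_event(r, 'events', 'order.created', {'order_id': '12345'})`, and a handler registered for 'order.created' could call `decode_event(data)` to get the original dictionary back.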
