Reservoir Sampling: Random Selection from Stream

Key Insights

  • Reservoir sampling lets you select k random items from a stream of unknown length in a single pass, using only O(k) memory while guaranteeing uniform probability for each element.
  • The algorithm works by always keeping the first k items, then replacing existing items with decreasing probability as new elements arrive—this mathematical elegance ensures every item has exactly k/n chance of selection.
  • Beyond textbook examples, reservoir sampling powers real systems: sampling production logs without overwhelming storage, selecting training data from infinite streams, and implementing fair load balancing across distributed systems.

The Stream Sampling Problem

You’re processing a firehose of data—millions of log entries, a continuous social media feed, or network packets flying by at wire speed. You need a random sample of k items, but you can’t store everything, and you don’t know when the stream ends. How do you ensure every item has a fair chance of being selected?

This is the stream sampling problem, and it appears everywhere in production systems:

  • Log aggregation: Sample 1% of requests for detailed analysis without storing terabytes
  • Social media feeds: Select random posts for content moderation review
  • Network monitoring: Capture representative packet samples for security analysis
  • ML pipelines: Build training datasets from continuous data streams

The constraint that makes this interesting: you get one pass through the data, and you must be ready to return your sample at any moment.

Why Naive Approaches Fail

Your first instinct might be to buffer everything, count the total, then randomly select. This fails immediately when the stream is larger than available memory or when it never ends.

“Just track the count and sample with probability k/n” sounds clever, but you don’t know n until the stream ends. You can’t go back and include items you skipped.

What about sampling every nth item? This introduces bias—items at certain positions are guaranteed inclusion or exclusion. If your stream has any periodicity (and real streams often do), you’ll get skewed results.

We need an algorithm that:

  1. Processes each item exactly once
  2. Uses constant memory (proportional to sample size, not stream size)
  3. Guarantees uniform selection probability
  4. Can return a valid sample at any point

Algorithm R: Single-Item Reservoir Sampling

Let’s start simple: selecting exactly one random item from a stream. The algorithm, attributed to Alan Waterman, is elegant:

  1. Keep the first item
  2. When you see the nth item, replace your selection with probability 1/n

That’s it. Here’s why it works:

After seeing n items, what’s the probability that item i is selected? It must have been selected at position i (probability 1/i) AND survived all subsequent replacements. The survival probability is:

(1 - 1/(i+1)) × (1 - 1/(i+2)) × ... × (1 - 1/n)
= (i/(i+1)) × ((i+1)/(i+2)) × ... × ((n-1)/n)
= i/n

Combined: (1/i) × (i/n) = 1/n

Every item has exactly 1/n probability. Beautiful.

import random

def reservoir_sample_single(stream):
    """Select one random item from a stream with uniform probability."""
    result = None
    
    for i, item in enumerate(stream, start=1):
        # Replace with probability 1/i
        if random.randint(1, i) == 1:
            result = item
    
    return result

# Example: sample one line from a potentially huge file
def sample_line_from_file(filepath):
    with open(filepath, 'r') as f:
        return reservoir_sample_single(f)

Generalizing to K Items

Algorithm R extends naturally to selecting k items. Maintain a “reservoir” of size k:

  1. Fill the reservoir with the first k items
  2. For each subsequent item at position i, generate a random number j in [1, i]
  3. If j ≤ k, replace reservoir[j-1] with the current item

import random
from typing import List, Iterator, TypeVar

T = TypeVar('T')

def reservoir_sample(stream: Iterator[T], k: int) -> List[T]:
    """
    Select k random items from a stream using Algorithm R.
    Each item has k/n probability of being in the final sample.
    """
    reservoir = []
    
    for i, item in enumerate(stream):
        if i < k:
            # Fill the reservoir first
            reservoir.append(item)
        else:
            # Replace elements with decreasing probability
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = item
    
    return reservoir

The probability proof extends similarly. For any item at position i (1-indexed) where i > k:

  • Probability of entering the reservoir at position i: k/i
  • Probability of surviving position j, for each j from i+1 to n: 1 - (k/j) × (1/k) = (j-1)/j

The survival terms telescope to i/n, so each item’s final probability is (k/i) × (i/n) = k/n.
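If the algebra feels too slick, a throwaway Monte Carlo sketch confirms the k/n claim empirically (the seed and trial count here are arbitrary; the function is restated so the snippet runs on its own):

```python
import random
from collections import Counter

def reservoir_sample(stream, k):
    """Algorithm R, as defined above."""
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)
        else:
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = item
    return reservoir

random.seed(42)
trials, n, k = 20_000, 10, 3
counts = Counter()
for _ in range(trials):
    counts.update(reservoir_sample(range(n), k))

# Every item should land in the sample in roughly k/n = 30% of trials.
rates = [counts[item] / trials for item in range(n)]
```

Across 20,000 trials, each of the 10 items shows up in very close to 30% of the samples, with no positional bias.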

Here’s a streaming-friendly Java implementation that wraps the same logic in a reusable class:

import java.util.*;
import java.util.stream.Stream;

public class ReservoirSampler<T> {
    private final int k;
    private final List<T> reservoir;
    private final Random random;
    private long count;

    public ReservoirSampler(int k) {
        this.k = k;
        this.reservoir = new ArrayList<>(k);
        this.random = new Random();
        this.count = 0;
    }

    public void observe(T item) {
        if (count < k) {
            reservoir.add(item);
        } else {
            long j = nextLong(count + 1);
            if (j < k) {
                reservoir.set((int) j, item);
            }
        }
        count++;
    }

    public List<T> getSample() {
        return new ArrayList<>(reservoir);
    }

    private long nextLong(long bound) {
        // Rejection sampling: avoids modulo bias, and the pitfall that
        // Math.abs(Long.MIN_VALUE) is still negative.
        long r;
        do {
            r = random.nextLong() & Long.MAX_VALUE; // force non-negative
        } while (r >= (Long.MAX_VALUE / bound) * bound);
        return r % bound;
    }
}

Weighted Reservoir Sampling

Sometimes items aren’t equally important. You want higher-weight items to have proportionally higher selection probability. Algorithm A-Res (Efraimidis and Spirakis, 2006) handles this elegantly:

For each item, compute a key: random()^(1/weight). Keep the k items with the highest keys.

import heapq
import random
from typing import Iterator, Tuple, List, TypeVar

T = TypeVar('T')

def weighted_reservoir_sample(
    stream: Iterator[Tuple[T, float]], 
    k: int
) -> List[T]:
    """
    Weighted reservoir sampling using Algorithm A-Res.
    
    Args:
        stream: Iterator of (item, weight) tuples
        k: Number of items to sample
    
    Returns:
        List of k items, selected with probability proportional to weight
    """
    # Min-heap of (key, item) tuples
    heap: List[Tuple[float, T]] = []
    
    for item, weight in stream:
        if weight <= 0:
            continue
            
        # Key = random^(1/weight) — higher weight = higher expected key
        key = random.random() ** (1.0 / weight)
        
        if len(heap) < k:
            heapq.heappush(heap, (key, item))
        elif key > heap[0][0]:
            heapq.heapreplace(heap, (key, item))
    
    return [item for _, item in heap]

# Example: sample log entries, prioritizing errors
def sample_logs_by_severity(log_stream):
    severity_weights = {'DEBUG': 0.1, 'INFO': 0.5, 'WARN': 2.0, 'ERROR': 10.0}
    
    weighted_stream = (
        (entry, severity_weights.get(entry.level, 1.0))
        for entry in log_stream
    )
    
    return weighted_reservoir_sample(weighted_stream, k=1000)

The mathematical intuition: raising a uniform random number to power 1/w stretches the distribution. Higher weights produce higher expected keys, making those items more likely to survive in the top-k heap.
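A quick simulation makes this concrete. With k = 1, A-Res reduces to keeping the single highest key, and an item’s win rate should match its share of the total weight (the two-item setup below is purely illustrative):

```python
import random

def a_res_pick_one(weighted_items):
    """A-Res with k = 1: keep the item whose key U**(1/w) is largest."""
    best_key, best_item = -1.0, None
    for item, weight in weighted_items:
        key = random.random() ** (1.0 / weight)
        if key > best_key:
            best_key, best_item = key, item
    return best_item

random.seed(0)
trials = 20_000
wins = sum(
    a_res_pick_one([('light', 1.0), ('heavy', 3.0)]) == 'heavy'
    for _ in range(trials)
)
win_rate = wins / trials  # should be close to 3 / (3 + 1) = 0.75
```

The item carrying 3/4 of the total weight wins about 75% of the time, exactly the proportional behavior we wanted.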

Practical Applications & Optimizations

Sampling Lines from Large Files

A common task: get a random sample from a file too large to load into memory.

import random
from pathlib import Path

def sample_file_lines(filepath: Path, k: int, encoding='utf-8') -> list[str]:
    """
    Efficiently sample k random lines from a file of any size.
    Memory usage: O(k) regardless of file size.
    """
    reservoir = []
    
    with open(filepath, 'r', encoding=encoding) as f:
        for i, line in enumerate(f):
            if i < k:
                reservoir.append(line.rstrip('\n'))
            else:
                j = random.randint(0, i)
                if j < k:
                    reservoir[j] = line.rstrip('\n')
    
    return reservoir

# Sample 10,000 lines from a 50GB log file
sample = sample_file_lines(Path('/var/log/massive.log'), k=10000)

Distributed Reservoir Sampling

When processing streams across multiple workers, you can merge reservoirs:

import random

def merge_reservoirs(reservoirs: list[list], k: int) -> list:
    """
    Naive merge of multiple reservoir samples into one.

    Note: this is only uniform if every input reservoir came from a
    stream of the same size. Otherwise, items from shorter streams
    are over-represented.
    """
    # Flatten all reservoirs and re-sample
    combined = []
    for reservoir in reservoirs:
        combined.extend(reservoir)

    # Re-sample if we have more than k items
    if len(combined) <= k:
        return combined

    return random.sample(combined, k)

For proper weighted merging in distributed systems, each worker should also track the count of items it processed, allowing proportional combination.
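Here’s one way that count-aware merge can look. The `merge_two_reservoirs` helper below is a sketch, not a standard API: each output slot draws from a reservoir with probability proportional to the remaining count of its underlying stream, which mirrors the hypergeometric split an exact combined sample requires:

```python
import random

def merge_two_reservoirs(res_a, n_a, res_b, n_b, k):
    """
    Merge two reservoirs (with their stream counts) into one size-k sample.

    n_a and n_b are the total number of items each worker observed.
    Each slot is drawn from reservoir A with probability n_a / (n_a + n_b),
    decrementing the chosen side's count after each draw.
    """
    a, b = list(res_a), list(res_b)
    random.shuffle(a)  # reservoirs are unordered samples; shuffle before popping
    random.shuffle(b)
    merged = []
    for _ in range(min(k, len(a) + len(b))):
        if a and (not b or random.random() < n_a / (n_a + n_b)):
            merged.append(a.pop())
            n_a -= 1
        else:
            merged.append(b.pop())
            n_b -= 1
    return merged
```

With streams of 100 and 50 items, about two-thirds of the merged sample should come from the first worker, matching its share of the combined stream.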

Optimized Skip-Based Sampling

For very large streams where k << n, Algorithm L (Li, 1994) avoids generating a random number per item by computing how many items to skip:

import random
import math

def optimized_reservoir_sample(stream, k: int):
    """
    Optimized reservoir sampling that skips items in bulk.
    Faster for large streams where k << n.
    """
    reservoir = []
    iterator = iter(stream)
    
    # Fill initial reservoir
    for i, item in zip(range(k), iterator):
        reservoir.append(item)
    
    # Process remaining items with skip optimization
    w = math.exp(math.log(random.random()) / k)
    next_idx = k + int(math.log(random.random()) / math.log(1 - w))
    
    for i, item in enumerate(iterator, start=k):
        if i == next_idx:
            reservoir[random.randint(0, k - 1)] = item
            w *= math.exp(math.log(random.random()) / k)
            next_idx = i + 1 + int(math.log(random.random()) / math.log(1 - w))
    
    return reservoir

This reduces the expected number of random number generations from O(n) to O(k(1 + log(n/k))).

Complexity Analysis & Trade-offs

Time Complexity: O(n) for the basic algorithm—you must see every item once. The optimized skip-based variant achieves O(k(1 + log(n/k))) random number generations.

Space Complexity: O(k)—you only store the reservoir, regardless of stream size.

When to use reservoir sampling:

  • Stream length is unknown or infinite
  • Single-pass processing is required
  • Memory is constrained relative to data size
  • You need provably uniform random selection

When to use alternatives:

  • If you know n upfront, generate k random indices and select directly
  • If data fits in memory, random.sample() is simpler
  • If you need exact counts of specific items, use Count-Min Sketch instead

Edge cases to handle:

  • Stream shorter than k: return all items
  • Empty stream: return empty list
  • k = 0: return empty list immediately
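The Algorithm R implementation shown earlier already behaves correctly in all three cases; a quick self-contained check (the function is repeated so the snippet runs on its own):

```python
import random

def reservoir_sample(stream, k):
    # Algorithm R, repeated from earlier in the article.
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)
        else:
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = item
    return reservoir

short = reservoir_sample([1, 2], k=5)     # stream shorter than k
empty = reservoir_sample([], k=3)         # empty stream
zero = reservoir_sample(range(100), k=0)  # k = 0
```

No special-casing is needed: the fill loop naturally returns everything it saw when the stream runs short, and with k = 0 no replacement condition can ever fire.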

Reservoir sampling is one of those algorithms that seems almost too simple to work, yet the mathematics are airtight. Once you internalize it, you’ll spot opportunities to apply it everywhere—anywhere you need randomness from streams without the luxury of hindsight.
