Reservoir Sampling: Random Selection from Streams
You're processing a continuous stream of events—server logs, user clicks, sensor readings—and you need a random sample. The catch: you don't know how many items will arrive, you can't store...
Key Insights
- Reservoir sampling solves a fundamental streaming problem: selecting k uniformly random items from a dataset of unknown size in a single pass with O(k) memory.
- The algorithm’s elegance lies in its probability math—each item i is added with probability k/i, and existing items are replaced uniformly, ensuring every item has exactly k/n chance of being in the final sample.
- Weighted variants and parallel merging strategies extend reservoir sampling to production scenarios like distributed log sampling, database query optimization, and ML training pipelines.
The Stream Sampling Problem
You’re processing a continuous stream of events—server logs, user clicks, sensor readings—and you need a random sample. The catch: you don’t know how many items will arrive, you can’t store everything, and you only get one pass through the data.
This is the stream sampling problem, and it appears everywhere in production systems. Log aggregation services sample requests to avoid storage costs. A/B testing frameworks randomly assign users from an unbounded population. Data pipelines sketch massive datasets into manageable summaries. Database engines implement TABLESAMPLE clauses for query optimization.
The naive solution—collect everything, then sample—fails immediately. You either run out of memory or violate the single-pass constraint. What you need is an algorithm that maintains a fixed-size sample while guaranteeing uniform randomness across the entire stream, regardless of its length.
Why Naive Approaches Fail
Your first instinct might be: “Sample with fixed probability p.” If you want roughly 1000 items, set p=0.001 and flip a coin for each element. This approach has two critical flaws.
First, you don’t know the stream size. Setting p=0.001 gives you ~1000 items from a million-element stream but only ~10 items from ten thousand. Your sample size becomes unpredictable, which breaks downstream systems expecting fixed-size batches.
Second, even if you guess the size correctly, you’re not guaranteed exactly k items. You’ll get a binomial distribution around your target, sometimes significantly over or under.
What about storing items until you hit capacity, then stopping? Now your sample is biased toward early elements—useless for any statistical analysis.
The memory constraint is real. Processing a billion-event stream with 8-byte identifiers requires 8GB just for IDs. Add payload data and you’re looking at terabytes. Reservoir sampling maintains your sample in O(k) space regardless of stream length.
Algorithm R: Single-Item Reservoir Sampling
Let’s start with the simplest case: selecting exactly one random item from a stream. The algorithm, published by Jeffrey Vitter in 1985, is deceptively simple.
import random
def reservoir_sample_single(stream):
"""Select one random item from a stream of unknown length."""
result = None
for i, item in enumerate(stream, start=1):
# Replace with probability 1/i
if random.randint(1, i) == 1:
result = item
return result
Here’s why this works. When the first item arrives, we keep it with probability 1/1 = 100%. When the second arrives, we replace with probability 1/2. The first item survives if we don’t replace: also 1/2. Both items now have equal probability.
For item n, the probability it ends up selected is:
P(selected) = P(picked) × P(not replaced by n+1) × P(not replaced by n+2) × ...
= (1/n) × (n/(n+1)) × ((n+1)/(n+2)) × ...
= 1/N (where N is final stream length)
The telescoping product cancels beautifully, leaving exactly 1/N for every item. This holds regardless of when the item appeared in the stream.
Generalizing to K Items
Extending to k items follows the same logic. Maintain a reservoir of size k. For the first k items, add them directly. For each subsequent item i, include it with probability k/i, replacing a random existing reservoir element.
import random
def reservoir_sample(stream, k):
"""Select k random items uniformly from a stream."""
reservoir = []
for i, item in enumerate(stream):
if i < k:
# Fill the reservoir first
reservoir.append(item)
else:
# Replace with probability k/(i+1)
j = random.randint(0, i)
if j < k:
reservoir[j] = item
return reservoir
# Usage example
def generate_stream(n):
"""Simulate a stream of n items."""
for i in range(n):
yield f"item_{i}"
sample = reservoir_sample(generate_stream(1_000_000), k=100)
print(f"Sampled {len(sample)} items from 1M element stream")
The probability math extends naturally. Item i (0-indexed) gets added with probability k/(i+1) when i >= k. An existing item at position j survives if: (1) the new item isn’t selected, OR (2) the new item is selected but replaces a different position.
P(item j survives round i) = (i+1-k)/(i+1) + (k/(i+1)) × ((k-1)/k)
= (i+1-k)/(i+1) + (k-1)/(i+1)
= i/(i+1)
After processing n items, each item has probability k/n of being in the reservoir. The proof involves the same telescoping product as the single-item case.
Weighted Reservoir Sampling
Real-world sampling often requires weights. You might want to oversample error logs, prioritize recent events, or weight by request importance. Algorithm A-Res (Efraimidis and Spirakis, 2006) handles this elegantly.
The key insight: assign each item a key = random^(1/weight), then keep the k items with the highest keys.
import random
import heapq
def weighted_reservoir_sample(stream, k):
"""
Select k items with probability proportional to their weights.
Stream yields (item, weight) tuples.
"""
# Min-heap of (key, item) tuples
reservoir = []
for item, weight in stream:
if weight <= 0:
continue
# Key = random^(1/weight) - higher weight = higher expected key
key = random.random() ** (1.0 / weight)
if len(reservoir) < k:
heapq.heappush(reservoir, (key, item))
elif key > reservoir[0][0]:
# New item has higher key than minimum in reservoir
heapq.heapreplace(reservoir, (key, item))
return [item for key, item in reservoir]
# Example: Sample logs with error logs weighted 10x
def log_stream():
log_types = [("INFO", 1), ("WARN", 3), ("ERROR", 10)]
for i in range(100000):
log_type, weight = random.choice(log_types)
yield (f"{log_type}: event {i}", weight)
sample = weighted_reservoir_sample(log_stream(), k=50)
# Expect ~10x more ERROR logs than uniform sampling would produce
The mathematical elegance here is remarkable. The exponential transformation ensures that items with weight w are selected with probability proportional to w, while maintaining the streaming single-pass property.
Practical Applications and Optimizations
Production systems rarely process streams on a single machine. Distributed reservoir sampling requires merging reservoirs from multiple workers. The key insight: you can combine reservoirs by treating the union as a new stream and resampling.
import random
def merge_reservoirs(reservoirs, k):
"""
Merge multiple reservoirs into a single k-sized reservoir.
Each input reservoir should have the same target size k.
Assumes each reservoir sampled from streams of known sizes.
"""
# Collect all items with their source stream sizes
all_items = []
for reservoir, stream_size in reservoirs:
for item in reservoir:
all_items.append((item, stream_size))
# Weight by inverse of selection probability in original stream
# Item from stream of size n had probability k/n of being selected
# Weight should be n/k to compensate
total_weight = sum(size / k for _, size in all_items)
result = []
for item, stream_size in all_items:
weight = (stream_size / k) / total_weight
if len(result) < k:
result.append(item)
elif random.random() < weight * k / len(result):
result[random.randint(0, k-1)] = item
return result
# Simpler approach when stream sizes are equal: just resample the union
def merge_equal_reservoirs(reservoirs, k):
"""Merge reservoirs from equal-sized stream chunks."""
combined = [item for reservoir in reservoirs for item in reservoir]
return reservoir_sample(iter(combined), k)
Database systems use reservoir sampling extensively. PostgreSQL’s TABLESAMPLE SYSTEM uses a block-level variant. Spark’s DataFrame.sample() implements distributed reservoir sampling across partitions. BigQuery and Snowflake use similar techniques for approximate query processing.
For ML pipelines, reservoir sampling enables training on streams without storing entire datasets. You maintain a fixed-size buffer of training examples, continuously refreshed as new data arrives. This is particularly valuable for online learning and concept drift detection.
Tradeoffs and Alternatives
Reservoir sampling isn’t always the right tool. Consider these alternatives:
Count-Min Sketch: When you need frequency estimates rather than actual samples. Uses O(ε⁻¹ log δ⁻¹) space for ε-approximate counts with probability 1-δ.
Bloom Filters: When you only need membership testing, not sampling. Excellent for deduplication in streams.
Min-Hash: When you need set similarity rather than random samples. Powers near-duplicate detection systems.
Stratified Sampling: When you need guaranteed representation from subgroups. Reservoir sampling can miss rare categories entirely.
The memory-accuracy tradeoff is real. Reservoir sampling gives you exact uniform samples but only k of them. Sketching algorithms sacrifice exactness for richer aggregate statistics. Choose based on your downstream use case.
Reservoir sampling also assumes you want uniform (or weighted-uniform) randomness. If you need time-decay—recent items more likely than old ones—consider exponentially-weighted variants or sliding window approaches.
For extremely high-throughput streams, the random number generation becomes a bottleneck. Vitter’s Algorithm Z optimizes this by computing skip distances directly, reducing from O(n) random calls to O(k(1 + log(n/k))).
The elegance of reservoir sampling lies in its simplicity: a few lines of code, minimal memory, and mathematically provable guarantees. When you need random samples from unbounded data, it’s the algorithm to reach for first.