System Design: Database Sharding Techniques

Database sharding is horizontal partitioning of data across multiple database instances. Each shard holds a subset of the total data, allowing you to scale write throughput and storage beyond what a...

Key Insights

  • Shard key selection is the most critical decision in sharding architecture—a poor choice creates hotspots that negate all scaling benefits and requires painful migrations to fix.
  • Consistent hashing with virtual nodes minimizes data movement during cluster changes from O(n) to O(k/n), making operational scaling practical rather than catastrophic.
  • Cross-shard operations are where sharding complexity explodes; design your data model to minimize them rather than trying to solve distributed joins elegantly.

Introduction to Database Sharding

Database sharding is horizontal partitioning of data across multiple database instances. Each shard holds a subset of the total data, allowing you to scale write throughput and storage beyond what a single machine can handle.

Before reaching for sharding, exhaust simpler options. Read replicas handle read-heavy workloads. Caching reduces database load dramatically. Vertical scaling (bigger machines) delays complexity. Connection pooling improves throughput. Query optimization often reveals 10x improvements hiding in plain sight.

Sharding becomes necessary when you hit fundamental limits: single-node write throughput caps out, dataset exceeds practical storage for one machine, or latency requirements demand geographic distribution. If you’re not hitting these walls, sharding adds operational complexity without proportional benefit.

The cost is real. Every query now requires routing logic. Joins across shards become application-level problems. Transactions spanning shards require distributed coordination. Schema migrations touch multiple databases. Backups and recovery multiply in complexity. Choose this path deliberately.

Sharding Strategies

Four primary strategies dominate production systems, each with distinct trade-offs.

Hash-based sharding applies a hash function to the shard key, distributing data uniformly across shards. This prevents hotspots when your key has good cardinality but makes range queries expensive since related data scatters randomly.

Range-based sharding assigns contiguous key ranges to each shard. This excels for time-series data or when range queries are common, but creates hotspots if writes concentrate on recent data.

Geographic sharding routes data based on location, reducing latency for regional users and satisfying data residency requirements. Complexity increases when users or data cross regions.

Directory-based sharding maintains a lookup service mapping keys to shards. This offers maximum flexibility for custom routing logic but introduces a single point of failure and lookup overhead.

import hashlib
from typing import List

class HashBasedRouter:
    def __init__(self, shard_count: int):
        self.shard_count = shard_count
    
    def get_shard(self, key: str) -> int:
        # MD5 provides uniform distribution; not for cryptography
        hash_bytes = hashlib.md5(key.encode()).digest()
        hash_int = int.from_bytes(hash_bytes[:8], byteorder='big')
        return hash_int % self.shard_count

class RangeBasedRouter:
    def __init__(self, boundaries: List[int]):
        # boundaries = [1000000, 2000000, 3000000] -> 4 shards
        self.boundaries = sorted(boundaries)
    
    def get_shard(self, key: int) -> int:
        for i, boundary in enumerate(self.boundaries):
            if key < boundary:
                return i
        return len(self.boundaries)

# Usage
hash_router = HashBasedRouter(shard_count=8)
shard = hash_router.get_shard("user_12345")  # Returns 0-7

range_router = RangeBasedRouter([1000000, 2000000, 3000000])
shard = range_router.get_shard(1500000)  # Returns 1

Hash-based works for most OLTP workloads. Range-based suits time-ordered data. Geographic applies when latency or compliance drives architecture. Directory-based handles edge cases where other strategies fail.

Shard Key Selection

The shard key determines everything. Choose poorly and you’ll rebuild your data layer within two years.

High cardinality ensures data distributes across shards. User ID works; boolean fields don’t. A shard key with only 10 possible values caps you at 10 shards regardless of infrastructure.

Query alignment means your most frequent queries include the shard key. If 80% of queries filter by tenant_id, that’s your shard key. If queries need data from multiple shard key values, you’ll pay the cross-shard penalty constantly.

Write distribution prevents hotspots. Auto-incrementing IDs concentrate recent writes on one shard. UUIDs distribute evenly. Compound keys can combine benefits.

-- Poor shard key: status has low cardinality, creates hotspots
CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    status VARCHAR(20),  -- 'pending', 'shipped', 'delivered'
    customer_id BIGINT,
    created_at TIMESTAMP
);
-- Sharding by status: 3 possible shards, 'pending' gets all new writes

-- Better shard key: customer_id has high cardinality
CREATE TABLE orders (
    id BIGINT,
    customer_id BIGINT,  -- millions of unique values
    status VARCHAR(20),
    created_at TIMESTAMP,
    PRIMARY KEY (customer_id, id)  -- compound key, shard on customer_id
);
-- Queries by customer hit single shard, writes distribute evenly

-- Best for multi-tenant: tenant_id isolates data completely
CREATE TABLE orders (
    id BIGINT,
    tenant_id INT,  -- organizational isolation
    customer_id BIGINT,
    status VARCHAR(20),
    created_at TIMESTAMP,
    PRIMARY KEY (tenant_id, id)
);
-- Each tenant's data lives on predictable shards
-- Cross-tenant queries are rare, single-tenant queries are fast

Common mistakes: sharding by timestamp (all writes hit one shard), sharding by auto-increment ID (same problem), sharding by low-cardinality field (limited scale ceiling), ignoring query patterns (constant cross-shard joins).

Consistent Hashing

Standard hash-based sharding has a fatal flaw: adding or removing shards invalidates most key-to-shard mappings. With modulo hashing, changing from 8 to 9 shards moves roughly 89% of data.

Consistent hashing solves this by mapping both keys and nodes to positions on a conceptual ring. Each key routes to the first node clockwise from its position. Adding a node only affects keys between it and its predecessor.

Virtual nodes improve distribution further. Instead of one position per physical node, each node claims multiple positions on the ring. This smooths out the distribution and allows weighted capacity allocation.

import hashlib
from bisect import bisect_right
from typing import Dict, List, Optional

class ConsistentHashRing:
    def __init__(self, virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self.ring: List[int] = []  # sorted positions
        self.ring_to_node: Dict[int, str] = {}  # position -> node
        self.nodes: set = set()
    
    def _hash(self, key: str) -> int:
        return int(hashlib.sha256(key.encode()).hexdigest(), 16)
    
    def add_node(self, node: str) -> None:
        if node in self.nodes:
            return
        self.nodes.add(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vn{i}"
            position = self._hash(virtual_key)
            self.ring.append(position)
            self.ring_to_node[position] = node
        self.ring.sort()
    
    def remove_node(self, node: str) -> None:
        if node not in self.nodes:
            return
        self.nodes.remove(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vn{i}"
            position = self._hash(virtual_key)
            self.ring.remove(position)
            del self.ring_to_node[position]
    
    def get_node(self, key: str) -> Optional[str]:
        if not self.ring:
            return None
        position = self._hash(key)
        # Find first node position >= key position
        idx = bisect_right(self.ring, position)
        if idx == len(self.ring):
            idx = 0  # Wrap around
        return self.ring_to_node[self.ring[idx]]

# Usage
ring = ConsistentHashRing(virtual_nodes=150)
ring.add_node("shard-1")
ring.add_node("shard-2")
ring.add_node("shard-3")

# Route keys
shard = ring.get_node("user:12345")

# Adding shard-4 only moves ~25% of keys (1/4 of ring)
ring.add_node("shard-4")

The 150 virtual nodes per physical node is a reasonable default. Fewer creates uneven distribution; more increases memory overhead. Adjust based on your node count and tolerance for imbalance.

Cross-Shard Operations

Cross-shard operations are where sharding extracts its cost. Every technique here adds latency, complexity, or both.

Scatter-gather sends queries to all relevant shards in parallel, then aggregates results. Simple to implement, but latency equals your slowest shard plus aggregation time.

Distributed transactions using two-phase commit provide ACID guarantees but introduce coordinator overhead and failure complexity. Most systems avoid them by designing single-shard transactions.

Denormalization duplicates data across shards to avoid joins. Storage increases, but queries stay local. Accept eventual consistency for the duplicated data.

import asyncio
from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class ShardResult:
    shard_id: str
    data: List[Dict[str, Any]]
    error: Optional[Exception] = None

class QueryCoordinator:
    def __init__(self, shard_connections: Dict[str, Any]):
        self.shards = shard_connections
    
    async def scatter_gather(
        self, 
        query: str, 
        params: Dict[str, Any],
        target_shards: List[str] = None
    ) -> List[Dict[str, Any]]:
        shards = target_shards or list(self.shards.keys())
        
        async def query_shard(shard_id: str) -> ShardResult:
            try:
                conn = self.shards[shard_id]
                result = await conn.execute(query, params)
                return ShardResult(shard_id, result)
            except Exception as e:
                return ShardResult(shard_id, [], error=e)
        
        # Execute on all shards in parallel
        tasks = [query_shard(s) for s in shards]
        results = await asyncio.gather(*tasks)
        
        # Handle partial failures
        errors = [r for r in results if r.error]
        if errors:
            # Log errors, decide if partial results acceptable
            pass
        
        # Aggregate results
        return self._merge_results([r.data for r in results if not r.error])
    
    def _merge_results(self, shard_results: List[List[Dict]]) -> List[Dict]:
        # Flatten and apply any ordering/limits
        merged = []
        for result_set in shard_results:
            merged.extend(result_set)
        return merged

# For aggregations, merge differently
async def count_across_shards(coordinator, query):
    results = await coordinator.scatter_gather(query, {})
    return sum(r.get('count', 0) for r in results)

Design principle: structure your data model so 95% of operations hit a single shard. Accept that 5% will be expensive. If cross-shard operations dominate, your shard key is wrong.

Rebalancing and Resharding

Clusters change. Shards fill unevenly. Traffic patterns shift. You need strategies for moving data without downtime.

Double-write migration writes to both old and new locations during transition. Read from old, verify against new, then switch reads. Safe but doubles write load temporarily.

Background copying transfers data incrementally while the system runs. Track changes during copy via change data capture. Catch up, then cut over.

Consistent hashing rebalance naturally minimizes movement when adding nodes. Only keys between the new node and its predecessor move.

For range-based sharding, split hot shards by adjusting boundaries. Monitor shard sizes and query latencies to detect when splits are needed.

Automation matters. Manual resharding doesn’t scale past a few shards. Build tooling that detects imbalance, plans migrations, executes safely, and verifies correctness.

Real-World Considerations

Operational complexity is sharding’s true cost. You’re now running a distributed system with all attendant challenges.

Monitoring needs shard-level granularity: query latency per shard, storage utilization, connection counts, replication lag. Aggregate metrics hide problems.

Failure handling requires decisions. If one shard is unavailable, do queries fail entirely or return partial results? How do you handle split-brain scenarios? What’s your recovery procedure?

Schema migrations multiply by shard count. Rolling migrations become necessary. Backward-compatible changes become mandatory.

Managed solutions handle much of this complexity. Vitess (MySQL), Citus (PostgreSQL), and CockroachDB provide sharding with less operational burden. The trade-off is less control and potential vendor lock-in.

Build custom sharding when you need specific routing logic, have unusual consistency requirements, or operate at scale where managed solutions become cost-prohibitive. Otherwise, start with managed solutions and migrate if needed.

Sharding is a powerful tool for horizontal scale, but it’s not free. Understand the costs, design your data model around shard boundaries, and invest in operational tooling. Done well, sharding enables practically unlimited scale. Done poorly, it creates a distributed monolith that’s harder to operate than what you started with.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.