Design a Web Crawler: Distributed URL Fetching

Key Insights

  • A web crawler’s URL frontier is the heart of the system—design it with separate priority queues and per-host politeness queues to balance crawl efficiency with respectful rate limiting.
  • Stateless fetcher workers with consistent hashing enable horizontal scaling while ensuring the same worker handles the same domain, making robots.txt caching and rate limiting trivial.
  • Bloom filters provide memory-efficient URL deduplication at scale, but you must accept false positives—partition them across nodes and tune the false positive rate based on your crawl size.

Introduction & Problem Space

Building a web crawler that fetches a few thousand pages is straightforward. Building one that fetches billions of pages across millions of domains while respecting rate limits, handling failures gracefully, and avoiding duplicate work—that’s an entirely different engineering challenge.

At scale, web crawlers face four fundamental problems. Politeness requires limiting request rates per domain to avoid overwhelming servers or getting blocked. Deduplication prevents wasting resources re-fetching URLs you’ve already seen. Fault tolerance ensures worker crashes don’t lose progress or corrupt state. Throughput demands fetching thousands of pages per second across a distributed fleet.

This article walks through designing a production-grade distributed crawler, covering the URL frontier, fetcher workers, deduplication systems, and fault tolerance mechanisms. We’ll build something that can scale from thousands to billions of pages.

System Architecture Overview

A distributed web crawler consists of four core components working in concert. The URL Frontier manages the queue of URLs to crawl, handling prioritization and politeness. Fetcher Workers pull URLs from the frontier, download pages, and extract new URLs. The DNS Resolver provides cached, high-throughput domain resolution. The Content Store persists downloaded pages for downstream processing.

Data flows from seed URLs into the frontier; fetchers pull from it, download content, and extract links, which feed back into the frontier as newly discovered URLs, while downloaded content flows to persistent storage.

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
import asyncio

@dataclass
class CrawlURL:
    url: str
    domain: str
    priority: int
    depth: int
    discovered_at: float

@dataclass
class FetchResult:
    url: str
    status_code: int
    content: Optional[bytes]
    content_type: str
    extracted_urls: list[str]
    fetch_time_ms: int

class URLFrontier(ABC):
    @abstractmethod
    async def push(self, urls: list[CrawlURL]) -> int:
        """Add URLs to frontier, returns count of new URLs added."""
        pass
    
    @abstractmethod
    async def pop(self, worker_id: str, batch_size: int) -> list[CrawlURL]:
        """Get next batch of URLs for a specific worker."""
        pass
    
    @abstractmethod
    async def mark_complete(self, url: str, success: bool) -> None:
        """Mark URL as processed."""
        pass

class Fetcher(ABC):
    @abstractmethod
    async def fetch(self, url: CrawlURL) -> FetchResult:
        """Fetch a single URL with retry logic."""
        pass

class ContentStore(ABC):
    @abstractmethod
    async def store(self, url: str, result: FetchResult) -> str:
        """Store fetched content, returns content ID."""
        pass

These interfaces define the contracts between components. The key insight is that each component should be independently scalable—you might need 10x more fetchers than frontier nodes.

URL Frontier Design

The URL frontier is the brain of your crawler. A naive queue won’t work because you need to balance two competing concerns: crawl important pages first (prioritization) while not hammering any single domain (politeness).

The solution is a two-tier queue architecture. Front queues are priority-based—high-priority URLs (like homepage updates) go to queue 1, discovered links go to queue 5. Back queues are per-domain—each domain gets its own queue with a timestamp indicating when it can next be crawled.

When a worker requests URLs, the frontier selects from the highest-priority front queue, then routes to the appropriate back queue, only returning URLs from domains that aren’t rate-limited.

import time
import asyncio
from collections import defaultdict
import redis.asyncio as redis
import json

class PoliteFrontier:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.min_crawl_delay = 1.0  # seconds between requests to same domain
        self.priority_queues = 5
    
    async def push(self, urls: list[CrawlURL]) -> int:
        # First pass: mark URLs as seen. SETNX returns 1 only for new URLs.
        pipe = self.redis.pipeline()
        for crawl_url in urls:
            pipe.setnx(f"seen:{crawl_url.url}", 1)
        seen_results = await pipe.execute()
        
        # Second pass: enqueue only URLs we haven't seen before
        new_urls = [u for u, is_new in zip(urls, seen_results) if is_new]
        if not new_urls:
            return 0
        
        pipe = self.redis.pipeline()
        for crawl_url in new_urls:
            url_data = json.dumps({
                "url": crawl_url.url,
                "domain": crawl_url.domain,
                "depth": crawl_url.depth,
                "discovered_at": crawl_url.discovered_at
            })
            
            # Sorted set per priority level; discovery time as score gives FIFO
            pipe.zadd(f"priority:{crawl_url.priority}", {url_data: crawl_url.discovered_at})
            
            # Track domain membership
            pipe.sadd(f"domain:{crawl_url.domain}", crawl_url.url)
        
        await pipe.execute()
        return len(new_urls)
    
    async def pop(self, worker_id: str, batch_size: int = 10) -> list[CrawlURL]:
        results = []
        now = time.time()
        
        for priority in range(1, self.priority_queues + 1):
            if len(results) >= batch_size:
                break
                
            priority_key = f"priority:{priority}"
            
            # Get candidate URLs
            candidates = await self.redis.zrange(
                priority_key, 0, batch_size * 3, withscores=True
            )
            
            for url_data, score in candidates:
                if len(results) >= batch_size:
                    break
                    
                data = json.loads(url_data)
                domain = data["domain"]
                
                # Check domain rate limit
                next_allowed = await self.redis.get(f"ratelimit:{domain}")
                if next_allowed and float(next_allowed) > now:
                    continue  # Domain is rate-limited
                
                # Claim this URL atomically
                removed = await self.redis.zrem(priority_key, url_data)
                if removed:
                    # Set rate limit for domain
                    await self.redis.setex(
                        f"ratelimit:{domain}",
                        int(self.min_crawl_delay * 2),
                        str(now + self.min_crawl_delay)
                    )
                    
                    results.append(CrawlURL(
                        url=data["url"],
                        domain=domain,
                        priority=priority,
                        depth=data["depth"],
                        discovered_at=data["discovered_at"]
                    ))
        
        return results

This implementation uses Redis sorted sets as the priority (front) queues and replaces explicit per-domain back queues with per-domain rate-limit keys—a simplification that preserves the politeness guarantee. The pop method skips URLs from rate-limited domains rather than blocking, so workers always receive whatever work is currently available.

Distributed Fetcher Workers

Fetcher workers should be stateless and horizontally scalable. The key design decision is how to assign URLs to workers. Random assignment works but means every worker must cache robots.txt for every domain. Consistent hashing assigns all URLs from a domain to the same worker, making robots.txt caching trivial.
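The assignment scheme can be sketched with a small hash ring. This is an illustration rather than part of the crawler above; it uses the stdlib hashlib instead of a dedicated hash library so it is self-contained, and the worker names are placeholders.

```python
import bisect
import hashlib

class HashRing:
    """Consistent hashing: maps each domain to a worker, using virtual
    nodes so load stays balanced when workers join or leave."""

    def __init__(self, workers: list[str], vnodes: int = 100):
        self._ring: list[tuple[int, str]] = []
        for worker in workers:
            for i in range(vnodes):
                point = self._hash(f"{worker}:{i}")
                self._ring.append((point, worker))
        self._ring.sort()
        self._points = [p for p, _ in self._ring]

    @staticmethod
    def _hash(key: str) -> int:
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def worker_for(self, domain: str) -> str:
        # First ring point clockwise from the domain's hash position
        idx = bisect.bisect(self._points, self._hash(domain)) % len(self._ring)
        return self._ring[idx][1]

ring = HashRing(["fetcher-1", "fetcher-2", "fetcher-3"])
# Every URL from a given domain routes to the same worker
assert ring.worker_for("example.com") == ring.worker_for("example.com")
```

Because only the ring points belonging to a departed worker are reassigned, adding or removing a fetcher remaps roughly 1/N of domains instead of reshuffling everything, which keeps robots.txt caches mostly warm.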

import asyncio
import time
from typing import Optional

import aiohttp
from html.parser import HTMLParser
from urllib.parse import urlparse, urljoin
from urllib.robotparser import RobotFileParser

class AsyncFetcher:
    def __init__(self, worker_id: str, user_agent: str):
        self.worker_id = worker_id
        self.user_agent = user_agent
        self.robots_cache: dict[str, Optional[RobotFileParser]] = {}
        self.session: Optional[aiohttp.ClientSession] = None
        self.max_retries = 3
        self.timeout = aiohttp.ClientTimeout(total=30)
    
    async def start(self):
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers={"User-Agent": self.user_agent}
        )
    
    async def fetch(self, crawl_url: CrawlURL) -> FetchResult:
        # Check robots.txt first
        if not await self._can_fetch(crawl_url.url):
            return FetchResult(
                url=crawl_url.url, status_code=403,
                content=None, content_type="",
                extracted_urls=[], fetch_time_ms=0
            )
        
        for attempt in range(self.max_retries):
            try:
                start = time.time()
                async with self.session.get(
                    crawl_url.url,
                    allow_redirects=True,
                    max_redirects=5
                ) as response:
                    content = await response.read()
                    fetch_time = int((time.time() - start) * 1000)
                    
                    extracted = []
                    if response.content_type.startswith("text/html"):
                        extracted = self._extract_urls(
                            content, crawl_url.url
                        )
                    
                    return FetchResult(
                        url=str(response.url),  # Final URL after redirects
                        status_code=response.status,
                        content=content,
                        content_type=response.content_type,
                        extracted_urls=extracted,
                        fetch_time_ms=fetch_time
                    )
                    
            except asyncio.TimeoutError:
                if attempt == self.max_retries - 1:
                    return FetchResult(
                        url=crawl_url.url, status_code=408,
                        content=None, content_type="",
                        extracted_urls=[], fetch_time_ms=30000
                    )
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                
            except aiohttp.ClientError as e:
                if attempt == self.max_retries - 1:
                    return FetchResult(
                        url=crawl_url.url, status_code=0,
                        content=None, content_type="",
                        extracted_urls=[], fetch_time_ms=0
                    )
                await asyncio.sleep(2 ** attempt)
    
    async def _can_fetch(self, url: str) -> bool:
        parsed = urlparse(url)
        domain = parsed.netloc
        
        if domain not in self.robots_cache:
            robots_url = f"{parsed.scheme}://{domain}/robots.txt"
            try:
                async with self.session.get(robots_url) as resp:
                    if resp.status == 200:
                        content = await resp.text()
                        rp = RobotFileParser()
                        rp.parse(content.splitlines())
                        self.robots_cache[domain] = rp
                    else:
                        # No robots.txt: default to allowing the crawl
                        self.robots_cache[domain] = None
            except (aiohttp.ClientError, asyncio.TimeoutError):
                self.robots_cache[domain] = None
        
        rp = self.robots_cache.get(domain)
        return rp is None or rp.can_fetch(self.user_agent, url)
    
    def _extract_urls(self, content: bytes, base_url: str) -> list[str]:
        """Extract absolute http(s) links from an HTML page."""
        class LinkParser(HTMLParser):
            def __init__(self):
                super().__init__()
                self.hrefs: list[str] = []
            
            def handle_starttag(self, tag, attrs):
                if tag == "a":
                    for name, value in attrs:
                        if name == "href" and value:
                            self.hrefs.append(value)
        
        parser = LinkParser()
        parser.feed(content.decode("utf-8", errors="replace"))
        # Resolve relative links against the page URL; keep only http(s)
        absolute = (urljoin(base_url, href) for href in parser.hrefs)
        return [u for u in absolute if u.startswith(("http://", "https://"))]

The fetcher handles retries with exponential backoff, follows redirects (tracking the final URL), respects robots.txt, and extracts links from HTML content. Each worker maintains its own robots.txt cache, which works well with consistent hashing.

URL Deduplication at Scale

You can’t store billions of URLs in a hash set—you’ll run out of memory. Bloom filters provide probabilistic set membership with fixed memory usage. The trade-off is false positives: the filter might say you’ve seen a URL when you haven’t. False negatives never occur.

import mmh3
from bitarray import bitarray
import math

class ScalableBloomFilter:
    def __init__(self, expected_items: int, fp_rate: float = 0.01):
        self.fp_rate = fp_rate
        self.size = self._optimal_size(expected_items, fp_rate)
        self.hash_count = self._optimal_hash_count(self.size, expected_items)
        self.bit_array = bitarray(self.size)
        self.bit_array.setall(0)
        self.count = 0
    
    def _optimal_size(self, n: int, p: float) -> int:
        return int(-n * math.log(p) / (math.log(2) ** 2))
    
    def _optimal_hash_count(self, m: int, n: int) -> int:
        return max(1, int(m / n * math.log(2)))
    
    def _get_indexes(self, item: str) -> list[int]:
        indexes = []
        for seed in range(self.hash_count):
            hash_val = mmh3.hash(item, seed) % self.size
            indexes.append(hash_val)
        return indexes
    
    def add(self, item: str) -> bool:
        """Add item, returns True if item was new."""
        indexes = self._get_indexes(item)
        was_new = not all(self.bit_array[i] for i in indexes)
        
        for i in indexes:
            self.bit_array[i] = 1
        
        if was_new:
            self.count += 1
        return was_new
    
    def contains(self, item: str) -> bool:
        """Check if item might be in set."""
        return all(self.bit_array[i] for i in self._get_indexes(item))


class RedisBackedBloomFilter:
    """Distributed bloom filter using Redis."""
    
    def __init__(self, redis_client, key_prefix: str, 
                 expected_items: int, fp_rate: float = 0.01):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.size = int(-expected_items * math.log(fp_rate) / (math.log(2) ** 2))
        self.hash_count = max(1, int(self.size / expected_items * math.log(2)))
        # Partition across multiple Redis keys for parallelism
        self.partitions = 16
    
    def _get_indexes(self, url: str) -> list[int]:
        # Same seeded-hash scheme as the in-memory filter above
        return [mmh3.hash(url, seed) % self.size
                for seed in range(self.hash_count)]
    
    async def add(self, url: str) -> bool:
        indexes = self._get_indexes(url)
        
        # Check existing bits
        pipe = self.redis.pipeline()
        for idx in indexes:
            partition = idx % self.partitions
            bit_pos = idx // self.partitions
            pipe.getbit(f"{self.key_prefix}:{partition}", bit_pos)
        
        results = await pipe.execute()
        was_new = not all(results)
        
        # Set bits
        pipe = self.redis.pipeline()
        for idx in indexes:
            partition = idx % self.partitions
            bit_pos = idx // self.partitions
            pipe.setbit(f"{self.key_prefix}:{partition}", bit_pos, 1)
        await pipe.execute()
        
        return was_new

For a crawler expecting 1 billion URLs with a 1% false positive rate, you need about 1.2 GB of memory. Partitioning across Redis keys enables parallel access and keeps any single key from becoming a bottleneck. Note that the check-then-set in add is not atomic: two workers racing on the same URL may both see it as new, which is harmless given the crawler's at-least-once semantics.
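You can check that sizing claim directly with the standard Bloom filter formulas the implementation above uses:

```python
import math

def bloom_params(n: int, p: float) -> tuple[int, int]:
    """Optimal bit count m and hash count k for n items at FP rate p."""
    m = int(-n * math.log(p) / (math.log(2) ** 2))
    k = max(1, int(m / n * math.log(2)))
    return m, k

m, k = bloom_params(1_000_000_000, 0.01)
print(m / 8 / 1e9)  # bits -> GB: ~1.2
print(k)            # 6 hash functions
```

Halving the false positive rate to 0.5% costs only about 0.15 GB more, so the memory curve is forgiving; the expensive part is pushing p below roughly 0.1%.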

Fault Tolerance & Checkpointing

Workers will crash. Networks will fail. Your crawler must handle this gracefully. The key is tracking URL state transitions: pending → in_progress → completed/failed.

class CheckpointedCrawler:
    def __init__(self, frontier: PoliteFrontier, redis_client):
        self.frontier = frontier
        self.redis = redis_client
        self.checkpoint_interval = 100  # URLs between checkpoints
    
    async def claim_urls(self, worker_id: str, batch_size: int) -> list[CrawlURL]:
        urls = await self.frontier.pop(worker_id, batch_size)
        
        # Mark URLs as in-progress with a TTL safety net. Keep the TTL
        # longer than the recovery threshold (300s below); otherwise a
        # crashed worker's keys expire before they can be re-queued.
        pipe = self.redis.pipeline()
        for url in urls:
            pipe.setex(
                f"inprogress:{url.url}",
                900,  # 15 minute TTL
                json.dumps({
                    "worker_id": worker_id,
                    "claimed_at": time.time(),
                    "url_data": url.__dict__
                })
            )
        await pipe.execute()
        
        return urls
    
    async def complete_url(self, url: str, success: bool):
        await self.redis.delete(f"inprogress:{url}")
        await self.frontier.mark_complete(url, success)
    
    async def recover_abandoned_urls(self):
        """Background task to reclaim URLs from crashed workers."""
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(
                cursor, match="inprogress:*", count=100
            )
            
            for key in keys:
                data = await self.redis.get(key)
                if data:
                    info = json.loads(data)
                    # Re-queue URLs claimed more than 5 minutes ago
                    if time.time() - info["claimed_at"] > 300:
                        # Re-queue the URL
                        url_data = info["url_data"]
                        await self.frontier.push([CrawlURL(**url_data)])
                        await self.redis.delete(key)
            
            if cursor == 0:
                break

This provides at-least-once semantics: a URL might be fetched twice if a worker crashes after fetching but before marking complete. For most crawlers, this is acceptable. Exactly-once requires more complex coordination that usually isn’t worth the overhead.
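One cheap way to make the occasional duplicate fetch harmless downstream is content-addressed storage: derive the content ID from a hash of the page bytes, so re-storing the same page is an idempotent overwrite. This is a sketch under that assumption; the in-memory dict stands in for a real content store.

```python
import hashlib

class ContentAddressedStore:
    """Idempotent store: duplicate fetches map to the same content ID."""

    def __init__(self):
        self._blobs: dict[str, bytes] = {}

    def store(self, url: str, content: bytes) -> str:
        # The ID depends only on the bytes, so a double fetch that
        # returns identical content produces one stored object
        content_id = hashlib.sha256(content).hexdigest()
        self._blobs[content_id] = content
        return content_id

store = ContentAddressedStore()
a = store.store("https://example.com/page", b"<html>same</html>")
b = store.store("https://example.com/page", b"<html>same</html>")
assert a == b  # the duplicate fetch collapses into one record
```

If the page changed between the two fetches you get two versions instead, which is usually what you want anyway.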

Scaling Considerations & Metrics

Target throughput determines your architecture. For 1,000 pages/second with an average fetch time of 500ms, you need at least 500 concurrent fetchers. In practice, plan for 2-3x that to handle variance.
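That back-of-envelope calculation is Little's law (L = λ·W) plus headroom, and generalizes into a small planning helper. The 2.5x headroom factor is an assumption drawn from the 2-3x range above.

```python
import math

def required_fetchers(target_pps: float, avg_fetch_s: float,
                      headroom: float = 2.5) -> int:
    """Concurrent fetchers needed for a target pages/second rate:
    Little's law baseline, scaled up for latency variance."""
    baseline = target_pps * avg_fetch_s  # L = lambda * W
    return math.ceil(baseline * headroom)

print(required_fetchers(1000, 0.5, headroom=1.0))  # 500, the bare minimum
print(required_fetchers(1000, 0.5))                # 1250 with 2.5x headroom
```

The same arithmetic tells you when latency, not worker count, is the real lever: halving p50 fetch time halves the fleet needed for the same throughput.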

Monitor these metrics: pages/second (your primary throughput indicator), frontier queue depth (growing queues mean you need more fetchers), p99 fetch latency (identifies slow domains), error rate by type (timeouts vs. connection errors vs. HTTP errors), and unique domains in progress (measures crawl breadth).

Shard the frontier when queue operations become a bottleneck—typically above 10,000 pushes/second. Add fetcher workers when queue depth grows consistently. Scale the content store based on ingestion rate and storage requirements.

For cloud deployment, run fetchers as stateless containers (Kubernetes works well), use managed Redis for the frontier, and stream content to object storage (S3/GCS) with metadata in a database. This separates compute scaling from storage scaling and keeps operational complexity manageable.
