Design a Web Crawler: Distributed URL Fetching
Key Insights
- A web crawler’s URL frontier is the heart of the system—design it with separate priority queues and per-host politeness queues to balance crawl efficiency with respectful rate limiting.
- Stateless fetcher workers with consistent hashing enable horizontal scaling while ensuring the same worker handles the same domain, making robots.txt caching and rate limiting trivial.
- Bloom filters provide memory-efficient URL deduplication at scale, but you must accept false positives—partition them across nodes and tune the false positive rate based on your crawl size.
Introduction & Problem Space
Building a web crawler that fetches a few thousand pages is straightforward. Building one that fetches billions of pages across millions of domains while respecting rate limits, handling failures gracefully, and avoiding duplicate work—that’s an entirely different engineering challenge.
At scale, web crawlers face four fundamental problems. Politeness requires limiting request rates per domain to avoid overwhelming servers or getting blocked. Deduplication prevents wasting resources re-fetching URLs you’ve already seen. Fault tolerance ensures worker crashes don’t lose progress or corrupt state. Throughput demands fetching thousands of pages per second across a distributed fleet.
This article walks through designing a production-grade distributed crawler, covering the URL frontier, fetcher workers, deduplication systems, and fault tolerance mechanisms. We’ll build something that can scale from thousands to billions of pages.
System Architecture Overview
A distributed web crawler consists of four core components working in concert. The URL Frontier manages the queue of URLs to crawl, handling prioritization and politeness. Fetcher Workers pull URLs from the frontier, download pages, and extract new URLs. The DNS Resolver provides cached, high-throughput domain resolution. The Content Store persists downloaded pages for downstream processing.
Data flows from seed URLs into the frontier, through fetchers that download content and extract links, back into the frontier for newly discovered URLs, with content flowing to persistent storage.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class CrawlURL:
    url: str
    domain: str
    priority: int
    depth: int
    discovered_at: float


@dataclass
class FetchResult:
    url: str
    status_code: int
    content: Optional[bytes]
    content_type: str
    extracted_urls: list[str]
    fetch_time_ms: int


class URLFrontier(ABC):
    @abstractmethod
    async def push(self, urls: list[CrawlURL]) -> int:
        """Add URLs to frontier, returns count of new URLs added."""
        pass

    @abstractmethod
    async def pop(self, worker_id: str, batch_size: int) -> list[CrawlURL]:
        """Get next batch of URLs for a specific worker."""
        pass

    @abstractmethod
    async def mark_complete(self, url: str, success: bool) -> None:
        """Mark URL as processed."""
        pass


class Fetcher(ABC):
    @abstractmethod
    async def fetch(self, url: CrawlURL) -> FetchResult:
        """Fetch a single URL with retry logic."""
        pass


class ContentStore(ABC):
    @abstractmethod
    async def store(self, url: str, result: FetchResult) -> str:
        """Store fetched content, returns content ID."""
        pass
These interfaces define the contracts between components. The key insight is that each component should be independently scalable—you might need 10x more fetchers than frontier nodes.
URL Frontier Design
The URL frontier is the brain of your crawler. A naive queue won’t work because you need to balance two competing concerns: crawl important pages first (prioritization) while not hammering any single domain (politeness).
The solution is a two-tier queue architecture. Front queues are priority-based—high-priority URLs (like homepage updates) go to queue 1, discovered links go to queue 5. Back queues are per-domain—each domain gets its own queue with a timestamp indicating when it can next be crawled.
When a worker requests URLs, the frontier selects from the highest-priority front queue, then routes to the appropriate back queue, only returning URLs from domains that aren’t rate-limited.
import time
import json
import redis.asyncio as redis


class PoliteFrontier:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.min_crawl_delay = 1.0  # seconds between requests to same domain
        self.priority_queues = 5

    async def push(self, urls: list[CrawlURL]) -> int:
        # Phase 1: SETNX marks each URL as seen. It returns 1 only when the
        # key is new, giving us atomic deduplication at enqueue time.
        pipe = self.redis.pipeline()
        for crawl_url in urls:
            pipe.setnx(f"seen:{crawl_url.url}", 1)
        seen_results = await pipe.execute()

        # Phase 2: enqueue only the URLs that were not already seen.
        added = 0
        pipe = self.redis.pipeline()
        for crawl_url, is_new in zip(urls, seen_results):
            if not is_new:
                continue
            url_data = json.dumps({
                "url": crawl_url.url,
                "domain": crawl_url.domain,
                "depth": crawl_url.depth,
                "discovered_at": crawl_url.discovered_at
            })
            # Add to priority queue (sorted set with discovery time as score)
            pipe.zadd(f"priority:{crawl_url.priority}", {url_data: crawl_url.discovered_at})
            # Track domain membership
            pipe.sadd(f"domain:{crawl_url.domain}", crawl_url.url)
            added += 1
        await pipe.execute()
        return added

    async def pop(self, worker_id: str, batch_size: int = 10) -> list[CrawlURL]:
        results = []
        now = time.time()
        for priority in range(1, self.priority_queues + 1):
            if len(results) >= batch_size:
                break
            priority_key = f"priority:{priority}"
            # Over-fetch candidates, since some will be skipped for rate limiting
            candidates = await self.redis.zrange(
                priority_key, 0, batch_size * 3, withscores=True
            )
            for url_data, score in candidates:
                if len(results) >= batch_size:
                    break
                data = json.loads(url_data)
                domain = data["domain"]
                # Check domain rate limit
                next_allowed = await self.redis.get(f"ratelimit:{domain}")
                if next_allowed and float(next_allowed) > now:
                    continue  # Domain is rate-limited
                # Claim this URL atomically; ZREM returns 0 if another worker won
                removed = await self.redis.zrem(priority_key, url_data)
                if removed:
                    # Set rate limit for domain
                    await self.redis.setex(
                        f"ratelimit:{domain}",
                        int(self.min_crawl_delay * 2),
                        str(now + self.min_crawl_delay)
                    )
                    results.append(CrawlURL(
                        url=data["url"],
                        domain=domain,
                        priority=priority,
                        depth=data["depth"],
                        discovered_at=data["discovered_at"]
                    ))
        return results
This implementation uses Redis sorted sets for priority queues and simple key-value pairs for rate limiting. The push method deduplicates atomically via SETNX, and the pop method checks rate limits before returning URLs, ensuring politeness without blocking workers.
Distributed Fetcher Workers
Fetcher workers should be stateless and horizontally scalable. The key design decision is how to assign URLs to workers. Random assignment works but means every worker must cache robots.txt for every domain. Consistent hashing assigns all URLs from a domain to the same worker, making robots.txt caching trivial.
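Before the fetcher itself, here is a minimal sketch of how that domain-to-worker assignment might work — a hash ring with virtual nodes (the `ConsistentHashRing` class, worker names, and the vnode count are illustrative, not part of the system above):

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Maps each domain to a worker via a hash ring with virtual nodes."""

    def __init__(self, workers: list[str], vnodes: int = 100):
        # Each worker owns many points on the ring, smoothing the distribution
        self.ring: list[tuple[int, str]] = []
        for worker in workers:
            for i in range(vnodes):
                self.ring.append((self._hash(f"{worker}:{i}"), worker))
        self.ring.sort()

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def worker_for(self, domain: str) -> str:
        # Walk clockwise to the first virtual node at or after the domain's hash
        point = self._hash(domain)
        idx = bisect.bisect(self.ring, (point,)) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["fetcher-1", "fetcher-2", "fetcher-3"])
# The same domain always routes to the same worker
assert ring.worker_for("example.com") == ring.worker_for("example.com")
```

Because only the ring's neighbors change when a worker joins or leaves, most domains keep their assigned worker, so per-domain caches (robots.txt, rate-limit state) survive fleet resizing.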
import time
import asyncio
import aiohttp
from typing import Optional
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


class AsyncFetcher:
    def __init__(self, worker_id: str, user_agent: str):
        self.worker_id = worker_id
        self.user_agent = user_agent
        # None entries mean "no robots.txt available; assume allowed"
        self.robots_cache: dict[str, Optional[RobotFileParser]] = {}
        self.session: Optional[aiohttp.ClientSession] = None
        self.max_retries = 3
        self.timeout = aiohttp.ClientTimeout(total=30)

    async def start(self):
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers={"User-Agent": self.user_agent}
        )

    async def fetch(self, crawl_url: CrawlURL) -> FetchResult:
        # Check robots.txt first
        if not await self._can_fetch(crawl_url.url):
            return FetchResult(
                url=crawl_url.url, status_code=403,
                content=None, content_type="",
                extracted_urls=[], fetch_time_ms=0
            )
        for attempt in range(self.max_retries):
            try:
                start = time.time()
                async with self.session.get(
                    crawl_url.url,
                    allow_redirects=True,
                    max_redirects=5
                ) as response:
                    content = await response.read()
                    fetch_time = int((time.time() - start) * 1000)
                    extracted = []
                    if response.content_type.startswith("text/html"):
                        extracted = self._extract_urls(
                            content, crawl_url.url
                        )
                    return FetchResult(
                        url=str(response.url),  # Final URL after redirects
                        status_code=response.status,
                        content=content,
                        content_type=response.content_type,
                        extracted_urls=extracted,
                        fetch_time_ms=fetch_time
                    )
            except asyncio.TimeoutError:
                if attempt == self.max_retries - 1:
                    return FetchResult(
                        url=crawl_url.url, status_code=408,
                        content=None, content_type="",
                        extracted_urls=[], fetch_time_ms=30000
                    )
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except aiohttp.ClientError:
                if attempt == self.max_retries - 1:
                    return FetchResult(
                        url=crawl_url.url, status_code=0,
                        content=None, content_type="",
                        extracted_urls=[], fetch_time_ms=0
                    )
                await asyncio.sleep(2 ** attempt)

    async def _can_fetch(self, url: str) -> bool:
        parsed = urlparse(url)
        domain = parsed.netloc
        if domain not in self.robots_cache:
            robots_url = f"{parsed.scheme}://{domain}/robots.txt"
            try:
                async with self.session.get(robots_url) as resp:
                    if resp.status == 200:
                        content = await resp.text()
                        rp = RobotFileParser()
                        rp.parse(content.splitlines())
                        self.robots_cache[domain] = rp
                    else:
                        self.robots_cache[domain] = None
            except (aiohttp.ClientError, asyncio.TimeoutError):
                self.robots_cache[domain] = None
        rp = self.robots_cache.get(domain)
        return rp is None or rp.can_fetch(self.user_agent, url)
The fetcher handles retries with exponential backoff, follows redirects (tracking the final URL), respects robots.txt, and extracts links from HTML content. Each worker maintains its own robots.txt cache, which works well with consistent hashing.
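The `_extract_urls` helper is referenced above but not shown. A minimal, self-contained sketch using only the standard library might look like the following (a production crawler would typically use a fault-tolerant parser such as lxml; the function and class names here are illustrative):

```python
from html.parser import HTMLParser
from urllib.parse import urljoin, urldefrag

class _LinkExtractor(HTMLParser):
    """Collects href values from anchor tags."""

    def __init__(self):
        super().__init__()
        self.hrefs: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.hrefs.append(value)

def extract_urls(content: bytes, base_url: str) -> list[str]:
    parser = _LinkExtractor()
    parser.feed(content.decode("utf-8", errors="replace"))
    urls = []
    for href in parser.hrefs:
        # Resolve relative links against the page URL and drop fragments,
        # so "/about" and "/about#team" dedupe to the same canonical URL
        absolute, _fragment = urldefrag(urljoin(base_url, href))
        if absolute.startswith(("http://", "https://")):
            urls.append(absolute)
    return urls

html = b'<a href="/about">About</a> <a href="https://other.com/p#x">P</a>'
print(extract_urls(html, "https://example.com/index.html"))
# ['https://example.com/about', 'https://other.com/p']
```

URL normalization at extraction time matters more than it looks: every variant that slips through here becomes a separate entry in the frontier and the dedup filter.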
URL Deduplication at Scale
You can’t store billions of URLs in a hash set—you’ll run out of memory. Bloom filters provide probabilistic set membership with fixed memory usage. The trade-off is false positives: the filter might say you’ve seen a URL when you haven’t. False negatives never occur.
import math
import mmh3
from bitarray import bitarray


class ScalableBloomFilter:
    def __init__(self, expected_items: int, fp_rate: float = 0.01):
        self.fp_rate = fp_rate
        self.size = self._optimal_size(expected_items, fp_rate)
        self.hash_count = self._optimal_hash_count(self.size, expected_items)
        self.bit_array = bitarray(self.size)
        self.bit_array.setall(0)
        self.count = 0

    def _optimal_size(self, n: int, p: float) -> int:
        return int(-n * math.log(p) / (math.log(2) ** 2))

    def _optimal_hash_count(self, m: int, n: int) -> int:
        return max(1, int(m / n * math.log(2)))

    def _get_indexes(self, item: str) -> list[int]:
        indexes = []
        for seed in range(self.hash_count):
            hash_val = mmh3.hash(item, seed) % self.size
            indexes.append(hash_val)
        return indexes

    def add(self, item: str) -> bool:
        """Add item, returns True if item was new."""
        indexes = self._get_indexes(item)
        was_new = not all(self.bit_array[i] for i in indexes)
        for i in indexes:
            self.bit_array[i] = 1
        if was_new:
            self.count += 1
        return was_new

    def contains(self, item: str) -> bool:
        """Check if item might be in set."""
        return all(self.bit_array[i] for i in self._get_indexes(item))


class RedisBackedBloomFilter:
    """Distributed bloom filter using Redis."""

    def __init__(self, redis_client, key_prefix: str,
                 expected_items: int, fp_rate: float = 0.01):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.size = int(-expected_items * math.log(fp_rate) / (math.log(2) ** 2))
        self.hash_count = max(1, int(self.size / expected_items * math.log(2)))
        # Partition across multiple Redis keys for parallelism
        self.partitions = 16

    def _get_indexes(self, url: str) -> list[int]:
        return [mmh3.hash(url, seed) % self.size for seed in range(self.hash_count)]

    async def add(self, url: str) -> bool:
        indexes = self._get_indexes(url)
        # Check existing bits. Note: check-then-set is not atomic, so two
        # workers can both see the same URL as new; at-least-once crawling
        # absorbs the occasional duplicate this causes.
        pipe = self.redis.pipeline()
        for idx in indexes:
            partition = idx % self.partitions
            bit_pos = idx // self.partitions
            pipe.getbit(f"{self.key_prefix}:{partition}", bit_pos)
        results = await pipe.execute()
        was_new = not all(results)
        # Set bits
        pipe = self.redis.pipeline()
        for idx in indexes:
            partition = idx % self.partitions
            bit_pos = idx // self.partitions
            pipe.setbit(f"{self.key_prefix}:{partition}", bit_pos, 1)
        await pipe.execute()
        return was_new
For a crawler expecting 1 billion URLs with a 1% false positive rate, you need about 1.2 GB of memory. Partitioning across Redis keys enables parallel access and keeps any single key from becoming a bottleneck.
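The sizing figures follow directly from the standard Bloom filter formulas used in the code above; a quick check:

```python
import math

n = 1_000_000_000   # expected URLs
p = 0.01            # target false positive rate

# Optimal filter size in bits: m = -n * ln(p) / (ln 2)^2
bits = -n * math.log(p) / (math.log(2) ** 2)
# Optimal number of hash functions: k = (m / n) * ln 2
hashes = (bits / n) * math.log(2)

print(f"{bits / 8 / 1e9:.2f} GB, {round(hashes)} hash functions")
# 1.20 GB, 7 hash functions
```

Doubling the crawl size doubles the memory; tightening the false positive rate from 1% to 0.1% costs only about 50% more bits, which is often worth it when a false positive means silently skipping a page.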
Fault Tolerance & Checkpointing
Workers will crash. Networks will fail. Your crawler must handle this gracefully. The key is tracking URL state transitions: pending → in_progress → completed/failed.
import time
import json


class CheckpointedCrawler:
    def __init__(self, frontier: PoliteFrontier, redis_client):
        self.frontier = frontier
        self.redis = redis_client
        self.reclaim_after = 300  # seconds before an in-progress claim is considered abandoned

    async def claim_urls(self, worker_id: str, batch_size: int) -> list[CrawlURL]:
        urls = await self.frontier.pop(worker_id, batch_size)
        # Mark URLs as in-progress. The TTL must exceed reclaim_after: if the
        # key expired first, the recovery scan below could never find it and
        # the URL would be lost entirely.
        pipe = self.redis.pipeline()
        for url in urls:
            pipe.setex(
                f"inprogress:{url.url}",
                self.reclaim_after * 2,
                json.dumps({
                    "worker_id": worker_id,
                    "claimed_at": time.time(),
                    "url_data": url.__dict__
                })
            )
        await pipe.execute()
        return urls

    async def complete_url(self, url: str, success: bool):
        await self.redis.delete(f"inprogress:{url}")
        await self.frontier.mark_complete(url, success)

    async def recover_abandoned_urls(self):
        """Background task to reclaim URLs from crashed workers."""
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(
                cursor, match="inprogress:*", count=100
            )
            for key in keys:
                data = await self.redis.get(key)
                if data:
                    info = json.loads(data)
                    if time.time() - info["claimed_at"] > self.reclaim_after:
                        # Clear the seen marker so the frontier accepts the
                        # URL again, then re-queue it and release the claim
                        url_data = info["url_data"]
                        await self.redis.delete(f"seen:{url_data['url']}")
                        await self.frontier.push([CrawlURL(**url_data)])
                        await self.redis.delete(key)
            if cursor == 0:
                break
This provides at-least-once semantics: a URL might be fetched twice if a worker crashes after fetching but before marking complete. For most crawlers, this is acceptable. Exactly-once requires more complex coordination that usually isn’t worth the overhead.
Scaling Considerations & Metrics
Target throughput determines your architecture. For 1,000 pages/second with an average fetch time of 500ms, you need at least 500 concurrent fetchers. In practice, plan for 2-3x that to handle variance.
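This is Little's law (concurrency = throughput × latency); a quick sanity check, with an illustrative headroom factor:

```python
target_pages_per_sec = 1000   # desired throughput (λ)
avg_fetch_seconds = 0.5       # average time per fetch (W)
headroom = 2.5                # plan for 2-3x to absorb slow domains and retries

# Little's law: concurrent fetchers needed L = λ × W
min_fetchers = target_pages_per_sec * avg_fetch_seconds
planned = int(min_fetchers * headroom)

print(min_fetchers, planned)  # 500.0 1250
```

The same arithmetic runs in reverse for capacity planning: given a fixed fleet size, latency of the slowest domains caps your achievable throughput, which is why p99 fetch latency belongs on the dashboard alongside pages/second.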
Monitor these metrics: pages/second (your primary throughput indicator), frontier queue depth (growing queues mean you need more fetchers), p99 fetch latency (identifies slow domains), error rate by type (timeouts vs. connection errors vs. HTTP errors), and unique domains in progress (measures crawl breadth).
Shard the frontier when queue operations become a bottleneck—typically above 10,000 pushes/second. Add fetcher workers when queue depth grows consistently. Scale the content store based on ingestion rate and storage requirements.
For cloud deployment, run fetchers as stateless containers (Kubernetes works well), use managed Redis for the frontier, and stream content to object storage (S3/GCS) with metadata in a database. This separates compute scaling from storage scaling and keeps operational complexity manageable.