Design a Search Engine: Web Crawling and Indexing
Key Insights
- A search engine’s crawl efficiency depends entirely on URL frontier design—get politeness and priority wrong, and you’ll either get blocked or waste resources on low-value pages
- Near-duplicate detection using SimHash saves enormous storage costs and improves search quality by eliminating redundant content before indexing
- Inverted index partitioning strategy (document-based vs. term-based) fundamentally shapes your query latency and system complexity—choose based on your query patterns
System Requirements and Scale
Building a search engine requires clear thinking about what you’re actually building. Let’s define the scope.
Functional requirements:
- Crawl the web continuously, discovering new and updated pages
- Build a searchable index from crawled content
- Serve queries with relevant results (we’ll focus on crawling and indexing here)
Non-functional requirements:
- Crawl billions of pages (target: 100 billion)
- Maintain freshness (re-crawl important pages within hours, others within weeks)
- Respect politeness policies (don’t DDoS websites)
- Handle failures gracefully (the web is hostile)
Scale assumptions:
- 100 billion pages
- Average page size: 10KB compressed
- Storage for raw content: ~1 PB
- Index size: ~100-500 TB (depending on compression)
- Crawl rate: 10,000+ pages per second to maintain freshness
These numbers drive every architectural decision. You can’t build this on a single machine, and you can’t afford to be sloppy about efficiency.
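To make that concrete, here is a quick back-of-envelope check using the scale assumptions above: even at 10,000 pages per second, one uniform pass over 100 billion pages takes months, which is why re-crawl scheduling must be driven by page importance and change frequency rather than treating all pages equally.

```python
TOTAL_PAGES = 100_000_000_000  # 100 billion pages (scale assumption above)
CRAWL_RATE = 10_000            # pages per second, fleet-wide

seconds_per_pass = TOTAL_PAGES / CRAWL_RATE   # 10,000,000 seconds
days_per_pass = seconds_per_pass / 86_400
print(f"Uniform re-crawl cycle: {days_per_pass:.0f} days")  # ~116 days
```

A uniform cycle of roughly four months is nowhere near "hours" for important pages, so the frontier's priority mechanism is doing real work, not just optimization.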
High-Level Architecture Overview
The system breaks into three core subsystems that operate as a continuous pipeline:
┌─────────────────────────────────────────────────────────────────────┐
│ SEARCH ENGINE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ URL │───▶│ Crawler │───▶│ Content Processing │ │
│ │ Frontier │ │ Service │ │ Pipeline │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
│ ▲ │ │
│ │ ▼ │
│ │ ┌──────────────────────┐ │
│ └─────────────────────────────────│ Indexer │ │
│ (discovered URLs) └──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ Inverted Index │ │
│ │ (Sharded) │ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
URL Frontier: Manages what to crawl next. Handles prioritization, politeness, and distributed coordination.
Crawler Service: Fetches pages, handles HTTP complexities, extracts content.
Content Processing Pipeline: Parses HTML, extracts links, detects duplicates, prepares content for indexing.
Indexer: Builds and maintains the inverted index that powers search queries.
Each component scales independently. The frontier is coordination-heavy, crawlers are I/O-bound, and indexers are CPU and storage-bound.
URL Frontier and Crawl Scheduling
The URL frontier is deceptively complex. It’s not just a queue—it’s a distributed priority system that must balance competing concerns.
Core requirements:
- Politeness: Never hit the same domain too frequently
- Priority: Crawl important pages first
- Freshness: Re-crawl pages based on change frequency
- Distribution: Coordinate across hundreds of crawler instances
```python
import heapq
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional


@dataclass(order=True)
class CrawlTask:
    priority: float
    scheduled_time: float
    url: str = field(compare=False)
    depth: int = field(compare=False)


class URLFrontier:
    def __init__(self, politeness_delay: float = 1.0):
        self.politeness_delay = politeness_delay
        # Per-domain queues for politeness
        self.domain_queues: dict[str, list[CrawlTask]] = defaultdict(list)
        # Track last crawl time per domain
        self.domain_last_crawl: dict[str, float] = {}
        # Global priority queue of (next_available_time, domain)
        self.ready_domains: list[tuple[float, str]] = []
        # Domains currently present in ready_domains, so an emptied domain
        # gets rescheduled when new URLs for it arrive later
        self.scheduled_domains: set[str] = set()
        # Seen URLs (in production, use a bloom filter or distributed set)
        self.seen_urls: set[str] = set()

    def _extract_domain(self, url: str) -> str:
        from urllib.parse import urlparse
        return urlparse(url).netloc

    def _calculate_priority(self, url: str, depth: int) -> float:
        """Lower score = higher priority."""
        # Prioritize by depth (BFS-like behavior)
        base_priority = depth * 10
        # Boost homepages and index pages
        if url.endswith('/') or '/index' in url:
            base_priority -= 5
        return base_priority

    def _normalize_url(self, url: str) -> str:
        """Canonicalize URL for deduplication."""
        from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
        parsed = urlparse(url)
        # Sort query parameters for a stable canonical form
        query = urlencode(sorted(parse_qsl(parsed.query)))
        # Lowercase only scheme and host (paths are case-sensitive),
        # drop fragments, normalize the trailing slash
        path = parsed.path.rstrip('/') or '/'
        return urlunparse((
            parsed.scheme.lower(), parsed.netloc.lower(), path, '', query, ''
        ))

    def add_url(self, url: str, depth: int = 0) -> bool:
        """Add URL to frontier. Returns False if already seen."""
        normalized = self._normalize_url(url)
        if normalized in self.seen_urls:
            return False
        self.seen_urls.add(normalized)
        domain = self._extract_domain(normalized)
        task = CrawlTask(
            priority=self._calculate_priority(normalized, depth),
            scheduled_time=time.time(),
            url=normalized,
            depth=depth
        )
        # Add to the domain-specific queue
        heapq.heappush(self.domain_queues[domain], task)
        # (Re-)schedule the domain if it isn't already in the ready queue
        if domain not in self.scheduled_domains:
            next_time = self.domain_last_crawl.get(domain, 0.0) + self.politeness_delay
            heapq.heappush(self.ready_domains, (next_time, domain))
            self.scheduled_domains.add(domain)
        return True

    def get_next(self) -> Optional[CrawlTask]:
        """Get next URL respecting politeness constraints."""
        current_time = time.time()
        while self.ready_domains:
            next_time, domain = heapq.heappop(self.ready_domains)
            if next_time > current_time:
                # Earliest domain isn't ready yet; put it back
                heapq.heappush(self.ready_domains, (next_time, domain))
                return None
            if not self.domain_queues[domain]:
                self.scheduled_domains.discard(domain)
                continue
            task = heapq.heappop(self.domain_queues[domain])
            self.domain_last_crawl[domain] = current_time
            if self.domain_queues[domain]:
                # Schedule this domain's next crawl after the politeness delay
                heapq.heappush(
                    self.ready_domains,
                    (current_time + self.politeness_delay, domain)
                )
            else:
                self.scheduled_domains.discard(domain)
            return task
        return None
```
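The `seen_urls` set above grows without bound at 100 billion URLs; the inline comment suggests a bloom filter for production. A minimal sketch of one, using the standard sizing formulas (class and parameter names are illustrative): a bloom filter never gives false negatives, so a URL reported as unseen really is new, at the cost of a small, tunable false-positive rate that skips a few novel URLs.

```python
import hashlib
import math

class BloomFilter:
    """Space-efficient 'seen URL' set: no false negatives, tunable FP rate."""

    def __init__(self, capacity: int, error_rate: float = 0.01):
        # Standard Bloom filter sizing: m = -n*ln(p)/ln(2)^2, k = (m/n)*ln(2)
        self.size = int(-capacity * math.log(error_rate) / (math.log(2) ** 2))
        self.num_hashes = max(1, int(self.size / capacity * math.log(2)))
        self.bits = bytearray((self.size + 7) // 8)

    def _positions(self, item: str):
        # Double hashing: derive k bit positions from two base hashes
        digest = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(digest[:8], 'big')
        h2 = int.from_bytes(digest[8:16], 'big')
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.size

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))
```

At a 1% error rate this costs roughly 10 bits per URL, versus hundreds of bytes for storing the strings themselves.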
For robots.txt compliance, parse and cache directives per domain:
```python
from typing import Optional
from urllib.robotparser import RobotFileParser


class RobotsChecker:
    def __init__(self, user_agent: str = "MySearchBot/1.0"):
        self.user_agent = user_agent
        self._cache: dict[str, RobotFileParser] = {}

    def _robots_url(self, scheme: str, netloc: str) -> str:
        return f"{scheme}://{netloc}/robots.txt"

    def can_fetch(self, url: str) -> bool:
        from urllib.parse import urlparse
        parsed = urlparse(url)
        robots_url = self._robots_url(parsed.scheme, parsed.netloc)
        if robots_url not in self._cache:
            rp = RobotFileParser()
            rp.set_url(robots_url)
            try:
                rp.read()
            except Exception:
                # If we can't fetch robots.txt, assume allowed -- and cache
                # the permissive parser so we don't re-fetch on every URL
                rp.allow_all = True
            self._cache[robots_url] = rp
        return self._cache[robots_url].can_fetch(self.user_agent, url)

    def crawl_delay(self, domain: str) -> Optional[float]:
        """Get the crawl delay specified in robots.txt. Checks both schemes,
        since the cache is keyed by the URL we actually fetched."""
        for scheme in ("https", "http"):
            robots_url = self._robots_url(scheme, domain)
            if robots_url in self._cache:
                return self._cache[robots_url].crawl_delay(self.user_agent)
        return None
```
Distributed Web Crawler Design
Crawler workers are the workhorses. They need to be fast, resilient, and respectful.
```python
import asyncio
import hashlib
import time
from dataclasses import dataclass
from typing import Optional

import aiohttp


@dataclass
class CrawlResult:
    url: str
    status_code: int
    content: Optional[bytes]
    content_type: str
    headers: dict
    fetch_time_ms: float
    error: Optional[str] = None


class CrawlerWorker:
    def __init__(
        self,
        worker_id: str,
        max_concurrent: int = 100,
        timeout_seconds: float = 30,
        max_content_size: int = 10 * 1024 * 1024  # 10MB
    ):
        self.worker_id = worker_id
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        self.max_content_size = max_content_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=self.timeout,
                headers={"User-Agent": "MySearchBot/1.0"}
            )
        return self._session

    async def fetch(self, url: str, max_redirects: int = 5) -> CrawlResult:
        """Fetch a single URL with proper error handling."""
        start_time = time.monotonic()
        async with self.semaphore:
            try:
                session = await self._get_session()
                async with session.get(
                    url,
                    allow_redirects=True,
                    max_redirects=max_redirects
                ) as response:
                    # Check the declared content length before downloading the body
                    content_length = response.headers.get('Content-Length')
                    if content_length and int(content_length) > self.max_content_size:
                        return CrawlResult(
                            url=url,
                            status_code=response.status,
                            content=None,
                            content_type=response.content_type or '',
                            headers=dict(response.headers),
                            fetch_time_ms=(time.monotonic() - start_time) * 1000,
                            error="Content too large"
                        )
                    content = await response.read()
                    return CrawlResult(
                        url=str(response.url),  # Final URL after redirects
                        status_code=response.status,
                        content=content,
                        content_type=response.content_type or '',
                        headers=dict(response.headers),
                        fetch_time_ms=(time.monotonic() - start_time) * 1000
                    )
            except asyncio.TimeoutError:
                return CrawlResult(
                    url=url, status_code=0, content=None, content_type='',
                    headers={}, fetch_time_ms=(time.monotonic() - start_time) * 1000,
                    error="Timeout"
                )
            except aiohttp.ClientError as e:
                return CrawlResult(
                    url=url, status_code=0, content=None, content_type='',
                    headers={}, fetch_time_ms=(time.monotonic() - start_time) * 1000,
                    error=str(e)
                )

    async def batch_fetch(self, urls: list[str]) -> list[CrawlResult]:
        """Fetch multiple URLs concurrently."""
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

    async def close(self) -> None:
        """Release the underlying HTTP session."""
        if self._session and not self._session.closed:
            await self._session.close()


def assign_url_to_worker(url: str, num_workers: int) -> int:
    """Hash-based URL-to-worker assignment. Hashing the domain (not the full
    URL) keeps each domain on one worker, which simplifies politeness. Note
    this is plain modulo hashing: changing num_workers remaps most domains."""
    from urllib.parse import urlparse
    domain = urlparse(url).netloc
    hash_value = int(hashlib.md5(domain.encode()).hexdigest(), 16)
    return hash_value % num_workers
```
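The modulo assignment above remaps nearly every domain whenever the worker count changes, forcing a politeness-state reshuffle across the fleet. If workers join and leave often, a consistent-hash ring limits remapping to roughly 1/N of domains. A minimal sketch (class names and the virtual-node count are illustrative choices, not from the original design):

```python
import bisect
import hashlib

class HashRing:
    """Minimal consistent-hash ring: adding or removing a worker only
    remaps about 1/N of the domains, instead of nearly all of them."""

    def __init__(self, workers: list[str], vnodes: int = 100):
        # Place several virtual nodes per worker for smoother balance
        self.ring: list[tuple[int, str]] = []
        for w in workers:
            for i in range(vnodes):
                h = int(hashlib.md5(f"{w}#{i}".encode()).hexdigest(), 16)
                self.ring.append((h, w))
        self.ring.sort()
        self.keys = [h for h, _ in self.ring]

    def worker_for(self, domain: str) -> str:
        # Walk clockwise to the first virtual node at or after the domain hash
        h = int(hashlib.md5(domain.encode()).hexdigest(), 16)
        idx = bisect.bisect(self.keys, h) % len(self.ring)
        return self.ring[idx][1]
```

With four workers replacing three, only about a quarter of domains move; modulo hashing would move about three quarters.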
Content Processing Pipeline
After fetching, we need to extract useful content and detect duplicates.
```python
import hashlib
import re
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup


class ContentProcessor:
    def __init__(self):
        self.tag_weights = {
            'title': 10, 'h1': 5, 'h2': 3, 'h3': 2,
            'strong': 1.5, 'b': 1.5, 'em': 1.2
        }

    def extract_content(self, html: bytes, base_url: str) -> dict:
        """Extract text, links, and metadata from HTML."""
        soup = BeautifulSoup(html, 'lxml')
        # Remove script, style, and boilerplate elements
        for element in soup(['script', 'style', 'nav', 'footer', 'header']):
            element.decompose()
        # Extract title (soup.title.string can be None for an empty tag)
        title = soup.title.string.strip() if soup.title and soup.title.string else ''
        # Extract main text
        text = soup.get_text(separator=' ', strip=True)
        text = re.sub(r'\s+', ' ', text)
        # Extract links
        links = []
        for a in soup.find_all('a', href=True):
            absolute_url = urljoin(base_url, a['href'])
            # Filter out non-HTTP links (mailto:, javascript:, etc.)
            if urlparse(absolute_url).scheme in ('http', 'https'):
                links.append(self._normalize_url(absolute_url))
        return {
            'title': title,
            'text': text,
            'links': list(set(links)),  # Dedupe
            'word_count': len(text.split())
        }

    def _normalize_url(self, url: str) -> str:
        """Canonicalize URL."""
        from urllib.parse import urlunparse
        parsed = urlparse(url)
        # Remove fragments and query; lowercase scheme and host only
        return urlunparse((
            parsed.scheme.lower(),
            parsed.netloc.lower(),
            parsed.path.rstrip('/') or '/',
            '', '', ''
        ))


class SimHash:
    """SimHash for near-duplicate detection."""

    def __init__(self, hash_bits: int = 64):
        self.hash_bits = hash_bits

    def _tokenize(self, text: str) -> list[str]:
        """Generate shingles (n-grams) from text."""
        words = text.lower().split()
        # 3-word shingles
        return [' '.join(words[i:i + 3]) for i in range(len(words) - 2)]

    def _hash_token(self, token: str) -> int:
        """Hash a single token to a 64-bit integer."""
        return int(hashlib.md5(token.encode()).hexdigest()[:16], 16)

    def compute(self, text: str) -> int:
        """Compute the SimHash fingerprint."""
        tokens = self._tokenize(text)
        if not tokens:
            return 0
        # Each token votes +1/-1 on every bit position
        v = [0] * self.hash_bits
        for token in tokens:
            token_hash = self._hash_token(token)
            for i in range(self.hash_bits):
                if token_hash & (1 << i):
                    v[i] += 1
                else:
                    v[i] -= 1
        # A positive vote total sets that bit in the fingerprint
        fingerprint = 0
        for i in range(self.hash_bits):
            if v[i] > 0:
                fingerprint |= (1 << i)
        return fingerprint

    def hamming_distance(self, hash1: int, hash2: int) -> int:
        """Count differing bits between two hashes."""
        return bin(hash1 ^ hash2).count('1')

    def is_near_duplicate(self, hash1: int, hash2: int, threshold: int = 3) -> bool:
        """Check if two documents are near-duplicates."""
        return self.hamming_distance(hash1, hash2) <= threshold
```
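Comparing every new fingerprint against every stored one is O(n) per document, which doesn't work at 100 billion pages. The standard trick is to index fingerprints by bands: split each 64-bit fingerprint into four 16-bit bands, and by pigeonhole, two fingerprints within Hamming distance 3 must agree exactly on at least one band (three differing bits can touch at most three of the four bands). A sketch of that lookup structure (names are illustrative):

```python
import hashlib
from collections import defaultdict

def band_keys(fingerprint: int, bands: int = 4, bits: int = 64) -> list[tuple[int, int]]:
    """Split a fingerprint into (band_index, band_value) keys."""
    width = bits // bands
    mask = (1 << width) - 1
    return [(i, (fingerprint >> (i * width)) & mask) for i in range(bands)]

class NearDuplicateIndex:
    """Index SimHash fingerprints by band value for sublinear candidate lookup."""

    def __init__(self):
        self.buckets: dict[tuple[int, int], set[int]] = defaultdict(set)
        self.fingerprints: dict[int, int] = {}

    def add(self, doc_id: int, fingerprint: int) -> None:
        self.fingerprints[doc_id] = fingerprint
        for key in band_keys(fingerprint):
            self.buckets[key].add(doc_id)

    def candidates(self, fingerprint: int) -> set[int]:
        """Doc IDs sharing at least one band; verify each candidate
        with a full Hamming-distance check afterwards."""
        out: set[int] = set()
        for key in band_keys(fingerprint):
            out |= self.buckets[key]
        return out
```

Candidates sharing a band still need the exact `hamming_distance` check, but that runs on a handful of documents rather than the whole corpus.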
Inverted Index Construction
The inverted index is the heart of search. It maps terms to documents.
```python
import math
import re
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Posting:
    doc_id: int
    term_frequency: int
    positions: list[int]  # For phrase queries


class InvertedIndexBuilder:
    def __init__(self):
        self.index: dict[str, list[Posting]] = defaultdict(list)
        self.doc_lengths: dict[int, int] = {}
        self.total_docs = 0

    def _tokenize(self, text: str) -> list[str]:
        """Basic tokenization with stemming."""
        # Lowercase and split on non-alphanumeric
        tokens = re.findall(r'\b[a-z0-9]+\b', text.lower())
        # Simple suffix stripping (use a Porter stemmer in production)
        return [self._stem(t) for t in tokens if len(t) > 2]

    def _stem(self, word: str) -> str:
        """Naive stemming -- use NLTK's PorterStemmer in production.
        Longer suffixes are checked first so 'es' isn't shadowed by 's'."""
        for suffix in ('ing', 'ed', 'ly', 'es', 's'):
            if word.endswith(suffix) and len(word) > len(suffix) + 2:
                return word[:-len(suffix)]
        return word

    def add_document(self, doc_id: int, text: str):
        """Index a single document."""
        tokens = self._tokenize(text)
        self.doc_lengths[doc_id] = len(tokens)
        self.total_docs += 1
        # Record positions per term; term frequency is len(positions)
        term_positions: dict[str, list[int]] = defaultdict(list)
        for pos, token in enumerate(tokens):
            term_positions[token].append(pos)
        # Add to the index
        for term, positions in term_positions.items():
            self.index[term].append(Posting(
                doc_id=doc_id,
                term_frequency=len(positions),
                positions=positions
            ))

    def compute_tfidf(self, term: str, doc_id: int) -> float:
        """Compute the TF-IDF score for a term in a document."""
        postings = self.index.get(term, [])
        # Find the posting for this document
        posting = next((p for p in postings if p.doc_id == doc_id), None)
        if not posting:
            return 0.0
        # TF: log-scaled term frequency (term_frequency >= 1 here)
        tf = 1 + math.log10(posting.term_frequency)
        # IDF: inverse document frequency (df = docs containing the term)
        df = len(postings)
        idf = math.log10(self.total_docs / df) if df > 0 else 0.0
        return tf * idf

    def merge_indices(self, other: 'InvertedIndexBuilder'):
        """Merge another index into this one (for distributed building)."""
        for term, postings in other.index.items():
            self.index[term].extend(postings)
            # Keep posting lists sorted by doc_id for efficient querying
            self.index[term].sort(key=lambda p: p.doc_id)
        self.doc_lengths.update(other.doc_lengths)
        self.total_docs += other.total_docs

    def serialize_postings(self, term: str) -> bytes:
        """Serialize a posting list for storage (delta + variable-byte
        encoding). Positions are omitted in this sketch."""
        data = []
        prev_doc_id = 0
        for posting in self.index.get(term, []):
            # Delta-encode doc_ids: gaps are small, so they compress well
            data.append(self._vbyte_encode(posting.doc_id - prev_doc_id))
            data.append(self._vbyte_encode(posting.term_frequency))
            prev_doc_id = posting.doc_id
        return b''.join(data)

    def _vbyte_encode(self, n: int) -> bytes:
        """Variable-byte encoding: 7 bits per byte, least-significant group
        first, high bit marks the final byte."""
        result = []
        while n >= 128:
            result.append(n & 0x7F)
            n >>= 7
        result.append(n | 0x80)  # Set high bit on the last byte
        return bytes(result)
```
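The builder writes postings but nothing above reads them back. A decoder for the same variable-byte variant makes the format concrete (standalone functions shown here so the round trip is self-contained):

```python
def vbyte_encode(n: int) -> bytes:
    """Same variant as above: 7 bits per byte, least-significant group
    first, high bit set on the final byte."""
    result = []
    while n >= 128:
        result.append(n & 0x7F)
        n >>= 7
    result.append(n | 0x80)
    return bytes(result)

def vbyte_decode(data: bytes) -> list[int]:
    """Decode a concatenated stream of variable-byte integers."""
    numbers, n, shift = [], 0, 0
    for byte in data:
        if byte & 0x80:              # high bit: final byte of this integer
            n |= (byte & 0x7F) << shift
            numbers.append(n)
            n, shift = 0, 0
        else:                        # continuation byte: accumulate 7 bits
            n |= byte << shift
            shift += 7
    return numbers

values = [0, 1, 127, 128, 300, 16384]
encoded = b''.join(vbyte_encode(v) for v in values)
print(vbyte_decode(encoded))  # [0, 1, 127, 128, 300, 16384]
```

Small gaps encode in a single byte, which is the whole point of delta-encoding doc_ids before applying it.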
Storage and Scaling Considerations
At scale, storage strategy determines both cost and performance.
```python
import hashlib
from abc import ABC, abstractmethod
from typing import Optional


class StorageBackend(ABC):
    @abstractmethod
    def put(self, key: str, value: bytes) -> None:
        ...

    @abstractmethod
    def get(self, key: str) -> Optional[bytes]:
        ...


class ShardedStorage:
    """Distribute data across multiple storage backends."""

    def __init__(self, backends: list[StorageBackend], replication_factor: int = 2):
        self.backends = backends
        self.num_shards = len(backends)
        self.replication_factor = replication_factor

    def _get_shard_ids(self, key: str) -> list[int]:
        """Get shard IDs for a key (primary + replicas)."""
        hash_val = int(hashlib.sha256(key.encode()).hexdigest(), 16)
        primary = hash_val % self.num_shards
        # Simple replication: consecutive shards
        return [(primary + i) % self.num_shards
                for i in range(self.replication_factor)]

    def put(self, key: str, value: bytes) -> None:
        """Write to primary and replica shards."""
        for shard_id in self._get_shard_ids(key):
            self.backends[shard_id].put(key, value)

    def get(self, key: str) -> Optional[bytes]:
        """Read from any available shard."""
        for shard_id in self._get_shard_ids(key):
            try:
                result = self.backends[shard_id].get(key)
                if result is not None:
                    return result
            except Exception:
                continue  # Try the next replica
        return None


def partition_index_by_term(term: str, num_partitions: int) -> int:
    """Term-based partitioning: a term's full posting list lives on one shard."""
    return int(hashlib.md5(term.encode()).hexdigest(), 16) % num_partitions


def partition_index_by_doc(doc_id: int, num_partitions: int) -> int:
    """Document-based partitioning: each shard indexes a disjoint set of docs."""
    return doc_id % num_partitions
```
Storage tier recommendations:
- Raw HTML: Object storage (S3, GCS). Cheap, durable, rarely accessed after indexing.
- Document metadata: Distributed KV store (Cassandra, DynamoDB). Frequent reads, moderate writes.
- Inverted index: Custom storage or RocksDB. Optimized for sequential reads of posting lists.
- URL frontier state: Redis Cluster. Fast reads/writes, can tolerate some data loss.
Partitioning tradeoffs:
- Term-based: Each shard holds the complete posting lists for its terms. A query only contacts the shards that own its terms, but multi-term queries must ship posting lists between shards to intersect them. Better for single-term queries.
- Document-based: Each shard holds a complete index over its subset of documents. Queries fan out to all shards and the per-shard results must be merged, but each shard scores its own documents independently. Better for complex queries and index updates.
For most search engines, document-based partitioning wins because it simplifies incremental updates—you only modify one shard when a document changes.
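Under document-based partitioning, serving a query means fanning out to every shard, letting each score its own documents, and having a coordinator merge the per-shard top-k lists. A minimal sketch of that scatter-gather merge (function and variable names are illustrative):

```python
import heapq

def merge_shard_results(
    shard_results: list[list[tuple[float, int]]], k: int
) -> list[tuple[float, int]]:
    """Merge per-shard (score, doc_id) top-k lists into a global top-k.
    Document sets are disjoint across shards, so no dedup step is needed."""
    return heapq.nlargest(k, (hit for shard in shard_results for hit in shard))

# Each shard returns its local best hits, already scored
shard_a = [(0.92, 17), (0.55, 3)]
shard_b = [(0.80, 42), (0.40, 8)]
print(merge_shard_results([shard_a, shard_b], k=3))
# [(0.92, 17), (0.80, 42), (0.55, 3)]
```

Each shard only needs to return its own top k, so the merge touches at most `k * num_shards` entries regardless of corpus size.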
The system I’ve outlined handles the core challenges: politeness at scale, efficient duplicate detection, and a queryable index structure. Production systems add layers of complexity—better ranking signals, real-time indexing, geographic distribution—but these foundations remain constant.