Design a Search Engine: Web Crawling and Indexing

Key Insights

  • A search engine’s crawl efficiency depends entirely on URL frontier design—get politeness and priority wrong, and you’ll either get blocked or waste resources on low-value pages
  • Near-duplicate detection using SimHash saves enormous storage costs and improves search quality by eliminating redundant content before indexing
  • Inverted index partitioning strategy (document-based vs. term-based) fundamentally shapes your query latency and system complexity—choose based on your query patterns

System Requirements and Scale

Building a search engine requires clear thinking about what you’re actually building. Let’s define the scope.

Functional requirements:

  • Crawl the web continuously, discovering new and updated pages
  • Build a searchable index from crawled content
  • Serve queries with relevant results (we’ll focus on crawling and indexing here)

Non-functional requirements:

  • Crawl billions of pages (target: 100 billion)
  • Maintain freshness (re-crawl important pages within hours, others within weeks)
  • Respect politeness policies (don’t DDoS websites)
  • Handle failures gracefully (the web is hostile)

Scale assumptions:

  • 100 billion pages
  • Average page size: 10KB compressed
  • Storage for raw content: ~1 PB
  • Index size: ~100-500 TB (depending on compression)
  • Crawl rate: 10,000+ pages per second to maintain freshness

These numbers drive every architectural decision. You can’t build this on a single machine, and you can’t afford to be sloppy about efficiency.
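
A quick arithmetic check of those assumptions also shows why prioritization matters: even at the target crawl rate, one full pass over the corpus takes months. The figures below are the assumptions above, not measurements:

```python
# Back-of-envelope check of the scale assumptions above
pages = 100e9                  # 100 billion pages
page_size = 10 * 1024          # 10 KB compressed per page

raw_pb = pages * page_size / 1e15          # bytes -> petabytes
full_pass_days = pages / 10_000 / 86_400   # at 10,000 pages/sec

print(f"raw content: ~{raw_pb:.1f} PB")                # -> ~1.0 PB
print(f"full crawl pass: ~{full_pass_days:.0f} days")  # -> ~116 days
```

A 116-day refresh cycle is far too slow for news or product pages, so the frontier must re-crawl important pages often and rarely touch the long tail.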

High-Level Architecture Overview

The system breaks into three core subsystems that operate as a continuous pipeline:

┌─────────────────────────────────────────────────────────────────────┐
│                        SEARCH ENGINE ARCHITECTURE                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────────┐  │
│  │  URL         │───▶│  Crawler     │───▶│  Content Processing  │  │
│  │  Frontier    │    │  Service     │    │  Pipeline            │  │
│  └──────────────┘    └──────────────┘    └──────────────────────┘  │
│         ▲                                          │               │
│         │       (discovered URLs)                  │               │
│         └──────────────────────────────────────────┤               │
│                                                    ▼               │
│                                           ┌──────────────────────┐ │
│                                           │  Indexer             │ │
│                                           └──────────────────────┘ │
│                                                    │               │
│                                                    ▼               │
│                                           ┌──────────────────────┐ │
│                                           │  Inverted Index      │ │
│                                           │  (Sharded)           │ │
│                                           └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

URL Frontier: Manages what to crawl next. Handles prioritization, politeness, and distributed coordination.

Crawler Service: Fetches pages, handles HTTP complexities, extracts content.

Content Processing Pipeline: Parses HTML, extracts links, detects duplicates, prepares content for indexing.

Indexer: Builds and maintains the inverted index that powers search queries.

Each component scales independently. The frontier is coordination-heavy, crawlers are I/O-bound, and indexers are CPU- and storage-bound.

URL Frontier and Crawl Scheduling

The URL frontier is deceptively complex. It’s not just a queue—it’s a distributed priority system that must balance competing concerns.

Core requirements:

  1. Politeness: Never hit the same domain too frequently
  2. Priority: Crawl important pages first
  3. Freshness: Re-crawl pages based on change frequency
  4. Distribution: Coordinate across hundreds of crawler instances

A single-node sketch of the frontier logic (production systems distribute this state, but the scheduling ideas are the same):

import heapq
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional
import hashlib

@dataclass(order=True)
class CrawlTask:
    priority: float
    scheduled_time: float
    url: str = field(compare=False)
    depth: int = field(compare=False)

class URLFrontier:
    def __init__(self, politeness_delay: float = 1.0):
        self.politeness_delay = politeness_delay
        # Per-domain queues for politeness
        self.domain_queues: dict[str, list[CrawlTask]] = defaultdict(list)
        # Track last crawl time per domain
        self.domain_last_crawl: dict[str, float] = {}
        # Global priority queue of (next_available_time, domain)
        self.ready_domains: list[tuple[float, str]] = []
        # Seen URLs (in production, use bloom filter or distributed set)
        self.seen_urls: set[str] = set()
    
    def _extract_domain(self, url: str) -> str:
        from urllib.parse import urlparse
        return urlparse(url).netloc
    
    def _calculate_priority(self, url: str, depth: int) -> float:
        """Lower score = higher priority"""
        # Prioritize by depth (BFS-like behavior)
        base_priority = depth * 10
        # Boost homepage and important paths
        if url.endswith('/') or '/index' in url:
            base_priority -= 5
        return base_priority
    
    def add_url(self, url: str, depth: int = 0) -> bool:
        """Add URL to frontier. Returns False if already seen."""
        # Normalize URL first
        normalized = self._normalize_url(url)
        if normalized in self.seen_urls:
            return False
        
        self.seen_urls.add(normalized)
        domain = self._extract_domain(normalized)
        priority = self._calculate_priority(normalized, depth)
        
        task = CrawlTask(
            priority=priority,
            scheduled_time=time.time(),
            url=normalized,
            depth=depth
        )
        
        # Add to domain-specific queue, noting whether it was empty.
        # An empty queue means the domain is no longer in ready_domains
        # (get_next drops drained domains), so it must be re-scheduled
        # here or the domain would never be crawled again.
        was_empty = not self.domain_queues[domain]
        heapq.heappush(self.domain_queues[domain], task)
        
        if was_empty:
            next_time = self.domain_last_crawl.get(domain, 0) + self.politeness_delay
            heapq.heappush(self.ready_domains, (next_time, domain))
        
        return True
    
    def _normalize_url(self, url: str) -> str:
        """Canonicalize URL for deduplication"""
        from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
        parsed = urlparse(url.lower())
        # Sort query parameters
        query = urlencode(sorted(parse_qsl(parsed.query)))
        # Remove fragments, normalize path
        path = parsed.path.rstrip('/') or '/'
        return urlunparse((
            parsed.scheme, parsed.netloc, path, '', query, ''
        ))
    
    def get_next(self) -> Optional[CrawlTask]:
        """Get next URL respecting politeness constraints"""
        current_time = time.time()
        
        while self.ready_domains:
            next_time, domain = heapq.heappop(self.ready_domains)
            
            if next_time > current_time:
                # No domains ready yet, put it back
                heapq.heappush(self.ready_domains, (next_time, domain))
                return None
            
            if not self.domain_queues[domain]:
                continue
            
            task = heapq.heappop(self.domain_queues[domain])
            
            # Schedule next crawl for this domain
            next_available = current_time + self.politeness_delay
            self.domain_last_crawl[domain] = current_time
            
            if self.domain_queues[domain]:
                heapq.heappush(self.ready_domains, (next_available, domain))
            
            return task
        
        return None

For robots.txt compliance, parse and cache directives per domain:

from urllib.robotparser import RobotFileParser

class RobotsChecker:
    def __init__(self, user_agent: str = "MySearchBot/1.0"):
        self.user_agent = user_agent
        self._cache: dict[str, RobotFileParser] = {}
    
    def can_fetch(self, url: str) -> bool:
        from urllib.parse import urlparse
        parsed = urlparse(url)
        robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
        
        if robots_url not in self._cache:
            rp = RobotFileParser()
            rp.set_url(robots_url)
            try:
                rp.read()
            except Exception:
                # Unreachable robots.txt: treat as allow-all, but still
                # cache the parser so we don't re-fetch on every check
                rp.allow_all = True
            self._cache[robots_url] = rp
        
        return self._cache[robots_url].can_fetch(self.user_agent, url)
    
    def crawl_delay(self, domain: str) -> Optional[float]:
        """Get crawl delay specified in robots.txt"""
        robots_url = f"https://{domain}/robots.txt"
        if robots_url in self._cache:
            return self._cache[robots_url].crawl_delay(self.user_agent)
        return None

Distributed Web Crawler Design

Crawler workers are the workhorses. They need to be fast, resilient, and respectful.

import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class CrawlResult:
    url: str
    status_code: int
    content: Optional[bytes]
    content_type: str
    headers: dict
    fetch_time_ms: float
    error: Optional[str] = None

class CrawlerWorker:
    def __init__(
        self,
        worker_id: str,
        max_concurrent: int = 100,
        timeout_seconds: float = 30,
        max_content_size: int = 10 * 1024 * 1024  # 10MB
    ):
        self.worker_id = worker_id
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        self.max_content_size = max_content_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=self.timeout,
                headers={"User-Agent": "MySearchBot/1.0"}
            )
        return self._session
    
    async def fetch(self, url: str, max_redirects: int = 5) -> CrawlResult:
        """Fetch a single URL with proper error handling"""
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        
        def elapsed_ms() -> float:
            return (loop.time() - start_time) * 1000
        
        async with self.semaphore:
            try:
                session = await self._get_session()
                async with session.get(
                    url,
                    allow_redirects=True,
                    max_redirects=max_redirects
                ) as response:
                    # Check declared content length before downloading
                    content_length = response.headers.get('Content-Length')
                    if content_length and int(content_length) > self.max_content_size:
                        return CrawlResult(
                            url=url,
                            status_code=response.status,
                            content=None,
                            content_type=response.content_type or '',
                            headers=dict(response.headers),
                            fetch_time_ms=elapsed_ms(),
                            error="Content too large"
                        )
                    
                    content = await response.read()
                    
                    return CrawlResult(
                        url=str(response.url),  # Final URL after redirects
                        status_code=response.status,
                        content=content,
                        content_type=response.content_type or '',
                        headers=dict(response.headers),
                        fetch_time_ms=elapsed_ms()
                    )
            
            except asyncio.TimeoutError:
                return CrawlResult(
                    url=url, status_code=0, content=None, content_type='',
                    headers={}, fetch_time_ms=elapsed_ms(), error="Timeout"
                )
            except aiohttp.ClientError as e:
                return CrawlResult(
                    url=url, status_code=0, content=None, content_type='',
                    headers={}, fetch_time_ms=elapsed_ms(), error=str(e)
                )
    
    async def batch_fetch(self, urls: list[str]) -> list[CrawlResult]:
        """Fetch multiple URLs concurrently"""
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

def assign_url_to_worker(url: str, num_workers: int) -> int:
    """Deterministic URL-to-worker assignment by domain.
    
    Note: this is simple modulo hashing, not consistent hashing.
    Changing num_workers reshuffles almost every domain; use
    consistent or rendezvous hashing if workers come and go.
    """
    from urllib.parse import urlparse
    domain = urlparse(url).netloc
    # Same domain always maps to the same worker, which keeps
    # per-domain politeness state local to one worker
    hash_value = int(hashlib.md5(domain.encode()).hexdigest(), 16)
    return hash_value % num_workers
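
Because modulo assignment reshuffles nearly every domain whenever the worker pool changes size, a rendezvous (highest-random-weight) scheme is worth knowing. This is an illustrative sketch, not part of the system above; the MD5-based scoring is just one reasonable choice:

```python
import hashlib

def rendezvous_assign(domain: str, workers: list[str]) -> str:
    """Assign a domain to the worker with the highest hash score.
    Removing a worker reassigns only the domains it owned; every
    other domain keeps its assignment."""
    def score(worker: str) -> int:
        return int(hashlib.md5(f"{worker}:{domain}".encode()).hexdigest(), 16)
    return max(workers, key=score)

workers = ["worker-a", "worker-b", "worker-c"]
owner = rendezvous_assign("example.com", workers)
# Removing a worker that does not own the domain leaves it in place
non_owner = next(w for w in workers if w != owner)
remaining = [w for w in workers if w != non_owner]
assert rendezvous_assign("example.com", remaining) == owner
```

The stability property follows directly from taking a max: dropping any element other than the argmax cannot change the argmax.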

Content Processing Pipeline

After fetching, we need to extract useful content and detect duplicates.

from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import hashlib
import re

class ContentProcessor:
    def __init__(self):
        # Field weights for ranking: a hook for the indexer to boost
        # terms that appear in prominent tags (not used in extraction)
        self.tag_weights = {
            'title': 10, 'h1': 5, 'h2': 3, 'h3': 2,
            'strong': 1.5, 'b': 1.5, 'em': 1.2
        }
    
    def extract_content(self, html: bytes, base_url: str) -> dict:
        """Extract text, links, and metadata from HTML"""
        soup = BeautifulSoup(html, 'lxml')
        
        # Remove script and style elements
        for element in soup(['script', 'style', 'nav', 'footer', 'header']):
            element.decompose()
        
        # Extract title (soup.title.string is None for an empty <title>)
        title = (soup.title.string or '') if soup.title else ''
        
        # Extract main text
        text = soup.get_text(separator=' ', strip=True)
        text = re.sub(r'\s+', ' ', text)
        
        # Extract links
        links = []
        for a in soup.find_all('a', href=True):
            href = a['href']
            absolute_url = urljoin(base_url, href)
            # Filter out non-HTTP links
            if urlparse(absolute_url).scheme in ('http', 'https'):
                links.append(self._normalize_url(absolute_url))
        
        return {
            'title': title,
            'text': text,
            'links': list(set(links)),  # Dedupe
            'word_count': len(text.split())
        }
    
    def _normalize_url(self, url: str) -> str:
        """Canonicalize URL"""
        from urllib.parse import urlparse, urlunparse
        parsed = urlparse(url)
        # Remove fragments, lowercase
        return urlunparse((
            parsed.scheme.lower(),
            parsed.netloc.lower(),
            parsed.path.rstrip('/') or '/',
            '', '', ''
        ))

class SimHash:
    """SimHash for near-duplicate detection"""
    
    def __init__(self, hash_bits: int = 64):
        self.hash_bits = hash_bits
    
    def _tokenize(self, text: str) -> list[str]:
        """Generate shingles (n-grams) from text"""
        words = text.lower().split()
        # 3-word shingles
        return [' '.join(words[i:i+3]) for i in range(len(words) - 2)]
    
    def _hash_token(self, token: str) -> int:
        """Hash a single token to 64-bit integer"""
        return int(hashlib.md5(token.encode()).hexdigest()[:16], 16)
    
    def compute(self, text: str) -> int:
        """Compute SimHash fingerprint"""
        tokens = self._tokenize(text)
        if not tokens:
            return 0
        
        # Initialize bit counts
        v = [0] * self.hash_bits
        
        for token in tokens:
            token_hash = self._hash_token(token)
            for i in range(self.hash_bits):
                if token_hash & (1 << i):
                    v[i] += 1
                else:
                    v[i] -= 1
        
        # Convert to fingerprint
        fingerprint = 0
        for i in range(self.hash_bits):
            if v[i] > 0:
                fingerprint |= (1 << i)
        
        return fingerprint
    
    def hamming_distance(self, hash1: int, hash2: int) -> int:
        """Count differing bits between two hashes"""
        xor = hash1 ^ hash2
        return bin(xor).count('1')
    
    def is_near_duplicate(self, hash1: int, hash2: int, threshold: int = 3) -> bool:
        """Check if two documents are near-duplicates"""
        return self.hamming_distance(hash1, hash2) <= threshold
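
Comparing every pair of fingerprints is quadratic, so at billions of documents you need candidate lookup rather than brute force. A common trick, sketched here independently of the class above: split each 64-bit fingerprint into 4 bands of 16 bits. By pigeonhole, any two hashes within Hamming distance 3 agree exactly on at least one band, so probing 4 hash tables finds every near-duplicate candidate:

```python
from collections import defaultdict

BANDS = 4
BAND_BITS = 16  # 4 bands x 16 bits = 64-bit fingerprints

def band_keys(fingerprint: int) -> list[tuple[int, int]]:
    """Split a 64-bit fingerprint into (band_index, band_value) keys."""
    mask = (1 << BAND_BITS) - 1
    return [(i, (fingerprint >> (i * BAND_BITS)) & mask) for i in range(BANDS)]

class NearDupIndex:
    def __init__(self):
        self.buckets: dict[tuple[int, int], set[int]] = defaultdict(set)

    def add(self, doc_id: int, fingerprint: int) -> None:
        # Index the document under each of its four band values
        for key in band_keys(fingerprint):
            self.buckets[key].add(doc_id)

    def candidates(self, fingerprint: int) -> set[int]:
        """Docs sharing at least one band: a superset of everything
        within Hamming distance 3 (verify with hamming_distance)."""
        out: set[int] = set()
        for key in band_keys(fingerprint):
            out |= self.buckets[key]
        return out

idx = NearDupIndex()
idx.add(1, 0x123456789ABCDEF0)
print(idx.candidates(0x123456789ABCDEF0 ^ 0b101))  # -> {1}
```

Candidates still need a final Hamming-distance check, since unrelated documents can collide on a single band.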

Inverted Index Construction

The inverted index is the heart of search. It maps terms to documents.

from collections import defaultdict
from dataclasses import dataclass
import math

@dataclass
class Posting:
    doc_id: int
    term_frequency: int
    positions: list[int]  # For phrase queries

class InvertedIndexBuilder:
    def __init__(self):
        self.index: dict[str, list[Posting]] = defaultdict(list)
        self.doc_lengths: dict[int, int] = {}
        self.total_docs = 0
    
    def _tokenize(self, text: str) -> list[str]:
        """Basic tokenization with stemming"""
        import re
        # Lowercase and split on non-alphanumeric
        tokens = re.findall(r'\b[a-z0-9]+\b', text.lower())
        # Simple suffix stripping (use Porter Stemmer in production)
        return [self._stem(t) for t in tokens if len(t) > 2]
    
    def _stem(self, word: str) -> str:
        """Naive stemming - use NLTK PorterStemmer in production"""
        suffixes = ['ing', 'ed', 'ly', 's', 'es']
        for suffix in suffixes:
            if word.endswith(suffix) and len(word) > len(suffix) + 2:
                return word[:-len(suffix)]
        return word
    
    def add_document(self, doc_id: int, text: str):
        """Index a single document"""
        tokens = self._tokenize(text)
        self.doc_lengths[doc_id] = len(tokens)
        self.total_docs += 1
        
        # Count term frequencies and positions
        term_positions: dict[str, list[int]] = defaultdict(list)
        for pos, token in enumerate(tokens):
            term_positions[token].append(pos)
        
        # Add to index
        for term, positions in term_positions.items():
            posting = Posting(
                doc_id=doc_id,
                term_frequency=len(positions),
                positions=positions
            )
            self.index[term].append(posting)
    
    def compute_tfidf(self, term: str, doc_id: int) -> float:
        """Compute TF-IDF score for a term in a document"""
        postings = self.index.get(term, [])
        
        # Find the posting for this document
        posting = next((p for p in postings if p.doc_id == doc_id), None)
        if not posting:
            return 0.0
        
        # TF: log-scaled term frequency
        tf = 1 + math.log10(posting.term_frequency) if posting.term_frequency > 0 else 0
        
        # IDF: inverse document frequency
        df = len(postings)
        idf = math.log10(self.total_docs / df) if df > 0 else 0
        
        return tf * idf
    
    def merge_indices(self, other: 'InvertedIndexBuilder'):
        """Merge another index into this one (for distributed building)"""
        for term, postings in other.index.items():
            self.index[term].extend(postings)
            # Sort by doc_id for efficient querying
            self.index[term].sort(key=lambda p: p.doc_id)
        
        self.doc_lengths.update(other.doc_lengths)
        self.total_docs += other.total_docs
    
    def serialize_postings(self, term: str) -> bytes:
        """Serialize posting list for storage (variable-byte encoding)"""
        postings = self.index.get(term, [])
        data = []
        prev_doc_id = 0
        
        for posting in postings:
            # Delta encoding for doc_ids
            delta = posting.doc_id - prev_doc_id
            data.append(self._vbyte_encode(delta))
            data.append(self._vbyte_encode(posting.term_frequency))
            prev_doc_id = posting.doc_id
        
        return b''.join(data)
    
    def _vbyte_encode(self, n: int) -> bytes:
        """Variable-byte encoding for integers"""
        result = []
        while n >= 128:
            result.append(n & 0x7f)
            n >>= 7
        result.append(n | 0x80)  # Set high bit on last byte
        return bytes(result)
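
The encoder above needs a matching decoder at query time. A sketch using the same convention (low-order 7-bit groups first, high bit set on the final byte of each integer); `vbyte_encode` is repeated here only so the round-trip is self-contained:

```python
def vbyte_encode(n: int) -> bytes:
    """Same scheme as _vbyte_encode above, repeated for a
    self-contained round-trip check."""
    out = []
    while n >= 128:
        out.append(n & 0x7f)   # continuation byte: high bit clear
        n >>= 7
    out.append(n | 0x80)       # final byte: high bit set
    return bytes(out)

def vbyte_decode(data: bytes) -> list[int]:
    """Decode a concatenated stream of variable-byte integers."""
    numbers, n, shift = [], 0, 0
    for byte in data:
        if byte & 0x80:                          # final byte
            numbers.append(n | ((byte & 0x7f) << shift))
            n, shift = 0, 0
        else:                                    # continuation byte
            n |= byte << shift
            shift += 7
    return numbers

stream = b''.join(vbyte_encode(x) for x in (5, 300, 1048576))
print(vbyte_decode(stream))  # -> [5, 300, 1048576]
```

Because doc IDs are delta-encoded before serialization, the decoder's output must be cumulatively summed to recover absolute doc IDs.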

Storage and Scaling Considerations

At scale, storage strategy determines both cost and performance.

from abc import ABC, abstractmethod
from typing import Optional
import hashlib

class StorageBackend(ABC):
    @abstractmethod
    def put(self, key: str, value: bytes) -> None:
        pass
    
    @abstractmethod
    def get(self, key: str) -> Optional[bytes]:
        pass

class ShardedStorage:
    """Distribute data across multiple storage backends"""
    
    def __init__(self, backends: list[StorageBackend], replication_factor: int = 2):
        self.backends = backends
        self.num_shards = len(backends)
        self.replication_factor = replication_factor
    
    def _get_shard_ids(self, key: str) -> list[int]:
        """Get shard IDs for a key (primary + replicas)"""
        hash_val = int(hashlib.sha256(key.encode()).hexdigest(), 16)
        primary = hash_val % self.num_shards
        
        # Simple replication: consecutive shards
        shards = []
        for i in range(self.replication_factor):
            shards.append((primary + i) % self.num_shards)
        return shards
    
    def put(self, key: str, value: bytes) -> None:
        """Write to primary and replica shards"""
        for shard_id in self._get_shard_ids(key):
            self.backends[shard_id].put(key, value)
    
    def get(self, key: str) -> Optional[bytes]:
        """Read from any available shard"""
        for shard_id in self._get_shard_ids(key):
            try:
                result = self.backends[shard_id].get(key)
                if result is not None:
                    return result
            except Exception:
                continue  # Try next replica
        return None

def partition_index_by_term(term: str, num_partitions: int) -> int:
    """Term-based partitioning: same term always on same shard"""
    return int(hashlib.md5(term.encode()).hexdigest(), 16) % num_partitions

def partition_index_by_doc(doc_id: int, num_partitions: int) -> int:
    """Document-based partitioning: full index per shard, different docs"""
    return doc_id % num_partitions

Storage tier recommendations:

  1. Raw HTML: Object storage (S3, GCS). Cheap, durable, rarely accessed after indexing.
  2. Document metadata: Distributed KV store (Cassandra, DynamoDB). Frequent reads, moderate writes.
  3. Inverted index: Custom storage or RocksDB. Optimized for sequential reads of posting lists.
  4. URL frontier state: Redis Cluster. Fast reads/writes, can tolerate some data loss.

Partitioning tradeoffs:

  • Term-based: Each shard holds the complete posting lists for its subset of terms. A single-term query touches exactly one shard, but multi-term queries must ship posting lists between shards to intersect them. Better for simple, single-term-heavy workloads.
  • Document-based: Each shard holds a full index over its subset of documents. Every query fans out to all shards, and the per-shard results must be merged. Better for complex queries and incremental index updates.

For most search engines, document-based partitioning wins because it simplifies incremental updates—you only modify one shard when a document changes.
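
With document-based partitioning, query serving is scatter-gather: every shard scores the query against its own documents and returns a local top-k, which a coordinator merges. A minimal sketch; the `(doc_id, score)` result shape is an assumption for illustration:

```python
import heapq

def merge_topk(shard_results: list[list[tuple[int, float]]],
               k: int) -> list[tuple[int, float]]:
    """Merge per-shard (doc_id, score) lists into a global top-k.
    Each shard only needs to return its own top-k for the global
    top-k to be correct."""
    all_hits = (hit for hits in shard_results for hit in hits)
    return heapq.nlargest(k, all_hits, key=lambda hit: hit[1])

shard_a = [(101, 9.2), (105, 4.1)]
shard_b = [(203, 7.7), (210, 6.0)]
print(merge_topk([shard_a, shard_b], k=3))
# -> [(101, 9.2), (203, 7.7), (210, 6.0)]
```

The key property is that a document's score depends only on data within its own shard, so no cross-shard communication is needed beyond the final merge.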

The system I’ve outlined handles the core challenges: politeness at scale, efficient duplicate detection, and a queryable index structure. Production systems add layers of complexity—better ranking signals, real-time indexing, geographic distribution—but these foundations remain constant.
