Design a Content Moderation System

Key Insights

  • Content moderation requires a multi-stage pipeline combining automated detection with human review—neither alone handles the scale, nuance, and speed requirements of modern platforms.
  • The critical architectural decision is sync vs async processing: block-before-publish protects users but adds latency; review-after-publish scales better but risks exposure to harmful content.
  • Feedback loops between human reviewers and ML models are essential—without them, your system becomes increasingly misaligned with evolving content patterns and policy changes.

Introduction & Problem Space

Content moderation isn’t optional. If you’re building any platform where users can post content, you’re building a content moderation system—whether you realize it or not. The question is whether you design it intentionally or let it emerge chaotically.

The stakes are high. User safety, legal compliance, and brand protection all depend on catching harmful content. Miss a piece of illegal content and you face regulatory consequences. Over-moderate and you alienate your user base. Do it too slowly and harm spreads before you can act.

The challenge compounds across content types. Text moderation is relatively mature, but images require computer vision, video requires frame-by-frame analysis at scale, and audio needs transcription plus analysis. Each medium has unique attack vectors—adversarial text uses unicode tricks, images embed text to bypass filters, and videos hide harmful content in specific timestamps.
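Before any of these models run, a cheap but high-leverage first step is canonicalizing text input, since most unicode tricks collapse under compatibility normalization. A minimal sketch using only the standard library (the invisible-character list is illustrative, not exhaustive):

```python
import unicodedata

# Zero-width characters commonly inserted to split blocklisted terms (illustrative list)
INVISIBLE = {"\u200b", "\u200c", "\u200d", "\u2060", "\ufeff"}

def normalize_for_analysis(text: str) -> str:
    """Fold lookalike forms so stylized or padded text matches its plain spelling."""
    # NFKC maps fullwidth, circled, and other compatibility codepoints to plain forms
    text = unicodedata.normalize("NFKC", text)
    # Drop zero-width characters used to break up blocked terms
    text = "".join(ch for ch in text if ch not in INVISIBLE)
    return text.casefold()
```

Run detectors on the normalized form while storing the original for reviewer display; this closes off a whole class of trivial bypasses.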

You’re constantly balancing three constraints: speed (users expect instant publishing), accuracy (false positives frustrate users, false negatives cause harm), and cost (human review doesn’t scale, ML inference isn’t free). This article walks through an architecture that manages these tradeoffs pragmatically.

High-Level Architecture Overview

The core pattern is a multi-stage pipeline with increasing human involvement at each stage. Think of it as a funnel: automated systems handle the obvious cases (both clearly safe and clearly harmful), while human reviewers focus on the ambiguous middle.

┌─────────────────────────────────────────────────────────────────┐
│                     Content Submission                          │
└─────────────────────┬───────────────────────────────────────────┘
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│              Pre-screening (sync, <100ms)                       │
│         Rate limits, blocklists, format validation              │
└─────────────────────┬───────────────────────────────────────────┘
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│            Automated Analysis (sync/async)                      │
│      ML models: toxicity, NSFW, spam, violence                  │
└─────────────────────┬───────────────────────────────────────────┘
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│              Routing Decision                                   │
│    High confidence → auto-action | Low confidence → queue       │
└──────────┬─────────────────────────────────────┬────────────────┘
           ▼                                     ▼
┌──────────────────────┐              ┌──────────────────────────┐
│   Auto-publish or    │              │    Human Review Queue    │
│   Auto-reject        │              │    (priority sorted)     │
└──────────────────────┘              └───────────┬──────────────┘
                                                  ▼
                                      ┌──────────────────────────┐
                                      │   Appeals Queue          │
                                      └──────────────────────────┘

The sync vs async decision is architectural. Synchronous processing blocks the user until analysis completes—safer but slower. Asynchronous processing publishes immediately and reviews afterward—faster but riskier. Most systems use a hybrid: sync for high-risk content types, async for lower-risk.

Here’s a basic queue setup for routing content to appropriate processing pipelines:

from enum import Enum
from dataclasses import dataclass
import json
import time

class ContentType(Enum):
    TEXT = "text"
    IMAGE = "image"
    VIDEO = "video"
    
class ProcessingMode(Enum):
    SYNC_BLOCK = "sync_block"      # Block until reviewed
    ASYNC_VISIBLE = "async_visible" # Publish, review async
    ASYNC_HIDDEN = "async_hidden"   # Hide until reviewed

@dataclass
class ModerationJob:
    content_id: str
    content_type: ContentType
    user_id: str
    content_hash: str
    processing_mode: ProcessingMode
    priority: int
    created_at: float
    
class ContentRouter:
    def __init__(self, queue_client):
        self.queue = queue_client
        self.routing_rules = {
            ContentType.TEXT: ProcessingMode.ASYNC_VISIBLE,
            ContentType.IMAGE: ProcessingMode.ASYNC_HIDDEN,
            ContentType.VIDEO: ProcessingMode.ASYNC_HIDDEN,
        }
    
    def route(self, content_id: str, content_type: ContentType, 
              user_id: str, user_trust_score: float) -> ProcessingMode:
        # High-trust users get faster publishing
        if user_trust_score > 0.95:
            mode = ProcessingMode.ASYNC_VISIBLE
        else:
            mode = self.routing_rules[content_type]
        
        # Calculate priority based on user reach and content type
        priority = self._calculate_priority(user_id, content_type)
        
        job = ModerationJob(
            content_id=content_id,
            content_type=content_type,
            user_id=user_id,
            content_hash=self._hash_content(content_id),
            processing_mode=mode,
            priority=priority,
            created_at=time.time()
        )
        
        queue_name = f"moderation.{content_type.value}"
        # Enum members aren't JSON-serializable, so convert them before publishing
        payload = {**job.__dict__,
                   "content_type": job.content_type.value,
                   "processing_mode": job.processing_mode.value}
        self.queue.publish(queue_name, json.dumps(payload))
        
        return mode

Automated Detection Layer

Your automated layer needs to be both fast and extensible. You’ll add new detectors as threats evolve, so design for plugins from the start.

The key insight is confidence aggregation. Individual models produce scores, but the routing decision depends on combining them intelligently. A piece of content might score 0.3 on toxicity but 0.9 on spam—the aggregation logic determines the outcome.

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class DetectionResult:
    detector_name: str
    category: str
    confidence: float
    metadata: Dict

class ContentDetector(ABC):
    @abstractmethod
    def analyze(self, content: str) -> List[DetectionResult]:
        pass

class ToxicityDetector(ContentDetector):
    def __init__(self, model_client):
        self.model = model_client
        
    def analyze(self, content: str) -> List[DetectionResult]:
        # Call your ML model (Perspective API, custom model, etc.)
        scores = self.model.predict(content)
        results = []
        for category, score in scores.items():
            if score > 0.1:  # Only report non-trivial scores
                results.append(DetectionResult(
                    detector_name="toxicity_v2",
                    category=category,
                    confidence=score,
                    metadata={"model_version": "2.3.1"}
                ))
        return results

class BlocklistDetector(ContentDetector):
    def __init__(self, blocklist_service):
        self.blocklist = blocklist_service
        
    def analyze(self, content: str) -> List[DetectionResult]:
        matches = self.blocklist.find_matches(content)
        return [
            DetectionResult(
                detector_name="blocklist",
                category=match.category,
                confidence=1.0,  # Exact matches are certain
                metadata={"matched_term": match.term, "list_id": match.list_id}
            )
            for match in matches
        ]

class ContentAnalyzer:
    def __init__(self, detectors: List[ContentDetector]):
        self.detectors = detectors
        self.thresholds = {
            "auto_reject": 0.95,
            "human_review": 0.5,
            # Anything scoring below human_review is auto-approved
        }
    
    def analyze(self, content: str) -> Dict:
        all_results = []
        for detector in self.detectors:
            all_results.extend(detector.analyze(content))
        
        # Aggregate by category, taking max confidence
        by_category = {}
        for result in all_results:
            if result.category not in by_category:
                by_category[result.category] = result
            elif result.confidence > by_category[result.category].confidence:
                by_category[result.category] = result
        
        # Determine action based on highest confidence score
        max_confidence = max((r.confidence for r in all_results), default=0)
        
        if max_confidence >= self.thresholds["auto_reject"]:
            action = "reject"
        elif max_confidence >= self.thresholds["human_review"]:
            action = "review"
        else:
            action = "approve"
            
        return {
            "action": action,
            "max_confidence": max_confidence,
            "results": all_results,
            "by_category": by_category
        }

Human Review Workflow

Human review is where the nuance happens. Your reviewers handle edge cases that automated systems can’t—context-dependent content, cultural nuances, and novel attack patterns.

The workflow needs a state machine to track each piece of content through the review process:

from enum import Enum
from typing import Optional
import time

class ReviewState(Enum):
    PENDING = "pending"
    ASSIGNED = "assigned"
    IN_REVIEW = "in_review"
    DECIDED = "decided"
    APPEALED = "appealed"
    APPEAL_REVIEWED = "appeal_reviewed"

class ReviewTask:
    VALID_TRANSITIONS = {
        ReviewState.PENDING: [ReviewState.ASSIGNED],
        ReviewState.ASSIGNED: [ReviewState.IN_REVIEW, ReviewState.PENDING],
        ReviewState.IN_REVIEW: [ReviewState.DECIDED, ReviewState.ASSIGNED],
        ReviewState.DECIDED: [ReviewState.APPEALED],
        ReviewState.APPEALED: [ReviewState.APPEAL_REVIEWED],
    }
    
    def __init__(self, content_id: str, priority: int, content_type: str = ""):
        self.content_id = content_id
        self.content_type = content_type  # Category context for reviewer matching
        self.priority = priority
        self.state = ReviewState.PENDING
        self.assigned_to: Optional[str] = None
        self.decision: Optional[str] = None
        self.audit_log = []
        
    def transition(self, new_state: ReviewState, actor: str, reason: str = ""):
        if new_state not in self.VALID_TRANSITIONS.get(self.state, []):
            raise ValueError(f"Invalid transition: {self.state} -> {new_state}")
        
        self.audit_log.append({
            "from_state": self.state.value,
            "to_state": new_state.value,
            "actor": actor,
            "reason": reason,
            "timestamp": time.time()
        })
        self.state = new_state

class ReviewerAssignment:
    def __init__(self, db):
        self.db = db
        self.max_concurrent_tasks = 5
        
    def assign_next_task(self, reviewer_id: str) -> Optional[ReviewTask]:
        # Check reviewer capacity
        current_load = self.db.count_assigned_tasks(reviewer_id)
        if current_load >= self.max_concurrent_tasks:
            return None
        
        # Get reviewer specializations (some handle NSFW, some handle violence, etc.)
        specializations = self.db.get_reviewer_specializations(reviewer_id)
        
        # Find highest priority unassigned task matching specialization
        task = self.db.get_highest_priority_pending_task(
            categories=specializations,
            exclude_user_content=self._get_conflict_users(reviewer_id)
        )
        
        if task:
            task.transition(ReviewState.ASSIGNED, reviewer_id)
            task.assigned_to = reviewer_id
            self.db.save_task(task)
            
        return task

Real-Time vs Batch Processing

Live content—chat messages, live streams, real-time comments—requires stream processing. You can’t wait for batch jobs when harmful content is being broadcast to thousands of viewers.

from kafka import KafkaConsumer, KafkaProducer
import json
import time

class RealTimeModerationConsumer:
    def __init__(self, analyzer: ContentAnalyzer, action_producer: KafkaProducer):
        self.analyzer = analyzer
        self.action_producer = action_producer
        self.consumer = KafkaConsumer(
            'content.text.submitted',
            bootstrap_servers=['localhost:9092'],
            group_id='moderation-realtime',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            max_poll_interval_ms=30000,  # Evicted from the group if a poll cycle exceeds 30s
        )
    
    def run(self):
        for message in self.consumer:
            content_event = message.value
            
            try:
                result = self.analyzer.analyze(content_event['text'])
                
                action_event = {
                    'content_id': content_event['content_id'],
                    'action': result['action'],
                    'confidence': result['max_confidence'],
                    'latency_ms': time.time() * 1000 - content_event['timestamp_ms']
                }
                
                # Emit action for downstream systems
                self.action_producer.send(
                    'moderation.actions',
                    key=content_event['content_id'].encode(),
                    value=json.dumps(action_event).encode()
                )
                
                # If needs review, also queue for humans
                if result['action'] == 'review':
                    self.action_producer.send(
                        'moderation.human_review',
                        value=json.dumps({
                            'content_id': content_event['content_id'],
                            'priority': self._calculate_priority(content_event, result),
                            'auto_analysis': result
                        }).encode()
                    )
                    
            except Exception as e:
                # On failure, default to human review
                self._queue_for_manual_review(content_event, str(e))

Batch processing handles different use cases: backfilling content when you deploy new models, re-evaluating content after policy changes, and periodic sweeps for content that slipped through.
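A minimal re-evaluation sweep is just paging through stored verdicts and re-running analysis, queueing anything whose outcome changed. A sketch with the storage and analysis dependencies passed in as callables (their shapes here are assumptions, not a fixed API):

```python
from typing import Callable, Dict, List, Optional, Tuple

def backfill_sweep(
    fetch_page: Callable[[Optional[str]], Tuple[List[Dict], Optional[str]]],
    analyze: Callable[[str], Dict],
    enqueue_review: Callable[[Dict], None],
) -> int:
    """Re-run analysis over stored content, one page at a time.

    fetch_page(cursor) returns (items, next_cursor); each item carries
    'content_id', 'text', and the previously stored 'action'. Items whose
    fresh verdict differs from the stored one are queued for re-review.
    """
    changed, cursor = 0, None
    while True:
        items, cursor = fetch_page(cursor)
        for item in items:
            fresh = analyze(item["text"])
            if fresh["action"] != item["action"]:
                enqueue_review({"content_id": item["content_id"],
                                "old_action": item["action"],
                                "new_action": fresh["action"]})
                changed += 1
        if cursor is None:
            return changed
```

Queueing flipped verdicts instead of acting on them directly gives you a reviewable changelog after a model deploy or policy change, before anything user-visible happens.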

Feedback Loops & Model Improvement

Without feedback loops, your models degrade. Content patterns change, attackers adapt, and policies evolve. Human reviewer decisions are your ground truth for improving automated systems.

@dataclass
class FeedbackEvent:
    content_id: str
    content_hash: str
    content_type: str
    auto_prediction: Dict  # What the model predicted
    human_decision: str    # What the reviewer decided
    reviewer_id: str
    decision_confidence: str  # "certain", "probable", "uncertain"
    policy_version: str
    timestamp: float

class FeedbackPipeline:
    def __init__(self, event_store, training_data_store):
        self.events = event_store
        self.training_store = training_data_store
        
    def record_decision(self, task: ReviewTask, decision: str, 
                        confidence: str, reviewer_id: str):
        auto_analysis = self.events.get_auto_analysis(task.content_id)
        
        feedback = FeedbackEvent(
            content_id=task.content_id,
            content_hash=self._get_content_hash(task.content_id),
            content_type=task.content_type,
            auto_prediction=auto_analysis,
            human_decision=decision,
            reviewer_id=reviewer_id,
            decision_confidence=confidence,
            policy_version=self._current_policy_version(),
            timestamp=time.time()
        )
        
        self.events.store(feedback)
        
        # High-confidence disagreements are valuable training examples
        if confidence == "certain" and self._is_disagreement(auto_analysis, decision):
            self.training_store.add_example(
                content_id=task.content_id,
                label=decision,
                weight=2.0  # Weight disagreements higher
            )

Operational Considerations

Monitor these metrics religiously:

  • Precision/Recall by category: Are you catching what you should? Are you over-moderating?
  • Queue depth and age: How backed up is human review? Are tasks aging out?
  • Reviewer throughput: Tasks per hour per reviewer, by category
  • Auto-action rate: What percentage bypasses human review? Too high means risk, too low means wasted human effort.
  • Appeal overturn rate: High overturn rates indicate systematic errors
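The first of these falls straight out of the feedback events: pair each automated action with the eventual human decision and treat the human as ground truth. A sketch reduced to binary reject/approve outcomes:

```python
from typing import Dict, Iterable, Tuple

def precision_recall(pairs: Iterable[Tuple[str, str]]) -> Dict[str, float]:
    """pairs: (auto_action, human_action), each "reject" or "approve".

    Precision: of what automation rejected, how much did humans agree with?
    Recall: of what humans rejected, how much did automation catch?
    """
    tp = fp = fn = 0
    for auto, human in pairs:
        if auto == "reject" and human == "reject":
            tp += 1
        elif auto == "reject" and human == "approve":
            fp += 1
        elif auto == "approve" and human == "reject":
            fn += 1
    return {
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }
```

Computed per category rather than globally, the same logic tells you which detectors are over-moderating and which are leaking.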

False positives require graceful handling. When you wrongly remove content, communicate clearly: explain what triggered the action, provide an easy appeal path, and apologize when you’re wrong. Users tolerate mistakes; they don’t tolerate opaque, unaccountable systems.

Viral content events will overwhelm your system. Build in circuit breakers that automatically increase auto-approval thresholds or delay non-critical review during spikes. Have runbooks for manually scaling reviewer capacity.
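One simple circuit breaker: scale the human-review threshold with queue depth, so borderline content auto-resolves during a spike instead of piling up. The limits below are invented for illustration:

```python
def effective_review_threshold(
    base_threshold: float,
    queue_depth: int,
    soft_limit: int = 10_000,
    hard_limit: int = 50_000,
    max_threshold: float = 0.85,
) -> float:
    """Raise the send-to-humans threshold as the review queue backs up.

    Below soft_limit nothing changes; between the limits the threshold
    ramps linearly toward max_threshold; past hard_limit it stays pinned.
    Content scoring under the threshold is auto-approved.
    """
    if queue_depth <= soft_limit:
        return base_threshold
    if queue_depth >= hard_limit:
        return max_threshold
    fraction = (queue_depth - soft_limit) / (hard_limit - soft_limit)
    return base_threshold + fraction * (max_threshold - base_threshold)
```

The tradeoff is explicit: during spikes you accept more risk on borderline content in exchange for keeping high-confidence cases flowing through human review.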

Finally, geography matters. GDPR requires specific data handling. Some countries mandate content removal within hours. Others prohibit removing certain political content. Your architecture needs to route content to appropriate review queues based on jurisdiction and apply region-specific policies.
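A policy table keyed by jurisdiction keeps this manageable: consult it before queueing and attach the matching queue, deadline, and policy version to the job. The regions and SLA numbers below are made up for illustration; real values come from legal counsel, not engineering:

```python
from dataclasses import dataclass
from typing import Dict

@dataclass(frozen=True)
class RegionPolicy:
    review_queue: str       # Queue staffed by reviewers with local context
    removal_sla_hours: int  # Deadline to act on flagged content
    policy_version: str

# Illustrative entries only — not real regulatory requirements
REGION_POLICIES: Dict[str, RegionPolicy] = {
    "DE": RegionPolicy("review.de", removal_sla_hours=24, policy_version="de-2024.2"),
    "FR": RegionPolicy("review.fr", removal_sla_hours=24, policy_version="fr-2024.1"),
}
DEFAULT_POLICY = RegionPolicy("review.global", removal_sla_hours=72,
                              policy_version="global-2024.1")

def policy_for(country_code: str) -> RegionPolicy:
    # Fall back to the global policy when no jurisdiction-specific entry exists
    return REGION_POLICIES.get(country_code, DEFAULT_POLICY)
```

Stamping the `policy_version` onto each decision also pays off later: it lets the feedback pipeline distinguish model errors from policy changes.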

Content moderation is never “done.” It’s an ongoing operational challenge that evolves with your platform, your users, and the broader regulatory environment. Build for change from day one.
