Design a Content Moderation System
Key Insights
- Content moderation requires a multi-stage pipeline combining automated detection with human review—neither alone handles the scale, nuance, and speed requirements of modern platforms.
- The critical architectural decision is sync vs async processing: block-before-publish protects users but adds latency; review-after-publish scales better but risks exposure to harmful content.
- Feedback loops between human reviewers and ML models are essential—without them, your system becomes increasingly misaligned with evolving content patterns and policy changes.
Introduction & Problem Space
Content moderation isn’t optional. If you’re building any platform where users can post content, you’re building a content moderation system—whether you realize it or not. The question is whether you design it intentionally or let it emerge chaotically.
The stakes are high. User safety, legal compliance, and brand protection all depend on catching harmful content. Miss a piece of illegal content and you face regulatory consequences. Over-moderate and you alienate your user base. Do it too slowly and harm spreads before you can act.
The challenge compounds across content types. Text moderation is relatively mature, but images require computer vision, video requires frame-by-frame analysis at scale, and audio needs transcription plus analysis. Each medium has unique attack vectors—adversarial text uses unicode tricks, images embed text to bypass filters, and videos hide harmful content in specific timestamps.
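Unicode evasion in particular is worth neutralizing before any detector runs. A minimal normalization sketch (the confusables mapping here is a tiny illustrative sample; production systems use full confusables data such as Unicode TR39):

```python
import unicodedata

# Illustrative sample of visually-confusable characters (Cyrillic look-alikes);
# real systems load complete confusables tables, not a hand-written dict
CONFUSABLES = {"а": "a", "е": "e", "о": "o", "ѕ": "s"}

def normalize_for_moderation(text: str) -> str:
    # NFKC folds compatibility characters (fullwidth forms, ligatures, etc.)
    text = unicodedata.normalize("NFKC", text)
    # Strip zero-width characters often used to split blocked words
    text = "".join(ch for ch in text if ch not in "\u200b\u200c\u200d\ufeff")
    # Map known look-alike characters to their ASCII equivalents
    return "".join(CONFUSABLES.get(ch, ch) for ch in text)
```

Run this once at ingestion so every downstream detector sees the same canonical text.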
You’re constantly balancing three constraints: speed (users expect instant publishing), accuracy (false positives frustrate users, false negatives cause harm), and cost (human review doesn’t scale, ML inference isn’t free). This article walks through an architecture that manages these tradeoffs pragmatically.
High-Level Architecture Overview
The core pattern is a multi-stage pipeline with increasing human involvement at each stage. Think of it as a funnel: automated systems handle the obvious cases (both clearly safe and clearly harmful), while human reviewers focus on the ambiguous middle.
┌─────────────────────────────────────────────────────────────────┐
│ Content Submission │
└─────────────────────┬───────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Pre-screening (sync, <100ms) │
│ Rate limits, blocklists, format validation │
└─────────────────────┬───────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Automated Analysis (sync/async) │
│ ML models: toxicity, NSFW, spam, violence │
└─────────────────────┬───────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Routing Decision │
│ High confidence → auto-action | Low confidence → queue │
└──────────┬─────────────────────────────────────┬────────────────┘
▼ ▼
┌──────────────────────┐ ┌──────────────────────────┐
│ Auto-publish or │ │ Human Review Queue │
│ Auto-reject │ │ (priority sorted) │
└──────────────────────┘ └───────────┬──────────────┘
▼
┌──────────────────────────┐
│ Appeals Queue │
└──────────────────────────┘
The sync vs async decision is architectural. Synchronous processing blocks the user until analysis completes—safer but slower. Asynchronous processing publishes immediately and reviews afterward—faster but riskier. Most systems use a hybrid: sync for high-risk content types, async for lower-risk.
Here’s a basic queue setup for routing content to appropriate processing pipelines:
from enum import Enum
from dataclasses import dataclass
import hashlib
import json
import time

class ContentType(Enum):
    TEXT = "text"
    IMAGE = "image"
    VIDEO = "video"

class ProcessingMode(Enum):
    SYNC_BLOCK = "sync_block"        # Block until reviewed
    ASYNC_VISIBLE = "async_visible"  # Publish, review async
    ASYNC_HIDDEN = "async_hidden"    # Hide until reviewed

@dataclass
class ModerationJob:
    content_id: str
    content_type: ContentType
    user_id: str
    content_hash: str
    processing_mode: ProcessingMode
    priority: int
    created_at: float

class ContentRouter:
    def __init__(self, queue_client):
        self.queue = queue_client
        self.routing_rules = {
            ContentType.TEXT: ProcessingMode.ASYNC_VISIBLE,
            ContentType.IMAGE: ProcessingMode.ASYNC_HIDDEN,
            ContentType.VIDEO: ProcessingMode.ASYNC_HIDDEN,
        }

    def route(self, content_id: str, content_type: ContentType,
              user_id: str, user_trust_score: float) -> ProcessingMode:
        # High-trust users get faster publishing
        if user_trust_score > 0.95:
            mode = ProcessingMode.ASYNC_VISIBLE
        else:
            mode = self.routing_rules[content_type]

        # Calculate priority based on user reach and content type
        priority = self._calculate_priority(user_id, content_type)

        job = ModerationJob(
            content_id=content_id,
            content_type=content_type,
            user_id=user_id,
            content_hash=self._hash_content(content_id),
            processing_mode=mode,
            priority=priority,
            created_at=time.time()
        )
        # Enums aren't JSON-serializable, so replace them with their values
        payload = {**job.__dict__,
                   "content_type": job.content_type.value,
                   "processing_mode": job.processing_mode.value}
        queue_name = f"moderation.{content_type.value}"
        self.queue.publish(queue_name, json.dumps(payload))
        return mode

    def _calculate_priority(self, user_id: str, content_type: ContentType) -> int:
        # Placeholder: real systems weigh follower count, past violations, etc.
        return 1

    def _hash_content(self, content_id: str) -> str:
        # Placeholder: hash the actual content bytes in a real system
        return hashlib.sha256(content_id.encode()).hexdigest()
Automated Detection Layer
Your automated layer needs to be both fast and extensible. You’ll add new detectors as threats evolve, so design for plugins from the start.
The key insight is confidence aggregation. Individual models produce scores, but the routing decision depends on combining them intelligently. A piece of content might score 0.3 on toxicity but 0.9 on spam—the aggregation logic determines the outcome.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class DetectionResult:
    detector_name: str
    category: str
    confidence: float
    metadata: Dict

class ContentDetector(ABC):
    @abstractmethod
    def analyze(self, content: str) -> List[DetectionResult]:
        pass

class ToxicityDetector(ContentDetector):
    def __init__(self, model_client):
        self.model = model_client

    def analyze(self, content: str) -> List[DetectionResult]:
        # Call your ML model (Perspective API, custom model, etc.)
        scores = self.model.predict(content)
        results = []
        for category, score in scores.items():
            if score > 0.1:  # Only report non-trivial scores
                results.append(DetectionResult(
                    detector_name="toxicity_v2",
                    category=category,
                    confidence=score,
                    metadata={"model_version": "2.3.1"}
                ))
        return results

class BlocklistDetector(ContentDetector):
    def __init__(self, blocklist_service):
        self.blocklist = blocklist_service

    def analyze(self, content: str) -> List[DetectionResult]:
        matches = self.blocklist.find_matches(content)
        return [
            DetectionResult(
                detector_name="blocklist",
                category=match.category,
                confidence=1.0,  # Exact matches are certain
                metadata={"matched_term": match.term, "list_id": match.list_id}
            )
            for match in matches
        ]

class ContentAnalyzer:
    def __init__(self, detectors: List[ContentDetector]):
        self.detectors = detectors
        self.thresholds = {
            "auto_reject": 0.95,
            "human_review": 0.5,
            "auto_approve": 0.1  # Reserved for e.g. sampling audits of low scores
        }

    def analyze(self, content: str) -> Dict:
        all_results = []
        for detector in self.detectors:
            all_results.extend(detector.analyze(content))

        # Aggregate by category, keeping the highest-confidence result
        by_category = {}
        for result in all_results:
            existing = by_category.get(result.category)
            if existing is None or result.confidence > existing.confidence:
                by_category[result.category] = result

        # Determine action from the single highest confidence score
        max_confidence = max((r.confidence for r in all_results), default=0.0)
        if max_confidence >= self.thresholds["auto_reject"]:
            action = "reject"
        elif max_confidence >= self.thresholds["human_review"]:
            action = "review"
        else:
            action = "approve"

        return {
            "action": action,
            "max_confidence": max_confidence,
            "results": all_results,
            "by_category": by_category
        }
Human Review Workflow
Human review is where the nuance happens. Your reviewers handle edge cases that automated systems can’t—context-dependent content, cultural nuances, and novel attack patterns.
The workflow needs a state machine to track each piece of content through the review process:
from enum import Enum
from typing import Optional
import time

class ReviewState(Enum):
    PENDING = "pending"
    ASSIGNED = "assigned"
    IN_REVIEW = "in_review"
    DECIDED = "decided"
    APPEALED = "appealed"
    APPEAL_REVIEWED = "appeal_reviewed"

class ReviewTask:
    VALID_TRANSITIONS = {
        ReviewState.PENDING: [ReviewState.ASSIGNED],
        ReviewState.ASSIGNED: [ReviewState.IN_REVIEW, ReviewState.PENDING],
        ReviewState.IN_REVIEW: [ReviewState.DECIDED, ReviewState.ASSIGNED],
        ReviewState.DECIDED: [ReviewState.APPEALED],
        ReviewState.APPEALED: [ReviewState.APPEAL_REVIEWED],
    }

    def __init__(self, content_id: str, priority: int):
        self.content_id = content_id
        self.priority = priority
        self.state = ReviewState.PENDING
        self.assigned_to: Optional[str] = None
        self.decision: Optional[str] = None
        self.audit_log = []

    def transition(self, new_state: ReviewState, actor: str, reason: str = ""):
        if new_state not in self.VALID_TRANSITIONS.get(self.state, []):
            raise ValueError(f"Invalid transition: {self.state} -> {new_state}")
        self.audit_log.append({
            "from_state": self.state.value,
            "to_state": new_state.value,
            "actor": actor,
            "reason": reason,
            "timestamp": time.time()
        })
        self.state = new_state

class ReviewerAssignment:
    def __init__(self, db):
        self.db = db
        self.max_concurrent_tasks = 5

    def assign_next_task(self, reviewer_id: str) -> Optional[ReviewTask]:
        # Check reviewer capacity
        current_load = self.db.count_assigned_tasks(reviewer_id)
        if current_load >= self.max_concurrent_tasks:
            return None

        # Get reviewer specializations (some handle NSFW, some handle violence, etc.)
        specializations = self.db.get_reviewer_specializations(reviewer_id)

        # Find the highest-priority unassigned task matching a specialization
        task = self.db.get_highest_priority_pending_task(
            categories=specializations,
            exclude_user_content=self._get_conflict_users(reviewer_id)
        )
        if task:
            task.transition(ReviewState.ASSIGNED, reviewer_id)
            task.assigned_to = reviewer_id
            self.db.save_task(task)
        return task

    def _get_conflict_users(self, reviewer_id: str):
        # Simplest conflict rule: reviewers never moderate their own content
        return [reviewer_id]
Real-Time vs Batch Processing
Live content—chat messages, live streams, real-time comments—requires stream processing. You can’t wait for batch jobs when harmful content is being broadcast to thousands of viewers.
from kafka import KafkaConsumer, KafkaProducer
import json
import time

class RealTimeModerationConsumer:
    def __init__(self, analyzer: ContentAnalyzer, action_producer: KafkaProducer):
        self.analyzer = analyzer
        self.action_producer = action_producer
        self.consumer = KafkaConsumer(
            'content.text.submitted',
            bootstrap_servers=['localhost:9092'],
            group_id='moderation-realtime',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            max_poll_interval_ms=30000,  # Must process within 30s
        )

    def run(self):
        for message in self.consumer:
            content_event = message.value
            try:
                result = self.analyzer.analyze(content_event['text'])
                action_event = {
                    'content_id': content_event['content_id'],
                    'action': result['action'],
                    'confidence': result['max_confidence'],
                    'latency_ms': time.time() * 1000 - content_event['timestamp_ms']
                }
                # Emit action for downstream systems
                self.action_producer.send(
                    'moderation.actions',
                    key=content_event['content_id'].encode(),
                    value=json.dumps(action_event).encode()
                )
                # If it needs review, also queue for humans
                if result['action'] == 'review':
                    self.action_producer.send(
                        'moderation.human_review',
                        value=json.dumps({
                            'content_id': content_event['content_id'],
                            'priority': self._calculate_priority(content_event, result),
                            'auto_analysis': result
                        }).encode()
                    )
            except Exception as e:
                # On failure, default to human review
                self._queue_for_manual_review(content_event, str(e))

    def _calculate_priority(self, event, result) -> int:
        # Placeholder: real systems weigh audience size and category severity
        return int(result['max_confidence'] * 100)

    def _queue_for_manual_review(self, event, error: str):
        # Fail safe: content we couldn't analyze goes to humans, not live
        self.action_producer.send(
            'moderation.human_review',
            value=json.dumps({
                'content_id': event.get('content_id'),
                'priority': 100,  # Analysis failures jump the queue
                'error': error
            }).encode()
        )
Batch processing handles different use cases: backfilling content when you deploy new models, re-evaluating content after policy changes, and periodic sweeps for content that slipped through.
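A batch sweep can reuse the same analyzer as the real-time path. A minimal sketch, assuming a `content_store` interface (the `fetch_published` method and its shape are illustrative, not an existing API):

```python
def backfill_sweep(content_store, analyzer, since_ts: float, batch_size: int = 500):
    """Re-run automated analysis over previously published content,
    e.g. after deploying a new model or changing policy."""
    flagged = []
    offset = 0
    while True:
        # Assumed interface: returns (content_id, text) pairs published after since_ts
        batch = content_store.fetch_published(since_ts, offset, batch_size)
        if not batch:
            break
        for content_id, text in batch:
            result = analyzer.analyze(text)
            if result["action"] != "approve":
                # A sweep never auto-removes published content; it queues
                # hits for human confirmation instead
                flagged.append((content_id, result["action"], result["max_confidence"]))
        offset += batch_size
    return flagged
```

Routing sweep hits to humans rather than auto-removing avoids mass takedowns when a new model turns out to be miscalibrated.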
Feedback Loops & Model Improvement
Without feedback loops, your models degrade. Content patterns change, attackers adapt, and policies evolve. Human reviewer decisions are your ground truth for improving automated systems.
from dataclasses import dataclass
from typing import Dict
import time

@dataclass
class FeedbackEvent:
    content_id: str
    content_hash: str
    content_type: str
    auto_prediction: Dict    # What the model predicted
    human_decision: str      # What the reviewer decided
    reviewer_id: str
    decision_confidence: str  # "certain", "probable", "uncertain"
    policy_version: str
    timestamp: float

class FeedbackPipeline:
    def __init__(self, event_store, training_data_store):
        self.events = event_store
        self.training_store = training_data_store

    def record_decision(self, task: ReviewTask, decision: str,
                        confidence: str, reviewer_id: str):
        auto_analysis = self.events.get_auto_analysis(task.content_id)
        feedback = FeedbackEvent(
            content_id=task.content_id,
            content_hash=self._get_content_hash(task.content_id),
            content_type=task.content_type,  # Assumes the task carries its content type
            auto_prediction=auto_analysis,
            human_decision=decision,
            reviewer_id=reviewer_id,
            decision_confidence=confidence,
            policy_version=self._current_policy_version(),
            timestamp=time.time()
        )
        self.events.store(feedback)

        # High-confidence disagreements are valuable training examples
        if confidence == "certain" and self._is_disagreement(auto_analysis, decision):
            self.training_store.add_example(
                content_id=task.content_id,
                label=decision,
                weight=2.0  # Weight disagreements higher
            )

    def _is_disagreement(self, auto_analysis: Dict, decision: str) -> bool:
        # Model and human disagree when the automated action differs
        return auto_analysis.get("action") != decision

    def _get_content_hash(self, content_id: str) -> str:
        # Placeholder: look up the stored hash of the content bytes
        return content_id

    def _current_policy_version(self) -> str:
        # Placeholder: read from a policy/config service
        return "2024-01"
Operational Considerations
Monitor these metrics religiously:
- Precision/Recall by category: Are you catching what you should? Are you over-moderating?
- Queue depth and age: How backed up is human review? Are tasks aging out?
- Reviewer throughput: Tasks per hour per reviewer, by category
- Auto-action rate: What percentage bypasses human review? Too high means risk, too low means wasted human effort.
- Appeal overturn rate: High overturn rates indicate systematic errors
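Several of these metrics fall straight out of the moderation event log. A sketch computing two of them (the event dict shapes are assumptions for illustration):

```python
from typing import Dict, Iterable

def appeal_overturn_rate(appeals: Iterable[Dict]) -> float:
    """Share of appealed decisions that were reversed.
    Each appeal dict is assumed to carry an 'overturned' bool."""
    appeals = list(appeals)
    if not appeals:
        return 0.0
    return sum(1 for a in appeals if a["overturned"]) / len(appeals)

def rejection_precision(decisions: Iterable[Dict]) -> float:
    """Of auto-rejected items later audited by humans, how many were
    genuinely violating? Each dict: {'auto_action', 'audit_label'}."""
    rejected = [d for d in decisions if d["auto_action"] == "reject"]
    if not rejected:
        return 1.0  # Nothing rejected, nothing wrongly rejected
    correct = sum(1 for d in rejected if d["audit_label"] == "violating")
    return correct / len(rejected)
```

Measuring rejection precision requires routinely auditing a sample of auto-rejected content; without that sample, over-moderation is invisible.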
False positives require graceful handling. When you wrongly remove content, communicate clearly: explain what triggered the action, provide an easy appeal path, and apologize when you’re wrong. Users tolerate mistakes; they don’t tolerate opaque, unaccountable systems.
Viral content events will overwhelm your system. Build in circuit breakers that automatically increase auto-approval thresholds or delay non-critical review during spikes. Have runbooks for manually scaling reviewer capacity.
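A load-shedding circuit breaker can be as simple as raising the human-review threshold when queue depth crosses a limit. A sketch under illustrative limits (the specific numbers are assumptions):

```python
class ReviewLoadShedder:
    """During traffic spikes, narrow the band of content sent to humans
    so the queue drains; clearly harmful content is still auto-rejected
    by the separate auto_reject threshold."""

    def __init__(self, normal_review_threshold: float = 0.5,
                 shed_review_threshold: float = 0.8,
                 max_queue_depth: int = 10_000):
        self.normal = normal_review_threshold
        self.shed = shed_review_threshold
        self.max_queue_depth = max_queue_depth

    def review_threshold(self, current_queue_depth: int) -> float:
        # Above the limit, only higher-confidence flags reach humans;
        # the rest are published and swept up later by batch re-review
        if current_queue_depth > self.max_queue_depth:
            return self.shed
        return self.normal
```

The routing layer reads this threshold on every decision, so the system sheds load automatically and recovers without intervention once the queue drains.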
Finally, geography matters. GDPR requires specific data handling. Some countries mandate content removal within hours. Others prohibit removing certain political content. Your architecture needs to route content to appropriate review queues based on jurisdiction and apply region-specific policies.
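Jurisdiction routing is ultimately another routing table, keyed by region. A minimal sketch (region codes, policy versions, and deadlines here are illustrative placeholders, not legal guidance):

```python
from dataclasses import dataclass

@dataclass
class RegionPolicy:
    policy_version: str
    review_deadline_hours: float  # Legal takedown window, if any
    queue: str                    # Region-specific reviewer queue

# Illustrative values only; real deadlines come from counsel, not code
REGION_POLICIES = {
    "EU": RegionPolicy("eu-2024.1", 24.0, "review.eu"),
    "DE": RegionPolicy("de-2024.1", 24.0, "review.de"),
    "US": RegionPolicy("us-2024.1", 72.0, "review.us"),
}
DEFAULT_POLICY = RegionPolicy("global-2024.1", 72.0, "review.global")

def policy_for(region_code: str) -> RegionPolicy:
    # Unknown regions fall back to the strictest global default
    return REGION_POLICIES.get(region_code, DEFAULT_POLICY)
```

Recording the `policy_version` on every decision (as the feedback events above already do) is what makes region-specific audits and retroactive re-reviews possible.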
Content moderation is never “done.” It’s an ongoing operational challenge that evolves with your platform, your users, and the broader regulatory environment. Build for change from day one.