Design a Task Scheduler: Distributed Job Queue
Key Insights
- A distributed task scheduler requires three core components working in harmony: a durable queue for job storage, a coordinator for scheduling decisions, and stateless workers for execution—each must be independently scalable.
- Exactly-once execution is a myth in distributed systems; design for at-least-once delivery with idempotent job handlers, and you’ll build a system that’s both reliable and simple to reason about.
- The choice between push and pull models for job distribution fundamentally shapes your system’s behavior under load—pull models provide natural backpressure while push models offer lower latency.
Introduction & Problem Space
Every production system eventually needs to run tasks outside the request-response cycle. You need to send a welcome email after signup, generate a monthly report at midnight, process uploaded files asynchronously, or retry failed webhook deliveries. These requirements seem simple until you’re handling millions of jobs per day across dozens of worker machines.
Distributed task scheduling solves the problem of reliably executing work across a fleet of machines. The “distributed” part is where complexity lives. A single-machine cron job is trivial. A system that guarantees job execution even when machines fail, handles retries gracefully, and scales horizontally—that’s an engineering challenge.
The core challenges you’ll face:
Reliability: Jobs must execute even when workers crash mid-execution. You need visibility into what’s running, what failed, and what’s stuck.
Ordering: Some jobs must run in sequence. Others can run in any order but shouldn’t run concurrently for the same entity.
Exactly-once execution: The holy grail that doesn’t actually exist. You’ll settle for at-least-once with idempotency.
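At-least-once delivery means a handler can run twice for the same job, so the handler itself must make the second run a no-op. A minimal sketch of that pattern — the in-memory set and the handler name here are illustrative stand-ins for a durable store with a unique constraint:

```python
# Idempotent handler sketch: a durable "already processed" check keyed by
# an idempotency key. A real system would use a database unique constraint
# instead of an in-memory set.

processed_keys = set()  # stands in for durable storage
sent_emails = []        # the side effect we must not duplicate

def send_welcome_email(job):
    key = job["idempotency_key"]
    if key in processed_keys:
        return  # duplicate delivery: safely a no-op
    sent_emails.append(job["payload"]["email"])
    processed_keys.add(key)

job = {"idempotency_key": "signup:42", "payload": {"email": "a@example.com"}}
send_welcome_email(job)
send_welcome_email(job)  # redelivered by the queue — no second email
```

Note the check and the side effect should ideally be one atomic write; if they aren't, a crash between them reintroduces the duplicate.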
Core Architecture Components
A distributed task scheduler has five essential components:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Producers  │────▶│    Queue    │────▶│   Workers   │
│ (API/Apps)  │     │   Broker    │     │ (Consumers) │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │  Metadata   │
                    │    Store    │
                    └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │ Coordinator │
                    │ (Scheduler) │
                    └─────────────┘
Producers enqueue jobs from your application code. Workers pull jobs and execute them. The queue broker provides durable storage and delivery guarantees. The metadata store tracks job state, history, and scheduling information. The coordinator handles delayed jobs, recurring schedules, and dead job detection.
Here’s how to model the core entities:
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid


class JobStatus(Enum):
    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD = "dead"  # Exhausted all retries


@dataclass
class Job:
    id: str
    queue_name: str
    handler: str  # e.g., "email.send_welcome"
    payload: dict
    status: JobStatus
    priority: int = 0
    max_retries: int = 3
    retry_count: int = 0
    scheduled_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    locked_by: Optional[str] = None  # Worker ID
    locked_until: Optional[datetime] = None
    idempotency_key: Optional[str] = None

    @classmethod
    def create(cls, handler: str, payload: dict, **kwargs):
        # Pop "queue" so it isn't forwarded alongside queue_name,
        # which would raise a TypeError
        queue_name = kwargs.pop("queue", "default")
        return cls(
            id=str(uuid.uuid4()),
            queue_name=queue_name,
            handler=handler,
            payload=payload,
            status=JobStatus.PENDING,
            **kwargs,
        )


@dataclass
class TaskQueue:
    name: str
    max_concurrency: int = 10
    rate_limit: Optional[int] = None  # Jobs per second
    retry_delay_seconds: int = 60
    job_timeout_seconds: int = 300
Queue Design & Job Distribution
Partitioning your queues correctly determines whether your system scales gracefully or becomes a bottleneck.
Partition by job type when different jobs have vastly different execution characteristics. Image processing jobs shouldn’t block email sends.
Partition by tenant in multi-tenant systems to provide isolation and prevent noisy neighbors.
Partition by priority to ensure critical jobs get processed first.
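A producer needs a rule for choosing among these partitions. Here's a sketch of a simple router combining the type and tenant strategies — the queue names and the mapping are illustrative, not a fixed API:

```python
from typing import Optional

# Illustrative queue router: partition primarily by job type, with a
# per-tenant suffix for isolation from noisy neighbors.
ROUTES = {
    "email": "emails",    # lightweight, latency-sensitive
    "image": "media",     # heavyweight, CPU-bound
    "report": "reports",  # batch, low priority
}

def route(handler: str, tenant: Optional[str] = None) -> str:
    # Handler names follow the "type.action" convention used above
    base = ROUTES.get(handler.split(".", 1)[0], "default")
    return f"{base}:{tenant}" if tenant else base

route("email.send_welcome")           # → "emails"
route("image.resize", tenant="acme")  # → "media:acme"
```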
Here’s a priority queue implementation using Redis sorted sets:
import redis
import json
import time
from typing import Optional


class RedisPriorityQueue:
    def __init__(self, redis_client: redis.Redis, queue_name: str):
        self.redis = redis_client
        self.queue_key = f"queue:{queue_name}"
        self.processing_key = f"processing:{queue_name}"

    def enqueue(self, job: Job):
        # Score = inverted priority + timestamp for FIFO within a priority.
        # Lower score = higher priority, processed first.
        score = (10 - job.priority) * 1e12 + time.time()
        self.redis.zadd(
            self.queue_key,
            {json.dumps({"id": job.id, "handler": job.handler, "payload": job.payload}): score},
        )

    def dequeue(self, worker_id: str, timeout_seconds: int = 300) -> Optional[dict]:
        # BZPOPMIN atomically pops the lowest-scored member, blocking
        # for up to 1 second if the queue is empty
        result = self.redis.bzpopmin(self.queue_key, timeout=1)
        if not result:
            return None
        _, job_data, _ = result
        job = json.loads(job_data)
        # Track in the processing set; the score is the deadline after
        # which the job is considered stale
        expire_at = time.time() + timeout_seconds
        self.redis.zadd(
            self.processing_key,
            {json.dumps({**job, "worker": worker_id}): expire_at},
        )
        return job

    def complete(self, job_id: str):
        # Scan the processing set for the matching entry. In production
        # you'd index entries by job_id (e.g., in a hash) instead of scanning.
        for member in self.redis.zrange(self.processing_key, 0, -1):
            if json.loads(member).get("id") == job_id:
                self.redis.zrem(self.processing_key, member)
                return

    def requeue_stale_jobs(self):
        """Called periodically by the coordinator to handle dead workers."""
        now = time.time()
        stale = self.redis.zrangebyscore(self.processing_key, 0, now)
        for job_data in stale:
            self.redis.zrem(self.processing_key, job_data)
            job = json.loads(job_data)
            job.pop("worker", None)  # drop the dead worker's claim
            # Re-enqueue with the current timestamp as score, which puts
            # the job ahead of normally-scored entries
            self.redis.zadd(self.queue_key, {json.dumps(job): now})
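The scoring scheme in `enqueue` can be sanity-checked without Redis: higher-priority jobs sort first, and jobs at the same priority keep FIFO order.

```python
# Standalone check of the priority-score formula used above:
# score = (10 - priority) * 1e12 + enqueue_timestamp

def score(priority: int, enqueued_at: float) -> float:
    return (10 - priority) * 1e12 + enqueued_at

jobs = [
    ("low-early", score(1, 1_000.0)),
    ("high-late", score(9, 2_000.0)),
    ("high-early", score(9, 1_000.0)),
]
# Sorted sets return members in ascending score order
ordered = [name for name, s in sorted(jobs, key=lambda j: j[1])]
# → ["high-early", "high-late", "low-early"]
```

The `1e12` multiplier keeps the priority term several orders of magnitude above any Unix timestamp, so a timestamp can never flip the priority ordering.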
Reliability & Failure Handling
Workers will crash. Networks will partition. Your system must handle this gracefully.
The pattern is straightforward: workers acquire a lock on jobs with a timeout, send periodic heartbeats, and the coordinator requeues jobs whose locks expired without completion.
import threading
import time
import json


class Worker:
    def __init__(self, worker_id: str, queue: RedisPriorityQueue, redis_client: redis.Redis):
        self.worker_id = worker_id
        self.queue = queue
        self.redis = redis_client
        self.current_job = None
        self.heartbeat_interval = 30
        self.running = True

    def start_heartbeat(self):
        def heartbeat_loop():
            while self.running:
                if self.current_job:
                    # Extend the lock timeout; the dequeue path is assumed
                    # to set job_lock:<id> when the job is claimed
                    self.redis.expire(
                        f"job_lock:{self.current_job['id']}",
                        self.heartbeat_interval * 2,
                    )
                time.sleep(self.heartbeat_interval)

        thread = threading.Thread(target=heartbeat_loop, daemon=True)
        thread.start()

    def execute_with_retry(self, job: dict, handler_fn, max_retries: int = 3):
        retry_count = 0
        while retry_count <= max_retries:
            try:
                self.current_job = job
                handler_fn(job["payload"])
                self.queue.complete(job["id"])
                return
            except Exception as e:
                retry_count += 1
                if retry_count > max_retries:
                    self.move_to_dead_letter(job, str(e))
                    raise
                # Exponential backoff: 2s, 4s, 8s... capped at 5 minutes
                delay = min(2 ** retry_count, 300)
                time.sleep(delay)
            finally:
                self.current_job = None

    def move_to_dead_letter(self, job: dict, error: str):
        self.redis.lpush(
            f"dead_letter:{self.queue.queue_key}",
            json.dumps({**job, "error": error, "failed_at": time.time()}),
        )
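The backoff schedule in `execute_with_retry` is deterministic, which means a burst of correlated failures retries in lockstep and can hammer a recovering downstream service. A common refinement (not shown above) is "full jitter" — pick the delay uniformly at random below the exponential cap:

```python
import random

def backoff_delay(retry_count: int, cap: float = 300.0, jitter: bool = True) -> float:
    """Exponential backoff with optional full jitter:
    a uniform draw from [0, min(cap, 2 ** retry_count)]."""
    base = min(2 ** retry_count, cap)
    return random.uniform(0, base) if jitter else base

# Deterministic schedule, matching the worker code above:
schedule = [backoff_delay(n, jitter=False) for n in range(1, 6)]
# → [2, 4, 8, 16, 32]
# With jitter=True each delay is uniform in [0, that value],
# spreading retries out over time
```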
Scheduling Mechanisms
Delayed and recurring jobs require a different approach than immediate execution. You can’t just dump them in the main queue—you need a scheduling layer.
from croniter import croniter
from datetime import datetime


class SchedulerService:
    # Assumes a redis client created with decode_responses=True,
    # so hgetall returns strings rather than bytes
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.scheduled_jobs_key = "scheduled_jobs"
        self.recurring_jobs_key = "recurring_jobs"

    def schedule_job(self, job: Job, run_at: datetime):
        """Schedule a one-time delayed job."""
        # Serialize the status enum by value so it round-trips; datetime
        # fields would need proper parsing in a production serializer
        payload = {**job.__dict__, "status": job.status.value}
        self.redis.zadd(
            self.scheduled_jobs_key,
            {json.dumps(payload, default=str): run_at.timestamp()},
        )

    def schedule_recurring(self, job_template: dict, cron_expression: str):
        """Register a recurring job."""
        self.redis.hset(
            self.recurring_jobs_key,
            job_template["handler"],
            json.dumps({
                "template": job_template,
                "cron": cron_expression,
                "last_scheduled": None,
            }),
        )

    def poll_and_enqueue(self, queue: RedisPriorityQueue):
        """Called every second by the coordinator."""
        now = time.time()
        # Handle delayed jobs
        due_jobs = self.redis.zrangebyscore(self.scheduled_jobs_key, 0, now)
        for job_data in due_jobs:
            self.redis.zrem(self.scheduled_jobs_key, job_data)
            job_dict = json.loads(job_data)
            job_dict["status"] = JobStatus(job_dict["status"])
            queue.enqueue(Job(**job_dict))
        # Handle recurring jobs
        for handler, config_data in self.redis.hgetall(self.recurring_jobs_key).items():
            config = json.loads(config_data)
            cron = croniter(config["cron"], datetime.now())
            next_run = cron.get_next(datetime)
            # If the next run falls inside this polling window, schedule it —
            # and record it so the same tick isn't scheduled twice
            if next_run.timestamp() <= now + 1 and config["last_scheduled"] != next_run.isoformat():
                queue.enqueue(Job.create(**config["template"]))
                config["last_scheduled"] = next_run.isoformat()
                self.redis.hset(self.recurring_jobs_key, handler, json.dumps(config))
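The delayed-job half of the poll loop is just "pop everything with score ≤ now". The same logic can be sketched in memory with a min-heap standing in for the Redis sorted set:

```python
import heapq

# In-memory stand-in for the scheduled_jobs sorted set:
# entries are (run_at_timestamp, job_id) tuples
scheduled = []

def schedule(job_id, run_at):
    heapq.heappush(scheduled, (run_at, job_id))

def poll(now):
    """Pop every job whose run_at has passed — the poll_and_enqueue pattern."""
    due = []
    while scheduled and scheduled[0][0] <= now:
        _, job_id = heapq.heappop(scheduled)
        due.append(job_id)
    return due

schedule("monthly-report", 100.0)
schedule("welcome-email", 50.0)
first = poll(now=60.0)    # → ["welcome-email"]
second = poll(now=200.0)  # → ["monthly-report"]
```

The Redis version gains what this sketch lacks: durability across coordinator restarts, and visibility to multiple coordinator replicas.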
Scaling & Performance
Rate limiting prevents your workers from overwhelming downstream services:
class TokenBucketRateLimiter:
    def __init__(self, redis_client: redis.Redis, key: str, rate: int, capacity: int):
        self.redis = redis_client
        self.key = f"rate_limit:{key}"
        self.rate = rate  # Tokens per second
        self.capacity = capacity

    def acquire(self, tokens: int = 1) -> bool:
        now = time.time()
        # Lua script for an atomic token-bucket check-and-decrement
        lua_script = """
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local requested = tonumber(ARGV[4])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now

        local elapsed = now - last_update
        tokens = math.min(capacity, tokens + elapsed * rate)

        if tokens >= requested then
            tokens = tokens - requested
            redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        end
        return 0
        """
        result = self.redis.eval(
            lua_script, 1, self.key,
            self.rate, self.capacity, now, tokens,
        )
        return bool(result)
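The refill arithmetic inside the Lua script can be verified in plain Python — same variables and the same "only write state on success" behavior, no Redis server required:

```python
# Pure-Python mirror of the Lua token-bucket math, for reasoning about
# refill behavior in isolation.

def token_bucket_acquire(state, rate, capacity, now, requested=1.0):
    tokens = state.get("tokens", capacity)
    last_update = state.get("last_update", now)
    # Refill proportionally to elapsed time, clamped at capacity
    tokens = min(capacity, tokens + (now - last_update) * rate)
    if tokens >= requested:
        state["tokens"] = tokens - requested
        state["last_update"] = now
        return True
    return False

state = {}
grants = [token_bucket_acquire(state, rate=1, capacity=5, now=0) for _ in range(6)]
# → [True, True, True, True, True, False]  (burst capacity is 5)
later = token_bucket_acquire(state, rate=1, capacity=5, now=2)
# → True  (two tokens refilled after 2 seconds)
```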
Monitor these metrics religiously: queue depth (jobs waiting), processing time p50/p95/p99, failure rate by job type, and worker utilization.
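Those percentiles matter because averages hide the tail. A quick stdlib-only sketch of computing them from raw samples (a real deployment would use a metrics library with histograms):

```python
import statistics

# Hypothetical per-job processing times in milliseconds
processing_times_ms = [12, 15, 11, 14, 250, 13, 16, 12, 900, 14]

# quantiles(n=100) returns the 99 cut points p1..p99
cuts = statistics.quantiles(processing_times_ms, n=100)
p50, p95, p99 = cuts[49], cuts[94], cuts[98]
# p99 dwarfs p50 here: a tail of slow jobs that the mean would smooth over
```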
Technology Choices & Tradeoffs
Redis + Bull/BullMQ: Excellent for Node.js. Fast, feature-rich, but Redis persistence requires careful configuration.
RabbitMQ: Battle-tested, supports complex routing. Overkill for simple use cases.
Amazon SQS: Zero operational overhead. Limited to 14-day retention, no priority queues.
Kafka: Overkill for task queues. Use it when you need event streaming, not job processing.
PostgreSQL-based (custom): Surprisingly viable. Use SELECT FOR UPDATE SKIP LOCKED for atomic job claiming. Scales to thousands of jobs per second with proper indexing.
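The Postgres claim pattern mentioned above fits in a single statement. A sketch — the table and column names are illustrative, not a prescribed schema:

```python
# Illustrative SQL for atomic job claiming in Postgres. SKIP LOCKED makes
# concurrent workers skip rows another transaction has already locked,
# so two workers never claim the same job.
CLAIM_JOB_SQL = """
UPDATE jobs
SET status = 'running', locked_by = %(worker_id)s
WHERE id = (
    SELECT id FROM jobs
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING id, handler, payload;
"""
```

An index on `(status, priority, created_at)` keeps the inner SELECT cheap as the table grows.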
Build your own when you need deep integration with your domain model or have unusual requirements. Buy when reliability matters more than customization—battle-tested systems have handled edge cases you haven’t imagined yet.
For most teams, start with a managed solution. You can always migrate to something custom when you understand your actual requirements—not the ones you imagined.