Design a Task Scheduler: Distributed Job Queue

Key Insights

  • A distributed task scheduler requires three core components working in harmony: a durable queue for job storage, a coordinator for scheduling decisions, and stateless workers for execution—each must be independently scalable.
  • Exactly-once execution is a myth in distributed systems; design for at-least-once delivery with idempotent job handlers, and you’ll build a system that’s both reliable and simple to reason about.
  • The choice between push and pull models for job distribution fundamentally shapes your system’s behavior under load—pull models provide natural backpressure while push models offer lower latency.

Introduction & Problem Space

Every production system eventually needs to run tasks outside the request-response cycle. You need to send a welcome email after signup, generate a monthly report at midnight, process uploaded files asynchronously, or retry failed webhook deliveries. These requirements seem simple until you’re handling millions of jobs per day across dozens of worker machines.

Distributed task scheduling solves the problem of reliably executing work across a fleet of machines. The “distributed” part is where complexity lives. A single-machine cron job is trivial. A system that guarantees job execution even when machines fail, handles retries gracefully, and scales horizontally—that’s an engineering challenge.

The core challenges you’ll face:

Reliability: Jobs must execute even when workers crash mid-execution. You need visibility into what’s running, what failed, and what’s stuck.

Ordering: Some jobs must run in sequence. Others can run in any order but shouldn’t run concurrently for the same entity.

Exactly-once execution: The holy grail that doesn’t actually exist. You’ll settle for at-least-once with idempotency.
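To make "at-least-once with idempotency" concrete, a handler can be guarded by an idempotency key so that a redelivered job becomes a no-op. The sketch below is illustrative only: `IdempotentRunner`, the in-memory set, and the sample key are hypothetical stand-ins for a durable store (for example, a database table with a unique constraint).

```python
from typing import Callable, Set


class IdempotentRunner:
    """Runs a handler at most once per idempotency key (in-memory sketch)."""

    def __init__(self) -> None:
        # Assumption: a real system would keep these keys in durable storage.
        self._seen: Set[str] = set()

    def run(self, key: str, handler: Callable[[dict], None], payload: dict) -> bool:
        """Return True if the handler ran, False if the key was already processed."""
        if key in self._seen:
            return False
        handler(payload)
        # Record the key only after success so a failed attempt can be retried.
        self._seen.add(key)
        return True


sent = []
runner = IdempotentRunner()
first = runner.run("welcome-email:user-42", sent.append, {"user_id": 42})
# Simulated redelivery of the same job: the handler is skipped.
second = runner.run("welcome-email:user-42", sent.append, {"user_id": 42})
```

Recording the key after the side effect still leaves a small window where a crash causes one duplicate, which is why the side effect itself should tolerate repetition where possible.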

Core Architecture Components

A distributed task scheduler has five essential components:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Producers  │────▶│    Queue    │────▶│   Workers   │
│  (API/Apps) │     │   Broker    │     │  (Consumers)│
└─────────────┘     └─────────────┘     └─────────────┘
                    ┌─────────────┐
                    │  Metadata   │
                    │    Store    │
                    └─────────────┘
                    ┌─────────────┐
                    │ Coordinator │
                    │  (Scheduler)│
                    └─────────────┘

Producers enqueue jobs from your application code. Workers pull jobs and execute them. The queue broker provides durable storage and delivery guarantees. The metadata store tracks job state, history, and scheduling information. The coordinator handles delayed jobs, recurring schedules, and dead job detection.

Here’s how to model the core entities:

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid

class JobStatus(Enum):
    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD = "dead"  # Exhausted all retries

@dataclass
class Job:
    id: str
    queue_name: str
    handler: str  # e.g., "email.send_welcome"
    payload: dict
    status: JobStatus
    priority: int = 0
    max_retries: int = 3
    retry_count: int = 0
    scheduled_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    locked_by: Optional[str] = None  # Worker ID
    locked_until: Optional[datetime] = None
    idempotency_key: Optional[str] = None
    
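    @classmethod
    def create(cls, handler: str, payload: dict, **kwargs):
        # Accept "queue" as shorthand for queue_name and remove both from
        # kwargs, otherwise cls() would get an unexpected or duplicate argument.
        queue_name = kwargs.pop("queue_name", "default")
        queue_name = kwargs.pop("queue", queue_name)
        return cls(
            id=str(uuid.uuid4()),
            queue_name=queue_name,
            handler=handler,
            payload=payload,
            status=JobStatus.PENDING,
            **kwargs
        )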

@dataclass
class TaskQueue:
    name: str
    max_concurrency: int = 10
    rate_limit: Optional[int] = None  # Jobs per second
    retry_delay_seconds: int = 60
    job_timeout_seconds: int = 300
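The status values above imply a lifecycle, and it can help to write the allowed transitions down explicitly. The table below is one plausible reading, not something the model above enforces: `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names, and the enum is repeated only so the sketch runs on its own.

```python
from enum import Enum


class JobStatus(Enum):  # Mirrors the enum defined above
    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD = "dead"


# Assumed lifecycle: a failed job either goes back to pending for a retry
# or is marked dead once its retries are exhausted.
ALLOWED_TRANSITIONS = {
    JobStatus.SCHEDULED: {JobStatus.PENDING},
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.PENDING},
    JobStatus.FAILED: {JobStatus.PENDING, JobStatus.DEAD},
    JobStatus.COMPLETED: set(),
    JobStatus.DEAD: set(),
}


def can_transition(current: JobStatus, new: JobStatus) -> bool:
    """Return True if moving from `current` to `new` is allowed by the table."""
    return new in ALLOWED_TRANSITIONS[current]


retry_ok = can_transition(JobStatus.FAILED, JobStatus.PENDING)
resurrect_ok = can_transition(JobStatus.DEAD, JobStatus.RUNNING)
```

The `RUNNING` to `PENDING` edge covers the case where a worker dies and the job is requeued.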

Queue Design & Job Distribution

Partitioning your queues correctly determines whether your system scales gracefully or becomes a bottleneck.

Partition by job type when different jobs have vastly different execution characteristics. Image processing jobs shouldn’t block email sends.

Partition by tenant in multi-tenant systems to provide isolation and prevent noisy neighbors.

Partition by priority to ensure critical jobs get processed first.
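These three partitioning axes can be combined into a single queue name. The helper below is a rough sketch under assumed conventions: the `queue_for` function, the "heavy" handler prefixes, the priority threshold of 5, and the naming scheme are all hypothetical.

```python
from typing import Optional

# Hypothetical: handlers with these prefixes are slow and get their own queue.
HEAVY_PREFIXES = ("image.", "video.")


def queue_for(handler: str, tenant_id: Optional[str] = None, priority: int = 0) -> str:
    """Build a queue name from job type, priority tier, and optional tenant."""
    job_type = "heavy" if handler.startswith(HEAVY_PREFIXES) else "default"
    tier = "high" if priority >= 5 else "normal"
    parts = [job_type, tier]
    if tenant_id is not None:
        # The tenant goes first so one tenant's queues share a common prefix.
        parts.insert(0, f"tenant-{tenant_id}")
    return ":".join(parts)


email_queue = queue_for("email.send_welcome")
image_queue = queue_for("image.resize", tenant_id="acme", priority=7)
```

Workers can then subscribe to specific names, so a backlog of image jobs never delays the email queue.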

Here’s a priority queue implementation using Redis sorted sets:

import redis
import json
import time

class RedisPriorityQueue:
    def __init__(self, redis_client: redis.Redis, queue_name: str):
        self.redis = redis_client
        self.queue_key = f"queue:{queue_name}"
        self.processing_key = f"processing:{queue_name}"
    
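    def enqueue(self, job: Job):
        # Score = priority (inverted) + timestamp for FIFO within priority.
        # Lower score = processed first; assumes priorities in the range 0-10.
        score = (10 - job.priority) * 1e12 + time.time()
        member = json.dumps({
            "id": job.id, "handler": job.handler,
            "payload": job.payload, "priority": job.priority,
        })
        self.redis.zadd(self.queue_key, {member: score})

    def dequeue(self, worker_id: str, timeout_seconds: int = 300) -> Optional[dict]:
        # BZPOPMIN blocks for up to 1 second waiting for an item.
        # The pop and the bookkeeping below are separate commands, so this is
        # NOT an atomic move: a crash in between loses the job. Wrap both
        # steps in a Lua script if that window matters to you.
        result = self.redis.bzpopmin(self.queue_key, timeout=1)
        if not result:
            return None

        _, job_data, _ = result
        job = json.loads(job_data)

        # Track in-flight jobs by ID: the sorted set holds the lease deadline,
        # a companion hash holds the data needed to requeue the job.
        expire_at = time.time() + timeout_seconds
        pipe = self.redis.pipeline()
        pipe.hset(f"{self.processing_key}:data", job["id"],
                  json.dumps({**job, "worker": worker_id}))
        pipe.zadd(self.processing_key, {job["id"]: expire_at})
        pipe.execute()
        return job

    def complete(self, job_id: str):
        # Drop the lease and the stored job data
        pipe = self.redis.pipeline()
        pipe.zrem(self.processing_key, job_id)
        pipe.hdel(f"{self.processing_key}:data", job_id)
        pipe.execute()

    def requeue_stale_jobs(self):
        """Called periodically by coordinator to handle dead workers"""
        now = time.time()
        stale = self.redis.zrangebyscore(self.processing_key, 0, now)
        for raw_id in stale:
            job_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
            # A live job_lock key means the worker is still heartbeating,
            # so push the deadline out instead of requeuing.
            if self.redis.exists(f"job_lock:{job_id}"):
                self.redis.zadd(self.processing_key, {job_id: now + 60})
                continue
            job_data = self.redis.hget(f"{self.processing_key}:data", job_id)
            self.complete(job_id)
            if job_data is None:
                continue
            job = json.loads(job_data)
            job.pop("worker", None)
            # Re-enqueue at the original priority level
            score = (10 - job.get("priority", 0)) * 1e12 + now
            self.redis.zadd(self.queue_key, {json.dumps(job): score})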
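The score formula used in `enqueue` is easy to check without Redis. The snippet below is a small sketch that reuses the same arithmetic on made-up jobs and timestamps; `priority_score` is a hypothetical helper name, and the formula assumes priorities between 0 and 10.

```python
def priority_score(priority: int, enqueued_at: float) -> float:
    """Lower score is dequeued first: priority band, then enqueue time."""
    return (10 - priority) * 1e12 + enqueued_at


# (name, priority, enqueue timestamp), all values invented for illustration
jobs = [
    ("low-early", 1, 1000.0),
    ("high-late", 9, 2000.0),
    ("high-early", 9, 1500.0),
]

# Sorting by score mimics the order in which a sorted set would pop members.
ordered = [name for name, prio, ts in sorted(jobs, key=lambda j: priority_score(j[1], j[2]))]
```

Both priority-9 jobs come out before the priority-1 job, and within the same priority the earlier one wins. The `1e12` multiplier keeps the bands apart because Unix timestamps in seconds are far smaller than that.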

Reliability & Failure Handling

Workers will crash. Networks will partition. Your system must handle this gracefully.

The pattern is straightforward: workers acquire a lock on jobs with a timeout, send periodic heartbeats, and the coordinator requeues jobs whose locks expired without completion.

import json
import threading
import time

import redis

class Worker:
    def __init__(self, worker_id: str, queue: RedisPriorityQueue, redis_client: redis.Redis):
        self.worker_id = worker_id
        self.queue = queue
        self.redis = redis_client
        self.current_job = None
        self.heartbeat_interval = 30
        self.running = True
    
    def start_heartbeat(self):
        def heartbeat_loop():
            while self.running:
                job = self.current_job  # Snapshot: the main thread may clear it
                if job:
                    # Extend lock timeout
                    self.redis.expire(
                        f"job_lock:{job['id']}",
                        self.heartbeat_interval * 2
                    )
                time.sleep(self.heartbeat_interval)

        thread = threading.Thread(target=heartbeat_loop, daemon=True)
        thread.start()

    def execute_with_retry(self, job: dict, handler_fn, max_retries: int = 3):
        lock_key = f"job_lock:{job['id']}"
        # Take the lock that the heartbeat thread keeps extending
        self.redis.set(lock_key, self.worker_id, ex=self.heartbeat_interval * 2)
        self.current_job = job
        retry_count = 0
        try:
            while True:
                try:
                    handler_fn(job["payload"])
                    self.queue.complete(job["id"])
                    return
                except Exception as e:
                    retry_count += 1
                    if retry_count > max_retries:
                        self.move_to_dead_letter(job, str(e))
                        # Release the job so the coordinator does not requeue it
                        self.queue.complete(job["id"])
                        raise

                    # Exponential backoff: 1s, 2s, 4s, 8s...
                    delay = min(2 ** (retry_count - 1), 300)  # Cap at 5 minutes
                    time.sleep(delay)
        finally:
            # Runs once, after success or final failure
            self.current_job = None
            self.redis.delete(lock_key)
    
    def move_to_dead_letter(self, job: dict, error: str):
        self.redis.lpush(
            f"dead_letter:{self.queue.queue_key}",
            json.dumps({**job, "error": error, "failed_at": time.time()})
        )
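The backoff schedule described in the worker (1s, 2s, 4s, 8s, capped at five minutes) can be isolated into a pure function, which makes it easy to test. This is a minimal sketch; `backoff_delay` and its parameter names are hypothetical.

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 300.0) -> float:
    """Delay in seconds before retry number `attempt` (1-based), doubling each time."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(base * 2 ** (attempt - 1), cap)


# Delays for the first ten retries; the last one hits the 300-second cap.
delays = [backoff_delay(n) for n in range(1, 11)]
```

In practice many systems also add random jitter to each delay so that jobs which failed together do not all retry at the same instant.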

Scheduling Mechanisms

Delayed and recurring jobs require a different approach than immediate execution. You can’t just dump them in the main queue—you need a scheduling layer.

from croniter import croniter
from datetime import datetime, timedelta

class SchedulerService:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.scheduled_jobs_key = "scheduled_jobs"
        self.recurring_jobs_key = "recurring_jobs"
    
    def schedule_job(self, job: Job, run_at: datetime):
        """Schedule a one-time delayed job"""
        score = run_at.timestamp()
        self.redis.zadd(
            self.scheduled_jobs_key,
            {json.dumps(job.__dict__, default=str): score}
        )
    
    def schedule_recurring(self, job_template: dict, cron_expression: str):
        """Register a recurring job"""
        self.redis.hset(
            self.recurring_jobs_key,
            job_template["handler"],
            json.dumps({
                "template": job_template,
                "cron": cron_expression,
                "last_scheduled": None
            })
        )
    
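    def poll_and_enqueue(self, queue: RedisPriorityQueue):
        """Called every second by the coordinator"""
        now = time.time()

        # Handle delayed jobs
        due_jobs = self.redis.zrangebyscore(
            self.scheduled_jobs_key, 0, now
        )
        for job_data in due_jobs:
            # ZREM returns 0 if another coordinator already claimed this job
            if not self.redis.zrem(self.scheduled_jobs_key, job_data):
                continue
            job_dict = json.loads(job_data)
            # The enum was stringified on serialization; restore it.
            # Datetime fields stay plain strings in this sketch.
            job_dict["status"] = JobStatus.PENDING
            queue.enqueue(Job(**job_dict))

        # Handle recurring jobs
        for handler, config_data in self.redis.hgetall(self.recurring_jobs_key).items():
            config = json.loads(config_data)
            # Compute the next run from the last scheduling time rather than
            # from "now", so a late or early poll never skips or repeats a tick.
            last = config["last_scheduled"]
            if last is not None:
                cron = croniter(config["cron"], datetime.fromtimestamp(last))
                next_run = cron.get_next(datetime)
                if next_run.timestamp() > now:
                    continue
                job = Job.create(**config["template"])
                queue.enqueue(job)
            # First sighting or just enqueued: remember where we are
            config["last_scheduled"] = now
            self.redis.hset(self.recurring_jobs_key, handler, json.dumps(config))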
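The delayed-job half of the scheduling layer boils down to "keep jobs ordered by due time and promote the ones that are due". The in-memory sketch below shows that idea with a heap in place of a Redis sorted set; `DelayQueue` and its method names are hypothetical, and timestamps are plain floats for simplicity.

```python
import heapq
import itertools
from typing import Any, List, Tuple


class DelayQueue:
    """Holds jobs until their run_at time, then releases them in due order."""

    def __init__(self) -> None:
        self._heap: List[Tuple[float, int, Any]] = []
        # Tie-breaker so two jobs with the same run_at are never compared directly.
        self._counter = itertools.count()

    def schedule(self, job: Any, run_at: float) -> None:
        heapq.heappush(self._heap, (run_at, next(self._counter), job))

    def pop_due(self, now: float) -> list:
        """Remove and return every job whose run_at is <= now, earliest first."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            _, _, job = heapq.heappop(self._heap)
            due.append(job)
        return due


dq = DelayQueue()
dq.schedule("report", run_at=200.0)
dq.schedule("email", run_at=100.0)
dq.schedule("cleanup", run_at=300.0)
due_at_250 = dq.pop_due(now=250.0)
due_at_260 = dq.pop_due(now=260.0)
```

A coordinator would call `pop_due` on each tick and hand the results to the main queue, which is the same shape as the sorted-set polling above.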

Scaling & Performance

Rate limiting prevents your workers from overwhelming downstream services:

class TokenBucketRateLimiter:
    def __init__(self, redis_client: redis.Redis, key: str, rate: int, capacity: int):
        self.redis = redis_client
        self.key = f"rate_limit:{key}"
        self.rate = rate  # Tokens per second
        self.capacity = capacity
    
    def acquire(self, tokens: int = 1) -> bool:
        now = time.time()
        
        # Lua script for atomic token bucket
        lua_script = """
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local requested = tonumber(ARGV[4])
        
        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now
        
        local elapsed = now - last_update
        tokens = math.min(capacity, tokens + elapsed * rate)
        
        if tokens >= requested then
            tokens = tokens - requested
            redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        end
        return 0
        """
        
        result = self.redis.eval(
            lua_script, 1, self.key, 
            self.rate, self.capacity, now, tokens
        )
        return bool(result)

Monitor these metrics religiously: queue depth (jobs waiting), processing time p50/p95/p99, failure rate by job type, and worker utilization.
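For the processing-time percentiles, a nearest-rank calculation over recent samples is often enough to get started. The function below is a minimal sketch with made-up sample data; `percentile` is a hypothetical helper, and a production system would more likely rely on its metrics library's histograms.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


# Invented processing times in milliseconds: 1, 2, ..., 100
durations_ms = list(range(1, 101))
summary = {p: percentile(durations_ms, p) for p in (50, 95, 99)}
```

Tracking p95 and p99 alongside p50 matters because a handful of very slow jobs can hold workers for a long time without moving the median at all.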

Technology Choices & Tradeoffs

Redis + Bull/BullMQ: Excellent for Node.js. Fast, feature-rich, but Redis persistence requires careful configuration.

RabbitMQ: Battle-tested, supports complex routing. Overkill for simple use cases.

Amazon SQS: Zero operational overhead. Limited to 14-day retention, no priority queues.

Kafka: Overkill for task queues. Use it when you need event streaming, not job processing.

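PostgreSQL-based (custom): Surprisingly viable. Use SELECT ... FOR UPDATE SKIP LOCKED for atomic job claiming, so concurrent workers skip rows that another worker has already locked instead of waiting on them. Scales to thousands of jobs per second with proper indexing.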
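As a rough illustration of the PostgreSQL approach, a worker can claim one job in a single statement. Everything about the schema here is an assumption: the `jobs` table, its columns, and the `claim_job` helper are hypothetical, and the cursor is any DB-API cursor that accepts `%(name)s` parameters (psycopg does).

```python
from typing import Any, Optional, Tuple

# Hypothetical schema: jobs(id, handler, payload, status, priority, created_at, locked_by)
CLAIM_JOB_SQL = """
UPDATE jobs
SET status = 'running', locked_by = %(worker_id)s
WHERE id = (
    SELECT id
    FROM jobs
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, handler, payload
"""


def claim_job(cursor: Any, worker_id: str) -> Optional[Tuple]:
    """Claim the next pending job for this worker; returns None if nothing is claimable."""
    cursor.execute(CLAIM_JOB_SQL, {"worker_id": worker_id})
    return cursor.fetchone()
```

The caller is expected to commit the transaction after a successful claim. An index covering `status`, `priority`, and `created_at` is the kind of "proper indexing" that keeps the inner SELECT cheap.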

Build your own when you need deep integration with your domain model or have unusual requirements. Buy when reliability matters more than customization—battle-tested systems have handled edge cases you haven’t imagined yet.

For most teams, start with a managed solution. You can always migrate to something custom when you understand your actual requirements—not the ones you imagined.
