System Design: Two-Phase Commit Protocol

Key Insights

  • Two-Phase Commit (2PC) guarantees atomic transactions across distributed systems by splitting the commit process into a voting phase and a decision phase, ensuring all participants agree before any changes become permanent.
  • The protocol’s blocking nature is its Achilles’ heel—if the coordinator fails after sending prepare messages but before broadcasting the decision, participants holding locks must wait indefinitely for recovery.
  • Modern distributed systems often favor eventual consistency patterns like Sagas for long-running transactions, reserving 2PC for scenarios where strong consistency across a small number of participants is non-negotiable.

Introduction to Distributed Transactions

When your data lives on a single database server, ACID transactions are straightforward. The database engine handles atomicity, consistency, isolation, and durability through well-understood mechanisms like write-ahead logging and lock management. But the moment you split data across multiple nodes—whether for scaling, geographic distribution, or microservice boundaries—you inherit a fundamental problem: how do you ensure that operations spanning multiple systems either all succeed or all fail?

Consider a classic example: transferring money between two bank accounts stored in different databases. You need to debit Account A and credit Account B. If the debit succeeds but the credit fails, you’ve lost money into the void. If both operations complete independently without coordination, a crash between them leaves your system in an inconsistent state.
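To make the failure concrete, here is a minimal sketch of an uncoordinated transfer. The two "databases" are plain dictionaries and `naive_transfer` is a hypothetical helper, not part of any real system; the `crash_between` flag simulates a process dying between the two writes.

```python
def naive_transfer(db_a, db_b, amount, crash_between=False):
    """Debit one store, then credit the other, with no coordination."""
    db_a["balance"] -= amount  # step 1: the debit takes effect on its own
    if crash_between:
        raise RuntimeError("crashed after debit, before credit")
    db_b["balance"] += amount  # step 2: never runs if we crashed above


# Hypothetical stand-ins for two independent databases
db_a = {"balance": 100}
db_b = {"balance": 50}
total_before = db_a["balance"] + db_b["balance"]

try:
    naive_transfer(db_a, db_b, 30, crash_between=True)
except RuntimeError:
    pass  # the caller sees an error, but the debit has already happened

total_after = db_a["balance"] + db_b["balance"]
print(total_before, total_after)  # 150 120
```

The 30 units that disappeared are exactly the partial outcome an atomic commit protocol is meant to rule out.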

This is the distributed transaction problem, and Two-Phase Commit is the canonical solution that’s been deployed in production systems since the 1970s.

The Two-Phase Commit Protocol Explained

2PC divides the commit process into two distinct phases, orchestrated by a designated coordinator node.

Phase 1: Prepare (Voting)

The coordinator sends a PREPARE message to all participants. Each participant must decide whether it can commit the transaction. If a participant can commit, it:

  1. Acquires any locks the transaction needs
  2. Writes all transaction data to durable storage
  3. Writes a “prepared” record to its transaction log
  4. Responds with VOTE_COMMIT

If a participant cannot commit (constraint violation, resource unavailable), it responds with VOTE_ABORT and can immediately roll back.

Phase 2: Commit (Decision)

The coordinator collects all votes. The decision rule is simple:

  • If ALL participants voted VOTE_COMMIT: send GLOBAL_COMMIT to all
  • If ANY participant voted VOTE_ABORT: send GLOBAL_ABORT to all

Participants execute the decision and acknowledge. The transaction completes when all acknowledgments arrive.
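The decision rule above can be sketched as a single pure function. The `decide` name and the vote map are illustrative assumptions. Treating a missing vote (`None`, for example after a timeout) or an empty participant set as an abort is also an assumption of this sketch, though it is the usual conservative choice.

```python
from typing import Dict, Optional


def decide(votes: Dict[str, Optional[str]]) -> str:
    """Map participant id -> vote to the coordinator's global decision.

    A vote of None models a participant that never answered; this sketch
    assumes silence counts as a vote to abort.
    """
    if votes and all(v == "VOTE_COMMIT" for v in votes.values()):
        return "GLOBAL_COMMIT"
    return "GLOBAL_ABORT"


print(decide({"db1": "VOTE_COMMIT", "db2": "VOTE_COMMIT"}))  # GLOBAL_COMMIT
print(decide({"db1": "VOTE_COMMIT", "db2": "VOTE_ABORT"}))   # GLOBAL_ABORT
print(decide({"db1": "VOTE_COMMIT", "db2": None}))           # GLOBAL_ABORT
```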

Here’s the state machine representation:

from enum import Enum, auto

class CoordinatorState(Enum):
    INIT = auto()
    WAITING = auto()      # Sent PREPARE, waiting for votes
    COMMITTED = auto()    # Decision: commit
    ABORTED = auto()      # Decision: abort

class ParticipantState(Enum):
    INIT = auto()
    PREPARED = auto()     # Voted COMMIT, waiting for decision
    COMMITTED = auto()    # Received GLOBAL_COMMIT
    ABORTED = auto()      # Received GLOBAL_ABORT or voted ABORT

# State transitions
COORDINATOR_TRANSITIONS = {
    CoordinatorState.INIT: [CoordinatorState.WAITING],
    CoordinatorState.WAITING: [CoordinatorState.COMMITTED, CoordinatorState.ABORTED],
    CoordinatorState.COMMITTED: [],  # Terminal
    CoordinatorState.ABORTED: [],    # Terminal
}

PARTICIPANT_TRANSITIONS = {
    ParticipantState.INIT: [ParticipantState.PREPARED, ParticipantState.ABORTED],
    ParticipantState.PREPARED: [ParticipantState.COMMITTED, ParticipantState.ABORTED],
    ParticipantState.COMMITTED: [],  # Terminal
    ParticipantState.ABORTED: [],    # Terminal
}
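
One way to put a transition table like this to work is to check a recorded sequence of states against it. The sketch below restates the participant states under the hypothetical names `PState` and `ALLOWED` so that it runs on its own; `is_valid_path` is likewise an illustrative helper, not part of the protocol.

```python
from enum import Enum, auto
from typing import Dict, List, Sequence


class PState(Enum):
    """Participant states, restated here so the sketch is self-contained."""
    INIT = auto()
    PREPARED = auto()
    COMMITTED = auto()
    ABORTED = auto()


ALLOWED: Dict[PState, List[PState]] = {
    PState.INIT: [PState.PREPARED, PState.ABORTED],
    PState.PREPARED: [PState.COMMITTED, PState.ABORTED],
    PState.COMMITTED: [],  # Terminal
    PState.ABORTED: [],    # Terminal
}


def is_valid_path(path: Sequence[PState]) -> bool:
    """True if every consecutive pair of states is an allowed transition."""
    return all(nxt in ALLOWED[cur] for cur, nxt in zip(path, path[1:]))


print(is_valid_path([PState.INIT, PState.PREPARED, PState.COMMITTED]))  # True
print(is_valid_path([PState.INIT, PState.COMMITTED]))                   # False
```

The second call fails because a participant may never commit without first voting, which is the invariant the whole protocol rests on.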

Implementation Walkthrough

Let’s build a working 2PC implementation. The critical detail most tutorials skip: everything must be logged to durable storage before sending messages.

import json
import os
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

class TransactionState(Enum):
    PENDING = "pending"
    PREPARED = "prepared"
    COMMITTED = "committed"
    ABORTED = "aborted"

@dataclass
class TransactionLog:
    """Append-only transaction log. A production system would use a proper write-ahead log."""
    log_file: str
    
    def write(self, tx_id: str, state: str, data: Optional[dict] = None):
        entry = {"tx_id": tx_id, "state": state, "data": data, "timestamp": time.time()}
        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")
            # Flush and fsync so the record is on disk before any message is sent
            f.flush()
            os.fsync(f.fileno())
    
    def read_all(self) -> List[dict]:
        try:
            with open(self.log_file, "r") as f:
                return [json.loads(line) for line in f]
        except FileNotFoundError:
            return []

class Participant:
    def __init__(self, node_id: str, log: TransactionLog):
        self.node_id = node_id
        self.log = log
        self.transactions: Dict[str, TransactionState] = {}
        self.locked_resources: Dict[str, str] = {}  # resource -> tx_id
    
    def prepare(self, tx_id: str, operations: List[dict]) -> bool:
        """Phase 1: Prepare to commit. Returns True if ready."""
        # Check if we can acquire all needed resources
        for op in operations:
            resource = op.get("resource")
            if resource in self.locked_resources:
                if self.locked_resources[resource] != tx_id:
                    # Resource locked by another transaction
                    self.log.write(tx_id, "vote_abort", {"reason": "resource_conflict"})
                    return False
        
        # Simulate validation (constraints, business rules)
        if not self._validate_operations(operations):
            self.log.write(tx_id, "vote_abort", {"reason": "validation_failed"})
            return False
        
        # CRITICAL: Log prepared state BEFORE responding
        self.log.write(tx_id, "prepared", {"operations": operations})
        
        # Acquire locks
        for op in operations:
            resource = op.get("resource")
            if resource:
                self.locked_resources[resource] = tx_id
        
        self.transactions[tx_id] = TransactionState.PREPARED
        return True
    
    def commit(self, tx_id: str) -> bool:
        """Phase 2: Execute the commit."""
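        state = self.transactions.get(tx_id)
        if state == TransactionState.COMMITTED:
            # Idempotent: a retried GLOBAL_COMMIT is acknowledged, not re-applied
            return True
        if state != TransactionState.PREPARED:
            return False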
        
        # Log commit decision BEFORE applying
        self.log.write(tx_id, "committed")
        
        # Apply changes (in real system, this executes the actual operations)
        self._apply_operations(tx_id)
        
        # Release locks
        self._release_locks(tx_id)
        self.transactions[tx_id] = TransactionState.COMMITTED
        return True
    
    def abort(self, tx_id: str) -> bool:
        """Abort and rollback."""
        self.log.write(tx_id, "aborted")
        self._release_locks(tx_id)
        self.transactions[tx_id] = TransactionState.ABORTED
        return True
    
    def _validate_operations(self, operations: List[dict]) -> bool:
        # Implement business validation logic
        return True
    
    def _apply_operations(self, tx_id: str):
        # Apply the actual data changes
        pass
    
    def _release_locks(self, tx_id: str):
        self.locked_resources = {
            r: t for r, t in self.locked_resources.items() if t != tx_id
        }

class Coordinator:
    def __init__(self, log: TransactionLog):
        self.log = log
        self.transactions: Dict[str, dict] = {}
    
    def begin_transaction(self, participants: List[Participant], 
                          operations: Dict[str, List[dict]]) -> str:
        tx_id = str(uuid.uuid4())
        self.transactions[tx_id] = {
            "participants": participants,
            "operations": operations,
            "state": TransactionState.PENDING
        }
        self.log.write(tx_id, "started", {"participant_ids": [p.node_id for p in participants]})
        return tx_id
    
    def execute(self, tx_id: str) -> bool:
        """Execute the 2PC protocol."""
        tx = self.transactions[tx_id]
        participants = tx["participants"]
        operations = tx["operations"]
        
        # Phase 1: Prepare
        votes = []
        for participant in participants:
            participant_ops = operations.get(participant.node_id, [])
            vote = participant.prepare(tx_id, participant_ops)
            votes.append(vote)
        
        # Log decision BEFORE sending to participants
        if all(votes):
            self.log.write(tx_id, "decision_commit")
            # Phase 2: Commit
            for participant in participants:
                participant.commit(tx_id)
            self.transactions[tx_id]["state"] = TransactionState.COMMITTED
            return True
        else:
            self.log.write(tx_id, "decision_abort")
            # Phase 2: Abort
            for participant in participants:
                participant.abort(tx_id)
            self.transactions[tx_id]["state"] = TransactionState.ABORTED
            return False
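
The coordinator above contacts participants one at a time, so Phase 1 takes as long as the sum of all prepare calls. A common refinement is to send PREPARE in parallel and treat anything that is late or fails as a vote to abort. The sketch below is one way to do that with the standard library; `collect_votes` and its zero-argument callables are hypothetical stand-ins for real network calls.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Callable, Dict


def collect_votes(prepare_calls: Dict[str, Callable[[], bool]],
                  timeout_s: float) -> Dict[str, bool]:
    """Run every prepare call in parallel; late or failed calls vote abort."""
    pool = ThreadPoolExecutor(max_workers=max(1, len(prepare_calls)))
    futures = {pool.submit(call): name for name, call in prepare_calls.items()}
    done, _not_done = wait(futures, timeout=timeout_s)

    votes = {}
    for fut, name in futures.items():
        if fut in done and fut.exception() is None:
            votes[name] = bool(fut.result())
        else:
            votes[name] = False  # timed out or raised: count as VOTE_ABORT
    pool.shutdown(wait=False)    # do not block on stragglers
    return votes


def slow_prepare() -> bool:
    time.sleep(0.5)  # simulates a participant that answers too late
    return True


votes = collect_votes({"fast": lambda: True, "slow": slow_prepare}, timeout_s=0.1)
print(votes)
```

A participant whose vote arrived too late may still have logged "prepared", so the coordinator must send it the abort decision like everyone else.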

Failure Scenarios and Recovery

The 2PC protocol handles failures through its logging discipline. Here’s how recovery works:

class RecoveryManager:
    def __init__(self, log: TransactionLog):
        self.log = log
    
    def recover_participant(self) -> Dict[str, str]:
        """Recover participant state from transaction log."""
        entries = self.log.read_all()
        tx_states: Dict[str, str] = {}
        
        # Entries are appended in order, so the last record per transaction wins
        for entry in entries:
            tx_states[entry["tx_id"]] = entry["state"]
        
        actions: Dict[str, str] = {}
        
        for tx_id, state in tx_states.items():
            if state == "prepared":
                # IN-DOUBT: We voted commit but never got decision
                # Must keep locks and wait for the coordinator or query it
                actions[tx_id] = "query_coordinator"
            elif state == "committed":
                # Ensure commit was applied
                actions[tx_id] = "ensure_committed"
            elif state in ("aborted", "vote_abort"):
                # Ensure rollback was applied (a vote to abort can be rolled back locally)
                actions[tx_id] = "ensure_aborted"
        
        return actions
    
    def recover_coordinator(self) -> Dict[str, str]:
        """Recover coordinator state and complete pending transactions."""
        entries = self.log.read_all()
        tx_states: Dict[str, str] = {}
        
        for entry in entries:
            tx_id = entry["tx_id"]
            state = entry["state"]
            tx_states[tx_id] = state
        
        actions = {}
        for tx_id, state in tx_states.items():
            if state == "started":
                # Never reached decision - safe to abort
                actions[tx_id] = "abort"
            elif state == "decision_commit":
                # Must ensure all participants committed
                actions[tx_id] = "retry_commit"
            elif state == "decision_abort":
                # Must ensure all participants aborted
                actions[tx_id] = "retry_abort"
        
        return actions
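
The other half of recovery is answering participants that ask what happened to a transaction. Below is a minimal sketch of that lookup over log entries shaped like the ones written above; `lookup_decision` is a hypothetical helper. It assumes it is only called once the coordinator has restarted, so a transaction whose last record is "started" can no longer reach a commit decision.

```python
from typing import List, Optional


def lookup_decision(entries: List[dict], tx_id: str) -> Optional[str]:
    """Answer "what happened to tx_id?" from the coordinator's log.

    Assumes the coordinator has restarted: no decision is in flight, so a
    transaction with no logged decision can safely be reported as aborted.
    """
    last_state = None
    for entry in entries:  # entries are in append order; the last one wins
        if entry["tx_id"] == tx_id:
            last_state = entry["state"]

    if last_state == "decision_commit":
        return "commit"
    if last_state in ("decision_abort", "started"):
        return "abort"
    return None  # this log has never seen the transaction


log_entries = [
    {"tx_id": "t1", "state": "started"},
    {"tx_id": "t2", "state": "started"},
    {"tx_id": "t1", "state": "decision_commit"},
]
print(lookup_decision(log_entries, "t1"))  # commit
print(lookup_decision(log_entries, "t2"))  # abort
print(lookup_decision(log_entries, "t3"))  # None
```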

Limitations and the Blocking Problem

Here’s the uncomfortable truth about 2PC: it’s a blocking protocol. When a participant enters the PREPARED state, it has voted to commit and holds locks on resources. It cannot unilaterally decide to commit or abort—it must wait for the coordinator’s decision.

If the coordinator crashes after participants have voted but before they learn the outcome (for example, after logging its decision but before broadcasting it), those participants are stuck until it recovers. They’re holding locks, blocking other transactions, and cannot make progress. This is the blocking problem.

The implications are severe:

  • Latency: Every transaction requires at least two round-trips
  • Lock duration: Resources are locked from PREPARE until COMMIT/ABORT acknowledgment
  • Availability: Coordinator failure blocks all in-flight transactions
  • CAP theorem: 2PC chooses consistency over availability during partitions
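
From the participant's side, the blocking problem looks roughly like the loop below: keep asking for the decision and never guess. This is a sketch under stated assumptions. `resolve_in_doubt` and the `ask_coordinator` callable are hypothetical, and the callable is assumed to return "commit", "abort", or `None` when the coordinator is unreachable.

```python
import time
from typing import Callable, Optional


def resolve_in_doubt(tx_id: str,
                     ask_coordinator: Callable[[str], Optional[str]],
                     max_attempts: int = 5,
                     delay_s: float = 0.0) -> Optional[str]:
    """Poll for the global decision of a PREPARED transaction.

    Returns "commit" or "abort" once known. Returns None if the coordinator
    stayed unreachable, in which case the caller must keep its locks and
    try again later; it may not pick an outcome on its own.
    """
    for _ in range(max_attempts):
        decision = ask_coordinator(tx_id)
        if decision in ("commit", "abort"):
            return decision
        time.sleep(delay_s)  # back off before asking again
    return None


# Simulated coordinator that is down for two polls, then answers
answers = iter([None, None, "commit"])
print(resolve_in_doubt("tx-1", lambda tx: next(answers)))  # commit
```

The `None` return value is the blocking problem in miniature: there is no safe fallback, only waiting.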

Alternatives and Enhancements

For long-running transactions or when availability matters more than strict consistency, the Saga pattern offers an alternative:

from dataclasses import dataclass
from typing import Callable, List

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable

class SagaOrchestrator:
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps
        self.completed_steps: List[SagaStep] = []
    
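    def execute(self, context: dict) -> bool:
        # Reset so a reused orchestrator never compensates steps from an earlier run
        self.completed_steps = []
        for step in self.steps: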
            try:
                step.action(context)
                self.completed_steps.append(step)
            except Exception as e:
                print(f"Step {step.name} failed: {e}")
                self._compensate(context)
                return False
        return True
    
    def _compensate(self, context: dict):
        """Execute compensating transactions in reverse order."""
        for step in reversed(self.completed_steps):
            try:
                step.compensation(context)
            except Exception as e:
                print(f"Compensation for {step.name} failed: {e}")
                # Log for manual intervention

# Usage
def debit_account(ctx): 
    print(f"Debiting {ctx['amount']} from {ctx['from_account']}")

def credit_account(ctx): 
    print(f"Crediting {ctx['amount']} to {ctx['to_account']}")

def reverse_debit(ctx): 
    print(f"Reversing debit of {ctx['amount']} to {ctx['from_account']}")

def reverse_credit(ctx): 
    print(f"Reversing credit of {ctx['amount']} from {ctx['to_account']}")

transfer_saga = SagaOrchestrator([
    SagaStep("debit", debit_account, reverse_debit),
    SagaStep("credit", credit_account, reverse_credit),
])

Three-Phase Commit (3PC) adds a pre-commit phase to reduce blocking but still fails under network partitions. Paxos and Raft provide consensus without a single coordinator but add complexity. Choose based on your consistency requirements and failure tolerance.

Production Considerations

When deploying 2PC:

  1. Set aggressive timeouts: A participant waiting in the PREPARED state should time out and query the coordinator rather than wait silently; it still cannot commit or abort on its own until it learns the decision
  2. Monitor coordinator health: Use leader election (via Raft/ZooKeeper) for coordinator high availability
  3. Make operations idempotent: Retries during recovery must be safe to repeat
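  4. Limit participant count: Each participant adds messages and another way for the transaction to abort or block, and the commit is only as fast as the slowest participant (with a sequential coordinator like the one above, latency grows linearly); keep transactions small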
  5. Consider your actual needs: If you’re building microservices with eventual consistency tolerance, Sagas are usually the better choice
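
Item 3 above can be sketched as follows: the participant remembers which transactions it has already applied, so a retried commit is acknowledged without being applied twice. `IdempotentAccount` is a hypothetical class. A real system would have to persist the set of applied ids atomically with the data change, which this in-memory sketch does not attempt.

```python
class IdempotentAccount:
    """Applies each transaction's effect at most once, however often it is retried."""

    def __init__(self, balance: int):
        self.balance = balance
        self.applied = set()  # tx_ids whose effects are already reflected in balance

    def commit(self, tx_id: str, delta: int) -> bool:
        if tx_id in self.applied:
            return True  # duplicate delivery: acknowledge, do not re-apply
        self.balance += delta
        self.applied.add(tx_id)
        return True


account = IdempotentAccount(balance=100)
account.commit("tx-1", -30)
account.commit("tx-1", -30)  # retry during recovery
print(account.balance)  # 70
```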

2PC remains the right tool when you need strong consistency across a small number of databases—think financial systems, inventory management, or any domain where “eventually consistent” means “occasionally wrong in expensive ways.”
