System Design: Two-Phase Commit Protocol
Key Insights
- Two-Phase Commit (2PC) guarantees atomic transactions across distributed systems by splitting the commit process into a voting phase and a decision phase, ensuring all participants agree before any changes become permanent.
- The protocol’s blocking nature is its Achilles’ heel—if the coordinator fails after sending prepare messages but before broadcasting the decision, participants holding locks must wait indefinitely for recovery.
- Modern distributed systems often favor eventual consistency patterns like Sagas for long-running transactions, reserving 2PC for scenarios where strong consistency across a small number of participants is non-negotiable.
Introduction to Distributed Transactions
When your data lives on a single database server, ACID transactions are straightforward. The database engine handles atomicity, consistency, isolation, and durability through well-understood mechanisms like write-ahead logging and lock management. But the moment you split data across multiple nodes—whether for scaling, geographic distribution, or microservice boundaries—you inherit a fundamental problem: how do you ensure that operations spanning multiple systems either all succeed or all fail?
Consider a classic example: transferring money between two bank accounts stored in different databases. You need to debit Account A and credit Account B. If the debit succeeds but the credit fails, you’ve lost money into the void. If both operations complete independently without coordination, a crash between them leaves your system in an inconsistent state.
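The failure mode is easy to reproduce in miniature. A hypothetical sketch (account names and balances invented here) of the uncoordinated version:

```python
# A sketch of the naive, uncoordinated transfer (all names hypothetical).
# A crash between the two writes leaves the system inconsistent.
balances = {"A": 100, "B": 50}

def naive_transfer(amount, crash_between=False):
    balances["A"] -= amount  # debit succeeds on database 1
    if crash_between:
        raise RuntimeError("node crashed")  # credit never happens
    balances["B"] += amount  # credit on database 2

try:
    naive_transfer(30, crash_between=True)
except RuntimeError:
    pass

# Money has vanished: the total dropped from 150 to 120.
total = sum(balances.values())
```

The debit was applied but the credit was not, and nothing in the system records that a transfer was ever in flight.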
This is the distributed transaction problem, and Two-Phase Commit is the canonical solution that’s been deployed in production systems since the 1970s.
The Two-Phase Commit Protocol Explained
2PC divides the commit process into two distinct phases, orchestrated by a designated coordinator node.
Phase 1: Prepare (Voting)
The coordinator sends a PREPARE message to all participants. Each participant must decide whether it can commit the transaction. If a participant can commit, it:
- Writes all transaction data to durable storage
- Acquires any necessary locks
- Writes a “prepared” record to its transaction log
- Responds with VOTE_COMMIT
If a participant cannot commit (constraint violation, resource unavailable), it responds with VOTE_ABORT and can immediately roll back.
Phase 2: Commit (Decision)
The coordinator collects all votes. The decision rule is simple:
- If ALL participants voted VOTE_COMMIT: send GLOBAL_COMMIT to all
- If ANY participant voted VOTE_ABORT: send GLOBAL_ABORT to all
Participants execute the decision and acknowledge. The transaction completes when all acknowledgments arrive.
Here’s the state machine representation:
```python
from enum import Enum, auto

class CoordinatorState(Enum):
    INIT = auto()
    WAITING = auto()    # Sent PREPARE, waiting for votes
    COMMITTED = auto()  # Decision: commit
    ABORTED = auto()    # Decision: abort

class ParticipantState(Enum):
    INIT = auto()
    PREPARED = auto()   # Voted COMMIT, waiting for decision
    COMMITTED = auto()  # Received GLOBAL_COMMIT
    ABORTED = auto()    # Received GLOBAL_ABORT or voted ABORT

# State transitions
COORDINATOR_TRANSITIONS = {
    CoordinatorState.INIT: [CoordinatorState.WAITING],
    CoordinatorState.WAITING: [CoordinatorState.COMMITTED, CoordinatorState.ABORTED],
    CoordinatorState.COMMITTED: [],  # Terminal
    CoordinatorState.ABORTED: [],    # Terminal
}

PARTICIPANT_TRANSITIONS = {
    ParticipantState.INIT: [ParticipantState.PREPARED, ParticipantState.ABORTED],
    ParticipantState.PREPARED: [ParticipantState.COMMITTED, ParticipantState.ABORTED],
    ParticipantState.COMMITTED: [],  # Terminal
    ParticipantState.ABORTED: [],    # Terminal
}
```
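These tables can drive a simple guard against illegal transitions. A minimal standalone sketch (the `can_transition` helper is invented here; the coordinator side is restated so the snippet runs on its own):

```python
from enum import Enum, auto

# Restated from the state machine above so this sketch runs standalone.
class CoordinatorState(Enum):
    INIT = auto()
    WAITING = auto()
    COMMITTED = auto()
    ABORTED = auto()

COORDINATOR_TRANSITIONS = {
    CoordinatorState.INIT: [CoordinatorState.WAITING],
    CoordinatorState.WAITING: [CoordinatorState.COMMITTED, CoordinatorState.ABORTED],
    CoordinatorState.COMMITTED: [],  # Terminal
    CoordinatorState.ABORTED: [],    # Terminal
}

def can_transition(table, current, target) -> bool:
    """Return True if the state machine permits current -> target."""
    return target in table.get(current, [])

legal = can_transition(COORDINATOR_TRANSITIONS, CoordinatorState.INIT, CoordinatorState.WAITING)
# Terminal states admit no further transitions:
illegal = can_transition(COORDINATOR_TRANSITIONS, CoordinatorState.COMMITTED, CoordinatorState.ABORTED)
```

A real implementation would raise on an illegal transition rather than return a boolean, but the lookup is the same.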
Implementation Walkthrough
Let’s build a working 2PC implementation. The critical detail most tutorials skip: everything must be logged to durable storage before sending messages.
```python
import json
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

class TransactionState(Enum):
    PENDING = "pending"
    PREPARED = "prepared"
    COMMITTED = "committed"
    ABORTED = "aborted"

@dataclass
class TransactionLog:
    """Durable transaction log - in production, use actual persistent storage"""
    log_file: str

    def write(self, tx_id: str, state: str, data: Optional[dict] = None):
        entry = {"tx_id": tx_id, "state": state, "data": data, "timestamp": time.time()}
        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def read_all(self) -> List[dict]:
        try:
            with open(self.log_file, "r") as f:
                return [json.loads(line) for line in f]
        except FileNotFoundError:
            return []

class Participant:
    def __init__(self, node_id: str, log: TransactionLog):
        self.node_id = node_id
        self.log = log
        self.transactions: Dict[str, TransactionState] = {}
        self.locked_resources: Dict[str, str] = {}  # resource -> tx_id

    def prepare(self, tx_id: str, operations: List[dict]) -> bool:
        """Phase 1: Prepare to commit. Returns True if ready."""
        # Check if we can acquire all needed resources
        for op in operations:
            resource = op.get("resource")
            if resource in self.locked_resources:
                if self.locked_resources[resource] != tx_id:
                    # Resource locked by another transaction
                    self.log.write(tx_id, "vote_abort", {"reason": "resource_conflict"})
                    return False

        # Simulate validation (constraints, business rules)
        if not self._validate_operations(operations):
            self.log.write(tx_id, "vote_abort", {"reason": "validation_failed"})
            return False

        # CRITICAL: Log prepared state BEFORE responding
        self.log.write(tx_id, "prepared", {"operations": operations})

        # Acquire locks
        for op in operations:
            resource = op.get("resource")
            if resource:
                self.locked_resources[resource] = tx_id

        self.transactions[tx_id] = TransactionState.PREPARED
        return True

    def commit(self, tx_id: str) -> bool:
        """Phase 2: Execute the commit."""
        if self.transactions.get(tx_id) != TransactionState.PREPARED:
            return False
        # Log commit decision BEFORE applying
        self.log.write(tx_id, "committed")
        # Apply changes (in a real system, this executes the actual operations)
        self._apply_operations(tx_id)
        # Release locks
        self._release_locks(tx_id)
        self.transactions[tx_id] = TransactionState.COMMITTED
        return True

    def abort(self, tx_id: str) -> bool:
        """Abort and rollback."""
        self.log.write(tx_id, "aborted")
        self._release_locks(tx_id)
        self.transactions[tx_id] = TransactionState.ABORTED
        return True

    def _validate_operations(self, operations: List[dict]) -> bool:
        # Implement business validation logic
        return True

    def _apply_operations(self, tx_id: str):
        # Apply the actual data changes
        pass

    def _release_locks(self, tx_id: str):
        self.locked_resources = {
            r: t for r, t in self.locked_resources.items() if t != tx_id
        }

class Coordinator:
    def __init__(self, log: TransactionLog):
        self.log = log
        self.transactions: Dict[str, dict] = {}

    def begin_transaction(self, participants: List[Participant],
                          operations: Dict[str, List[dict]]) -> str:
        tx_id = str(uuid.uuid4())
        self.transactions[tx_id] = {
            "participants": participants,
            "operations": operations,
            "state": TransactionState.PENDING,
        }
        self.log.write(tx_id, "started", {"participant_ids": [p.node_id for p in participants]})
        return tx_id

    def execute(self, tx_id: str) -> bool:
        """Execute the 2PC protocol."""
        tx = self.transactions[tx_id]
        participants = tx["participants"]
        operations = tx["operations"]

        # Phase 1: Prepare
        votes = []
        for participant in participants:
            participant_ops = operations.get(participant.node_id, [])
            vote = participant.prepare(tx_id, participant_ops)
            votes.append(vote)

        # Log decision BEFORE sending to participants
        if all(votes):
            self.log.write(tx_id, "decision_commit")
            # Phase 2: Commit
            for participant in participants:
                participant.commit(tx_id)
            self.transactions[tx_id]["state"] = TransactionState.COMMITTED
            return True
        else:
            self.log.write(tx_id, "decision_abort")
            # Phase 2: Abort
            for participant in participants:
                participant.abort(tx_id)
            self.transactions[tx_id]["state"] = TransactionState.ABORTED
            return False
```
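To see the protocol end to end, here is a stripped-down, in-memory rehearsal (the `MiniParticipant` and `run_2pc` names are invented here; no durable log or network) of the flow the classes above implement:

```python
import uuid

# A stripped-down, in-memory rehearsal of 2PC: collect votes, then
# broadcast one global decision. One failed prepare aborts everyone.
class MiniParticipant:
    def __init__(self, can_prepare=True):
        self.can_prepare = can_prepare
        self.state = "init"

    def prepare(self, tx_id):
        self.state = "prepared" if self.can_prepare else "aborted"
        return self.can_prepare

    def commit(self, tx_id):
        self.state = "committed"

    def abort(self, tx_id):
        self.state = "aborted"

def run_2pc(participants):
    """Coordinator loop: Phase 1 collects votes, Phase 2 broadcasts."""
    tx_id = str(uuid.uuid4())
    votes = [p.prepare(tx_id) for p in participants]
    decision = all(votes)
    for p in participants:
        p.commit(tx_id) if decision else p.abort(tx_id)
    return decision

ok = [MiniParticipant(), MiniParticipant()]
unanimous = run_2pc(ok)  # both vote commit -> all commit

good, bad = MiniParticipant(), MiniParticipant(can_prepare=False)
vetoed = run_2pc([good, bad])  # one abort vote -> everyone aborts
```

Note that `good` ends in the aborted state even though it was willing to commit: a single dissenting vote dooms the whole transaction.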
Failure Scenarios and Recovery
The 2PC protocol handles failures through its logging discipline. Here’s how recovery works:
```python
class RecoveryManager:
    def __init__(self, log: TransactionLog):
        self.log = log

    def recover_participant(self) -> Dict[str, str]:
        """Recover participant state from the transaction log."""
        # Replay the log; the last entry per transaction wins.
        tx_states: Dict[str, str] = {}
        for entry in self.log.read_all():
            tx_states[entry["tx_id"]] = entry["state"]

        actions: Dict[str, str] = {}
        for tx_id, state in tx_states.items():
            if state == "prepared":
                # IN-DOUBT: we voted commit but never saw the decision.
                # We cannot decide unilaterally - must query the coordinator.
                actions[tx_id] = "query_coordinator"
            elif state == "committed":
                # Ensure the commit was fully applied
                actions[tx_id] = "ensure_committed"
            elif state == "aborted":
                # Ensure the rollback was fully applied
                actions[tx_id] = "ensure_aborted"
        return actions

    def recover_coordinator(self) -> Dict[str, str]:
        """Recover coordinator state and complete pending transactions."""
        tx_states: Dict[str, str] = {}
        for entry in self.log.read_all():
            tx_states[entry["tx_id"]] = entry["state"]

        actions: Dict[str, str] = {}
        for tx_id, state in tx_states.items():
            if state == "started":
                # Never reached a decision - safe to abort
                actions[tx_id] = "abort"
            elif state == "decision_commit":
                # Must ensure all participants committed
                actions[tx_id] = "retry_commit"
            elif state == "decision_abort":
                # Must ensure all participants aborted
                actions[tx_id] = "retry_abort"
        return actions
```
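The coordinator-side rules boil down to a pure function of the last logged state per transaction. A small standalone sketch (transaction IDs and the `coordinator_recovery_action` name invented here):

```python
# Recovery as a lookup on the last logged state (hypothetical names).
def coordinator_recovery_action(last_state):
    return {
        "started": "abort",                 # no decision logged: safe to abort
        "decision_commit": "retry_commit",  # decision survived: finish it
        "decision_abort": "retry_abort",
    }.get(last_state)

# Replaying a log keeps only the latest state per transaction.
log = [("t1", "started"), ("t2", "started"), ("t2", "decision_commit")]
latest = {}
for tx_id, state in log:
    latest[tx_id] = state

actions = {tx: coordinator_recovery_action(s) for tx, s in latest.items()}
```

The key property: once `decision_commit` is on disk, recovery can only push forward; it may never flip the decision.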
Limitations and the Blocking Problem
Here’s the uncomfortable truth about 2PC: it’s a blocking protocol. When a participant enters the PREPARED state, it has voted to commit and holds locks on resources. It cannot unilaterally decide to commit or abort—it must wait for the coordinator’s decision.
If the coordinator crashes after logging its decision but before broadcasting it, participants are stuck. They’re holding locks, blocking other transactions, and cannot make progress. This is the blocking problem.
The implications are severe:
- Latency: Every transaction requires at least two round-trips
- Lock duration: Resources are locked from PREPARE until COMMIT/ABORT acknowledgment
- Availability: Coordinator failure blocks all in-flight transactions
- CAP theorem: 2PC chooses consistency over availability during partitions
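One standard mitigation is a timeout in the PREPARED state: the participant polls the coordinator instead of waiting passively. A sketch (the `wait_for_decision` helper is invented here, not part of the implementation above) that also shows why polling does not remove blocking:

```python
import time

# A prepared participant polls the coordinator for the decision.
# If the coordinator stays unreachable, the participant remains in
# doubt: it must keep its locks and keep waiting. That is the
# blocking problem - no amount of local timeout logic resolves it.
def wait_for_decision(query_coordinator, poll_interval_s=0.01, max_attempts=3):
    """query_coordinator() returns "commit", "abort", or None (no answer)."""
    for _ in range(max_attempts):
        decision = query_coordinator()
        if decision is not None:
            return decision
        time.sleep(poll_interval_s)
    return None  # still in doubt after all attempts

# Coordinator answers on the second poll:
answers = iter([None, "commit"])
decision = wait_for_decision(lambda: next(answers, None))

# Coordinator is down: the participant stays blocked.
still_blocked = wait_for_decision(lambda: None)
```

A `None` result here means "keep holding locks", which is exactly the availability cost listed above.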
Alternatives and Enhancements
For long-running transactions or when availability matters more than strict consistency, the Saga pattern offers an alternative:
```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable

class SagaOrchestrator:
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps
        self.completed_steps: List[SagaStep] = []

    def execute(self, context: dict) -> bool:
        for step in self.steps:
            try:
                step.action(context)
                self.completed_steps.append(step)
            except Exception as e:
                print(f"Step {step.name} failed: {e}")
                self._compensate(context)
                return False
        return True

    def _compensate(self, context: dict):
        """Execute compensating transactions in reverse order."""
        for step in reversed(self.completed_steps):
            try:
                step.compensation(context)
            except Exception as e:
                print(f"Compensation for {step.name} failed: {e}")
                # Log for manual intervention

# Usage
def debit_account(ctx):
    print(f"Debiting {ctx['amount']} from {ctx['from_account']}")

def credit_account(ctx):
    print(f"Crediting {ctx['amount']} to {ctx['to_account']}")

def reverse_debit(ctx):
    print(f"Reversing debit of {ctx['amount']} to {ctx['from_account']}")

def reverse_credit(ctx):
    print(f"Reversing credit of {ctx['amount']} from {ctx['to_account']}")

transfer_saga = SagaOrchestrator([
    SagaStep("debit", debit_account, reverse_debit),
    SagaStep("credit", credit_account, reverse_credit),
])
```
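A stripped-down run (the `run_saga` helper is a simplification of the orchestrator above, invented here) makes the reverse-order compensation visible:

```python
# Minimal saga runner: on failure, undo completed steps in reverse order.
def run_saga(steps, context):
    """steps: list of (action, compensation) pairs."""
    done = []
    for action, compensation in steps:
        try:
            action(context)
            done.append(compensation)
        except Exception:
            for comp in reversed(done):
                comp(context)  # compensate in reverse order
            return False
    return True

trace = []

def do_debit(ctx):
    trace.append("debit")

def undo_debit(ctx):
    trace.append("undo_debit")

def do_credit(ctx):
    raise RuntimeError("credit failed")

def undo_credit(ctx):
    trace.append("undo_credit")

ok = run_saga([(do_debit, undo_debit), (do_credit, undo_credit)], {})
```

The trace ends as `["debit", "undo_debit"]`: the completed debit was compensated, and the failed credit never needed undoing. Unlike 2PC, intermediate states were briefly visible to other readers.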
Three-Phase Commit (3PC) adds a pre-commit phase to reduce blocking but still fails under network partitions. Paxos and Raft provide consensus without a single coordinator but add complexity. Choose based on your consistency requirements and failure tolerance.
Production Considerations
When deploying 2PC:
- Set aggressive timeouts: A participant waiting in PREPARED state should timeout and query the coordinator, not wait forever
- Monitor coordinator health: Use leader election (via Raft/ZooKeeper) for coordinator high availability
- Make operations idempotent: Retries during recovery must be safe to repeat
- Limit participant count: 2PC latency grows linearly with participants; keep transactions small
- Consider your actual needs: If you’re building microservices with eventual consistency tolerance, Sagas are usually the better choice
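The idempotency point deserves a concrete sketch (names invented here): recording applied transaction IDs makes a retried commit a no-op, so recovery can safely replay `retry_commit` actions.

```python
# Idempotent commit application: a retry after a crash is a no-op.
applied = set()
balance = {"value": 100}

def apply_commit(tx_id, delta):
    if tx_id in applied:  # already applied: recovery retry is safe
        return
    balance["value"] += delta
    applied.add(tx_id)

apply_commit("tx-1", -30)
apply_commit("tx-1", -30)  # recovery retries the same commit
```

In production the `applied` set lives in the same durable store as the data, updated in the same local transaction as the change itself.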
2PC remains the right tool when you need strong consistency across a small number of databases—think financial systems, inventory management, or any domain where “eventually consistent” means “occasionally wrong in expensive ways.”