Distributed Systems
Clocks, consensus, and the hard problems of building at scale.
Clocks & Time in Distributed Systems
On a single machine, time is simple: one clock, one truth. In a distributed system, every node has its own clock, and those clocks drift apart. NTP in a well-configured datacenter achieves roughly 1ms precision between servers. Over the public internet, NTP precision degrades to 50-100ms. Google reportedly observed clock skew of up to 200ms between machines using standard NTP, which is why it built TrueTime for Spanner, bounding uncertainty to ~7ms using GPS receivers and atomic clocks in each datacenter. When two events happen on different machines "at the same time," you cannot determine which happened first using wall clocks alone. This is not a bug; it is a fundamental property of distributed systems.
Lamport Timestamps
- Each node maintains a counter, incremented on every event
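- On message send: increment, then attach the counter. On receive: set the counter to max(local, received) + 1
- Guarantees: if A→B (A causally precedes B), then timestamp(A) < timestamp(B)
- Limitation: the converse does not hold, so comparing two timestamps cannot tell you whether the events are causally related or concurrent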
- Simple, compact (single integer), widely used
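The rules above fit in a few lines. The following is a minimal sketch, not a production implementation; the class and method names (`LamportClock`, `tick`, `send`, `receive`) are illustrative choices, not taken from any library.

```python
class LamportClock:
    """Single-integer logical clock (illustrative names)."""

    def __init__(self) -> None:
        self.time = 0

    def tick(self) -> int:
        # Local event: advance the counter.
        self.time += 1
        return self.time

    def send(self) -> int:
        # Sending is an event too; the returned value travels with the message.
        return self.tick()

    def receive(self, received: int) -> int:
        # Jump past everything the sender had seen, then count the receive event.
        self.time = max(self.time, received) + 1
        return self.time


a, b = LamportClock(), LamportClock()
a.tick()                    # a is now at 1
stamp = a.send()            # a is now at 2; the message carries 2
b_time = b.receive(stamp)   # b jumps to max(0, 2) + 1 = 3
```

The send event is stamped 2 and the receive event 3, so the causal order is reflected in the timestamps.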
Vector Clocks
- Each node maintains a vector of N counters (one per node)
- Can detect both causal ordering AND concurrency
- If V(A) ≤ V(B) in every position and V(A) < V(B) in at least one: A happened before B
- If neither dominates: the events are concurrent → conflict
- Cost: O(N) storage per event. Used by Riak, Dynamo
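The comparison rule can be sketched as follows. This assumes a vector clock is represented as a dict from node ID to counter, with missing entries treated as 0; the function names are hypothetical.

```python
from typing import Dict

VectorClock = Dict[str, int]


def compare(a: VectorClock, b: VectorClock) -> str:
    """Return 'before', 'after', 'equal', or 'concurrent' for a relative to b."""
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # a happened before b
    if b_le_a:
        return "after"       # b happened before a
    return "concurrent"      # neither dominates: a conflict to resolve


def merge(a: VectorClock, b: VectorClock) -> VectorClock:
    # Element-wise maximum, used when a node receives a message.
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}


earlier = {"n1": 1, "n2": 0}
later = {"n1": 2, "n2": 1}
sibling = {"n1": 0, "n2": 2}
```

Here `compare(earlier, later)` reports a causal order, while `compare(later, sibling)` reports concurrency, which a single Lamport integer could not distinguish.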
Hybrid Logical Clocks (HLC) combine physical time with logical counters, giving you a timestamp that respects causality while staying close to wall-clock time. CockroachDB uses HLC. Google Spanner takes a different route: TrueTime (GPS + atomic clocks) bounds clock uncertainty to ~7ms, which is expensive hardware aimed at a fundamental physics problem.
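To make the hybrid idea concrete, here is a rough sketch of the usual HLC update rules: a timestamp is a pair (physical component, logical counter), and the counter only grows when the physical component cannot advance. The class name, the injected `physical_ms` function, and the fixed clock values in the demo are assumptions for illustration.

```python
from typing import Callable, Tuple

Timestamp = Tuple[int, int]  # (physical component in ms, logical counter)


class HybridLogicalClock:
    def __init__(self, physical_ms: Callable[[], int]) -> None:
        self.physical_ms = physical_ms  # injected so the demo is deterministic
        self.l = 0  # highest physical time seen so far
        self.c = 0  # logical counter, breaks ties within the same l

    def now(self) -> Timestamp:
        """Timestamp a local or send event."""
        prev = self.l
        self.l = max(prev, self.physical_ms())
        self.c = self.c + 1 if self.l == prev else 0
        return (self.l, self.c)

    def update(self, remote: Timestamp) -> Timestamp:
        """Timestamp a receive event, given the sender's timestamp."""
        remote_l, remote_c = remote
        prev = self.l
        self.l = max(prev, remote_l, self.physical_ms())
        if self.l == prev and self.l == remote_l:
            self.c = max(self.c, remote_c) + 1
        elif self.l == prev:
            self.c += 1
        elif self.l == remote_l:
            self.c = remote_c + 1
        else:
            self.c = 0
        return (self.l, self.c)


fast = HybridLogicalClock(lambda: 1000)  # wall clock reads 1000 ms
slow = HybridLogicalClock(lambda: 900)   # wall clock is 100 ms behind
sent = fast.now()
received = slow.update(sent)
later = slow.now()
```

Even though the receiving node's wall clock is behind, its timestamps sort after the message it received, because tuples compare by physical component first and counter second.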
⚠️ Clock Skew Causes Real Data Loss
Last-Write-Wins (LWW) conflict resolution, used by default in Cassandra and in DynamoDB global tables, uses timestamps to decide which write wins. If two nodes have clocks 100ms apart, a write stamped by the slower clock loses to any write the faster clock stamped up to 100ms earlier in real time: it appears to have happened first even though it happened later. This is not theoretical; it silently discards acknowledged writes in production LWW systems. Solutions: use logical clocks for LWW comparison, or use application-level version vectors instead of physical timestamps.
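The failure is easy to reproduce. The sketch below assumes a hypothetical `LwwRegister` and two nodes whose clocks differ by a fixed 100ms; real systems differ in detail, but the comparison logic is the same.

```python
class LwwRegister:
    """Keeps whichever write carries the highest timestamp."""

    def __init__(self) -> None:
        self.value = None
        self.timestamp_ms = -1

    def write(self, value: str, timestamp_ms: int) -> bool:
        # Returns True if the write was kept, False if it was silently dropped.
        if timestamp_ms > self.timestamp_ms:
            self.value, self.timestamp_ms = value, timestamp_ms
            return True
        return False


SKEW_MS = 100  # assumed skew between the two nodes


def fast_clock(real_ms: int) -> int:
    return real_ms + SKEW_MS  # this node's clock runs 100 ms ahead


def slow_clock(real_ms: int) -> int:
    return real_ms


register = LwwRegister()
first_kept = register.write("old", fast_clock(1000))   # real time 1000, stamped 1100
second_kept = register.write("new", slow_clock(1050))  # real time 1050, stamped 1050
```

The second write happened 50ms later in real time, yet it is discarded and the register still holds "old".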
Perfect time ordering in an asynchronous distributed system is not achievable. A closely related result, the FLP impossibility theorem (Fischer, Lynch, Paterson 1985), states that in an asynchronous system no deterministic consensus protocol can guarantee termination if even one node might crash. Clock synchronization runs into the same root cause: message delays are unbounded, so a node can never be sure how stale a remote clock reading is. Every practical approach (NTP, Lamport, vector clocks, TrueTime) is an engineering compromise, not a solution to the fundamental problem.
There is no global clock in a distributed system. Events on different machines cannot be ordered by timestamp alone. You must choose: Lamport clocks (total order, no concurrency detection), vector clocks (detect concurrency, O(N) cost), or hardware solutions (GPS/atomic clocks: expensive, bounded uncertainty).
- Wall clocks drift. NTP: ~1ms in datacenter, 50-100ms over internet. Insufficient for cross-node ordering.
- Lamport timestamps: single counter, total order, cannot detect concurrency.
- Vector clocks: N counters, detect causality AND concurrency. O(N) per event.
- HLC: physical + logical hybrid. Used by CockroachDB; Spanner relies on TrueTime instead.
- Clock skew + LWW: causes real data loss when comparing timestamps across nodes. Use logical clocks or version vectors instead.
- FLP impossibility: deterministic consensus cannot be guaranteed to terminate in an async system, and perfect cross-node ordering is equally out of reach. All practical solutions are engineering compromises.
Consensus Algorithms
Consensus is the problem of getting multiple nodes to agree on a single value, even when some nodes crash, messages are lost, and the network partitions. It sounds simple until you realize that in an asynchronous system, you cannot distinguish a crashed node from a slow one. That impossibility (the FLP theorem) means every practical consensus algorithm makes trade-offs: timeouts, leader election, quorum sizes. Paxos solved it first. Raft made it understandable.
Paxos
- First proven consensus algorithm (Lamport; written in 1989, published in 1998)
- Correct, fault-tolerant, foundational
- Notoriously difficult to understand and implement
- Multi-Paxos for practical repeated consensus
- Used in: Google Chubby, Spanner (internally)
Raft
- Designed for understandability (Ongaro and Ousterhout, 2014)
- Same guarantees as Paxos, simpler to implement
- Strong leader model: all writes go through leader
- Clear phases: election → normal operation → recovery
- Used in: etcd, Consul, CockroachDB, TiKV
Raft nodes are in one of three states: Follower, Candidate, or Leader. All nodes start as Followers. The election process is sequential:
- Election trigger: a Follower does not hear from a Leader within the election timeout (randomized, 150-300ms). It assumes the Leader has failed and becomes a Candidate.
- Voting: the Candidate increments its term number, votes for itself, and sends RequestVote RPCs to all other nodes. A node grants its vote if it has not voted in this term and the Candidate's log is at least as up-to-date as its own.
- Majority wins: if the Candidate receives votes from a majority (⌊N/2⌋ + 1), it becomes the new Leader and immediately sends heartbeats (AppendEntries with an empty payload) to prevent new elections.
- Split vote: if no Candidate wins a majority (for example, two Candidates start simultaneously), each waits for a new randomized timeout and tries again. The randomization makes simultaneous elections unlikely.
Why randomized timeouts matter: if all nodes used the same timeout, they would all call elections simultaneously every time the Leader failed. The randomization ensures one node almost always starts its election before the others, which prevents indefinite election loops.
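The voting rule and the randomized timeout can be sketched as below. This is a simplified illustration, not a full Raft node: the `Voter` class, its field names, and the RPC arguments are assumptions modeled loosely on the election steps described above, and networking and persistence are omitted.

```python
import random
from dataclasses import dataclass
from typing import Optional


def election_timeout_ms(rng: random.Random) -> float:
    # Each Follower draws its own timeout so that one usually fires first.
    return rng.uniform(150, 300)


@dataclass
class Voter:
    current_term: int = 0
    voted_for: Optional[str] = None
    last_log_term: int = 0
    last_log_index: int = 0

    def handle_request_vote(self, term: int, candidate_id: str,
                            cand_last_term: int, cand_last_index: int) -> bool:
        if term < self.current_term:
            return False                      # stale Candidate
        if term > self.current_term:
            self.current_term = term          # newer term: forget the old vote
            self.voted_for = None
        # "At least as up-to-date": later last term wins, then longer log.
        log_ok = (cand_last_term, cand_last_index) >= (
            self.last_log_term, self.last_log_index)
        if log_ok and self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id
            return True
        return False


def wins_election(votes: int, cluster_size: int) -> bool:
    return votes >= cluster_size // 2 + 1


voter = Voter(current_term=3, last_log_term=3, last_log_index=7)
first = voter.handle_request_vote(4, "n2", 3, 7)    # granted
second = voter.handle_request_vote(4, "n3", 3, 9)   # refused: already voted in term 4
behind = voter.handle_request_vote(5, "n4", 2, 20)  # refused: Candidate's log is older
```

The third request shows why a long log is not enough: a Candidate whose last entry comes from an older term is rejected regardless of its log length.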
Once a Leader is elected, it handles all client writes. Each write is appended to the Leader's log as an uncommitted entry. The Leader sends AppendEntries RPC to all Followers in parallel. Once a majority of nodes (including the Leader) have written the entry to their logs, the Leader commits the entry and applies it to its state machine. The Leader then notifies Followers on the next heartbeat, and they commit in turn.
Consistency guarantee: a committed entry will not be lost even if the Leader fails. The entry exists on a majority of nodes, and Raft's election restriction (a node only votes for a Candidate whose log is at least as up-to-date as its own) means any new Leader must already hold every committed entry. This is Raft's Leader Completeness property.
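How a Leader decides what is safe to commit can be sketched as "the highest log index stored on a majority". The helper below is a simplification under stated assumptions: `match_index` is a hypothetical map of follower ID to the highest index known to be replicated there, and the additional Raft rule that the entry must belong to the Leader's current term is left out.

```python
from typing import Dict


def majority_match_index(leader_last_index: int, match_index: Dict[str, int]) -> int:
    """Highest log index present on a majority of the cluster (Leader included)."""
    indexes = sorted([leader_last_index, *match_index.values()], reverse=True)
    majority = len(indexes) // 2 + 1
    # The majority-th largest value is held by at least `majority` nodes.
    return indexes[majority - 1]


# Five-node cluster: Leader at index 10, two Followers lagging badly.
commit_5 = majority_match_index(10, {"f1": 10, "f2": 8, "f3": 4, "f4": 3})

# Three-node cluster: only one Follower has acknowledged up to index 2.
commit_3 = majority_match_index(5, {"a": 2, "b": 1})
```

In the five-node case index 8 is on three nodes, so entries up to 8 can commit even though two Followers are far behind.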
Practical Raft Considerations
Cluster size: 3-node cluster tolerates 1 failure. 5-node cluster tolerates 2 failures. 7-node tolerates 3. Beyond 7 nodes, coordination overhead grows faster than the fault tolerance benefit. Most production systems use 3 or 5 nodes for their consensus layer (etcd, Consul), not 7.
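The cluster-size arithmetic is worth working through once, because it shows that even-sized clusters buy nothing. A small sketch (function names are illustrative):

```python
def quorum(n: int) -> int:
    # Smallest group that is guaranteed to overlap with any other majority.
    return n // 2 + 1


def tolerated_failures(n: int) -> int:
    return (n - 1) // 2


table = {n: (quorum(n), tolerated_failures(n)) for n in (3, 4, 5, 6, 7)}
```

Going from 3 to 4 nodes raises the quorum from 2 to 3 but still tolerates only 1 failure, which is one reason odd sizes are the norm.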
Read scaling: Raft by default requires reads to go through the Leader to guarantee linearizability. For read-heavy workloads, this is a bottleneck. Solutions: leader leases (Leader grants itself permission to serve reads for a bounded time), or follower reads with acceptable staleness (like CockroachDB's stale reads).
Consensus is not "agreement"; it is agreement under failure. Any protocol can agree when all nodes are healthy. The hard requirement is maintaining agreement when nodes crash, restart, and the network splits. Raft is the default choice for new systems today: same guarantees as Paxos, implementable by humans.
- FLP impossibility: in an async system, no deterministic protocol can guarantee consensus terminates if even one node can crash. Practical algorithms use timeouts.
- Paxos: correct but notoriously hard to implement. Foundation of the field.
- Raft: understandable alternative. Leader-based. Used in etcd, Consul, CockroachDB.
- Raft election: randomized timeouts, majority vote, term numbers prevent split-brain.
- Log replication: Leader appends → replicates to majority → commits → Followers apply.
- Cluster size: 3 nodes (1 failure tolerance), 5 nodes (2 failures). Beyond 7, coordination overhead outweighs benefit.
- Read scaling: leader leases or stale reads for read-heavy workloads.
- Quorum: committed when a majority (⌊N/2⌋ + 1) acknowledges. Tolerates ⌊(N-1)/2⌋ failures.
Distributed Transactions
Single-database transactions are well-understood: BEGIN, do work, COMMIT or ROLLBACK. But when a business operation spans multiple services, each with its own database, how do you ensure atomicity? Either all succeed or all roll back. Two-Phase Commit (2PC) was the traditional answer; the Saga pattern is the modern one. Both have significant trade-offs, and choosing wrong means either blocking failures or inconsistent states that require manual repair.
⚠️ The Best Strategy: Avoid Them
The best distributed transaction strategy is to design your service boundaries so that you do not need distributed transactions. A business operation requiring atomic changes to three services' databases is a signal that the service decomposition is wrong: those services may belong in the same service with a single database.
Before reaching for 2PC or sagas, ask: does this operation actually cross a natural domain boundary, or have we artificially split a cohesive domain into separate services? When you genuinely need it (different bounded contexts, different teams, different scaling rates), choose between 2PC and sagas based on the consistency requirement.
Two-Phase Commit (2PC)
- Strong atomicity: all-or-nothing across participants
- Blocking: coordinator failure = all participants stuck
- Requires all participants available simultaneously
- Higher latency (2 round trips minimum)
- Use when: strong consistency required, few participants
Saga
- Eventual consistency via compensating transactions
- Non-blocking: each step independent, retryable
- Participants don't need to be available simultaneously
- Complex: must design compensation for every step
- Use when: long-running workflows, microservices, high availability
Choreography Saga
- Services emit events, other services react
- No central coordinator: fully decentralized
- Simple for 2-3 steps. Chaotic for 5+
- Hard to track overall progress
- Example: Order → Payment → Inventory → Shipping (each listens)
Orchestration Saga
- Central orchestrator directs each step
- Knows the overall workflow state
- Easier to reason about, debug, and monitor
- Orchestrator is a potential SPOF (needs HA)
- Example: OrderSaga calls Payment, then Inventory, then Shipping
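An orchestrated saga boils down to "run steps in order, and on failure run the compensations of the completed steps in reverse". The sketch below is a minimal in-process illustration; `run_saga`, the step tuples, and the simulated inventory failure are hypothetical, and a real orchestrator would also persist its progress and retry.

```python
from typing import Callable, List, Tuple

# (name, action, compensation); all three are illustrative.
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> bool:
    completed: List[Step] = []
    for step in steps:
        _name, action, _compensate = step
        try:
            action()
        except Exception:
            # Undo what already succeeded, most recent first.
            for _, _, undo in reversed(completed):
                undo()
            return False
        completed.append(step)
    return True


log: List[str] = []


def reserve_inventory() -> None:
    raise RuntimeError("out of stock")  # simulated failure in step 2


order_saga: List[Step] = [
    ("payment", lambda: log.append("charge"), lambda: log.append("refund")),
    ("inventory", reserve_inventory, lambda: log.append("release")),
    ("shipping", lambda: log.append("ship"), lambda: log.append("cancel shipment")),
]
succeeded = run_saga(order_saga)
```

Payment was charged, inventory failed, so the only compensation that runs is the refund; shipping is never attempted.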
Three-Phase Commit (3PC)
3PC adds a pre-commit phase to reduce 2PC's blocking problem: after all participants vote Yes, the coordinator sends a pre-commit before the final commit. If the coordinator crashes after pre-commit, participants can safely commit rather than blocking indefinitely. However, 3PC is still not safe under network partitions: a partition during pre-commit can lead to inconsistency. This is why 3PC is rarely used in practice. Modern systems choose between 2PC (databases with XA support) and sagas (microservices).
Saga Idempotency Requirement
Every saga step and its compensating transaction must be idempotent. The orchestrator may retry a step multiple times if it does not receive an acknowledgment (due to network failure). If the step is not idempotent, retrying creates duplicate actions: charging a payment twice, reserving inventory twice. Use idempotency keys on every saga step: the orchestrator generates a unique key per step per saga execution, and the executing service stores the result, returning the stored result on duplicate requests.
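The idempotency-key mechanism can be sketched as a lookup before the side effect. `PaymentService`, `step_key`, and the in-memory result store are assumptions for illustration; a real service would keep the key and result in durable storage, written atomically with the charge.

```python
from typing import Dict


class PaymentService:
    def __init__(self) -> None:
        self.results: Dict[str, str] = {}  # idempotency key -> stored result
        self.charges = 0                   # number of real side effects performed

    def charge(self, idempotency_key: str, amount_cents: int) -> str:
        if idempotency_key in self.results:
            # Duplicate request: return the stored result, do not charge again.
            return self.results[idempotency_key]
        self.charges += 1
        result = f"charged {amount_cents} cents (charge #{self.charges})"
        self.results[idempotency_key] = result
        return result


def step_key(saga_id: str, step_name: str) -> str:
    # One key per step per saga execution, generated by the orchestrator.
    return f"{saga_id}:{step_name}"


service = PaymentService()
key = step_key("saga-42", "payment")
first_reply = service.charge(key, 500)
retry_reply = service.charge(key, 500)  # orchestrator retried after a lost ack
```

The retry receives the same reply as the first call, and only one charge is ever made.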
2PC gives you strong consistency at the cost of availability. Sagas give you availability at the cost of complexity. In microservices, sagas dominate, because having all services available simultaneously for a 2PC lock is unrealistic at scale. But sagas require designing compensation logic for every step ("undo payment," "release reservation"), which is real engineering work.
- Prefer avoiding distributed transactions: cross-service atomicity is a warning sign of wrong service decomposition.
- 2PC: strong atomicity, blocking on coordinator failure. Good for few participants, strong consistency needs.
- 3PC: adds pre-commit to reduce blocking, but still unsafe under network partitions. Rarely used.
- Saga: compensating transactions, non-blocking. Good for microservices, long-running workflows.
- Choreography: event-driven, decentralized. Simple for few steps, chaotic at scale.
- Orchestration: central coordinator, easy to monitor. Orchestrator needs high availability.
- Saga idempotency: every step and compensation must be idempotent. Use idempotency keys, because the orchestrator may retry any step.
Replication Strategies Deep Dive
Replication exists for two reasons: fault tolerance (if one copy dies, others survive) and performance (serve reads from nearby replicas). But the choice of replication strategy determines your consistency guarantees, write scalability, and failure modes. Single-leader is simple but limits write throughput. Multi-leader scales writes but creates conflicts. Leaderless avoids coordination but requires quorum math. Every database you use makes one of these choices; understanding which one helps you predict its behavior under failure.
Single-Leader
- One node accepts all writes
- Followers replicate from leader
- Strong consistency possible
- Write bottleneck at leader
- Leader failure = brief unavailability
- Used: PostgreSQL, MySQL, MongoDB
Multi-Leader
- Multiple nodes accept writes
- Each leader replicates to others
- Write scalability across regions
- Conflict resolution required
- Complex: concurrent writes to same key
- Used: multi-DC MySQL, CouchDB (CockroachDB, by contrast, runs one Raft leader per data range)
Leaderless (Dynamo)
- Any node accepts reads/writes
- Quorum: W + R > N for consistency
- No single point of failure for writes
- Eventual consistency by default
- Read repair + anti-entropy for convergence
- Used: Cassandra, Riak, and Amazon's original Dynamo (the managed DynamoDB service replicates each partition through a leader)
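The quorum condition W + R > N works because any write set and any read set must then share at least one replica. The sketch below checks that claim by brute force for small clusters; the function names are illustrative.

```python
from itertools import combinations


def quorums_overlap(n: int, w: int, r: int) -> bool:
    # The rule of thumb from the list above.
    return w + r > n


def always_intersect(n: int, w: int, r: int) -> bool:
    # Exhaustively check every possible write set against every read set.
    nodes = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )


# The formula and the exhaustive check agree for every W and R on 3 and 5 nodes.
agreement = all(
    quorums_overlap(n, w, r) == always_intersect(n, w, r)
    for n in (3, 5)
    for w in range(1, n + 1)
    for r in range(1, n + 1)
)
```

With N=3, W=2, R=2 every read touches at least one replica that saw the latest write; with W=1, R=1 a read can miss it entirely.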
Synchronous
- Leader waits for all replicas to acknowledge before confirming write
- Zero data loss on failover: every ack'd write exists on all nodes
- Cost: write latency = latency to the slowest replica
- If one replica is slow or unavailable, all writes block
- PostgreSQL: synchronous_commit=on (waits on replicas only when synchronous_standby_names lists a standby)
- Use for: financial data, audit trails, anywhere zero loss is non-negotiable
Asynchronous
- Leader acknowledges write immediately after its own log write
- Replication happens in the background
- Write latency: minimal, not affected by replica speed
- Cost: if Leader fails before replicas catch up, ack'd writes are lost
- PostgreSQL default. Most MySQL setups.
- Use for: high-throughput, stale replicas acceptable, latency matters
Semi-Synchronous
- Leader waits for at least one replica (not all) to acknowledge
- Reduces data loss risk without full sync latency
- MySQL semi-sync replication
- Middle ground: one confirmed copy beyond the Leader
- If the one confirmed replica fails, degrades to async
- Use for: balanced durability and write performance
Replication strategy is not a database choice; it is a system design choice. Single-leader gives you simplicity and strong consistency. Multi-leader gives you write availability across regions. Leaderless gives you partition tolerance with tunable consistency. Know which your database uses, and design your application around its guarantees.
Replication lag is the delay between a write being committed on the Leader and becoming visible on a Follower. In a healthy cluster: milliseconds. Under high write load or a slow replica: seconds or minutes.
The read-after-write problem: a user writes data (posts a comment), then immediately reads from a replica that has not yet received the write. The user sees their comment missing, as if the write was lost. The write was not lost; it is on the Leader and will propagate. But the user's experience is broken.
Solutions: route reads to the primary for a short window (typically 60 seconds) after any write from that user. Track the replication position on each write and route reads to replicas that have caught up to at least that position. For critical reads (a user's own profile after editing), always read from the primary.
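Both routing strategies can be sketched in a few lines. Everything here is hypothetical: `ReadRouter`, the 60-second default window, and representing replication positions as plain integers (standing in for something like a log sequence number).

```python
from typing import Dict, List


class ReadRouter:
    """Send a user's reads to the primary for a short window after they write."""

    def __init__(self, window_s: float = 60.0) -> None:
        self.window_s = window_s
        self.last_write_at: Dict[str, float] = {}

    def record_write(self, user_id: str, now_s: float) -> None:
        self.last_write_at[user_id] = now_s

    def choose(self, user_id: str, now_s: float) -> str:
        last = self.last_write_at.get(user_id)
        if last is not None and now_s - last < self.window_s:
            return "primary"
        return "replica"


def caught_up_replicas(write_position: int, replica_positions: Dict[str, int]) -> List[str]:
    # Position-based alternative: only replicas at or past the user's last write qualify.
    return sorted(name for name, pos in replica_positions.items() if pos >= write_position)


router = ReadRouter()
router.record_write("alice", now_s=100.0)
soon = router.choose("alice", now_s=130.0)     # inside the window
later_on = router.choose("alice", now_s=161.0)  # window has passed
other = router.choose("bob", now_s=130.0)      # never wrote
eligible = caught_up_replicas(42, {"r1": 40, "r2": 42, "r3": 57})
```

The time window is simple but pessimistic; position tracking lets replicas serve the read as soon as they have actually caught up.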
- Single-leader: simple, strong consistency possible. Write bottleneck. PostgreSQL, MySQL.
- Multi-leader: write scalability across regions. Conflict resolution complexity. Multi-DC MySQL, CouchDB.
- Leaderless: no write SPOF. Quorum: W+R > N for consistency. Cassandra, Riak, original Dynamo.
- Synchronous: zero data loss, write latency = slowest replica. Asynchronous: fast writes, risk of loss on failover.
- Semi-synchronous: wait for one replica, balancing durability and latency.
- Quorum tuning: W=1,R=N (fast write) vs W=N,R=1 (fast read) vs balanced W=R=majority.
- Read-after-write: route reads to primary for 60 seconds after user writes, or track replication position for routing.
Gossip Protocols & Failure Detection
In a cluster of 100 nodes, how does each node know which others are alive? A central coordinator creates a single point of failure. Polling all nodes is O(N²). Gossip protocols solve this elegantly: each node periodically tells a few random peers what it knows. Information spreads exponentially, like a rumor. Within O(log N) rounds, every node has the full picture. No coordinator, no single point of failure, and bandwidth cost per node is constant regardless of cluster size.
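A small simulation makes the logarithmic spread tangible. This is a toy push-gossip model under stated assumptions: rounds are synchronized, every informed node contacts `fanout` uniformly random peers per round, and no messages are lost. The function name and parameters are illustrative.

```python
import random


def gossip_rounds(n: int, fanout: int, seed: int = 0, max_rounds: int = 1000) -> int:
    """Rounds until all n nodes have heard a rumor that starts at node 0."""
    rng = random.Random(seed)  # seeded so repeated runs give the same answer
    informed = {0}
    rounds = 0
    while len(informed) < n and rounds < max_rounds:
        newly_informed = set()
        for _node in informed:
            for _ in range(fanout):
                # A peer may already know the rumor; that message is simply wasted.
                newly_informed.add(rng.randrange(n))
        informed |= newly_informed
        rounds += 1
    return rounds


rounds_1k = gossip_rounds(n=1000, fanout=3)
```

With a fanout of 3 the informed set can at most quadruple each round, so 1,000 nodes need at least 5 rounds; in practice the total stays within a small multiple of that because the last few uninformed nodes take a few extra rounds to hit.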
Gossip Protocol Properties
- Scalable: O(1) per-node bandwidth, O(log N) convergence
- Fault tolerant: no SPOF, continues if nodes fail
- Eventually consistent: all nodes converge to same view
- Probabilistic: not guaranteed in exact N rounds
- Used by: Cassandra, Consul, Serf (the latter two via the SWIM protocol)
Failure Detection
- Heartbeat: periodic "I'm alive" messages (simple but slow)
- Phi Accrual: probabilistic; reports a suspicion level rather than a binary alive/dead verdict
- SWIM: combines gossip with failure detection in one protocol
- Challenge: slow node ≠ dead node. Don't evict too eagerly
- False positives → unnecessary data rebalancing (expensive)
Gossip tells nodes what other nodes exist and their health. But when a node comes back online after a failure, its data is stale. Three mechanisms reconcile data differences:
Anti-Entropy (Merkle Trees)
- Background process: nodes periodically compare data with a neighbor
- Uses a Merkle tree, which hashes subtrees of data hierarchically
- Comparing root hashes instantly tells if any data differs
- If roots differ: traverse tree to find exactly which keys diverged
- Only diverged keys are synchronized โ minimal transfer
- Cassandra's nodetool repair runs this process; it is a required operational task
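The tree comparison described above can be sketched as follows. This is a simplified model, not how any particular database lays out its trees: each leaf stands for one key range serialized to bytes, both replicas are assumed to build trees of the same shape, and the leaf count must be a power of two.

```python
import hashlib
from typing import List

Levels = List[List[bytes]]  # levels[0] = leaf hashes, levels[-1] = [root hash]


def _h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_tree(leaves: List[bytes]) -> Levels:
    levels = [[_h(leaf) for leaf in leaves]]
    while len(levels[-1]) > 1:
        prev = levels[-1]
        # Each parent hashes the concatenation of its two children.
        levels.append([_h(prev[i] + prev[i + 1]) for i in range(0, len(prev), 2)])
    return levels


def diff_leaves(a: Levels, b: Levels) -> List[int]:
    """Indexes of differing leaves, descending only into subtrees whose hashes differ."""

    def walk(level: int, idx: int) -> List[int]:
        if a[level][idx] == b[level][idx]:
            return []          # identical subtree: skip it entirely
        if level == 0:
            return [idx]
        return walk(level - 1, 2 * idx) + walk(level - 1, 2 * idx + 1)

    return walk(len(a) - 1, 0)


replica_a = [b"k0=v0", b"k1=v1", b"k2=v2", b"k3=v3"]
replica_b = [b"k0=v0", b"k1=v1", b"k2=stale", b"k3=v3"]
diverged = diff_leaves(build_tree(replica_a), build_tree(replica_b))
in_sync = diff_leaves(build_tree(replica_a), build_tree(replica_a))
```

Only the range at index 2 needs to be transferred; when the roots match, the comparison ends after a single hash check.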
Read Repair
- Coordinator reads from multiple replicas on read request
- If responses differ (a stale replica returns an old value) → the coordinator sends the latest value back to the stale replica
- Passive: only fixes data on the read path
- Data that is never read is never repaired
- Not a replacement for anti-entropy but a complement to it
Hinted Handoff
- When a replica is temporarily unavailable, the node handling the write stores it locally as a "hint" on the unavailable replica's behalf
- When the unavailable node returns, hints are delivered
- Reduces amount of repair needed after short outages
- Not durable: hints may be lost if the hinting node also fails
SWIM Protocol Details
The problem with simple heartbeating: if node A cannot reach node B, A declares B dead. But A's network path to B might be the problem, not B itself. This causes false positives.
SWIM's solution is indirect probing: if A cannot reach B, A asks several other nodes (C, D, E) to try reaching B on A's behalf. If none can reach B, then B is truly suspected as failed, not just unreachable from A's network path. This dramatically reduces false positives.
Suspicion mechanism: before declaring a node dead, SWIM marks it as suspected. Other nodes can refute the suspicion if they have recent contact. Only after a timeout without refutation is the node declared dead. Implemented in HashiCorp's memberlist library, which underlies Serf and, through it, Consul.
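Indirect probing reduces to "try directly, then ask helpers, and only then suspect". The sketch below models the network as a hypothetical `can_reach(src, dst)` function so that link failures and node failures can be told apart; timeouts, the suspicion timer, and gossip dissemination are left out.

```python
from typing import Callable, Iterable

CanReach = Callable[[str, str], bool]  # (source, destination) -> reachable?


def probe(prober: str, target: str, helpers: Iterable[str], can_reach: CanReach) -> str:
    if can_reach(prober, target):
        return "alive"
    # Direct ping failed: ask helpers to ping the target on our behalf.
    for helper in helpers:
        if can_reach(prober, helper) and can_reach(helper, target):
            return "alive"
    # Nobody could reach it: mark as suspected, not yet dead.
    return "suspect"


# Case 1: only the A -> B link is broken; B itself is healthy.
broken_links = {("A", "B")}
link_failure = probe("A", "B", ["C", "D"], lambda s, d: (s, d) not in broken_links)

# Case 2: B is actually down, so nobody can reach it.
node_failure = probe("A", "B", ["C", "D"], lambda s, d: d != "B")
```

A plain heartbeat check from A would have declared B dead in both cases; the indirect probe only raises suspicion in the second.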
Gossip scales where coordination does not. The coordinator pattern works for 5 nodes. At 1,000 nodes, you need epidemic protocols. Gossip gives you cluster membership, failure detection, and metadata propagation, all without a single point of failure and with constant per-node cost.
- Gossip: each node tells random peers what it knows. O(log N) convergence, O(1) per-node cost.
- Properties: scalable, fault-tolerant, eventually consistent, decentralized.
- Anti-entropy: Merkle tree comparison to find and sync diverged data. Required operational task in Cassandra.
- Read repair: stale replicas updated passively on read path. Hinted handoff covers short outages.
- SWIM indirect probing: ask peers to verify unreachable node before declaring it dead. Reduces false positives vs simple heartbeats.
- Failure detection: heartbeats (simple), phi accrual (probabilistic), SWIM (combined).
- Used by: Cassandra (membership), Consul (service discovery), Amazon's original Dynamo (membership).
Consistent Hashing Deep Dive
When you distribute data across N servers using modulo hashing (hash(key) % N), adding or removing one server forces nearly all keys to remap, causing a cache miss storm or a massive data migration. Consistent hashing avoids this: when a node joins or leaves, only about 1/N of keys move. It underpins many distributed caches (Memcached client libraries), distributed databases (Cassandra, DynamoDB), and CDNs (Akamai). Understanding it is non-negotiable for distributed systems work.
Modulo Hashing
- hash(key) % N → server assignment
- Add 1 server (N→N+1): about N/(N+1) of keys remap, approaching 100% as N grows
- Cache stampede: all clients miss simultaneously
- Database: massive data migration on topology change
- Unacceptable at scale
Consistent Hashing
- Hash ring: servers and keys mapped to positions on a circle
- Key assigned to next server clockwise on ring
- Add 1 server: only 1/N of keys remap (neighbors only)
- Minimal disruption on topology change
- Foundation of all distributed data systems
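The cost of modulo placement can be measured directly. The sketch below assumes SHA-256 as a stand-in stable hash (any uniform hash would do) and synthetic keys; the helper names are illustrative.

```python
import hashlib
from typing import List


def stable_hash(key: str) -> int:
    # Python's built-in hash() is salted per process, so use a stable digest instead.
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


def moved_fraction_modulo(keys: List[str], n_before: int, n_after: int) -> float:
    moved = sum(
        1 for k in keys if stable_hash(k) % n_before != stable_hash(k) % n_after
    )
    return moved / len(keys)


keys = [f"key-{i}" for i in range(10_000)]
fraction_moved = moved_fraction_modulo(keys, n_before=4, n_after=5)
```

Growing from 4 to 5 servers relocates roughly four fifths of the keys, and the fraction creeps toward 100% as the cluster gets larger, whereas a ring would move about one fifth.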
Virtual nodes solve the imbalance problem. With only a few physical nodes on the ring, data distribution is uneven: a server assigned a large arc receives proportionally more traffic and stores more data than its neighbors. This is not theoretical; with 3 random positions, arcs are almost certainly unequal. Virtual nodes place each physical server at 100-200 positions (vnodes). The key space is now divided into hundreds of small arcs, and statistical averaging ensures each physical server ends up with roughly equal load. Additional benefit: when a physical server is removed, its virtual nodes are distributed among all remaining servers, so load spreads evenly instead of falling on a single clockwise neighbor.
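A ring with virtual nodes can be sketched with a sorted list and binary search. This is a minimal illustration, not a production ring: `HashRing`, the `server#i` naming of vnodes, the choice of 150 vnodes, and SHA-256 as the hash are all assumptions.

```python
import bisect
import hashlib
from typing import Iterable, List, Tuple


def stable_hash(value: str) -> int:
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")


class HashRing:
    def __init__(self, servers: Iterable[str], vnodes: int = 150) -> None:
        self.vnodes = vnodes
        self._ring: List[Tuple[int, str]] = []  # (position, server), kept sorted
        self._positions: List[int] = []
        for server in servers:
            self.add(server)

    def add(self, server: str) -> None:
        # Each physical server appears at `vnodes` pseudo-random ring positions.
        self._ring.extend(
            (stable_hash(f"{server}#{i}"), server) for i in range(self.vnodes)
        )
        self._ring.sort()
        self._positions = [pos for pos, _ in self._ring]

    def lookup(self, key: str) -> str:
        # First vnode clockwise from the key's position, wrapping at the end.
        idx = bisect.bisect_right(self._positions, stable_hash(key)) % len(self._ring)
        return self._ring[idx][1]


keys = [f"key-{i}" for i in range(10_000)]
ring = HashRing(["s1", "s2", "s3", "s4"])
before = {k: ring.lookup(k) for k in keys}
ring.add("s5")
after = {k: ring.lookup(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
moved_fraction = len(moved) / len(keys)
```

Every key that moves lands on the new server, and the moved share is close to one fifth, taken in small slices from all four existing servers rather than from a single neighbor.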
Rendezvous Hashing (Highest Random Weight)
An alternative to consistent hashing that is simpler to implement for small to medium node sets. For a given key, compute a pseudo-random weight for each server (a hash of the key combined with the server ID). Assign the key to the server with the highest weight. When a server is added or removed, only ~1/N of keys remap, the same property as consistent hashing.
When to use: rendezvous hashing is simpler and has better baseline distribution than consistent hashing without virtual nodes. Preferred when node count is small and does not change frequently, since a basic lookup scores every server and therefore costs O(N) per key. Consistent hashing with vnodes is preferred for large clusters, where a ring lookup stays logarithmic in the number of vnodes.
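Rendezvous hashing needs no ring at all, as the sketch below shows. The weight function (SHA-256 over `key|server`) and the helper names are assumptions for illustration.

```python
import hashlib
from typing import List


def weight(key: str, server: str) -> int:
    # Deterministic pseudo-random score for this (key, server) pair.
    return int.from_bytes(hashlib.sha256(f"{key}|{server}".encode()).digest()[:8], "big")


def owner(key: str, servers: List[str]) -> str:
    # Scores every server, so a lookup is O(N) in the number of servers.
    return max(servers, key=lambda s: weight(key, s))


keys = [f"key-{i}" for i in range(5_000)]
servers = ["s1", "s2", "s3", "s4"]
before = {k: owner(k, servers) for k in keys}

remaining = [s for s in servers if s != "s3"]
after = {k: owner(k, remaining) for k in keys}
unaffected = [k for k in keys if before[k] != "s3"]
```

Removing `s3` only relocates the keys `s3` owned; every other key keeps its owner, because its highest-scoring server is still present.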
Consistent Hashing in Practice
- Cassandra: 256 vnodes per node was the long-standing default (the num_tokens setting; newer releases default to fewer). The partition key is hashed with Murmur3. A newly added node automatically takes over token ranges from existing nodes, so data streams in parallel from multiple sources.
- Redis Cluster: 16,384 fixed hash slots instead of a traditional ring. Each node is responsible for a range of slots. Slots are reassigned, manually or automatically, on topology change. The fixed slot count simplifies implementation.
- DynamoDB: hash-based partitioning handled internally and abstracted from users. You specify a partition key, and the hash of that key determines which partition, and therefore which storage node, holds your data.
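The fixed-slot approach is simple enough to sketch. Redis Cluster maps a key to a slot as CRC16(key) mod 16384; the code below uses Python's `binascii.crc_hqx` with an initial value of 0 as the CRC16 function, which is an assumption that should be checked against the Redis Cluster specification before relying on it. Hash tags and the slot ownership table are simplified: `slot_owner` assumes three hypothetical nodes with equal contiguous ranges.

```python
import binascii

SLOT_COUNT = 16_384


def key_slot(key: bytes) -> int:
    # CRC16 (CCITT polynomial, initial value 0) reduced to the fixed slot space.
    # Hash-tag handling ({...} substrings) is deliberately omitted here.
    return binascii.crc_hqx(key, 0) % SLOT_COUNT


def slot_owner(slot: int, nodes: int = 3) -> int:
    # Hypothetical layout: each node owns one contiguous, equal-sized slot range.
    return min(slot * nodes // SLOT_COUNT, nodes - 1)


slot = key_slot(b"user:1000")
node = slot_owner(slot)
```

Because the slot count never changes, resharding means moving whole slots between nodes; the key-to-slot mapping itself stays fixed.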
Consistent hashing is the answer whenever you need to distribute anything across a variable number of nodes. Cache sharding, database partitioning, load balancing, CDN routing: wherever adding or removing a node should not cause a full redistribution. Virtual nodes make it production-ready.
- Modulo hashing: add/remove node → nearly all keys remap. Unacceptable at scale.
- Consistent hashing: ring-based. Add/remove node → only ~1/N of keys move.
- Hotspot problem: without vnodes, unequal arc sizes cause uneven load. Virtual nodes solve through statistical averaging.
- Virtual nodes: 100-200 per physical server. Even distribution, heterogeneous hardware support. Load spreads to all nodes on removal.
- Rendezvous hashing: simpler alternative for small node sets. Same 1/N remapping, better baseline distribution without vnodes.
- In practice: Cassandra: 256 vnodes/Murmur3. Redis Cluster: 16,384 fixed slots. DynamoDB: abstracted internally.
No Global Clock Exists
- NTP: ~1ms datacenter, 50-100ms internet. Not enough for ordering
- Lamport: total order. Vector: causality + concurrency. HLC: hybrid
- Clock skew + LWW = real data loss. Use logical clocks instead
- FLP impossibility: all practical solutions are engineering compromises
Agreement Under Failure
- Raft election: randomized timeouts, majority vote, term numbers
- Log replication: Leader → majority ack → commit → Followers apply
- Cluster: 3 nodes (1 fail), 5 nodes (2 fail). Beyond 7 = diminishing returns
- Read scaling: leader leases or stale follower reads
Avoid First, 2PC or Sagas Second
- Best strategy: design service boundaries to avoid cross-service tx
- 2PC: atomicity but blocking. 3PC: less blocking but partition-unsafe
- Sagas: compensating tx, non-blocking. Idempotency keys on every step
- Orchestration: easier debug. Choreography: no coordinator
Single, Multi, or Leaderless
- Sync: zero loss, slow writes. Async: fast writes, risk of loss. Semi-sync: balanced
- Single-leader: simple. Multi-leader: write scale. Leaderless: quorum W+R>N
- Read-after-write: route reads to primary for 60s after user writes
- Every database makes one of these choices; know yours
Epidemic Spread, O(log N)
- Each node tells random peers → exponential spread, O(1) per-node
- Anti-entropy: Merkle tree repair. Read repair: passive on read path
- SWIM: indirect probing, asking peers before declaring a node dead
- Hinted handoff covers short outages. Cassandra, Consul
Add Node → Only 1/N Keys Move
- Hotspot: without vnodes, unequal arcs = uneven load
- Virtual nodes: 100-200 per server, statistical balance, even removal
- Rendezvous hashing: simpler for small node sets, same 1/N remapping
- Cassandra: 256 vnodes/Murmur3. Redis: 16,384 slots. DynamoDB: abstracted