
Introduction

Cassandra keeps several replicas (their number is the so-called replication factor) of each key range, on several different machines. These replicas are in danger of drifting slightly out of sync with each other, e.g., when one replica is briefly unavailable while writes are applied to the other replicas, or when a transient network problem causes a write not to reach one replica, even though that replica is up and accepting requests.

Cassandra employs three mechanisms to solve this problem:

  1. Hinted Handoff: a node remembers, on behalf of an unavailable node, the new data it could not deliver to it, and sends that data again when the node becomes available.
  2. Read Repair: for a percentage of the queries, Cassandra sends the query to several replicas, and if they send different responses, we determine the most up-to-date data (using the timestamps of data and tombstones; this reconciliation rule is sketched after this list) and repair the others. The actual implementation is more elaborate - only one replica sends a full response and the others (initially) just send digests - see http://wiki.apache.org/cassandra/ReadRepair for more information.
  3. Repair (a.k.a. Anti-Entropy Repair, or Node Repair) - the replicas compare their full data (or parts of it), and the most up-to-date version of each piece of data is reconciled and saved on each replica. While the first two approaches fix most problems, this third mechanism can solve problems the other two cannot. This repair mechanism is the topic of this document.
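
Both read repair and anti-entropy repair reconcile conflicting replica versions by timestamp ("last write wins"), with a newer tombstone beating older live data. Here is a minimal sketch of that rule; the Version type and reconcile() function are hypothetical stand-ins, not Cassandra's actual classes:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ReconcileSketch {
    // Hypothetical record of one replica's version of a value.
    record Version(String value, long timestampMicros, boolean tombstone) {}

    // Pick the version with the highest timestamp. Cassandra breaks ties with
    // further rules, omitted here for brevity.
    static Version reconcile(List<Version> replicaVersions) {
        return replicaVersions.stream()
                .max(Comparator.comparingLong(Version::timestampMicros))
                .orElseThrow();
    }

    public static void main(String[] args) {
        List<Version> versions = Arrays.asList(
                new Version("old", 1000L, false),
                new Version(null, 2000L, true),    // a newer delete...
                new Version("stale", 1500L, false));
        System.out.println(reconcile(versions));   // ...wins: the tombstone is kept
    }
}
```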

When to repair

In Cassandra, repairs are not run automatically - they need to be started manually (or from a script) using nodetool. However, the Cassandra best-practice manuals (see this) recommend running a repair at regular intervals.

One of the reasons why routine repair is important is an undesirable side effect of repair - the "resurrection" of deleted data: Cassandra keeps tombstones for deleted data, but drops those tombstones after a grace period, gc_grace (e.g., 1 week). If we run repairs less often than gc_grace, it is possible that some replicas deleted the data and have already erased the tombstone, while one replica missed the deletion and still has the live data. In that case, the repair process (not seeing the tombstones, which have already been expunged) will "resurrect" the already-deleted data. For this reason it is important to run repairs more often than gc_grace. Note: in use cases (such as "time series") where deletion only happens through a TTL defined at the time of insertion and where the clocks are synchronized, repair is not necessary.

Another case where routine node repair is recommended is for write-mostly data: read repair can often fix problems, but, as its name suggests, it only happens for data which is read frequently.

Routine repair is not urgent, and can be scheduled for low-usage hours.

Node repair is also recommended on a node that comes back up after having been down for longer than the period Hinted Handoff normally keeps hints for.

How repair works

Cassandra's repair mechanism is based on the one described in the Amazon Dynamo paper.

When repairing a column family (table), Cassandra builds a Merkle tree for the data in each replica.

Building the trees is done only at repair time, and is a fairly lengthy and I/O-intensive process similar to compaction - the so-called "validation compaction" is basically the read part of a "major compaction" (in which all the sstables are compacted). Each repair involves two major compactions: the first to build the tree ("validation compaction") and the second to read the disagreeing ranges. [TODO: this doesn't sound right to me - if there is little disagreement, the second pass shouldn't need to read the entire sstables]
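
To make the validation compaction concrete, here is a minimal sketch of building a fixed-depth Merkle tree from partitions visited in token order. The hashing and leaf-assignment details are simplified stand-ins, not the actual utils.MerkleTree logic:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.TreeMap;

public class MerkleSketch {
    static final int DEPTH = 15;              // 2^15 = 32768 leaves, as in Cassandra
    static final int LEAVES = 1 << DEPTH;

    // Map a (signed 64-bit) token to its leaf: flip the sign bit so tokens sort
    // unsigned, then keep the top DEPTH bits. Assumes uniformly spread tokens.
    static int leafFor(long token) {
        return (int) ((token ^ Long.MIN_VALUE) >>> (64 - DEPTH));
    }

    public static void main(String[] args) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");

        // Toy "partitions": token -> serialized partition contents.
        Map<Long, String> partitions = new TreeMap<>();
        partitions.put(-42L, "row-a");
        partitions.put(7L, "row-b");

        // 1. Hash every partition into its leaf. This pass over all the data in
        //    token order is what makes validation resemble a major compaction.
        byte[][] leaves = new byte[LEAVES][];
        for (Map.Entry<Long, String> p : partitions.entrySet()) {
            md.update(p.getValue().getBytes(StandardCharsets.UTF_8));
            byte[] h = md.digest();
            int leaf = leafFor(p.getKey());
            leaves[leaf] = leaves[leaf] == null ? h : xor(leaves[leaf], h);
        }

        // 2. Combine children pairwise up to the single root hash.
        byte[][] level = leaves;
        while (level.length > 1) {
            byte[][] parent = new byte[level.length / 2][];
            for (int i = 0; i < parent.length; i++) {
                md.update(level[2 * i] == null ? new byte[0] : level[2 * i]);
                md.update(level[2 * i + 1] == null ? new byte[0] : level[2 * i + 1]);
                parent[i] = md.digest();
            }
            level = parent;
        }
        System.out.printf("root: %02x%02x...%n",
                level[0][0] & 0xff, level[0][1] & 0xff);
    }

    static byte[] xor(byte[] a, byte[] b) {
        byte[] r = new byte[a.length];
        for (int i = 0; i < a.length; i++) r[i] = (byte) (a[i] ^ b[i]);
        return r;
    }
}
```

Two replicas can then compare just their root hashes; only if those differ do they descend into subtrees to locate the differing leaf ranges.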

In Cassandra, different nodes do not replicate exactly the same set of data; rather, each node holds a set of key ranges, and each key range may be replicated on a different set of nodes. Repair therefore happens for each key range separately: the initiator finds the set of peers holding replicas of the range, tells each of them to prepare a Merkle tree, and finally compares the trees; when it finds a difference, it tells the conflicting nodes to stream the data in the conflicting key range.

Cassandra uses a Merkle tree of fixed size - 2^15 = 32768 leaf nodes. So if a node has a million partitions, each leaf covers roughly 30 of them, and about 30 partitions will be streamed for every partition actually in need of repair - resulting in a lot of streaming and, on the target node, extra storage and compaction work. This problem is known as overstreaming (see this), and can be mitigated with subrange repair (see the options below), which repairs a small piece of the ring at a time so that each leaf covers fewer partitions.
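
The arithmetic behind that example is straightforward; this snippet just reproduces the numbers from the paragraph above:

```java
public class Overstreaming {
    public static void main(String[] args) {
        long partitions = 1_000_000L;
        long leaves = 1L << 15;                    // fixed tree size: 32768 leaves
        double perLeaf = (double) partitions / leaves;
        // Prints ~30.5: one stale partition drags ~30 healthy ones along,
        // because the whole leaf range is streamed when its hash mismatches.
        System.out.printf("~%.1f partitions streamed per mismatched leaf%n", perLeaf);
    }
}
```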

Repair options

nodetool repair has several options that determine what gets repaired, and how:

  1. -pr (--partitioner-range): repair only the primary partition range of this node. Useful for running routine repair on each node on a separate schedule - so we end up repairing each partition range exactly once.
  2. sequential repair (the default, called "snapshot" repair in the past): when a range has three replicas, don't do the repair on all three at once; rather, take a "snapshot" (hard links) on each of the three, then perform the validation compaction (Merkle tree building) sequentially, one node at a time, and then do the repairs in pairs. The benefit is that at any time, one of the three replicas is not busy doing the repair - so it maintains its top performance, and the dynamic snitch can divert most requests to it.
  3. parallel (-par): build the Merkle trees on all replicas at once (see the RepairParallelism discussion below).
  4. incremental repair - see the Incremental Repair section below.
  5. subrange repair (-st <start-token> -et <end-token>): repair only the given token range. See this, and the sketch below.
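
As an illustration of subrange repair, here is a sketch of splitting a token range into equal chunks, each of which could then be handed to nodetool repair with -st/-et. It assumes the Murmur3 partitioner's token space of [-2^63, 2^63 - 1]; the chunk count of 4 is arbitrary:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class SubrangeSplit {
    // Split the token range (start, end] into `chunks` roughly equal subranges.
    static List<BigInteger[]> split(BigInteger start, BigInteger end, int chunks) {
        BigInteger width = end.subtract(start);
        List<BigInteger[]> out = new ArrayList<>();
        BigInteger prev = start;
        for (int i = 1; i <= chunks; i++) {
            BigInteger next = start.add(
                    width.multiply(BigInteger.valueOf(i))
                         .divide(BigInteger.valueOf(chunks)));
            out.add(new BigInteger[] { prev, next });
            prev = next;
        }
        return out;
    }

    public static void main(String[] args) {
        BigInteger start = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger end = BigInteger.valueOf(Long.MAX_VALUE);
        for (BigInteger[] r : split(start, end, 4)) {
            // Each chunk would be repaired with something like:
            //   nodetool repair -st <r[0]> -et <r[1]> <keyspace>
            System.out.println("repair subrange (" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

Repairing many small subranges keeps each Merkle tree leaf responsible for few partitions, which is why this technique reduces overstreaming.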

Incremental Repair

  • http://www.datastax.com/dev/blog/more-efficient-repairs
  • http://docs.datastax.com/en/cassandra/2.1/cassandra/operations/ops_repair_nodes_c.html

The relevant code

Most of the repair code is in the repair package (see the good overview comment in RepairSession.java). Some of the descriptions below are excerpts from comments in the code.

utils.MerkleTree.java is an implementation of the Merkle Tree.

service.ActiveRepairService is the starting point for manual ("active") repairs. Each user-triggered repair corresponds to one or more repair sessions (see RepairSession), one for each token range to repair. One repair session might repair multiple column families. For each of those column families, the repair session will request Merkle trees for each replica of the range being repaired, diff those trees upon receiving them, schedule the streaming of the parts to repair (based on the tree diffs) and wait for all those operations. A repair session is created through submitRepairSession(), which returns a future on the completion of that session.

A given RepairSession repairs a set of replicas for a given range on a list of column families. For each of the column families to repair, the RepairSession creates a RepairJob that handles the repair of that CF.

A given RepairJob has two main phases:

  1. Validation phase (ValidationTask): the job requests Merkle trees from each of the replicas involved and waits until all trees are received (in RepairSession.validationComplete()). Merkle trees are kept in TreeResponse objects.
  2. Synchronization phase (SyncTask): once all trees are received, the job compares each tree with all the others using SyncTasks. If there is a difference between two trees, the SyncTask concerned will start streaming the differences between the two endpoints concerned. There are two kinds of SyncTask, depending on whether one of the two endpoints is local: LocalSyncTask and RemoteSyncTask.

The job is done once all its SyncTasks are done (i.e., each has either computed no differences or finished the streaming it started; see RepairSession.syncComplete()). A RepairJob returns a RepairResult, a simple container for a RepairJobDesc (a simple descriptor of the RepairJob) and a list of SyncStat objects, each counting the number of differences in a pair of nodes (a NodePair holds a pair of node addresses). A schematic of this pairwise synchronization follows.
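
The following schematic shows the shape of that pairwise comparison; the TreeResponse stand-in and the leaf-hash representation here are simplified, not the actual classes:

```java
import java.util.*;

public class SyncPhaseSketch {
    // Stand-in for a replica's Merkle tree: endpoint plus per-leaf hashes.
    record TreeResponse(String endpoint, Map<Integer, Long> leafHashes) {}

    // Leaf ranges whose hashes differ between two trees.
    static List<Integer> difference(TreeResponse a, TreeResponse b) {
        List<Integer> diffs = new ArrayList<>();
        Set<Integer> leaves = new TreeSet<>(a.leafHashes().keySet());
        leaves.addAll(b.leafHashes().keySet());
        for (int leaf : leaves)
            if (!Objects.equals(a.leafHashes().get(leaf), b.leafHashes().get(leaf)))
                diffs.add(leaf);
        return diffs;
    }

    public static void main(String[] args) {
        List<TreeResponse> trees = List.of(
                new TreeResponse("10.0.0.1", Map.of(0, 111L, 1, 222L)),
                new TreeResponse("10.0.0.2", Map.of(0, 111L, 1, 999L)),
                new TreeResponse("10.0.0.3", Map.of(0, 111L, 1, 222L)));

        // One (potential) sync task per pair of replicas.
        for (int i = 0; i < trees.size(); i++)
            for (int j = i + 1; j < trees.size(); j++) {
                List<Integer> diffs = difference(trees.get(i), trees.get(j));
                if (!diffs.isEmpty())       // a LocalSyncTask or RemoteSyncTask
                    System.out.println("sync " + trees.get(i).endpoint()
                            + " <-> " + trees.get(j).endpoint()
                            + " on leaf ranges " + diffs);
            }
    }
}
```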

A given session will execute the first phase (the validation phase) of each of its jobs sequentially. In other words, it will start the first job and only start the next one once the first job's validation phase is complete. This is done so that each replica creates only one Merkle tree at a time, which is our way of ensuring that tree creation starts roughly at the same time on every node (see CASSANDRA-2816). However, the synchronization phases are allowed to run concurrently (with each other and with validation phases).

A given RepairJob has three modes, determined by the RepairParallelism enum: SEQUENTIAL ("sequential"), PARALLEL ("parallel") or DATACENTER_AWARE ("dc_parallel"). If sequential, it will request Merkle tree creation from each replica in sequence (though in that case we still first send a message to each node to flush and snapshot its data, so each Merkle tree is still built over similar data, even if the actual creation is not done simultaneously). If not sequential, all Merkle trees are requested in parallel. Similarly, if a job is sequential, it will handle one SyncTask at a time, but otherwise will handle all of them in parallel; a SnapshotTask is used to create the required snapshots. A sketch of the difference between the modes follows.
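
Here is a sketch of how the modes differ in the validation phase, using plain CompletableFutures in place of Cassandra's own executors and messaging; the enum values follow the description above, everything else is hypothetical:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ParallelismSketch {
    enum RepairParallelism { SEQUENTIAL, PARALLEL, DATACENTER_AWARE }

    // Stand-in for "ask this replica to build and send its Merkle tree".
    static CompletableFuture<String> requestTree(String replica) {
        return CompletableFuture.supplyAsync(() -> "tree from " + replica);
    }

    public static void main(String[] args) {
        List<String> replicas = List.of("10.0.0.1", "10.0.0.2", "10.0.0.3");
        RepairParallelism mode = RepairParallelism.SEQUENTIAL;

        if (mode == RepairParallelism.SEQUENTIAL) {
            // One tree at a time: wait for each replica before asking the next,
            // so at most one replica is busy with validation at any moment.
            for (String r : replicas)
                System.out.println(requestTree(r).join());
        } else {
            // PARALLEL (or DATACENTER_AWARE, per datacenter): ask all at once.
            List<CompletableFuture<String>> pending =
                    replicas.stream().map(ParallelismSketch::requestTree).toList();
            pending.forEach(f -> System.out.println(f.join()));
        }
    }
}
```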

RepairSession eventually returns a RepairSessionResult, mainly a collection of RepairResults (the results of the individual RepairJobs described above).

The RepairMessageVerbHandler class handles the repair messages between nodes for a given RepairJobDesc.

  • PREPARE_MESSAGE (messages.PrepareMessage) - opens the column families, and registers the repair in service.ActiveRepairService.
  • SNAPSHOT - creates snapshots of the sstables.
  • VALIDATION_REQUEST (messages.ValidationRequest) - creates a Validator and passes it to compaction.CompactionManager's submitValidation(). The Validator is a builder of a Merkle tree for a column family. Its lifecycle (sketched after this list): 1. prepare() - initialize the tree with samples; 2. add() - called 0 or more times, to add hashes to the tree; 3. complete() - enqueues any operations that were blocked waiting for a valid tree.
  • SYNC_REQUEST (messages.SyncRequest) - creates a StreamingRepairTask, which performs data streaming between two remote replicas, neither of which is the repair coordinator. The task sends a SyncComplete message back to the coordinator upon streaming completion.
  • ANTICOMPACTION_REQUEST (messages.AnticompactionRequest) - calls ActiveRepairService's doAntiCompaction(), which ends up running CompactionManager's submitAntiCompaction().
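
The Validator lifecycle from the VALIDATION_REQUEST bullet, as an illustrative skeleton - the three method names follow the description above, while the internals are simplified stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

public class ValidatorSketch {
    private final List<Long> tree = new ArrayList<>();   // stand-in for a MerkleTree
    private boolean prepared, completed;

    // 1. Initialize the tree (the real Validator samples the data to size it).
    void prepare() { prepared = true; }

    // 2. Called 0 or more times, once per row, feeding hashes into the tree.
    void add(long rowHash) {
        if (!prepared || completed) throw new IllegalStateException("bad lifecycle");
        tree.add(rowHash);
    }

    // 3. Finish the tree, unblocking whoever was waiting for it.
    void complete() {
        completed = true;
        System.out.println("tree complete: " + tree.size() + " rows hashed");
    }

    public static void main(String[] args) {
        ValidatorSketch v = new ValidatorSketch();
        v.prepare();
        v.add(0xCAFEL);
        v.add(0xBEEFL);
        v.complete();
    }
}
```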

The messages.RepairMessage class is the base class for the internal repair-related messages in the repair.messages package: AnticompactionRequest, PrepareMessage, SnapshotMessage, SyncComplete, SyncRequest, ValidationComplete, ValidationRequest.

The messages.RepairOption class describes the asynchronous repair request, sent by NodeTool and processed by StorageService.

Further reading

  1. https://wiki.apache.org/cassandra/AntiEntropy
  2. http://wiki.apache.org/cassandra/ReadRepair
  3. https://wiki.apache.org/cassandra/HintedHandoff
  4. http://www.slideshare.net/ClmentLARDEUR/deep-into-cassandra-data-repair-mechanisms
  5. http://docs.datastax.com/en/cassandra/2.1/cassandra/operations/ops_repair_nodes_c.html
  6. http://www.datastax.com/dev/blog/advanced-repair-techniques
  7. http://wiki.apache.org/cassandra/ArchitectureAntiEntropy