Skip to content
Tzach Livyatan edited this page Oct 2, 2018 · 46 revisions

This document focus on Apache Cassandra repair.

For latest on Scylla repair see https://docs.scylladb.com/operating-scylla/nodetool-commands/repair/ and https://docs.scylladb.com/operating-scylla/manager/

Introduction

Cassandra keeps several replicas (the so-called replication factor) for each key range, on several different machines. These replicas are in danger of becoming slightly out of sync with each other, e.g., when one replica is briefly unavailable while writes are done to the other replicas, or when a transient network problem caused a write not to reach one replica, despite being ready and willing to accept requests.

Cassandra employs three mechanisms to solve this problem:

  1. Hinted Handoff: One node remembers for an unavailable node the new data it couldn't send to it, and sends it again when the node becomes available again.
  2. Read Repair: For a percentage of the queries, Cassandra sends the query to several replicas, and if they send different responses, we determine the most up-to-date data (using the timestamps of data and tombstones) and repair the others. The actual implementation is more elaborate - only one replica sends a full response and the others (initially) just send digests - see http://wiki.apache.org/cassandra/ReadRepair for more information.
  3. Repair, also known as (in various sources) Anti-Entropy Repair, Active Repair, Async Repair, or Node Repair. The replicas compare their full data (or parts of it), and the most up-to-date version of each piece of data is reconciled and saved at each replica. While the previous approaches can fix most problems, this third mechanism can solve problems the first two can't. This repair mechanism is the topic of this document.

When to repair

In Cassandra, repairs are not done automatically - they need to be done manually (or scripted) using nodetool. However, the Cassandra best-practice manuals (see this) recommend running a repair at regular intervals.

One of the reasons why routine repair is important is an undesirable side-effect of repair - "resurrection" of deleted data: Cassandra keeps tombstones for deleted data, but deletes those tombstones themselves after some period of gc_grace (e.g., 1 week) has elapsed. If we do repairs more rarely than 1 week apart, it is possible that some replicas deleted data and also the tombstone, but another replica missed the deletion and still has this data live. In that case, the repair process (not seeing the tombstones which have already been expunged) will "resurrect" the already-deleted data. For this reason it is important to do repairs at least more often than gc_grace. Note: in uses cases (such as "time series") where deletion only happens through a TTL defined at the time of insertion and where the clocks are syncrhonized, repair is not necessary.

Another case where routine node repair is recommended is for write-mostly data: Read repair can often repair problems, but as its name suggest, only happens for data which is read frequently.

Routine repair is not urgent, and can be scheduled to low-usage hours.

Node repair is also recommended on a node that comes up after having been down for a longer period than Hinted Handoff normally keeps hints for, as define in config:

max_hint_window_in_ms: 10800000 # 3 hours

How repair works

Cassandra's repair mechanism is based on the one described in the Amazon Dynamo paper.

When repairing a column family (table), Cassandra builds a Merkle tree for the data in each replica, with which it can efficiently decide which parts of the data are different between the replicas.

Building the trees is done only at time of repair, and is a fairly lengthy and I/O intensive process similar to compaction - the so-called "validation compaction" is basically the read part of a "major compaction" (where all the sstables are compacted). Following that is another potentially intensive period of reading the disagreeing data and streaming it over the network between the nodes.

This page describes an idea of how Cassandra could save the Merkle trees (or something more resembling a bloom filter or a cardinality estimator) per sstable, and avoid the need for a lengthy "validation compaction" on every repair. This was never done in Cassandra, maybe we should do it in the future.

In Cassandra, different nodes do not replicate exactly the same set of data, but rather each node holds a set of partition ranges, and each partition range might be replicated on a different set of nodes. So repair happens for each range separately: the initiator finds the set of peers holding replicas of this range, tells each of them to prepare a Merkle tree, and finally compares them, and when finding a difference, it tells the conflicting nodes to stream the data in the conflicting key range.

Cassandra uses a Merkle tree of fixed size - 2^15 = 32768 leaf nodes. So if a node has a million partitions, 30 partitions will be streamed for each partition in need of repair, resulting in a lot of streaming and on the target node, storage and compaction time. This problem is known as overstreaming (see this), and can be solved with subrange repair or incremental repair (see below).

Repair options

nodetool repair has several options that determine what gets repaired, and how:

  1. Partitioner Range repair (-pr or --partitioner-range): repair only the primary partition range on this node (not data for which this node is a replica). This is useful for running routine repair on each node on a separate schedule - so we end up repairing each partition range only once. But it's not useful for other repair scenarios.
  2. Sequential repair (the default, called "snapshot" repair in the past): When a range has three replicas, don't do the repair on all three at once, but rather take a "snapshot" (hard link) of the three, and then perform the validation compaction (Merkle tree building) sequentially, one node at a time, and then do repairs in pairs. The benefit is that at any time, one of the three replicas is not busy doing the repair - so it maintains its top performance, and the dynamic snitch can divert most of requests to it.
  3. Parallel repair (-par): Build all Merkle trees in parallel, and then do all the repair in parallel. Can finish faster than sequential repair, but slow down all replicas at the same time thereby potentially hurting access performance during the repair.
  4. Incremental repair (-inc). The new way - see below.
  5. Subrange repair (-st <start-token> -et <end-token>). See this.

Incremental Repair

Incremental Repair is a new repair technique introduced in Cassanra 2.1, and described here, here, and here.

Incremental repair works by building separate SSTables for data which has already been repaired, and data which still needs repair (hopefully, a small subset of the full data). An "anticompaction" process splits an "unrepaired" sstable into a part belonging to a range that we just repaired (and therefore should move to a "repaired" sstable), and the rest. Additionally, the compaction strategies are modified to uphold the repaired-unrepaired segregation; For example, compaction will always compact repaired sstables with other repaired sstables, and unrepaired with unrepaired. For leveled-compaction, Cassandra will use leveled compaction only for the repaired sstables (which hopefully should be most of the data), and use size-tiered compaction for the unrepaired sstables. See the above links for more information.

The relevant code

Most of the the repair code is in the repair package, though some is outside. Here is a description of the relevant classes (some of these descriptions are excerpts from comments in the code).

service.StorageService has methods repairAsync(), forceRepairAsync(), forceRepairRangeAsync(), forceTerminateAllRepairSessions(). These are used by manual operations (like NodeTool). They use a service.ActiveRepairService singleton:

service.ActiveRepairService is the starting point for manual repairs. Each user-triggered repair will correspond to one or multiple repair session (see RepairSession), one for each token range to repair. One repair session might repair multiple column families. For each of those column families, the repair session will request Merkle trees for each replica of the range being repaired, diff those trees upon receiving them, schedule the streaming of the parts to repair (based on the tree diffs) and wait for all those operations. The creation of a repair session is done through the submitRepairSession() function that returns a future on the completion of that session.

A given RepairSession repairs a set of replicas for a given range on a list of column families. For each of the column family to repair, RepairSession creates a RepairJob that handles the repair of that CF.

A given RepairJob has the 2 main phases:

  1. Validation phase, ValidationTask: this phase requests Merkle trees from each of the replica involves and waits until all trees are received (in RepairSession.validationComplete()). Merkle trees are kept in a TreeResponse object.
  2. Synchronization phase, SyncTask: once all trees are received, this phase compares each tree with all the other using a SyncTask. If there is difference between two trees, the concerned SyncTask will start a streaming of the difference between the two endpoint concerned. There are two kind of SyncTask, depending on whether one of the two endpoints is local: LocalSyncTask and RemoteSyncTask.

The job is done once all its SyncTasks are done (i.e. have either computed no differences or the streaming they started is done (RepairSession.syncComplete()). RepairJob returns a RepairResult, a container for a a RepairJobDesc (a simple descriptor for a RepairJob) and a list of SyncStat objects, each counting the number of differences in a pair of nodes (NodePair holds a pair of node addresses).

A RepairJob has three modes, determined by the RepairParallelism enum: SEQUENTIAL ("sequential"), PARALLEL ("parallel") or DATACENTER_AWARE ("dc_parallel").

In sequential mode, a given session will execute the first phase (validation phase) of each of its jobs sequentially. In other words, it will start the first job and only start the next one once that first job validation phase is complete. This is done so that the replica only creates one Merkle tree at a time, which is our way to ensure that such creation starts roughly at the same time on every node (see CASSANDRA-2816). However the synchronization phases are allowed to run concurrently (with each other and with validation phases).

If sequential, it will requests Merkle tree creation from each replica in sequence (though in that case we still first send a message to each node to flush and snapshot data so each merkle tree creation is still done on similar data, even if the actual creation is not done simultaneously - see CASSANDRA-2816; SnapshotTask creates the required snapshots). If not sequential, all Merkle tree are requested in parallel. Similarly, if a job is sequential, it will handle one SyncTask at a time, but will handle all of them in parallel otherwise.

RepairSession eventually returns a RepairSessionResult, mainly a collection of RepairResult (the results of the several RepairJob, desscribed above).

The RepairMessageVerbHandler class handles the repair messages between nodes for a given RepairJobDesc:

  • PREPARE_MESSAGE (messages.PrepareMessage) - opens column families, and register repair in service.ActiveRepairService.
  • SNAPSHOT - create snapshots of sstables
  • VALIDATION_REQUEST (messages.ValidationRequest) - creates Validator and passes it to compaction.CompactionManager's submitValidation(). The Validator is a builder of a Merkle tree for a column family. It's lifecycle: 1. prepare() - Initialize tree with samples. 2. add() - 0 or more times, to add hashes to the tree.3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
  • SYNC_REQUEST (messages.SyncRequest) - creates a StreamingRepairTask. That performs data streaming between two remote replica which neither is not repair coordinator. Task will send messages. SyncComplete message back to coordinator upon streaming completion.
  • ANTICOMPACTION_REQUEST (messages.AnticompactionRequest) calls ActiveRepairService's doAntiCompaction(), which ends up running CompactionManager's submitAntiCompaction.

The messages.RepairMessage class is the base class for the internal repair-related messages in the repair.messages packages: AnticompactionRequest, PrepareMessage, SnapshotMessage, SyncComplete, SyncRequest, ValidationComplete, ValidationRequest.

The messages.RepairOption class describes the asynchronous repair request, sent by NodeTool and processed by StorageService.

utils.MerkleTree.java is an implementation of the Merkle Tree algorithm.

Launching and tracking repair via JMX

Repair can be launched via JMX using org.apache.cassandra.service.repairAsync [8], which returns a small integer id for tracking the outcome of this repair command.

To find out when a repair is done, one can subscribe to JMX notification on the same MBean on type "repair"[9]. The notification object contains a human-readable message, and two integers: the command id (the id previously returned by repairAsync), and the command's status (see the enum ActiveRepairService.Status - it can be STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED or RUNNING).

If the "isTraced" repair option is off, the user will get a "STARTED" notification once at the beginning of the repair command, a "FINISHED" notification once when the repair has completed (either successfully or not). Additionally, after every RepairSession (i.e., repair of one token range) has completed, it will get a SESSION_SUCCESS or SESSION_FAILED notification, as appropriate.

When "isTraced" is on, we additionally get "RUNNING" notification with various tracing messages in them.

The Nodetool implementation of the repair using JMX (see org.apache.cassandra.tools.RepairRunner) simply checks whether it saw any SESSION_FAILED message before the final FINISHED message (it always waits for the FINISHED message before completing). So things could have been simpler if we just had one FINISHED message with a success boolean, and that's it. However, RepairRunner does another thing, which is to print all the human-consumable messages on the notifications it gets, when it gets them. This might perhaps be useful to the user, although I'm not sure this sort of very coarse "progress report" is really useful, especially for parallel repair (where all the sessions are expected to finish around the same time anyway).

TODO: also mention forceTerminateAllRepairSessions

Nodetool

nodetool repair is using both to act as a sync command line operation. Since the actual operation is async, killing the nodetool will not stop the repair.

Further reading

  1. https://wiki.apache.org/cassandra/AntiEntropy
  2. http://wiki.apache.org/cassandra/ReadRepair
  3. https://wiki.apache.org/cassandra/HintedHandoff
  4. http://www.slideshare.net/ClmentLARDEUR/deep-into-cassandra-data-repair-mechanisms
  5. http://docs.datastax.com/en/cassandra/2.1/cassandra/operations/ops_repair_nodes_c.html
  6. http://www.datastax.com/dev/blog/advanced-repair-techniques
  7. http://wiki.apache.org/cassandra/ArchitectureAntiEntropy
  8. https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java#L288
  9. https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java#L294
  10. https://issues.apache.org/jira/browse/CASSANDRA-193 - the discussion in 2009 before the repair feature was added to Cassandra. Contains several interesting alternatives to Cassandra's current repair design.
Clone this wiki locally