Gossip
The high-level Cassandra gossip protocol is explained in the references at the end of this page.
Origin (Apache Cassandra) builds gossip on top of its Messaging Service, which in turn sits on top of TCP/IP sockets. In Scylla, we also build gossip on top of a Messaging Service. However, Scylla's Messaging Service is built on top of an RPC infrastructure, so we handle incoming and outgoing gossip messages in a slightly different way.
Each node in a cluster maintains a map of every node it knows about, including itself:
std::map<inet_address, endpoint_state> endpoint_state_map;
All information about a node is stored in endpoint_state:
class endpoint_state {
    heart_beat_state _heart_beat_state;
    std::map<application_state, versioned_value> _application_state;
};
heart_beat_state contains a _generation field and a _version field:
class heart_beat_state {
    int32_t _generation;
    int32_t _version;
};
_generation changes (increases) when the node is restarted. _version is increased every second in gms::gossiper::run().
gms::application_state is an enum type that defines the different kinds of information we want to gossip around, e.g. STATUS, LOAD, HOST_ID, ...
gms::versioned_value is used to represent the value of each gms::application_state.
A gossip message exchange looks very similar to the TCP three-way handshake: every round of gossip between two nodes consists of three messages, SYN, ACK and ACK2.
class gossip_digest_syn {
    sstring _cluster_id;
    sstring _partioner;
    std::vector<gossip_digest> _digests;
};
class gossip_digest {
    inet_address _endpoint;
    int32_t _generation;
    int32_t _max_version;
};
When A sends a SYN message to B, A starts a gossip round. The SYN message tells B which nodes A knows about (_digests), along with the generation and the highest version A has seen for each of them.
class gossip_digest_ack {
    std::vector<gossip_digest> _digests;
    std::map<inet_address, endpoint_state> _map;
};
When B receives the SYN message from A, it compares the digests against its local endpoint_state_map and figures out:
- What A knows but B doesn't. B asks A for this information by listing the corresponding digests in gossip_digest_ack::_digests.
- What B knows but A doesn't. B sends this information to A in gossip_digest_ack::_map.
Then, B sends an ACK message back to A.
class gossip_digest_ack2 {
    std::map<inet_address, endpoint_state> _map;
};
When A receives the ACK message from B, A applies the application states that B sent, then sends back what B asked for in an ACK2 message.
Now, node A and node B should have the same information.
To gossip a new piece of local state, first grab the gossiper instance, then add the application_state you want to gossip around. For example, to add the LOAD application_state:
auto& gossiper = gms::get_local_gossiper();
auto state = gms::application_state::LOAD;
// `load` is this node's current load value, computed elsewhere.
auto value = gms::versioned_value::versioned_value_factory::load(load);
gossiper.add_local_application_state(state, value);
The added application_state will then be propagated to the other nodes in the cluster by gossip.
If a class is interested in endpoint_state changes, it needs to inherit from class i_endpoint_state_change_subscriber, which defines various callbacks to be called when an endpoint_state changes. For example:
class foo : public i_endpoint_state_change_subscriber {
public:
    // Any method name works here; "register" itself is a reserved C++ keyword.
    void subscribe() {
        auto& gossiper = gms::get_local_gossiper();
        gossiper.register_(this);
    }
    virtual void on_change(inet_address endpoint, application_state state, versioned_value value) override {
        // Your callback here.
    }
    virtual void on_remove(inet_address endpoint) override {
    }
    virtual void on_dead(inet_address endpoint, gms::endpoint_state state) override {
    }
    virtual void on_restart(inet_address endpoint, endpoint_state state) override {
    }
    ...
};
Note that gossiper::_live_endpoints does not include the local node itself, while gossiper::endpoint_state_map does.
Each gossip round works as follows:
- gossiper::make_random_gossip_digest prepares a std::vector<gossip_digest> digests from gossiper::endpoint_state_map.
- A SYN message is built from the digests.
- gossiper::do_gossip_to_live_member picks one of the nodes in gossiper::_live_endpoints at random and sends the SYN message to it.
- gossiper::do_gossip_to_unreachable_member picks one of the nodes in gossiper::_unreachable_endpoints at random and sends the SYN message to it if both of the following conditions are met:
  - unreachable_endpoint_count > 0
  - a random number rand_dbl between 0 and 1 is generated, and rand_dbl < unreachable_endpoint_count / (live_endpoint_count + 1)
- gossiper::do_gossip_to_seed picks one of the nodes in gossiper::_seeds at random and sends the SYN message to it if either of the following conditions is met:
  - live_endpoint_count == 0
  - a random number rand_dbl between 0 and 1 is generated, and rand_dbl < seeds_count / (live_endpoint_count + unreachable_endpoint_count)
So, in each gossip round, 1 to 3 SYN messages are sent to 1 to 3 nodes.
There is one gossiper instance per node, and it runs on cpu0 only. We cannot guarantee that a connection from a remote node always lands on a particular core (there is no guaranteed core-to-core TCP connection within the messaging service), so the messaging service needs to listen on all cpus. When a remote node connects to the local node through a connection bound to a cpu other than cpu0, we need to forward the message to cpu0. Thus, gossiper::start calls gossiper::init_messaging_service_handler on all cpus to register the verb handlers, so that we listen on all cpus. For instance, the SYN verb handler looks like:
ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [] (gossip_digest_syn syn_msg) {
    // Runs on whichever cpu received the message; forward it to cpu0, where the gossiper lives.
    return smp::submit_to(0, [syn_msg = std::move(syn_msg)] () mutable {
        auto& gossiper = gms::get_local_gossiper();
        return gossiper.handle_syn_msg(std::move(syn_msg));
    });
});
gossiper::do_status_check() runs every second. It checks all the nodes in gossiper::endpoint_state_map. For each node, it calls failure_detector::interpret(endpoint), which calls double phi = arrival_window::phi(now) to get a phi number.
class arrival_window {
    // ...
    utils::bounded_stats_deque _arrival_intervals;
    // ...
};
double arrival_window::phi(long tnow) {
    long t = tnow - _tlast;
    return t / mean();
}
double arrival_window::mean() {
    return _arrival_intervals.mean();
}
_arrival_intervals is of type utils::bounded_stats_deque, which is a bounded statistics queue. In the failure detector, the queue size is 1000, so the failure detector remembers the most recent 1000 samples of the inter-arrival time.
gossiper::apply_state_locally and gossiper::notify_failure_detector call failure_detector::report to add samples to std::map<inet_address, arrival_window> failure_detector::_arrival_samples.
For instance, assume the local node hears from a remote node at timestamps 1s, 1.2s, 1.5s and 1.8s. Then _arrival_intervals will contain 3 samples: 0.2s, 0.3s and 0.3s, whose mean is about 0.27s. When the time reaches 2s, we have t = 2.0 - 1.8 = 0.2s, so phi = 0.2 / 0.27 ≈ 0.74.
Then, it checks whether the node is down using the check below. If so, it calls gossiper::convict to mark the node as dead.
if (PHI_FACTOR * phi > get_phi_convict_threshold()) {
    for (auto& listener : _fd_evnt_listeners) {
        listener->convict(ep, phi);
    }
}
static constexpr const double PHI_FACTOR{1.0 / std::log(10.0)}; // ≈ 0.434
By default, get_phi_convict_threshold() == 8.
In the example above, suppose the remote node dies right after the timestamps 1s, 1.2s, 1.5s and 1.8s. To detect it, we need PHI_FACTOR * phi > 8, i.e. phi > 8 / 0.434 ≈ 18.4. Since phi = (tnow - tlast) / mean = (tnow - 1.8) / 0.27, we need tnow - 1.8 > 18.4 * 0.27 ≈ 5.0s. So the local node notices that the remote node is dead about 5 seconds later.
gossiper::notify_failure_detector() is called when a remote node tells the local node something new about other nodes, i.e., in the ACK and ACK2 message handlers. In gossiper::notify_failure_detector, if the remote node reports that an endpoint's 1) generation is newer, or 2) generation is the same but heartbeat version is newer, then the local node updates the endpoint's timestamp by calling endpoint_state::update_timestamp() and calls failure_detector::report(endpoint) to update the failure detector's arrival samples.
void failure_detector::report(inet_address ep) {
    long now = db_clock::now().time_since_epoch().count();
    auto it = _arrival_samples.find(ep);
    if (it == _arrival_samples.end()) {
        // avoid adding an empty ArrivalWindow to the Map
        auto heartbeat_window = arrival_window(SAMPLE_SIZE);
        heartbeat_window.add(now);
        _arrival_samples.emplace(ep, heartbeat_window);
    } else {
        it->second.add(now);
    }
}
class failure_detector {
    ...
    std::map<inet_address, arrival_window> _arrival_samples;
    ...
};
void arrival_window::add(long value) {
    ...
    long inter_arrival_time = value - _tlast;
    _arrival_intervals.add(inter_arrival_time);
    ...
}
Note the following situation:
- Node A can talk to Node B
- Node B can talk to Node C
- Node A cannot talk to Node C due to some network issue
Will Node A think Node C is dead? No, because Node A maintains Node C's arrival samples, and those samples are updated whenever Node B tells Node A about Node C. To be more specific, when Node A sends a SYN message to Node B, Node B sends an ACK message back to Node A containing Node C's endpoint_state, with an endpoint_state::_heart_beat_state newer than the one Node A knows about.
- [ArchitectureGossip](http://wiki.apache.org/cassandra/ArchitectureGossip)
- Gossip Protocol in Cassandra
- [Cassandra Internals — Understanding Gossip](https://www.youtube.com/watch?v=FuP1Fvrv6ZQ&list=PLqcm6qE9lgKJkxYZUOIykswDndrOItnn2&index=49)
- [The φ Accrual Failure Detector](http://ddg.jaist.ac.jp/pub/HDY+04.pdf)