
Gossip

The high-level Cassandra gossip protocol, on which Scylla's is based, is explained in the Cassandra documentation.

Origin (Cassandra) builds Gossip on top of its Messaging Service, which in turn runs on top of TCP/IP sockets. In Scylla, we also build Gossip on top of the Messaging Service. However, Scylla's Messaging Service is built on top of the RPC infrastructure. As a result, we have a slightly different way of handling incoming and outgoing Gossip messages.

The basic data structure

Each node in a cluster maintains a map of every node it knows about, including itself:

std::map<inet_address, endpoint_state> endpoint_state_map

All information about a node is stored in endpoint_state:

class endpoint_state {
     heart_beat_state _heart_beat_state;
     std::map<application_state, versioned_value> _application_state;
}

heart_beat_state contains a _generation field and _version field:

class heart_beat_state {
      int32_t _generation;
      int32_t _version;
}

_generation changes when the node is restarted. _version is updated every second in gms::gossiper::run().

gms::application_state is an enum type which defines different types of information we want to gossip around. e.g. STATUS, LOAD, HOST_ID, ...

gms::versioned_value is used to represent the value of each gms::application_state.
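Everywhere gossip needs to decide which of two copies of a node's state is fresher, the (_generation, _version) pair acts as a logical timestamp: a higher generation always wins, and within the same generation a higher version wins. A minimal sketch of this ordering (the helper name is ours, for illustration):

// Returns true if state (gen_a, ver_a) is strictly newer than (gen_b, ver_b).
// Generation takes precedence, so a restarted node always looks newer.
bool is_newer(int32_t gen_a, int32_t ver_a, int32_t gen_b, int32_t ver_b) {
    if (gen_a != gen_b) {
        return gen_a > gen_b;
    }
    return ver_a > ver_b;
}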

The Gossip message

Gossip message exchange looks very similar to the TCP three-way handshake. Every round of gossip between two nodes consists of three messages.

1) SYN message

class gossip_digest_syn {
     sstring _cluster_id;
     sstring _partioner;
     std::vector<gossip_digest> _digests;
}

class gossip_digest {
     inet_address _endpoint;
     int32_t _generation;
     int32_t _max_version;
}

When A sends a SYN message to B, A starts a gossip round.

The SYN message basically tells B which nodes A knows about, and how fresh A's information about each of them is.
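For each node in _digests, _max_version is the largest version the sender has seen for that node across the heartbeat and all application states. A minimal sketch of deriving a digest from an endpoint_state, using illustrative accessor names rather than Scylla's exact API:

// Illustrative sketch: build the digest advertised for one endpoint.
// heart_beat(), application_states() and version() are made-up accessors.
gossip_digest make_digest(inet_address ep, const endpoint_state& state) {
    // Start from the heartbeat version ...
    int32_t max_version = state.heart_beat().version();
    // ... and raise it to the highest application_state version, if any.
    for (const auto& [app_state, value] : state.application_states()) {
        max_version = std::max(max_version, value.version());
    }
    return gossip_digest(ep, state.heart_beat().generation(), max_version);
}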

2) ACK message

class gossip_digest_ack {
     std::vector<gossip_digest> _digests;
     std::map<inet_address, endpoint_state> _map;
}

When B receives a SYN message from A, it compares the digests against its local endpoint_state_map and figures out:

  • What A knows but B doesn't. B requests this information by listing it in gossip_digest_ack::_digests.

  • What B knows but A doesn't. B sends this information directly in gossip_digest_ack::_map.

Then, B sends an ACK message back to A.
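The per-digest decision reduces to comparing the remote (generation, max_version) pair against the local one. A minimal sketch of B's logic for a single digest, reusing the illustrative is_newer helper from above (request_state/send_state are placeholders for filling _digests and _map):

// For one digest from A, decide whether B should ask for A's newer state,
// send B's own newer state, or do nothing because both sides agree.
void examine_digest(const gossip_digest& d, const endpoint_state& local) {
    int32_t local_gen = local.heart_beat().generation();
    int32_t local_max = local.heart_beat().version(); // really the max version, see make_digest
    if (is_newer(d.generation(), d.max_version(), local_gen, local_max)) {
        request_state(d.endpoint());  // ends up in gossip_digest_ack::_digests
    } else if (is_newer(local_gen, local_max, d.generation(), d.max_version())) {
        send_state(d.endpoint());     // ends up in gossip_digest_ack::_map
    }
}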

3) ACK2 message

class gossip_digest_ack2 {
     std::map<inet_address, endpoint_state> _map;
}

When A receives the ACK message from B, A applies the endpoint_state that B sent in _map, and answers B's requests in _digests by sending back an ACK2 message.

Now, node A and node B should have the same information.
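Applying a received map boils down to keeping whichever copy of each endpoint_state is fresher. A condensed sketch of the merge (the real gossiper also merges individual application_state entries, updates the failure detector, and fires subscriber callbacks; is_newer is the illustrative helper from above):

// Merge states received from a peer into the local map, keeping the
// newer copy of each endpoint's state.
void apply_states(std::map<inet_address, endpoint_state>& local_map,
                  const std::map<inet_address, endpoint_state>& remote_map) {
    for (const auto& [ep, remote] : remote_map) {
        auto it = local_map.find(ep);
        if (it == local_map.end()
                || is_newer(remote.heart_beat().generation(), remote.heart_beat().version(),
                            it->second.heart_beat().generation(), it->second.heart_beat().version())) {
            local_map.insert_or_assign(ep, remote);
        }
    }
}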

How to use the Gossip service

1) Add application state to the cluster

First, grab a Gossiper instance. Second, add the application_state you want to gossip around. For example, to add the LOAD application_state:

auto& gossiper = gms::get_local_gossiper();
auto state = gms::application_state::LOAD;
auto value = gms::versioned_value::versioned_value_factory::load(load);
gossiper.add_local_application_state(state, value);

The added application_state will be replicated to the rest of the cluster.
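On any other node, the replicated value can then be read back from that node's local view of the sender. A minimal sketch, with illustrative accessor names (get_state_for/get are not necessarily Scylla's exact API):

auto& gossiper = gms::get_local_gossiper();
// Illustrative accessors: look up our view of a peer and read one
// application_state out of it. Names are for illustration only.
if (const endpoint_state* es = gossiper.get_state_for(peer)) {
    if (const versioned_value* load = es->get(gms::application_state::LOAD)) {
        // load->value() holds the stringified load the peer gossiped.
    }
}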

2) Subscribe to endpoint_state change

If a class is interested in endpoint_state changes, it needs to inherit from class i_endpoint_state_change_subscriber, which defines various callbacks that are invoked when an endpoint_state changes. For example:

class foo : public i_endpoint_state_change_subscriber {
public:
    // Register this object with the local gossiper so the callbacks below
    // get invoked ("register" itself is a reserved C++ keyword, hence the
    // different method name).
    void subscribe() {
        auto& gossiper = gms::get_local_gossiper();
        gossiper.register_(this);
    }
    virtual void on_change(inet_address endpoint, application_state state, versioned_value value) override {
        // Your callback here.
    }
    virtual void on_remove(inet_address endpoint) override {}
    virtual void on_dead(inet_address endpoint, endpoint_state state) override {}
    virtual void on_restart(inet_address endpoint, endpoint_state state) override {}
    ...
};

More Gossip internals

How gossip tracks live members

gossiper::_live_endpoints does not include the local node itself, while gossiper::endpoint_state_map does.

How many messages each Gossip round generates

  • gossiper::make_random_gossip_digest prepares a std::vector<gossip_digest> digests from gossiper::endpoint_state_map
  • A SYN message is built from digests
  • gossiper::do_gossip_to_live_member picks one of the nodes in gossiper::_live_endpoints randomly and sends the SYN message to it
  • gossiper::do_gossip_to_unreachable_member picks one of the nodes in gossiper::_unreachable_endpoints randomly and sends the SYN message to it if the following conditions are met:
    • unreachable_endpoint_count > 0
    • a random number rand_dbl between 0 and 1 is generated and rand_dbl < unreachable_endpoint_count / (live_endpoint_count + 1)
  • gossiper::do_gossip_to_seed picks one of the nodes in gossiper::_seeds randomly and sends the SYN message to it if any of the following conditions is met:
    • live_endpoint_count == 0
    • a random number rand_dbl between 0 and 1 is generated and rand_dbl < seeds_count / (live_endpoint_count + unreachable_endpoint_count)

So, in each gossip round there will be one to three SYN messages sent to one to three nodes; the send decisions are sketched below.
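A condensed sketch of the per-round send decisions described above (send_syn_to_random and rand_double are illustrative placeholders, not Scylla's exact helpers):

// One gossip round: always gossip to a live member, probabilistically to
// an unreachable member, and to a seed when needed.
void gossip_round(const std::set<inet_address>& live,
                  const std::set<inet_address>& unreachable,
                  const std::set<inet_address>& seeds) {
    // 1) Always send a SYN to one randomly chosen live member.
    send_syn_to_random(live);
    // 2) Maybe gossip to an unreachable member: the more unreachable nodes
    //    there are relative to live ones, the more likely.
    if (!unreachable.empty()
            && rand_double() < double(unreachable.size()) / (live.size() + 1)) {
        send_syn_to_random(unreachable);
    }
    // 3) Gossip to a seed: always when no live member is known, otherwise
    //    with probability seeds / (live + unreachable).
    if (live.empty()
            || rand_double() < double(seeds.size()) / (live.size() + unreachable.size())) {
        send_syn_to_random(seeds);
    }
}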

Sharding

There is one gossiper instance per node, and it runs on cpu0 only. We cannot guarantee that there will always be a core-to-core TCP connection within the messaging service, so the messaging service needs to listen on all cpus. When a remote node connects to the local node with a connection bound to a cpu other than cpu0, we need to forward the message to cpu0. Thus, we call gossiper::init_messaging_service_handler on all the cpus to register callbacks in gossiper::start, so that we listen on all the cpus. For instance, the SYN verb handler looks like:

ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [] (gossip_digest_syn syn_msg) {
    return smp::submit_to(0, [syn_msg = std::move(syn_msg)] () mutable {
        auto& gossiper = gms::get_local_gossiper();
        return gossiper.handle_syn_msg(std::move(syn_msg));
    });
});

How the failure detector works

1) check

gossiper::do_status_check() runs every second. It checks all the nodes in gossiper::endpoint_state_map. For each node, it calls failure_detector::interpret(endpoint), which calls double phi = arrival_window::phi(now) to get a phi value.

class arrival_window {
    ...
    utils::bounded_stats_deque _arrival_intervals;
    ...
};

double arrival_window::phi(long tnow) {
    long t = tnow - _tlast;
    return t / mean();
}

double arrival_window::mean() {
    return _arrival_intervals.mean();
}

_arrival_intervals is of type utils::bounded_stats_deque, a bounded statistics queue. In the failure detector, the queue size is 1000, so the failure detector remembers the most recent 1000 samples of the inter-arrival time.
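A minimal sketch of such a bounded statistics deque (our own illustration, not Scylla's utils implementation):

#include <deque>
#include <cstddef>

// Keeps only the most recent `capacity` samples and reports their mean.
class bounded_stats_deque_sketch {
    std::deque<long> _deque;
    size_t _capacity;
public:
    explicit bounded_stats_deque_sketch(size_t capacity) : _capacity(capacity) {}
    void add(long sample) {
        if (_deque.size() == _capacity) {
            _deque.pop_front(); // drop the oldest sample
        }
        _deque.push_back(sample);
    }
    double mean() const {
        if (_deque.empty()) {
            return 0;
        }
        double sum = 0;
        for (long v : _deque) {
            sum += v;
        }
        return sum / _deque.size();
    }
};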

gossiper::apply_state_locally and gossiper::notify_failure_detector will call failure_detector::report to add samples to std::map<inet_address, arrival_window> failure_detector::_arrival_samples.

For instance, assume the local node hears from a remote node at timestamps 1s, 1.2s, 1.5s and 1.8s; then _arrival_intervals will contain 3 samples: 0.2s, 0.3s, 0.3s. The mean is about 0.27s. When the timestamp reaches 2s, we have t = 2.0 - 1.8 = 0.2s, so phi = 0.2 / 0.27 ≈ 0.74.

Then, it checks whether a node is down using the condition below. If true, it calls gossiper::convict to mark the node as dead.

if (PHI_FACTOR * phi > get_phi_convict_threshold()) {
    for (auto& listener : _fd_evnt_listeners) {
        listener->convict(ep, phi);
    }
}

Here PHI_FACTOR is defined as static constexpr double PHI_FACTOR{1.0 / std::log(10.0)}, which is about 0.434, and get_phi_convict_threshold() returns 8 by default.

Continuing the example above, suppose the remote node dies after the samples at 1s, 1.2s, 1.5s and 1.8s. To detect this, we need PHI_FACTOR * phi > 8, i.e. phi > 8 / 0.434 ≈ 18. Since phi = (tnow - tlast) / mean = (tnow - 1.8) / 0.27, we need tnow - 1.8 > 18 * 0.27 = 4.86s. So around 5 seconds later, we will notice that the remote node is dead.
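In general, the detection delay follows directly from rearranging the convict condition; a small helper (ours, for illustration) makes the relationship explicit:

// Time after the last received sample at which a silent node is convicted:
// PHI_FACTOR * (tnow - tlast) / mean > threshold
//   =>  tnow - tlast > threshold / PHI_FACTOR * mean
double detection_delay(double mean_interval, double threshold = 8.0) {
    constexpr double PHI_FACTOR = 0.434; // 1 / ln(10)
    return threshold / PHI_FACTOR * mean_interval;
}
// detection_delay(0.27) is about 4.98s, matching the roughly 5s above.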

2) update

gossiper.notify_failure_detector() is called when a remote node tells the local node new information about other nodes, i.e., in the ACK message's callback and the ACK2 message's callback. In gossiper::notify_failure_detector, if the remote node tells the local node that an endpoint's 1) generation is newer, or 2) generation is the same but heartbeat version is newer, then the local node updates the endpoint's timestamp by calling endpoint_state::update_timestamp() and calls failure_detector::report(endpoint) to update the failure detector's arrival samples.

void failure_detector::report(inet_address ep) {
    long now = db_clock::now().time_since_epoch().count();
    auto it = _arrival_samples.find(ep);
    if (it == _arrival_samples.end()) {
        // avoid adding an empty ArrivalWindow to the Map
        auto heartbeat_window = arrival_window(SAMPLE_SIZE);
        heartbeat_window.add(now);
        _arrival_samples.emplace(ep, heartbeat_window);
    } else {
        it->second.add(now);
    }
}

class failure_detector {
    ...
    std::map<inet_address, arrival_window> _arrival_samples;
    ...
};

void arrival_window::add(long value) {
    ...
    long inter_arrival_time = value - _tlast;
    _arrival_intervals.add(inter_arrival_time);
    ...
}

Note: imagine a situation where:

  • Node A can talk to Node B
  • Node B can talk to Node C
  • Node A cannot talk to Node C due to some network issue

Will Node A think Node C is dead? No, because Node A maintains Node C's arrival samples, and those samples are updated when Node B tells Node A about Node C. To be more specific, when Node A sends a SYN message to Node B, Node B sends back an ACK message containing Node C's endpoint_state, whose endpoint_state::_heart_beat_state is newer than what Node A knows.
