
Network Topology Configuration and Querying


How does it work in Cassandra?

One of the important features of Cassandra is clustering. This includes spreading the data among different Nodes and creating replicas of the same data on different Nodes. Read this[1] article for more details on the Cassandra design in general and this[2] one for a brief explanation of how data is distributed across the cluster. In order to start clustering, the first thing to do is to define the set of Nodes to spread the data on.

The main components that define the Nodes configuration are:

  • cassandra.yaml - contains the configuration of the current Node such as:
    • listen_address and listen_interface - address or interface to bind to and tell other Cassandra nodes to connect to.
    • seed_provider - addresses of hosts that are deemed contact points.
  • cassandra-topology.properties[3] - defines which Nodes belong to which Racks and Data Centers. It contains the information for the whole cluster (see the example contents after this list).
  • cassandra-rackdc.properties[4] - contains the Data Center and Rack name of the local node. This information will be propagated to other Nodes of the cluster via gossip.
  • Snitches - a family of classes that implement the topology metric and are used for querying the relative distance between Nodes. The most interesting feature of snitches is dynamic snitching[5].
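
For illustration, here is roughly what these files might contain for a small two-DC cluster. All of the addresses, DC and Rack names below are made up; the formats follow the stock C* configuration files.

# cassandra.yaml (fragment)
listen_address: 192.168.1.100
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "192.168.1.100,192.168.2.200"

# cassandra-rackdc.properties - describes the local Node only
dc=DC1
rack=RACK1

# cassandra-topology.properties - describes the whole cluster
# format: <Node IP>=<Data Center>:<Rack>
192.168.1.100=DC1:RACK1
192.168.1.101=DC1:RACK2
192.168.2.200=DC2:RACK1
# used for Nodes that are not listed explicitly
default=DC1:RACK1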

Types of snitches

There are a few types of snitches, which differ in the way they are initialized. After the initialization phase they all switch to dynamic snitching.

The main abstractions that define the topology are:

  • Node's IP address
  • Rack - contains a few Nodes
  • Data Center - contains a few Racks

Beyond that, snitches differ in the way they parse the input configuration. After that, all non-trivial snitches (the trivial snitches are SimpleSnitch and RackInferringSnitch) broadcast the information about the local Node using Gossip.
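
In other words, the information a snitch ultimately provides can be thought of as a mapping from a Node's address to its Rack and Data Center. A minimal sketch of that mapping (the names below are illustrative and not taken from either code base):

#include <string>
#include <unordered_map>

// Illustrative only: where a single Node lives in the topology.
struct node_location {
    std::string dc;    // Data Center the Node belongs to
    std::string rack;  // Rack inside that Data Center
};

// Roughly what a property-file based snitch holds internally:
// Node address (kept as a string here for simplicity) -> {dc, rack}.
using topology_map = std::unordered_map<std::string, node_location>;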

Every snitch has to implement the following interface:

public interface IEndpointSnitch
{
    /**
     * returns a String representing the rack this endpoint belongs to
     */
    public String getRack(InetAddress endpoint);

    /**
     * returns a String representing the datacenter this endpoint belongs to
     */
    public String getDatacenter(InetAddress endpoint);

    /**
     * returns a new <tt>List</tt> sorted by proximity to the given endpoint
     */
    public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);

    /**
     * This method will sort the <tt>List</tt> by proximity to the given address.
     */
    public void sortByProximity(InetAddress address, List<InetAddress> addresses);

    /**
     * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
     */
    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);

    /**
     * called after Gossiper instance exists immediately before it starts gossiping
     */
    public void gossiperStarting();

    /**
     * Returns whether for a range query doing a query against merged is likely
     * to be faster than 2 sequential queries, one against l1 followed by one against l2.
     */
    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1,  List<InetAddress> l2);
}

Snitches retrieve the topology information in different places:

  • SimpleSnitch - from nowhere: it treats all Nodes as equally close.
  • RackInferringSnitch - no explicit configuration either: it infers the Rack and DC from the Nodes' IP addresses themselves (see the sketch after this list).
  • PropertyFileSnitch - holds the topology map inside the snitch instance.
  • GossipingPropertyFileSnitch - contains a PropertyFileSnitch instance but retrieves the topology info in the following order (moving to the next source if the previous one is not available):
  1. From the Gossiper.endpointStateMap.
  2. From PropertyFileSnitch.
  3. From SystemKeyspace using the following query: SELECT peer, data_center, rack from system.peers
  • EC2Snitch, EC2MultiRegionSnitch, GoogleCloudSnitch, CloudstackSnitch - like GossipingPropertyFileSnitch, except that source (2) is the corresponding cloud provider's metadata instead of the property file.
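
As an illustration of the RackInferringSnitch approach: in C* it treats the second octet of a Node's IPv4 address as its Data Center and the third octet as its Rack. The sketch below mirrors that convention but is not the actual implementation; all names are illustrative.

#include <cstdio>
#include <string>

struct inferred_location {
    std::string dc;
    std::string rack;
};

// RackInferringSnitch-style inference: 2nd octet -> Data Center,
// 3rd octet -> Rack.
inferred_location infer_from_ip(const std::string& ip) {
    unsigned o1, o2, o3, o4;
    if (std::sscanf(ip.c_str(), "%u.%u.%u.%u", &o1, &o2, &o3, &o4) != 4) {
        return {"UNKNOWN", "UNKNOWN"};
    }
    return {std::to_string(o2), std::to_string(o3)};
}

int main() {
    auto loc = infer_from_ip("10.20.30.40");
    // Prints: dc=20 rack=30
    std::printf("dc=%s rack=%s\n", loc.dc.c_str(), loc.rack.c_str());
}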

The Scylla way

  • We will implement an abstract class snitch_base that implements all methods except getRack() and getDatacenter() (see the sketch after this list).
  • All snitches will inherit from it:
    • SimpleSnitch and RackInferringSnitch will implement the missing methods the same way they do in C*.
    • For the others we will implement them to always retrieve the information from sources (1) and (3) as described above.
  • PropertyFileSnitch will behave similarly to the GossipingPropertyFileSnitch and will broadcast the information about itself via Gossip. The rest of the cluster topology will eventually be received via gossip. Since the property file has to be the same on all Nodes, the current Node will eventually have exactly the same information about the Nodes as if it had read it from the local property file. This allows us to avoid implementing a "special" flow for the PropertyFileSnitch and to utilize the same flow we are going to use for more advanced snitches.
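
A rough sketch of what such a snitch_base could look like. The class and method names below are illustrative and are not taken from the actual Scylla sources; the proximity logic simply prefers the same Rack, then the same Data Center.

#include <algorithm>
#include <string>
#include <vector>

// Illustrative stand-in for a node address.
using endpoint = std::string;

// Everything except get_rack() and get_datacenter() is implemented here;
// concrete snitches only decide where the DC/Rack information comes from.
class snitch_base {
public:
    virtual ~snitch_base() = default;

    virtual std::string get_rack(const endpoint& ep) const = 0;
    virtual std::string get_datacenter(const endpoint& ep) const = 0;

    // Shared proximity logic: same Rack < same DC < remote DC.
    int compare_endpoints(const endpoint& target,
                          const endpoint& a1, const endpoint& a2) const {
        return distance(target, a1) - distance(target, a2);
    }

    void sort_by_proximity(const endpoint& target,
                           std::vector<endpoint>& eps) const {
        std::sort(eps.begin(), eps.end(), [&](const endpoint& a, const endpoint& b) {
            return compare_endpoints(target, a, b) < 0;
        });
    }

private:
    int distance(const endpoint& target, const endpoint& ep) const {
        if (get_datacenter(target) == get_datacenter(ep)) {
            return get_rack(target) == get_rack(ep) ? 0 : 1;
        }
        return 2; // remote DC
    }
};

// Example: a SimpleSnitch-like implementation that equalizes all Nodes.
class simple_snitch : public snitch_base {
public:
    std::string get_rack(const endpoint&) const override { return "rack1"; }
    std::string get_datacenter(const endpoint&) const override { return "datacenter1"; }
};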

As in C*, the snitch_base subclasses will be responsible for parsing the corresponding inputs, depending on their types.

Dynamic snitching

Dynamic snitching (score updating) is perhaps the most complicated and CPU-consuming part of the snitching flow.
As described in [5], dynamic snitching is relevant only for reads, since for writes Cassandra will "send them all and block until the consistency level is achieved". The dynamic snitch chooses the best possible replica to query by monitoring the performance of reads from the various replicas and picking the best one based on this history.

The main challenges are:

  1. Define the goodness metric as precisely as possible.
  2. Don't waste too many resources in order to achieve (1).
  3. React to topology and goodness-metric changes as fast as possible.

To achieve the above, Cassandra uses its own scoring method based on periodic sampling of read queries to the nodes (a simplified sketch follows the list below).

  • Scores are updated once per sampling period (100 msec).
  • The number of samples per sampling period is capped by some number (10K).
  • Every specified time period (10 min) the scores are reset in order to re-sample all Nodes in case some "bad" Nodes have recovered.
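
A highly simplified sketch of that sampling and scoring flow. The constants come from the list above; everything else (the names, and the use of a plain mean latency as the score) is illustrative and is not the actual C* or Scylla implementation.

#include <chrono>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

using endpoint = std::string;

// Illustrative dynamic-snitch bookkeeping: collect read latencies per
// replica, recompute scores every update period, and periodically reset
// everything so that recovered Nodes get a fresh chance.
class dynamic_score_keeper {
public:
    static constexpr auto update_interval = std::chrono::milliseconds(100);
    static constexpr auto reset_interval  = std::chrono::minutes(10);
    static constexpr std::size_t max_samples_per_interval = 10000;

    // Called from the read path with the measured latency of a replica.
    void record(const endpoint& ep, std::chrono::microseconds latency) {
        auto& samples = _samples[ep];
        if (samples.size() < max_samples_per_interval) {
            samples.push_back(latency);
        }
    }

    // Called once per update_interval: turn samples into scores.
    // Here the score is just the mean latency; lower is better.
    void update_scores() {
        for (auto& [ep, samples] : _samples) {
            if (samples.empty()) {
                continue;
            }
            std::chrono::microseconds total{0};
            for (auto s : samples) {
                total += s;
            }
            _scores[ep] = double(total.count()) / samples.size();
            samples.clear();
        }
    }

    // Called once per reset_interval: forget history so that "bad" Nodes
    // that have recovered are sampled again.
    void reset() {
        _samples.clear();
        _scores.clear();
    }

    double score(const endpoint& ep) const {
        auto it = _scores.find(ep);
        return it == _scores.end() ? 0.0 : it->second;
    }

private:
    std::unordered_map<endpoint, std::vector<std::chrono::microseconds>> _samples;
    std::unordered_map<endpoint, double> _scores;
};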