Tomasz Grabiec edited this page Feb 24, 2015 · 33 revisions

The conceptual data model

sstables can be represented as

   sstable = map<blob, map<blob, blob>, order>

where the first blob is the row key, the second blob is the column name, and the third blob is the column value. The order parameter represents an implied ordering on keys, determined by the natural ordering of the tokens that the partitioner assigns to each key.

thrift and CQL have different models that map to this. How each blob is formatted and sorted depends on the schema.

The CQL model is something like

   // some primitives can be null, some cannot:
   primitive = optional<int> | optional<bigint> | optional<varint>
             | varchar | ...
   composite = tuple<primitive...>
   scalar = primitive | composite
   collection = list<frozen_type>
              | map<frozen_type, frozen_type>
              | set<frozen_type>
              | tuple<frozen_type...>
   frozen_type = primitive | frozen<collection>
   type = primitive | composite | collection

   row = tuple<type...>
   partition = struct {
       row static_columns;
       map<composite, row> columns;
   };
   partition_key = scalar;
   order = function<bool(partition_key, partition_key)>; // "less than" total ordering determined by partitioner
   table = map<partition_key, partition, order>;

If we have

  create table foo (
     id varchar,
     color text,
     n int,
     s int static,
     x varchar,
     foo map<int, int>,
     primary key (id, color, n)
  )

this maps to

   row = struct {
        sstring x;
        map<int, int> foo;
   };

   clustering_key = struct {
          sstring color;
          int n;
   };

   partition = struct {
        int s;
        map<clustering_key, row> rows;
   };

   partition_key = sstring;

   table = map<sstring, partition, order>;

Cassandra defines a mapping between the various fields and the three blobs (rowkey, colkey, value) that go into the sstable. In our example id would be mapped to rowkey, and the tuples

        ("s", s)  // the static column
        ({color ":" n ":x"}, x)
        ({color ":" n ":foo:" foo_key}, foo_value)

would map into (colkey, value).

So in the end, all the extra layers of indirection in CQL translate to concatenating various field names and values together, and stuffing them into either the sstable row key, the sstable column key, or the sstable column value.
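The flattening described above can be sketched for the example table foo. This is a minimal illustration, not Origin's encoding: the ':' separator is taken from the notation above rather than from the real composite format, all values are rendered as text, cells are not sorted by the comparator, and foo_row and flatten are hypothetical names.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// (colkey, value) pair as it would go into the sstable.
using cell = std::pair<std::string, std::string>;

// One CQL row of table foo, without the partition key (id) and static column (s).
struct foo_row {
    std::string color;             // clustering column
    int n;                         // clustering column
    std::optional<std::string> x;  // regular column, may be unset
    std::map<int, int> foo;        // collection column
};

// Hypothetical helper: flattens one CQL row into cells.
std::vector<cell> flatten(const foo_row& r) {
    // The toy encoding cannot represent a separator inside a component.
    assert(r.color.find(':') == std::string::npos);
    const std::string prefix = r.color + ":" + std::to_string(r.n) + ":";
    std::vector<cell> out;
    if (r.x) {
        out.emplace_back(prefix + "x", *r.x);
    }
    // Each collection element becomes its own cell.
    for (const auto& [k, v] : r.foo) {
        out.emplace_back(prefix + "foo:" + std::to_string(k), std::to_string(v));
    }
    return out;
}
```

A row with color "red", n = 7, x = "hello" and foo = {1: 10, 2: 20} yields three cells named "red:7:x", "red:7:foo:1" and "red:7:foo:2".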

Data-size constraints

We generally follow Origin here.

  • We can assume that an entire mutation will fit into memory
  • We can assume that an entire query page will fit into memory
  • We can assume that an entire cell will fit into memory
  • We cannot assume that an entire CQL row will fit into memory (I'm not sure if this couldn't be relaxed)
  • We cannot assume that a partition will fit into memory

Links:

http://wiki.apache.org/cassandra/CassandraLimitations

http://www.datastax.com/documentation/cassandra/1.2/cassandra/architecture/architecturePlanningAntiPatterns_c.html?scroll=concept_ds_emm_hwl_fk__multiple-gets

In-memory data structures

The requirements are:

  • efficient lookup given a key (both row keys and collection keys)
  • efficient scanning given a key range (both row keys and collection keys)
  • efficiently reading and writing a given row (row should not be scattered around memory)
  • efficiently updating a part of a row (should not copy entire row to update a single value)
  • be able to hold part of a row (a row can be larger than memory)
  • effective eviction of cached data from memory (anti fragmentation)
  • compact storage so caching is effective

column_family = struct {
   intrusive_multimap<decorated_row_key, versioned_row> for_scan;
   intrusive_multihash<row_key, versioned_row> for_lookup;

   // static columns
   intrusive_multimap<decorated_partition_key, versioned_row> static_for_scan;
   intrusive_multihash<partition_key, versioned_row> static_for_lookup;

   // for supporting thrift out-of-schema columns, the versioned row has only one element, the column's value
   intrusive_multimap<tuple<partition_key, bytes>, versioned_row> out_of_schema;
}

To support both lookup and scan, we have separate indexes using map and hash associative containers (for_lookup and for_scan).

The partition key is not ordered according to the natural ordering of its components but according to the ordering of tokens assigned by the partitioner. Therefore for range lookups we store the keys together with corresponding tokens (decorated_row_key).
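A minimal sketch of such a decorated key, assuming a toy partitioner. Here toy_token simply uses the last byte of the key as the token, which is not what any real partitioner does; it only makes the difference between token order and natural key order easy to see. decorated_key, decorate and scan_order are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

using token = std::uint64_t;

// Stand-in partitioner (assumption): token is the last byte of the key.
inline token toy_token(const std::string& key) {
    return key.empty() ? 0 : static_cast<unsigned char>(key.back());
}

// The key stored together with its token, so that range scans
// do not have to recompute tokens while comparing.
struct decorated_key {
    token tok;
    std::string key;
};

inline decorated_key decorate(std::string key) {
    const token t = toy_token(key);
    return {t, std::move(key)};
}

// Order by token first; the raw key only breaks token collisions.
inline bool operator<(const decorated_key& a, const decorated_key& b) {
    return std::tie(a.tok, a.key) < std::tie(b.tok, b.key);
}

// Returns the keys in the order a scan over the ordered index would visit them.
inline std::vector<std::string> scan_order(const std::vector<std::string>& keys) {
    std::set<decorated_key> for_scan;
    for (const auto& k : keys) {
        for_scan.insert(decorate(k));
    }
    std::vector<std::string> out;
    for (const auto& dk : for_scan) {
        out.push_back(dk.key);
    }
    return out;
}
```

With this toy token, the keys "az", "ba" and "cm" are scanned as "ba", "cm", "az", which differs from their natural order.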

TODO:

  • describe layout of partition_key, row_key and decorated_row_key.
  • We avoid duplication of column names per non-PK columns, but duplication may still exist inside row keys. Eg. if we have a clustering column, the partitioning key is duplicated for each value of that column for the same partitioning key. This could be solved by using multi-level hierarchy (first lookup by partition key, then by clustering columns), but that would add extra step to lookups and would fragment scans. The problem could still exist if there are multiple clustering columns (though to a lesser degree). Another solution would be to intern cell name components and represent keys as vectors of pointers. That would slow down lookups though because of an extra indirection.

In order to support incremental updates of individual cells, we store multiple immutable versioned_rows to represent each row. Versioned rows are stored in insertion order. Cell values in newer versioned rows hide those from older versioned rows.

A versioned row is a single byte vector in memory, but designed to be easily parsed, and not to replicate metadata information (column names). It could look something like this:

versioned_row = struct {
   int ref_count;               // for intrusive_ptr<>

   vector<int> column_offset;   // column_offset[3] describes where to find the value of the 4th column
                                // Each column can be present (>=0), missing (-1) or deleted (-2)
   column value 0
   column value 1
   ...
}

Column values are stored in a semi-serialized form:

  • C++ primitive types (int, bool, etc.) are stored with native alignment and byte order (can be accessed using plain pointers)
  • Compound types (strings, blobs, maps, whatnot) replace pointers with uint16_t offsets from the start of the row. This means a versioned_row can be moved around memory for compaction.
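As a rough sketch of how a read could resolve one column across several versioned rows: walk the versions from newest to oldest and stop at the first one that has the column present or deleted. The layout below is simplified (offsets and values live in two separate vectors instead of a single byte vector, only int columns are handled, timestamps are ignored), and append_int, append_marker and read_int are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Sentinel offsets, as in the column_offset comment above.
constexpr int offset_missing = -1;
constexpr int offset_deleted = -2;

// Simplified stand-in for versioned_row: offsets plus a flat byte buffer.
struct versioned_row {
    std::vector<int> column_offset;
    std::vector<std::uint8_t> data;
};

// Appends the next column as a live int value.
void append_int(versioned_row& r, std::int32_t v) {
    r.column_offset.push_back(static_cast<int>(r.data.size()));
    const auto* p = reinterpret_cast<const std::uint8_t*>(&v);
    r.data.insert(r.data.end(), p, p + sizeof(v));
}

// Appends the next column as missing or deleted.
void append_marker(versioned_row& r, int marker) {
    assert(marker == offset_missing || marker == offset_deleted);
    r.column_offset.push_back(marker);
}

enum class state { live, missing, deleted };

struct lookup_result {
    state st;
    std::int32_t value;  // meaningful only when st == state::live
};

// versions are in insertion order (oldest first); newer versions hide older ones.
lookup_result read_int(const std::vector<versioned_row>& versions, std::size_t column) {
    for (auto it = versions.rbegin(); it != versions.rend(); ++it) {
        if (column >= it->column_offset.size()) {
            continue;  // this version does not know the column at all
        }
        const int off = it->column_offset[column];
        if (off == offset_missing) {
            continue;  // not set in this version, look at an older one
        }
        if (off == offset_deleted) {
            return {state::deleted, 0};
        }
        std::int32_t v;
        assert(static_cast<std::size_t>(off) + sizeof(v) <= it->data.size());
        std::memcpy(&v, it->data.data() + off, sizeof(v));
        return {state::live, v};
    }
    return {state::missing, 0};
}
```

An update of a single column then only needs a new small versioned_row with that column set and the others marked missing; the older version is left untouched.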

With each value (that applies to collection elements too) we need to store a timestamp for conflict resolution.

A versioned_row should also contain an indication whether the rows preceding and following it in memory are also the preceding and following rows on disk; if that's true a scan can avoid looking at the sstable. If false we must scan the sstable and memory simultaneously, and merge the results.

Data model in Origin

This section describes how data is modeled in Origin. Later I'll add how this maps to our data model and APIs.

org.apache.cassandra.db.ColumnFamilyStore#getSequentialIterator() returns an abstract iterator which yields sequence of Row objects. A Row object holds a decorated key (partition key) and a ColumnFamily. ColumnFamily represents a partition (not a table). It can be used to iterate over partition Cells (in clustering order). Each Cell has a name (CellName) and a value in serialized form (ByteBuffer).

The same abstractions are used across in-memory storage (memtables and row cache).

The Cell type is packed with responsibilities:

  • it holds the cell name
  • it holds the cell value
  • it knows how to resolve conflicts (reconciliation)
  • it knows how to expire
  • ...

The 'Cell' type hierarchy has multiple dimensions:

  • Buffer* types mean the cell data is kept on heap (in Cell objects), Native* types mean the data (including Cell fields) is stored off heap
  • Cell "kind": *CounterCell, *DeletedCell, *ExpiringCell, *Cell.

I think we don't need a distinction between on-heap and off-heap in our data model.

The information about cell type (Counter/Expiring/Regular) could be extracted from the schema. In that sense, the Cell type hierarchy duplicates information which is already in the schema. We certainly can drop this information from the storage model. We could integrate schema information at the time data is queried and expose it to the user. TODO: analyze usages in more detail to determine what's the best way to do that.

Every cell can be deleted, so this type is sort of orthogonal to other cell types. This information is not encoded in the schema, it's part of the cell's dynamic state. We could therefore have: cell = variant<deleted_cell,live_cell>.

CellName is an abstraction which represents cell's location within partition (it does not hold the partitioning key). This type is packed with information:

  • it knows about the number of components and their values (obviously)
  • it knows which components correspond to clustering or static columns
  • it knows about data types of its components
  • it knows about the column name of the non-clustering component

The CellName class hierarchy encodes information about:

  • whether storage format is dense or not (*DenseCellName) (See Compatibility with thrift)
  • whether the column comparator is compound or not (Compound*CellName vs Simple*CellName) which determines whether the cell name is a composite or not.

Note: non-compact CQL tables are always CompoundSparse*, compact CQL with only one clustering column are SimpleDense*, with more than one clustering column are CompoundDense*, with no clustering columns are SimpleSparse*. SimpleSparse* is also for static thrift column families (with column definitions).

All this information could be extracted from the schema. We certainly don't need to replicate it in storage, it could be integrated from the schema at query time. TODO: investigate what would be the best API to return this information to the user.

Handling TTL

TODO

Commit log

TODO

Handling schema changes

These are the allowed schema transformations in CQL3:

  1. renaming a column

This is cheap, because we store the names in one place and can update it in-place.

  2. adding a new column

One can only add regular columns (those not part of the primary key), so we can append the column to the list of regular columns and leave our versioned rows as they are, provided we store the column count with each row and do the bounds check on access. Bound miss is interpreted as missing data.

  3. changing a column's value type to a more generic type

This is cheap, the bytes stored in columns remain unchanged.

  4. deleting a column

If we want to avoid rewriting/invalidating rows in memory (Origin does not do that), we need to store additional information with versioned rows which will allow us to identify rows with stale data. Note: sstable flush in progress should not be affected by deletes, and we cannot invalidate such rows right away. So we need some way to identify stale rows. One way to do that is to store a pointer to the schema with each row. TODO.

TODO: describe schema altering via thrift

Compatibility with thrift

Not all tables created from CQL are compatible with thrift and vice versa.

When creating a table in CQL one can use "WITH COMPACT STORAGE" to make the table thrift-compatible. This imposes some limitations on the schema:

  • there may be only one column which is not part of the primary key
  • table cannot have collections

In Origin it also affects the way data is stored. Unless we want to be compatible on storage level, we could ignore that aspect.

We could implement this CQL syntax by marking the table as compact and enforce constraints on the schema during table creation and altering. When returning data to thrift, we would have to transform the data appropriately.

TODO: describe what the transformation will look like

Dense tables

Origin has a notion of a dense table.

A dense table is a table in which all components of the comparator (aka. cell name type) correspond to clustering columns, ie no component is used to encode column name.

Dense is not the same as compact. Creating a compact table with any clustering columns makes the table dense. But a compact table without any clustering columns is not dense.

The following tables are not dense:

  • tables created from CQL which have no clustering key columns
  • tables created from thrift with any columns defined

You cannot add new columns to dense tables.

Concurrency control

Data inside versioned rows can be accessed concurrently from other cores, hence it should be immutable (at least the part which is accessed from other cores) and ref counted.

Versioned rows which are undergoing a flush also need to be locked in memory and their content left untouched. Immutable versioned rows and ref counting allow for that.

Row cache

This section describes how read cache works in Origin.

Origin has a row cache which is meant to speed up frequent queries so that they don't have to integrate the state of a row, which is expensive (see Reads section).

Row cache stores only a contiguous subset of CQL rows. In case the whole query can be satisfied by row cache, it's used.

Any update made to a partition discards all cached rows for that partition. See: https://issues.apache.org/jira/browse/CASSANDRA-2864, https://issues.apache.org/jira/browse/CASSANDRA-5357.

The amount of data cached is configured per column family.

Row caching can be configured to cache the whole partition or a fixed number of rows. Caching whole partition is not recommended because it may exhaust memory.

Read cache, when enabled, is populated lazily on read. We populate the whole cache for given partition on each read, even if the read touches less data.

Read cache is supposed to help workloads which mostly access head or tail of the partition and rarely the middle parts.

Updates

In Cassandra, partition state is distributed among many nodes called replicas. The system is AP in CAP sense with eventual consistency, so regular updates do not require coordination. For eventual consistency to work, the partition state as a whole is treated as CRDT. State mutations hold partial information about the state. Mutations may arrive in different order, but merged together should resolve to the same view on each node. We need to therefore have a merge function defined on partition state which is associative and commutative.
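To illustrate what such a merge function looks like, here is a deliberately simplified sketch: partition state is reduced to a map from cell name to a (timestamp, value) pair, and merging picks a winner per cell using a total order. Tombstones, TTLs and collections are left out, and partition_state, wins and merge are hypothetical names. Because the per-cell choice is a maximum under a total order, the result does not depend on the order or grouping in which mutations are merged.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <tuple>

struct cell {
    std::int64_t timestamp;
    std::string value;
};

inline bool operator==(const cell& a, const cell& b) {
    return a.timestamp == b.timestamp && a.value == b.value;
}

// Simplified partition state: cell name -> cell.
using partition_state = std::map<std::string, cell>;

// Total order on cells: higher timestamp wins, value breaks ties.
inline bool wins(const cell& a, const cell& b) {
    return std::tie(a.timestamp, a.value) > std::tie(b.timestamp, b.value);
}

// Pointwise maximum of two states; commutative, associative and idempotent.
inline partition_state merge(partition_state a, const partition_state& b) {
    for (const auto& [name, c] : b) {
        auto [it, inserted] = a.emplace(name, c);
        if (!inserted && wins(c, it->second)) {
            it->second = c;
        }
    }
    return a;
}
```

Merging three mutations m1, m2 and m3 in any order, for example merge(merge(m1, m2), m3) or merge(m1, merge(m3, m2)), gives the same state on every replica.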

Cassandra favors writes over reads: the most expensive part of state merging happens during reads. Writes merge only the state of individual cells or collection items, but do not take into account deletions of parent structures like rows and partitions; these need to be processed by reads.

Update execution starts at the node which gets the query. It's called the coordinator node. The query is parsed on that node and passed to other nodes in the form of mutations.

Some updates require rows to be read and sent back to the coordinator, that includes:

  • removing an element by value or by index from a list column
  • light-weight transactions (LWT, or CAS), eg: "UPDATE ... IF ..."

Other mutations do not require reads. Mutations are constructed by appending cells (live cells or tombstones for deleted cells), range tombstones and partition tombstone. Mutation of a single partition is represented by Mutation object. Mutation is composed of partition key (ByteBuffer) and a column family UUID to ColumnFamily mapping. The ColumnFamily implementation which backs mutations is ArrayBackedSortedColumns. As every ColumnFamily, also the one used by Memtable, this implementation stores Cell objects sorted by CellName.

Mutation implements IMutation. The only other implementation is CounterMutation which only adds consistency level setting as far as data is concerned. There's also some dispatching based on type, but it looks like we could get away with using aggregation and variant<>. I'm leaving counters out for now.

There is one Mutation object per partition. A single UPDATE statement can only change a single row anyway, but CQL allows batching requests, in which case the coordinator may construct multiple Mutation objects and each of those may be sent to different nodes for execution. It's generally discouraged to update multiple partitions in one batch.

Mutations are sent via StorageProxy to all replicas for the given partition key. Each replica applies mutations to its local state. Depending on the configured consistency level, we may wait for one or more replicas to respond. It's also possible for a write to succeed even if all replicas time out (at consistency level "ANY"), thanks to a mechanism called hinted handoff. In such a case the coordinator records the mutations in the hints table and will retry them once the replicas are back up (timeout after 3 hours).

Random notes about CQL updates:

  • it is fine to use an UPDATE statement with a primary key which does not exist. The coordinator has no way of knowing if it exists or not, nor can the replicas distinguish that case from the case when they simply missed the insert. Also, looking at all sstables to check if the CQL row exists or not would be prohibitively expensive.
  • the difference between an update and an insert is subtle. You cannot use insert on counter tables

Updating lists

Lists can be modeled as sorted maps where keys are time-based UUIDs and values are element values:

  • Appending an item creates a cell (UUID(now), value).
  • Prepending an item creates a cell (UUID(-now), value).
  • Deleting an index involves reading the whole list, checking what is the key for n-th element and creating a tombstone for that key.
  • Removing an element by value is similar to removal by index, but the list is scanned for the presence of the element.

List is not a CRDT type, removal depends on row state as seen by one of the replicas and is therefore unreliable.
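A small sketch of this list model, assuming a signed 64-bit integer as a stand-in for the time-based UUID (so that negating the time gives a key that sorts before all appended items). cql_list and its members are hypothetical names, and removal marks the cell as a tombstone in place rather than producing a separate mutation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in for a time-based UUID (assumption): ordered like the time it encodes.
using list_key = std::int64_t;

struct list_cell {
    bool live;
    std::string value;
};

struct cql_list {
    std::map<list_key, list_cell> cells;  // sorted by key, i.e. by list position

    void append(std::int64_t now, std::string v) {
        cells[now] = {true, std::move(v)};
    }

    void prepend(std::int64_t now, std::string v) {
        cells[-now] = {true, std::move(v)};
    }

    // Read-before-write: find the key of the n-th live element and tombstone it.
    bool remove_at(std::size_t index) {
        for (auto& [key, c] : cells) {
            if (!c.live) {
                continue;
            }
            if (index-- == 0) {
                c.live = false;
                c.value.clear();
                return true;
            }
        }
        return false;  // index out of range as seen by this replica
    }

    std::vector<std::string> read() const {
        std::vector<std::string> out;
        for (const auto& [key, c] : cells) {
            if (c.live) {
                out.push_back(c.value);
            }
        }
        return out;
    }
};
```

The sketch also shows why removal by index is unreliable: remove_at depends on which cells this replica happens to hold when the list is read, so two replicas with different views would tombstone different keys.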

Applying mutations

Mutations are applied to local node via Keyspace.apply(). First, mutation is logged in the commit log. After this operation we know the commit log position associated with this mutation. Memtables need to know the commit log positions of their updates because once one gets flushed into an sstable, its range in the commit log needs to be invalidated.

We also need the commit log position to know to which memtable the update should go. When memtable flush is started, we mark the current commit log position. The memtable flush needs to wait for all writes which were started before we made the mark but have not yet completed (the data may be already in the commit log but not yet in the memtable). Origin has a synchronization mechanism for that built around OpOrder class. Reads and writes register in the current OpOrder. One can issue a barrier via OpOrder.newBarrier() to divide prior operations from later operations. The returned Barrier object allows the issuer to wait for all prior operations.

Records written to the commit log are not immediately synced to disk. They are batched in memory and synced periodically. There is a setting (commitlog_sync) which determines whether writes should wait for fsync or not before they're applied and acked back. By default writes do not wait for fsync.

Next, mutation is inserted into current memtable. First, row tombstone (partition deletion) and range tombstones (CQL row deletions) are applied. Partition tombstone is recorded in a field. Range tombstones are kept in a sorted structure which can be traversed in-order during reads.

Saving cells to memtable is done by cloning their content via MemtableAllocator#clone() (called by Cell#localCopy). This basically serializes Cell's content into the backing storage, which depends on the underlying Allocator. The memory allocated by memtable allocator is retained as long as the memtable is live. The memtable is removed once it's flushed onto disk (View#replaceFlushed).

Some allocators put the cells on heap, some use off-heap (native) storage. Which allocator is used is determined in org.apache.cassandra.config.DatabaseDescriptor#getMemtableAllocatorPool and it comes from configuration. I empirically determined that SlabAllocator is used by default. This allocator uses on-heap allocated ByteBuffers (1 MB in size) onto which cells are appended.

Conflict resolution

Cell conflicts are resolved in org.apache.cassandra.db.Cell#reconcile. The default resolution logic is:

  • most recent cell wins, based on timestamp field.
  • in case of equal timestamps, we choose live cells before dead cells
  • if both are live or dead, the one with higher value wins
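The three rules above can be sketched as a single function. This follows the list as written, reading "live before dead" as "the live cell wins a timestamp tie"; it is not a transcription of Cell#reconcile, and the cell struct here is a hypothetical simplification (values are compared as byte strings).

```cpp
#include <cassert>
#include <cstdint>
#include <string>

struct cell {
    std::int64_t timestamp;
    bool live;          // false for a tombstone
    std::string value;  // serialized value, empty for a tombstone
};

// Returns the winning cell; the result does not depend on argument order.
inline cell reconcile(const cell& a, const cell& b) {
    // 1. Most recent cell wins.
    if (a.timestamp != b.timestamp) {
        return a.timestamp > b.timestamp ? a : b;
    }
    // 2. Equal timestamps: prefer the live cell (as stated in the list above).
    if (a.live != b.live) {
        return a.live ? a : b;
    }
    // 3. Both live or both dead: the higher value wins.
    return a.value < b.value ? b : a;
}
```

Because every tie is broken deterministically, reconcile(a, b) and reconcile(b, a) pick the same cell, which is what replicas need in order to converge.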

Reads are also using reconcile() to merge cells.

The timestamp field is an int64_t representing microseconds. However, it currently has only millisecond resolution (org.apache.cassandra.service.ClientState#getTimestamp) unless user-supplied timestamps are used.

If two updates to the same row have the same timestamp, the system may eventually resolve to a state as if the updates did not execute atomically. See: https://aphyr.com/posts/294-call-me-maybe-cassandra

org.apache.cassandra.service.ClientState#getTimestamp papers over the problem by ensuring updates sent over a single client state have consecutive timestamps (in microsecond units).

Read repair does not use reconcile(), it uses diff(), which only compares timestamps. Furthermore, the coordinator sends an update only if the conflicting cells have different timestamps. To me it looks like a bug, because we may end up with replicas disagreeing about cell values.

Cell reconciliation during compaction happens in org.apache.cassandra.db.compaction.LazilyCompactedRow.Reducer#reduce, which then calls org.apache.cassandra.db.ArrayBackedSortedColumns#addColumn, which calls Cell#reconcile.

Deletions

When deleting cells we need to record that fact so that this information can be used for resolving data state. Such a record is called a tombstone. We must retain tombstones for at least gc_grace_seconds (a configuration setting) so that nodes which were unavailable (partition, failure) can be healed.

It's also possible that we will receive a tombstone before we receive data which it removes, so we need to keep it so that we can resolve to the correct state.

Tombstones need to be written back to sstables. A tombstone can go away during compaction and only when it has expired and we know that the row is not present in any other sstable on current node (during major compaction by definition, or by checking bloom filters).

Tombstone accumulation was causing issues for Spotify. In one of their tables rows were immutable, which meant that changing state involved removal of the current row followed by insertion of a new row with a different key. This generates a tombstone with every update. They said that in one case they saw 500k accumulated tombstones, processing of which was causing latency spikes.

In Origin recording deletes is fast. Deleting a partition is just storing deletion timestamp in a field. Deleting a row is storing range tombstone in a sorted per-partition structure.

Cassandra purges deleted cells on each read request from the results. It uses a kind of forward iterator over deletion information (InOrderTester) and moves it as it moves over cells in comparator order. Range tombstones are applied in org.apache.cassandra.db.RangeTombstoneList.InOrderTester#isDeleted.

Range tombstone is represented as a pair of composites and represents deletes on all cells whose cell name is contained between these composites (both ends inclusive) according to the comparator.
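The forward-only check can be sketched as follows. Plain strings stand in for composites and are compared lexicographically, the ranges are assumed to be sorted and non-overlapping, and the rule that a cell is covered when its timestamp is not newer than the tombstone's is an assumption of this sketch. range_tombstone and in_order_tester are hypothetical C++ names, loosely modeled on the InOrderTester idea.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

struct range_tombstone {
    std::string start;       // inclusive
    std::string end;         // inclusive
    std::int64_t timestamp;  // deletion time
};

// Forward-only tester: cell names must be passed in ascending order,
// so the position in the tombstone list never moves backwards.
class in_order_tester {
    const std::vector<range_tombstone>& _ranges;  // sorted by start, non-overlapping
    std::size_t _pos = 0;
public:
    explicit in_order_tester(const std::vector<range_tombstone>& ranges)
        : _ranges(ranges) {}

    bool is_deleted(const std::string& name, std::int64_t cell_timestamp) {
        // Skip ranges which end before this cell name.
        while (_pos < _ranges.size() && _ranges[_pos].end < name) {
            ++_pos;
        }
        if (_pos == _ranges.size()) {
            return false;
        }
        const range_tombstone& rt = _ranges[_pos];
        // Covered if inside the range and not newer than the deletion.
        return rt.start <= name && cell_timestamp <= rt.timestamp;
    }
};
```

Since the tester only advances, checking all cells of a partition against its range tombstones costs one pass over both sequences.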

This means that you could use range tombstones to delete:

  • a range of CQL rows (including the ones which this node does not know about yet)
  • a range of cells ordered by name within a row
  • all CQL rows from a range except some range (requires 2 range tombstones)
  • an ordered subset of collection items (eg. all keys beginning with 'FOO')

We cannot model above on per-CQL object basis, it's not enough to associate tombstones with CQL rows or cells. The main reason is that the range may cover a huge amount of rows, most of which may never come to existence. We need to keep range tombstones in a separate structure.

The following deletes involving range tombstones are supported by CQL 3.1 (skipping those which map to cell tombstones or partition tombstone):

  • whole CQL row(s): delete from table where pk = x;, delete from table where pk in (x, y);
  • a single collection element: delete numbers[3] from table where pk = x;
  • whole collection: delete numbers from table where pk = x;

We could model all of these using tombstones per CQL entity. Advantage would be better locality, no extra tombstone lookups during reads (See Mutation model proposal)

Not everything which is possible to do with range tombstones is yet possible to do from CQL, but it's possible that CQL will evolve to make use of these features (?). I could imagine that these extensions could be efficiently implemented:

  • deleting a range of CQL rows: delete from table where partition_key = x and clustering_column > "A";
  • deleting a range of items from a collection: delete numbers[3:5] from table where pk = x;
  • deleting all items in a set beginning with a prefix: delete names[starts_with("Tom")] from table where pk = x;

There is an impedance mismatch between the CQL model (which our model wants to stay close to) and the low-level model. The low-level model is more powerful. We can easily translate the CQL model into low-level model, but the reverse is not always easy.

Another example of model mismatch (not related to range tombstones) is deletion of CQL rows. In CQL there is a difference between a missing row and a row which has all non-PK columns missing. It can't be represented cleanly using the low-level model, so Origin inserts a special "row marker" cell with an artificial column name (see org.apache.cassandra.db.composites.CompoundSparseCellNameType#rowMarkerId) to distinguish that.

External data model

This is the model meant to be used outside the DB engine (mutations, query results). Important part of it are algorithms, which are not included here.

These entities are serialized and sent between cluster nodes (between coordinator and replicas). This model is meant to be similar to the one used by storage, so that we can avoid overhead and complexity of conversions.

timestamp_type = int64_t;

tombstone = struct {
	timestamp_type timestamp;
	time_point ttl;
};

deletable_aggregate<T> = struct {
	// can have tombstone, value, or both
	optional<tombstone> last_deleted;
	optional<T> value;
};

mutation = struct {
	UUID schema_id;
	partition_key key;
	deletable_aggregate<partition> value;
};

clustering_key = vector<any>;
partition_key = vector<any>;

partition = struct {
	row static_row;
	map<clustering_key, deletable_aggregate<row>> rows;
	map<any, atomic_cell> out_of_schema; // SimpleSparse cells. All other cell name types can be mapped to CQL rows.
};

cell = any; // actual type from schema (atomic_cell, cql_map)

row = struct {
	map<column_id, cell> cells;
};

column_id = int;

// atomic cells support no partial state updates
atomic_cell = variant<tombstone, live_atomic_cell>;

live_atomic_cell = struct {
	timestamp_type timestamp;
	optional<time_point> ttl;
	any value;
};

cql_map = deletable_aggregate<map<any, atomic_cell>>;

Reads

A query starts on the coordinator node in org.apache.cassandra.service.StorageProxy#read; the interesting part starts in org.apache.cassandra.service.StorageProxy#fetchRows. The coordinator sends a request for data to the nearest replica (the snitch determines which); other nodes are queried for digests based on read-repair settings and consistency level. In case digests don't match, background read repair is conducted (full requests to all replicas, coordinator compares and sends diff to each).

The replica responds with a ReadResponse which has a data digest and a Row. A Row has a partition key and a ColumnFamily.

Locally reads enter replica node in org.apache.cassandra.db.Keyspace#getRow. The whole read result must fit in memory. If row cache is enabled, it's tried. See Row cache section.

We need to merge data from both in-memory structures as well as from sstables which have this partition key. Merging state involves combining information from these places:

  • partition tombstone
  • range tombstones
  • memtable cells
  • sstables (which include partition tombstones, range tombstones, cells)

Cassandra purges tombstones and dropped columns on each read request from the result set.

Once we have the resulting set, it is applied back as a mutation in order to defragment the data by putting the whole request into the current memtable so that it will land in one sstable.

Column aliases

TODO

Replication

TODO

Schema persistence

TODO

Compaction

TODO

Memory allocation

TODO
