
Introduction

In Cassandra, data is divided into partitions, which can be found by a partition key. Sometimes, the application needs to find a partition - or partitions - by the value of another column. Doing this efficiently, without scanning all the partitions, requires indexing. Materialized Views, which we explain here, is one of the three indexing options supported in Cassandra 3 (see a short survey of all three here).

Cassandra's two other indexing options ("Secondary Indexing" and "SASI") are "local indexes", in the sense that each Cassandra node indexes the data it holds. The biggest problem with local indexes is scalability: When a search is expected to yield just one (or a few) partitions, we need to send it to all the nodes, because we cannot know which node might hold the data (the location of the data is determined by the partition key, which we don't know, and not by the column value we are searching for).

So, "Materialized Views" is not a local index, but rather a global index: There is one big index which is distributed to the different nodes using the normal Cassandra distribution scheme. Writes becomes more complicated (the data, and the index entry, will usually be on different nodes) but reading is more scalable.

But the "Materialized Views" feature is more than just an index: It doesn't just list partition keys matching a column value (as you would expected from an index); Rather, it also keeps a subset of columns on those partitions. In essence, Materialized Views builds a new table (a regular distributed Cassandra table) with the indexed column as a partition key, and a user-chosen subset of columns as values.

People have been doing this - creating additional tables containing other "views" into the same data (and calling them "Materialized Views" or "denormalization") - for a long time. The new feature in Cassandra 3.0 is doing this in the server, without application support: automatically, safely and efficiently. In other words, the application just updates the base table, and the additional materialized-view tables get updated automatically as needed. Ensuring correctness and atomicity (the materialized views are updated together with the tables) makes updates to a table with materialized views significantly slower than normal updates, although the Cassandra docs claim the performance is still acceptable. Reads, however, are just regular fast reads (as opposed to reads through a secondary index), and both read and write operations are scalable.

By the way, the idea of Materialized Views comes from SQL systems; there the views may be much more elaborate than a simple index of a single table, as they may contain joins of multiple tables. Cassandra's Materialized Views, however, do not support this.

Cassandra's "Materialized Views" feature was developed in CASSANDRA-6477 and explained in this blog entry and in the design document. Another good explanation of materialized views can be found in this blog entry.

Example

Let's look at a table of buildings: the key is the building's name, and additional columns hold each building's city, the year it was built, and its height in meters:

CREATE TABLE buildings (
    name text,
    city text,
    built int,
    meters int,
    PRIMARY KEY (name)
);

Let's insert a few famous buildings into this list. Each of the buildings in this example held the title of tallest-building-in-the-world when it was built:

INSERT INTO buildings (name, city, built, meters)
       VALUES ('Burj Khalifa', 'Dubai', 2010, 828);
INSERT INTO buildings (name, city, built, meters)
       VALUES ('Shangai World Financial Center', 'Shanghai', 2008, 487);
INSERT INTO buildings (name, city, built, meters)
       VALUES ('Taipei 101', 'Taipei', 2004, 449);
INSERT INTO buildings (name, city, built, meters) 
       VALUES ('Sears Tower', 'Chicago', 1974, 442);
INSERT INTO buildings (name, city, built, meters)
       VALUES ('World Trade Center', 'New York City', 1972, 417);
INSERT INTO buildings (name, city, built, meters)
       VALUES ('Empire State Building', 'New York City', 1931, 381);
INSERT INTO buildings (name, city, built, meters)
       VALUES ('Chrysler Building', 'New York City', 1930, 283);

The table looks like this:

SELECT * FROM buildings;

 name                           | built | city          | meters
--------------------------------+-------+---------------+--------
              Chrysler Building |  1930 | New York City |    283
                   Burj Khalifa |  2010 |         Dubai |    828
             World Trade Center |  1972 | New York City |    417
 Shangai World Financial Center |  2008 |      Shanghai |    487
                    Sears Tower |  1974 |       Chicago |    442
          Empire State Building |  1931 | New York City |    381
                     Taipei 101 |  2004 |        Taipei |    449

Now, from this "base table", let's ask Cassandra to automatically maintain a second table, a materialized view, for finding buildings by city. The new table will have the city as the partition key. But the city cannot be the entire key for each record (a building), because we can have multiple buildings in the same city - in the above example we have multiple buildings in New York. So we will have (city, name) as the primary key of the new materialized-view table: city is the partition key, and name is a clustering key:

CREATE MATERIALIZED VIEW building_by_city AS
SELECT * FROM buildings
WHERE city IS NOT NULL AND name IS NOT NULL
PRIMARY KEY(city, name);

One might observe that in this example data, (city, built), or even just (built), could also have served as a key, because it also uniquely determines a building in this data set. However, Cassandra has no way to guarantee that this remains so as more data is added - for example, one might add another building built in 1930 to the list (we'll even do this below). The only key which is really guaranteed to be unique - and to remain unique as additional data is added - is the original table's key. So, as a rule, all the components of the base table's primary key MUST also appear in the materialized-view's key; this is why we added name to the view's key in this example. The materialized-view key can also have additional components that did not appear in the base table's key (currently, in Cassandra 3, at most one - see CASSANDRA-9928), as we did with city in this example.
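
To illustrate these two rules, here are two view definitions that Cassandra would reject (a sketch with hypothetical view names; the exact error messages vary between versions). The first omits the base key column name from the view's key, and the second tries to add two non-key base columns (city and built) to the view's key:

CREATE MATERIALIZED VIEW building_by_city_bad AS
SELECT * FROM buildings
WHERE city IS NOT NULL
PRIMARY KEY(city);
-- rejected: the base key column 'name' must be part of the view's key

CREATE MATERIALIZED VIEW building_by_city_and_year AS
SELECT * FROM buildings
WHERE city IS NOT NULL AND built IS NOT NULL AND name IS NOT NULL
PRIMARY KEY(city, built, name);
-- rejected: only one non-key base column may be added to the view's key (CASSANDRA-9928)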

The WHERE city IS NOT NULL filter in the snippet above ensures that a building with a null value for "city" is simply not added to the view table - adding it would be illegal, because Cassandra does not allow a key component (either partition or clustering key) to be null. Such IS NOT NULL filters are mandatory for every column of the view's primary key; if you omit them, Cassandra refuses to create the materialized view:

CREATE MATERIALIZED VIEW building_by_city AS SELECT * FROM buildings PRIMARY KEY(city, name);
InvalidRequest: code=2200 [Invalid query] message="Primary key column 'city' is required to be filtered by 'IS NOT NULL'"

As expected, the new table looks like this:

SELECT * FROM building_by_city;

 city          | name                           | built | meters
---------------+--------------------------------+-------+--------
 New York City |              Chrysler Building |  1930 |    283
 New York City |          Empire State Building |  1931 |    381
 New York City |             World Trade Center |  1972 |    417
      Shanghai | Shangai World Financial Center |  2008 |    487
       Chicago |                    Sears Tower |  1974 |    442
         Dubai |                   Burj Khalifa |  2010 |    828
        Taipei |                     Taipei 101 |  2004 |    449

This view contains all the columns of the base table, because of the SELECT * we used to define the view. We can also create a view which selects only some of the columns of the original table. For example,

CREATE MATERIALIZED VIEW building_by_city2 AS
SELECT meters FROM buildings
WHERE city IS NOT NULL AND name IS NOT NULL
PRIMARY KEY(city, name);

SELECT * FROM building_by_city2;
 city          | name                           | meters
---------------+--------------------------------+--------
 New York City |              Chrysler Building |    283
 New York City |          Empire State Building |    381
 New York City |             World Trade Center |    417
      Shanghai | Shangai World Financial Center |    487
       Chicago |                    Sears Tower |    442
         Dubai |                   Burj Khalifa |    828
        Taipei |                     Taipei 101 |    449

Unsurprisingly, the new view contains the key columns (city and name), but the only additional column selected is "meters": the "built" column in the base table is not copied to the new view because it was not selected.

More complex SELECT statements, WHERE filters, or aggregations are currently not supported - or only partially supported - for creating materialized views. See for example CASSANDRA-9664, CASSANDRA-10368 and CASSANDRA-9778.

As we already mentioned, the materialized view tables are maintained automatically when the base table changes. For example, if we add another building to the base table:

INSERT INTO buildings (name, city, built, meters)
       VALUES ('Bank of Manhattan Trust Building', 'New York City', 1930, 255);

the base table, and both views we created of it, are modified automatically:

SELECT * FROM buildings WHERE name='Bank of Manhattan Trust Building'; 
 name                             | built | city          | meters
----------------------------------+-------+---------------+--------
 Bank of Manhattan Trust Building |  1930 | New York City |    255
SELECT * FROM building_by_city WHERE city='New York City';
 city          | name                             | built | meters
---------------+----------------------------------+-------+--------
 New York City | Bank of Manhattan Trust Building |  1930 |    255
 New York City |                Chrysler Building |  1930 |    283
 New York City |            Empire State Building |  1931 |    381
 New York City |               World Trade Center |  1972 |    417
SELECT * FROM building_by_city2 WHERE city='New York City';
 city          | name                             | meters
---------------+----------------------------------+--------
 New York City | Bank of Manhattan Trust Building |    255
 New York City |                Chrysler Building |    283
 New York City |            Empire State Building |    381
 New York City |               World Trade Center |    417

Note that the new data, the Bank of Manhattan Trust Building (today known as the Trump Building), appears in all the views.

Although internally each materialized view is a separate table, a user is not allowed to modify a view directly:

DELETE FROM building_by_city WHERE city='Taipei';
InvalidRequest: code=2200 [Invalid query] message="Cannot directly modify a materialized view"

How it works

Each of the materialized views is a separate table. The view tables are distributed across the cluster in the normal Cassandra way. The view tables are created in the same keyspace as the base table, and in particular have the same replication factor (we'll use this fact below to set up a pairing between base-table replicas and view-table replicas).

Cassandra ensures, in a manner we explain below, that updates to the base table also cause updates to each of the view tables.

The consistency level requested by the client applies as usual to the base table. For example, if QUORUM is requested and RF=3, the coordinator will acknowledge a successful write only after it receives 2 acks from base-table replicas. However, the consistency we guarantee for view tables is weaker: In the above example, we only guarantee that the view will eventually be updated on at least 2 view replicas (out of 3). In other words, at the time the coordinator receives 2 acks from base-table replicas and considers the write to have succeeded, the view may not have been updated yet - but we have made sure that it eventually will be.

Read

Reading from a materialized view, or from the base table, is just a normal read from a table.

Write

When an update (or insertion or deletion) is done on a base table which has materialized views, we need to update the base table and also the view tables. Each partition in the base table is replicated on several nodes (known as base replicas), and each entry in the view table is also held on several (usually different) nodes (view replicas).

Cassandra does this using the following steps:

  1. If the system property cassandra.mv_enable_coordinator_batchlog is set, the coordinator will create a batchlog for the operation.

    This batchlog helps in a rare edge case: Imagine that the coordinator starts sending an update to base replicas, successfully sends the update to only one base replica, and then dies before sending the update to any other replica. The one base replica that received the update sends it to the paired view replica, and then this base replica dies too. The view replica processes the update, but no living base replica contains it.

    Because this combination of events is considered very unlikely, and the coordinator batchlog is expensive, this option is off by default. See discussion in CASSANDRA-10230

  2. The coordinator sends the mutation to all base replicas and waits for as many acknowledgments as requested by the consistency level.

  3. Each base replica acquires a local lock on the partition to be inserted/updated/deleted in the base table.

    This lock is fairly expensive, but necessary to ensure a consistent update of the view table in the case of several concurrent updates to the same row of the base table: Consider two updates which set column C (the new column being indexed by the view) to two different values, V1 and V2, and assume that before the updates, C's value was Vold. Both updates may remove from the view the old row with Vold; one will then add a row with V1 and the second a row with V2, and we end up with two view rows, with two different values, instead of just one row with the last value. The local lock solves this problem by ensuring an atomic read-modify-write of the view table: First the row with Vold is deleted and a new one with V1 is created, and then the row with V1 is deleted and a new one with V2 is created.

    Note that only a lock on a row is necessary - a lock on the whole partition is not necessary. Nevertheless, Cassandra currently locks the entire partition. This is planned to be fixed - see CASSANDRA-10307.

  4. Each base replica performs a local read on the partition of the base table.

  5. Each base replica creates a local batchlog with two statements on the view table: The first deletes the entry with the old value (as read in step 4), and the second adds an entry with the new value (this also requires the data read in step 4). A conceptual sketch of such a batchlog, in CQL terms, appears right after this list.

    Note that when the mutation is a deletion, there might actually be several "old values" that need to be deleted from the view table: A mutation deleting a single partition with multiple rows in the base table, might result in multiple separate deletions in the view table.

  6. Each base replica executes this local batchlog asynchronously. Each statement in the batchlog is executed against just one paired view replica.

    Each of the base replicas is paired with one view replica, as determined in the Java code by the ViewUtils.getViewNaturalEndpoint() function. The pairing is straightforward - the first base replica (in ring order) is paired with the first view replica, the second base replica with the second view replica, and so on. This pairing is possible because the base table and the view table are in the same keyspace, so they have the same replication factor.

    The update of the view replica is asynchronous, i.e., the base replica does not block and wait for an acknowledgment from the paired view replica before updating the base table. The local batchlog is used to guarantee that even in case of error (e.g., the view replica is temporarily down, or the base replica restarts) the update to the paired view replica is eventually done.

    Because the view update is asynchronous, the view may not be updated when the client receives an acknowledgment of the write to the base table. However, the delay is usually small, and Cassandra has metrics to measure this delay (see CASSANDRA-10323).

  7. Each base replica applies the mutation on the base table locally.

  8. Each base replica releases the local lock on the partition of the base table.

  9. If the local mutation is successful, each replica sends an acknowledgment back to the coordinator.

  10. If as many acknowledgments as required by the consistency level are received by the coordinator, the mutation is acknowledged to the client as successful.

  11. Optionally, if the system property cassandra.mv_enable_coordinator_batchlog is set and QUORUM acknowledgments were received by the coordinator, the coordinator-level batchlog is removed.

    Note that QUORUM needs to be used here - not the user-chosen consistency level. The reason is described here, but it is arguably out of date and incorrect (it describes an algorithm where every base replica sends to every view replica). TODO: understand. Also TODO: figure out if this is actually in the code (I couldn't find it).
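
To make steps 4-6 above more concrete, here is a small sketch using the buildings example from the previous section (the comments describe the view mutations only conceptually - internally they are mutations, not CQL statements, and as shown earlier a client cannot modify a view directly):

UPDATE buildings SET city = 'Chicago, IL' WHERE name = 'Sears Tower';

-- Step 4: the base replica reads the existing base row and finds city='Chicago'.
-- Step 5: its local batchlog pairs two view mutations, roughly:
--           delete the view row (city='Chicago',     name='Sears Tower')
--           insert the view row (city='Chicago, IL', name='Sears Tower',
--                                built=1974, meters=442)
-- Step 6: both are sent asynchronously to the paired view replica.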

In source code

This section is about how the algorithm above is organized in the actual Cassandra source code.

A View object (note org.apache.cassandra.db.view.View - there are two other classes in Cassandra called View...) holds the definition of a single materialized view. At runtime, the most important method is mayBeAffectedBy() which checks whether this view may need updating given a specific update to its base table. Note that the View class does not handle the update itself (see below for where that happens).

A ViewManager object holds a list of Views. The main purpose of this manager is to provide the updatesAffectView() method to check which (if any) view may be affected by a given base-table update. It also has a method forTable() returning the list of views relevant to one table in the form of a TableViews (see below) and a static function for providing a lock for concurrent updates. As a comment in Cassandra's ViewManager suggests, all this functionality can be moved to the keyspace or table classes, and the separate ViewManager class could be eliminated.

A TableViews groups all the Views for a given table. Its most important capability is to build mutations to be applied to each view given a mutation to the base table, and push them to the correct replica - see methods generateViewReplicaUpdates() and pushViewReplicaUpdates() described in more detail below.

A Keyspace holds a ViewManager object referring to all the materialized views of this keyspace's tables. The Keyspace.apply() method, which traditionally applied a mutation to the relevant table, now also applies this mutation to the relevant view tables:

  1. It asks the ViewManager.updatesAffectView() if the mutation touches any materialized view.
  2. If it does, we try to take a lock on the mutated partition for each of the base table(s). These are simple in-memory locks (provided by the static ViewManager.acquireLockFor() described above), taken in a try-lock loop to avoid deadlock issues [STEP 3]
  3. We write the data to the commitlog.
  4. If the mutation touches a materialized view (as checked in 1), we call ViewManager.forTable() to get the materialized views, and on the result call TableViews.pushViewReplicaUpdates() to send the updates to the view replicas [STEP 4,5,6]
  5. The base data is written to the memtables [STEP 7]
  6. The locks are released [STEP 8]

TableViews.pushViewReplicaUpdates() mentioned above takes a base mutation in PartitionUpdate form (in Cassandra, a Mutation is keyed by a single partition key but may carry updates to several tables of that partition, while a PartitionUpdate is an update of a single partition of a single table). This method:

  1. Calls updatedViews() to eliminate from the list of views those which exclude this partition key, and which the update therefore certainly cannot affect.
  2. Uses readExistingRowsCommand() to determine what we need to read from the base partition, depending on the update command (e.g., an update of a single row needs to read just that row, while a deletion of a whole partition needs to read all of its rows).
  3. Executes this read locally (on the data in this node).
  4. Calls the generateViewUpdates() function to generate the mutations we need to send to the views (deleting old rows, adding new ones), given the base data read in the previous step and the desired update.
  5. Finally calls StorageProxy.mutateMV(), which creates a batchlog with the view mutations generated in the previous step, and sends them asynchronously (without waiting for them to actually complete) to the paired view replicas.

The ViewBuilder class is responsible for building the view tables for an existing base table. It uses some of the functions mentioned previously, like View.createMutations(), passing them an isBuilding=true option which tells those functions that we only need to create the new view partitions - and not delete any old values (because there are none).

Code in Parser.g parses the CQL statements to create, alter or drop materialized views, creating CreateViewStatement, AlterViewStatement or DropViewStatement objects, respectively. CreateViewStatement checks its parameters, creates a new ViewDefinition object holding the parameters of the view (keyspace, view table, base table, select statement, etc.), and calls MigrationManager.announceNewView() to announce the new view to all nodes.

TODO: I couldn't find the code which implements STEPS 1 and 11 - the coordinator batchlog. Keyspace.apply() calls ViewManager.updatesAffectView() (note the placement of the letter "s"), but always with the parameter coordinatorBatchLog, so the cassandra.mv_enable_coordinator_batchlog setting seems to be ignored... I don't see any other place where this option makes any difference (!?).

Other minor source-code appearances of views:

A ColumnFamilyStore also refers to a ViewManager and forwards some operations also to the view CFs.

Consistency guarantees

When writing to a base table which has a materialized view, with consistency level CL, we want to make the following three guarantees:

  1. The write to the base table has the desired consistency level.

    In other words, the write does not return successfully before CL base table replicas have been persistently modified.

  2. The write to the view table will be eventually consistent.

    In other words, while the write does not have to wait for CL view replicas to contain the update, it does need to wait until we can ensure that eventually, CL view replicas will have the new value. We need to ensure that we can never lose a view update, but also that if updates to the same column race, eventually the view replicas will contain the same row, which also matches that in the base table (and not, for example, two rows matching two different values).

  3. A view cannot contain data not in the base table. More accurately, if data can be found in one view replica (from where it could reach others via repair), then eventually it should exist in the base table too.

    For successful writes, guarantees 1 and 2 already ensure that the update will appear in both base and view tables. So guarantee 3 is relevant to failed writes.

Let's look at several scenarios which clarify how these three guarantees shaped the update algorithm described above:

  1. Lost view update: Each base replica needs to 1. update its own content and 2. send a modification to the paired view replica. If a base replica managed to update its content but the view update failed (e.g., the base replica rebooted, or the view replica lost connectivity), we'll miss one copy of the updated view and violate guarantee 2. To ensure that the view update cannot be lost, we have a batchlog on the base replica: Before updating the base table, it records its intent to send the update to the view replica.

  2. Concurrent update: Imagine that, starting with a row with c=1, a base replica gets two updates concurrently: c=2 and c=3. If done concurrently, both updates may read the same old value c=1, and thus delete the view partition c=1 and then add two separate partitions: c=2 and c=3. This situation will never be resolved (guarantee 2 mandates that eventually we'll have just one of these rows - the one with the latest timestamp, because that is what the base table will keep). The solution is to use a local lock on the base replica, to ensure that two updates to the same row happen in sequence, leaving only one live view row at the end - the one with the highest timestamp. Note that the concurrent updates may arrive in a different order at different base replicas (e.g., when coming from different coordinators), but that is also not a problem. This issue was first presented here. A CQL sketch of this race appears right after this list.

  3. UNSOLVED BUG? Concurrent update, redux: However, there is still a hole in the above description. The scheme works well if one base replica eventually sees both concurrent updates. But what if none of the base replicas gets both updates? Consider a failure scenario where one base replica gets the c=2 update, a second base replica gets the c=3 update, and then both updates fail. At this point, both view replicas will have a tombstone for the row c=1, but one replica will have a row c=2 while the other has a row c=3. This difference will never be reconciled (a repair of the view table will put both rows in the view table...).

  4. Two failures: Imagine that the coordinator starts sending an update to base replicas, successfully sends the update to only one base replica, and then reboots before sending the update to any other replica. The one base replica that received the update sends it to the paired view replica, but before writing to the base table, this base replica reboots too. The view replica processes the update, but no living base replica contains it. This violates guarantee 3. The coordinator batchlog option was meant to solve this problem, but isn't turned on... Adding the local write to the batchlog could also have helped (TODO: should we do this?), but not in the case when the base replica doesn't just reboot, but goes away forever.

  5. UNSOLVED BUG? Two failures, but actually one: The "two failures" issue above requires an unlikely failure of two nodes. However, what if both nodes are actually one? With a "smart driver", the coordinator node is a node known to hold the data, so it is one of the base replicas. If this coordinator starts by sending the update to itself (as a base replica), and this base replica sends an update to the view replica before writing locally, then a reboot of just this one node will cause the same inconsistency described above. One possible solution (not implemented in Cassandra) is to make this into a two-failure problem again, by ensuring that a coordinator sends to a remote base replica before sending to itself. TODO: Another possibility is that each base replica writes locally before sending the view update? But then we get the opposite inconsistency (although repair solves it)?
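
Here is a CQL sketch of the race in scenario 2, using a hypothetical table t and view t_by_c, with explicit timestamps to make the "latest timestamp wins" rule visible:

CREATE TABLE t (k int PRIMARY KEY, c int);

CREATE MATERIALIZED VIEW t_by_c AS
SELECT * FROM t
WHERE c IS NOT NULL AND k IS NOT NULL
PRIMARY KEY(c, k);

INSERT INTO t (k, c) VALUES (1, 1) USING TIMESTAMP 1;  -- view now contains (c=1, k=1)
UPDATE t USING TIMESTAMP 100 SET c = 2 WHERE k = 1;    -- racing update from one client
UPDATE t USING TIMESTAMP 101 SET c = 3 WHERE k = 1;    -- racing update from another client

-- The base table keeps the value with the highest timestamp, c=3, so the view must
-- eventually contain only (c=3, k=1). Without the local lock, both updates could
-- read the old value c=1 and leave both (c=2, k=1) and (c=3, k=1) live in the view.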

TODO: Benedict's issues - https://github.com/scylladb/scylla/issues/1141#issuecomment-210044118 TODO: data loss (CASSANDRA-10346).

Repair

Note that the explanation below doesn't actually prove that a repair of the base fixes real problems in the view - just that it modifies the view.

Since materialized views are separate tables, they can be repaired separately from the base table. When the base table is repaired, the view will also be updated, because Cassandra uses mutation-based repair here (repair that goes through the write path, instead of directly streaming sstables). Apparently (see the end of http://www.datastax.com/dev/blog/understanding-materialized-views) the same mode is also used for bootstrapping new nodes and for SSTable loading (? TODO: understand).

Similar statements can be made about read-repair: read-repair happens separately for views (it will not repair the base table), but read-repair of the base table may also modify the views. Similarly, hint replay on the base table will trigger updates to the views as well.

Other operations

Materialized views are ordinary tables, and their properties (such as compaction, compression, etc.) can be tuned with the "ALTER MATERIALIZED VIEW" command.
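
For example, to change the view's compaction strategy (a sketch, picking an arbitrary setting):

ALTER MATERIALIZED VIEW building_by_city
WITH compaction = { 'class' : 'LeveledCompactionStrategy' };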

It is allowed to create a materialized view on an already-populated base table. In that case, a background operation is started to populate the materialized view; while this build is in progress, queries against the materialized view may not return all results. When the build is complete, the system.built_materializedviews table on each node will be updated with the view's name.
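
A sketch of this, using a hypothetical view building_by_height on the already-populated buildings table (the system-table name follows the text above and may differ between Cassandra versions):

CREATE MATERIALIZED VIEW building_by_height AS
SELECT * FROM buildings
WHERE meters IS NOT NULL AND name IS NOT NULL
PRIMARY KEY(meters, name);

-- the background build starts on the existing rows of 'buildings'; once it
-- finishes on a node, the view's name appears in:
SELECT * FROM system.built_materializedviews;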

Cassandra refuses to drop a column from a base table if that column is used by a materialized view - even if this column is not part of the view's key (TODO: http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views claims the opposite, so which is true?). Cassandra also forbids dropping the base table - you must drop all of its materialized views first.
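
For example (a sketch - the exact error messages vary, and the column-drop behavior is as described in the text above):

ALTER TABLE buildings DROP meters;   -- refused: 'meters' is used by the materialized views
DROP TABLE buildings;                -- refused while building_by_city and building_by_city2 exist
-- dropping the views first (DROP MATERIALIZED VIEW building_by_city; etc.)
-- would then allow the DROP TABLE to succeed.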

One can add a new column to the base table. If there is a materialized view with a "SELECT *", the added column will also be added to the materialized view's columns (the initial value of this column in all existing rows will be missing, i.e., null).
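
For example, with a hypothetical new column floors:

ALTER TABLE buildings ADD floors int;
-- building_by_city was defined with SELECT *, so it now also has a 'floors'
-- column (null in all existing rows); building_by_city2 selected only
-- 'meters', so it is unchanged.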

Column timestamps, and the "shadowable tombstone"

The view table copies various columns from the base table. When an operation on the base table (e.g., updating a column) causes a view row to be created, what timestamp should its columns get?

Let's consider two possible alternative choices and present their flaws, before describing Cassandra's choice:

The first, "obvious" choice we consider (and discard) is to use, for all the data in the new view row, the timestamp of the operation which caused this view row to be created. This option, had it worked, would avoid the complications we'll run into below. But it doesn't work: The first complication is that we can have base-table mutations with multiple timestamps. But more fundamentally, it does not actually work considering Cassandra's usual per-column conflict resolution rules: For example, consider a base table with two regular (non-key) columns, x and y, which are to be duplicated in a view table. Let's start with x=OLD@0, y=OLD@0 (by "x=OLD@0" we mean x has the value OLD with timestamp 0). Now we get two separate mutations: x=NEW@100 and then, later, y=NEW@99 (this last mutation was presumably delayed a bit before we received it). Since the values for both x and y are newer than the previous ones, the end result in the base table will be x=NEW@100, y=NEW@99. But what happens in the view table? After the first mutation, option 1 would write to the view table x=NEW@100, y=OLD@100, i.e., the old value of y would "advanced" to timestamp 100. But then, when the second (and chronologically earlier) mutation tries to set y=NEW@99, it will be ignored (because it has an earlier timestamp), and y will remain at the wrong value of OLD in the view table.

Another option we could consider tries to fix the two problems of the previous option - the lack of a single timestamp, and wrong merging - by using an increasing counter as the timestamp. The goal is to have the view update sent by the base replica always replace the view row completely, instead of merging into it. A view update contains already-merged data read from the base table (usually all the view row's data, though what happens if only a regular column is modified, not a view key?), so instead of trying to "merge" it with the old row in the view table, we want to simply overwrite the old data - and we would do that thanks to the increasing timestamp. One problem with this option is how to maintain this increasing timestamp. It is perhaps possible to forgo keeping a global counter and use a per-replica counter, if we modify global operations (such as repair) to ignore it. But I'm not sure this option doesn't have additional problems... So let's move on to Cassandra's choice, which also has the goal of replacing instead of merging, but does this only for row tombstones.

Cassandra's choice is: The columns are copied from the base table to the view table with the same timestamp that each column had in the base table.

That creates a problem: View rows are sometimes deleted, when a base column that is part of the view's key is changed. This creates in the view table a row tombstone which shadows the old columns of that row. So far so good. But what if the key is changed back to its previous value, and we need to re-add this row to the view table? The problem is that now, some of the re-added columns have timestamps (those copied from the base table) which are older than the tombstone we already have for that row. So the view's row tombstone shadows some of its own data, which is wrong.
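
Continuing the base/base_by_v sketch from above, here is the sequence that creates this problem:

UPDATE base USING TIMESTAMP 200 SET v = 7 WHERE k = 1;
-- the view row (v=5, k=1) gets a row tombstone at timestamp 200, and a new view
-- row (v=7, k=1) is created; its copied x and y cells keep their base-table
-- timestamps (100 and 99).
UPDATE base USING TIMESTAMP 300 SET v = 5 WHERE k = 1;
-- the view row (v=5, k=1) must now be re-added, but its copied cells still carry
-- timestamps 100 and 99, older than that row's tombstone at 200; a regular
-- tombstone would wrongly shadow them - hence the shadowable tombstone.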

Cassandra solved this problem by adding the concept of a shadowable row tombstone: A row tombstone (see db/rows/Rows.java) has an "isShadowable" flag. With this flag turned on (as it is in view tables), the row tombstone is replaced - rather than merged - when new data comes in. The Cassandra code explains this in a comment:

A shadowable row deletion only exists if the row has no timestamp. In other words, the deletion is only valid as long as no newer insert is done (thus setting a row timestamp; note that if the row timestamp set is lower than the deletion, it is shadowed (and thus ignored) as usual). That is, if a row has a shadowable deletion with timestamp A and an update is made to that row with a timestamp B such that {@code B > A} (and that update sets the row timestamp), then the shadowable deletion is 'shadowed' by that update. A concrete consequence is that if said update has cells with timestamp lower than A, then those cells are preserved(since the deletion is removed), and this is contrary to a normal (regular) deletion where the deletion is preserved and such cells are removed. Currently, the only use of shadowable row deletions is Materialized Views, see CASSANDRA-10261.

See also: last section of http://www.doanduyhai.com/blog/?p=1930, https://issues.apache.org/jira/browse/CASSANDRA-10261

Performance

Write

Adding materialized views to a table slows down updates, because of the following additional steps:

  • Local lock on base table partition
  • Local read of the base table partition (before the write)
  • Updating, with local batchlog, of the view table
  • Optionally, coordinator batchlog

In practice, the biggest performance hit comes from the read-before-write, which also hurts latency because it happens synchronously, i.e., before the update is considered successful (while other parts, like updating the view table, happen asynchronously).

Interestingly, the latency of updates does not increase with the number of materialized views on the same base table, because we only need to read the current values once, even if we have many materialized views. However, the total throughput of the cluster will be hurt: although the view-table updates are asynchronous, we need to send a separate update (to different nodes) for each view.

It is not surprising that denormalization comes at a cost. The good news is that the cost of these server-implemented materialized views is lower than that of client-implemented denormalized tables - because a client-side implementation would need to do the same operations (like the read-before-write) over the network. Not to mention that it is much more convenient.

Most of the write overheads described above occur because we support modifications of existing rows, which necessitates the read-before-write, the locks, etc. If we knew that a specific base table only allows adding new data, never modifying existing data, we could avoid all this overhead - see CASSANDRA-9779. However, it's not clear how commonly applications could make do with this restriction. Obviously, many can't.

Read

Reading from a materialized view, or from the base table, is just a normal read from a table, and incurs no additional overheads.

Data cardinality

Because materialized views are implemented as ordinary Cassandra tables, all the regular data-modeling caveats that apply to tables also apply to views.

For example, it is a known faux pas to have a partition key with low cardinality - i.e., very few distinct values, or where a significant percentage of the rows share the same value for the partition key. It is usually fine to have a non-key column with low cardinality: e.g., consider data where each row is a person, and there is a column for the person's gender, which only has two different values ('male' or 'female'). However, using this low-cardinality column as a partition key in a materialized view is not fine: If one creates a materialized view with the gender column as the partition key, all the data will be put in just two partitions (one partition listing all the males, one listing all the females). This is extremely inefficient, and the storage and load are unbalanced (all the data from this table sits on a small set of nodes, and the rest have none).
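
A sketch of this anti-pattern, with a hypothetical people table:

CREATE TABLE people (
    id uuid PRIMARY KEY,
    name text,
    gender text
);

CREATE MATERIALIZED VIEW people_by_gender AS
SELECT * FROM people
WHERE gender IS NOT NULL AND id IS NOT NULL
PRIMARY KEY(gender, id);
-- every person lands in one of only two huge partitions ('male' or 'female'),
-- concentrating this table's entire data and load on a handful of replicas.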

Benchmark

mvbench is an example benchmark for materialized views.
