Skip to content

Commit

Permalink
Revert "CEP-15: (C*) per-table transactional configuration"
Browse files Browse the repository at this point in the history
This reverts commit 85411eb.
  • Loading branch information
aweisberg committed Mar 19, 2024
1 parent 85411eb commit abe37a4
Show file tree
Hide file tree
Showing 90 changed files with 2,254 additions and 2,585 deletions.
20 changes: 0 additions & 20 deletions src/java/org/apache/cassandra/config/AccordSpec.java
Expand Up @@ -18,8 +18,6 @@

package org.apache.cassandra.config;

import org.apache.cassandra.service.consensus.TransactionalMode;

public class AccordSpec
{
public volatile boolean enabled = false;
Expand Down Expand Up @@ -51,22 +49,4 @@ public class AccordSpec
public volatile DurationSpec durability_txnid_lag = new DurationSpec.IntSecondsBound(5);
public volatile DurationSpec shard_durability_cycle = new DurationSpec.IntMinutesBound(2);
public volatile DurationSpec global_durability_cycle = new DurationSpec.IntMinutesBound(10);

public enum TransactionalRangeMigration
{
auto, explicit
}

/**
* Defines the behavior of range migration opt-in when changing transactional settings on a table. In auto,
* all ranges are marked as migrating and no additional user action is needed aside from running repairs. In
* explicit, no ranges are marked as migrating, and the user needs to explicitly mark ranges as migrating to
* the target transactional mode via nodetool.
*/
public volatile TransactionalRangeMigration range_migration = TransactionalRangeMigration.auto;

/**
* default transactional mode for tables created by this node when no transactional mode has been specified in the DDL
*/
public TransactionalMode default_transactional_mode = TransactionalMode.off;
}
115 changes: 115 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Expand Up @@ -42,6 +42,7 @@
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.service.StartupChecks.StartupCheckType;
import org.apache.cassandra.utils.StorageCompatibilityMode;
import org.apache.cassandra.service.accord.IAccordService;

import static org.apache.cassandra.config.CassandraRelevantProperties.*;

Expand Down Expand Up @@ -1111,6 +1112,8 @@ public enum PaxosOnLinearizabilityViolation

public volatile boolean client_request_size_metrics_enabled = true;

public LWTStrategy lwt_strategy = LWTStrategy.migration;
public NonSerialWriteStrategy non_serial_write_strategy = NonSerialWriteStrategy.normal;

public volatile int max_top_size_partition_count = 10;
public volatile int max_top_tombstone_partition_count = 10;
Expand Down Expand Up @@ -1218,6 +1221,118 @@ public enum CorruptedTombstoneStrategy
exception
}

/*
* How to pick a consensus protocol for CAS
* and serial read operations. Transaction statements
* will always run on Accord. Legacy in this context includes PaxosV2.
*/
public enum LWTStrategy
{
/*
* Allow both Accord and PaxosV1/V2 to run on the same cluster
* Some keys and ranges might be running on Accord if they
* have been migrated and the rest will run on Paxos until
* they are migrated.
*/
migration,

/*
* Everything will be run on Accord. Useful for new deployments
* that don't want to accidentally start using legacy Paxos
* requiring migration to Accord.
*/
accord
}

/*
* Configure how non-serial writes should be executed. For Accord transactions to function correctly
* when mixed with non-SERIAL writes it's necessary for the writes to occur through Accord.
*
* Accord will also use this configuration to determine what consistency level to perform its reads
* at since it will need to be able to read data written at non-SERIAL consistency levels.
*
* BlockingReadRepair will also use this configuration to determine how BRR mutations are applied. For migration
* and accord the BRR mutations will be applied as Accord transactions so that BRR doesn't expose Accord to
* uncommitted Accord data that is being RRed. This can occur when Accord has applied a transaction at some, but not
* all replica since Accord defaults to asynchronous commit.
*
* By routing repairs through Accord it is guaranteed that the Accord derived contents of the repair have already been applied at any
* replica where Accord applies the transaction. This also prevents BRR from breaking atomicity of Accord writes.
*
* If they are not written through Accord then reads through Accord will be required to occur at
* consistency level compatible with the non-serial writes preventing single replica reads from being performed
* by Accord. It will also require Accord to perform read repair of non-serial writes.
*
* Even then there is the potential for Accord to inconsistently execute transactions at different replicas
* because different coordinators for an Accord transaction may encounter different non-SERIAL write state and
* race to commit different outcomes for the transaction.
*
* This is different from Paxos because Paxos performs consensus on the actual values to be applied so recovery
* coordinators will always produce a consistent state when applying a transaction. Accord performs consensus on
* the execution order of transaction and different coordinators witnessing different states not managed by Accord
* can produce multiple outcomes for a transaction.
*
* // TODO (maybe): To safely migrate you would have to route all writes through Accord with the current implementation
* // We could do it by range instead in the migration version, but then we need to know when all in flight writes
* // are done before marking a range as migrated. Would waiting out the timeout be enough (timeout bugs!)?
*/
public enum NonSerialWriteStrategy
{
/*
* Execute writes through Cassandra via StorageProxy's normal write path. This can lead Accord to compute
* multiple outcomes for a transaction that depends on data written by non-SERIAL writes.
*/
normal(false, false, false),
/*
* Allow mixing of non-SERIAL writes and Accord, but still force BRR through Accord
*/
mixed(false, false, true),
/*
* Execute writes through Accord skipping StorageProxy's normal write path, but commit
* writes at the provided consistency level so they can be read via non-SERIAL consistency levels.
*/
migration(false, true, true),
/*
* Execute writes through Accord skipping StorageProxy's normal write path. Ignores the provided consistency level
* which makes Accord commit writes at ANY similar to Paxos with commit consistency level ANY.
*/
accord(true, true, true);

public final boolean ignoresSuppliedConsistencyLevel;
public final boolean writesThroughAccord;

public final boolean blockingReadRepairThroughAccord;

NonSerialWriteStrategy(boolean ignoresSuppliedConsistencyLevel, boolean writesThroughAccord, boolean blockingReadRepairThroughAccord)
{
this.ignoresSuppliedConsistencyLevel = ignoresSuppliedConsistencyLevel;
this.writesThroughAccord = writesThroughAccord;
this.blockingReadRepairThroughAccord = blockingReadRepairThroughAccord;
}

public ConsistencyLevel commitCLForStrategy(ConsistencyLevel consistencyLevel)
{
if (ignoresSuppliedConsistencyLevel)
return null;

if (!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
throw new UnsupportedOperationException("Consistency level " + consistencyLevel + " is unsupported with Accord for write/commit, supported are ANY, ONE, QUORUM, and ALL");

return consistencyLevel;
}

public ConsistencyLevel readCLForStrategy(ConsistencyLevel consistencyLevel)
{
if (ignoresSuppliedConsistencyLevel)
return null;

if (!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
throw new UnsupportedOperationException("Consistency level " + consistencyLevel + " is unsupported with Accord for read, supported are ONE, QUORUM, and SERIAL");

return consistencyLevel;
}
}

private static final Set<String> SENSITIVE_KEYS = new HashSet<String>() {{
add("client_encryption_options");
add("server_encryption_options");
Expand Down
43 changes: 29 additions & 14 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Expand Up @@ -76,6 +76,8 @@
import org.apache.cassandra.auth.IRoleManager;
import org.apache.cassandra.config.Config.CommitLogSync;
import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.config.Config.LWTStrategy;
import org.apache.cassandra.config.Config.NonSerialWriteStrategy;
import org.apache.cassandra.config.Config.PaxosOnLinearizabilityViolation;
import org.apache.cassandra.config.Config.PaxosStatePurging;
import org.apache.cassandra.db.ConsistencyLevel;
Expand Down Expand Up @@ -109,7 +111,6 @@
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.CacheService.CacheType;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.service.paxos.Paxos;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.StorageCompatibilityMode;
Expand Down Expand Up @@ -153,8 +154,8 @@

public class DatabaseDescriptor
{
public static final String NO_ACCORD_PAXOS_STRATEGY_WITH_ACCORD_DISABLED_MESSAGE =
"Cannot use lwt_strategy \"accord\" while Accord transactions are disabled.";
public static final String NO_ACCORD_PAXOS_STRATEGY_WITH_ACCORD_DISABLED_MESSAGE =
"Cannot use lwt_strategy \"accord\" while Accord transactions are disabled.";

static
{
Expand Down Expand Up @@ -892,8 +893,8 @@ else if (conf.repair_session_space.toMebibytes() > (int) (Runtime.getRuntime().m
{
// if consensusMigrationCacheSizeInMiB option was set to "auto" then size of the cache should be "min(1% of Heap (in MB), 50MB)
consensusMigrationCacheSizeInMiB = (conf.consensus_migration_cache_size == null)
? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.01 / 1024 / 1024)), 50)
: conf.consensus_migration_cache_size.toMebibytes();
? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.01 / 1024 / 1024)), 50)
: conf.consensus_migration_cache_size.toMebibytes();

if (consensusMigrationCacheSizeInMiB < 0)
throw new NumberFormatException(); // to escape duplicating error message
Expand Down Expand Up @@ -1050,6 +1051,14 @@ else if (conf.max_value_size.toMebibytes() >= 2048)
throw new ConfigurationException(String.format("Invalid value for.progress_barrier_default_consistency_level %s. Allowed values: %s",
conf.progress_barrier_default_consistency_level, progressBarrierCLsArr));
}

if (conf.lwt_strategy == LWTStrategy.accord)
{
if (!conf.accord.enabled)
throw new ConfigurationException(NO_ACCORD_PAXOS_STRATEGY_WITH_ACCORD_DISABLED_MESSAGE);
if (conf.non_serial_write_strategy == Config.NonSerialWriteStrategy.normal)
throw new ConfigurationException("If Accord is used for LWTs then regular writes needs to be routed through Accord for interoperability by setting non_serial_write_strategy to \"accord\" or \"migration\"");
}
}

@VisibleForTesting
Expand Down Expand Up @@ -3284,14 +3293,25 @@ public static boolean paxoTopologyRepairStrictEachQuorum()
return conf.paxos_topology_repair_strict_each_quorum;
}

public static AccordSpec.TransactionalRangeMigration getTransactionalRangeMigration()
// TODO (desired): This configuration should come out of TrM to force the cluster to agree on it
public static LWTStrategy getLWTStrategy()
{
return conf.lwt_strategy;
}

public static void setLWTStrategy(LWTStrategy lwtStrategy)
{
return conf.accord.range_migration;
conf.lwt_strategy = lwtStrategy;
}

public static void setTransactionalRangeMigration(AccordSpec.TransactionalRangeMigration val)
public static Config.NonSerialWriteStrategy getNonSerialWriteStrategy()
{
conf.accord.range_migration = Preconditions.checkNotNull(val);
return conf.non_serial_write_strategy;
}

public static void setNonSerialWriteStrategy(NonSerialWriteStrategy nonSerialWriteStrategy)
{
conf.non_serial_write_strategy = nonSerialWriteStrategy;
}

public static int getAccordBarrierRetryAttempts()
Expand All @@ -3314,11 +3334,6 @@ public static long getAccordRangeBarrierTimeoutNanos()
return conf.accord.range_barrier_timeout.to(TimeUnit.NANOSECONDS);
}

public static TransactionalMode defaultTransactionalMode()
{
return conf.accord.default_transactional_mode;
}

public static void setNativeTransportMaxRequestDataInFlightPerIpInBytes(long maxRequestDataInFlightInBytes)
{
if (maxRequestDataInFlightInBytes == -1)
Expand Down
Expand Up @@ -34,6 +34,7 @@

import accord.api.Update;
import accord.primitives.Txn;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.UpdateParameters;
Expand Down Expand Up @@ -480,7 +481,7 @@ public Txn toAccordTxn(ConsistencyLevel consistencyLevel, ConsistencyLevel commi
Update update = createUpdate(clientState, commitConsistencyLevel);
// If the write strategy is sending all writes through Accord there is no need to use the supplied consistency
// level since Accord will manage reading safely
consistencyLevel = metadata.params.transactionalMode.readCLForStrategy(consistencyLevel);
consistencyLevel = DatabaseDescriptor.getNonSerialWriteStrategy().readCLForStrategy(consistencyLevel);
TxnRead read = TxnRead.createCasRead(readCommand, consistencyLevel);
// In a CAS requesting only one key is supported and writes
// can't be dependent on any data that is read (only conditions)
Expand All @@ -492,7 +493,7 @@ private Update createUpdate(ClientState clientState, ConsistencyLevel commitCons
{
// Potentially ignore commit consistency level if non-SERIAL write strategy is Accord
// since it is safe to match what non-SERIAL writes do
commitConsistencyLevel = metadata.params.transactionalMode.commitCLForStrategy(commitConsistencyLevel);
commitConsistencyLevel = DatabaseDescriptor.getNonSerialWriteStrategy().commitCLForStrategy(commitConsistencyLevel);
return new TxnUpdate(createWriteFragments(clientState), createCondition(), commitConsistencyLevel);
}

Expand Down
Expand Up @@ -62,11 +62,9 @@
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.api.AccordRoutableKey;
import org.apache.cassandra.service.accord.txn.AccordUpdate;
import org.apache.cassandra.service.accord.txn.TxnCondition;
import org.apache.cassandra.service.accord.txn.TxnData;
Expand All @@ -78,12 +76,13 @@
import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.accord.txn.TxnUpdate;
import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.FBUtilities;

import static accord.primitives.Txn.Kind.EphemeralRead;
import static accord.primitives.Txn.Kind.Read;
import static org.apache.cassandra.config.Config.NonSerialWriteStrategy.accord;
import static org.apache.cassandra.config.DatabaseDescriptor.getNonSerialWriteStrategy;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
Expand Down Expand Up @@ -314,11 +313,6 @@ Keys toKeys(SortedSet<Key> keySet)
return new Keys(keySet);
}

private static TransactionalMode transactionalModeForSingleKey(Keys keys)
{
return Schema.instance.getTableMetadata(((AccordRoutableKey) keys.get(0)).table()).params.transactionalMode;
}

@VisibleForTesting
public Txn createTxn(ClientState state, QueryOptions options)
{
Expand All @@ -331,8 +325,7 @@ public Txn createTxn(ClientState state, QueryOptions options)
List<TxnNamedRead> reads = createNamedReads(options, state, ImmutableMap.of(), keySet::add);
Keys txnKeys = toKeys(keySet);
TxnRead read = createTxnRead(reads, txnKeys, null);
Txn.Kind kind = txnKeys.size() == 1 && transactionalModeForSingleKey(txnKeys) == TransactionalMode.full
? EphemeralRead : Read;
Txn.Kind kind = txnKeys.size() == 1 && getNonSerialWriteStrategy() == accord ? EphemeralRead : Read;
return new Txn.InMemory(kind, txnKeys, read, TxnQuery.ALL, null);
}
else
Expand Down Expand Up @@ -382,6 +375,8 @@ public ResultMessage execute(QueryState state, QueryOptions options, long queryS

Txn txn = createTxn(state.getClientState(), options);

AccordService.instance().maybeConvertTablesToAccord(txn);

TxnResult txnResult = AccordService.instance().coordinate(txn, options.getConsistency(), queryStartNanoTime);
if (txnResult.kind() == retry_new_protocol)
throw new IllegalStateException("Transaction statement should never be required to switch consensus protocols");
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.accord.AccordTopology;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.transport.Event.SchemaChange;
import org.apache.cassandra.transport.messages.ResultMessage;
Expand Down Expand Up @@ -177,9 +176,6 @@ public ResultMessage execute(QueryState state)
if (null != user && !user.isAnonymous())
createdResources(diff).forEach(r -> grantPermissionsOnResource(r, user));

// if the changes affected accord, wait for accord to apply them
AccordTopology.awaitTopologyReadiness(diff, result.epoch);

return new ResultMessage.SchemaChange(schemaChangeEvent(diff));
}

Expand Down

0 comments on commit abe37a4

Please sign in to comment.