Skip to content

Commit

Permalink
feat: support RPC priority for JDBC connections and statements (#1548)
Browse files Browse the repository at this point in the history
This PR will support setting RPC priority from connection URL and statements.
  • Loading branch information
rahul2393 committed Nov 15, 2021
1 parent 0bd5666 commit b61a0d4
Show file tree
Hide file tree
Showing 18 changed files with 8,235 additions and 5,195 deletions.
15 changes: 15 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -615,6 +615,21 @@
<method>void setTransactionTag(java.lang.String)</method>
</difference>

<!-- Support for RPC Priority in Connection API -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.Options$RpcPriority getRPCPriority()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setRPCPriority(com.google.cloud.spanner.Options$RpcPriority)</method>
</difference>



<!-- Adds getValue to ResultSet -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
Expand Down Expand Up @@ -52,6 +53,7 @@ abstract class AbstractBaseUnitOfWork implements UnitOfWork {
private final StatementExecutor statementExecutor;
private final StatementTimeout statementTimeout;
protected final String transactionTag;
protected final RpcPriority rpcPriority;

/** Class for keeping track of the stacktrace of the caller of an async statement. */
static final class SpannerAsyncExecutionException extends RuntimeException {
Expand Down Expand Up @@ -84,6 +86,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUni
private StatementExecutor statementExecutor;
private StatementTimeout statementTimeout = new StatementTimeout();
private String transactionTag;
private RpcPriority rpcPriority;

Builder() {}

Expand All @@ -109,6 +112,11 @@ B setTransactionTag(@Nullable String tag) {
return self();
}

B setRpcPriority(@Nullable RpcPriority rpcPriority) {
this.rpcPriority = rpcPriority;
return self();
}

abstract T build();
}

Expand All @@ -117,6 +125,7 @@ B setTransactionTag(@Nullable String tag) {
this.statementExecutor = builder.statementExecutor;
this.statementTimeout = builder.statementTimeout;
this.transactionTag = builder.transactionTag;
this.rpcPriority = builder.rpcPriority;
}

StatementExecutor getStatementExecutor() {
Expand Down
Expand Up @@ -17,13 +17,15 @@
package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TimestampBound.Mode;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import com.google.spanner.v1.RequestOptions.Priority;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -240,4 +242,34 @@ public TransactionMode convert(String value) {
return values.get(valueWithSingleSpaces);
}
}

/** Converter for converting strings to {@link RpcPriority} values. */
static class RpcPriorityConverter implements ClientSideStatementValueConverter<Priority> {
private final CaseInsensitiveEnumMap<Priority> values =
new CaseInsensitiveEnumMap<>(Priority.class);
private final Pattern allowedValues;

public RpcPriorityConverter(String allowedValues) {
// Remove the parentheses from the beginning and end.
this.allowedValues =
Pattern.compile(
"(?is)\\A" + allowedValues.substring(1, allowedValues.length() - 1) + "\\z");
}

@Override
public Class<Priority> getParameterClass() {
return Priority.class;
}

@Override
public Priority convert(String value) {
Matcher matcher = allowedValues.matcher(value);
if (matcher.find()) {
if (matcher.group(0).equalsIgnoreCase("null")) {
return Priority.PRIORITY_UNSPECIFIED;
}
}
return values.get("PRIORITY_" + value);
}
}
}
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerBatchUpdateException;
Expand Down Expand Up @@ -532,6 +533,32 @@ default String getOptimizerStatisticsPackage() {
/** @return true if this connection requests commit statistics from Cloud Spanner */
boolean isReturnCommitStats();

/**
* Sets the priority to use for RPCs executed by this connection..
*
* @param rpcPriority The RPC priority to use.
* <ul>
* <li>{@link RpcPriority#HIGH} This specifies that the RPC's invocation will be of high
* priority.
* <li>{@link RpcPriority#MEDIUM} This specifies that the RPC's invocation will be of medium
* priority.
* <li>{@link RpcPriority#LOW} This specifies that the RPC's invocation will be of low
* priority.
* </ul>
*/
default void setRPCPriority(RpcPriority rpcPriority) {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* Gets the current RPC priority of this connection.
*
* @return The RPC priority that is currently used by this connection.
*/
default RpcPriority getRPCPriority() {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* Commits the current transaction of this connection. All mutations that have been buffered
* during the current transaction will be written to the database.
Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
Expand Down Expand Up @@ -208,6 +209,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private AutocommitDmlMode autocommitDmlMode = AutocommitDmlMode.TRANSACTIONAL;
private TimestampBound readOnlyStaleness = TimestampBound.strong();
private QueryOptions queryOptions = QueryOptions.getDefaultInstance();
private RpcPriority rpcPriority = null;

private String transactionTag;
private String statementTag;
Expand All @@ -227,6 +229,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.readOnly = options.isReadOnly();
this.autocommit = options.isAutocommit();
this.queryOptions = this.queryOptions.toBuilder().mergeFrom(options.getQueryOptions()).build();
this.rpcPriority = options.getRPCPriority();
this.returnCommitStats = options.isReturnCommitStats();
this.ddlClient = createDdlClient();
setDefaultTransactionOptions();
Expand Down Expand Up @@ -453,6 +456,18 @@ public String getOptimizerStatisticsPackage() {
return this.queryOptions.getOptimizerStatisticsPackage();
}

@Override
public void setRPCPriority(RpcPriority rpcPriority) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
this.rpcPriority = rpcPriority;
}

@Override
public RpcPriority getRPCPriority() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.rpcPriority;
}

@Override
public void setStatementTimeout(long timeout, TimeUnit unit) {
Preconditions.checkArgument(timeout > 0L, "Zero or negative timeout values are not allowed");
Expand Down Expand Up @@ -1018,6 +1033,19 @@ private QueryOption[] mergeQueryStatementTag(QueryOption... options) {
return options;
}

private QueryOption[] mergeQueryRequestOptions(QueryOption... options) {
if (this.rpcPriority != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new QueryOption[] {Options.priority(this.rpcPriority)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.priority(this.rpcPriority);
}
}
return options;
}

private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
Expand All @@ -1032,6 +1060,19 @@ private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) {
return options;
}

private UpdateOption[] mergeUpdateRequestOptions(UpdateOption... options) {
if (this.rpcPriority != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new UpdateOption[] {Options.priority(this.rpcPriority)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.priority(this.rpcPriority);
}
}
return options;
}

private ResultSet internalExecuteQuery(
final ParsedStatement statement,
final AnalyzeMode analyzeMode,
Expand All @@ -1040,7 +1081,8 @@ private ResultSet internalExecuteQuery(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return get(
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)));
transaction.executeQueryAsync(
statement, analyzeMode, mergeQueryRequestOptions(mergeQueryStatementTag(options))));
}

private AsyncResultSet internalExecuteQueryAsync(
Expand All @@ -1051,7 +1093,8 @@ private AsyncResultSet internalExecuteQueryAsync(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return ResultSets.toAsyncResultSet(
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)),
transaction.executeQueryAsync(
statement, analyzeMode, mergeQueryRequestOptions(mergeQueryStatementTag(options))),
spanner.getAsyncExecutorProvider(),
options);
}
Expand All @@ -1061,13 +1104,15 @@ private ApiFuture<Long> internalExecuteUpdateAsync(
Preconditions.checkArgument(
update.getType() == StatementType.UPDATE, "Statement must be an update");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeUpdateAsync(update, mergeUpdateStatementTag(options));
return transaction.executeUpdateAsync(
update, mergeUpdateRequestOptions(mergeUpdateStatementTag(options)));
}

private ApiFuture<long[]> internalExecuteBatchUpdateAsync(
List<ParsedStatement> updates, UpdateOption... options) {
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeBatchUpdateAsync(updates, mergeUpdateStatementTag(options));
return transaction.executeBatchUpdateAsync(
updates, mergeUpdateRequestOptions(mergeUpdateStatementTag(options)));
}

/**
Expand Down Expand Up @@ -1104,6 +1149,7 @@ UnitOfWork createNewUnitOfWork() {
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.setRpcPriority(rpcPriority)
.build();
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
Expand All @@ -1114,6 +1160,7 @@ UnitOfWork createNewUnitOfWork() {
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.setRpcPriority(rpcPriority)
.build();
case DML_BATCH:
// A DML batch can run inside the current transaction. It should therefore only
Expand All @@ -1124,6 +1171,7 @@ UnitOfWork createNewUnitOfWork() {
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setStatementTag(statementTag)
.setRpcPriority(rpcPriority)
.build();
case DDL_BATCH:
return DdlBatch.newBuilder()
Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -158,6 +159,7 @@ public String[] getValidValues() {
private static final String DEFAULT_USER_AGENT = null;
private static final String DEFAULT_OPTIMIZER_VERSION = "";
private static final String DEFAULT_OPTIMIZER_STATISTICS_PACKAGE = "";
private static final RpcPriority DEFAULT_RPC_PRIORITY = null;
private static final boolean DEFAULT_RETURN_COMMIT_STATS = false;
private static final boolean DEFAULT_LENIENT = false;

Expand Down Expand Up @@ -196,6 +198,8 @@ public String[] getValidValues() {
"optimizerStatisticsPackage";
/** Name of the 'lenientMode' connection property. */
public static final String LENIENT_PROPERTY_NAME = "lenient";
/** Name of the 'rpcPriority' connection property. */
public static final String RPC_PRIORITY_NAME = "rpcPriority";

/** All valid connection properties. */
public static final Set<ConnectionProperty> VALID_PROPERTIES =
Expand Down Expand Up @@ -252,7 +256,10 @@ public String[] getValidValues() {
ConnectionProperty.createBooleanProperty(
LENIENT_PROPERTY_NAME,
"Silently ignore unknown properties in the connection string/properties (true/false)",
DEFAULT_LENIENT))));
DEFAULT_LENIENT),
ConnectionProperty.createStringProperty(
RPC_PRIORITY_NAME,
"Sets the priority for all RPC invocations from this connection (HIGH/MEDIUM/LOW). The default is HIGH."))));

private static final Set<ConnectionProperty> INTERNAL_PROPERTIES =
Collections.unmodifiableSet(
Expand Down Expand Up @@ -490,6 +497,7 @@ public static Builder newBuilder() {
private final QueryOptions queryOptions;
private final boolean returnCommitStats;
private final boolean autoConfigEmulator;
private final RpcPriority rpcPriority;

private final boolean autocommit;
private final boolean readOnly;
Expand Down Expand Up @@ -533,6 +541,7 @@ private ConnectionOptions(Builder builder) {
this.autoConfigEmulator = parseAutoConfigEmulator(this.uri);
this.usePlainText = this.autoConfigEmulator || parseUsePlainText(this.uri);
this.host = determineHost(matcher, autoConfigEmulator, usePlainText);
this.rpcPriority = parseRPCPriority(this.uri);

this.instanceId = matcher.group(Builder.INSTANCE_GROUP);
this.databaseName = matcher.group(Builder.DATABASE_GROUP);
Expand Down Expand Up @@ -725,6 +734,12 @@ static boolean parseLenient(String uri) {
return value != null ? Boolean.parseBoolean(value) : DEFAULT_LENIENT;
}

@VisibleForTesting
static RpcPriority parseRPCPriority(String uri) {
String value = parseUriProperty(uri, RPC_PRIORITY_NAME);
return value != null ? RpcPriority.valueOf(value) : DEFAULT_RPC_PRIORITY;
}

@VisibleForTesting
static String parseUriProperty(String uri, String property) {
Pattern pattern = Pattern.compile(String.format("(?is)(?:;|\\?)%s=(.*?)(?:;|$)", property));
Expand Down Expand Up @@ -923,6 +938,11 @@ public boolean isAutoConfigEmulator() {
return autoConfigEmulator;
}

/** The {@link RpcPriority} to use for the connection. */
RpcPriority getRPCPriority() {
return rpcPriority;
}

/** Interceptors that should be executed after each statement */
List<StatementExecutionInterceptor> getStatementExecutionInterceptors() {
return statementExecutionInterceptors;
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.google.cloud.spanner.TimestampBound;
import com.google.protobuf.Duration;
import com.google.spanner.v1.RequestOptions.Priority;

/**
* The Cloud Spanner JDBC driver supports a number of client side statements that are interpreted by
Expand Down Expand Up @@ -97,4 +98,8 @@ interface ConnectionStatementExecutor {
StatementResult statementRunBatch();

StatementResult statementAbortBatch();

StatementResult statementSetRPCPriority(Priority priority);

StatementResult statementShowRPCPriority();
}

0 comments on commit b61a0d4

Please sign in to comment.