Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support RPC priority for JDBC connections and statements #1548

Merged
merged 6 commits into from Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -615,6 +615,21 @@
<method>void setTransactionTag(java.lang.String)</method>
</difference>

<!-- Support for RPC Priority in Connection API -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.Options$RpcPriority getRPCPriority()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setRPCPriority(com.google.cloud.spanner.Options$RpcPriority)</method>
</difference>



<!-- Adds getValue to ResultSet -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
Expand Down Expand Up @@ -52,6 +53,7 @@ abstract class AbstractBaseUnitOfWork implements UnitOfWork {
private final StatementExecutor statementExecutor;
private final StatementTimeout statementTimeout;
protected final String transactionTag;
protected final RpcPriority rpcPriority;

/** Class for keeping track of the stacktrace of the caller of an async statement. */
static final class SpannerAsyncExecutionException extends RuntimeException {
Expand Down Expand Up @@ -84,6 +86,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUni
private StatementExecutor statementExecutor;
private StatementTimeout statementTimeout = new StatementTimeout();
private String transactionTag;
private RpcPriority rpcPriority;

Builder() {}

Expand All @@ -109,6 +112,11 @@ B setTransactionTag(@Nullable String tag) {
return self();
}

B setRpcPriority(@Nullable RpcPriority rpcPriority) {
this.rpcPriority = rpcPriority;
return self();
}

abstract T build();
}

Expand All @@ -117,6 +125,7 @@ B setTransactionTag(@Nullable String tag) {
this.statementExecutor = builder.statementExecutor;
this.statementTimeout = builder.statementTimeout;
this.transactionTag = builder.transactionTag;
this.rpcPriority = builder.rpcPriority;
}

StatementExecutor getStatementExecutor() {
Expand Down
Expand Up @@ -17,13 +17,15 @@
package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TimestampBound.Mode;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import com.google.spanner.v1.RequestOptions.Priority;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -240,4 +242,34 @@ public TransactionMode convert(String value) {
return values.get(valueWithSingleSpaces);
}
}

/** Converter for converting strings to {@link RpcPriority} values. */
static class RpcPriorityConverter implements ClientSideStatementValueConverter<Priority> {
private final CaseInsensitiveEnumMap<Priority> values =
new CaseInsensitiveEnumMap<>(Priority.class);
private final Pattern allowedValues;

public RpcPriorityConverter(String allowedValues) {
// Remove the parentheses from the beginning and end.
this.allowedValues =
Pattern.compile(
"(?is)\\A" + allowedValues.substring(1, allowedValues.length() - 1) + "\\z");
}

@Override
public Class<Priority> getParameterClass() {
return Priority.class;
}

@Override
public Priority convert(String value) {
Matcher matcher = allowedValues.matcher(value);
if (matcher.find()) {
if (matcher.group(0).equalsIgnoreCase("null")) {
return Priority.PRIORITY_UNSPECIFIED;
}
}
return values.get("PRIORITY_" + value);
}
}
}
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerBatchUpdateException;
Expand Down Expand Up @@ -532,6 +533,32 @@ default String getOptimizerStatisticsPackage() {
/** @return true if this connection requests commit statistics from Cloud Spanner */
boolean isReturnCommitStats();

/**
* Sets the priority to use for RPCs executed by this connection..
*
* @param rpcPriority The RPC priority to use.
* <ul>
* <li>{@link RpcPriority#HIGH} This specifies that the RPC's invocation will be of high
* priority.
* <li>{@link RpcPriority#MEDIUM} This specifies that the RPC's invocation will be of medium
* priority.
* <li>{@link RpcPriority#LOW} This specifies that the RPC's invocation will be of low
* priority.
* </ul>
*/
default void setRPCPriority(RpcPriority rpcPriority) {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* Gets the current RPC priority of this connection.
*
* @return The RPC priority that is currently used by this connection.
*/
default RpcPriority getRPCPriority() {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* Commits the current transaction of this connection. All mutations that have been buffered
* during the current transaction will be written to the database.
Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
Expand Down Expand Up @@ -208,6 +209,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private AutocommitDmlMode autocommitDmlMode = AutocommitDmlMode.TRANSACTIONAL;
private TimestampBound readOnlyStaleness = TimestampBound.strong();
private QueryOptions queryOptions = QueryOptions.getDefaultInstance();
private RpcPriority rpcPriority = null;
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved

private String transactionTag;
private String statementTag;
Expand All @@ -227,6 +229,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.readOnly = options.isReadOnly();
this.autocommit = options.isAutocommit();
this.queryOptions = this.queryOptions.toBuilder().mergeFrom(options.getQueryOptions()).build();
this.rpcPriority = options.getRPCPriority();
this.returnCommitStats = options.isReturnCommitStats();
this.ddlClient = createDdlClient();
setDefaultTransactionOptions();
Expand Down Expand Up @@ -453,6 +456,18 @@ public String getOptimizerStatisticsPackage() {
return this.queryOptions.getOptimizerStatisticsPackage();
}

@Override
public void setRPCPriority(RpcPriority rpcPriority) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
this.rpcPriority = rpcPriority;
}

@Override
public RpcPriority getRPCPriority() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.rpcPriority;
}

@Override
public void setStatementTimeout(long timeout, TimeUnit unit) {
Preconditions.checkArgument(timeout > 0L, "Zero or negative timeout values are not allowed");
Expand Down Expand Up @@ -1018,6 +1033,19 @@ private QueryOption[] mergeQueryStatementTag(QueryOption... options) {
return options;
}

private QueryOption[] mergeQueryRequestOptions(QueryOption... options) {
if (this.rpcPriority != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new QueryOption[] {Options.priority(this.rpcPriority)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.priority(this.rpcPriority);
}
}
return options;
}

private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
Expand All @@ -1032,6 +1060,19 @@ private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) {
return options;
}

private UpdateOption[] mergeUpdateRequestOptions(UpdateOption... options) {
if (this.rpcPriority != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new UpdateOption[] {Options.priority(this.rpcPriority)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.priority(this.rpcPriority);
}
}
return options;
}

private ResultSet internalExecuteQuery(
final ParsedStatement statement,
final AnalyzeMode analyzeMode,
Expand All @@ -1040,7 +1081,8 @@ private ResultSet internalExecuteQuery(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return get(
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)));
transaction.executeQueryAsync(
statement, analyzeMode, mergeQueryRequestOptions(mergeQueryStatementTag(options))));
}

private AsyncResultSet internalExecuteQueryAsync(
Expand All @@ -1051,7 +1093,8 @@ private AsyncResultSet internalExecuteQueryAsync(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return ResultSets.toAsyncResultSet(
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)),
transaction.executeQueryAsync(
statement, analyzeMode, mergeQueryRequestOptions(mergeQueryStatementTag(options))),
spanner.getAsyncExecutorProvider(),
options);
}
Expand All @@ -1061,13 +1104,15 @@ private ApiFuture<Long> internalExecuteUpdateAsync(
Preconditions.checkArgument(
update.getType() == StatementType.UPDATE, "Statement must be an update");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeUpdateAsync(update, mergeUpdateStatementTag(options));
return transaction.executeUpdateAsync(
update, mergeUpdateRequestOptions(mergeUpdateStatementTag(options)));
}

private ApiFuture<long[]> internalExecuteBatchUpdateAsync(
List<ParsedStatement> updates, UpdateOption... options) {
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeBatchUpdateAsync(updates, mergeUpdateStatementTag(options));
return transaction.executeBatchUpdateAsync(
updates, mergeUpdateRequestOptions(mergeUpdateStatementTag(options)));
}

/**
Expand Down Expand Up @@ -1104,6 +1149,7 @@ UnitOfWork createNewUnitOfWork() {
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.setRpcPriority(rpcPriority)
.build();
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
Expand All @@ -1114,6 +1160,7 @@ UnitOfWork createNewUnitOfWork() {
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.setRpcPriority(rpcPriority)
.build();
case DML_BATCH:
// A DML batch can run inside the current transaction. It should therefore only
Expand All @@ -1124,6 +1171,7 @@ UnitOfWork createNewUnitOfWork() {
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setStatementTag(statementTag)
.setRpcPriority(rpcPriority)
.build();
case DDL_BATCH:
return DdlBatch.newBuilder()
Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -158,6 +159,7 @@ public String[] getValidValues() {
private static final String DEFAULT_USER_AGENT = null;
private static final String DEFAULT_OPTIMIZER_VERSION = "";
private static final String DEFAULT_OPTIMIZER_STATISTICS_PACKAGE = "";
private static final RpcPriority DEFAULT_RPC_PRIORITY = null;
private static final boolean DEFAULT_RETURN_COMMIT_STATS = false;
private static final boolean DEFAULT_LENIENT = false;

Expand Down Expand Up @@ -196,6 +198,8 @@ public String[] getValidValues() {
"optimizerStatisticsPackage";
/** Name of the 'lenientMode' connection property. */
public static final String LENIENT_PROPERTY_NAME = "lenient";
/** Name of the 'rpcPriority' connection property. */
public static final String RPC_PRIORITY_NAME = "rpcPriority";

/** All valid connection properties. */
public static final Set<ConnectionProperty> VALID_PROPERTIES =
Expand Down Expand Up @@ -252,7 +256,10 @@ public String[] getValidValues() {
ConnectionProperty.createBooleanProperty(
LENIENT_PROPERTY_NAME,
"Silently ignore unknown properties in the connection string/properties (true/false)",
DEFAULT_LENIENT))));
DEFAULT_LENIENT),
ConnectionProperty.createStringProperty(
RPC_PRIORITY_NAME,
"Sets the priority for all RPC invocations from this connection (HIGH/MEDIUM/LOW). The default is HIGH."))));

private static final Set<ConnectionProperty> INTERNAL_PROPERTIES =
Collections.unmodifiableSet(
Expand Down Expand Up @@ -490,6 +497,7 @@ public static Builder newBuilder() {
private final QueryOptions queryOptions;
private final boolean returnCommitStats;
private final boolean autoConfigEmulator;
private final RpcPriority rpcPriority;

private final boolean autocommit;
private final boolean readOnly;
Expand Down Expand Up @@ -533,6 +541,7 @@ private ConnectionOptions(Builder builder) {
this.autoConfigEmulator = parseAutoConfigEmulator(this.uri);
this.usePlainText = this.autoConfigEmulator || parseUsePlainText(this.uri);
this.host = determineHost(matcher, autoConfigEmulator, usePlainText);
this.rpcPriority = parseRPCPriority(this.uri);

this.instanceId = matcher.group(Builder.INSTANCE_GROUP);
this.databaseName = matcher.group(Builder.DATABASE_GROUP);
Expand Down Expand Up @@ -725,6 +734,12 @@ static boolean parseLenient(String uri) {
return value != null ? Boolean.parseBoolean(value) : DEFAULT_LENIENT;
}

@VisibleForTesting
static RpcPriority parseRPCPriority(String uri) {
String value = parseUriProperty(uri, RPC_PRIORITY_NAME);
return value != null ? RpcPriority.valueOf(value) : DEFAULT_RPC_PRIORITY;
}

@VisibleForTesting
static String parseUriProperty(String uri, String property) {
Pattern pattern = Pattern.compile(String.format("(?is)(?:;|\\?)%s=(.*?)(?:;|$)", property));
Expand Down Expand Up @@ -923,6 +938,11 @@ public boolean isAutoConfigEmulator() {
return autoConfigEmulator;
}

/** The {@link RpcPriority} to use for the connection. */
RpcPriority getRPCPriority() {
return rpcPriority;
}

/** Interceptors that should be executed after each statement */
List<StatementExecutionInterceptor> getStatementExecutionInterceptors() {
return statementExecutionInterceptors;
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.google.cloud.spanner.TimestampBound;
import com.google.protobuf.Duration;
import com.google.spanner.v1.RequestOptions.Priority;

/**
* The Cloud Spanner JDBC driver supports a number of client side statements that are interpreted by
Expand Down Expand Up @@ -97,4 +98,8 @@ interface ConnectionStatementExecutor {
StatementResult statementRunBatch();

StatementResult statementAbortBatch();

StatementResult statementSetRPCPriority(Priority priority);

StatementResult statementShowRPCPriority();
}