Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support RPC priority for JDBC connections and statements #1548

Merged
merged 6 commits into from Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -615,6 +615,21 @@
<method>void setTransactionTag(java.lang.String)</method>
</difference>

<!-- Support for RPC Priority in Connection API -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.Options$RpcPriority getRPCPriority()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setRPCPriority(com.google.cloud.spanner.Options$RpcPriority)</method>
</difference>



<!-- Adds getValue to ResultSet -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TimestampBound.Mode;
Expand Down Expand Up @@ -240,4 +241,22 @@ public TransactionMode convert(String value) {
return values.get(valueWithSingleSpaces);
}
}

/** Converter for converting strings to {@link RpcPriority} values. */
static class RpcPriorityConverter implements ClientSideStatementValueConverter<RpcPriority> {
private final CaseInsensitiveEnumMap<RpcPriority> values =
new CaseInsensitiveEnumMap<>(RpcPriority.class);

public RpcPriorityConverter(String allowedValues) {}

@Override
public Class<RpcPriority> getParameterClass() {
return RpcPriority.class;
}

@Override
public RpcPriority convert(String value) {
return values.get(value);
}
}
}
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerBatchUpdateException;
Expand Down Expand Up @@ -532,6 +533,26 @@ default String getOptimizerStatisticsPackage() {
/** @return true if this connection requests commit statistics from Cloud Spanner */
boolean isReturnCommitStats();

/**
* Sets the priority to use for RPCs executed by this connection..
*
* @param rpcPriority The RPC priority to use. Must be a string from (HIGH/MEDIUM/LOW). The empty
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
* string will instruct the connection to use the priority set in the connection URL. If none
* is set, the default rpcPriority of Cloud Spanner is used.
*/
default void setRPCPriority(RpcPriority rpcPriority) {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* Gets the current RPC priority of this connection.
*
* @return The RPC priority that is currently used by this connection.
*/
default RpcPriority getRPCPriority() {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* Commits the current transaction of this connection. All mutations that have been buffered
* during the current transaction will be written to the database.
Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
Expand Down Expand Up @@ -208,6 +209,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private AutocommitDmlMode autocommitDmlMode = AutocommitDmlMode.TRANSACTIONAL;
private TimestampBound readOnlyStaleness = TimestampBound.strong();
private QueryOptions queryOptions = QueryOptions.getDefaultInstance();
private RpcPriority rpcPriority = RpcPriority.HIGH;
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved

private String transactionTag;
private String statementTag;
Expand All @@ -227,6 +229,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.readOnly = options.isReadOnly();
this.autocommit = options.isAutocommit();
this.queryOptions = this.queryOptions.toBuilder().mergeFrom(options.getQueryOptions()).build();
this.rpcPriority = options.getRPCPriority();
this.returnCommitStats = options.isReturnCommitStats();
this.ddlClient = createDdlClient();
setDefaultTransactionOptions();
Expand Down Expand Up @@ -453,6 +456,19 @@ public String getOptimizerStatisticsPackage() {
return this.queryOptions.getOptimizerStatisticsPackage();
}

@Override
public void setRPCPriority(RpcPriority rpcPriority) {
Preconditions.checkNotNull(rpcPriority);
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
this.rpcPriority = rpcPriority;
}

@Override
public RpcPriority getRPCPriority() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.rpcPriority;
}

@Override
public void setStatementTimeout(long timeout, TimeUnit unit) {
Preconditions.checkArgument(timeout > 0L, "Zero or negative timeout values are not allowed");
Expand Down Expand Up @@ -1008,10 +1024,11 @@ private QueryOption[] mergeQueryStatementTag(QueryOption... options) {
if (this.statementTag != null) {
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new QueryOption[] {Options.tag(statementTag)};
options = new QueryOption[] {Options.tag(statementTag), Options.priority(this.rpcPriority)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.tag(statementTag);
options = Arrays.copyOf(options, options.length + 2);
options[options.length - 2] = Options.tag(statementTag);
options[options.length - 1] = Options.priority(this.rpcPriority);
}
this.statementTag = null;
}
Expand All @@ -1022,10 +1039,12 @@ private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new UpdateOption[] {Options.tag(statementTag)};
options =
new UpdateOption[] {Options.tag(statementTag), Options.priority(this.rpcPriority)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.tag(statementTag);
options = Arrays.copyOf(options, options.length + 2);
options[options.length - 2] = Options.tag(statementTag);
options[options.length - 1] = Options.priority(this.rpcPriority);
}
this.statementTag = null;
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -158,6 +159,7 @@ public String[] getValidValues() {
private static final String DEFAULT_USER_AGENT = null;
private static final String DEFAULT_OPTIMIZER_VERSION = "";
private static final String DEFAULT_OPTIMIZER_STATISTICS_PACKAGE = "";
private static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH;
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
private static final boolean DEFAULT_RETURN_COMMIT_STATS = false;
private static final boolean DEFAULT_LENIENT = false;

Expand Down Expand Up @@ -196,6 +198,8 @@ public String[] getValidValues() {
"optimizerStatisticsPackage";
/** Name of the 'lenientMode' connection property. */
public static final String LENIENT_PROPERTY_NAME = "lenient";
/** Name of the 'rpcPriority' connection property. */
public static final String RPC_PRIORITY_NAME = "rpcPriority";

/** All valid connection properties. */
public static final Set<ConnectionProperty> VALID_PROPERTIES =
Expand Down Expand Up @@ -252,7 +256,10 @@ public String[] getValidValues() {
ConnectionProperty.createBooleanProperty(
LENIENT_PROPERTY_NAME,
"Silently ignore unknown properties in the connection string/properties (true/false)",
DEFAULT_LENIENT))));
DEFAULT_LENIENT),
ConnectionProperty.createStringProperty(
RPC_PRIORITY_NAME,
"Sets the priority for all RPC invocations from this connection (HIGH/MEDIUM/LOW). The default is HIGH."))));

private static final Set<ConnectionProperty> INTERNAL_PROPERTIES =
Collections.unmodifiableSet(
Expand Down Expand Up @@ -490,6 +497,7 @@ public static Builder newBuilder() {
private final QueryOptions queryOptions;
private final boolean returnCommitStats;
private final boolean autoConfigEmulator;
private final RpcPriority rpcPriority;

private final boolean autocommit;
private final boolean readOnly;
Expand Down Expand Up @@ -533,6 +541,7 @@ private ConnectionOptions(Builder builder) {
this.autoConfigEmulator = parseAutoConfigEmulator(this.uri);
this.usePlainText = this.autoConfigEmulator || parseUsePlainText(this.uri);
this.host = determineHost(matcher, autoConfigEmulator, usePlainText);
this.rpcPriority = parseRPCPriority(this.uri);

this.instanceId = matcher.group(Builder.INSTANCE_GROUP);
this.databaseName = matcher.group(Builder.DATABASE_GROUP);
Expand Down Expand Up @@ -725,6 +734,12 @@ static boolean parseLenient(String uri) {
return value != null ? Boolean.parseBoolean(value) : DEFAULT_LENIENT;
}

@VisibleForTesting
static RpcPriority parseRPCPriority(String uri) {
String value = parseUriProperty(uri, RPC_PRIORITY_NAME);
return value != null ? RpcPriority.valueOf(value) : DEFAULT_RPC_PRIORITY;
}

@VisibleForTesting
static String parseUriProperty(String uri, String property) {
Pattern pattern = Pattern.compile(String.format("(?is)(?:;|\\?)%s=(.*?)(?:;|$)", property));
Expand Down Expand Up @@ -923,6 +938,11 @@ public boolean isAutoConfigEmulator() {
return autoConfigEmulator;
}

/** The {@link RpcPriority} to use for the connection. */
RpcPriority getRPCPriority() {
return rpcPriority;
}

/** Interceptors that should be executed after each statement */
List<StatementExecutionInterceptor> getStatementExecutionInterceptors() {
return statementExecutionInterceptors;
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.TimestampBound;
import com.google.protobuf.Duration;

Expand Down Expand Up @@ -97,4 +98,8 @@ interface ConnectionStatementExecutor {
StatementResult statementRunBatch();

StatementResult statementAbortBatch();

StatementResult statementSetRPCPriority(RpcPriority priority);

StatementResult statementShowRPCPriority();
}
Expand Up @@ -29,6 +29,7 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_READ_ONLY_STALENESS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_RETRY_ABORTS_INTERNALLY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_RETURN_COMMIT_STATS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_RPC_PRIORITY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_STATEMENT_TAG;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_STATEMENT_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_TRANSACTION_MODE;
Expand All @@ -44,6 +45,7 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_READ_TIMESTAMP;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_RETRY_ABORTS_INTERNALLY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_RETURN_COMMIT_STATS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_RPC_PRIORITY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_STATEMENT_TAG;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_STATEMENT_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_TRANSACTION_TAG;
Expand All @@ -54,6 +56,7 @@

import com.google.cloud.spanner.CommitResponse;
import com.google.cloud.spanner.CommitStats;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.Struct;
Expand Down Expand Up @@ -338,4 +341,15 @@ public StatementResult statementAbortBatch() {
getConnection().abortBatch();
return noResult(ABORT_BATCH);
}

@Override
public StatementResult statementSetRPCPriority(RpcPriority rpcPriority) {
getConnection().setRPCPriority(rpcPriority);
return noResult(SET_RPC_PRIORITY);
}

@Override
public StatementResult statementShowRPCPriority() {
return resultSet("RPC_PRIORITY", getConnection().getRPCPriority(), SHOW_RPC_PRIORITY);
}
}
Expand Up @@ -80,7 +80,9 @@ enum ClientSideStatementType {
START_BATCH_DDL,
START_BATCH_DML,
RUN_BATCH,
ABORT_BATCH
ABORT_BATCH,
SET_RPC_PRIORITY,
SHOW_RPC_PRIORITY
}

/**
Expand Down
Expand Up @@ -117,6 +117,14 @@
"method": "statementShowTransactionTag",
"exampleStatements": ["show variable transaction_tag"]
},
{
"name": "SHOW VARIABLE RPC_PRIORITY",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"regex": "(?is)\\A\\s*show\\s+variable\\s+rpc_priority\\s*\\z",
"method": "statementShowRPCPriority",
"exampleStatements": ["show variable rpc_priority"]
},
{
"name": "BEGIN TRANSACTION",
"executorName": "ClientSideStatementNoParamExecutor",
Expand Down Expand Up @@ -361,6 +369,23 @@
"allowedValues": "'(([a-zA-Z][a-zA-Z0-9_\\-]{1,63})|())'",
"converterName": "ClientSideStatementValueConverters$StringValueConverter"
}
},
{
"name": "SET RPC_PRIORITY = 'HIGH'|'MEDIUM'|'LOW'",
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"regex": "(?is)\\A\\s*set\\s+rpc_priority\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetRPCPriority",
"exampleStatements": [
"set rpc_priority='HIGH'",
"set rpc_priority='MEDIUM'"
],
"setStatement": {
"propertyName": "RPC_PRIORITY",
"separator": "=",
"allowedValues": "'(HIGH|MEDIUM|LOW)'",
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
"converterName": "ClientSideStatementValueConverters$RpcPriorityConverter"
}
}
]
}