Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support RPC priority #676

Merged
merged 7 commits into from Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -47,6 +47,7 @@
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
Expand Down Expand Up @@ -557,6 +558,14 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
return builder.build();
}

RequestOptions buildRequestOptions(Options options) {
RequestOptions.Builder builder = RequestOptions.newBuilder();
if (options.hasPriority()) {
builder.setPriority(options.priority());
}
return builder.build();
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) {
ExecuteSqlRequest.Builder builder =
Expand All @@ -580,6 +589,7 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
return builder;
}

Expand Down Expand Up @@ -610,6 +620,7 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
builder.setTransaction(selector);
}
builder.setSeqno(getSeqNo());
builder.setRequestOptions(buildRequestOptions(options));
return builder;
}

Expand Down Expand Up @@ -760,6 +771,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
if (selector != null) {
builder.setTransaction(selector);
}
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(builder.build(), stream.consumer(), session.getOptions());
call.request(prefetchChunks);
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;

Expand Down Expand Up @@ -75,9 +76,21 @@ public interface DatabaseClient {
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeWithOptions(Collections.singletonList(mutation));
* dbClient.writeWithOptions(
* Collections.singletonList(mutation),
* Options.priority(RpcPriority.HIGH));
* }</pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*
* @return a response with the timestamp at which the write was committed
*/
CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
Expand Down Expand Up @@ -138,9 +151,21 @@ CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption.
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeAtLeastOnce(Collections.singletonList(mutation));
* dbClient.writeAtLeastOnceWithOptions(
* Collections.singletonList(mutation),
* Options.priority(RpcPriority.LOW));
* }</pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*
* @return a response with the timestamp at which the write was committed
*/
CommitResponse writeAtLeastOnceWithOptions(
Expand Down Expand Up @@ -308,6 +333,16 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* });
* </code></pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*/
TransactionRunner readWriteTransaction(TransactionOption... options);

Expand Down Expand Up @@ -338,6 +373,16 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }
* }</pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*/
TransactionManager transactionManager(TransactionOption... options);

Expand Down Expand Up @@ -371,6 +416,16 @@ CommitResponse writeAtLeastOnceWithOptions(
* },
* executor);
* </code></pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*/
AsyncRunner runAsync(TransactionOption... options);

Expand Down Expand Up @@ -459,6 +514,18 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }
* }</pre>
*
* Options for a transaction can include:
*
* <p>Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the commit request of the transaction. The priority will not be
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* </ul>
*/
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);

Expand Down
Expand Up @@ -17,13 +17,30 @@
package com.google.cloud.spanner;

import com.google.common.base.Preconditions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.util.Objects;

/** Specifies options for various spanner operations */
public final class Options implements Serializable {
private static final long serialVersionUID = 8067099123096783941L;

/**
* Priority for an RPC invocation. The default priority is {@link #HIGH}. This enum can be used to
* set a lower priority for a specific RPC invocation.
*/
public enum RpcPriority {
LOW(Priority.PRIORITY_LOW),
MEDIUM(Priority.PRIORITY_MEDIUM),
HIGH(Priority.PRIORITY_HIGH);

private final Priority proto;

private RpcPriority(Priority proto) {
this.proto = Preconditions.checkNotNull(proto);
}
}

/** Marker interface to mark options applicable to both Read and Query operations */
public interface ReadAndQueryOption extends ReadOption, QueryOption {}

Expand Down Expand Up @@ -79,6 +96,11 @@ public static ReadAndQueryOption bufferRows(int bufferRows) {
return new BufferRowsOption(bufferRows);
}

/** Specifies the priority to use for the RPC. */
public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
return new PriorityOption(priority);
}

/**
* Specifying this will cause the list operations to fetch at most this many records in a page.
*/
Expand Down Expand Up @@ -158,13 +180,28 @@ void appendToOptions(Options options) {
}
}

static final class PriorityOption extends InternalOption
implements ReadQueryUpdateTransactionOption {
private final RpcPriority priority;

PriorityOption(RpcPriority priority) {
this.priority = priority;
}

@Override
void appendToOptions(Options options) {
options.priority = priority;
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
private Integer bufferRows;
private Integer pageSize;
private String pageToken;
private String filter;
private RpcPriority priority;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -221,6 +258,14 @@ String filter() {
return filter;
}

boolean hasPriority() {
return priority != null;
}

Priority priority() {
return priority == null ? null : priority.proto;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand All @@ -242,6 +287,9 @@ public String toString() {
if (filter != null) {
b.append("filter: ").append(filter).append(' ');
}
if (priority != null) {
b.append("priority: ").append(priority).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -271,7 +319,8 @@ public boolean equals(Object o) {
&& (!hasPageSize() && !that.hasPageSize()
|| hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize()))
&& Objects.equals(pageToken(), that.pageToken())
&& Objects.equals(filter(), that.filter());
&& Objects.equals(filter(), that.filter())
&& Objects.equals(priority(), that.priority());
}

@Override
Expand All @@ -298,6 +347,9 @@ public int hashCode() {
if (filter != null) {
result = 31 * result + filter.hashCode();
}
if (priority != null) {
result = 31 * result + priority.hashCode();
}
return result;
}

Expand Down
Expand Up @@ -26,12 +26,14 @@
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
Expand Down Expand Up @@ -162,8 +164,8 @@ private ExecuteSqlRequest resumeOrRestartRequest(
}
}

private ExecuteSqlRequest newTransactionRequestFrom(
final Statement statement, final Options options) {
@VisibleForTesting
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
ByteString transactionId = initTransaction();

final TransactionSelector transactionSelector =
Expand All @@ -179,6 +181,11 @@ private ExecuteSqlRequest newTransactionRequestFrom(

builder.setResumeToken(ByteString.EMPTY);

if (options.hasPriority()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
}

return builder.build();
}

Expand Down
Expand Up @@ -37,6 +37,7 @@
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import io.opencensus.common.Scope;
Expand Down Expand Up @@ -160,22 +161,26 @@ public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... transactionOptions)
throws SpannerException {
setActive(null);
Options commitRequestOptions = Options.fromTransactionOptions(transactionOptions);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
final CommitRequest request =
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(
Options.fromTransactionOptions(transactionOptions).withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (commitRequestOptions.hasPriority()) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setPriority(commitRequestOptions.priority()).build());
}
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
com.google.spanner.v1.CommitResponse response =
spanner.getRpc().commit(request, this.options);
spanner.getRpc().commit(requestBuilder.build(), this.options);
return new CommitResponse(response);
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
Expand Down
Expand Up @@ -43,6 +43,7 @@
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Transaction;
Expand Down Expand Up @@ -298,6 +299,10 @@ ApiFuture<CommitResponse> commitAsync() {
CommitRequest.newBuilder()
.setSession(session.getName())
.setReturnCommitStats(options.withCommitStats());
if (options.hasPriority()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
}
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
Expand Down Expand Up @@ -344,6 +349,10 @@ public void run() {
requestBuilder.setTransactionId(
transactionId == null ? transactionIdFuture.get() : transactionId);
}
if (options.hasPriority()) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final Span opSpan =
Expand Down