Skip to content

Commit

Permalink
feat: add backend query options (#90)
Browse files Browse the repository at this point in the history
* feat: add backend query options

Adds support for setting QueryOptions that will be used by the backend
to execute queries.

* fix: set QueryOptions on Statement

QueryOptions should be an option on a Statement instead of a parameter
to the executeQuery method. By setting these options on a Statement, it
is possible to use it with analyzeQuery as well.

* feat: add toBuilder() method to Statement

* fix: code review comments

* fix: remove unused interface and class
  • Loading branch information
olavloite committed Mar 12, 2020
1 parent 57f5fd0 commit e96e172
Show file tree
Hide file tree
Showing 17 changed files with 983 additions and 85 deletions.
Expand Up @@ -30,11 +30,13 @@
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.Transaction;
Expand All @@ -53,20 +55,86 @@
*/
abstract class AbstractReadContext
implements ReadContext, AbstractResultSet.Listener, SessionTransaction {

abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadContext> {
private SessionImpl session;
private SpannerRpc rpc;
private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;

Builder() {}

@SuppressWarnings("unchecked")
B self() {
return (B) this;
}

B setSession(SessionImpl session) {
this.session = session;
return self();
}

B setRpc(SpannerRpc rpc) {
this.rpc = rpc;
return self();
}

B setSpan(Span span) {
this.span = span;
return self();
}

B setDefaultPrefetchChunks(int defaultPrefetchChunks) {
this.defaultPrefetchChunks = defaultPrefetchChunks;
return self();
}

B setDefaultQueryOptions(QueryOptions defaultQueryOptions) {
this.defaultQueryOptions = defaultQueryOptions;
return self();
}

abstract T build();
}

/**
* A {@code ReadContext} for standalone reads. This can only be used for a single operation, since
* each standalone read may see a different timestamp of Cloud Spanner data.
*/
static class SingleReadContext extends AbstractReadContext {
static class Builder extends AbstractReadContext.Builder<Builder, SingleReadContext> {
private TimestampBound bound;

private Builder() {}

Builder setTimestampBound(TimestampBound bound) {
this.bound = bound;
return self();
}

@Override
SingleReadContext build() {
return new SingleReadContext(this);
}

SingleUseReadOnlyTransaction buildSingleUseReadOnlyTransaction() {
return new SingleUseReadOnlyTransaction(this);
}
}

static Builder newBuilder() {
return new Builder();
}

final TimestampBound bound;

@GuardedBy("lock")
private boolean used;

SingleReadContext(
SessionImpl session, TimestampBound bound, SpannerRpc rpc, int defaultPrefetchChunks) {
super(session, rpc, defaultPrefetchChunks);
this.bound = bound;
private SingleReadContext(Builder builder) {
super(builder);
this.bound = builder.bound;
}

@GuardedBy("lock")
Expand Down Expand Up @@ -99,9 +167,8 @@ static class SingleUseReadOnlyTransaction extends SingleReadContext
@GuardedBy("lock")
private Timestamp timestamp;

SingleUseReadOnlyTransaction(
SessionImpl session, TimestampBound bound, SpannerRpc rpc, int defaultPrefetchChunks) {
super(session, bound, rpc, defaultPrefetchChunks);
private SingleUseReadOnlyTransaction(SingleReadContext.Builder builder) {
super(builder);
}

@Override
Expand Down Expand Up @@ -139,6 +206,38 @@ public void onTransactionMetadata(Transaction transaction) {

static class MultiUseReadOnlyTransaction extends AbstractReadContext
implements ReadOnlyTransaction {
static class Builder extends AbstractReadContext.Builder<Builder, MultiUseReadOnlyTransaction> {
private TimestampBound bound;
private Timestamp timestamp;
private ByteString transactionId;

private Builder() {}

Builder setTimestampBound(TimestampBound bound) {
this.bound = bound;
return this;
}

Builder setTimestamp(Timestamp timestamp) {
this.timestamp = timestamp;
return this;
}

Builder setTransactionId(ByteString transactionId) {
this.transactionId = transactionId;
return this;
}

@Override
MultiUseReadOnlyTransaction build() {
return new MultiUseReadOnlyTransaction(this);
}
}

static Builder newBuilder() {
return new Builder();
}

private TimestampBound bound;
private final Object txnLock = new Object();

Expand All @@ -148,27 +247,24 @@ static class MultiUseReadOnlyTransaction extends AbstractReadContext
@GuardedBy("txnLock")
private ByteString transactionId;

MultiUseReadOnlyTransaction(
SessionImpl session, TimestampBound bound, SpannerRpc rpc, int defaultPrefetchChunks) {
super(session, rpc, defaultPrefetchChunks);
MultiUseReadOnlyTransaction(Builder builder) {
super(builder);
checkArgument(
bound.getMode() != TimestampBound.Mode.MAX_STALENESS
&& bound.getMode() != TimestampBound.Mode.MIN_READ_TIMESTAMP,
"Bounded staleness mode %s is not supported for multi-use read-only transactions."
+ " Create a single-use read or read-only transaction instead.",
bound.getMode());
this.bound = bound;
}

MultiUseReadOnlyTransaction(
SessionImpl session,
ByteString transactionId,
Timestamp timestamp,
SpannerRpc rpc,
int defaultPrefetchChunks) {
super(session, rpc, defaultPrefetchChunks);
this.transactionId = transactionId;
this.timestamp = timestamp;
!(builder.bound != null && builder.transactionId != null)
&& !(builder.bound == null && builder.transactionId == null),
"Either TimestampBound or TransactionId must be specified");
if (builder.bound != null) {
checkArgument(
builder.bound.getMode() != TimestampBound.Mode.MAX_STALENESS
&& builder.bound.getMode() != TimestampBound.Mode.MIN_READ_TIMESTAMP,
"Bounded staleness mode %s is not supported for multi-use read-only transactions."
+ " Create a single-use read or read-only transaction instead.",
builder.bound.getMode());
this.bound = builder.bound;
} else {
this.timestamp = builder.timestamp;
this.transactionId = builder.transactionId;
}
}

@Override
Expand Down Expand Up @@ -256,6 +352,7 @@ void initTransaction() {
final SpannerRpc rpc;
final Span span;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

@GuardedBy("lock")
private boolean isValid = true;
Expand All @@ -271,16 +368,12 @@ void initTransaction() {
// much more frequently.
private static final int MAX_BUFFERED_CHUNKS = 512;

AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) {
this(session, rpc, defaultPrefetchChunks, Tracing.getTracer().getCurrentSpan());
}

private AbstractReadContext(
SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks, Span span) {
this.session = session;
this.rpc = rpc;
this.defaultPrefetchChunks = defaultPrefetchChunks;
this.span = span;
AbstractReadContext(Builder<?, ?> builder) {
this.session = builder.session;
this.rpc = builder.rpc;
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
}

long getSeqNo() {
Expand Down Expand Up @@ -341,9 +434,36 @@ private ResultSet executeQueryInternal(
Statement statement,
com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
Options queryOptions = Options.fromQueryOptions(options);
return executeQueryInternalWithOptions(
statement, queryMode, readOptions, null /*partitionToken*/);
statement, queryMode, queryOptions, null /*partitionToken*/);
}

/**
* Determines the {@link QueryOptions} to use for a query. This is determined using the following
* precedence:
*
* <ol>
* <li>Specific {@link QueryOptions} passed in for this query.
* <li>Any value specified in a valid environment variable when the {@link SpannerOptions}
* instance was created.
* <li>The default {@link SpannerOptions#getDefaultQueryOptions()} specified for the database
* where the query is executed.
* </ol>
*/
@VisibleForTesting
QueryOptions buildQueryOptions(QueryOptions requestOptions) {
// Shortcut for the most common return value.
if (defaultQueryOptions.equals(QueryOptions.getDefaultInstance()) && requestOptions == null) {
return QueryOptions.getDefaultInstance();
}
// Create a builder based on the default query options.
QueryOptions.Builder builder = defaultQueryOptions.toBuilder();
// Then overwrite with specific options for this query.
if (requestOptions != null) {
builder.mergeFrom(requestOptions);
}
return builder.build();
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
Expand All @@ -365,6 +485,7 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
builder.setTransaction(selector);
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
return builder;
}

Expand Down Expand Up @@ -400,15 +521,15 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable<Stateme
ResultSet executeQueryInternalWithOptions(
Statement statement,
com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
Options readOptions,
Options options,
ByteString partitionToken) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder request = getExecuteSqlRequestBuilder(statement, queryMode);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
@Override
Expand Down
Expand Up @@ -45,41 +45,48 @@ public class BatchClientImpl implements BatchClient {
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
SessionImpl session = sessionClient.createSession();
return new BatchReadOnlyTransactionImpl(
sessionClient.getSpanner(), session, checkNotNull(bound));
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
.setRpc(sessionClient.getSpanner().getRpc())
.setTimestampBound(bound)
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
checkNotNull(bound));
}

@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batchTransactionId) {
SessionImpl session =
sessionClient.sessionWithId(checkNotNull(batchTransactionId).getSessionId());
return new BatchReadOnlyTransactionImpl(
sessionClient.getSpanner(), session, batchTransactionId);
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
.setRpc(sessionClient.getSpanner().getRpc())
.setTransactionId(batchTransactionId.getTransactionId())
.setTimestamp(batchTransactionId.getTimestamp())
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
batchTransactionId);
}

private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
implements BatchReadOnlyTransaction {
private final String sessionName;
private final Map<SpannerRpc.Option, ?> options;

BatchReadOnlyTransactionImpl(SpannerImpl spanner, SessionImpl session, TimestampBound bound) {
super(
checkNotNull(session),
checkNotNull(bound),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
spanner.getOptions().getPrefetchChunks());
BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.Builder builder, TimestampBound bound) {
super(builder.setTimestampBound(bound));
this.sessionName = session.getName();
this.options = session.getOptions();
initTransaction();
}

BatchReadOnlyTransactionImpl(
SpannerImpl spanner, SessionImpl session, BatchTransactionId batchTransactionId) {
super(
checkNotNull(session),
checkNotNull(batchTransactionId).getTransactionId(),
batchTransactionId.getTimestamp(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
spanner.getOptions().getPrefetchChunks());
MultiUseReadOnlyTransaction.Builder builder, BatchTransactionId batchTransactionId) {
super(builder.setTransactionId(batchTransactionId.getTransactionId()));
this.sessionName = session.getName();
this.options = session.getOptions();
}
Expand Down
Expand Up @@ -193,6 +193,10 @@ SpannerImpl getSpanner() {
return spanner;
}

DatabaseId getDatabaseId() {
return db;
}

/** Create a single session. */
SessionImpl createSession() {
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
Expand Down

0 comments on commit e96e172

Please sign in to comment.