Skip to content

Commit

Permalink
feat: add support for tagging (#576)
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurkale22 committed Apr 6, 2021
1 parent 6a58433 commit 2a9086f
Show file tree
Hide file tree
Showing 10 changed files with 608 additions and 38 deletions.
Expand Up @@ -563,6 +563,12 @@ RequestOptions buildRequestOptions(Options options) {
if (options.hasPriority()) {
builder.setPriority(options.priority());
}
if (options.hasTag()) {
builder.setRequestTag(options.tag());
}
if (getTransactionTag() != null) {
builder.setTransactionTag(getTransactionTag());
}
return builder.build();
}

Expand Down Expand Up @@ -707,6 +713,15 @@ public void close() {
@Nullable
abstract TransactionSelector getTransactionSelector();

/**
* Returns the transaction tag for this {@link AbstractReadContext} or <code>null</code> if this
* {@link AbstractReadContext} does not have a transaction tag.
*/
@Nullable
String getTransactionTag() {
return null;
}

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}
Expand Down
Expand Up @@ -101,6 +101,14 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}

/**
* Specifying this will cause the reads, queries, updates and writes operations statistics
* collection to be grouped by tag.
*/
public static ReadQueryUpdateTransactionOption tag(String name) {
return new TagOption(name);
}

/**
* Specifying this will cause the list operations to fetch at most this many records in a page.
*/
Expand Down Expand Up @@ -194,6 +202,19 @@ void appendToOptions(Options options) {
}
}

static final class TagOption extends InternalOption implements ReadQueryUpdateTransactionOption {
private final String tag;

TagOption(String tag) {
this.tag = tag;
}

@Override
void appendToOptions(Options options) {
options.tag = tag;
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
Expand All @@ -202,6 +223,7 @@ void appendToOptions(Options options) {
private String pageToken;
private String filter;
private RpcPriority priority;
private String tag;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -266,6 +288,14 @@ Priority priority() {
return priority == null ? null : priority.proto;
}

boolean hasTag() {
return tag != null;
}

String tag() {
return tag;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand All @@ -290,6 +320,9 @@ public String toString() {
if (priority != null) {
b.append("priority: ").append(priority).append(' ');
}
if (tag != null) {
b.append("tag: ").append(tag).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -320,7 +353,8 @@ public boolean equals(Object o) {
|| hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize()))
&& Objects.equals(pageToken(), that.pageToken())
&& Objects.equals(filter(), that.filter())
&& Objects.equals(priority(), that.priority());
&& Objects.equals(priority(), that.priority())
&& Objects.equals(tag(), that.tag());
}

@Override
Expand Down Expand Up @@ -350,6 +384,9 @@ public int hashCode() {
if (priority != null) {
result = 31 * result + priority.hashCode();
}
if (tag != null) {
result = 31 * result + tag.hashCode();
}
return result;
}

Expand Down
Expand Up @@ -181,11 +181,16 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt

builder.setResumeToken(ByteString.EMPTY);

if (options.hasPriority()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
if (options.hasPriority() || options.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (options.hasPriority()) {
requestOptionsBuilder.setPriority(options.priority());
}
if (options.hasTag()) {
requestOptionsBuilder.setRequestTag(options.tag());
}
builder.setRequestOptions(requestOptionsBuilder.build());
}

return builder.build();
}

Expand Down
Expand Up @@ -173,9 +173,15 @@ public CommitResponse writeAtLeastOnceWithOptions(
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (commitRequestOptions.hasPriority()) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setPriority(commitRequestOptions.priority()).build());
if (commitRequestOptions.hasPriority() || commitRequestOptions.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (commitRequestOptions.hasPriority()) {
requestOptionsBuilder.setPriority(commitRequestOptions.priority());
}
if (commitRequestOptions.hasTag()) {
requestOptionsBuilder.setTransactionTag(commitRequestOptions.tag());
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
Expand Down
Expand Up @@ -299,9 +299,15 @@ ApiFuture<CommitResponse> commitAsync() {
CommitRequest.newBuilder()
.setSession(session.getName())
.setReturnCommitStats(options.withCommitStats());
if (options.hasPriority()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
if (options.hasPriority() || getTransactionTag() != null) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (options.hasPriority()) {
requestOptionsBuilder.setPriority(options.priority());
}
if (getTransactionTag() != null) {
requestOptionsBuilder.setTransactionTag(getTransactionTag());
}
builder.setRequestOptions(requestOptionsBuilder.build());
}
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
Expand Down Expand Up @@ -349,9 +355,15 @@ public void run() {
requestBuilder.setTransactionId(
transactionId == null ? transactionIdFuture.get() : transactionId);
}
if (options.hasPriority()) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
if (options.hasPriority() || getTransactionTag() != null) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (options.hasPriority()) {
requestOptionsBuilder.setPriority(options.priority());
}
if (getTransactionTag() != null) {
requestOptionsBuilder.setTransactionTag(getTransactionTag());
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
Expand Down Expand Up @@ -531,6 +543,12 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}

@Nullable
String getTransactionTag() {
if (this.options.hasTag()) return this.options.tag();
return null;
}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
// If the statement that caused an error was the statement that included a BeginTransaction
Expand Down
Expand Up @@ -64,6 +64,14 @@ TestReadContext build() {
}
}

class TestReadContextWithTagBuilder
extends AbstractReadContext.Builder<TestReadContextWithTagBuilder, TestReadContextWithTag> {
@Override
TestReadContextWithTag build() {
return new TestReadContextWithTag(this);
}
}

private final class TestReadContext extends AbstractReadContext {
TestReadContext(TestReadContextBuilder builder) {
super(builder);
Expand All @@ -75,6 +83,21 @@ TransactionSelector getTransactionSelector() {
}
}

private final class TestReadContextWithTag extends AbstractReadContext {
TestReadContextWithTag(TestReadContextWithTagBuilder builder) {
super(builder);
}

@Override
TransactionSelector getTransactionSelector() {
return TransactionSelector.getDefaultInstance();
}

String getTransactionTag() {
return "app=spanner,env=test";
}
}

private TestReadContext context;

@Before
Expand Down Expand Up @@ -162,4 +185,46 @@ public void testGetExecuteBatchDmlRequestBuilderWithPriority() {
Options.fromQueryOptions(Options.priority(RpcPriority.LOW)));
assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority());
}

public void executeSqlRequestBuilderWithRequestOptions() {
ExecuteSqlRequest request =
context
.getExecuteSqlRequestBuilder(
Statement.newBuilder("SELECT FOO FROM BAR").build(),
QueryMode.NORMAL,
Options.fromUpdateOptions(Options.tag("app=spanner,env=test,action=query")),
false)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getRequestOptions().getRequestTag())
.isEqualTo("app=spanner,env=test,action=query");
assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
}

@Test
public void executeSqlRequestBuilderWithRequestOptionsWithTxnTag() {
SessionImpl session = mock(SessionImpl.class);
when(session.getName()).thenReturn("session-1");
TestReadContextWithTagBuilder builder = new TestReadContextWithTagBuilder();
TestReadContextWithTag contextWithTag =
builder
.setSession(session)
.setRpc(mock(SpannerRpc.class))
.setDefaultQueryOptions(defaultQueryOptions)
.setExecutorProvider(mock(ExecutorProvider.class))
.build();

ExecuteSqlRequest request =
contextWithTag
.getExecuteSqlRequestBuilder(
Statement.newBuilder("SELECT FOO FROM BAR").build(),
QueryMode.NORMAL,
Options.fromUpdateOptions(Options.tag("app=spanner,env=test,action=query")),
false)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getRequestOptions().getRequestTag())
.isEqualTo("app=spanner,env=test,action=query");
assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
}
}

0 comments on commit 2a9086f

Please sign in to comment.