Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for tagging #576

Merged
merged 12 commits into from Apr 6, 2021
Expand Up @@ -563,6 +563,12 @@ RequestOptions buildRequestOptions(Options options) {
if (options.hasPriority()) {
builder.setPriority(options.priority());
}
if (options.hasTag()) {
builder.setRequestTag(options.tag());
}
if (getTransactionTag() != null) {
builder.setTransactionTag(getTransactionTag());
}
return builder.build();
}

Expand Down Expand Up @@ -707,6 +713,15 @@ public void close() {
@Nullable
abstract TransactionSelector getTransactionSelector();

/**
* Returns the transaction tag for this {@link AbstractReadContext} or <code>null</code> if this
* {@link AbstractReadContext} does not have a transaction tag.
*/
@Nullable
String getTransactionTag() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we also create a hasTransactionTag() that checks if getTransactionTag() != null?

return null;
}

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}
Expand Down
Expand Up @@ -101,6 +101,14 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}

/**
* Specifying this will cause the reads, queries, updates and writes operations statistics
* collection to be grouped by tag.
*/
public static ReadQueryUpdateTransactionOption tag(String name) {
return new TagOption(name);
}

/**
* Specifying this will cause the list operations to fetch at most this many records in a page.
*/
Expand Down Expand Up @@ -194,6 +202,19 @@ void appendToOptions(Options options) {
}
}

static final class TagOption extends InternalOption implements ReadQueryUpdateTransactionOption {
private final String tag;

TagOption(String tag) {
this.tag = tag;
}

@Override
void appendToOptions(Options options) {
options.tag = tag;
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
Expand All @@ -202,6 +223,7 @@ void appendToOptions(Options options) {
private String pageToken;
private String filter;
private RpcPriority priority;
private String tag;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -266,6 +288,14 @@ Priority priority() {
return priority == null ? null : priority.proto;
}

boolean hasTag() {
return tag != null;
}

String tag() {
return tag;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand All @@ -290,6 +320,9 @@ public String toString() {
if (priority != null) {
b.append("priority: ").append(priority).append(' ');
}
if (tag != null) {
b.append("tag: ").append(tag).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -320,7 +353,8 @@ public boolean equals(Object o) {
|| hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize()))
&& Objects.equals(pageToken(), that.pageToken())
&& Objects.equals(filter(), that.filter())
&& Objects.equals(priority(), that.priority());
&& Objects.equals(priority(), that.priority())
&& Objects.equals(tag(), that.tag());
}

@Override
Expand Down Expand Up @@ -350,6 +384,9 @@ public int hashCode() {
if (priority != null) {
result = 31 * result + priority.hashCode();
}
if (tag != null) {
result = 31 * result + tag.hashCode();
}
return result;
}

Expand Down
Expand Up @@ -181,11 +181,16 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt

builder.setResumeToken(ByteString.EMPTY);

if (options.hasPriority()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
if (options.hasPriority() || options.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (options.hasPriority()) {
requestOptionsBuilder.setPriority(options.priority());
}
if (options.hasTag()) {
requestOptionsBuilder.setRequestTag(options.tag());
}
builder.setRequestOptions(requestOptionsBuilder.build());
}

return builder.build();
}

Expand Down
Expand Up @@ -173,9 +173,15 @@ public CommitResponse writeAtLeastOnceWithOptions(
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (commitRequestOptions.hasPriority()) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setPriority(commitRequestOptions.priority()).build());
if (commitRequestOptions.hasPriority() || commitRequestOptions.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (commitRequestOptions.hasPriority()) {
requestOptionsBuilder.setPriority(commitRequestOptions.priority());
}
if (commitRequestOptions.hasTag()) {
requestOptionsBuilder.setTransactionTag(commitRequestOptions.tag());
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
Expand Down
Expand Up @@ -299,9 +299,15 @@ ApiFuture<CommitResponse> commitAsync() {
CommitRequest.newBuilder()
.setSession(session.getName())
.setReturnCommitStats(options.withCommitStats());
if (options.hasPriority()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
if (options.hasPriority() || getTransactionTag() != null) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (options.hasPriority()) {
requestOptionsBuilder.setPriority(options.priority());
}
if (getTransactionTag() != null) {
requestOptionsBuilder.setTransactionTag(getTransactionTag());
}
builder.setRequestOptions(requestOptionsBuilder.build());
}
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
Expand Down Expand Up @@ -349,9 +355,15 @@ public void run() {
requestBuilder.setTransactionId(
transactionId == null ? transactionIdFuture.get() : transactionId);
}
if (options.hasPriority()) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setPriority(options.priority()).build());
if (options.hasPriority() || getTransactionTag() != null) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (options.hasPriority()) {
requestOptionsBuilder.setPriority(options.priority());
}
if (getTransactionTag() != null) {
requestOptionsBuilder.setTransactionTag(getTransactionTag());
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
Expand Down Expand Up @@ -531,6 +543,12 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}

@Nullable
String getTransactionTag() {
if (this.options.hasTag()) return this.options.tag();
return null;
}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
// If the statement that caused an error was the statement that included a BeginTransaction
Expand Down
Expand Up @@ -64,6 +64,14 @@ TestReadContext build() {
}
}

class TestReadContextWithTagBuilder
extends AbstractReadContext.Builder<TestReadContextWithTagBuilder, TestReadContextWithTag> {
@Override
TestReadContextWithTag build() {
return new TestReadContextWithTag(this);
}
}

private final class TestReadContext extends AbstractReadContext {
TestReadContext(TestReadContextBuilder builder) {
super(builder);
Expand All @@ -75,6 +83,21 @@ TransactionSelector getTransactionSelector() {
}
}

private final class TestReadContextWithTag extends AbstractReadContext {
TestReadContextWithTag(TestReadContextWithTagBuilder builder) {
super(builder);
}

@Override
TransactionSelector getTransactionSelector() {
return TransactionSelector.getDefaultInstance();
}

String getTransactionTag() {
return "app=spanner,env=test";
}
}

private TestReadContext context;

@Before
Expand Down Expand Up @@ -162,4 +185,46 @@ public void testGetExecuteBatchDmlRequestBuilderWithPriority() {
Options.fromQueryOptions(Options.priority(RpcPriority.LOW)));
assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority());
}

public void executeSqlRequestBuilderWithRequestOptions() {
mayurkale22 marked this conversation as resolved.
Show resolved Hide resolved
ExecuteSqlRequest request =
context
.getExecuteSqlRequestBuilder(
Statement.newBuilder("SELECT FOO FROM BAR").build(),
QueryMode.NORMAL,
Options.fromUpdateOptions(Options.tag("app=spanner,env=test,action=query")),
false)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getRequestOptions().getRequestTag())
.isEqualTo("app=spanner,env=test,action=query");
assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
}

@Test
public void executeSqlRequestBuilderWithRequestOptionsWithTxnTag() {
SessionImpl session = mock(SessionImpl.class);
when(session.getName()).thenReturn("session-1");
TestReadContextWithTagBuilder builder = new TestReadContextWithTagBuilder();
TestReadContextWithTag contextWithTag =
builder
.setSession(session)
.setRpc(mock(SpannerRpc.class))
.setDefaultQueryOptions(defaultQueryOptions)
.setExecutorProvider(mock(ExecutorProvider.class))
.build();

ExecuteSqlRequest request =
contextWithTag
.getExecuteSqlRequestBuilder(
Statement.newBuilder("SELECT FOO FROM BAR").build(),
QueryMode.NORMAL,
Options.fromUpdateOptions(Options.tag("app=spanner,env=test,action=query")),
false)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getRequestOptions().getRequestTag())
.isEqualTo("app=spanner,env=test,action=query");
assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
}
}