diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
index 3b6f00480b..9d687bb516 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
@@ -563,6 +563,12 @@ RequestOptions buildRequestOptions(Options options) {
if (options.hasPriority()) {
builder.setPriority(options.priority());
}
+ if (options.hasTag()) {
+ builder.setRequestTag(options.tag());
+ }
+ if (getTransactionTag() != null) {
+ builder.setTransactionTag(getTransactionTag());
+ }
return builder.build();
}
@@ -707,6 +713,15 @@ public void close() {
@Nullable
abstract TransactionSelector getTransactionSelector();
+ /**
+ * Returns the transaction tag for this {@link AbstractReadContext} or null
if this
+ * {@link AbstractReadContext} does not have a transaction tag.
+ */
+ @Nullable
+ String getTransactionTag() {
+ return null;
+ }
+
/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
index 8435680eab..3d443f3673 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
@@ -101,6 +101,14 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}
+ /**
+ * Specifying this will cause the reads, queries, updates and writes operations statistics
+ * collection to be grouped by tag.
+ */
+ public static ReadQueryUpdateTransactionOption tag(String name) {
+ return new TagOption(name);
+ }
+
/**
* Specifying this will cause the list operations to fetch at most this many records in a page.
*/
@@ -194,6 +202,19 @@ void appendToOptions(Options options) {
}
}
+ static final class TagOption extends InternalOption implements ReadQueryUpdateTransactionOption {
+ private final String tag;
+
+ TagOption(String tag) {
+ this.tag = tag;
+ }
+
+ @Override
+ void appendToOptions(Options options) {
+ options.tag = tag;
+ }
+ }
+
private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
@@ -202,6 +223,7 @@ void appendToOptions(Options options) {
private String pageToken;
private String filter;
private RpcPriority priority;
+ private String tag;
// Construction is via factory methods below.
private Options() {}
@@ -266,6 +288,14 @@ Priority priority() {
return priority == null ? null : priority.proto;
}
+ boolean hasTag() {
+ return tag != null;
+ }
+
+ String tag() {
+ return tag;
+ }
+
@Override
public String toString() {
StringBuilder b = new StringBuilder();
@@ -290,6 +320,9 @@ public String toString() {
if (priority != null) {
b.append("priority: ").append(priority).append(' ');
}
+ if (tag != null) {
+ b.append("tag: ").append(tag).append(' ');
+ }
return b.toString();
}
@@ -320,7 +353,8 @@ public boolean equals(Object o) {
|| hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize()))
&& Objects.equals(pageToken(), that.pageToken())
&& Objects.equals(filter(), that.filter())
- && Objects.equals(priority(), that.priority());
+ && Objects.equals(priority(), that.priority())
+ && Objects.equals(tag(), that.tag());
}
@Override
@@ -350,6 +384,9 @@ public int hashCode() {
if (priority != null) {
result = 31 * result + priority.hashCode();
}
+ if (tag != null) {
+ result = 31 * result + tag.hashCode();
+ }
return result;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
index 697f3e6351..2aeceb276d 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
@@ -181,11 +181,16 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
builder.setResumeToken(ByteString.EMPTY);
- if (options.hasPriority()) {
- builder.setRequestOptions(
- RequestOptions.newBuilder().setPriority(options.priority()).build());
+ if (options.hasPriority() || options.hasTag()) {
+ RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
+ if (options.hasPriority()) {
+ requestOptionsBuilder.setPriority(options.priority());
+ }
+ if (options.hasTag()) {
+ requestOptionsBuilder.setRequestTag(options.tag());
+ }
+ builder.setRequestOptions(requestOptionsBuilder.build());
}
-
return builder.build();
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index d122310eb7..874c11a5d8 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -173,9 +173,15 @@ public CommitResponse writeAtLeastOnceWithOptions(
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
- if (commitRequestOptions.hasPriority()) {
- requestBuilder.setRequestOptions(
- RequestOptions.newBuilder().setPriority(commitRequestOptions.priority()).build());
+ if (commitRequestOptions.hasPriority() || commitRequestOptions.hasTag()) {
+ RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
+ if (commitRequestOptions.hasPriority()) {
+ requestOptionsBuilder.setPriority(commitRequestOptions.priority());
+ }
+ if (commitRequestOptions.hasTag()) {
+ requestOptionsBuilder.setTransactionTag(commitRequestOptions.tag());
+ }
+ requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
index 860348adff..a6eb6160b2 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
@@ -299,9 +299,15 @@ ApiFuture commitAsync() {
CommitRequest.newBuilder()
.setSession(session.getName())
.setReturnCommitStats(options.withCommitStats());
- if (options.hasPriority()) {
- builder.setRequestOptions(
- RequestOptions.newBuilder().setPriority(options.priority()).build());
+ if (options.hasPriority() || getTransactionTag() != null) {
+ RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
+ if (options.hasPriority()) {
+ requestOptionsBuilder.setPriority(options.priority());
+ }
+ if (getTransactionTag() != null) {
+ requestOptionsBuilder.setTransactionTag(getTransactionTag());
+ }
+ builder.setRequestOptions(requestOptionsBuilder.build());
}
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
@@ -349,9 +355,15 @@ public void run() {
requestBuilder.setTransactionId(
transactionId == null ? transactionIdFuture.get() : transactionId);
}
- if (options.hasPriority()) {
- requestBuilder.setRequestOptions(
- RequestOptions.newBuilder().setPriority(options.priority()).build());
+ if (options.hasPriority() || getTransactionTag() != null) {
+ RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
+ if (options.hasPriority()) {
+ requestOptionsBuilder.setPriority(options.priority());
+ }
+ if (getTransactionTag() != null) {
+ requestOptionsBuilder.setTransactionTag(getTransactionTag());
+ }
+ requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
@@ -531,6 +543,12 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}
+ @Nullable
+ String getTransactionTag() {
+ if (this.options.hasTag()) return this.options.tag();
+ return null;
+ }
+
@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
// If the statement that caused an error was the statement that included a BeginTransaction
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java
index 795a26fff7..61ab806f35 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java
@@ -64,6 +64,14 @@ TestReadContext build() {
}
}
+ class TestReadContextWithTagBuilder
+ extends AbstractReadContext.Builder {
+ @Override
+ TestReadContextWithTag build() {
+ return new TestReadContextWithTag(this);
+ }
+ }
+
private final class TestReadContext extends AbstractReadContext {
TestReadContext(TestReadContextBuilder builder) {
super(builder);
@@ -75,6 +83,21 @@ TransactionSelector getTransactionSelector() {
}
}
+ private final class TestReadContextWithTag extends AbstractReadContext {
+ TestReadContextWithTag(TestReadContextWithTagBuilder builder) {
+ super(builder);
+ }
+
+ @Override
+ TransactionSelector getTransactionSelector() {
+ return TransactionSelector.getDefaultInstance();
+ }
+
+ String getTransactionTag() {
+ return "app=spanner,env=test";
+ }
+ }
+
private TestReadContext context;
@Before
@@ -162,4 +185,46 @@ public void testGetExecuteBatchDmlRequestBuilderWithPriority() {
Options.fromQueryOptions(Options.priority(RpcPriority.LOW)));
assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority());
}
+
+ public void executeSqlRequestBuilderWithRequestOptions() {
+ ExecuteSqlRequest request =
+ context
+ .getExecuteSqlRequestBuilder(
+ Statement.newBuilder("SELECT FOO FROM BAR").build(),
+ QueryMode.NORMAL,
+ Options.fromUpdateOptions(Options.tag("app=spanner,env=test,action=query")),
+ false)
+ .build();
+ assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=query");
+ assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
+ }
+
+ @Test
+ public void executeSqlRequestBuilderWithRequestOptionsWithTxnTag() {
+ SessionImpl session = mock(SessionImpl.class);
+ when(session.getName()).thenReturn("session-1");
+ TestReadContextWithTagBuilder builder = new TestReadContextWithTagBuilder();
+ TestReadContextWithTag contextWithTag =
+ builder
+ .setSession(session)
+ .setRpc(mock(SpannerRpc.class))
+ .setDefaultQueryOptions(defaultQueryOptions)
+ .setExecutorProvider(mock(ExecutorProvider.class))
+ .build();
+
+ ExecuteSqlRequest request =
+ contextWithTag
+ .getExecuteSqlRequestBuilder(
+ Statement.newBuilder("SELECT FOO FROM BAR").build(),
+ QueryMode.NORMAL,
+ Options.fromUpdateOptions(Options.tag("app=spanner,env=test,action=query")),
+ false)
+ .build();
+ assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=query");
+ assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
index 867d8fb242..8c66b78ef3 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
@@ -271,6 +271,294 @@ public void testWriteAtLeastOnceWithOptions() {
assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority());
}
+ @Test
+ public void writeAtLeastOnceWithTagOptions() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ client.writeAtLeastOnceWithOptions(
+ Arrays.asList(
+ Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()),
+ Options.tag("app=spanner,env=test"));
+
+ List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertThat(commitRequests).hasSize(1);
+ CommitRequest commit = commitRequests.get(0);
+ assertNotNull(commit.getSingleUseTransaction());
+ assertTrue(commit.getSingleUseTransaction().hasReadWrite());
+ assertNotNull(commit.getRequestOptions());
+ assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
+ assertThat(commit.getRequestOptions().getRequestTag()).isEmpty();
+ }
+
+ @Test
+ public void testExecuteQueryWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ try (ResultSet resultSet =
+ client
+ .singleUse()
+ .executeQuery(SELECT1, Options.tag("app=spanner,env=test,action=query"))) {
+ while (resultSet.next()) {}
+ }
+
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertThat(requests).hasSize(1);
+ ExecuteSqlRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=query");
+ assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
+ }
+
+ @Test
+ public void testExecuteReadWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ try (ResultSet resultSet =
+ client
+ .singleUse()
+ .read(
+ READ_TABLE_NAME,
+ KeySet.singleKey(Key.of(1L)),
+ READ_COLUMN_NAMES,
+ Options.tag("app=spanner,env=test,action=read"))) {
+ while (resultSet.next()) {}
+ }
+
+ List requests = mockSpanner.getRequestsOfType(ReadRequest.class);
+ assertThat(requests).hasSize(1);
+ ReadRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=read");
+ assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
+ }
+
+ @Test
+ public void testReadWriteExecuteQueryWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ TransactionRunner runner =
+ client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn"));
+ runner.run(
+ new TransactionCallable() {
+ @Override
+ public Void run(TransactionContext transaction) throws Exception {
+ try (ResultSet resultSet =
+ transaction.executeQuery(
+ SELECT1, Options.tag("app=spanner,env=test,action=query"))) {
+ while (resultSet.next()) {}
+ }
+ return null;
+ }
+ });
+
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertThat(requests).hasSize(1);
+ ExecuteSqlRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=query");
+ assertThat(request.getRequestOptions().getTransactionTag())
+ .isEqualTo("app=spanner,env=test,action=txn");
+ }
+
+ @Test
+ public void testReadWriteExecuteReadWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ TransactionRunner runner =
+ client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn"));
+ runner.run(
+ new TransactionCallable() {
+ @Override
+ public Void run(TransactionContext transaction) throws Exception {
+ try (ResultSet resultSet =
+ transaction.read(
+ READ_TABLE_NAME,
+ KeySet.singleKey(Key.of(1L)),
+ READ_COLUMN_NAMES,
+ Options.tag("app=spanner,env=test,action=read"))) {
+ while (resultSet.next()) {}
+ }
+ return null;
+ }
+ });
+
+ List requests = mockSpanner.getRequestsOfType(ReadRequest.class);
+ assertThat(requests).hasSize(1);
+ ReadRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=read");
+ assertThat(request.getRequestOptions().getTransactionTag())
+ .isEqualTo("app=spanner,env=test,action=txn");
+ }
+
+ @Test
+ public void testExecuteUpdateWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ TransactionRunner runner = client.readWriteTransaction();
+ runner.run(
+ new TransactionCallable() {
+ @Override
+ public Long run(TransactionContext transaction) throws Exception {
+ return transaction.executeUpdate(
+ UPDATE_STATEMENT, Options.tag("app=spanner,env=test,action=update"));
+ }
+ });
+
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertThat(requests).hasSize(1);
+ ExecuteSqlRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=update");
+ assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
+ }
+
+ @Test
+ public void testBatchUpdateWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ TransactionRunner runner =
+ client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn"));
+ runner.run(
+ new TransactionCallable() {
+ @Override
+ public long[] run(TransactionContext transaction) throws Exception {
+ return transaction.batchUpdate(
+ Arrays.asList(UPDATE_STATEMENT), Options.tag("app=spanner,env=test,action=batch"));
+ }
+ });
+
+ List requests =
+ mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class);
+ assertThat(requests).hasSize(1);
+ ExecuteBatchDmlRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=batch");
+ assertThat(request.getRequestOptions().getTransactionTag())
+ .isEqualTo("app=spanner,env=test,action=txn");
+ }
+
+ @Test
+ public void testPartitionedDMLWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ client.executePartitionedUpdate(
+ UPDATE_STATEMENT, Options.tag("app=spanner,env=test,action=dml"));
+
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertThat(requests).hasSize(1);
+ ExecuteSqlRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag())
+ .isEqualTo("app=spanner,env=test,action=dml");
+ assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
+ }
+
+ @Test
+ public void testCommitWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ TransactionRunner runner =
+ client.readWriteTransaction(Options.tag("app=spanner,env=test,action=commit"));
+ runner.run(
+ new TransactionCallable() {
+ @Override
+ public Void run(TransactionContext transaction) throws Exception {
+ transaction.buffer(Mutation.delete("TEST", KeySet.all()));
+ return null;
+ }
+ });
+
+ List requests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertThat(requests).hasSize(1);
+ CommitRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag()).isEmpty();
+ assertThat(request.getRequestOptions().getTransactionTag())
+ .isEqualTo("app=spanner,env=test,action=commit");
+ }
+
+ @Test
+ public void testTransactionManagerCommitWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ TransactionManager manager =
+ client.transactionManager(Options.tag("app=spanner,env=test,action=manager"));
+ TransactionContext transaction = manager.begin();
+ transaction.buffer(Mutation.delete("TEST", KeySet.all()));
+ manager.commit();
+
+ List requests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertThat(requests).hasSize(1);
+ CommitRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag()).isEmpty();
+ assertThat(request.getRequestOptions().getTransactionTag())
+ .isEqualTo("app=spanner,env=test,action=manager");
+ }
+
+ @Test
+ public void testAsyncRunnerCommitWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ AsyncRunner runner = client.runAsync(Options.tag("app=spanner,env=test,action=runner"));
+ get(
+ runner.runAsync(
+ new AsyncWork() {
+ @Override
+ public ApiFuture doWorkAsync(TransactionContext txn) {
+ txn.buffer(Mutation.delete("TEST", KeySet.all()));
+ return ApiFutures.immediateFuture(null);
+ }
+ },
+ executor));
+
+ List requests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertThat(requests).hasSize(1);
+ CommitRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag()).isEmpty();
+ assertThat(request.getRequestOptions().getTransactionTag())
+ .isEqualTo("app=spanner,env=test,action=runner");
+ }
+
+ @Test
+ public void testAsyncTransactionManagerCommitWithTag() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ try (AsyncTransactionManager manager =
+ client.transactionManagerAsync(Options.tag("app=spanner,env=test,action=manager"))) {
+ TransactionContextFuture transaction = manager.beginAsync();
+ get(
+ transaction
+ .then(
+ new AsyncTransactionFunction() {
+ @Override
+ public ApiFuture apply(TransactionContext txn, Void input)
+ throws Exception {
+ txn.buffer(Mutation.delete("TEST", KeySet.all()));
+ return ApiFutures.immediateFuture(null);
+ }
+ },
+ executor)
+ .commitAsync());
+ }
+
+ List requests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertThat(requests).hasSize(1);
+ CommitRequest request = requests.get(0);
+ assertNotNull(request.getRequestOptions());
+ assertThat(request.getRequestOptions().getRequestTag()).isEmpty();
+ assertThat(request.getRequestOptions().getTransactionTag())
+ .isEqualTo("app=spanner,env=test,action=manager");
+ }
+
@Test
public void singleUse() {
DatabaseClient client =
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java
index 4ab08a4785..d0f4794ccd 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java
@@ -91,6 +91,7 @@ public void allOptionsAbsent() {
assertThat(options.hasFilter()).isFalse();
assertThat(options.hasPageToken()).isFalse();
assertThat(options.hasPriority()).isFalse();
+ assertThat(options.hasTag()).isFalse();
assertThat(options.toString()).isEqualTo("");
assertThat(options.equals(options)).isTrue();
assertThat(options.equals(null)).isFalse();
@@ -100,15 +101,15 @@ public void allOptionsAbsent() {
}
@Test
- public void listOptTest() {
+ public void listOptionsTest() {
int pageSize = 3;
String pageToken = "ptok";
String filter = "env";
- Options opts =
+ Options options =
Options.fromListOptions(
Options.pageSize(pageSize), Options.pageToken(pageToken), Options.filter(filter));
- assertThat(opts.toString())
+ assertThat(options.toString())
.isEqualTo(
"pageSize: "
+ Integer.toString(pageSize)
@@ -118,14 +119,14 @@ public void listOptTest() {
+ filter
+ " ");
- assertThat(opts.hasPageSize()).isTrue();
- assertThat(opts.hasPageToken()).isTrue();
- assertThat(opts.hasFilter()).isTrue();
+ assertThat(options.hasPageSize()).isTrue();
+ assertThat(options.hasPageToken()).isTrue();
+ assertThat(options.hasFilter()).isTrue();
- assertThat(opts.pageSize()).isEqualTo(pageSize);
- assertThat(opts.pageToken()).isEqualTo(pageToken);
- assertThat(opts.filter()).isEqualTo(filter);
- assertThat(opts.hashCode()).isEqualTo(108027089);
+ assertThat(options.pageSize()).isEqualTo(pageSize);
+ assertThat(options.pageToken()).isEqualTo(pageToken);
+ assertThat(options.filter()).isEqualTo(filter);
+ assertThat(options.hashCode()).isEqualTo(108027089);
}
@Test
@@ -168,12 +169,15 @@ public void listEquality() {
}
@Test
- public void readOptTest() {
+ public void readOptionsTest() {
int limit = 3;
- Options opts = Options.fromReadOptions(Options.limit(limit));
+ String tag = "app=spanner,env=test,action=read";
+ Options options = Options.fromReadOptions(Options.limit(limit), Options.tag(tag));
- assertThat(opts.toString()).isEqualTo("limit: " + Integer.toString(limit) + " ");
- assertThat(opts.hashCode()).isEqualTo(964);
+ assertThat(options.toString())
+ .isEqualTo("limit: " + Integer.toString(limit) + " " + "tag: " + tag + " ");
+ assertThat(options.tag()).isEqualTo(tag);
+ assertThat(options.hashCode()).isEqualTo(-1111478426);
}
@Test
@@ -198,12 +202,15 @@ public void readEquality() {
}
@Test
- public void queryOptTest() {
+ public void queryOptionsTest() {
int chunks = 3;
- Options opts = Options.fromQueryOptions(Options.prefetchChunks(chunks));
- assertThat(opts.toString()).isEqualTo("prefetchChunks: " + Integer.toString(chunks) + " ");
- assertThat(opts.prefetchChunks()).isEqualTo(chunks);
- assertThat(opts.hashCode()).isEqualTo(964);
+ String tag = "app=spanner,env=test,action=query";
+ Options options = Options.fromQueryOptions(Options.prefetchChunks(chunks), Options.tag(tag));
+ assertThat(options.toString())
+ .isEqualTo("prefetchChunks: " + Integer.toString(chunks) + " " + "tag: " + tag + " ");
+ assertThat(options.prefetchChunks()).isEqualTo(chunks);
+ assertThat(options.tag()).isEqualTo(tag);
+ assertThat(options.hashCode()).isEqualTo(-97431824);
}
@Test
@@ -229,8 +236,8 @@ public void queryEquality() {
@Test
public void testFromTransactionOptions_toStringNoOptions() {
- Options opts = Options.fromTransactionOptions();
- assertThat(opts.toString()).isEqualTo("");
+ Options options = Options.fromTransactionOptions();
+ assertThat(options.toString()).isEqualTo("");
}
@Test
@@ -485,9 +492,9 @@ public void testFromUpdateOptions() {
@Test
public void testTransactionOptions() {
RpcPriority prio = RpcPriority.HIGH;
- Options opts = Options.fromTransactionOptions(Options.priority(prio));
- assertThat(opts.toString()).isEqualTo("priority: " + prio + " ");
- assertThat(opts.priority()).isEqualTo(Priority.PRIORITY_HIGH);
+ Options options = Options.fromTransactionOptions(Options.priority(prio));
+ assertThat(options.toString()).isEqualTo("priority: " + prio + " ");
+ assertThat(options.priority()).isEqualTo(Priority.PRIORITY_HIGH);
}
@Test
@@ -510,4 +517,68 @@ public void testTransactionOptionsPriorityEquality() {
assertNotEquals(options2, options3);
assertNotEquals(options2, options4);
}
+
+ @Test
+ public void updateOptionsTest() {
+ String tag = "app=spanner,env=test";
+ Options options = Options.fromUpdateOptions(Options.tag(tag));
+
+ assertEquals("tag: " + tag + " ", options.toString());
+ assertTrue(options.hasTag());
+ assertThat(options.tag()).isEqualTo(tag);
+ assertThat(options.hashCode()).isEqualTo(-2118248262);
+ }
+
+ @Test
+ public void updateEquality() {
+ Options o1;
+ Options o2;
+ Options o3;
+
+ o1 = Options.fromUpdateOptions();
+ o2 = Options.fromUpdateOptions();
+ assertThat(o1.equals(o2)).isTrue();
+
+ o2 = Options.fromUpdateOptions(Options.tag("app=spanner,env=test"));
+ assertThat(o1.equals(o2)).isFalse();
+ assertThat(o2.equals(o1)).isFalse();
+
+ o3 = Options.fromUpdateOptions(Options.tag("app=spanner,env=test"));
+ assertThat(o2.equals(o3)).isTrue();
+
+ o3 = Options.fromUpdateOptions(Options.tag("app=spanner,env=stage"));
+ assertThat(o2.equals(o3)).isFalse();
+ }
+
+ @Test
+ public void transactionOptionsTest() {
+ String tag = "app=spanner,env=test";
+ Options options = Options.fromTransactionOptions(Options.tag(tag));
+
+ assertEquals("tag: " + tag + " ", options.toString());
+ assertTrue(options.hasTag());
+ assertThat(options.tag()).isEqualTo(tag);
+ assertThat(options.hashCode()).isEqualTo(-2118248262);
+ }
+
+ @Test
+ public void transactionEquality() {
+ Options o1;
+ Options o2;
+ Options o3;
+
+ o1 = Options.fromTransactionOptions();
+ o2 = Options.fromTransactionOptions();
+ assertThat(o1.equals(o2)).isTrue();
+
+ o2 = Options.fromTransactionOptions(Options.tag("app=spanner,env=test"));
+ assertThat(o1.equals(o2)).isFalse();
+ assertThat(o2.equals(o1)).isFalse();
+
+ o3 = Options.fromTransactionOptions(Options.tag("app=spanner,env=test"));
+ assertThat(o2.equals(o3)).isTrue();
+
+ o3 = Options.fromTransactionOptions(Options.tag("app=spanner,env=stage"));
+ assertThat(o2.equals(o3)).isFalse();
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java
index 4048d4b37c..1ca4b85319 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java
@@ -67,6 +67,7 @@ public class PartitionedDmlTransactionTest {
private final ByteString txId = ByteString.copyFromUtf8("tx");
private final ByteString resumeToken = ByteString.copyFromUtf8("resume");
private final String sql = "UPDATE FOO SET BAR=1 WHERE TRUE";
+ private final String tag = "app=spanner,env=test";
private final ExecuteSqlRequest executeRequestWithoutResumeToken =
ExecuteSqlRequest.newBuilder()
.setQueryMode(QueryMode.NORMAL)
@@ -76,6 +77,11 @@ public class PartitionedDmlTransactionTest {
.build();
private final ExecuteSqlRequest executeRequestWithResumeToken =
executeRequestWithoutResumeToken.toBuilder().setResumeToken(resumeToken).build();
+ private final ExecuteSqlRequest executeRequestWithRequestOptions =
+ executeRequestWithoutResumeToken
+ .toBuilder()
+ .setRequestOptions(RequestOptions.newBuilder().setRequestTag(tag).build())
+ .build();
@Before
public void setup() {
@@ -108,6 +114,28 @@ public void testExecuteStreamingPartitionedUpdate() {
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class));
}
+ @Test
+ public void testExecuteStreamingPartitionedUpdateWithUpdateOptions() {
+ ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build();
+ PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();
+ PartialResultSet p2 = PartialResultSet.newBuilder().setStats(stats).build();
+ ServerStream stream = mock(ServerStream.class);
+ when(stream.iterator()).thenReturn(ImmutableList.of(p1, p2).iterator());
+ when(rpc.executeStreamingPartitionedDml(
+ Mockito.eq(executeRequestWithRequestOptions), anyMap(), any(Duration.class)))
+ .thenReturn(stream);
+
+ long count =
+ tx.executeStreamingPartitionedUpdate(
+ Statement.of(sql), Duration.ofMinutes(10), Options.tag(tag));
+
+ assertThat(count).isEqualTo(1000L);
+ verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap());
+ verify(rpc)
+ .executeStreamingPartitionedDml(
+ Mockito.eq(executeRequestWithRequestOptions), anyMap(), any(Duration.class));
+ }
+
@Test
public void testExecuteStreamingPartitionedUpdateAborted() {
ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build();
@@ -355,4 +383,15 @@ public void testRequestWithPriority() {
Options.fromUpdateOptions(Options.priority(RpcPriority.LOW)));
assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority());
}
+
+ @Test
+ public void testRequestWithPriorityAndRequestTag() {
+ ExecuteSqlRequest request =
+ tx.newTransactionRequestFrom(
+ Statement.of("UPDATE FOO SET BAR=1 WHERE TRUE"),
+ Options.fromUpdateOptions(
+ Options.priority(RpcPriority.LOW), Options.tag("app=spanner,env=test")));
+ assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority());
+ assertThat(request.getRequestOptions().getRequestTag()).isEqualTo("app=spanner,env=test");
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
index 3827b2a280..5388af76c4 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
@@ -240,6 +240,32 @@ public void writeAtLeastOnce() throws ParseException {
assertThat(request.getMutationsList()).containsExactly(mutation);
}
+ @Test
+ public void writeAtLeastOnceWithOptions() throws ParseException {
+ String tag = "app=spanner,env=test";
+ String timestampString = "2015-10-01T10:54:20.021Z";
+ ArgumentCaptor commit = ArgumentCaptor.forClass(CommitRequest.class);
+ CommitResponse response =
+ CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build();
+ Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response);
+ session.writeAtLeastOnceWithOptions(
+ Arrays.asList(Mutation.newInsertBuilder("T").set("C").to("x").build()), Options.tag(tag));
+
+ CommitRequest request = commit.getValue();
+ assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo(tag);
+ com.google.spanner.v1.Mutation mutation =
+ com.google.spanner.v1.Mutation.newBuilder()
+ .setInsert(
+ Write.newBuilder()
+ .setTable("T")
+ .addColumns("C")
+ .addValues(
+ ListValue.newBuilder()
+ .addValues(com.google.protobuf.Value.newBuilder().setStringValue("x"))))
+ .build();
+ assertThat(request.getMutationsList()).containsExactly(mutation);
+ }
+
private static long utcTimeSeconds(int year, int month, int day, int hour, int min, int secs) {
GregorianCalendar calendar = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
calendar.set(year, month, day, hour, min, secs);