diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 3b6f00480b..9d687bb516 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -563,6 +563,12 @@ RequestOptions buildRequestOptions(Options options) { if (options.hasPriority()) { builder.setPriority(options.priority()); } + if (options.hasTag()) { + builder.setRequestTag(options.tag()); + } + if (getTransactionTag() != null) { + builder.setTransactionTag(getTransactionTag()); + } return builder.build(); } @@ -707,6 +713,15 @@ public void close() { @Nullable abstract TransactionSelector getTransactionSelector(); + /** + * Returns the transaction tag for this {@link AbstractReadContext} or null if this + * {@link AbstractReadContext} does not have a transaction tag. + */ + @Nullable + String getTransactionTag() { + return null; + } + /** This method is called when a statement returned a new transaction as part of its results. */ @Override public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 8435680eab..3d443f3673 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -101,6 +101,14 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) { return new PriorityOption(priority); } + /** + * Specifying this will cause the reads, queries, updates and writes operations statistics + * collection to be grouped by tag. + */ + public static ReadQueryUpdateTransactionOption tag(String name) { + return new TagOption(name); + } + /** * Specifying this will cause the list operations to fetch at most this many records in a page. */ @@ -194,6 +202,19 @@ void appendToOptions(Options options) { } } + static final class TagOption extends InternalOption implements ReadQueryUpdateTransactionOption { + private final String tag; + + TagOption(String tag) { + this.tag = tag; + } + + @Override + void appendToOptions(Options options) { + options.tag = tag; + } + } + private boolean withCommitStats; private Long limit; private Integer prefetchChunks; @@ -202,6 +223,7 @@ void appendToOptions(Options options) { private String pageToken; private String filter; private RpcPriority priority; + private String tag; // Construction is via factory methods below. private Options() {} @@ -266,6 +288,14 @@ Priority priority() { return priority == null ? null : priority.proto; } + boolean hasTag() { + return tag != null; + } + + String tag() { + return tag; + } + @Override public String toString() { StringBuilder b = new StringBuilder(); @@ -290,6 +320,9 @@ public String toString() { if (priority != null) { b.append("priority: ").append(priority).append(' '); } + if (tag != null) { + b.append("tag: ").append(tag).append(' '); + } return b.toString(); } @@ -320,7 +353,8 @@ public boolean equals(Object o) { || hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize())) && Objects.equals(pageToken(), that.pageToken()) && Objects.equals(filter(), that.filter()) - && Objects.equals(priority(), that.priority()); + && Objects.equals(priority(), that.priority()) + && Objects.equals(tag(), that.tag()); } @Override @@ -350,6 +384,9 @@ public int hashCode() { if (priority != null) { result = 31 * result + priority.hashCode(); } + if (tag != null) { + result = 31 * result + tag.hashCode(); + } return result; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 697f3e6351..2aeceb276d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -181,11 +181,16 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt builder.setResumeToken(ByteString.EMPTY); - if (options.hasPriority()) { - builder.setRequestOptions( - RequestOptions.newBuilder().setPriority(options.priority()).build()); + if (options.hasPriority() || options.hasTag()) { + RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder(); + if (options.hasPriority()) { + requestOptionsBuilder.setPriority(options.priority()); + } + if (options.hasTag()) { + requestOptionsBuilder.setRequestTag(options.tag()); + } + builder.setRequestOptions(requestOptionsBuilder.build()); } - return builder.build(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index d122310eb7..874c11a5d8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -173,9 +173,15 @@ public CommitResponse writeAtLeastOnceWithOptions( .setSingleUseTransaction( TransactionOptions.newBuilder() .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())); - if (commitRequestOptions.hasPriority()) { - requestBuilder.setRequestOptions( - RequestOptions.newBuilder().setPriority(commitRequestOptions.priority()).build()); + if (commitRequestOptions.hasPriority() || commitRequestOptions.hasTag()) { + RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder(); + if (commitRequestOptions.hasPriority()) { + requestOptionsBuilder.setPriority(commitRequestOptions.priority()); + } + if (commitRequestOptions.hasTag()) { + requestOptionsBuilder.setTransactionTag(commitRequestOptions.tag()); + } + requestBuilder.setRequestOptions(requestOptionsBuilder.build()); } Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan(); try (Scope s = tracer.withSpan(span)) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 860348adff..a6eb6160b2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -299,9 +299,15 @@ ApiFuture commitAsync() { CommitRequest.newBuilder() .setSession(session.getName()) .setReturnCommitStats(options.withCommitStats()); - if (options.hasPriority()) { - builder.setRequestOptions( - RequestOptions.newBuilder().setPriority(options.priority()).build()); + if (options.hasPriority() || getTransactionTag() != null) { + RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder(); + if (options.hasPriority()) { + requestOptionsBuilder.setPriority(options.priority()); + } + if (getTransactionTag() != null) { + requestOptionsBuilder.setTransactionTag(getTransactionTag()); + } + builder.setRequestOptions(requestOptionsBuilder.build()); } synchronized (lock) { if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) { @@ -349,9 +355,15 @@ public void run() { requestBuilder.setTransactionId( transactionId == null ? transactionIdFuture.get() : transactionId); } - if (options.hasPriority()) { - requestBuilder.setRequestOptions( - RequestOptions.newBuilder().setPriority(options.priority()).build()); + if (options.hasPriority() || getTransactionTag() != null) { + RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder(); + if (options.hasPriority()) { + requestOptionsBuilder.setPriority(options.priority()); + } + if (getTransactionTag() != null) { + requestOptionsBuilder.setTransactionTag(getTransactionTag()); + } + requestBuilder.setRequestOptions(requestOptionsBuilder.build()); } final CommitRequest commitRequest = requestBuilder.build(); span.addAnnotation("Starting Commit"); @@ -531,6 +543,12 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude } } + @Nullable + String getTransactionTag() { + if (this.options.hasTag()) return this.options.tag(); + return null; + } + @Override public SpannerException onError(SpannerException e, boolean withBeginTransaction) { // If the statement that caused an error was the statement that included a BeginTransaction diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java index 795a26fff7..61ab806f35 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java @@ -64,6 +64,14 @@ TestReadContext build() { } } + class TestReadContextWithTagBuilder + extends AbstractReadContext.Builder { + @Override + TestReadContextWithTag build() { + return new TestReadContextWithTag(this); + } + } + private final class TestReadContext extends AbstractReadContext { TestReadContext(TestReadContextBuilder builder) { super(builder); @@ -75,6 +83,21 @@ TransactionSelector getTransactionSelector() { } } + private final class TestReadContextWithTag extends AbstractReadContext { + TestReadContextWithTag(TestReadContextWithTagBuilder builder) { + super(builder); + } + + @Override + TransactionSelector getTransactionSelector() { + return TransactionSelector.getDefaultInstance(); + } + + String getTransactionTag() { + return "app=spanner,env=test"; + } + } + private TestReadContext context; @Before @@ -162,4 +185,46 @@ public void testGetExecuteBatchDmlRequestBuilderWithPriority() { Options.fromQueryOptions(Options.priority(RpcPriority.LOW))); assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority()); } + + public void executeSqlRequestBuilderWithRequestOptions() { + ExecuteSqlRequest request = + context + .getExecuteSqlRequestBuilder( + Statement.newBuilder("SELECT FOO FROM BAR").build(), + QueryMode.NORMAL, + Options.fromUpdateOptions(Options.tag("app=spanner,env=test,action=query")), + false) + .build(); + assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR"); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=query"); + assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); + } + + @Test + public void executeSqlRequestBuilderWithRequestOptionsWithTxnTag() { + SessionImpl session = mock(SessionImpl.class); + when(session.getName()).thenReturn("session-1"); + TestReadContextWithTagBuilder builder = new TestReadContextWithTagBuilder(); + TestReadContextWithTag contextWithTag = + builder + .setSession(session) + .setRpc(mock(SpannerRpc.class)) + .setDefaultQueryOptions(defaultQueryOptions) + .setExecutorProvider(mock(ExecutorProvider.class)) + .build(); + + ExecuteSqlRequest request = + contextWithTag + .getExecuteSqlRequestBuilder( + Statement.newBuilder("SELECT FOO FROM BAR").build(), + QueryMode.NORMAL, + Options.fromUpdateOptions(Options.tag("app=spanner,env=test,action=query")), + false) + .build(); + assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR"); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=query"); + assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 867d8fb242..8c66b78ef3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -271,6 +271,294 @@ public void testWriteAtLeastOnceWithOptions() { assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority()); } + @Test + public void writeAtLeastOnceWithTagOptions() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.writeAtLeastOnceWithOptions( + Arrays.asList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.tag("app=spanner,env=test")); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertNotNull(commit.getRequestOptions()); + assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); + assertThat(commit.getRequestOptions().getRequestTag()).isEmpty(); + } + + @Test + public void testExecuteQueryWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = + client + .singleUse() + .executeQuery(SELECT1, Options.tag("app=spanner,env=test,action=query"))) { + while (resultSet.next()) {} + } + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertThat(requests).hasSize(1); + ExecuteSqlRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=query"); + assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); + } + + @Test + public void testExecuteReadWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = + client + .singleUse() + .read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.tag("app=spanner,env=test,action=read"))) { + while (resultSet.next()) {} + } + + List requests = mockSpanner.getRequestsOfType(ReadRequest.class); + assertThat(requests).hasSize(1); + ReadRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=read"); + assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); + } + + @Test + public void testReadWriteExecuteQueryWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = + client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn")); + runner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + try (ResultSet resultSet = + transaction.executeQuery( + SELECT1, Options.tag("app=spanner,env=test,action=query"))) { + while (resultSet.next()) {} + } + return null; + } + }); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertThat(requests).hasSize(1); + ExecuteSqlRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=query"); + assertThat(request.getRequestOptions().getTransactionTag()) + .isEqualTo("app=spanner,env=test,action=txn"); + } + + @Test + public void testReadWriteExecuteReadWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = + client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn")); + runner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + try (ResultSet resultSet = + transaction.read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.tag("app=spanner,env=test,action=read"))) { + while (resultSet.next()) {} + } + return null; + } + }); + + List requests = mockSpanner.getRequestsOfType(ReadRequest.class); + assertThat(requests).hasSize(1); + ReadRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=read"); + assertThat(request.getRequestOptions().getTransactionTag()) + .isEqualTo("app=spanner,env=test,action=txn"); + } + + @Test + public void testExecuteUpdateWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate( + UPDATE_STATEMENT, Options.tag("app=spanner,env=test,action=update")); + } + }); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertThat(requests).hasSize(1); + ExecuteSqlRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=update"); + assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); + } + + @Test + public void testBatchUpdateWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = + client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn")); + runner.run( + new TransactionCallable() { + @Override + public long[] run(TransactionContext transaction) throws Exception { + return transaction.batchUpdate( + Arrays.asList(UPDATE_STATEMENT), Options.tag("app=spanner,env=test,action=batch")); + } + }); + + List requests = + mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class); + assertThat(requests).hasSize(1); + ExecuteBatchDmlRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=batch"); + assertThat(request.getRequestOptions().getTransactionTag()) + .isEqualTo("app=spanner,env=test,action=txn"); + } + + @Test + public void testPartitionedDMLWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.executePartitionedUpdate( + UPDATE_STATEMENT, Options.tag("app=spanner,env=test,action=dml")); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertThat(requests).hasSize(1); + ExecuteSqlRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=dml"); + assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); + } + + @Test + public void testCommitWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = + client.readWriteTransaction(Options.tag("app=spanner,env=test,action=commit")); + runner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + return null; + } + }); + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()).isEmpty(); + assertThat(request.getRequestOptions().getTransactionTag()) + .isEqualTo("app=spanner,env=test,action=commit"); + } + + @Test + public void testTransactionManagerCommitWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionManager manager = + client.transactionManager(Options.tag("app=spanner,env=test,action=manager")); + TransactionContext transaction = manager.begin(); + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + manager.commit(); + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()).isEmpty(); + assertThat(request.getRequestOptions().getTransactionTag()) + .isEqualTo("app=spanner,env=test,action=manager"); + } + + @Test + public void testAsyncRunnerCommitWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + AsyncRunner runner = client.runAsync(Options.tag("app=spanner,env=test,action=runner")); + get( + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + } + }, + executor)); + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()).isEmpty(); + assertThat(request.getRequestOptions().getTransactionTag()) + .isEqualTo("app=spanner,env=test,action=runner"); + } + + @Test + public void testAsyncTransactionManagerCommitWithTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (AsyncTransactionManager manager = + client.transactionManagerAsync(Options.tag("app=spanner,env=test,action=manager"))) { + TransactionContextFuture transaction = manager.beginAsync(); + get( + transaction + .then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) + throws Exception { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + } + }, + executor) + .commitAsync()); + } + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getRequestTag()).isEmpty(); + assertThat(request.getRequestOptions().getTransactionTag()) + .isEqualTo("app=spanner,env=test,action=manager"); + } + @Test public void singleUse() { DatabaseClient client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index 4ab08a4785..d0f4794ccd 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -91,6 +91,7 @@ public void allOptionsAbsent() { assertThat(options.hasFilter()).isFalse(); assertThat(options.hasPageToken()).isFalse(); assertThat(options.hasPriority()).isFalse(); + assertThat(options.hasTag()).isFalse(); assertThat(options.toString()).isEqualTo(""); assertThat(options.equals(options)).isTrue(); assertThat(options.equals(null)).isFalse(); @@ -100,15 +101,15 @@ public void allOptionsAbsent() { } @Test - public void listOptTest() { + public void listOptionsTest() { int pageSize = 3; String pageToken = "ptok"; String filter = "env"; - Options opts = + Options options = Options.fromListOptions( Options.pageSize(pageSize), Options.pageToken(pageToken), Options.filter(filter)); - assertThat(opts.toString()) + assertThat(options.toString()) .isEqualTo( "pageSize: " + Integer.toString(pageSize) @@ -118,14 +119,14 @@ public void listOptTest() { + filter + " "); - assertThat(opts.hasPageSize()).isTrue(); - assertThat(opts.hasPageToken()).isTrue(); - assertThat(opts.hasFilter()).isTrue(); + assertThat(options.hasPageSize()).isTrue(); + assertThat(options.hasPageToken()).isTrue(); + assertThat(options.hasFilter()).isTrue(); - assertThat(opts.pageSize()).isEqualTo(pageSize); - assertThat(opts.pageToken()).isEqualTo(pageToken); - assertThat(opts.filter()).isEqualTo(filter); - assertThat(opts.hashCode()).isEqualTo(108027089); + assertThat(options.pageSize()).isEqualTo(pageSize); + assertThat(options.pageToken()).isEqualTo(pageToken); + assertThat(options.filter()).isEqualTo(filter); + assertThat(options.hashCode()).isEqualTo(108027089); } @Test @@ -168,12 +169,15 @@ public void listEquality() { } @Test - public void readOptTest() { + public void readOptionsTest() { int limit = 3; - Options opts = Options.fromReadOptions(Options.limit(limit)); + String tag = "app=spanner,env=test,action=read"; + Options options = Options.fromReadOptions(Options.limit(limit), Options.tag(tag)); - assertThat(opts.toString()).isEqualTo("limit: " + Integer.toString(limit) + " "); - assertThat(opts.hashCode()).isEqualTo(964); + assertThat(options.toString()) + .isEqualTo("limit: " + Integer.toString(limit) + " " + "tag: " + tag + " "); + assertThat(options.tag()).isEqualTo(tag); + assertThat(options.hashCode()).isEqualTo(-1111478426); } @Test @@ -198,12 +202,15 @@ public void readEquality() { } @Test - public void queryOptTest() { + public void queryOptionsTest() { int chunks = 3; - Options opts = Options.fromQueryOptions(Options.prefetchChunks(chunks)); - assertThat(opts.toString()).isEqualTo("prefetchChunks: " + Integer.toString(chunks) + " "); - assertThat(opts.prefetchChunks()).isEqualTo(chunks); - assertThat(opts.hashCode()).isEqualTo(964); + String tag = "app=spanner,env=test,action=query"; + Options options = Options.fromQueryOptions(Options.prefetchChunks(chunks), Options.tag(tag)); + assertThat(options.toString()) + .isEqualTo("prefetchChunks: " + Integer.toString(chunks) + " " + "tag: " + tag + " "); + assertThat(options.prefetchChunks()).isEqualTo(chunks); + assertThat(options.tag()).isEqualTo(tag); + assertThat(options.hashCode()).isEqualTo(-97431824); } @Test @@ -229,8 +236,8 @@ public void queryEquality() { @Test public void testFromTransactionOptions_toStringNoOptions() { - Options opts = Options.fromTransactionOptions(); - assertThat(opts.toString()).isEqualTo(""); + Options options = Options.fromTransactionOptions(); + assertThat(options.toString()).isEqualTo(""); } @Test @@ -485,9 +492,9 @@ public void testFromUpdateOptions() { @Test public void testTransactionOptions() { RpcPriority prio = RpcPriority.HIGH; - Options opts = Options.fromTransactionOptions(Options.priority(prio)); - assertThat(opts.toString()).isEqualTo("priority: " + prio + " "); - assertThat(opts.priority()).isEqualTo(Priority.PRIORITY_HIGH); + Options options = Options.fromTransactionOptions(Options.priority(prio)); + assertThat(options.toString()).isEqualTo("priority: " + prio + " "); + assertThat(options.priority()).isEqualTo(Priority.PRIORITY_HIGH); } @Test @@ -510,4 +517,68 @@ public void testTransactionOptionsPriorityEquality() { assertNotEquals(options2, options3); assertNotEquals(options2, options4); } + + @Test + public void updateOptionsTest() { + String tag = "app=spanner,env=test"; + Options options = Options.fromUpdateOptions(Options.tag(tag)); + + assertEquals("tag: " + tag + " ", options.toString()); + assertTrue(options.hasTag()); + assertThat(options.tag()).isEqualTo(tag); + assertThat(options.hashCode()).isEqualTo(-2118248262); + } + + @Test + public void updateEquality() { + Options o1; + Options o2; + Options o3; + + o1 = Options.fromUpdateOptions(); + o2 = Options.fromUpdateOptions(); + assertThat(o1.equals(o2)).isTrue(); + + o2 = Options.fromUpdateOptions(Options.tag("app=spanner,env=test")); + assertThat(o1.equals(o2)).isFalse(); + assertThat(o2.equals(o1)).isFalse(); + + o3 = Options.fromUpdateOptions(Options.tag("app=spanner,env=test")); + assertThat(o2.equals(o3)).isTrue(); + + o3 = Options.fromUpdateOptions(Options.tag("app=spanner,env=stage")); + assertThat(o2.equals(o3)).isFalse(); + } + + @Test + public void transactionOptionsTest() { + String tag = "app=spanner,env=test"; + Options options = Options.fromTransactionOptions(Options.tag(tag)); + + assertEquals("tag: " + tag + " ", options.toString()); + assertTrue(options.hasTag()); + assertThat(options.tag()).isEqualTo(tag); + assertThat(options.hashCode()).isEqualTo(-2118248262); + } + + @Test + public void transactionEquality() { + Options o1; + Options o2; + Options o3; + + o1 = Options.fromTransactionOptions(); + o2 = Options.fromTransactionOptions(); + assertThat(o1.equals(o2)).isTrue(); + + o2 = Options.fromTransactionOptions(Options.tag("app=spanner,env=test")); + assertThat(o1.equals(o2)).isFalse(); + assertThat(o2.equals(o1)).isFalse(); + + o3 = Options.fromTransactionOptions(Options.tag("app=spanner,env=test")); + assertThat(o2.equals(o3)).isTrue(); + + o3 = Options.fromTransactionOptions(Options.tag("app=spanner,env=stage")); + assertThat(o2.equals(o3)).isFalse(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index 4048d4b37c..1ca4b85319 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -67,6 +67,7 @@ public class PartitionedDmlTransactionTest { private final ByteString txId = ByteString.copyFromUtf8("tx"); private final ByteString resumeToken = ByteString.copyFromUtf8("resume"); private final String sql = "UPDATE FOO SET BAR=1 WHERE TRUE"; + private final String tag = "app=spanner,env=test"; private final ExecuteSqlRequest executeRequestWithoutResumeToken = ExecuteSqlRequest.newBuilder() .setQueryMode(QueryMode.NORMAL) @@ -76,6 +77,11 @@ public class PartitionedDmlTransactionTest { .build(); private final ExecuteSqlRequest executeRequestWithResumeToken = executeRequestWithoutResumeToken.toBuilder().setResumeToken(resumeToken).build(); + private final ExecuteSqlRequest executeRequestWithRequestOptions = + executeRequestWithoutResumeToken + .toBuilder() + .setRequestOptions(RequestOptions.newBuilder().setRequestTag(tag).build()) + .build(); @Before public void setup() { @@ -108,6 +114,28 @@ public void testExecuteStreamingPartitionedUpdate() { Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); } + @Test + public void testExecuteStreamingPartitionedUpdateWithUpdateOptions() { + ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build(); + PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build(); + PartialResultSet p2 = PartialResultSet.newBuilder().setStats(stats).build(); + ServerStream stream = mock(ServerStream.class); + when(stream.iterator()).thenReturn(ImmutableList.of(p1, p2).iterator()); + when(rpc.executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithRequestOptions), anyMap(), any(Duration.class))) + .thenReturn(stream); + + long count = + tx.executeStreamingPartitionedUpdate( + Statement.of(sql), Duration.ofMinutes(10), Options.tag(tag)); + + assertThat(count).isEqualTo(1000L); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc) + .executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithRequestOptions), anyMap(), any(Duration.class)); + } + @Test public void testExecuteStreamingPartitionedUpdateAborted() { ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build(); @@ -355,4 +383,15 @@ public void testRequestWithPriority() { Options.fromUpdateOptions(Options.priority(RpcPriority.LOW))); assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority()); } + + @Test + public void testRequestWithPriorityAndRequestTag() { + ExecuteSqlRequest request = + tx.newTransactionRequestFrom( + Statement.of("UPDATE FOO SET BAR=1 WHERE TRUE"), + Options.fromUpdateOptions( + Options.priority(RpcPriority.LOW), Options.tag("app=spanner,env=test"))); + assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority()); + assertThat(request.getRequestOptions().getRequestTag()).isEqualTo("app=spanner,env=test"); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 3827b2a280..5388af76c4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -240,6 +240,32 @@ public void writeAtLeastOnce() throws ParseException { assertThat(request.getMutationsList()).containsExactly(mutation); } + @Test + public void writeAtLeastOnceWithOptions() throws ParseException { + String tag = "app=spanner,env=test"; + String timestampString = "2015-10-01T10:54:20.021Z"; + ArgumentCaptor commit = ArgumentCaptor.forClass(CommitRequest.class); + CommitResponse response = + CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build(); + Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response); + session.writeAtLeastOnceWithOptions( + Arrays.asList(Mutation.newInsertBuilder("T").set("C").to("x").build()), Options.tag(tag)); + + CommitRequest request = commit.getValue(); + assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo(tag); + com.google.spanner.v1.Mutation mutation = + com.google.spanner.v1.Mutation.newBuilder() + .setInsert( + Write.newBuilder() + .setTable("T") + .addColumns("C") + .addValues( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("x")))) + .build(); + assertThat(request.getMutationsList()).containsExactly(mutation); + } + private static long utcTimeSeconds(int year, int month, int day, int hour, int min, int secs) { GregorianCalendar calendar = new GregorianCalendar(TimeZone.getTimeZone("UTC")); calendar.set(year, month, day, hour, min, secs);