diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 6b05864bc2..5e0e30b7dc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -558,7 +558,7 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) { } ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder( - Statement statement, QueryMode queryMode, Options options) { + Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) { ExecuteSqlRequest.Builder builder = ExecuteSqlRequest.newBuilder() .setSql(statement.getSql()) @@ -572,9 +572,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder( builder.putParamTypes(param.getKey(), param.getValue().getType().toProto()); } } - TransactionSelector selector = getTransactionSelector(); - if (selector != null) { - builder.setTransaction(selector); + if (withTransactionSelector) { + TransactionSelector selector = getTransactionSelector(); + if (selector != null) { + builder.setTransaction(selector); + } } builder.setSeqno(getSeqNo()); builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions())); @@ -619,18 +621,26 @@ ResultSet executeQueryInternalWithOptions( beforeReadOrQuery(); final int prefetchChunks = options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks; + final ExecuteSqlRequest.Builder request = + getExecuteSqlRequestBuilder( + statement, queryMode, options, /* withTransactionSelector = */ false); ResumableStreamIterator stream = new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks); - final ExecuteSqlRequest.Builder request = - getExecuteSqlRequestBuilder(statement, queryMode, options); if (partitionToken != null) { request.setPartitionToken(partitionToken); } + TransactionSelector selector = null; if (resumeToken != null) { request.setResumeToken(resumeToken); + selector = getTransactionSelector(); + } else if (!request.hasTransaction()) { + selector = getTransactionSelector(); + } + if (selector != null) { + request.setTransaction(selector); } SpannerRpc.StreamingCall call = rpc.executeQuery(request.build(), stream.consumer(), session.getOptions()); @@ -738,10 +748,13 @@ ResultSet readInternalWithOptions( @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); + TransactionSelector selector = null; if (resumeToken != null) { builder.setResumeToken(resumeToken); + selector = getTransactionSelector(); + } else if (!builder.hasTransaction()) { + selector = getTransactionSelector(); } - TransactionSelector selector = getTransactionSelector(); if (selector != null) { builder.setTransaction(selector); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index f9ddb18c43..1d26adc757 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -1080,6 +1080,7 @@ protected PartialResultSet computeNext() { backoffSleep(context, backOff); } } + continue; } span.addAnnotation("Stream broken. Not safe to retry"); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 558da27c59..c3973c1807 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -583,7 +583,10 @@ public long executeUpdate(Statement statement, UpdateOption... options) { beforeReadOrQuery(); final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( - statement, QueryMode.NORMAL, Options.fromUpdateOptions(options)); + statement, + QueryMode.NORMAL, + Options.fromUpdateOptions(options), + /* withTransactionSelector = */ true); try { com.google.spanner.v1.ResultSet resultSet = rpc.executeQuery(builder.build(), session.getOptions()); @@ -608,7 +611,10 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... o beforeReadOrQuery(); final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( - statement, QueryMode.NORMAL, Options.fromUpdateOptions(options)); + statement, + QueryMode.NORMAL, + Options.fromUpdateOptions(options), + /* withTransactionSelector = */ true); final ApiFuture resultSet; try { // Register the update as an async operation that must finish before the transaction may diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java index 1ca3164e2f..bc0e475f44 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java @@ -90,7 +90,10 @@ public void executeSqlRequestBuilderWithoutQueryOptions() { ExecuteSqlRequest request = context .getExecuteSqlRequestBuilder( - Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, Options.fromQueryOptions()) + Statement.of("SELECT FOO FROM BAR"), + QueryMode.NORMAL, + Options.fromQueryOptions(), + true) .build(); assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR"); assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions); @@ -105,7 +108,8 @@ public void executeSqlRequestBuilderWithQueryOptions() { .withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build()) .build(), QueryMode.NORMAL, - Options.fromQueryOptions()) + Options.fromQueryOptions(), + true) .build(); assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR"); assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0"); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 0ec190d968..e86ffd8554 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -257,6 +257,130 @@ public Long run(TransactionContext transaction) throws Exception { assertThat(countTransactionsStarted()).isEqualTo(2); } + @Test + public void testInlinedBeginFirstUpdateAborts() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + long updateCount = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + boolean firstAttempt = true; + + @Override + public Long run(TransactionContext transaction) throws Exception { + if (firstAttempt) { + firstAttempt = false; + mockSpanner.putStatementResult( + StatementResult.exception( + UPDATE_STATEMENT, + mockSpanner.createAbortedException( + ByteString.copyFromUtf8("some-tx")))); + } else { + mockSpanner.putStatementResult( + StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + } + return transaction.executeUpdate(UPDATE_STATEMENT); + } + }); + assertThat(updateCount).isEqualTo(UPDATE_COUNT); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstQueryAborts() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + long updateCount = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + boolean firstAttempt = true; + + @Override + public Long run(TransactionContext transaction) throws Exception { + if (firstAttempt) { + firstAttempt = false; + mockSpanner.putStatementResult( + StatementResult.exception( + SELECT1, + mockSpanner.createAbortedException( + ByteString.copyFromUtf8("some-tx")))); + } else { + mockSpanner.putStatementResult( + StatementResult.query(SELECT1, SELECT1_RESULTSET)); + } + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + } + }); + assertThat(updateCount).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstQueryReturnsUnavailable() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0)); + long value = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + // The first attempt will return UNAVAILABLE and retry internally. + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + } + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstReadReturnsUnavailable() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0)); + long value = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + // The first attempt will return UNAVAILABLE and retry internally. + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) { + while (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + } + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ReadRequest.class)).isEqualTo(2); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + @Test public void testInlinedBeginTxWithQuery() { DatabaseClient client = @@ -285,8 +409,7 @@ public Long run(TransactionContext transaction) throws Exception { @Test public void testInlinedBeginTxWithRead() { - DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); long updateCount = client .readWriteTransaction()