From c7dc6e6b11af76cb5db1f160c4466a5d75b524b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Sun, 17 Jan 2021 23:58:52 +0100 Subject: [PATCH] fix: UNAVAILABLE error on first query could cause transaction to get stuck (#807) If the first query or read operation of a read/write transaction would return UNAVAILABLE for the first element of the result stream, the transaction could get stuck. This was caused by the internal retry mechanism that would wait for the initial attempt to return a transaction, which was never returned as the UNAVAILABLE exception was internally handled by the result stream iterator. Fixes #799 --- .../cloud/spanner/AbstractReadContext.java | 27 +++- .../cloud/spanner/AbstractResultSet.java | 1 + .../cloud/spanner/TransactionRunnerImpl.java | 10 +- .../spanner/AbstractReadContextTest.java | 8 +- .../spanner/InlineBeginTransactionTest.java | 127 +++++++++++++++++- 5 files changed, 160 insertions(+), 13 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 6b05864bc2..5e0e30b7dc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -558,7 +558,7 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) { } ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder( - Statement statement, QueryMode queryMode, Options options) { + Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) { ExecuteSqlRequest.Builder builder = ExecuteSqlRequest.newBuilder() .setSql(statement.getSql()) @@ -572,9 +572,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder( builder.putParamTypes(param.getKey(), param.getValue().getType().toProto()); } } - TransactionSelector selector = getTransactionSelector(); - if (selector != null) { - builder.setTransaction(selector); + if (withTransactionSelector) { + TransactionSelector selector = getTransactionSelector(); + if (selector != null) { + builder.setTransaction(selector); + } } builder.setSeqno(getSeqNo()); builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions())); @@ -619,18 +621,26 @@ ResultSet executeQueryInternalWithOptions( beforeReadOrQuery(); final int prefetchChunks = options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks; + final ExecuteSqlRequest.Builder request = + getExecuteSqlRequestBuilder( + statement, queryMode, options, /* withTransactionSelector = */ false); ResumableStreamIterator stream = new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks); - final ExecuteSqlRequest.Builder request = - getExecuteSqlRequestBuilder(statement, queryMode, options); if (partitionToken != null) { request.setPartitionToken(partitionToken); } + TransactionSelector selector = null; if (resumeToken != null) { request.setResumeToken(resumeToken); + selector = getTransactionSelector(); + } else if (!request.hasTransaction()) { + selector = getTransactionSelector(); + } + if (selector != null) { + request.setTransaction(selector); } SpannerRpc.StreamingCall call = rpc.executeQuery(request.build(), stream.consumer(), session.getOptions()); @@ -738,10 +748,13 @@ ResultSet readInternalWithOptions( @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); + TransactionSelector selector = null; if (resumeToken != null) { builder.setResumeToken(resumeToken); + selector = getTransactionSelector(); + } else if (!builder.hasTransaction()) { + selector = getTransactionSelector(); } - TransactionSelector selector = getTransactionSelector(); if (selector != null) { builder.setTransaction(selector); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index f9ddb18c43..1d26adc757 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -1080,6 +1080,7 @@ protected PartialResultSet computeNext() { backoffSleep(context, backOff); } } + continue; } span.addAnnotation("Stream broken. Not safe to retry"); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 558da27c59..c3973c1807 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -583,7 +583,10 @@ public long executeUpdate(Statement statement, UpdateOption... options) { beforeReadOrQuery(); final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( - statement, QueryMode.NORMAL, Options.fromUpdateOptions(options)); + statement, + QueryMode.NORMAL, + Options.fromUpdateOptions(options), + /* withTransactionSelector = */ true); try { com.google.spanner.v1.ResultSet resultSet = rpc.executeQuery(builder.build(), session.getOptions()); @@ -608,7 +611,10 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... o beforeReadOrQuery(); final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( - statement, QueryMode.NORMAL, Options.fromUpdateOptions(options)); + statement, + QueryMode.NORMAL, + Options.fromUpdateOptions(options), + /* withTransactionSelector = */ true); final ApiFuture resultSet; try { // Register the update as an async operation that must finish before the transaction may diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java index 1ca3164e2f..bc0e475f44 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java @@ -90,7 +90,10 @@ public void executeSqlRequestBuilderWithoutQueryOptions() { ExecuteSqlRequest request = context .getExecuteSqlRequestBuilder( - Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, Options.fromQueryOptions()) + Statement.of("SELECT FOO FROM BAR"), + QueryMode.NORMAL, + Options.fromQueryOptions(), + true) .build(); assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR"); assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions); @@ -105,7 +108,8 @@ public void executeSqlRequestBuilderWithQueryOptions() { .withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build()) .build(), QueryMode.NORMAL, - Options.fromQueryOptions()) + Options.fromQueryOptions(), + true) .build(); assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR"); assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0"); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 0ec190d968..e86ffd8554 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -257,6 +257,130 @@ public Long run(TransactionContext transaction) throws Exception { assertThat(countTransactionsStarted()).isEqualTo(2); } + @Test + public void testInlinedBeginFirstUpdateAborts() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + long updateCount = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + boolean firstAttempt = true; + + @Override + public Long run(TransactionContext transaction) throws Exception { + if (firstAttempt) { + firstAttempt = false; + mockSpanner.putStatementResult( + StatementResult.exception( + UPDATE_STATEMENT, + mockSpanner.createAbortedException( + ByteString.copyFromUtf8("some-tx")))); + } else { + mockSpanner.putStatementResult( + StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + } + return transaction.executeUpdate(UPDATE_STATEMENT); + } + }); + assertThat(updateCount).isEqualTo(UPDATE_COUNT); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstQueryAborts() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + long updateCount = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + boolean firstAttempt = true; + + @Override + public Long run(TransactionContext transaction) throws Exception { + if (firstAttempt) { + firstAttempt = false; + mockSpanner.putStatementResult( + StatementResult.exception( + SELECT1, + mockSpanner.createAbortedException( + ByteString.copyFromUtf8("some-tx")))); + } else { + mockSpanner.putStatementResult( + StatementResult.query(SELECT1, SELECT1_RESULTSET)); + } + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + } + }); + assertThat(updateCount).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstQueryReturnsUnavailable() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0)); + long value = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + // The first attempt will return UNAVAILABLE and retry internally. + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + } + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstReadReturnsUnavailable() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0)); + long value = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + // The first attempt will return UNAVAILABLE and retry internally. + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) { + while (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + } + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ReadRequest.class)).isEqualTo(2); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + @Test public void testInlinedBeginTxWithQuery() { DatabaseClient client = @@ -285,8 +409,7 @@ public Long run(TransactionContext transaction) throws Exception { @Test public void testInlinedBeginTxWithRead() { - DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); long updateCount = client .readWriteTransaction()