diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 48ec1d87b5..8412fac67e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -790,7 +790,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken SpannerRpc.StreamingCall call = rpc.read(builder.build(), stream.consumer(), session.getOptions()); call.request(prefetchChunks); - stream.setCall(call, selector != null && selector.hasBegin()); + stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); return stream; } }; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 49cf579e3a..eb3f65176b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -605,7 +605,7 @@ public void testInlinedBeginFirstReadReturnsUnavailable() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); mockSpanner.setStreamingReadExecutionTime( SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0)); - long value = + Long value = client .readWriteTransaction() .run( @@ -625,6 +625,381 @@ public void testInlinedBeginFirstReadReturnsUnavailable() { assertThat(countRequests(CommitRequest.class)).isEqualTo(1); } + @Test + public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + // The second attempt will return ABORTED and should cause the transaction to + // retry. + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { + if (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ReadRequest.class)).isEqualTo(3); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + // The second attempt will return ABORTED and should cause the transaction to + // retry. + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + if (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + // The second attempt will return ABORTED and should cause the transaction to + // retry. + return transaction.executeUpdate(UPDATE_STATEMENT); + }); + assertThat(value).isEqualTo(UPDATE_COUNT); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted_WithCatchAll() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + // The second attempt will return ABORTED and should cause the transaction to + // retry. + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { + if (rs.next()) { + return rs.getLong(0); + } + } catch (AbortedException e) { + // Ignore the AbortedException and let the commit handle it. + } + return 0L; + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ReadRequest.class)).isEqualTo(3); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted_WithCatchAll() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + // The second attempt will return ABORTED and should cause the transaction to + // retry. + try { + return transaction.executeUpdate(UPDATE_STATEMENT); + } catch (AbortedException e) { + // Ignore the AbortedException and let the commit handle it. + } + return 0L; + }); + assertThat(value).isEqualTo(UPDATE_COUNT); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted_WithCatchAll() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + // The second attempt will return ABORTED and should cause the transaction to + // retry. + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + if (rs.next()) { + return rs.getLong(0); + } + } catch (AbortedException e) { + // Ignore the AbortedException and let the commit handle it. + } + return 0L; + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithCatch() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException())); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { + if (rs.next()) { + return rs.getLong(0); + } + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.CANCELLED) { + // Ignore and let the transaction continue. + // Also make sure that the next read operation will return Aborted. + mockSpanner.abortNextTransaction(); + } else if (e.getErrorCode() == ErrorCode.ABORTED) { + // Ignore Aborted errors. This will cause the transaction to try to commit. + } else { + // Propagate any other errors (there should not be any in this test case). + throw e; + } + } + return 0L; + }); + + assertThat(value).isEqualTo(1L); + // 1. The initial attempt will inline the BeginTransaction option. + // 2. The CANCELLED error during the first attempt will cause a retry with a BeginTransaction + // RPC. + // 3. The ABORTED error during the second attempt will NOT cause the next retry to use an + // explicit BeginTransaction RPC, because the previous attempt did return a transaction ID + // (the ID that was returned by the BeginTransaction RPC of that attempt). + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + // There will be 3 attempts to read: + // 1. The first will return CANCELLED. + // 2. The second will return ABORTED. + // 3. The third will return the results. + assertThat(countRequests(ReadRequest.class)).isEqualTo(3); + // There are two attempts to commit: + // 1. The initial attempt will NOT try to commit, because the initial Read operation did not + // return a transaction ID. + // 2. The second attempt will try to commit, because the BeginTransaction RPC did return a + // transaction ID, and the Aborted error that was returned by the Read operation was caught + // by the application. This means that the TransactionRunner does not know that the + // transaction was aborted. The Commit RPC will return an Aborted error. + // 3. The third attempt will commit, as the Read operation succeeded and returned a + // transaction ID. + assertThat(countRequests(CommitRequest.class)).isEqualTo(2); + } + + @Test + public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithoutCatch() + throws InterruptedException, ExecutionException { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException())); + // The CANCELLED error is not caught by the application, so it will bubble up and cause the + // transaction to fail. + assertThrows( + SpannerException.class, + () -> + client + .readWriteTransaction() + .run( + transaction -> { + try (ResultSet rs = + transaction.read( + "FOO", KeySet.all(), Collections.singletonList("ID"))) { + if (rs.next()) { + return rs.getLong(0); + } + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.CANCELLED) { + // Make sure that the next read operation will return Aborted. + mockSpanner.abortNextTransaction(); + } + // Always propagate the error to the TransactionRunner. + throw e; + } + return 0L; + })); + + // The initial attempt will inline the BeginTransaction option. + // There is no second attempt as the CANCELLED error is not caught. + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ReadRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + // The CANCELLED error means that there is no transaction ID returned by the Read operation. + // So there is also no transaction to rollback. + assertThat(countRequests(RollbackRequest.class)).isEqualTo(0); + } + + @Test + public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithCatchForCancelled() + throws InterruptedException, ExecutionException { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException())); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { + if (rs.next()) { + return rs.getLong(0); + } + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.CANCELLED) { + // Do not propagate the CANCELLED error. + // Make sure that the next read operation will return Aborted. + mockSpanner.abortNextTransaction(); + } else { + // Propagate all other errors to the TransactionRunner. + throw e; + } + } + return 0L; + }); + + assertThat(value).isEqualTo(1L); + // 1. The initial attempt will inline the BeginTransaction option. + // 2. The CANCELLED error during the first attempt will cause a retry with a BeginTransaction + // RPC, because the error was returned by the first statement in the transaction. + // 3. The ABORTED error during the second attempt will NOT cause the next retry to use an + // explicit BeginTransaction RPC, because the previous attempt did return a transaction ID + // (the ID that was returned by the BeginTransaction RPC of that attempt). + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + // There will be 3 attempts to read: + // 1. The first will return CANCELLED. + // 2. The second will return ABORTED. + // 3. The third will return the results. + assertThat(countRequests(ReadRequest.class)).isEqualTo(3); + // There is only one attempt to commit: + // 1. The initial attempt will NOT try to commit, because the initial Read operation did not + // return a transaction ID. + // 2. The second attempt will NOT try to commit, because the Aborted error from the Read + // operation is propagated to the TransactionRunner. This means that the TransactionRunner + // knows that the transaction was aborted, and will automatically initiate a retry without + // first trying to commit the transaction. + // 3. The third attempt will commit, as the Read operation succeeded and returned a + // transaction ID. + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); + } + + @Test + public void testInlinedBeginCommitAfterReadReturnsUnavailable() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { + if (rs.next()) { + return rs.getLong(0); + } + } + return 0L; + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ReadRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(2); + } + + @Test + public void testInlinedBeginFirstReadReturnsUnavailableAndCommitAborts() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0)); + final AtomicBoolean firstAttempt = new AtomicBoolean(true); + Long value = + client + .readWriteTransaction() + .run( + transaction -> { + long res = 0L; + // The first attempt will return UNAVAILABLE and retry internally. + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { + if (rs.next()) { + res = rs.getLong(0); + } + } + if (firstAttempt.compareAndSet(true, false)) { + mockSpanner.abortTransaction(transaction); + } + return res; + }); + assertThat(value).isEqualTo(1L); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ReadRequest.class)).isEqualTo(3); + assertThat(countRequests(CommitRequest.class)).isEqualTo(2); + } + @Test public void testInlinedBeginTxWithQuery() { DatabaseClient client =