From c4776e42ad4a2795b0bfc6e1a9fb10c40d64a809 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 15 Jan 2021 00:01:49 +0100 Subject: [PATCH] fix: safeguard against statements errors when requesting tx (#800) --- .../cloud/spanner/AbstractReadContext.java | 9 +- .../cloud/spanner/AbstractResultSet.java | 9 +- .../cloud/spanner/TransactionRunnerImpl.java | 83 ++++-- .../cloud/spanner/GrpcResultSetTest.java | 3 +- .../spanner/InlineBeginTransactionTest.java | 274 ++++++++++++++++-- .../cloud/spanner/MockSpannerServiceImpl.java | 35 ++- .../cloud/spanner/ReadFormatTestRunner.java | 3 +- 7 files changed, 344 insertions(+), 72 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 9f2e30bf48..6b05864bc2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -213,7 +213,7 @@ TransactionSelector getTransactionSelector() { } @Override - public void onTransactionMetadata(Transaction transaction) { + public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) { synchronized (lock) { if (!transaction.hasReadTimestamp()) { throw newSpannerException( @@ -394,6 +394,9 @@ void initTransaction() { // much more frequently. private static final int MAX_BUFFERED_CHUNKS = 512; + protected static final String NO_TRANSACTION_RETURNED_MSG = + "The statement did not return a transaction even though one was requested"; + AbstractReadContext(Builder builder) { this.session = builder.session; this.rpc = builder.rpc; @@ -632,7 +635,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken SpannerRpc.StreamingCall call = rpc.executeQuery(request.build(), stream.consumer(), session.getOptions()); call.request(prefetchChunks); - stream.setCall(call, request.hasTransaction() && request.getTransaction().hasBegin()); + stream.setCall(call, request.getTransaction().hasBegin()); return stream; } }; @@ -685,7 +688,7 @@ public void close() { /** This method is called when a statement returned a new transaction as part of its results. */ @Override - public void onTransactionMetadata(Transaction transaction) {} + public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {} @Override public void onError(SpannerException e, boolean withBeginTransaction) {} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index a4d8490197..f9ddb18c43 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -78,7 +78,8 @@ interface Listener { * Called when transaction metadata is seen. This method may be invoked at most once. If the * method is invoked, it will precede {@link #onError(SpannerException)} or {@link #onDone()}. */ - void onTransactionMetadata(Transaction transaction) throws SpannerException; + void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) + throws SpannerException; /** Called when the read finishes with an error. */ void onError(SpannerException e, boolean withBeginTransaction); @@ -117,12 +118,12 @@ public boolean next() throws SpannerException { if (currRow == null) { ResultSetMetadata metadata = iterator.getMetadata(); if (metadata.hasTransaction()) { - listener.onTransactionMetadata(metadata.getTransaction()); + listener.onTransactionMetadata( + metadata.getTransaction(), iterator.isWithBeginTransaction()); } else if (iterator.isWithBeginTransaction()) { // The query should have returned a transaction. throw SpannerExceptionFactory.newSpannerException( - ErrorCode.FAILED_PRECONDITION, - "Query requested a transaction to be started, but no transaction was returned"); + ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); } currRow = new GrpcStruct(iterator.type(), new ArrayList<>()); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 543a3c5487..558da27c59 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -463,13 +463,9 @@ TransactionSelector getTransactionSelector() { // Aborted error if the call that included the BeginTransaction option fails. The // Aborted error will cause the entire transaction to be retried, and the retry will use // a separate BeginTransaction RPC. - if (trackTransactionStarter) { - TransactionSelector.newBuilder() - .setId(tx.get(waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS)) - .build(); - } else { - TransactionSelector.newBuilder().setId(tx.get()).build(); - } + TransactionSelector.newBuilder() + .setId(tx.get(waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS)) + .build(); } } catch (ExecutionException e) { if (e.getCause() instanceof AbortedException) { @@ -479,11 +475,15 @@ TransactionSelector getTransactionSelector() { } throw SpannerExceptionFactory.newSpannerException(e.getCause()); } catch (TimeoutException e) { + // Throw an ABORTED exception to force a retry of the transaction if no transaction + // has been returned by the first statement. SpannerException se = SpannerExceptionFactory.newSpannerException( - ErrorCode.DEADLINE_EXCEEDED, - "Timeout while waiting for a transaction to be returned by another statement. " - + "See the suppressed exception for the stacktrace of the caller that should return a transaction", + ErrorCode.ABORTED, + "Timeout while waiting for a transaction to be returned by another statement." + + (trackTransactionStarter + ? " See the suppressed exception for the stacktrace of the caller that should return a transaction" + : ""), e); if (transactionStarter != null) { se.addSuppressed(transactionStarter); @@ -498,12 +498,20 @@ TransactionSelector getTransactionSelector() { } @Override - public void onTransactionMetadata(Transaction transaction) { - // A transaction has been returned by a statement that was executed. Set the id of the - // transaction on this instance and release the lock to allow other statements to proceed. - if (this.transactionId == null && transaction != null && transaction.getId() != null) { - this.transactionId = transaction.getId(); - this.transactionIdFuture.set(transaction.getId()); + public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) { + Preconditions.checkNotNull(transaction); + if (transaction.getId() != ByteString.EMPTY) { + // A transaction has been returned by a statement that was executed. Set the id of the + // transaction on this instance and release the lock to allow other statements to proceed. + if ((transactionIdFuture == null || !this.transactionIdFuture.isDone()) + && this.transactionId == null) { + this.transactionId = transaction.getId(); + this.transactionIdFuture.set(transaction.getId()); + } + } else if (shouldIncludeId) { + // The statement should have returned a transaction. + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); } } @@ -580,7 +588,8 @@ public long executeUpdate(Statement statement, UpdateOption... options) { com.google.spanner.v1.ResultSet resultSet = rpc.executeQuery(builder.build(), session.getOptions()); if (resultSet.getMetadata().hasTransaction()) { - onTransactionMetadata(resultSet.getMetadata().getTransaction()); + onTransactionMetadata( + resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin()); } if (!resultSet.hasStats()) { throw new IllegalArgumentException( @@ -588,9 +597,9 @@ public long executeUpdate(Statement statement, UpdateOption... options) { } // For standard DML, using the exact row count. return resultSet.getStats().getRowCountExact(); - } catch (SpannerException e) { - onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin()); - throw e; + } catch (Throwable t) { + onError(SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin()); + throw t; } } @@ -621,6 +630,12 @@ public Long apply(ResultSet input) { ErrorCode.INVALID_ARGUMENT, "DML response missing stats possibly due to non-DML statement as input"); } + if (builder.getTransaction().hasBegin() + && !(input.getMetadata().hasTransaction() + && input.getMetadata().getTransaction().getId() != ByteString.EMPTY)) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG); + } // For standard DML, using the exact row count. return input.getStats().getRowCountExact(); } @@ -633,8 +648,8 @@ public Long apply(ResultSet input) { new ApiFunction() { @Override public Long apply(Throwable input) { - SpannerException e = SpannerExceptionFactory.newSpannerException(input); - onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin()); + SpannerException e = SpannerExceptionFactory.asSpannerException(input); + onError(e, builder.getTransaction().hasBegin()); throw e; } }, @@ -645,9 +660,11 @@ public Long apply(Throwable input) { public void run() { try { if (resultSet.get().getMetadata().hasTransaction()) { - onTransactionMetadata(resultSet.get().getMetadata().getTransaction()); + onTransactionMetadata( + resultSet.get().getMetadata().getTransaction(), + builder.getTransaction().hasBegin()); } - } catch (ExecutionException | InterruptedException e) { + } catch (Throwable e) { // Ignore this error here as it is handled by the future that is returned by the // executeUpdateAsync method. } @@ -670,7 +687,9 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option for (int i = 0; i < response.getResultSetsCount(); ++i) { results[i] = response.getResultSets(i).getStats().getRowCountExact(); if (response.getResultSets(i).getMetadata().hasTransaction()) { - onTransactionMetadata(response.getResultSets(i).getMetadata().getTransaction()); + onTransactionMetadata( + response.getResultSets(i).getMetadata().getTransaction(), + builder.getTransaction().hasBegin()); } } @@ -686,8 +705,8 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option results); } return results; - } catch (SpannerException e) { - onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin()); + } catch (Throwable e) { + onError(SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin()); throw e; } } @@ -718,7 +737,9 @@ public long[] apply(ExecuteBatchDmlResponse input) { for (int i = 0; i < input.getResultSetsCount(); ++i) { results[i] = input.getResultSets(i).getStats().getRowCountExact(); if (input.getResultSets(i).getMetadata().hasTransaction()) { - onTransactionMetadata(input.getResultSets(i).getMetadata().getTransaction()); + onTransactionMetadata( + input.getResultSets(i).getMetadata().getTransaction(), + builder.getTransaction().hasBegin()); } } // If one of the DML statements was aborted, we should throw an aborted exception. @@ -743,10 +764,8 @@ public long[] apply(ExecuteBatchDmlResponse input) { new ApiFunction() { @Override public long[] apply(Throwable input) { - SpannerException e = SpannerExceptionFactory.newSpannerException(input); - onError( - SpannerExceptionFactory.newSpannerException(e.getCause()), - builder.hasTransaction() && builder.getTransaction().hasBegin()); + SpannerException e = SpannerExceptionFactory.asSpannerException(input); + onError(e, builder.getTransaction().hasBegin()); throw e; } }, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index b47cdc35ff..8f8c7aada8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -56,7 +56,8 @@ public class GrpcResultSetTest { private static class NoOpListener implements AbstractResultSet.Listener { @Override - public void onTransactionMetadata(Transaction transaction) throws SpannerException {} + public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) + throws SpannerException {} @Override public void onError(SpannerException e, boolean withBeginTransaction) {} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 663dac566c..0ec190d968 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -37,7 +37,6 @@ import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.common.base.Predicate; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.AbstractMessage; @@ -142,6 +141,8 @@ public static Collection data() { .build(); private static final Statement INVALID_SELECT = Statement.of("SELECT * FROM NON_EXISTING_TABLE"); private static final Statement READ_STATEMENT = Statement.of("SELECT ID FROM FOO WHERE 1=1"); + private static final Statement READ_ROW_STATEMENT = + Statement.of("SELECT BAR FROM FOO WHERE ID=1"); private Spanner spanner; @@ -154,6 +155,7 @@ public static void startStaticServer() throws IOException { mockSpanner.putStatementResult( StatementResult.query(SELECT1_UNION_ALL_SELECT2, SELECT1_UNION_ALL_SELECT2_RESULTSET)); mockSpanner.putStatementResult(StatementResult.query(READ_STATEMENT, SELECT1_RESULTSET)); + mockSpanner.putStatementResult(StatementResult.query(READ_ROW_STATEMENT, SELECT1_RESULTSET)); mockSpanner.putStatementResult( StatementResult.exception( INVALID_UPDATE_STATEMENT, @@ -1319,34 +1321,42 @@ public Void run(TransactionContext transaction) throws Exception { @Test public void testWaitForTransactionTimeout() { - mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0)); DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - try { - client - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - TransactionContextImpl impl = (TransactionContextImpl) transaction; + client + .readWriteTransaction() + .run( + new TransactionCallable() { + int attempt = 0; + + @Override + public Void run(TransactionContext transaction) throws Exception { + attempt++; + TransactionContextImpl impl = (TransactionContextImpl) transaction; + if (attempt == 1) { impl.waitForTransactionTimeoutMillis = 1L; - transaction.executeUpdateAsync(UPDATE_STATEMENT); - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) {} - } - return null; + // Freeze the mock server to prevent the first (async) statement from returning a + // transaction. + mockSpanner.freeze(); + } else { + impl.waitForTransactionTimeoutMillis = 60_000L; } - }); - fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED); - assertThat(e.getSuppressed()).hasLength(1); - assertThat(Throwables.getStackTraceAsString(e.getSuppressed()[0])) - .contains("TransactionContextImpl.executeUpdateAsync"); - } + transaction.executeUpdateAsync(UPDATE_STATEMENT); + + // Try to execute a query. This will timeout during the first attempt while waiting + // for the first statement to return a transaction, and then force a retry of the + // transaction. + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) {} + } catch (Throwable t) { + mockSpanner.unfreeze(); + throw t; + } + return null; + } + }); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); - assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1); - assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3); + assertThat(countRequests(CommitRequest.class)).isEqualTo(1); } @Test @@ -1412,6 +1422,220 @@ public boolean apply(AbstractMessage input) { assertThat(countRequests(CommitRequest.class)).isEqualTo(0); } + @Test + public void testQueryWithInlineBeginDidNotReturnTransaction() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // This will cause the first statement that requests a transaction to not return a transaction + // id. + mockSpanner.ignoreNextInlineBeginRequest(); + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + try (ResultSet rs = transaction.executeQuery(SELECT1_UNION_ALL_SELECT2)) { + while (rs.next()) {} + } + return null; + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + } + + @Test + public void testReadWithInlineBeginDidNotReturnTransaction() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // This will cause the first statement that requests a transaction to not return a transaction + // id. + mockSpanner.ignoreNextInlineBeginRequest(); + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.readRow("FOO", Key.of(1L), Arrays.asList("BAR")); + return null; + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ReadRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + } + + @Test + public void testUpdateWithInlineBeginDidNotReturnTransaction() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // This will cause the first statement that requests a transaction to not return a transaction + // id. + mockSpanner.ignoreNextInlineBeginRequest(); + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.executeUpdate(UPDATE_STATEMENT); + return null; + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + } + + @Test + public void testBatchUpdateWithInlineBeginDidNotReturnTransaction() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // This will cause the first statement that requests a transaction to not return a transaction + // id. + mockSpanner.ignoreNextInlineBeginRequest(); + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT)); + return null; + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteBatchDmlRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + } + + @Test + public void testQueryAsyncWithInlineBeginDidNotReturnTransaction() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + // This will cause the first statement that requests a transaction to not return a transaction + // id. + mockSpanner.ignoreNextInlineBeginRequest(); + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + try (AsyncResultSet rs = + transaction.executeQueryAsync(SELECT1_UNION_ALL_SELECT2)) { + return SpannerApiFutures.get( + rs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + while (true) { + switch (resultSet.tryNext()) { + case OK: + break; + case DONE: + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + } + } + } catch (SpannerException e) { + return CallbackResponse.DONE; + } + } + })); + } + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + } + + @Test + public void testUpdateAsyncWithInlineBeginDidNotReturnTransaction() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // This will cause the first statement that requests a transaction to not return a transaction + // id. + mockSpanner.ignoreNextInlineBeginRequest(); + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return SpannerApiFutures.get(transaction.executeUpdateAsync(UPDATE_STATEMENT)); + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + } + + @Test + public void testBatchUpdateAsyncWithInlineBeginDidNotReturnTransaction() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // This will cause the first statement that requests a transaction to not return a transaction + // id. + mockSpanner.ignoreNextInlineBeginRequest(); + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public long[] run(TransactionContext transaction) throws Exception { + return SpannerApiFutures.get( + transaction.batchUpdateAsync(Arrays.asList(UPDATE_STATEMENT))); + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteBatchDmlRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + } + private int countRequests(Class requestType) { int count = 0; for (AbstractMessage msg : mockSpanner.getRequests()) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 149e0d2888..5dc442a62e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -552,6 +552,7 @@ private static void checkStreamException( private ConcurrentMap abortedTransactions = new ConcurrentHashMap<>(); private final AtomicBoolean abortNextTransaction = new AtomicBoolean(); private final AtomicBoolean abortNextStatement = new AtomicBoolean(); + private final AtomicBoolean ignoreNextInlineBeginRequest = new AtomicBoolean(); private ConcurrentMap transactionCounters = new ConcurrentHashMap<>(); private ConcurrentMap> partitionTokens = new ConcurrentHashMap<>(); private ConcurrentMap transactionLastUsed = new ConcurrentHashMap<>(); @@ -713,6 +714,10 @@ public void abortAllTransactions() { } } + public void ignoreNextInlineBeginRequest() { + ignoreNextInlineBeginRequest.set(true); + } + public void freeze() { freezeLock = new CountDownLatch(1); } @@ -973,7 +978,10 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp .build()) .setMetadata( ResultSetMetadata.newBuilder() - .setTransaction(Transaction.newBuilder().setId(transactionId).build()) + .setTransaction( + ignoreNextInlineBeginRequest.getAndSet(false) + ? Transaction.getDefaultInstance() + : Transaction.newBuilder().setId(transactionId).build()) .build()) .build()); } @@ -999,7 +1007,10 @@ private void returnResultSet( metadata = metadata .toBuilder() - .setTransaction(Transaction.newBuilder().setId(transactionId).build()) + .setTransaction( + ignoreNextInlineBeginRequest.getAndSet(false) + ? Transaction.getDefaultInstance() + : Transaction.newBuilder().setId(transactionId).build()) .build(); } else if (transactionSelector.hasBegin() || transactionSelector.hasSingleUse()) { Transaction transaction = getTemporaryTransactionOrNull(transactionSelector); @@ -1085,7 +1096,10 @@ public void executeBatchDml( ResultSetStats.newBuilder().setRowCountExact(res.getUpdateCount()).build()) .setMetadata( ResultSetMetadata.newBuilder() - .setTransaction(Transaction.newBuilder().setId(transactionId).build()) + .setTransaction( + ignoreNextInlineBeginRequest.getAndSet(false) + ? Transaction.getDefaultInstance() + : Transaction.newBuilder().setId(transactionId).build()) .build()) .build()); } @@ -1508,7 +1522,10 @@ private void returnPartialResultSet( metadata = metadata .toBuilder() - .setTransaction(Transaction.newBuilder().setId(transactionId).build()) + .setTransaction( + ignoreNextInlineBeginRequest.getAndSet(false) + ? Transaction.getDefaultInstance() + : Transaction.newBuilder().setId(transactionId).build()) .build(); } resultSet = resultSet.toBuilder().setMetadata(metadata).build(); @@ -1550,7 +1567,10 @@ private void returnPartialResultSet( .setMetadata( ResultSetMetadata.newBuilder() .setRowType(StructType.newBuilder().addFields(field).build()) - .setTransaction(Transaction.newBuilder().setId(transaction.getId()).build()) + .setTransaction( + ignoreNextInlineBeginRequest.getAndSet(false) + ? Transaction.getDefaultInstance() + : Transaction.newBuilder().setId(transaction.getId()).build()) .build()) .setStats(ResultSetStats.newBuilder().setRowCountExact(updateCount).build()) .build()); @@ -1560,7 +1580,10 @@ private void returnPartialResultSet( .setMetadata( ResultSetMetadata.newBuilder() .setRowType(StructType.newBuilder().addFields(field).build()) - .setTransaction(Transaction.newBuilder().setId(transaction.getId()).build()) + .setTransaction( + ignoreNextInlineBeginRequest.getAndSet(false) + ? Transaction.getDefaultInstance() + : Transaction.newBuilder().setId(transaction.getId()).build()) .build()) .setStats(ResultSetStats.newBuilder().setRowCountLowerBound(updateCount).build()) .build()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index efe3e3246a..e50639dc0e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -44,7 +44,8 @@ public class ReadFormatTestRunner extends ParentRunner { private static class NoOpListener implements AbstractResultSet.Listener { @Override - public void onTransactionMetadata(Transaction transaction) throws SpannerException {} + public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) + throws SpannerException {} @Override public void onError(SpannerException e, boolean withBeginTransaction) {}