diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 7381a489af..9f2e30bf48 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -691,7 +691,7 @@ public void onTransactionMetadata(Transaction transaction) {} public void onError(SpannerException e, boolean withBeginTransaction) {} @Override - public void onDone() {} + public void onDone(boolean withBeginTransaction) {} private ResultSet readInternal( String table, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 6520b7b8fd..a4d8490197 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -84,7 +84,7 @@ interface Listener { void onError(SpannerException e, boolean withBeginTransaction); /** Called when the read finishes normally. */ - void onDone(); + void onDone(boolean withBeginTransaction); } @VisibleForTesting @@ -118,6 +118,11 @@ public boolean next() throws SpannerException { ResultSetMetadata metadata = iterator.getMetadata(); if (metadata.hasTransaction()) { listener.onTransactionMetadata(metadata.getTransaction()); + } else if (iterator.isWithBeginTransaction()) { + // The query should have returned a transaction. + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, + "Query requested a transaction to be started, but no transaction was returned"); } currRow = new GrpcStruct(iterator.type(), new ArrayList<>()); } @@ -126,8 +131,10 @@ public boolean next() throws SpannerException { statistics = iterator.getStats(); } return hasNext; - } catch (SpannerException e) { - throw yieldError(e, iterator.isWithBeginTransaction() && currRow == null); + } catch (Throwable t) { + throw yieldError( + SpannerExceptionFactory.asSpannerException(t), + iterator.isWithBeginTransaction() && currRow == null); } } @@ -139,6 +146,7 @@ public ResultSetStats getStats() { @Override public void close() { + listener.onDone(iterator.isWithBeginTransaction()); iterator.close("ResultSet closed"); closed = true; } @@ -150,8 +158,8 @@ public Type getType() { } private SpannerException yieldError(SpannerException e, boolean beginTransaction) { - close(); listener.onError(e, beginTransaction); + close(); throw e; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 68f8c75054..543a3c5487 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -539,6 +539,19 @@ public void onError(SpannerException e, boolean withBeginTransaction) { } } + @Override + public void onDone(boolean withBeginTransaction) { + if (withBeginTransaction + && transactionIdFuture != null + && !this.transactionIdFuture.isDone()) { + // Context was done (closed) before a transaction id was returned. + this.transactionIdFuture.setException( + SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, + "ResultSet was closed before a transaction id was returned")); + } + } + @Override public void buffer(Mutation mutation) { synchronized (lock) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index f1a1a3e296..b47cdc35ff 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -62,7 +62,7 @@ public void onTransactionMetadata(Transaction transaction) throws SpannerExcepti public void onError(SpannerException e, boolean withBeginTransaction) {} @Override - public void onDone() {} + public void onDone(boolean withBeginTransaction) {} } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 0553cbf7d0..663dac566c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -16,7 +16,6 @@ package com.google.cloud.spanner; -import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; @@ -37,6 +36,7 @@ import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; @@ -204,6 +204,7 @@ public void setUp() throws IOException { public void tearDown() throws Exception { spanner.close(); mockSpanner.reset(); + mockSpanner.clearRequests(); } @Test @@ -1348,6 +1349,69 @@ public Void run(TransactionContext transaction) throws Exception { assertThat(countRequests(CommitRequest.class)).isEqualTo(0); } + @Test + public void testCloseResultSetWhileRequestInFlight() { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + final ExecutorService service = Executors.newSingleThreadExecutor(); + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + final ResultSet rs = transaction.executeQuery(SELECT1); + // Prevent the server from executing the query. + mockSpanner.freeze(); + service.submit( + new Runnable() { + @Override + public void run() { + // This call will be stuck on the server until the mock server is + // unfrozen. + rs.next(); + } + }); + + // Close the result set while the request is in flight. + mockSpanner.waitForRequestsToContain( + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + return input instanceof ExecuteSqlRequest + && ((ExecuteSqlRequest) input).getTransaction().hasBegin(); + } + }, + 100L); + rs.close(); + // The next statement should now fail before it is sent to the server because the + // first statement failed to return a transaction while the result set was still + // open. + mockSpanner.unfreeze(); + try { + transaction.executeUpdate(UPDATE_STATEMENT); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()) + .contains("ResultSet was closed before a transaction id was returned"); + } + return null; + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + // The commit request will also fail, which means that the entire transaction will fail. + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(e.getMessage()) + .contains("ResultSet was closed before a transaction id was returned"); + } + service.shutdown(); + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1); + assertThat(countRequests(CommitRequest.class)).isEqualTo(0); + } + private int countRequests(Class requestType) { int count = 0; for (AbstractMessage msg : mockSpanner.getRequests()) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index aa479f71d4..efe3e3246a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -50,7 +50,7 @@ public void onTransactionMetadata(Transaction transaction) throws SpannerExcepti public void onError(SpannerException e, boolean withBeginTransaction) {} @Override - public void onDone() {} + public void onDone(boolean withBeginTransaction) {} } public ReadFormatTestRunner(Class clazz) throws InitializationError {