Skip to content

Commit

Permalink
fix: mark transaction as invalid if no tx is returned before RS is cl…
Browse files Browse the repository at this point in the history
…osed (#791)

If a query requests the begin of a new transaction, the transaction id is returned by the first call to ResultSet#next(). If the ResultSet is closed by another thread before the first result has been returned, or before that result has been consumed internally to set the transaction id, no transaction id will be set. This will cause any subsequent statement in the same transaction to timeout while waiting for a transaction to be returned.
  • Loading branch information
olavloite committed Jan 12, 2021
1 parent 3f28c46 commit e02e5a7
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 8 deletions.
Expand Up @@ -691,7 +691,7 @@ public void onTransactionMetadata(Transaction transaction) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}

private ResultSet readInternal(
String table,
Expand Down
Expand Up @@ -84,7 +84,7 @@ interface Listener {
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
void onDone(boolean withBeginTransaction);
}

@VisibleForTesting
Expand Down Expand Up @@ -118,6 +118,11 @@ public boolean next() throws SpannerException {
ResultSetMetadata metadata = iterator.getMetadata();
if (metadata.hasTransaction()) {
listener.onTransactionMetadata(metadata.getTransaction());
} else if (iterator.isWithBeginTransaction()) {
// The query should have returned a transaction.
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Query requested a transaction to be started, but no transaction was returned");
}
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
}
Expand All @@ -126,8 +131,10 @@ public boolean next() throws SpannerException {
statistics = iterator.getStats();
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e, iterator.isWithBeginTransaction() && currRow == null);
} catch (Throwable t) {
throw yieldError(
SpannerExceptionFactory.asSpannerException(t),
iterator.isWithBeginTransaction() && currRow == null);
}
}

Expand All @@ -139,6 +146,7 @@ public ResultSetStats getStats() {

@Override
public void close() {
listener.onDone(iterator.isWithBeginTransaction());
iterator.close("ResultSet closed");
closed = true;
}
Expand All @@ -150,8 +158,8 @@ public Type getType() {
}

private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e, beginTransaction);
close();
throw e;
}
}
Expand Down
Expand Up @@ -539,6 +539,19 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
}
}

@Override
public void onDone(boolean withBeginTransaction) {
if (withBeginTransaction
&& transactionIdFuture != null
&& !this.transactionIdFuture.isDone()) {
// Context was done (closed) before a transaction id was returned.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"ResultSet was closed before a transaction id was returned"));
}
}

@Override
public void buffer(Mutation mutation) {
synchronized (lock) {
Expand Down
Expand Up @@ -62,7 +62,7 @@ public void onTransactionMetadata(Transaction transaction) throws SpannerExcepti
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}
}

@Before
Expand Down
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

Expand All @@ -37,6 +36,7 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -204,6 +204,7 @@ public void setUp() throws IOException {
public void tearDown() throws Exception {
spanner.close();
mockSpanner.reset();
mockSpanner.clearRequests();
}

@Test
Expand Down Expand Up @@ -1348,6 +1349,69 @@ public Void run(TransactionContext transaction) throws Exception {
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

@Test
public void testCloseResultSetWhileRequestInFlight() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
final ExecutorService service = Executors.newSingleThreadExecutor();
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
final ResultSet rs = transaction.executeQuery(SELECT1);
// Prevent the server from executing the query.
mockSpanner.freeze();
service.submit(
new Runnable() {
@Override
public void run() {
// This call will be stuck on the server until the mock server is
// unfrozen.
rs.next();
}
});

// Close the result set while the request is in flight.
mockSpanner.waitForRequestsToContain(
new Predicate<AbstractMessage>() {
@Override
public boolean apply(AbstractMessage input) {
return input instanceof ExecuteSqlRequest
&& ((ExecuteSqlRequest) input).getTransaction().hasBegin();
}
},
100L);
rs.close();
// The next statement should now fail before it is sent to the server because the
// first statement failed to return a transaction while the result set was still
// open.
mockSpanner.unfreeze();
try {
transaction.executeUpdate(UPDATE_STATEMENT);
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION);
assertThat(e.getMessage())
.contains("ResultSet was closed before a transaction id was returned");
}
return null;
}
});
fail("missing expected exception");
} catch (SpannerException e) {
// The commit request will also fail, which means that the entire transaction will fail.
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION);
assertThat(e.getMessage())
.contains("ResultSet was closed before a transaction id was returned");
}
service.shutdown();
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

private int countRequests(Class<? extends AbstractMessage> requestType) {
int count = 0;
for (AbstractMessage msg : mockSpanner.getRequests()) {
Expand Down
Expand Up @@ -50,7 +50,7 @@ public void onTransactionMetadata(Transaction transaction) throws SpannerExcepti
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}
}

public ReadFormatTestRunner(Class<?> clazz) throws InitializationError {
Expand Down

0 comments on commit e02e5a7

Please sign in to comment.