Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: mark transaction as invalid if no tx is returned before close #791

Merged
merged 1 commit into from Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -691,7 +691,7 @@ public void onTransactionMetadata(Transaction transaction) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}

private ResultSet readInternal(
String table,
Expand Down
Expand Up @@ -84,7 +84,7 @@ interface Listener {
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
void onDone(boolean withBeginTransaction);
}

@VisibleForTesting
Expand Down Expand Up @@ -118,6 +118,11 @@ public boolean next() throws SpannerException {
ResultSetMetadata metadata = iterator.getMetadata();
if (metadata.hasTransaction()) {
listener.onTransactionMetadata(metadata.getTransaction());
} else if (iterator.isWithBeginTransaction()) {
// The query should have returned a transaction.
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Query requested a transaction to be started, but no transaction was returned");
}
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
}
Expand All @@ -126,8 +131,10 @@ public boolean next() throws SpannerException {
statistics = iterator.getStats();
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e, iterator.isWithBeginTransaction() && currRow == null);
} catch (Throwable t) {
throw yieldError(
SpannerExceptionFactory.asSpannerException(t),
iterator.isWithBeginTransaction() && currRow == null);
}
}

Expand All @@ -139,6 +146,7 @@ public ResultSetStats getStats() {

@Override
public void close() {
listener.onDone(iterator.isWithBeginTransaction());
iterator.close("ResultSet closed");
closed = true;
}
Expand All @@ -150,8 +158,8 @@ public Type getType() {
}

private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e, beginTransaction);
close();
throw e;
}
}
Expand Down
Expand Up @@ -539,6 +539,19 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
}
}

@Override
public void onDone(boolean withBeginTransaction) {
if (withBeginTransaction
&& transactionIdFuture != null
&& !this.transactionIdFuture.isDone()) {
// Context was done (closed) before a transaction id was returned.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"ResultSet was closed before a transaction id was returned"));
}
}

@Override
public void buffer(Mutation mutation) {
synchronized (lock) {
Expand Down
Expand Up @@ -62,7 +62,7 @@ public void onTransactionMetadata(Transaction transaction) throws SpannerExcepti
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}
}

@Before
Expand Down
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

Expand All @@ -37,6 +36,7 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -204,6 +204,7 @@ public void setUp() throws IOException {
public void tearDown() throws Exception {
spanner.close();
mockSpanner.reset();
mockSpanner.clearRequests();
}

@Test
Expand Down Expand Up @@ -1348,6 +1349,69 @@ public Void run(TransactionContext transaction) throws Exception {
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

@Test
public void testCloseResultSetWhileRequestInFlight() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
final ExecutorService service = Executors.newSingleThreadExecutor();
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
final ResultSet rs = transaction.executeQuery(SELECT1);
// Prevent the server from executing the query.
mockSpanner.freeze();
service.submit(
new Runnable() {
@Override
public void run() {
// This call will be stuck on the server until the mock server is
// unfrozen.
rs.next();
}
});

// Close the result set while the request is in flight.
mockSpanner.waitForRequestsToContain(
new Predicate<AbstractMessage>() {
@Override
public boolean apply(AbstractMessage input) {
return input instanceof ExecuteSqlRequest
&& ((ExecuteSqlRequest) input).getTransaction().hasBegin();
}
},
100L);
rs.close();
// The next statement should now fail before it is sent to the server because the
// first statement failed to return a transaction while the result set was still
// open.
mockSpanner.unfreeze();
try {
transaction.executeUpdate(UPDATE_STATEMENT);
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION);
assertThat(e.getMessage())
.contains("ResultSet was closed before a transaction id was returned");
}
return null;
}
});
fail("missing expected exception");
} catch (SpannerException e) {
// The commit request will also fail, which means that the entire transaction will fail.
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION);
assertThat(e.getMessage())
.contains("ResultSet was closed before a transaction id was returned");
}
service.shutdown();
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

private int countRequests(Class<? extends AbstractMessage> requestType) {
int count = 0;
for (AbstractMessage msg : mockSpanner.getRequests()) {
Expand Down
Expand Up @@ -50,7 +50,7 @@ public void onTransactionMetadata(Transaction transaction) throws SpannerExcepti
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}
}

public ReadFormatTestRunner(Class<?> clazz) throws InitializationError {
Expand Down