Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: AsyncTransactionManager should rollback on close #505

Merged
merged 3 commits into from Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -54,6 +54,9 @@ public void setSpan(Span span) {

@Override
public void close() {
if (txnState == TransactionState.STARTED) {
rollbackAsync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can there be a version of close() that propagates the ApiFutrue from rollbackAsync() back to the caller? Otherwise the application could shut down (or, in my case, the test terminate) before the rollback happens.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a closeAsync() method to the AsyncTransactionManager interface that returns an ApiFuture that is done when everything has been rolled back and released. PTAL.

}
txn.close();
}

Expand Down
Expand Up @@ -59,10 +59,17 @@ public void run() {

@Override
public void close() {
delegate.addListener(
new Runnable() {
ApiFutures.addCallback(
delegate,
new ApiFutureCallback<AsyncTransactionManagerImpl>() {
@Override
public void run() {
public void onFailure(Throwable t) {
session.close();
}

@Override
public void onSuccess(AsyncTransactionManagerImpl result) {
result.close();
session.close();
}
},
Expand Down
Expand Up @@ -36,7 +36,9 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
Expand All @@ -47,6 +49,8 @@
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -181,6 +185,28 @@ public void onSuccess(long[] input) {
}
}

@Test
public void asyncTransactionManager_shouldRollbackOnClose() throws Exception {
AsyncTransactionManager manager = client().transactionManagerAsync();
TransactionContext txn = manager.beginAsync().get();
txn.executeUpdateAsync(UPDATE_STATEMENT).get();
final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector();

manager.close();
mockSpanner.waitForRequestsToContain(
new Predicate<AbstractMessage>() {
@Override
public boolean apply(AbstractMessage input) {
if (input instanceof RollbackRequest) {
RollbackRequest request = (RollbackRequest) input;
return request.getTransactionId().equals(selector.getId());
}
return false;
}
},
5000L);
}

@Test
public void asyncTransactionManagerUpdate() throws Exception {
final SettableApiFuture<Long> updateCount = SettableApiFuture.create();
Expand Down
Expand Up @@ -23,8 +23,10 @@
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -1927,6 +1929,23 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
}
}

public void waitForRequestsToContain(
Predicate<? super AbstractMessage> predicate, long timeoutMillis)
throws InterruptedException, TimeoutException {
Stopwatch watch = Stopwatch.createStarted();
while (true) {
Iterable<AbstractMessage> msg = Iterables.filter(getRequests(), predicate);
if (msg.iterator().hasNext()) {
break;
}
Thread.sleep(10L);
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
throw new TimeoutException(
"Timeout while waiting for requests to contain the wanted request");
}
}
}

@Override
public void addResponse(AbstractMessage response) {
throw new UnsupportedOperationException();
Expand Down