Skip to content

Commit

Permalink
fix: AsyncTransactionManager should rollback on close
Browse files Browse the repository at this point in the history
AsyncTransctionManager should rollback the transaction if close is called
while the transaction is still in state STARTED. Failing to do so, will
keep the transaction open on the backend for longer than necessary, holding
on to locks until it is garbage collected after 10 seconds.

Fixes #504
  • Loading branch information
olavloite committed Oct 9, 2020
1 parent 3ab7348 commit d79c460
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 3 deletions.
Expand Up @@ -54,6 +54,9 @@ public void setSpan(Span span) {

@Override
public void close() {
if (txnState == TransactionState.STARTED) {
rollbackAsync();
}
txn.close();
}

Expand Down
Expand Up @@ -59,10 +59,17 @@ public void run() {

@Override
public void close() {
delegate.addListener(
new Runnable() {
ApiFutures.addCallback(
delegate,
new ApiFutureCallback<AsyncTransactionManagerImpl>() {
@Override
public void run() {
public void onFailure(Throwable t) {
session.close();
}

@Override
public void onSuccess(AsyncTransactionManagerImpl result) {
result.close();
session.close();
}
},
Expand Down
Expand Up @@ -36,7 +36,9 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
Expand All @@ -47,6 +49,8 @@
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -181,6 +185,28 @@ public void onSuccess(long[] input) {
}
}

@Test
public void asyncTransactionManager_shouldRollbackOnClose() throws Exception {
AsyncTransactionManager manager = client().transactionManagerAsync();
TransactionContext txn = manager.beginAsync().get();
txn.executeUpdateAsync(UPDATE_STATEMENT).get();
final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector();

manager.close();
mockSpanner.waitForRequestsToContain(
new Predicate<AbstractMessage>() {
@Override
public boolean apply(AbstractMessage input) {
if (input instanceof RollbackRequest) {
RollbackRequest request = (RollbackRequest) input;
return request.getTransactionId().equals(selector.getId());
}
return false;
}
},
5000L);
}

@Test
public void asyncTransactionManagerUpdate() throws Exception {
final SettableApiFuture<Long> updateCount = SettableApiFuture.create();
Expand Down
Expand Up @@ -23,8 +23,10 @@
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -1927,6 +1929,23 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
}
}

public void waitForRequestsToContain(
Predicate<? super AbstractMessage> predicate, long timeoutMillis)
throws InterruptedException, TimeoutException {
Stopwatch watch = Stopwatch.createStarted();
while (true) {
Iterable<AbstractMessage> msg = Iterables.filter(getRequests(), predicate);
if (msg.iterator().hasNext()) {
break;
}
Thread.sleep(10L);
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
throw new TimeoutException(
"Timeout while waiting for requests to contain the wanted request");
}
}
}

@Override
public void addResponse(AbstractMessage response) {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit d79c460

Please sign in to comment.