Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: AsyncTransactionManager should rollback on close #505

Merged
merged 3 commits into from Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -371,4 +371,10 @@
<className>com/google/cloud/spanner/ResultSets</className>
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.api.core.ApiFuture closeAsync()</method>
</difference>
</differences>
Expand Up @@ -18,9 +18,6 @@

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction;
import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -200,4 +197,11 @@ public interface AsyncTransactionFunction<I, O> {
*/
@Override
void close();

/**
* Closes the transaction manager. If there is an active transaction, it will be rolled back. The
* underlying session will be released back to the session pool. The returned {@link ApiFuture} is
* done when the transaction (if any) has been rolled back.
*/
ApiFuture<Void> closeAsync();
}
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.opencensus.trace.Span;
Expand Down Expand Up @@ -54,7 +55,17 @@ public void setSpan(Span span) {

@Override
public void close() {
closeAsync();
}

@Override
public ApiFuture<Void> closeAsync() {
ApiFuture<Void> res = null;
if (txnState == TransactionState.STARTED) {
res = rollbackAsync();
}
txn.close();
return MoreObjects.firstNonNull(res, ApiFutures.<Void>immediateFuture(null));
}

@Override
Expand Down
Expand Up @@ -22,7 +22,6 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
Expand Down Expand Up @@ -59,14 +58,41 @@ public void run() {

@Override
public void close() {
delegate.addListener(
new Runnable() {
SpannerApiFutures.get(closeAsync());
}

@Override
public ApiFuture<Void> closeAsync() {
final SettableApiFuture<Void> res = SettableApiFuture.create();
ApiFutures.addCallback(
delegate,
new ApiFutureCallback<AsyncTransactionManagerImpl>() {
@Override
public void run() {
public void onFailure(Throwable t) {
session.close();
}

@Override
public void onSuccess(AsyncTransactionManagerImpl result) {
ApiFutures.addCallback(
result.closeAsync(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
res.setException(t);
}

@Override
public void onSuccess(Void result) {
session.close();
res.set(result);
}
},
MoreExecutors.directExecutor());
}
},
MoreExecutors.directExecutor());
return res;
}

@Override
Expand Down
Expand Up @@ -36,7 +36,9 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
Expand All @@ -47,6 +49,8 @@
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -181,6 +185,30 @@ public void onSuccess(long[] input) {
}
}

@Test
public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exception {
AsyncTransactionManager manager = client().transactionManagerAsync();
TransactionContext txn = manager.beginAsync().get();
txn.executeUpdateAsync(UPDATE_STATEMENT).get();
final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector();

SpannerApiFutures.get(manager.closeAsync());
// The mock server should already have the Rollback request, as we are waiting for the returned
// ApiFuture to be done.
mockSpanner.waitForRequestsToContain(
new Predicate<AbstractMessage>() {
@Override
public boolean apply(AbstractMessage input) {
if (input instanceof RollbackRequest) {
RollbackRequest request = (RollbackRequest) input;
return request.getTransactionId().equals(selector.getId());
}
return false;
}
},
0L);
}

@Test
public void asyncTransactionManagerUpdate() throws Exception {
final SettableApiFuture<Long> updateCount = SettableApiFuture.create();
Expand Down
Expand Up @@ -23,8 +23,10 @@
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -1927,6 +1929,23 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
}
}

public void waitForRequestsToContain(
Predicate<? super AbstractMessage> predicate, long timeoutMillis)
throws InterruptedException, TimeoutException {
Stopwatch watch = Stopwatch.createStarted();
while (true) {
Iterable<AbstractMessage> msg = Iterables.filter(getRequests(), predicate);
if (msg.iterator().hasNext()) {
break;
}
Thread.sleep(10L);
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
throw new TimeoutException(
"Timeout while waiting for requests to contain the wanted request");
}
}
}

@Override
public void addResponse(AbstractMessage response) {
throw new UnsupportedOperationException();
Expand Down