Skip to content

Commit

Permalink
fix: session retry could cause infinite wait (#616)
Browse files Browse the repository at this point in the history
A "Session not found" when using an AsyncTransactionManager could cause an
infinite wait for an ApiFuture that would never be done.

Fixes #605
  • Loading branch information
olavloite committed Nov 13, 2020
1 parent b85be2a commit 8a66d84
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
Expand Up @@ -98,6 +98,7 @@ private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
onError(t);
res.setException(SpannerExceptionFactory.newSpannerException(t));
}

Expand Down Expand Up @@ -130,6 +131,7 @@ public ApiFuture<Timestamp> commitAsync() {
}
ApiFuture<Timestamp> res = txn.commitAsync();
txnState = TransactionState.COMMITTED;

ApiFutures.addCallback(
res,
new ApiFutureCallback<Timestamp>() {
Expand Down Expand Up @@ -174,10 +176,6 @@ public ApiFuture<Void> apply(Empty input) throws Exception {

@Override
public TransactionContextFuture resetForRetryAsync() {
if (txn == null || (!txn.isAborted() && txnState != TransactionState.ABORTED)) {
throw new IllegalStateException(
"resetForRetry can only be called if the previous attempt aborted");
}
return new TransactionContextFutureImpl(this, internalBeginAsync(false));
}

Expand Down
Expand Up @@ -38,6 +38,9 @@ class SessionPoolAsyncTransactionManager
@GuardedBy("lock")
private TransactionState txnState;

@GuardedBy("lock")
private AbortedException abortedException;

private final SessionPool pool;
private volatile PooledSessionFuture session;
private volatile SettableApiFuture<AsyncTransactionManagerImpl> delegate;
Expand Down Expand Up @@ -159,6 +162,7 @@ public void onError(Throwable t) {
if (t instanceof AbortedException) {
synchronized (lock) {
txnState = TransactionState.ABORTED;
abortedException = (AbortedException) t;
}
}
}
Expand All @@ -167,9 +171,12 @@ public void onError(Throwable t) {
public ApiFuture<Timestamp> commitAsync() {
synchronized (lock) {
Preconditions.checkState(
txnState == TransactionState.STARTED,
txnState == TransactionState.STARTED || txnState == TransactionState.ABORTED,
"commit can only be invoked if the transaction is in progress. Current state: "
+ txnState);
if (txnState == TransactionState.ABORTED) {
return ApiFutures.immediateFailedFuture(abortedException);
}
txnState = TransactionState.COMMITTED;
}
return ApiFutures.transformAsync(
Expand All @@ -186,6 +193,7 @@ public void onFailure(Throwable t) {
synchronized (lock) {
if (t instanceof AbortedException) {
txnState = TransactionState.ABORTED;
abortedException = (AbortedException) t;
} else {
txnState = TransactionState.COMMIT_FAILED;
}
Expand Down
Expand Up @@ -109,6 +109,7 @@ class AsyncTransactionStatementImpl<I, O> extends ForwardingApiFuture<O>
@Override
public void onFailure(Throwable t) {
mgr.onError(t);
statementResult.setException(t);
txnResult.setException(t);
}

Expand Down

0 comments on commit 8a66d84

Please sign in to comment.