Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: check for timeout in connection after last statement finished (#…
…1086)

The check whether the previous statement timed out in the Connection API was
done when a statement was submitted to the connection, and not when the statement
was executed. That could cause a race condition, as statements are executed using
a separate thread, while submitting a statement is done using the main thread.

This could cause a statement to return an error with code ABORTED instead of
FAILED_PRECONDITION. A statement on a read/write transaction would always return
an error when a/the previous statement timed out, only the error code could
be different depending on whether the race condition occurred or not. This is
now fixed so that the error code is always FAILED_PRECONDITION and the error
indicates that a read/write transaction is no longer usable after a statement
timeout.

Fixes #1077
  • Loading branch information
olavloite committed Apr 25, 2021
1 parent 4a3829f commit 51d753c
Showing 1 changed file with 74 additions and 62 deletions.
Expand Up @@ -201,6 +201,10 @@ private void checkValidState() {
+ "or "
+ UnitOfWorkState.ABORTED
+ " is allowed.");
checkTimedOut();
}

private void checkTimedOut() {
ConnectionPreconditions.checkState(
!timedOutOrCancelled,
"The last statement of this transaction timed out or was cancelled. "
Expand Down Expand Up @@ -313,34 +317,35 @@ public ApiFuture<ResultSet> executeQueryAsync(
res =
executeStatementAsync(
statement,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
statement,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
ResultSet delegate =
DirectExecuteResultSet.ofResultSet(
internalExecuteQuery(statement, analyzeMode, options));
return createAndAddRetryResultSet(
delegate, statement, analyzeMode, options);
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedQuery(e, statement, analyzeMode, options);
throw e;
}
}),
() -> {
checkTimedOut();
return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
statement,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
ResultSet delegate =
DirectExecuteResultSet.ofResultSet(
internalExecuteQuery(statement, analyzeMode, options));
return createAndAddRetryResultSet(
delegate, statement, analyzeMode, options);
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedQuery(e, statement, analyzeMode, options);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteStreamingSqlMethod()));
} else {
res = super.executeQueryAsync(statement, analyzeMode, options);
}

ApiFutures.addCallback(
res,
new ApiFutureCallback<ResultSet>() {
Expand Down Expand Up @@ -368,26 +373,28 @@ public ApiFuture<Long> executeUpdateAsync(final ParsedStatement update) {
res =
executeStatementAsync(
update,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
update,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long updateCount =
get(txContextFuture).executeUpdate(update.getStatement());
createAndAddRetriableUpdate(update, updateCount);
return updateCount;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedUpdate(e, update);
throw e;
}
}),
() -> {
checkTimedOut();
return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
update,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long updateCount =
get(txContextFuture).executeUpdate(update.getStatement());
createAndAddRetriableUpdate(update, updateCount);
return updateCount;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedUpdate(e, update);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteSqlMethod()));
Expand All @@ -396,6 +403,7 @@ public ApiFuture<Long> executeUpdateAsync(final ParsedStatement update) {
executeStatementAsync(
update,
() -> {
checkTimedOut();
checkAborted();
return get(txContextFuture).executeUpdate(update.getStatement());
},
Expand Down Expand Up @@ -449,25 +457,27 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<ParsedStatement> updat
res =
executeStatementAsync(
EXECUTE_BATCH_UPDATE_STATEMENT,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
EXECUTE_BATCH_UPDATE_STATEMENT,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements);
createAndAddRetriableBatchUpdate(updateStatements, updateCounts);
return updateCounts;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedBatchUpdate(e, updateStatements);
throw e;
}
}),
() -> {
checkTimedOut();
return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
EXECUTE_BATCH_UPDATE_STATEMENT,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements);
createAndAddRetriableBatchUpdate(updateStatements, updateCounts);
return updateCounts;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedBatchUpdate(e, updateStatements);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteBatchDmlMethod()));
Expand All @@ -476,12 +486,12 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<ParsedStatement> updat
executeStatementAsync(
EXECUTE_BATCH_UPDATE_STATEMENT,
() -> {
checkTimedOut();
checkAborted();
return get(txContextFuture).batchUpdate(updateStatements);
},
SpannerGrpc.getExecuteBatchDmlMethod());
}

ApiFutures.addCallback(
res,
new ApiFutureCallback<long[]>() {
Expand Down Expand Up @@ -546,6 +556,7 @@ public ApiFuture<Void> commitAsync() {
executeStatementAsync(
COMMIT_STATEMENT,
() -> {
checkTimedOut();
try {
return runWithRetry(
() -> {
Expand Down Expand Up @@ -574,6 +585,7 @@ public ApiFuture<Void> commitAsync() {
executeStatementAsync(
COMMIT_STATEMENT,
() -> {
checkTimedOut();
try {
return commitCallable.call();
} catch (Throwable t) {
Expand Down

0 comments on commit 51d753c

Please sign in to comment.