Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check for timeout in connection after last statement finished #1086

Merged
merged 1 commit into from Apr 25, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -201,6 +201,10 @@ private void checkValidState() {
+ "or "
+ UnitOfWorkState.ABORTED
+ " is allowed.");
checkTimedOut();
}

private void checkTimedOut() {
ConnectionPreconditions.checkState(
!timedOutOrCancelled,
"The last statement of this transaction timed out or was cancelled. "
Expand Down Expand Up @@ -313,34 +317,35 @@ public ApiFuture<ResultSet> executeQueryAsync(
res =
executeStatementAsync(
statement,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
statement,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
ResultSet delegate =
DirectExecuteResultSet.ofResultSet(
internalExecuteQuery(statement, analyzeMode, options));
return createAndAddRetryResultSet(
delegate, statement, analyzeMode, options);
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedQuery(e, statement, analyzeMode, options);
throw e;
}
}),
() -> {
checkTimedOut();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes seem bigger than they are. The only real difference is that the checkTimeout() call is added as the first statement of the lambda.

return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
statement,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
ResultSet delegate =
DirectExecuteResultSet.ofResultSet(
internalExecuteQuery(statement, analyzeMode, options));
return createAndAddRetryResultSet(
delegate, statement, analyzeMode, options);
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedQuery(e, statement, analyzeMode, options);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteStreamingSqlMethod()));
} else {
res = super.executeQueryAsync(statement, analyzeMode, options);
}

ApiFutures.addCallback(
res,
new ApiFutureCallback<ResultSet>() {
Expand Down Expand Up @@ -368,26 +373,28 @@ public ApiFuture<Long> executeUpdateAsync(final ParsedStatement update) {
res =
executeStatementAsync(
update,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
update,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long updateCount =
get(txContextFuture).executeUpdate(update.getStatement());
createAndAddRetriableUpdate(update, updateCount);
return updateCount;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedUpdate(e, update);
throw e;
}
}),
() -> {
checkTimedOut();
return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
update,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long updateCount =
get(txContextFuture).executeUpdate(update.getStatement());
createAndAddRetriableUpdate(update, updateCount);
return updateCount;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedUpdate(e, update);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteSqlMethod()));
Expand All @@ -396,6 +403,7 @@ public ApiFuture<Long> executeUpdateAsync(final ParsedStatement update) {
executeStatementAsync(
update,
() -> {
checkTimedOut();
checkAborted();
return get(txContextFuture).executeUpdate(update.getStatement());
},
Expand Down Expand Up @@ -449,25 +457,27 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<ParsedStatement> updat
res =
executeStatementAsync(
EXECUTE_BATCH_UPDATE_STATEMENT,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
EXECUTE_BATCH_UPDATE_STATEMENT,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements);
createAndAddRetriableBatchUpdate(updateStatements, updateCounts);
return updateCounts;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedBatchUpdate(e, updateStatements);
throw e;
}
}),
() -> {
checkTimedOut();
return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
EXECUTE_BATCH_UPDATE_STATEMENT,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements);
createAndAddRetriableBatchUpdate(updateStatements, updateCounts);
return updateCounts;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedBatchUpdate(e, updateStatements);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteBatchDmlMethod()));
Expand All @@ -476,12 +486,12 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<ParsedStatement> updat
executeStatementAsync(
EXECUTE_BATCH_UPDATE_STATEMENT,
() -> {
checkTimedOut();
checkAborted();
return get(txContextFuture).batchUpdate(updateStatements);
},
SpannerGrpc.getExecuteBatchDmlMethod());
}

ApiFutures.addCallback(
res,
new ApiFutureCallback<long[]>() {
Expand Down Expand Up @@ -546,6 +556,7 @@ public ApiFuture<Void> commitAsync() {
executeStatementAsync(
COMMIT_STATEMENT,
() -> {
checkTimedOut();
try {
return runWithRetry(
() -> {
Expand Down Expand Up @@ -574,6 +585,7 @@ public ApiFuture<Void> commitAsync() {
executeStatementAsync(
COMMIT_STATEMENT,
() -> {
checkTimedOut();
try {
return commitCallable.call();
} catch (Throwable t) {
Expand Down