From 51d753c507e7248132eb5d6ea2c4b735542eda49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Sun, 25 Apr 2021 04:48:14 +0200 Subject: [PATCH] fix: check for timeout in connection after last statement finished (#1086) The check whether the previous statement timed out in the Connection API was done when a statement was submitted to the connection, and not when the statement was executed. That could cause a race condition, as statements are executed using a separate thread, while submitting a statement is done using the main thread. This could cause a statement to return an error with code ABORTED instead of FAILED_PRECONDITION. A statement on a read/write transaction would always return an error when a/the previous statement timed out, only the error code could be different depending on whether the race condition occurred or not. This is now fixed so that the error code is always FAILED_PRECONDITION and the error indicates that a read/write transaction is no longer usable after a statement timeout. Fixes #1077 --- .../connection/ReadWriteTransaction.java | 136 ++++++++++-------- 1 file changed, 74 insertions(+), 62 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java index 39ec31ce8a..feb8fa2994 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java @@ -201,6 +201,10 @@ private void checkValidState() { + "or " + UnitOfWorkState.ABORTED + " is allowed."); + checkTimedOut(); + } + + private void checkTimedOut() { ConnectionPreconditions.checkState( !timedOutOrCancelled, "The last statement of this transaction timed out or was cancelled. " @@ -313,34 +317,35 @@ public ApiFuture executeQueryAsync( res = executeStatementAsync( statement, - () -> - runWithRetry( - () -> { - try { - getStatementExecutor() - .invokeInterceptors( - statement, - StatementExecutionStep.EXECUTE_STATEMENT, - ReadWriteTransaction.this); - ResultSet delegate = - DirectExecuteResultSet.ofResultSet( - internalExecuteQuery(statement, analyzeMode, options)); - return createAndAddRetryResultSet( - delegate, statement, analyzeMode, options); - } catch (AbortedException e) { - throw e; - } catch (SpannerException e) { - createAndAddFailedQuery(e, statement, analyzeMode, options); - throw e; - } - }), + () -> { + checkTimedOut(); + return runWithRetry( + () -> { + try { + getStatementExecutor() + .invokeInterceptors( + statement, + StatementExecutionStep.EXECUTE_STATEMENT, + ReadWriteTransaction.this); + ResultSet delegate = + DirectExecuteResultSet.ofResultSet( + internalExecuteQuery(statement, analyzeMode, options)); + return createAndAddRetryResultSet( + delegate, statement, analyzeMode, options); + } catch (AbortedException e) { + throw e; + } catch (SpannerException e) { + createAndAddFailedQuery(e, statement, analyzeMode, options); + throw e; + } + }); + }, // ignore interceptors here as they are invoked in the Callable. InterceptorsUsage.IGNORE_INTERCEPTORS, ImmutableList.>of(SpannerGrpc.getExecuteStreamingSqlMethod())); } else { res = super.executeQueryAsync(statement, analyzeMode, options); } - ApiFutures.addCallback( res, new ApiFutureCallback() { @@ -368,26 +373,28 @@ public ApiFuture executeUpdateAsync(final ParsedStatement update) { res = executeStatementAsync( update, - () -> - runWithRetry( - () -> { - try { - getStatementExecutor() - .invokeInterceptors( - update, - StatementExecutionStep.EXECUTE_STATEMENT, - ReadWriteTransaction.this); - long updateCount = - get(txContextFuture).executeUpdate(update.getStatement()); - createAndAddRetriableUpdate(update, updateCount); - return updateCount; - } catch (AbortedException e) { - throw e; - } catch (SpannerException e) { - createAndAddFailedUpdate(e, update); - throw e; - } - }), + () -> { + checkTimedOut(); + return runWithRetry( + () -> { + try { + getStatementExecutor() + .invokeInterceptors( + update, + StatementExecutionStep.EXECUTE_STATEMENT, + ReadWriteTransaction.this); + long updateCount = + get(txContextFuture).executeUpdate(update.getStatement()); + createAndAddRetriableUpdate(update, updateCount); + return updateCount; + } catch (AbortedException e) { + throw e; + } catch (SpannerException e) { + createAndAddFailedUpdate(e, update); + throw e; + } + }); + }, // ignore interceptors here as they are invoked in the Callable. InterceptorsUsage.IGNORE_INTERCEPTORS, ImmutableList.>of(SpannerGrpc.getExecuteSqlMethod())); @@ -396,6 +403,7 @@ public ApiFuture executeUpdateAsync(final ParsedStatement update) { executeStatementAsync( update, () -> { + checkTimedOut(); checkAborted(); return get(txContextFuture).executeUpdate(update.getStatement()); }, @@ -449,25 +457,27 @@ public ApiFuture executeBatchUpdateAsync(Iterable updat res = executeStatementAsync( EXECUTE_BATCH_UPDATE_STATEMENT, - () -> - runWithRetry( - () -> { - try { - getStatementExecutor() - .invokeInterceptors( - EXECUTE_BATCH_UPDATE_STATEMENT, - StatementExecutionStep.EXECUTE_STATEMENT, - ReadWriteTransaction.this); - long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements); - createAndAddRetriableBatchUpdate(updateStatements, updateCounts); - return updateCounts; - } catch (AbortedException e) { - throw e; - } catch (SpannerException e) { - createAndAddFailedBatchUpdate(e, updateStatements); - throw e; - } - }), + () -> { + checkTimedOut(); + return runWithRetry( + () -> { + try { + getStatementExecutor() + .invokeInterceptors( + EXECUTE_BATCH_UPDATE_STATEMENT, + StatementExecutionStep.EXECUTE_STATEMENT, + ReadWriteTransaction.this); + long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements); + createAndAddRetriableBatchUpdate(updateStatements, updateCounts); + return updateCounts; + } catch (AbortedException e) { + throw e; + } catch (SpannerException e) { + createAndAddFailedBatchUpdate(e, updateStatements); + throw e; + } + }); + }, // ignore interceptors here as they are invoked in the Callable. InterceptorsUsage.IGNORE_INTERCEPTORS, ImmutableList.>of(SpannerGrpc.getExecuteBatchDmlMethod())); @@ -476,12 +486,12 @@ public ApiFuture executeBatchUpdateAsync(Iterable updat executeStatementAsync( EXECUTE_BATCH_UPDATE_STATEMENT, () -> { + checkTimedOut(); checkAborted(); return get(txContextFuture).batchUpdate(updateStatements); }, SpannerGrpc.getExecuteBatchDmlMethod()); } - ApiFutures.addCallback( res, new ApiFutureCallback() { @@ -546,6 +556,7 @@ public ApiFuture commitAsync() { executeStatementAsync( COMMIT_STATEMENT, () -> { + checkTimedOut(); try { return runWithRetry( () -> { @@ -574,6 +585,7 @@ public ApiFuture commitAsync() { executeStatementAsync( COMMIT_STATEMENT, () -> { + checkTimedOut(); try { return commitCallable.call(); } catch (Throwable t) {