diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java index 39ec31ce8a..feb8fa2994 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java @@ -201,6 +201,10 @@ private void checkValidState() { + "or " + UnitOfWorkState.ABORTED + " is allowed."); + checkTimedOut(); + } + + private void checkTimedOut() { ConnectionPreconditions.checkState( !timedOutOrCancelled, "The last statement of this transaction timed out or was cancelled. " @@ -313,34 +317,35 @@ public ApiFuture executeQueryAsync( res = executeStatementAsync( statement, - () -> - runWithRetry( - () -> { - try { - getStatementExecutor() - .invokeInterceptors( - statement, - StatementExecutionStep.EXECUTE_STATEMENT, - ReadWriteTransaction.this); - ResultSet delegate = - DirectExecuteResultSet.ofResultSet( - internalExecuteQuery(statement, analyzeMode, options)); - return createAndAddRetryResultSet( - delegate, statement, analyzeMode, options); - } catch (AbortedException e) { - throw e; - } catch (SpannerException e) { - createAndAddFailedQuery(e, statement, analyzeMode, options); - throw e; - } - }), + () -> { + checkTimedOut(); + return runWithRetry( + () -> { + try { + getStatementExecutor() + .invokeInterceptors( + statement, + StatementExecutionStep.EXECUTE_STATEMENT, + ReadWriteTransaction.this); + ResultSet delegate = + DirectExecuteResultSet.ofResultSet( + internalExecuteQuery(statement, analyzeMode, options)); + return createAndAddRetryResultSet( + delegate, statement, analyzeMode, options); + } catch (AbortedException e) { + throw e; + } catch (SpannerException e) { + createAndAddFailedQuery(e, statement, analyzeMode, options); + throw e; + } + }); + }, // ignore interceptors here as they are invoked in the Callable. InterceptorsUsage.IGNORE_INTERCEPTORS, ImmutableList.>of(SpannerGrpc.getExecuteStreamingSqlMethod())); } else { res = super.executeQueryAsync(statement, analyzeMode, options); } - ApiFutures.addCallback( res, new ApiFutureCallback() { @@ -368,26 +373,28 @@ public ApiFuture executeUpdateAsync(final ParsedStatement update) { res = executeStatementAsync( update, - () -> - runWithRetry( - () -> { - try { - getStatementExecutor() - .invokeInterceptors( - update, - StatementExecutionStep.EXECUTE_STATEMENT, - ReadWriteTransaction.this); - long updateCount = - get(txContextFuture).executeUpdate(update.getStatement()); - createAndAddRetriableUpdate(update, updateCount); - return updateCount; - } catch (AbortedException e) { - throw e; - } catch (SpannerException e) { - createAndAddFailedUpdate(e, update); - throw e; - } - }), + () -> { + checkTimedOut(); + return runWithRetry( + () -> { + try { + getStatementExecutor() + .invokeInterceptors( + update, + StatementExecutionStep.EXECUTE_STATEMENT, + ReadWriteTransaction.this); + long updateCount = + get(txContextFuture).executeUpdate(update.getStatement()); + createAndAddRetriableUpdate(update, updateCount); + return updateCount; + } catch (AbortedException e) { + throw e; + } catch (SpannerException e) { + createAndAddFailedUpdate(e, update); + throw e; + } + }); + }, // ignore interceptors here as they are invoked in the Callable. InterceptorsUsage.IGNORE_INTERCEPTORS, ImmutableList.>of(SpannerGrpc.getExecuteSqlMethod())); @@ -396,6 +403,7 @@ public ApiFuture executeUpdateAsync(final ParsedStatement update) { executeStatementAsync( update, () -> { + checkTimedOut(); checkAborted(); return get(txContextFuture).executeUpdate(update.getStatement()); }, @@ -449,25 +457,27 @@ public ApiFuture executeBatchUpdateAsync(Iterable updat res = executeStatementAsync( EXECUTE_BATCH_UPDATE_STATEMENT, - () -> - runWithRetry( - () -> { - try { - getStatementExecutor() - .invokeInterceptors( - EXECUTE_BATCH_UPDATE_STATEMENT, - StatementExecutionStep.EXECUTE_STATEMENT, - ReadWriteTransaction.this); - long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements); - createAndAddRetriableBatchUpdate(updateStatements, updateCounts); - return updateCounts; - } catch (AbortedException e) { - throw e; - } catch (SpannerException e) { - createAndAddFailedBatchUpdate(e, updateStatements); - throw e; - } - }), + () -> { + checkTimedOut(); + return runWithRetry( + () -> { + try { + getStatementExecutor() + .invokeInterceptors( + EXECUTE_BATCH_UPDATE_STATEMENT, + StatementExecutionStep.EXECUTE_STATEMENT, + ReadWriteTransaction.this); + long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements); + createAndAddRetriableBatchUpdate(updateStatements, updateCounts); + return updateCounts; + } catch (AbortedException e) { + throw e; + } catch (SpannerException e) { + createAndAddFailedBatchUpdate(e, updateStatements); + throw e; + } + }); + }, // ignore interceptors here as they are invoked in the Callable. InterceptorsUsage.IGNORE_INTERCEPTORS, ImmutableList.>of(SpannerGrpc.getExecuteBatchDmlMethod())); @@ -476,12 +486,12 @@ public ApiFuture executeBatchUpdateAsync(Iterable updat executeStatementAsync( EXECUTE_BATCH_UPDATE_STATEMENT, () -> { + checkTimedOut(); checkAborted(); return get(txContextFuture).batchUpdate(updateStatements); }, SpannerGrpc.getExecuteBatchDmlMethod()); } - ApiFutures.addCallback( res, new ApiFutureCallback() { @@ -546,6 +556,7 @@ public ApiFuture commitAsync() { executeStatementAsync( COMMIT_STATEMENT, () -> { + checkTimedOut(); try { return runWithRetry( () -> { @@ -574,6 +585,7 @@ public ApiFuture commitAsync() { executeStatementAsync( COMMIT_STATEMENT, () -> { + checkTimedOut(); try { return commitCallable.call(); } catch (Throwable t) {