From e7ec96ec09a9d273d4f576356d3e4c6cbbb6de9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 18 Mar 2021 06:33:32 +0100 Subject: [PATCH] feat!: add closeAsync() method to Connection (#984) Co-authored-by: Thiago Nunes --- .../clirr-ignored-differences.xml | 7 ++ .../cloud/spanner/connection/Connection.java | 11 +++- .../spanner/connection/ConnectionImpl.java | 64 +++++++++++++------ 3 files changed, 62 insertions(+), 20 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index f340f5e196..efc5fd4de3 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -571,4 +571,11 @@ com/google/cloud/spanner/BackupInfo$Builder com.google.cloud.spanner.BackupInfo$Builder setEncryptionConfig(com.google.cloud.spanner.encryption.BackupEncryptionConfig) + + + + 7012 + com/google/cloud/spanner/connection/Connection + com.google.api.core.ApiFuture closeAsync() + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java index bd1214d6a1..4a0bcf2701 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java @@ -139,10 +139,19 @@ */ @InternalApi public interface Connection extends AutoCloseable { - /** Closes this connection. This is a no-op if the {@link Connection} has alread been closed. */ + + /** Closes this connection. This is a no-op if the {@link Connection} has already been closed. */ @Override void close(); + /** + * Closes this connection without blocking. This is a no-op if the {@link Connection} has already + * been closed. The {@link Connection} is no longer usable directly after calling this method. The + * returned {@link ApiFuture} is done when the running statement(s) (if any) on the connection + * have finished. + */ + ApiFuture closeAsync(); + /** @return true if this connection has been closed. */ boolean isClosed(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java index 4f9703807a..41fca0fcfb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.SpannerApiFutures.get; +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; @@ -42,6 +43,7 @@ import com.google.cloud.spanner.connection.UnitOfWork.UnitOfWorkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; import java.util.ArrayList; import java.util.Collections; @@ -49,8 +51,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Stack; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.threeten.bp.Instant; /** Implementation for {@link Connection}, the generic Spanner connection API (not JDBC). */ @@ -257,28 +262,49 @@ private DdlClient createDdlClient() { @Override public void close() { + try { + closeAsync().get(10L, TimeUnit.SECONDS); + } catch (SpannerException | InterruptedException | ExecutionException | TimeoutException e) { + // ignore and continue to close the connection. + } finally { + statementExecutor.shutdownNow(); + } + } + + public ApiFuture closeAsync() { if (!isClosed()) { - try { - if (isTransactionStarted()) { - try { - rollback(); - } catch (Exception e) { - // Ignore as we are closing the connection. - } - } - // Try to wait for the current statement to finish (if any) before we actually close the - // connection. - this.closed = true; - statementExecutor.shutdown(); - leakedException = null; - spannerPool.removeConnection(options, this); - statementExecutor.awaitTermination(10L, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore and continue to close the connection. - } finally { - statementExecutor.shutdownNow(); + List> futures = new ArrayList<>(); + if (isTransactionStarted()) { + futures.add(rollbackAsync()); } + // Try to wait for the current statement to finish (if any) before we actually close the + // connection. + this.closed = true; + // Add a no-op statement to the execute. Once this has been executed, we know that all + // preceeding statements have also been executed, as the executor is single-threaded and + // executes all statements in order of submitting. + futures.add( + statementExecutor.submit( + new Callable() { + @Override + public Void call() throws Exception { + return null; + } + })); + statementExecutor.shutdown(); + leakedException = null; + spannerPool.removeConnection(options, this); + return ApiFutures.transform( + ApiFutures.allAsList(futures), + new ApiFunction, Void>() { + @Override + public Void apply(List input) { + return null; + } + }, + MoreExecutors.directExecutor()); } + return ApiFutures.immediateFuture(null); } /** Get the current unit-of-work type of this connection. */