Skip to content

Commit

Permalink
feat!: add closeAsync() method to Connection (#984)
Browse files Browse the repository at this point in the history
Co-authored-by: Thiago Nunes <thiagotnunes@google.com>
  • Loading branch information
olavloite and thiagotnunes committed Mar 18, 2021
1 parent 29a200d commit e7ec96e
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 20 deletions.
7 changes: 7 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -571,4 +571,11 @@
<className>com/google/cloud/spanner/BackupInfo$Builder</className>
<method>com.google.cloud.spanner.BackupInfo$Builder setEncryptionConfig(com.google.cloud.spanner.encryption.BackupEncryptionConfig)</method>
</difference>

<!-- Connection#closeAsync() -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture closeAsync()</method>
</difference>
</differences>
Expand Up @@ -139,10 +139,19 @@
*/
@InternalApi
public interface Connection extends AutoCloseable {
/** Closes this connection. This is a no-op if the {@link Connection} has alread been closed. */

/** Closes this connection. This is a no-op if the {@link Connection} has already been closed. */
@Override
void close();

/**
* Closes this connection without blocking. This is a no-op if the {@link Connection} has already
* been closed. The {@link Connection} is no longer usable directly after calling this method. The
* returned {@link ApiFuture} is done when the running statement(s) (if any) on the connection
* have finished.
*/
ApiFuture<Void> closeAsync();

/** @return <code>true</code> if this connection has been closed. */
boolean isClosed();

Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerApiFutures.get;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
Expand All @@ -42,15 +43,19 @@
import com.google.cloud.spanner.connection.UnitOfWork.UnitOfWorkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.threeten.bp.Instant;

/** Implementation for {@link Connection}, the generic Spanner connection API (not JDBC). */
Expand Down Expand Up @@ -257,28 +262,49 @@ private DdlClient createDdlClient() {

@Override
public void close() {
try {
closeAsync().get(10L, TimeUnit.SECONDS);
} catch (SpannerException | InterruptedException | ExecutionException | TimeoutException e) {
// ignore and continue to close the connection.
} finally {
statementExecutor.shutdownNow();
}
}

public ApiFuture<Void> closeAsync() {
if (!isClosed()) {
try {
if (isTransactionStarted()) {
try {
rollback();
} catch (Exception e) {
// Ignore as we are closing the connection.
}
}
// Try to wait for the current statement to finish (if any) before we actually close the
// connection.
this.closed = true;
statementExecutor.shutdown();
leakedException = null;
spannerPool.removeConnection(options, this);
statementExecutor.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore and continue to close the connection.
} finally {
statementExecutor.shutdownNow();
List<ApiFuture<Void>> futures = new ArrayList<>();
if (isTransactionStarted()) {
futures.add(rollbackAsync());
}
// Try to wait for the current statement to finish (if any) before we actually close the
// connection.
this.closed = true;
// Add a no-op statement to the execute. Once this has been executed, we know that all
// preceeding statements have also been executed, as the executor is single-threaded and
// executes all statements in order of submitting.
futures.add(
statementExecutor.submit(
new Callable<Void>() {
@Override
public Void call() throws Exception {
return null;
}
}));
statementExecutor.shutdown();
leakedException = null;
spannerPool.removeConnection(options, this);
return ApiFutures.transform(
ApiFutures.allAsList(futures),
new ApiFunction<List<Void>, Void>() {
@Override
public Void apply(List<Void> input) {
return null;
}
},
MoreExecutors.directExecutor());
}
return ApiFutures.immediateFuture(null);
}

/** Get the current unit-of-work type of this connection. */
Expand Down

0 comments on commit e7ec96e

Please sign in to comment.