Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add closeAsync() method to Connection #984

Merged
merged 2 commits into from Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -571,4 +571,11 @@
<className>com/google/cloud/spanner/BackupInfo$Builder</className>
<method>com.google.cloud.spanner.BackupInfo$Builder setEncryptionConfig(com.google.cloud.spanner.encryption.BackupEncryptionConfig)</method>
</difference>

<!-- Connection#closeAsync() -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture closeAsync()</method>
</difference>
</differences>
Expand Up @@ -139,10 +139,19 @@
*/
@InternalApi
public interface Connection extends AutoCloseable {
/** Closes this connection. This is a no-op if the {@link Connection} has alread been closed. */

/** Closes this connection. This is a no-op if the {@link Connection} has already been closed. */
@Override
void close();

/**
* Closes this connection without blocking. This is a no-op if the {@link Connection} has already
* been closed. The {@link Connection} is no longer usable directly after calling this method. The
* returned {@link ApiFuture} is done when the running statement(s) (if any) on the connection
* have finished.
*/
ApiFuture<Void> closeAsync();

/** @return <code>true</code> if this connection has been closed. */
boolean isClosed();

Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerApiFutures.get;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
Expand All @@ -42,15 +43,19 @@
import com.google.cloud.spanner.connection.UnitOfWork.UnitOfWorkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.threeten.bp.Instant;

/** Implementation for {@link Connection}, the generic Spanner connection API (not JDBC). */
Expand Down Expand Up @@ -257,28 +262,49 @@ private DdlClient createDdlClient() {

@Override
public void close() {
try {
closeAsync().get(10L, TimeUnit.SECONDS);
} catch (SpannerException | InterruptedException | ExecutionException | TimeoutException e) {
// ignore and continue to close the connection.
} finally {
statementExecutor.shutdownNow();
}
}

public ApiFuture<Void> closeAsync() {
if (!isClosed()) {
try {
if (isTransactionStarted()) {
try {
rollback();
} catch (Exception e) {
// Ignore as we are closing the connection.
}
}
// Try to wait for the current statement to finish (if any) before we actually close the
// connection.
this.closed = true;
statementExecutor.shutdown();
leakedException = null;
spannerPool.removeConnection(options, this);
statementExecutor.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore and continue to close the connection.
} finally {
statementExecutor.shutdownNow();
List<ApiFuture<Void>> futures = new ArrayList<>();
if (isTransactionStarted()) {
futures.add(rollbackAsync());
}
// Try to wait for the current statement to finish (if any) before we actually close the
// connection.
this.closed = true;
// Add a no-op statement to the execute. Once this has been executed, we know that all
// preceeding statements have also been executed, as the executor is single-threaded and
// executes all statements in order of submitting.
futures.add(
statementExecutor.submit(
new Callable<Void>() {
@Override
public Void call() throws Exception {
return null;
}
}));
statementExecutor.shutdown();
leakedException = null;
spannerPool.removeConnection(options, this);
return ApiFutures.transform(
ApiFutures.allAsList(futures),
new ApiFunction<List<Void>, Void>() {
@Override
public Void apply(List<Void> input) {
return null;
}
},
MoreExecutors.directExecutor());
}
return ApiFutures.immediateFuture(null);
}

/** Get the current unit-of-work type of this connection. */
Expand Down