Skip to content

Commit

Permalink
feat: mark when a Spanner client is closed (#198)
Browse files Browse the repository at this point in the history
Closing a Spanner client means that all resources that have been returned
by the client are no longer valid, including all DatabaseClients and
corresponding session pools. This will cause errors for any other process
that might still want to use these resources. This change marks when and by
which call stack a Spanner client is closed, and includes that in any
subsequent IllegalStateException that is returned to any process that tries
to use the resources that have been returned by the Spanner client. This
makes it easier to track down where and when a Spanner client is closed by
accident.
  • Loading branch information
olavloite committed May 14, 2020
1 parent a608460 commit 50cb174
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 31 deletions.
Expand Up @@ -18,6 +18,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -225,7 +226,7 @@ private <T> T runWithSessionRetry(SessionMode mode, Function<Session, T> callabl
}
}

ListenableFuture<Void> closeAsync() {
return pool.closeAsync();
ListenableFuture<Void> closeAsync(ClosedException closedException) {
return pool.closeAsync(closedException);
}
}
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -1123,6 +1124,9 @@ private static enum Position {
@GuardedBy("lock")
private SettableFuture<Void> closureFuture;

@GuardedBy("lock")
private ClosedException closedException;

@GuardedBy("lock")
private ResourceNotFoundException resourceNotFoundException;

Expand Down Expand Up @@ -1428,7 +1432,7 @@ PooledSession getReadSession() throws SpannerException {
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
throw new IllegalStateException("Pool has been closed", closedException);
}
if (resourceNotFoundException != null) {
span.addAnnotation("Database has been deleted");
Expand Down Expand Up @@ -1497,7 +1501,7 @@ PooledSession getReadWriteSession() {
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
throw new IllegalStateException("Pool has been closed", closedException);
}
if (resourceNotFoundException != null) {
span.addAnnotation("Database has been deleted");
Expand Down Expand Up @@ -1761,12 +1765,13 @@ private void decrementPendingClosures(int count) {
* #getReadWriteSession()} will start throwing {@code IllegalStateException}. The returned future
* blocks till all the sessions created in this pool have been closed.
*/
ListenableFuture<Void> closeAsync() {
ListenableFuture<Void> closeAsync(ClosedException closedException) {
ListenableFuture<Void> retFuture = null;
synchronized (lock) {
if (closureFuture != null) {
throw new IllegalStateException("Close has already been invoked");
throw new IllegalStateException("Close has already been invoked", this.closedException);
}
this.closedException = closedException;
// Fail all pending waiters.
Waiter waiter = readWaiters.poll();
while (waiter != null) {
Expand Down
Expand Up @@ -46,6 +46,7 @@
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Instant;

/** Default implementation of the Cloud Spanner interface. */
class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
Expand Down Expand Up @@ -94,8 +95,22 @@ private static String nextDatabaseClientId(DatabaseId databaseId) {
private final DatabaseAdminClient dbAdminClient;
private final InstanceAdminClient instanceClient;

/**
* Exception class used to track the stack trace at the point when a Spanner instance is closed.
* This exception will be thrown if a user tries to use any resources that were returned by this
* Spanner instance after the instance has been closed. This makes it easier to track down the
* code that (accidently) closed the Spanner instance.
*/
static final class ClosedException extends RuntimeException {
private static final long serialVersionUID = 1451131180314064914L;

ClosedException() {
super("Spanner client was closed at " + Instant.now());
}
}

@GuardedBy("this")
private boolean spannerIsClosed = false;
private ClosedException closedException;

@VisibleForTesting
SpannerImpl(SpannerRpc gapicRpc, SpannerOptions options) {
Expand Down Expand Up @@ -131,9 +146,17 @@ SessionImpl sessionWithId(String name) {
return getSessionClient(id.getDatabaseId()).sessionWithId(name);
}

void checkClosed() {
synchronized (this) {
if (closedException != null) {
throw new IllegalStateException("Cloud Spanner client has been closed", closedException);
}
}
}

SessionClient getSessionClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
checkClosed();
if (sessionClients.containsKey(db)) {
return sessionClients.get(db);
} else {
Expand Down Expand Up @@ -161,7 +184,7 @@ public InstanceAdminClient getInstanceAdminClient() {
@Override
public DatabaseClient getDatabaseClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
checkClosed();
if (dbClients.containsKey(db) && !dbClients.get(db).pool.isValid()) {
// Move the invalidated client to a separate list, so we can close it together with the
// other database clients when the Spanner instance is closed.
Expand Down Expand Up @@ -206,12 +229,12 @@ public void close() {
void close(long timeout, TimeUnit unit) {
List<ListenableFuture<Void>> closureFutures = null;
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
spannerIsClosed = true;
checkClosed();
closedException = new ClosedException();
closureFutures = new ArrayList<>();
invalidatedDbClients.addAll(dbClients.values());
for (DatabaseClientImpl dbClient : invalidatedDbClients) {
closureFutures.add(dbClient.closeAsync());
closureFutures.add(dbClient.closeAsync(closedException));
}
dbClients.clear();
}
Expand All @@ -234,7 +257,9 @@ void close(long timeout, TimeUnit unit) {

@Override
public boolean isClosed() {
return spannerIsClosed;
synchronized (this) {
return closedException != null;
}
}

/** Helper class for gRPC calls that can return paginated results. */
Expand Down
Expand Up @@ -159,18 +159,18 @@ public void run() {

@Test
public void closeQuicklyDoesNotBlockIndefinitely() throws Exception {
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}

@Test
public void closeAfterInitialCreateDoesNotBlockIndefinitely() throws Exception {
pool.getReadSession().close();
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}

@Test
public void closeWhenSessionsActiveFinishes() throws Exception {
Session session = pool.getReadSession();
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}
}
Expand Up @@ -322,7 +322,7 @@ public void run() {
assertThat(maxAliveSessions).isAtMost(maxSessions);
}
stopMaintenance.set(true);
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
Exception e = getFailedError();
if (e != null) {
throw e;
Expand Down
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand All @@ -58,6 +59,8 @@
import com.google.spanner.v1.RollbackRequest;
import io.opencensus.metrics.LabelValue;
import io.opencensus.metrics.MetricRegistry;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -165,6 +168,26 @@ public void run() {
Mockito.anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class));
}

@Test
public void testClosedPoolIncludesClosedException() {
pool = createPool();
assertThat(pool.isValid()).isTrue();
closePoolWithStacktrace();
try {
pool.getReadSession();
fail("missing expected exception");
} catch (IllegalStateException e) {
assertThat(e.getCause()).isInstanceOf(ClosedException.class);
StringWriter sw = new StringWriter();
e.getCause().printStackTrace(new PrintWriter(sw));
assertThat(sw.toString()).contains("closePoolWithStacktrace");
}
}

private void closePoolWithStacktrace() {
pool.closeAsync(new SpannerImpl.ClosedException());
}

@Test
public void sessionCreation() {
setupMockSessionCreation();
Expand Down Expand Up @@ -203,7 +226,7 @@ public void poolLifo() {
public void poolClosure() throws Exception {
setupMockSessionCreation();
pool = createPool();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
Expand Down Expand Up @@ -237,7 +260,7 @@ public void run() {
// Clear the leaked exception to suppress logging of expected exceptions.
leakedSession.clearLeakedException();
session1.close();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
verify(mockSession1).asyncClose();
verify(mockSession2).asyncClose();
}
Expand All @@ -260,7 +283,7 @@ public void run() {
}
})
.start();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
stop.set(true);
}

Expand Down Expand Up @@ -316,7 +339,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getSessionAsync(latch, failed);
insideCreation.await();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
latch.await();
assertThat(failed.get()).isTrue();
Expand Down Expand Up @@ -374,7 +397,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getReadWriteSessionAsync(latch, failed);
insideCreation.await();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
latch.await();
assertThat(failed.get()).isTrue();
Expand Down Expand Up @@ -411,7 +434,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getSessionAsync(latch, failed);
insideCreation.await();
ListenableFuture<Void> f = pool.closeAsync();
ListenableFuture<Void> f = pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
f.get();
assertThat(f.isDone()).isTrue();
Expand Down Expand Up @@ -456,7 +479,7 @@ public Session answer(InvocationOnMock invocation) throws Throwable {
CountDownLatch latch = new CountDownLatch(1);
getReadWriteSessionAsync(latch, failed);
insidePrepare.await();
ListenableFuture<Void> f = pool.closeAsync();
ListenableFuture<Void> f = pool.closeAsync(new SpannerImpl.ClosedException());
releasePrepare.countDown();
f.get();
assertThat(f.isDone()).isTrue();
Expand Down Expand Up @@ -487,7 +510,7 @@ public void run() {
PooledSession leakedSession = pool.getReadSession();
// Suppress expected leakedSession warning.
leakedSession.clearLeakedException();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
expectedException.expect(IllegalStateException.class);
pool.getReadSession();
}
Expand Down Expand Up @@ -925,7 +948,7 @@ public void run() {
runMaintainanceLoop(clock, pool, cycles);
// We will still close 2 sessions since at any point in time only 1 session was in use.
assertThat(pool.numIdleSessionsRemoved()).isEqualTo(2L);
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
Expand Down Expand Up @@ -976,7 +999,7 @@ public void run() {
// The session pool only keeps MinSessions + MaxIdleSessions alive.
verify(session, times(options.getMinSessions() + options.getMaxIdleSessions()))
.singleUse(any(TimestampBound.class));
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
Expand Down Expand Up @@ -1061,7 +1084,7 @@ public void run() {
assertThat(pool.getNumberOfAvailableWritePreparedSessions())
.isEqualTo((int) Math.ceil(options.getMinSessions() * options.getWriteSessionsFraction()));

pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

private void waitForExpectedSessionPool(int expectedSessions, float writeFraction)
Expand Down Expand Up @@ -1447,7 +1470,7 @@ public Integer run(TransactionContext transaction) throws Exception {
.isTrue();
}
}
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
}
}
}
Expand Down
Expand Up @@ -27,8 +27,11 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceRpc;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -222,13 +225,36 @@ public void testClientId() {

// Get a database client for the same database as the first database. As this goes through a
// different Spanner instance with potentially different options, it will get a different
// client
// id.
// client id.
DatabaseClientImpl databaseClient3 = (DatabaseClientImpl) spanner.getDatabaseClient(db);
assertThat(databaseClient3.clientId).isEqualTo("client-2");
}
}

@Test
public void testClosedException() {
Spanner spanner = new SpannerImpl(rpc, spannerOptions);
assertThat(spanner.isClosed()).isFalse();
// Close the Spanner instance in a different method so we can actually verify that the entire
// stacktrace of the method that closed the instance is included in the exception that will be
// thrown by the instance after it has been closed.
closeSpannerAndIncludeStacktrace(spanner);
assertThat(spanner.isClosed()).isTrue();
try {
spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
fail("missing expected exception");
} catch (IllegalStateException e) {
assertThat(e.getCause()).isInstanceOf(ClosedException.class);
StringWriter sw = new StringWriter();
e.getCause().printStackTrace(new PrintWriter(sw));
assertThat(sw.toString()).contains("closeSpannerAndIncludeStacktrace");
}
}

private void closeSpannerAndIncludeStacktrace(Spanner spanner) {
spanner.close();
}

private SpannerOptions createSpannerOptions() {
return SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
Expand Down

0 comments on commit 50cb174

Please sign in to comment.