Skip to content

Commit

Permalink
fix: closes pool maintainer on invalidation (#784)
Browse files Browse the repository at this point in the history
* fix: closes pool maintainer on invalidation

When the session pool is marked as invalid, we immediately close the
pool maintainer in order to keep it from trying to replinish the pool.
This way we prevent useless batch create sessions requests.

* fix: checks for pool maintainer closed status

When closing the pool, only waits for the pool maintainer to close if it
has not been closed before.

* fix: only closes pool maintainer if not closed

Makes sure to close the pool maintainer only if it has not been closed
already. Also before returning to the caller, makes sure to mark the
closing as complete if there are no pending closures.

* fix: avoids npe when closing pool maintainer

* fix: checks pool maintainer is not closed on close

Verifies that the pool maintainer is not closed before closing it. Also
moves the check of pendingClosures into the synchronized block to make
sure no stale reads are made.
  • Loading branch information
thiagotnunes committed Jan 17, 2021
1 parent c7dc6e6 commit d122ed9
Showing 1 changed file with 29 additions and 8 deletions.
Expand Up @@ -1595,6 +1595,7 @@ final class PoolMaintainer {
Instant lastResetTime = Instant.ofEpochMilli(0);
int numSessionsToClose = 0;
int sessionsToClosePerLoop = 0;
boolean closed = false;

@GuardedBy("lock")
ScheduledFuture<?> scheduledFuture;
Expand All @@ -1621,17 +1622,26 @@ public void run() {

void close() {
synchronized (lock) {
scheduledFuture.cancel(false);
if (!running) {
decrementPendingClosures(1);
if (!closed) {
closed = true;
scheduledFuture.cancel(false);
if (!running) {
decrementPendingClosures(1);
}
}
}
}

boolean isClosed() {
synchronized (lock) {
return closed;
}
}

// Does various pool maintenance activities.
void maintainPool() {
synchronized (lock) {
if (isClosed()) {
if (SessionPool.this.isClosed()) {
return;
}
running = true;
Expand All @@ -1643,7 +1653,7 @@ void maintainPool() {
replenishPool();
synchronized (lock) {
running = false;
if (isClosed()) {
if (SessionPool.this.isClosed()) {
decrementPendingClosures(1);
}
}
Expand Down Expand Up @@ -2126,6 +2136,7 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
}
if (isDatabaseOrInstanceNotFound(e)) {
setResourceNotFoundException((ResourceNotFoundException) e);
poolMaintainer.close();
}
}
}
Expand Down Expand Up @@ -2161,10 +2172,14 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
}
closureFuture = SettableFuture.create();
retFuture = closureFuture;
pendingClosure =
totalSessions() + numSessionsBeingCreated + 1 /* For pool maintenance thread */;

poolMaintainer.close();
pendingClosure = totalSessions() + numSessionsBeingCreated;

if (!poolMaintainer.isClosed()) {
pendingClosure += 1; // For pool maintenance thread
poolMaintainer.close();
}

sessions.clear();
for (PooledSessionFuture session : checkedOutSessions) {
if (session.leakedException != null) {
Expand All @@ -2180,7 +2195,13 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
closeSessionAsync(session);
}
}

// Nothing to be closed, mark as complete
if (pendingClosure == 0) {
closureFuture.set(null);
}
}

retFuture.addListener(
new Runnable() {
@Override
Expand Down

0 comments on commit d122ed9

Please sign in to comment.