Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: closes pool maintainer on invalidation #784

Merged
merged 5 commits into from Jan 17, 2021
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1590,6 +1590,7 @@ final class PoolMaintainer {
Instant lastResetTime = Instant.ofEpochMilli(0);
int numSessionsToClose = 0;
int sessionsToClosePerLoop = 0;
boolean closed = false;

@GuardedBy("lock")
ScheduledFuture<?> scheduledFuture;
Expand All @@ -1616,17 +1617,24 @@ public void run() {

void close() {
synchronized (lock) {
closed = true;
scheduledFuture.cancel(false);
if (!running) {
decrementPendingClosures(1);
}
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
}
}

boolean isClosed() {
synchronized (lock) {
return closed;
}
}

// Does various pool maintenance activities.
void maintainPool() {
synchronized (lock) {
if (isClosed()) {
if (SessionPool.this.isClosed()) {
return;
}
running = true;
Expand All @@ -1638,7 +1646,7 @@ void maintainPool() {
replenishPool();
synchronized (lock) {
running = false;
if (isClosed()) {
if (SessionPool.this.isClosed()) {
decrementPendingClosures(1);
}
}
Expand Down Expand Up @@ -2121,6 +2129,7 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
}
if (isDatabaseOrInstanceNotFound(e)) {
setResourceNotFoundException((ResourceNotFoundException) e);
poolMaintainer.close();
}
}
}
Expand Down Expand Up @@ -2156,10 +2165,14 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
}
closureFuture = SettableFuture.create();
retFuture = closureFuture;
pendingClosure =
totalSessions() + numSessionsBeingCreated + 1 /* For pool maintenance thread */;

poolMaintainer.close();
pendingClosure = totalSessions() + numSessionsBeingCreated;

if (!poolMaintainer.isClosed()) {
pendingClosure += 1; // For pool maintenance thread
poolMaintainer.close();
}

sessions.clear();
for (PooledSessionFuture session : checkedOutSessions) {
if (session.leakedException != null) {
Expand All @@ -2176,6 +2189,12 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
}
}
}

// Nothing to be closed, mark as complete
if (pendingClosure == 0) {
thiagotnunes marked this conversation as resolved.
Show resolved Hide resolved
closureFuture.set(null);
}

retFuture.addListener(
new Runnable() {
@Override
Expand Down