Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: optimize maintainer to let sessions be GC'ed instead of deleted (
…#135)

* perf: increase sessions in the pool in batches

When more sessions are requested by the user application than are available in the session pool,
the session pool will now create new sessions in batches instead of in steps of 1. This reduces
the number of RPCs needed to serve a burst of requests.

A benchmark for the session pool has also been added to be able to compare performance and the
number of RPCs needed before and after this change. This benchmark can also be used for future
changes to verify that the change does not deteriorate performance or increase the number of
RPCs needed.
  • Loading branch information
olavloite committed Apr 22, 2020
1 parent fe434ff commit d65747c
Show file tree
Hide file tree
Showing 10 changed files with 989 additions and 199 deletions.
5 changes: 4 additions & 1 deletion google-cloud-spanner/pom.xml
Expand Up @@ -336,6 +336,9 @@
</profile>
<profile>
<id>benchmark</id>
<properties>
<benchmark.name></benchmark.name>
</properties>
<build>
<plugins>
<plugin>
Expand All @@ -355,7 +358,7 @@
<argument>-classpath</argument>
<classpath />
<argument>org.openjdk.jmh.Main</argument>
<argument>.*</argument>
<argument>${benchmark.name}</argument>
</arguments>
</configuration>
</execution>
Expand Down
Expand Up @@ -69,6 +69,7 @@
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -848,7 +849,7 @@ private void keepAlive() {
}
}

private void markUsed() {
void markUsed() {
lastUseTime = clock.instant();
}

Expand Down Expand Up @@ -929,24 +930,30 @@ private SessionOrError pollUninterruptiblyWithTimeout(long timeoutMillis) {
}
}

// Background task to maintain the pool. It closes idle sessions, keeps alive sessions that have
// not been used for a user configured time and creates session if needed to bring pool up to
// minimum required sessions. We keep track of the number of concurrent sessions being used.
// The maximum value of that over a window (10 minutes) tells us how many sessions we need in the
// pool. We close the remaining sessions. To prevent bursty traffic, we smear this out over the
// window length. We also smear out the keep alive traffic over the keep alive period.
/**
* Background task to maintain the pool. Tasks:
*
* <ul>
* <li>Removes idle sessions from the pool. Sessions that go above MinSessions that have not
* been used for the last 55 minutes will be removed from the pool. These will automatically
* be garbage collected by the backend.
* <li>Keeps alive sessions that have not been used for a user configured time in order to keep
* MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out
* over a window of 10 minutes to avoid bursty traffic.
* </ul>
*/
final class PoolMaintainer {
// Length of the window in millis over which we keep track of maximum number of concurrent
// sessions in use.
private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10));
// Frequency of the timer loop.
@VisibleForTesting static final long LOOP_FREQUENCY = 10 * 1000L;
@VisibleForTesting final long loopFrequency = options.getLoopFrequency();
// Number of loop iterations in which we need to to close all the sessions waiting for closure.
@VisibleForTesting final long numClosureCycles = windowLength.toMillis() / LOOP_FREQUENCY;
@VisibleForTesting final long numClosureCycles = windowLength.toMillis() / loopFrequency;
private final Duration keepAliveMilis =
Duration.ofMillis(TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()));
// Number of loop iterations in which we need to keep alive all the sessions
@VisibleForTesting final long numKeepAliveCycles = keepAliveMilis.toMillis() / LOOP_FREQUENCY;
@VisibleForTesting final long numKeepAliveCycles = keepAliveMilis.toMillis() / loopFrequency;

Instant lastResetTime = Instant.ofEpochMilli(0);
int numSessionsToClose = 0;
Expand All @@ -969,8 +976,8 @@ public void run() {
maintainPool();
}
},
LOOP_FREQUENCY,
LOOP_FREQUENCY,
loopFrequency,
loopFrequency,
TimeUnit.MILLISECONDS);
}
}
Expand All @@ -993,7 +1000,7 @@ void maintainPool() {
running = true;
}
Instant currTime = clock.instant();
closeIdleSessions(currTime);
removeIdleSessions(currTime);
// Now go over all the remaining sessions and see if they need to be kept alive explicitly.
keepAliveSessions(currTime);
replenishPool();
Expand All @@ -1005,46 +1012,43 @@ void maintainPool() {
}
}

private void closeIdleSessions(Instant currTime) {
LinkedList<PooledSession> sessionsToClose = new LinkedList<>();
private void removeIdleSessions(Instant currTime) {
synchronized (lock) {
// Every ten minutes figure out how many sessions need to be closed then close them over
// next ten minutes.
if (currTime.isAfter(lastResetTime.plus(windowLength))) {
int sessionsToKeep =
Math.max(options.getMinSessions(), maxSessionsInUse + options.getMaxIdleSessions());
numSessionsToClose = totalSessions() - sessionsToKeep;
sessionsToClosePerLoop = (int) Math.ceil((double) numSessionsToClose / numClosureCycles);
maxSessionsInUse = 0;
lastResetTime = currTime;
}
if (numSessionsToClose > 0) {
while (sessionsToClose.size() < Math.min(numSessionsToClose, sessionsToClosePerLoop)) {
PooledSession sess =
readSessions.size() > 0 ? readSessions.poll() : writePreparedSessions.poll();
if (sess != null) {
if (sess.state != SessionState.CLOSING) {
sess.markClosing();
sessionsToClose.add(sess);
// Determine the minimum last use time for a session to be deemed to still be alive. Remove
// all sessions that have a lastUseTime before that time, unless it would cause us to go
// below MinSessions. Prefer to remove read sessions above write-prepared sessions.
Instant minLastUseTime = currTime.minus(options.getRemoveInactiveSessionAfter());
for (Iterator<PooledSession> iterator :
Arrays.asList(
readSessions.descendingIterator(), writePreparedSessions.descendingIterator())) {
while (iterator.hasNext()) {
PooledSession session = iterator.next();
if (session.lastUseTime.isBefore(minLastUseTime)) {
if (session.state != SessionState.CLOSING) {
removeFromPool(session);
iterator.remove();
}
} else {
break;
}
}
numSessionsToClose -= sessionsToClose.size();
}
}
for (PooledSession sess : sessionsToClose) {
logger.log(Level.FINE, "Closing session {0}", sess.getName());
closeSessionAsync(sess);
}
}

private void keepAliveSessions(Instant currTime) {
long numSessionsToKeepAlive = 0;
synchronized (lock) {
if (numSessionsInUse >= (options.getMinSessions() + options.getMaxIdleSessions())) {
// At least MinSessions are in use, so we don't have to ping any sessions.
return;
}
// In each cycle only keep alive a subset of sessions to prevent burst of traffic.
numSessionsToKeepAlive = (long) Math.ceil((double) totalSessions() / numKeepAliveCycles);
numSessionsToKeepAlive =
(long)
Math.ceil(
(double)
((options.getMinSessions() + options.getMaxIdleSessions())
- numSessionsInUse)
/ numKeepAliveCycles);
}
// Now go over all the remaining sessions and see if they need to be kept alive explicitly.
Instant keepAliveThreshold = currTime.minus(keepAliveMilis);
Expand All @@ -1053,9 +1057,11 @@ private void keepAliveSessions(Instant currTime) {
while (numSessionsToKeepAlive > 0) {
PooledSession sessionToKeepAlive = null;
synchronized (lock) {
sessionToKeepAlive = findSessionToKeepAlive(readSessions, keepAliveThreshold);
sessionToKeepAlive = findSessionToKeepAlive(readSessions, keepAliveThreshold, 0);
if (sessionToKeepAlive == null) {
sessionToKeepAlive = findSessionToKeepAlive(writePreparedSessions, keepAliveThreshold);
sessionToKeepAlive =
findSessionToKeepAlive(
writePreparedSessions, keepAliveThreshold, readSessions.size());
}
}
if (sessionToKeepAlive == null) {
Expand Down Expand Up @@ -1137,13 +1143,18 @@ private static enum Position {
@GuardedBy("lock")
private long numSessionsReleased = 0;

@GuardedBy("lock")
private long numIdleSessionsRemoved = 0;

private AtomicLong numWaiterTimeouts = new AtomicLong();

@GuardedBy("lock")
private final Set<PooledSession> allSessions = new HashSet<>();

private final SessionConsumer sessionConsumer = new SessionConsumerImpl();

@VisibleForTesting Function<PooledSession, Void> idleSessionRemovedListener;

/**
* Create a session pool with the given options and for the given database. It will also start
* eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0.
Expand Down Expand Up @@ -1232,6 +1243,28 @@ private SessionPool(
this.initMetricsCollection(metricRegistry, labelValues);
}

@VisibleForTesting
void removeFromPool(PooledSession session) {
synchronized (lock) {
if (isClosed()) {
decrementPendingClosures(1);
return;
}
session.markClosing();
allSessions.remove(session);
numIdleSessionsRemoved++;
if (idleSessionRemovedListener != null) {
idleSessionRemovedListener.apply(session);
}
}
}

long numIdleSessionsRemoved() {
synchronized (lock) {
return numIdleSessionsRemoved;
}
}

@VisibleForTesting
int getNumberOfAvailableWritePreparedSessions() {
synchronized (lock) {
Expand Down Expand Up @@ -1313,14 +1346,18 @@ private void invalidateSession(PooledSession session) {
}

private PooledSession findSessionToKeepAlive(
Queue<PooledSession> queue, Instant keepAliveThreshold) {
Queue<PooledSession> queue, Instant keepAliveThreshold, int numAlreadyChecked) {
int numChecked = 0;
Iterator<PooledSession> iterator = queue.iterator();
while (iterator.hasNext()) {
while (iterator.hasNext()
&& (numChecked + numAlreadyChecked)
< (options.getMinSessions() + options.getMaxIdleSessions() - numSessionsInUse)) {
PooledSession session = iterator.next();
if (session.lastUseTime.isBefore(keepAliveThreshold)) {
iterator.remove();
return session;
}
numChecked++;
}
return null;
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.threeten.bp.Duration;

/** Options for the session pool used by {@code DatabaseClient}. */
public class SessionPoolOptions {
Expand All @@ -32,7 +33,9 @@ public class SessionPoolOptions {
private final int maxIdleSessions;
private final float writeSessionsFraction;
private final ActionOnExhaustion actionOnExhaustion;
private final long loopFrequency;
private final int keepAliveIntervalMinutes;
private final Duration removeInactiveSessionAfter;
private final ActionOnSessionNotFound actionOnSessionNotFound;
private final long initialWaitForSessionTimeoutMillis;

Expand All @@ -48,7 +51,9 @@ private SessionPoolOptions(Builder builder) {
this.actionOnExhaustion = builder.actionOnExhaustion;
this.actionOnSessionNotFound = builder.actionOnSessionNotFound;
this.initialWaitForSessionTimeoutMillis = builder.initialWaitForSessionTimeoutMillis;
this.loopFrequency = builder.loopFrequency;
this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
}

public int getMinSessions() {
Expand All @@ -71,10 +76,18 @@ public float getWriteSessionsFraction() {
return writeSessionsFraction;
}

long getLoopFrequency() {
return loopFrequency;
}

public int getKeepAliveIntervalMinutes() {
return keepAliveIntervalMinutes;
}

public Duration getRemoveInactiveSessionAfter() {
return removeInactiveSessionAfter;
}

public boolean isFailIfPoolExhausted() {
return actionOnExhaustion == ActionOnExhaustion.FAIL;
}
Expand Down Expand Up @@ -118,7 +131,9 @@ public static class Builder {
private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION;
private long initialWaitForSessionTimeoutMillis = 30_000L;
private ActionOnSessionNotFound actionOnSessionNotFound = ActionOnSessionNotFound.RETRY;
private long loopFrequency = 10 * 1000L;
private int keepAliveIntervalMinutes = 30;
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);

/**
* Minimum number of sessions that this pool will always maintain. These will be created eagerly
Expand Down Expand Up @@ -165,6 +180,16 @@ public Builder setMaxIdleSessions(int maxIdleSessions) {
return this;
}

Builder setLoopFrequency(long loopFrequency) {
this.loopFrequency = loopFrequency;
return this;
}

public Builder setRemoveInactiveSessionAfter(Duration duration) {
this.removeInactiveSessionAfter = duration;
return this;
}

/**
* How frequently to keep alive idle sessions. This should be less than 60 since an idle session
* is automatically closed after 60 minutes. Sessions will be kept alive by sending a dummy
Expand Down
Expand Up @@ -71,7 +71,7 @@ SessionImpl mockSession() {
void runMaintainanceLoop(FakeClock clock, SessionPool pool, long numCycles) {
for (int i = 0; i < numCycles; i++) {
pool.poolMaintainer.maintainPool();
clock.currentTimeMillis += SessionPool.PoolMaintainer.LOOP_FREQUENCY;
clock.currentTimeMillis += pool.poolMaintainer.loopFrequency;
}
}

Expand Down

0 comments on commit d65747c

Please sign in to comment.