From d65747cbc704508f6f1bcef6eea53aa411d42ee2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 22 Apr 2020 03:46:46 +0200 Subject: [PATCH] feat: optimize maintainer to let sessions be GC'ed instead of deleted (#135) * perf: increase sessions in the pool in batches When more sessions are requested by the user application than are available in the session pool, the session pool will now create new sessions in batches instead of in steps of 1. This reduces the number of RPCs needed to serve a burst of requests. A benchmark for the session pool has also been added to be able to compare performance and the number of RPCs needed before and after this change. This benchmark can also be used for future changes to verify that the change does not deteriorate performance or increase the number of RPCs needed. --- google-cloud-spanner/pom.xml | 5 +- .../com/google/cloud/spanner/SessionPool.java | 127 ++++--- .../cloud/spanner/SessionPoolOptions.java | 25 ++ .../cloud/spanner/BaseSessionPoolTest.java | 2 +- .../cloud/spanner/SessionPoolBenchmark.java | 151 ++------- .../SessionPoolMaintainerBenchmark.java | 268 +++++++++++++++ .../spanner/SessionPoolMaintainerTest.java | 315 ++++++++++++++++++ .../cloud/spanner/SessionPoolStressTest.java | 18 +- .../google/cloud/spanner/SessionPoolTest.java | 138 ++++++-- .../spanner/StandardBenchmarkMockServer.java | 139 ++++++++ 10 files changed, 989 insertions(+), 199 deletions(-) create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/StandardBenchmarkMockServer.java diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index d048e1d848..15307569a0 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -336,6 +336,9 @@ benchmark + + + @@ -355,7 +358,7 @@ -classpath org.openjdk.jmh.Main - .* + ${benchmark.name} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 0582f4b1f5..8d731f191c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -69,6 +69,7 @@ import io.opencensus.trace.Status; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; +import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -848,7 +849,7 @@ private void keepAlive() { } } - private void markUsed() { + void markUsed() { lastUseTime = clock.instant(); } @@ -929,24 +930,30 @@ private SessionOrError pollUninterruptiblyWithTimeout(long timeoutMillis) { } } - // Background task to maintain the pool. It closes idle sessions, keeps alive sessions that have - // not been used for a user configured time and creates session if needed to bring pool up to - // minimum required sessions. We keep track of the number of concurrent sessions being used. - // The maximum value of that over a window (10 minutes) tells us how many sessions we need in the - // pool. We close the remaining sessions. To prevent bursty traffic, we smear this out over the - // window length. We also smear out the keep alive traffic over the keep alive period. + /** + * Background task to maintain the pool. Tasks: + * + *
    + *
  • Removes idle sessions from the pool. Sessions that go above MinSessions that have not + * been used for the last 55 minutes will be removed from the pool. These will automatically + * be garbage collected by the backend. + *
  • Keeps alive sessions that have not been used for a user configured time in order to keep + * MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out + * over a window of 10 minutes to avoid bursty traffic. + *
+ */ final class PoolMaintainer { // Length of the window in millis over which we keep track of maximum number of concurrent // sessions in use. private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10)); // Frequency of the timer loop. - @VisibleForTesting static final long LOOP_FREQUENCY = 10 * 1000L; + @VisibleForTesting final long loopFrequency = options.getLoopFrequency(); // Number of loop iterations in which we need to to close all the sessions waiting for closure. - @VisibleForTesting final long numClosureCycles = windowLength.toMillis() / LOOP_FREQUENCY; + @VisibleForTesting final long numClosureCycles = windowLength.toMillis() / loopFrequency; private final Duration keepAliveMilis = Duration.ofMillis(TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes())); // Number of loop iterations in which we need to keep alive all the sessions - @VisibleForTesting final long numKeepAliveCycles = keepAliveMilis.toMillis() / LOOP_FREQUENCY; + @VisibleForTesting final long numKeepAliveCycles = keepAliveMilis.toMillis() / loopFrequency; Instant lastResetTime = Instant.ofEpochMilli(0); int numSessionsToClose = 0; @@ -969,8 +976,8 @@ public void run() { maintainPool(); } }, - LOOP_FREQUENCY, - LOOP_FREQUENCY, + loopFrequency, + loopFrequency, TimeUnit.MILLISECONDS); } } @@ -993,7 +1000,7 @@ void maintainPool() { running = true; } Instant currTime = clock.instant(); - closeIdleSessions(currTime); + removeIdleSessions(currTime); // Now go over all the remaining sessions and see if they need to be kept alive explicitly. keepAliveSessions(currTime); replenishPool(); @@ -1005,46 +1012,43 @@ void maintainPool() { } } - private void closeIdleSessions(Instant currTime) { - LinkedList sessionsToClose = new LinkedList<>(); + private void removeIdleSessions(Instant currTime) { synchronized (lock) { - // Every ten minutes figure out how many sessions need to be closed then close them over - // next ten minutes. - if (currTime.isAfter(lastResetTime.plus(windowLength))) { - int sessionsToKeep = - Math.max(options.getMinSessions(), maxSessionsInUse + options.getMaxIdleSessions()); - numSessionsToClose = totalSessions() - sessionsToKeep; - sessionsToClosePerLoop = (int) Math.ceil((double) numSessionsToClose / numClosureCycles); - maxSessionsInUse = 0; - lastResetTime = currTime; - } - if (numSessionsToClose > 0) { - while (sessionsToClose.size() < Math.min(numSessionsToClose, sessionsToClosePerLoop)) { - PooledSession sess = - readSessions.size() > 0 ? readSessions.poll() : writePreparedSessions.poll(); - if (sess != null) { - if (sess.state != SessionState.CLOSING) { - sess.markClosing(); - sessionsToClose.add(sess); + // Determine the minimum last use time for a session to be deemed to still be alive. Remove + // all sessions that have a lastUseTime before that time, unless it would cause us to go + // below MinSessions. Prefer to remove read sessions above write-prepared sessions. + Instant minLastUseTime = currTime.minus(options.getRemoveInactiveSessionAfter()); + for (Iterator iterator : + Arrays.asList( + readSessions.descendingIterator(), writePreparedSessions.descendingIterator())) { + while (iterator.hasNext()) { + PooledSession session = iterator.next(); + if (session.lastUseTime.isBefore(minLastUseTime)) { + if (session.state != SessionState.CLOSING) { + removeFromPool(session); + iterator.remove(); } - } else { - break; } } - numSessionsToClose -= sessionsToClose.size(); } } - for (PooledSession sess : sessionsToClose) { - logger.log(Level.FINE, "Closing session {0}", sess.getName()); - closeSessionAsync(sess); - } } private void keepAliveSessions(Instant currTime) { long numSessionsToKeepAlive = 0; synchronized (lock) { + if (numSessionsInUse >= (options.getMinSessions() + options.getMaxIdleSessions())) { + // At least MinSessions are in use, so we don't have to ping any sessions. + return; + } // In each cycle only keep alive a subset of sessions to prevent burst of traffic. - numSessionsToKeepAlive = (long) Math.ceil((double) totalSessions() / numKeepAliveCycles); + numSessionsToKeepAlive = + (long) + Math.ceil( + (double) + ((options.getMinSessions() + options.getMaxIdleSessions()) + - numSessionsInUse) + / numKeepAliveCycles); } // Now go over all the remaining sessions and see if they need to be kept alive explicitly. Instant keepAliveThreshold = currTime.minus(keepAliveMilis); @@ -1053,9 +1057,11 @@ private void keepAliveSessions(Instant currTime) { while (numSessionsToKeepAlive > 0) { PooledSession sessionToKeepAlive = null; synchronized (lock) { - sessionToKeepAlive = findSessionToKeepAlive(readSessions, keepAliveThreshold); + sessionToKeepAlive = findSessionToKeepAlive(readSessions, keepAliveThreshold, 0); if (sessionToKeepAlive == null) { - sessionToKeepAlive = findSessionToKeepAlive(writePreparedSessions, keepAliveThreshold); + sessionToKeepAlive = + findSessionToKeepAlive( + writePreparedSessions, keepAliveThreshold, readSessions.size()); } } if (sessionToKeepAlive == null) { @@ -1137,6 +1143,9 @@ private static enum Position { @GuardedBy("lock") private long numSessionsReleased = 0; + @GuardedBy("lock") + private long numIdleSessionsRemoved = 0; + private AtomicLong numWaiterTimeouts = new AtomicLong(); @GuardedBy("lock") @@ -1144,6 +1153,8 @@ private static enum Position { private final SessionConsumer sessionConsumer = new SessionConsumerImpl(); + @VisibleForTesting Function idleSessionRemovedListener; + /** * Create a session pool with the given options and for the given database. It will also start * eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0. @@ -1232,6 +1243,28 @@ private SessionPool( this.initMetricsCollection(metricRegistry, labelValues); } + @VisibleForTesting + void removeFromPool(PooledSession session) { + synchronized (lock) { + if (isClosed()) { + decrementPendingClosures(1); + return; + } + session.markClosing(); + allSessions.remove(session); + numIdleSessionsRemoved++; + if (idleSessionRemovedListener != null) { + idleSessionRemovedListener.apply(session); + } + } + } + + long numIdleSessionsRemoved() { + synchronized (lock) { + return numIdleSessionsRemoved; + } + } + @VisibleForTesting int getNumberOfAvailableWritePreparedSessions() { synchronized (lock) { @@ -1313,14 +1346,18 @@ private void invalidateSession(PooledSession session) { } private PooledSession findSessionToKeepAlive( - Queue queue, Instant keepAliveThreshold) { + Queue queue, Instant keepAliveThreshold, int numAlreadyChecked) { + int numChecked = 0; Iterator iterator = queue.iterator(); - while (iterator.hasNext()) { + while (iterator.hasNext() + && (numChecked + numAlreadyChecked) + < (options.getMinSessions() + options.getMaxIdleSessions() - numSessionsInUse)) { PooledSession session = iterator.next(); if (session.lastUseTime.isBefore(keepAliveThreshold)) { iterator.remove(); return session; } + numChecked++; } return null; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 31bebac32d..17295a38ab 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.threeten.bp.Duration; /** Options for the session pool used by {@code DatabaseClient}. */ public class SessionPoolOptions { @@ -32,7 +33,9 @@ public class SessionPoolOptions { private final int maxIdleSessions; private final float writeSessionsFraction; private final ActionOnExhaustion actionOnExhaustion; + private final long loopFrequency; private final int keepAliveIntervalMinutes; + private final Duration removeInactiveSessionAfter; private final ActionOnSessionNotFound actionOnSessionNotFound; private final long initialWaitForSessionTimeoutMillis; @@ -48,7 +51,9 @@ private SessionPoolOptions(Builder builder) { this.actionOnExhaustion = builder.actionOnExhaustion; this.actionOnSessionNotFound = builder.actionOnSessionNotFound; this.initialWaitForSessionTimeoutMillis = builder.initialWaitForSessionTimeoutMillis; + this.loopFrequency = builder.loopFrequency; this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes; + this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter; } public int getMinSessions() { @@ -71,10 +76,18 @@ public float getWriteSessionsFraction() { return writeSessionsFraction; } + long getLoopFrequency() { + return loopFrequency; + } + public int getKeepAliveIntervalMinutes() { return keepAliveIntervalMinutes; } + public Duration getRemoveInactiveSessionAfter() { + return removeInactiveSessionAfter; + } + public boolean isFailIfPoolExhausted() { return actionOnExhaustion == ActionOnExhaustion.FAIL; } @@ -118,7 +131,9 @@ public static class Builder { private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION; private long initialWaitForSessionTimeoutMillis = 30_000L; private ActionOnSessionNotFound actionOnSessionNotFound = ActionOnSessionNotFound.RETRY; + private long loopFrequency = 10 * 1000L; private int keepAliveIntervalMinutes = 30; + private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L); /** * Minimum number of sessions that this pool will always maintain. These will be created eagerly @@ -165,6 +180,16 @@ public Builder setMaxIdleSessions(int maxIdleSessions) { return this; } + Builder setLoopFrequency(long loopFrequency) { + this.loopFrequency = loopFrequency; + return this; + } + + public Builder setRemoveInactiveSessionAfter(Duration duration) { + this.removeInactiveSessionAfter = duration; + return this; + } + /** * How frequently to keep alive idle sessions. This should be less than 60 since an idle session * is automatically closed after 60 minutes. Sessions will be kept alive by sending a dummy diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index c3f724edea..26bbef4535 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -71,7 +71,7 @@ SessionImpl mockSession() { void runMaintainanceLoop(FakeClock clock, SessionPool pool, long numCycles) { for (int i = 0; i < numCycles; i++) { pool.poolMaintainer.maintainPool(); - clock.currentTimeMillis += SessionPool.PoolMaintainer.LOOP_FREQUENCY; + clock.currentTimeMillis += pool.poolMaintainer.loopFrequency; } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java index 6fc750b57a..fe5599b32c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java @@ -18,27 +18,14 @@ import static com.google.common.truth.Truth.assertThat; -import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.NoCredentials; -import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; -import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.protobuf.AbstractMessage; -import com.google.protobuf.ListValue; import com.google.spanner.v1.BatchCreateSessionsRequest; -import com.google.spanner.v1.ResultSetMetadata; -import com.google.spanner.v1.StructType; -import com.google.spanner.v1.StructType.Field; -import com.google.spanner.v1.TypeCode; -import io.grpc.Server; -import io.grpc.Status; -import io.grpc.inprocess.InProcessServerBuilder; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -63,8 +50,10 @@ /** * Benchmarks for common session pool scenarios. The simulated execution times are based on * reasonable estimates and are primarily intended to keep the benchmarks comparable with each other - * before and after changes have been made to the pool. The benchmarks are bound to the build - * profile `benchmark` and can be executed like this: `mvn test -Pbenchmark` + * before and after changes have been made to the pool. The benchmarks are bound to the Maven + * profile `benchmark` and can be executed like this: + * mvn clean test -DskipTests -Pbenchmark -Dbenchmark.name=SessionPoolBenchmark + * */ @BenchmarkMode(Mode.AverageTime) @Fork(value = 1, warmups = 0) @@ -79,59 +68,10 @@ public class SessionPoolBenchmark { private static final int RND_WAIT_TIME_BETWEEN_REQUESTS = 10; private static final Random RND = new Random(); - public static void main(String[] args) throws Exception { - org.openjdk.jmh.Main.main(args); - } - @State(Scope.Thread) @AuxCounters(org.openjdk.jmh.annotations.AuxCounters.Type.EVENTS) - public static class MockServer { - private static final int NETWORK_LATENCY_TIME = 10; - private static final int BATCH_CREATE_SESSIONS_MIN_TIME = 10; - private static final int BATCH_CREATE_SESSIONS_RND_TIME = 10; - private static final int BEGIN_TRANSACTION_MIN_TIME = 1; - private static final int BEGIN_TRANSACTION_RND_TIME = 1; - private static final int COMMIT_TRANSACTION_MIN_TIME = 5; - private static final int COMMIT_TRANSACTION_RND_TIME = 5; - private static final int ROLLBACK_TRANSACTION_MIN_TIME = 1; - private static final int ROLLBACK_TRANSACTION_RND_TIME = 1; - private static final int EXECUTE_STREAMING_SQL_MIN_TIME = 10; - private static final int EXECUTE_STREAMING_SQL_RND_TIME = 10; - private static final int EXECUTE_SQL_MIN_TIME = 10; - private static final int EXECUTE_SQL_RND_TIME = 10; - - private static final Statement UPDATE_STATEMENT = - Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); - private static final Statement INVALID_UPDATE_STATEMENT = - Statement.of("UPDATE NON_EXISTENT_TABLE SET BAR=1 WHERE BAZ=2"); - private static final long UPDATE_COUNT = 1L; - private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); - private static final ResultSetMetadata SELECT1_METADATA = - ResultSetMetadata.newBuilder() - .setRowType( - StructType.newBuilder() - .addFields( - Field.newBuilder() - .setName("COL1") - .setType( - com.google.spanner.v1.Type.newBuilder() - .setCode(TypeCode.INT64) - .build()) - .build()) - .build()) - .build(); - private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = - com.google.spanner.v1.ResultSet.newBuilder() - .addRows( - ListValue.newBuilder() - .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) - .build()) - .setMetadata(SELECT1_METADATA) - .build(); - private MockSpannerServiceImpl mockSpanner; - private Server server; - private LocalChannelProvider channelProvider; - + public static class BenchmarkState { + private StandardBenchmarkMockServer mockServer; private Spanner spanner; private DatabaseClientImpl client; @@ -152,50 +92,18 @@ public static class MockServer { /** AuxCounter for number of RPCs. */ public int numBatchCreateSessionsRpcs() { - return countRequests(BatchCreateSessionsRequest.class); + return mockServer.countRequests(BatchCreateSessionsRequest.class); } /** AuxCounter for number of sessions created. */ public int sessionsCreated() { - return mockSpanner.numSessionsCreated(); + return mockServer.getMockSpanner().numSessionsCreated(); } @Setup(Level.Invocation) public void setup() throws Exception { - mockSpanner = new MockSpannerServiceImpl(); - mockSpanner.setAbortProbability( - 0.0D); // We don't want any unpredictable aborted transactions. - mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); - mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET)); - mockSpanner.putStatementResult( - StatementResult.exception( - INVALID_UPDATE_STATEMENT, - Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); - - mockSpanner.setBatchCreateSessionsExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + BATCH_CREATE_SESSIONS_MIN_TIME, - BATCH_CREATE_SESSIONS_RND_TIME)); - mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + BEGIN_TRANSACTION_MIN_TIME, BEGIN_TRANSACTION_RND_TIME)); - mockSpanner.setCommitExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + COMMIT_TRANSACTION_MIN_TIME, COMMIT_TRANSACTION_RND_TIME)); - mockSpanner.setRollbackExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + ROLLBACK_TRANSACTION_MIN_TIME, ROLLBACK_TRANSACTION_RND_TIME)); - mockSpanner.setExecuteStreamingSqlExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + EXECUTE_STREAMING_SQL_MIN_TIME, - EXECUTE_STREAMING_SQL_RND_TIME)); - mockSpanner.setExecuteSqlExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + EXECUTE_SQL_MIN_TIME, EXECUTE_SQL_RND_TIME)); - - String uniqueName = InProcessServerBuilder.generateName(); - server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); - channelProvider = LocalChannelProvider.create(uniqueName); + mockServer = new StandardBenchmarkMockServer(); + TransportChannelProvider channelProvider = mockServer.start(); SpannerOptions options = SpannerOptions.newBuilder() @@ -226,31 +134,18 @@ public void setup() throws Exception { @TearDown(Level.Invocation) public void teardown() throws Exception { spanner.close(); - server.shutdown(); - server.awaitTermination(); + mockServer.shutdown(); } int expectedStepsToMax() { int remainder = (maxSessions - minSessions) % incStep == 0 ? 0 : 1; return numChannels + ((maxSessions - minSessions) / incStep) + remainder; } - - int countRequests(final Class type) { - return Collections2.filter( - mockSpanner.getRequests(), - new Predicate() { - @Override - public boolean apply(AbstractMessage input) { - return input.getClass().equals(type); - } - }) - .size(); - } } /** Measures the time needed to execute a burst of read requests. */ @Benchmark - public void burstRead(final MockServer server) throws Exception { + public void burstRead(final BenchmarkState server) throws Exception { int totalQueries = server.maxSessions * 8; int parallelThreads = server.maxSessions * 2; final DatabaseClient client = @@ -268,7 +163,8 @@ public void burstRead(final MockServer server) throws Exception { @Override public Void call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); - try (ResultSet rs = client.singleUse().executeQuery(MockServer.SELECT1)) { + try (ResultSet rs = + client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) { while (rs.next()) { Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); } @@ -283,7 +179,7 @@ public Void call() throws Exception { /** Measures the time needed to execute a burst of write requests. */ @Benchmark - public void burstWrite(final MockServer server) throws Exception { + public void burstWrite(final BenchmarkState server) throws Exception { int totalWrites = server.maxSessions * 8; int parallelThreads = server.maxSessions * 2; final DatabaseClient client = @@ -306,7 +202,8 @@ public Long call() throws Exception { new TransactionCallable() { @Override public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(MockServer.UPDATE_STATEMENT); + return transaction.executeUpdate( + StandardBenchmarkMockServer.UPDATE_STATEMENT); } }); } @@ -318,7 +215,7 @@ public Long run(TransactionContext transaction) throws Exception { /** Measures the time needed to execute a burst of read and write requests. */ @Benchmark - public void burstReadAndWrite(final MockServer server) throws Exception { + public void burstReadAndWrite(final BenchmarkState server) throws Exception { int totalWrites = server.maxSessions * 4; int totalReads = server.maxSessions * 4; int parallelThreads = server.maxSessions * 2; @@ -342,7 +239,8 @@ public Long call() throws Exception { new TransactionCallable() { @Override public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(MockServer.UPDATE_STATEMENT); + return transaction.executeUpdate( + StandardBenchmarkMockServer.UPDATE_STATEMENT); } }); } @@ -355,7 +253,8 @@ public Long run(TransactionContext transaction) throws Exception { @Override public Void call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); - try (ResultSet rs = client.singleUse().executeQuery(MockServer.SELECT1)) { + try (ResultSet rs = + client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) { while (rs.next()) { Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); } @@ -370,7 +269,7 @@ public Void call() throws Exception { /** Measures the time needed to acquire MaxSessions session sequentially. */ @Benchmark - public void steadyIncrease(MockServer server) throws Exception { + public void steadyIncrease(BenchmarkState server) throws Exception { final DatabaseClient client = server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); SessionPool pool = ((DatabaseClientImpl) client).pool; @@ -380,7 +279,7 @@ public void steadyIncrease(MockServer server) throws Exception { List transactions = new ArrayList<>(server.maxSessions); for (int i = 0; i < server.maxSessions; i++) { ReadOnlyTransaction tx = client.readOnlyTransaction(); - tx.executeQuery(MockServer.SELECT1); + tx.executeQuery(StandardBenchmarkMockServer.SELECT1); transactions.add(tx); } for (ReadOnlyTransaction tx : transactions) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java new file mode 100644 index 0000000000..baba73f9e9 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java @@ -0,0 +1,268 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchCreateSessionsRequest; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.DeleteSessionRequest; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.threeten.bp.Duration; + +/** + * Benchmarks for the SessionPoolMaintainer. Run these benchmarks from the command line like this: + * + * mvn clean test -DskipTests -Pbenchmark -Dbenchmark.name=SessionPoolMaintainerBenchmark + * + */ +@BenchmarkMode(Mode.AverageTime) +@Fork(value = 1, warmups = 0) +@Measurement(batchSize = 1, iterations = 1, timeUnit = TimeUnit.MILLISECONDS) +@Warmup(batchSize = 0, iterations = 0) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SessionPoolMaintainerBenchmark { + private static final String TEST_PROJECT = "my-project"; + private static final String TEST_INSTANCE = "my-instance"; + private static final String TEST_DATABASE = "my-database"; + private static final int HOLD_SESSION_TIME = 10; + private static final int RND_WAIT_TIME_BETWEEN_REQUESTS = 100; + private static final Random RND = new Random(); + + @State(Scope.Thread) + @AuxCounters(org.openjdk.jmh.annotations.AuxCounters.Type.EVENTS) + public static class MockServer { + private StandardBenchmarkMockServer mockServer; + private Spanner spanner; + private DatabaseClientImpl client; + + /** + * The tests set the session idle timeout to an extremely low value to force timeouts and + * sessions to be evicted from the pool. This is not intended to replicate a realistic scenario, + * only to detect whether certain changes to the client library might cause the number of RPCs + * or the execution time to change drastically. + */ + @Param({"100"}) + long idleTimeout; + + /** AuxCounter for number of create RPCs. */ + public int numBatchCreateSessionsRpcs() { + return mockServer.countRequests(BatchCreateSessionsRequest.class); + } + + /** AuxCounter for number of delete RPCs. */ + public int numDeleteSessionRpcs() { + return mockServer.countRequests(DeleteSessionRequest.class); + } + + /** AuxCounter for number of begin tx RPCs. */ + public int numBeginTransactionRpcs() { + return mockServer.countRequests(BeginTransactionRequest.class); + } + + @Setup(Level.Invocation) + public void setup() throws Exception { + mockServer = new StandardBenchmarkMockServer(); + TransportChannelProvider channelProvider = mockServer.start(); + + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + // Set idle timeout and loop frequency to very low values. + .setRemoveInactiveSessionAfter(Duration.ofMillis(idleTimeout)) + .setLoopFrequency(idleTimeout / 10) + .build()) + .build(); + + spanner = options.getService(); + client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + // Wait until the session pool has initialized. + while (client.pool.getNumberOfSessionsInPool() + < spanner.getOptions().getSessionPoolOptions().getMinSessions()) { + Thread.sleep(1L); + } + } + + @TearDown(Level.Invocation) + public void teardown() throws Exception { + spanner.close(); + mockServer.shutdown(); + } + } + + /** Measures the time and RPCs needed to execute read requests. */ + @Benchmark + public void read(final MockServer server) throws Exception { + int min = server.spanner.getOptions().getSessionPoolOptions().getMinSessions(); + int max = server.spanner.getOptions().getSessionPoolOptions().getMaxSessions(); + int totalQueries = max * 4; + int parallelThreads = min; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(min); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalQueries); + for (int i = 0; i < totalQueries; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + try (ResultSet rs = + client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) { + while (rs.next()) { + Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); + } + return null; + } + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time and RPCs needed to execute write requests. */ + @Benchmark + public void write(final MockServer server) throws Exception { + int min = server.spanner.getOptions().getSessionPoolOptions().getMinSessions(); + int max = server.spanner.getOptions().getSessionPoolOptions().getMaxSessions(); + int totalWrites = max * 4; + int parallelThreads = max; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(min); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalWrites); + for (int i = 0; i < totalWrites; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + TransactionRunner runner = client.readWriteTransaction(); + return runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate( + StandardBenchmarkMockServer.UPDATE_STATEMENT); + } + }); + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time and RPCs needed to execute read and write requests. */ + @Benchmark + public void readAndWrite(final MockServer server) throws Exception { + int min = server.spanner.getOptions().getSessionPoolOptions().getMinSessions(); + int max = server.spanner.getOptions().getSessionPoolOptions().getMaxSessions(); + int totalWrites = max * 2; + int totalReads = max * 2; + int parallelThreads = max; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(min); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalReads + totalWrites); + for (int i = 0; i < totalWrites; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + TransactionRunner runner = client.readWriteTransaction(); + return runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate( + StandardBenchmarkMockServer.UPDATE_STATEMENT); + } + }); + } + })); + } + for (int i = 0; i < totalReads; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + try (ResultSet rs = + client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) { + while (rs.next()) { + Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); + } + return null; + } + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java new file mode 100644 index 0000000000..8d1b780432 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -0,0 +1,315 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.cloud.spanner.SessionClient.SessionConsumer; +import com.google.cloud.spanner.SessionPool.PooledSession; +import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; +import com.google.common.base.Function; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +@RunWith(JUnit4.class) +public class SessionPoolMaintainerTest extends BaseSessionPoolTest { + private ExecutorService executor = Executors.newSingleThreadExecutor(); + private @Mock SpannerImpl client; + private @Mock SessionClient sessionClient; + private @Mock SpannerOptions spannerOptions; + private DatabaseId db = DatabaseId.of("projects/p/instances/i/databases/unused"); + private SessionPoolOptions options; + private FakeClock clock = new FakeClock(); + private List idledSessions = new ArrayList<>(); + private Map pingedSessions = new HashMap<>(); + + @Before + public void setUp() throws Exception { + initMocks(this); + when(client.getOptions()).thenReturn(spannerOptions); + when(client.getSessionClient(db)).thenReturn(sessionClient); + when(spannerOptions.getNumChannels()).thenReturn(4); + setupMockSessionCreation(); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxIdleSessions(1) + .setMaxSessions(5) + .setIncStep(1) + .setKeepAliveIntervalMinutes(2) + .build(); + idledSessions.clear(); + pingedSessions.clear(); + } + + private void setupMockSessionCreation() { + doAnswer( + new Answer() { + @Override + public Void answer(final InvocationOnMock invocation) throws Throwable { + executor.submit( + new Runnable() { + @Override + public void run() { + int sessionCount = invocation.getArgumentAt(0, Integer.class); + SessionConsumerImpl consumer = + invocation.getArgumentAt(2, SessionConsumerImpl.class); + for (int i = 0; i < sessionCount; i++) { + consumer.onSessionReady(setupMockSession(mockSession())); + } + } + }); + return null; + } + }) + .when(sessionClient) + .asyncBatchCreateSessions( + Mockito.anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); + } + + private SessionImpl setupMockSession(final SessionImpl session) { + ReadContext mockContext = mock(ReadContext.class); + final ResultSet mockResult = mock(ResultSet.class); + when(session.singleUse(any(TimestampBound.class))).thenReturn(mockContext); + when(mockContext.executeQuery(any(Statement.class))) + .thenAnswer( + new Answer() { + @Override + public ResultSet answer(InvocationOnMock invocation) throws Throwable { + Integer currentValue = pingedSessions.get(session.getName()); + if (currentValue == null) { + currentValue = 0; + } + pingedSessions.put(session.getName(), ++currentValue); + return mockResult; + } + }); + when(mockResult.next()).thenReturn(true); + return session; + } + + private SessionPool createPool() throws Exception { + SessionPool pool = + SessionPool.createPool( + options, new TestExecutorFactory(), client.getSessionClient(db), clock); + pool.idleSessionRemovedListener = + new Function() { + @Override + public Void apply(PooledSession input) { + idledSessions.add(input); + return null; + } + }; + // Wait until pool has initialized. + while (pool.totalSessions() < options.getMinSessions()) { + Thread.sleep(1L); + } + return pool; + } + + @Test + public void testKeepAlive() throws Exception { + SessionPool pool = createPool(); + assertThat(pingedSessions).isEmpty(); + // Run one maintenance loop. No sessions should get a keep-alive ping. + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).isEmpty(); + + // Checkout two sessions and do a maintenance loop. Still no sessions should be getting any + // pings. + Session session1 = pool.getReadSession(); + Session session2 = pool.getReadSession(); + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).isEmpty(); + + // Check the sessions back into the pool and do a maintenance loop. + session2.close(); + session1.close(); + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).isEmpty(); + + // Now advance the time enough for both sessions in the pool to be idled. Then do one + // maintenance loop. This should cause the last session to have been checked back into the pool + // to get a ping, but not the second session. + clock.currentTimeMillis += TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1; + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).containsExactly(session1.getName(), 1); + // Do another maintenance loop. This should cause the other session to also get a ping. + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).containsExactly(session1.getName(), 1, session2.getName(), 1); + + // Now check out three sessions so the pool will create an additional session. The pool will + // only keep 2 sessions alive, as that is the setting for MinSessions. + Session session3 = pool.getReadSession(); + Session session4 = pool.getReadSession(); + Session session5 = pool.getReadSession(); + // Note that session2 was now the first session in the pool as it was the last to receive a + // ping. + assertThat(session3.getName()).isEqualTo(session2.getName()); + assertThat(session4.getName()).isEqualTo(session1.getName()); + session5.close(); + session4.close(); + session3.close(); + // Advance the clock to force pings for the sessions in the pool and do three maintenance loops. + clock.currentTimeMillis += TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1; + runMaintainanceLoop(clock, pool, 3); + assertThat(pingedSessions).containsExactly(session1.getName(), 2, session2.getName(), 2); + + // Advance the clock to idle all sessions in the pool again and then check out one session. This + // should cause only one session to get a ping. + clock.currentTimeMillis += TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1; + // We are now checking out session2 because + Session session6 = pool.getReadSession(); + // The session that was first in the pool now is equal to the initial first session as each full + // round of pings will swap the order of the first MinSessions sessions in the pool. + assertThat(session6.getName()).isEqualTo(session1.getName()); + runMaintainanceLoop(clock, pool, 3); + assertThat(pool.totalSessions()).isEqualTo(3); + assertThat(pingedSessions).containsExactly(session1.getName(), 2, session2.getName(), 3); + // Update the last use date and release the session to the pool and do another maintenance + // cycle. + ((PooledSession) session6).markUsed(); + session6.close(); + runMaintainanceLoop(clock, pool, 3); + assertThat(pingedSessions).containsExactly(session1.getName(), 2, session2.getName(), 3); + + // Now check out 3 sessions again and make sure the 'extra' session is checked in last. That + // will make it eligible for pings. + Session session7 = pool.getReadSession(); + Session session8 = pool.getReadSession(); + Session session9 = pool.getReadSession(); + + assertThat(session7.getName()).isEqualTo(session1.getName()); + assertThat(session8.getName()).isEqualTo(session2.getName()); + assertThat(session9.getName()).isEqualTo(session5.getName()); + + session7.close(); + session8.close(); + session9.close(); + + clock.currentTimeMillis += TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1; + runMaintainanceLoop(clock, pool, 3); + // session1 will not get a ping this time, as it was checked in first and is now the last + // session in the pool. + assertThat(pingedSessions) + .containsExactly(session1.getName(), 2, session2.getName(), 4, session5.getName(), 1); + } + + @Test + public void testIdleSessions() throws Exception { + SessionPool pool = createPool(); + long loopsToIdleSessions = + Double.valueOf( + Math.ceil( + (double) options.getRemoveInactiveSessionAfter().toMillis() + / pool.poolMaintainer.loopFrequency)) + .longValue() + + 2L; + assertThat(idledSessions).isEmpty(); + // Run one maintenance loop. No sessions should be removed from the pool. + runMaintainanceLoop(clock, pool, 1); + assertThat(idledSessions).isEmpty(); + + // Checkout two sessions and do a maintenance loop. Still no sessions should be removed. + Session session1 = pool.getReadSession(); + Session session2 = pool.getReadSession(); + runMaintainanceLoop(clock, pool, 1); + assertThat(idledSessions).isEmpty(); + + // Check the sessions back into the pool and do a maintenance loop. + session2.close(); + session1.close(); + runMaintainanceLoop(clock, pool, 1); + assertThat(idledSessions).isEmpty(); + + // Now advance the time enough for both sessions in the pool to be idled. Both sessions should + // be kept alive by the maintainer and remain in the pool. + runMaintainanceLoop(clock, pool, loopsToIdleSessions); + assertThat(idledSessions).isEmpty(); + + // Now check out three sessions so the pool will create an additional session. The pool will + // only keep 2 sessions alive, as that is the setting for MinSessions. + Session session3 = pool.getReadSession(); + Session session4 = pool.getReadSession(); + Session session5 = pool.getReadSession(); + // Note that session2 was now the first session in the pool as it was the last to receive a + // ping. + assertThat(session3.getName()).isEqualTo(session2.getName()); + assertThat(session4.getName()).isEqualTo(session1.getName()); + session5.close(); + session4.close(); + session3.close(); + // Advance the clock to idle sessions. The pool will keep session4 and session3 alive, session5 + // will be idled and removed. + runMaintainanceLoop(clock, pool, loopsToIdleSessions); + assertThat(idledSessions).containsExactly(session5); + assertThat(pool.totalSessions()).isEqualTo(2); + + // Check out three sessions again and keep one session checked out. + Session session6 = pool.getReadSession(); + Session session7 = pool.getReadSession(); + Session session8 = pool.getReadSession(); + session8.close(); + session7.close(); + // Now advance the clock to idle sessions. This should remove session8 from the pool. + runMaintainanceLoop(clock, pool, loopsToIdleSessions); + assertThat(idledSessions).containsExactly(session5, session8); + assertThat(pool.totalSessions()).isEqualTo(2); + ((PooledSession) session6).markUsed(); + session6.close(); + + // Check out three sessions and keep them all checked out. No sessions should be removed from + // the pool. + Session session9 = pool.getReadSession(); + Session session10 = pool.getReadSession(); + Session session11 = pool.getReadSession(); + runMaintainanceLoop(clock, pool, loopsToIdleSessions); + assertThat(idledSessions).containsExactly(session5, session8); + assertThat(pool.totalSessions()).isEqualTo(3); + // Return the sessions to the pool. As they have not been used, they are all into idle time. + // Running the maintainer will now remove all the sessions from the pool and then start the + // replenish method. + session9.close(); + session10.close(); + session11.close(); + runMaintainanceLoop(clock, pool, 1); + assertThat(idledSessions).containsExactly(session5, session8, session9, session10, session11); + // Check that the pool is replenished. + while (pool.totalSessions() < options.getMinSessions()) { + Thread.sleep(1L); + } + assertThat(pool.totalSessions()).isEqualTo(options.getMinSessions()); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index f7c2f02a59..bc0460c030 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -17,7 +17,7 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Matchers.any; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -25,7 +25,9 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.spanner.SessionClient.SessionConsumer; +import com.google.cloud.spanner.SessionPool.PooledSession; import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; +import com.google.common.base.Function; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Empty; import java.util.ArrayList; @@ -102,7 +104,7 @@ private void setupSpanner(DatabaseId db) { @Override public Session answer(InvocationOnMock invocation) throws Throwable { synchronized (lock) { - Session session = mockSession(); + SessionImpl session = mockSession(); setupSession(session); sessions.put(session.getName(), false); @@ -140,7 +142,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { Mockito.anyInt(), Mockito.anyBoolean(), Mockito.any(SessionConsumer.class)); } - private void setupSession(final Session session) { + private void setupSession(final SessionImpl session) { ReadContext mockContext = mock(ReadContext.class); final ResultSet mockResult = mock(ResultSet.class); when(session.singleUse(any(TimestampBound.class))).thenReturn(mockContext); @@ -258,6 +260,16 @@ public void stressTest() throws Exception { pool = SessionPool.createPool( builder.build(), new TestExecutorFactory(), mockSpanner.getSessionClient(db), clock); + pool.idleSessionRemovedListener = + new Function() { + @Override + public Void apply(PooledSession pooled) { + synchronized (lock) { + sessions.remove(pooled.getName()); + return null; + } + } + }; for (int i = 0; i < concurrentThreads; i++) { new Thread( new Runnable() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index a9636dea1b..ab02cb9b4c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -33,7 +33,6 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; import com.google.cloud.spanner.MetricRegistryTestUtils.FakeMetricRegistry; @@ -47,6 +46,7 @@ import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.ResultStreamConsumer; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -71,7 +71,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -874,7 +873,6 @@ public void idleSessionCleanup() throws Exception { SessionImpl session1 = mockSession(); SessionImpl session2 = mockSession(); SessionImpl session3 = mockSession(); - final AtomicInteger numSessionClosed = new AtomicInteger(); final LinkedList sessions = new LinkedList<>(Arrays.asList(session1, session2, session3)); doAnswer( @@ -895,18 +893,8 @@ public void run() { }) .when(sessionClient) .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); - for (Session session : new Session[] {session1, session2, session3}) { - doAnswer( - new Answer>() { - - @Override - public ApiFuture answer(InvocationOnMock invocation) throws Throwable { - numSessionClosed.incrementAndGet(); - return ApiFutures.immediateFuture(Empty.getDefaultInstance()); - } - }) - .when(session) - .asyncClose(); + for (SessionImpl session : sessions) { + mockKeepAlive(session); } FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); @@ -914,25 +902,29 @@ public ApiFuture answer(InvocationOnMock invocation) throws Throwable { // Make sure pool has been initialized pool.getReadSession().close(); runMaintainanceLoop(clock, pool, pool.poolMaintainer.numClosureCycles); - assertThat(numSessionClosed.get()).isEqualTo(0); + assertThat(pool.numIdleSessionsRemoved()).isEqualTo(0L); Session readSession1 = pool.getReadSession(); Session readSession2 = pool.getReadSession(); Session readSession3 = pool.getReadSession(); readSession1.close(); readSession2.close(); readSession3.close(); - // Now there are 3 sessions in the pool but since all were used in parallel, we will not close - // any. + // Now there are 3 sessions in the pool but since none of them has timed out, they will all be + // kept in the pool. runMaintainanceLoop(clock, pool, pool.poolMaintainer.numClosureCycles); - assertThat(numSessionClosed.get()).isEqualTo(0); + assertThat(pool.numIdleSessionsRemoved()).isEqualTo(0L); // Counters have now been reset // Use all 3 sessions sequentially pool.getReadSession().close(); pool.getReadSession().close(); pool.getReadSession().close(); - runMaintainanceLoop(clock, pool, pool.poolMaintainer.numClosureCycles); + // Advance the time by running the maintainer. This should cause + // one session to be kept alive and two sessions to be removed. + long cycles = + options.getRemoveInactiveSessionAfter().toMillis() / pool.poolMaintainer.loopFrequency; + runMaintainanceLoop(clock, pool, cycles); // We will still close 2 sessions since at any point in time only 1 session was in use. - assertThat(numSessionClosed.get()).isEqualTo(2); + assertThat(pool.numIdleSessionsRemoved()).isEqualTo(2L); pool.closeAsync().get(5L, TimeUnit.SECONDS); } @@ -975,15 +967,114 @@ public void run() { verify(session, never()).singleUse(any(TimestampBound.class)); runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); verify(session, times(2)).singleUse(any(TimestampBound.class)); - clock.currentTimeMillis += clock.currentTimeMillis + 35 * 60 * 1000; + clock.currentTimeMillis += + clock.currentTimeMillis + (options.getKeepAliveIntervalMinutes() + 5) * 60 * 1000; session1 = pool.getReadSession(); session1.writeAtLeastOnce(new ArrayList()); session1.close(); runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); - verify(session, times(3)).singleUse(any(TimestampBound.class)); + // The session pool only keeps MinSessions + MaxIdleSessions alive. + verify(session, times(options.getMinSessions() + options.getMaxIdleSessions())) + .singleUse(any(TimestampBound.class)); pool.closeAsync().get(5L, TimeUnit.SECONDS); } + @Test + public void testMaintainerKeepsWriteProportion() throws Exception { + options = + SessionPoolOptions.newBuilder() + .setMinSessions(10) + .setMaxSessions(20) + .setWriteSessionsFraction(0.5f) + .build(); + final SessionImpl session = mockSession(); + mockKeepAlive(session); + // This is cheating as we are returning the same session each but it makes the verification + // easier. + doAnswer( + new Answer() { + @Override + public Void answer(final InvocationOnMock invocation) throws Throwable { + executor.submit( + new Runnable() { + @Override + public void run() { + int sessionCount = invocation.getArgumentAt(0, Integer.class); + SessionConsumerImpl consumer = + invocation.getArgumentAt(2, SessionConsumerImpl.class); + for (int i = 0; i < sessionCount; i++) { + consumer.onSessionReady(session); + } + } + }); + return null; + } + }) + .when(sessionClient) + .asyncBatchCreateSessions(anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); + FakeClock clock = new FakeClock(); + clock.currentTimeMillis = System.currentTimeMillis(); + pool = createPool(clock); + // Wait until all sessions have been created and prepared. + waitForExpectedSessionPool(options.getMinSessions(), options.getWriteSessionsFraction()); + assertThat(pool.getNumberOfSessionsInPool()).isEqualTo(options.getMinSessions()); + assertThat(pool.getNumberOfAvailableWritePreparedSessions()) + .isEqualTo((int) Math.ceil(options.getMinSessions() * options.getWriteSessionsFraction())); + + // Run maintainer numKeepAliveCycles. No pings should be executed during these. + runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); + verify(session, never()).singleUse(any(TimestampBound.class)); + // Run maintainer numKeepAliveCycles again. All sessions should now be pinged. + runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); + verify(session, times(options.getMinSessions())).singleUse(any(TimestampBound.class)); + // Verify that all sessions are still in the pool, and that the write fraction is maintained. + assertThat(pool.getNumberOfSessionsInPool()).isEqualTo(options.getMinSessions()); + assertThat( + pool.getNumberOfAvailableWritePreparedSessions() + + pool.getNumberOfSessionsBeingPrepared()) + .isEqualTo( + (int) Math.ceil(pool.getNumberOfSessionsInPool() * options.getWriteSessionsFraction())); + + // Check out MaxSessions sessions to add additional sessions to the pool. + List sessions = new ArrayList<>(options.getMaxSessions()); + for (int i = 0; i < options.getMaxSessions(); i++) { + sessions.add(pool.getReadSession()); + } + for (Session s : sessions) { + s.close(); + } + // There should be MaxSessions in the pool and the writeFraction should be respected. + waitForExpectedSessionPool(options.getMaxSessions(), options.getWriteSessionsFraction()); + assertThat(pool.getNumberOfSessionsInPool()).isEqualTo(options.getMaxSessions()); + assertThat(pool.getNumberOfAvailableWritePreparedSessions()) + .isEqualTo((int) Math.ceil(options.getMaxSessions() * options.getWriteSessionsFraction())); + + // Advance the clock to allow the sessions to time out or be kept alive. + clock.currentTimeMillis += + clock.currentTimeMillis + (options.getKeepAliveIntervalMinutes() + 5) * 60 * 1000; + runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); + // The session pool only keeps MinSessions alive. + verify(session, times(options.getMinSessions())).singleUse(any(TimestampBound.class)); + // Verify that MinSessions and WriteFraction are respected. + waitForExpectedSessionPool(options.getMinSessions(), options.getWriteSessionsFraction()); + assertThat(pool.getNumberOfSessionsInPool()).isEqualTo(options.getMinSessions()); + assertThat(pool.getNumberOfAvailableWritePreparedSessions()) + .isEqualTo((int) Math.ceil(options.getMinSessions() * options.getWriteSessionsFraction())); + + pool.closeAsync().get(5L, TimeUnit.SECONDS); + } + + private void waitForExpectedSessionPool(int expectedSessions, float writeFraction) + throws InterruptedException { + Stopwatch watch = Stopwatch.createStarted(); + while ((pool.getNumberOfSessionsInPool() < expectedSessions + || pool.getNumberOfAvailableWritePreparedSessions() + < Math.ceil(expectedSessions * writeFraction)) + && watch.elapsed(TimeUnit.SECONDS) < 5) { + Thread.sleep(1L); + } + } + @Test public void blockAndTimeoutOnPoolExhaustion() throws Exception { // Try to take a read or a read/write session. These requests should block. @@ -1650,6 +1741,7 @@ public Void call() { private void mockKeepAlive(Session session) { ReadContext context = mock(ReadContext.class); ResultSet resultSet = mock(ResultSet.class); + when(resultSet.next()).thenReturn(true, false); when(session.singleUse(any(TimestampBound.class))).thenReturn(context); when(context.executeQuery(any(Statement.class))).thenReturn(resultSet); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/StandardBenchmarkMockServer.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/StandardBenchmarkMockServer.java new file mode 100644 index 0000000000..7262fa163b --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/StandardBenchmarkMockServer.java @@ -0,0 +1,139 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.ListValue; +import com.google.spanner.v1.ResultSetMetadata; +import com.google.spanner.v1.StructType; +import com.google.spanner.v1.StructType.Field; +import com.google.spanner.v1.TypeCode; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; + +/** Standard mock server used for benchmarking. */ +class StandardBenchmarkMockServer { + private static final int NETWORK_LATENCY_TIME = 10; + private static final int BATCH_CREATE_SESSIONS_MIN_TIME = 10; + private static final int BATCH_CREATE_SESSIONS_RND_TIME = 10; + private static final int BEGIN_TRANSACTION_MIN_TIME = 1; + private static final int BEGIN_TRANSACTION_RND_TIME = 1; + private static final int COMMIT_TRANSACTION_MIN_TIME = 5; + private static final int COMMIT_TRANSACTION_RND_TIME = 5; + private static final int ROLLBACK_TRANSACTION_MIN_TIME = 1; + private static final int ROLLBACK_TRANSACTION_RND_TIME = 1; + private static final int EXECUTE_STREAMING_SQL_MIN_TIME = 10; + private static final int EXECUTE_STREAMING_SQL_RND_TIME = 10; + private static final int EXECUTE_SQL_MIN_TIME = 10; + private static final int EXECUTE_SQL_RND_TIME = 10; + + static final Statement UPDATE_STATEMENT = Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); + static final Statement INVALID_UPDATE_STATEMENT = + Statement.of("UPDATE NON_EXISTENT_TABLE SET BAR=1 WHERE BAZ=2"); + static final long UPDATE_COUNT = 1L; + static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); + private static final ResultSetMetadata SELECT1_METADATA = + ResultSetMetadata.newBuilder() + .setRowType( + StructType.newBuilder() + .addFields( + Field.newBuilder() + .setName("COL1") + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(TypeCode.INT64) + .build()) + .build()) + .build()) + .build(); + private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = + com.google.spanner.v1.ResultSet.newBuilder() + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) + .build()) + .setMetadata(SELECT1_METADATA) + .build(); + private MockSpannerServiceImpl mockSpanner; + private Server server; + private LocalChannelProvider channelProvider; + + TransportChannelProvider start() throws IOException { + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET)); + mockSpanner.putStatementResult( + StatementResult.exception( + INVALID_UPDATE_STATEMENT, + Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); + + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + BATCH_CREATE_SESSIONS_MIN_TIME, BATCH_CREATE_SESSIONS_RND_TIME)); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + BEGIN_TRANSACTION_MIN_TIME, BEGIN_TRANSACTION_RND_TIME)); + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + COMMIT_TRANSACTION_MIN_TIME, COMMIT_TRANSACTION_RND_TIME)); + mockSpanner.setRollbackExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + ROLLBACK_TRANSACTION_MIN_TIME, ROLLBACK_TRANSACTION_RND_TIME)); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + EXECUTE_STREAMING_SQL_MIN_TIME, EXECUTE_STREAMING_SQL_RND_TIME)); + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + EXECUTE_SQL_MIN_TIME, EXECUTE_SQL_RND_TIME)); + + String uniqueName = InProcessServerBuilder.generateName(); + server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); + channelProvider = LocalChannelProvider.create(uniqueName); + + return channelProvider; + } + + void shutdown() throws InterruptedException { + server.shutdown(); + server.awaitTermination(); + } + + MockSpannerServiceImpl getMockSpanner() { + return mockSpanner; + } + + int countRequests(final Class type) { + return Collections2.filter( + mockSpanner.getRequests(), + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + return input.getClass().equals(type); + } + }) + .size(); + } +}