From 9e5a1cdaacf71147b67681861f063c3276705f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 8 Apr 2020 05:58:12 +0200 Subject: [PATCH] perf: increase sessions in the pool in batches (#134) * perf: increase sessions in the pool in batches When more sessions are requested by the user application than are available in the session pool, the session pool will now create new sessions in batches instead of in steps of 1. This reduces the number of RPCs needed to serve a burst of requests. A benchmark for the session pool has also been added to be able to compare performance and the number of RPCs needed before and after this change. This benchmark can also be used for future changes to verify that the change does not deteriorate performance or increase the number of RPCs needed. * fix: remove unused code * fix: include num rpcs and sessions in benchmark results * fix: remove commented code * fix: rename parameter --- google-cloud-spanner/pom.xml | 46 ++- .../google/cloud/spanner/SessionClient.java | 19 +- .../com/google/cloud/spanner/SessionPool.java | 17 +- .../cloud/spanner/SessionPoolOptions.java | 18 + .../spanner/BatchCreateSessionsTest.java | 3 +- .../cloud/spanner/MockSpannerServiceImpl.java | 9 + .../cloud/spanner/SessionClientTest.java | 6 +- .../cloud/spanner/SessionPoolBenchmark.java | 388 ++++++++++++++++++ .../cloud/spanner/SessionPoolLeakTest.java | 3 +- .../cloud/spanner/SessionPoolStressTest.java | 5 +- .../google/cloud/spanner/SessionPoolTest.java | 129 +++--- .../com/google/cloud/spanner/SpanTest.java | 6 +- .../spanner/TransactionManagerImplTest.java | 2 +- .../spanner/TransactionRunnerImplTest.java | 2 +- 14 files changed, 567 insertions(+), 86 deletions(-) create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index 61fba21c4c..d048e1d848 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -111,7 +111,7 @@ org.apache.maven.plugins maven-dependency-plugin - io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1 + io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1 @@ -305,6 +305,20 @@ 2.2 test + + + + org.openjdk.jmh + jmh-core + 1.23 + test + + + org.openjdk.jmh + jmh-generator-annprocess + 1.23 + test + @@ -320,5 +334,35 @@ + + benchmark + + + + org.codehaus.mojo + exec-maven-plugin + + + run-benchmarks + test + + exec + + + test + java + + -classpath + + org.openjdk.jmh.Main + .* + + + + + + + + \ No newline at end of file diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index d77a95f48b..b80d26ffba 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -231,12 +231,21 @@ SessionImpl createSession() { * sessions that could not be created. * * @param sessionCount The number of sessions to create. + * @param distributeOverChannels Whether to distribute the sessions over all available channels + * (true) or create all for the next channel round robin. * @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available. */ - void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer) { - // We spread the session creation evenly over all available channels. - int sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels(); - int remainder = sessionCount % spanner.getOptions().getNumChannels(); + void asyncBatchCreateSessions( + final int sessionCount, boolean distributeOverChannels, SessionConsumer consumer) { + int sessionCountPerChannel; + int remainder; + if (distributeOverChannels) { + sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels(); + remainder = sessionCount % spanner.getOptions().getNumChannels(); + } else { + sessionCountPerChannel = sessionCount; + remainder = 0; + } int numBeingCreated = 0; synchronized (this) { for (int channelIndex = 0; @@ -252,7 +261,7 @@ void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer) if (channelIndex == 0) { createCountForChannel = sessionCountPerChannel + remainder; } - if (createCountForChannel > 0) { + if (createCountForChannel > 0 && numBeingCreated < sessionCount) { try { executor.submit( new BatchCreateSessionsRunnable( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 5023f0fafb..0582f4b1f5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1077,7 +1077,7 @@ private void replenishPool() { // If we have gone below min pool size, create that many sessions. int sessionCount = options.getMinSessions() - (totalSessions() + numSessionsBeingCreated); if (sessionCount > 0) { - createSessions(getAllowedCreateSessions(sessionCount)); + createSessions(getAllowedCreateSessions(sessionCount), false); } } } @@ -1269,7 +1269,7 @@ private void initPool() { synchronized (lock) { poolMaintainer.init(); if (options.getMinSessions() > 0) { - createSessions(options.getMinSessions()); + createSessions(options.getMinSessions(), true); } } } @@ -1308,7 +1308,7 @@ private void invalidateSession(PooledSession session) { } allSessions.remove(session); // replenish the pool. - createSessions(getAllowedCreateSessions(1)); + createSessions(getAllowedCreateSessions(1), false); } } @@ -1507,7 +1507,7 @@ private void maybeCreateSession() { if (numWaiters() >= numSessionsBeingCreated) { if (canCreateSession()) { span.addAnnotation("Creating sessions"); - createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated + 1)); + createSessions(getAllowedCreateSessions(options.getIncStep()), false); } else if (options.isFailIfPoolExhausted()) { span.addAnnotation("Pool exhausted. Failing"); // throw specific exception @@ -1732,7 +1732,8 @@ public void run() { } // Create a new session if needed to unblock some waiter. if (numWaiters() > numSessionsBeingCreated) { - createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated)); + createSessions( + getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated), false); } } } @@ -1794,7 +1795,7 @@ private boolean canCreateSession() { } } - private void createSessions(final int sessionCount) { + private void createSessions(final int sessionCount, boolean distributeOverChannels) { logger.log(Level.FINE, String.format("Creating %d sessions", sessionCount)); synchronized (lock) { numSessionsBeingCreated += sessionCount; @@ -1803,8 +1804,8 @@ private void createSessions(final int sessionCount) { // calls and the session consumer consumes the returned sessions as they become available. // The batchCreateSessions method automatically spreads the sessions evenly over all // available channels. - sessionClient.asyncBatchCreateSessions(sessionCount, sessionConsumer); - logger.log(Level.FINE, "Sessions created"); + sessionClient.asyncBatchCreateSessions( + sessionCount, distributeOverChannels, sessionConsumer); } catch (Throwable t) { // Expose this to customer via a metric. numSessionsBeingCreated -= sessionCount; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 45289fb3cd..31bebac32d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -24,9 +24,11 @@ public class SessionPoolOptions { // Default number of channels * 100. private static final int DEFAULT_MAX_SESSIONS = 400; private static final int DEFAULT_MIN_SESSIONS = 100; + private static final int DEFAULT_INC_STEP = 25; private static final ActionOnExhaustion DEFAULT_ACTION = ActionOnExhaustion.BLOCK; private final int minSessions; private final int maxSessions; + private final int incStep; private final int maxIdleSessions; private final float writeSessionsFraction; private final ActionOnExhaustion actionOnExhaustion; @@ -40,6 +42,7 @@ private SessionPoolOptions(Builder builder) { // maxSessions value is less than the default for minSessions. this.minSessions = Math.min(builder.minSessions, builder.maxSessions); this.maxSessions = builder.maxSessions; + this.incStep = builder.incStep; this.maxIdleSessions = builder.maxIdleSessions; this.writeSessionsFraction = builder.writeSessionsFraction; this.actionOnExhaustion = builder.actionOnExhaustion; @@ -56,6 +59,10 @@ public int getMaxSessions() { return maxSessions; } + int getIncStep() { + return incStep; + } + public int getMaxIdleSessions() { return maxIdleSessions; } @@ -105,6 +112,7 @@ public static class Builder { private boolean minSessionsSet = false; private int minSessions = DEFAULT_MIN_SESSIONS; private int maxSessions = DEFAULT_MAX_SESSIONS; + private int incStep = DEFAULT_INC_STEP; private int maxIdleSessions; private float writeSessionsFraction = 0.2f; private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION; @@ -135,6 +143,16 @@ public Builder setMaxSessions(int maxSessions) { return this; } + /** + * Number of sessions to batch create when the pool needs at least one more session. Defaults to + * 25. + */ + Builder setIncStep(int incStep) { + Preconditions.checkArgument(incStep > 0, "incStep must be > 0"); + this.incStep = incStep; + return this; + } + /** * Maximum number of idle sessions that this pool will maintain. Pool will close any sessions * beyond this but making sure to always have at least as many sessions as specified by {@link diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java index 5cbd163fae..222b393113 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java @@ -104,6 +104,7 @@ public static void stopServer() throws InterruptedException { @Before public void setUp() throws IOException { mockSpanner.reset(); + mockSpanner.removeAllExecutionTimes(); } private Spanner createSpanner(int minSessions, int maxSessions) { @@ -245,7 +246,7 @@ public void testPrepareSessionFailPropagatesToUser() throws InterruptedException int maxSessions = 1000; DatabaseClientImpl client = null; mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofException( + SimulatedExecutionTime.ofStickyException( Status.ABORTED.withDescription("BeginTransaction failed").asRuntimeException())); try (Spanner spanner = createSpanner(minSessions, maxSessions)) { client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 118b2c57fe..e3d002948e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -92,6 +92,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -469,6 +470,7 @@ private static void checkException(Queue exceptions, boolean keepExce private ConcurrentMap transactionLastUsed = new ConcurrentHashMap<>(); private int maxNumSessionsInOneBatch = 100; private int maxTotalSessions = Integer.MAX_VALUE; + private AtomicInteger numSessionsCreated = new AtomicInteger(); private SimulatedExecutionTime beginTransactionExecutionTime = NO_EXECUTION_TIME; private SimulatedExecutionTime commitExecutionTime = NO_EXECUTION_TIME; @@ -642,6 +644,7 @@ public void batchCreateSessions( if (sessions.size() <= maxTotalSessions) { sessionLastUsed.put(name, Instant.now()); response.addSession(session); + numSessionsCreated.incrementAndGet(); } else { sessions.remove(name); } @@ -687,6 +690,7 @@ public void createSession( Session prev = sessions.putIfAbsent(name, session); if (prev == null) { sessionLastUsed.put(name, Instant.now()); + numSessionsCreated.incrementAndGet(); responseObserver.onNext(session); responseObserver.onCompleted(); } else { @@ -1623,6 +1627,10 @@ private void partition( } } + public int numSessionsCreated() { + return numSessionsCreated.get(); + } + @Override public List getRequests() { return new ArrayList<>(this.requests); @@ -1652,6 +1660,7 @@ public ServerServiceDefinition getServiceDefinition() { public void reset() { requests.clear(); sessions.clear(); + numSessionsCreated.set(0); sessionLastUsed.clear(); transactions.clear(); isPartitionedDmlTransaction.clear(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTest.java index ce8b8d4409..11a39334ca 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTest.java @@ -182,7 +182,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount }; final int numSessions = 10; try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { - client.asyncBatchCreateSessions(numSessions, consumer); + client.asyncBatchCreateSessions(numSessions, true, consumer); } assertThat(returnedSessionCount.get()).isEqualTo(numSessions); assertThat(usedChannels.size()).isEqualTo(spannerOptions.getNumChannels()); @@ -275,7 +275,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount }; final int numSessions = 10; try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { - client.asyncBatchCreateSessions(numSessions, consumer); + client.asyncBatchCreateSessions(numSessions, true, consumer); } assertThat(errorCount.get()).isEqualTo(errorOnChannels.size()); assertThat(returnedSessionCount.get()) @@ -330,7 +330,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount // sessions. final int numSessions = 100; try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { - client.asyncBatchCreateSessions(numSessions, consumer); + client.asyncBatchCreateSessions(numSessions, true, consumer); } assertThat(returnedSessionCount.get()).isEqualTo(numSessions); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java new file mode 100644 index 0000000000..0e89c23015 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java @@ -0,0 +1,388 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.ListValue; +import com.google.spanner.v1.BatchCreateSessionsRequest; +import com.google.spanner.v1.ResultSetMetadata; +import com.google.spanner.v1.StructType; +import com.google.spanner.v1.StructType.Field; +import com.google.spanner.v1.TypeCode; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessServerBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmarks for common session pool scenarios. The simulated execution times are based on + * reasonable estimates and are primarily intended to keep the benchmarks comparable with each other + * before and after changes have been made to the pool. The benchmarks are bound to the build + * profile `benchmark` and can be executed like this: `mvn test -Pbenchmark` + */ +@BenchmarkMode(Mode.AverageTime) +@Fork(value = 1, warmups = 0) +@Measurement(batchSize = 1, iterations = 1, timeUnit = TimeUnit.MILLISECONDS) +@Warmup(batchSize = 0, iterations = 0) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SessionPoolBenchmark { + private static final String TEST_PROJECT = "my-project"; + private static final String TEST_INSTANCE = "my-instance"; + private static final String TEST_DATABASE = "my-database"; + private static final int HOLD_SESSION_TIME = 100; + private static final int RND_WAIT_TIME_BETWEEN_REQUESTS = 10; + private static final Random RND = new Random(); + + public static void main(String[] args) throws Exception { + org.openjdk.jmh.Main.main(args); + } + + @State(Scope.Thread) + @AuxCounters(org.openjdk.jmh.annotations.AuxCounters.Type.EVENTS) + public static class MockServer { + private static final int NETWORK_LATENCY_TIME = 10; + private static final int BATCH_CREATE_SESSIONS_MIN_TIME = 10; + private static final int BATCH_CREATE_SESSIONS_RND_TIME = 10; + private static final int BEGIN_TRANSACTION_MIN_TIME = 1; + private static final int BEGIN_TRANSACTION_RND_TIME = 1; + private static final int COMMIT_TRANSACTION_MIN_TIME = 5; + private static final int COMMIT_TRANSACTION_RND_TIME = 5; + private static final int ROLLBACK_TRANSACTION_MIN_TIME = 1; + private static final int ROLLBACK_TRANSACTION_RND_TIME = 1; + private static final int EXECUTE_STREAMING_SQL_MIN_TIME = 10; + private static final int EXECUTE_STREAMING_SQL_RND_TIME = 10; + private static final int EXECUTE_SQL_MIN_TIME = 10; + private static final int EXECUTE_SQL_RND_TIME = 10; + + private static final Statement UPDATE_STATEMENT = + Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); + private static final Statement INVALID_UPDATE_STATEMENT = + Statement.of("UPDATE NON_EXISTENT_TABLE SET BAR=1 WHERE BAZ=2"); + private static final long UPDATE_COUNT = 1L; + private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); + private static final ResultSetMetadata SELECT1_METADATA = + ResultSetMetadata.newBuilder() + .setRowType( + StructType.newBuilder() + .addFields( + Field.newBuilder() + .setName("COL1") + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(TypeCode.INT64) + .build()) + .build()) + .build()) + .build(); + private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = + com.google.spanner.v1.ResultSet.newBuilder() + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) + .build()) + .setMetadata(SELECT1_METADATA) + .build(); + private MockSpannerServiceImpl mockSpanner; + private Server server; + private LocalChannelProvider channelProvider; + + private Spanner spanner; + private DatabaseClientImpl client; + + @Param({"100"}) + int minSessions; + + @Param({"400"}) + int maxSessions; + + @Param({"1", "10", "20", "25", "30", "40", "50", "100"}) + int incStep; + + @Param({"4"}) + int numChannels; + + @Param({"0.2"}) + float writeFraction; + + /** AuxCounter for number of RPCs. */ + public int numBatchCreateSessionsRpcs() { + return countRequests(BatchCreateSessionsRequest.class); + } + + /** AuxCounter for number of sessions created. */ + public int sessionsCreated() { + return mockSpanner.numSessionsCreated(); + } + + @Setup(Level.Invocation) + public void setup() throws Exception { + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability( + 0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET)); + mockSpanner.putStatementResult( + StatementResult.exception( + INVALID_UPDATE_STATEMENT, + Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); + + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + BATCH_CREATE_SESSIONS_MIN_TIME, + BATCH_CREATE_SESSIONS_RND_TIME)); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + BEGIN_TRANSACTION_MIN_TIME, BEGIN_TRANSACTION_RND_TIME)); + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + COMMIT_TRANSACTION_MIN_TIME, COMMIT_TRANSACTION_RND_TIME)); + mockSpanner.setRollbackExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + ROLLBACK_TRANSACTION_MIN_TIME, ROLLBACK_TRANSACTION_RND_TIME)); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + EXECUTE_STREAMING_SQL_MIN_TIME, + EXECUTE_STREAMING_SQL_RND_TIME)); + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + EXECUTE_SQL_MIN_TIME, EXECUTE_SQL_RND_TIME)); + + String uniqueName = InProcessServerBuilder.generateName(); + server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); + channelProvider = LocalChannelProvider.create(uniqueName); + + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelProvider(channelProvider) + .setNumChannels(numChannels) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setMinSessions(minSessions) + .setMaxSessions(maxSessions) + .setIncStep(incStep) + .setWriteSessionsFraction(writeFraction) + .build()) + .build(); + + spanner = options.getService(); + client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + // Wait until the session pool has initialized. + while (client.pool.getNumberOfSessionsInPool() + < spanner.getOptions().getSessionPoolOptions().getMinSessions()) { + Thread.sleep(1L); + } + } + + @TearDown(Level.Invocation) + public void teardown() throws Exception { + spanner.close(); + server.shutdown(); + server.awaitTermination(); + } + + int expectedStepsToMax() { + int remainder = (maxSessions - minSessions) % incStep == 0 ? 0 : 1; + return numChannels + ((maxSessions - minSessions) / incStep) + remainder; + } + + int countRequests(final Class type) { + return Collections2.filter( + mockSpanner.getRequests(), + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + return input.getClass().equals(type); + } + }) + .size(); + } + } + + /** Measures the time needed to execute a burst of read requests. */ + @Benchmark + public void burstRead(final MockServer server) throws Exception { + int totalQueries = server.maxSessions * 8; + int parallelThreads = server.maxSessions * 2; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(server.minSessions); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalQueries); + for (int i = 0; i < totalQueries; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + try (ResultSet rs = client.singleUse().executeQuery(MockServer.SELECT1)) { + while (rs.next()) { + Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); + } + return null; + } + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time needed to execute a burst of write requests. */ + @Benchmark + public void burstWrite(final MockServer server) throws Exception { + int totalWrites = server.maxSessions * 8; + int parallelThreads = server.maxSessions * 2; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(server.minSessions); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalWrites); + for (int i = 0; i < totalWrites; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + TransactionRunner runner = client.readWriteTransaction(); + return runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(MockServer.UPDATE_STATEMENT); + } + }); + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time needed to execute a burst of read and write requests. */ + @Benchmark + public void burstReadAndWrite(final MockServer server) throws Exception { + int totalWrites = server.maxSessions * 4; + int totalReads = server.maxSessions * 4; + int parallelThreads = server.maxSessions * 2; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(server.minSessions); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalReads + totalWrites); + for (int i = 0; i < totalWrites; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + TransactionRunner runner = client.readWriteTransaction(); + return runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(MockServer.UPDATE_STATEMENT); + } + }); + } + })); + } + for (int i = 0; i < totalReads; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + try (ResultSet rs = client.singleUse().executeQuery(MockServer.SELECT1)) { + while (rs.next()) { + Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); + } + return null; + } + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time needed to acquire MaxSessions session sequentially. */ + @Benchmark + public void steadyIncrease(MockServer server) throws Exception { + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(server.minSessions); + + // Checkout maxSessions sessions by starting maxSessions read-only transactions sequentially. + List transactions = new ArrayList<>(server.maxSessions); + for (int i = 0; i < server.maxSessions; i++) { + transactions.add(client.readOnlyTransaction()); + } + for (ReadOnlyTransaction tx : transactions) { + tx.close(); + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java index 0c92f8d461..dbafb9dd01 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java @@ -81,11 +81,12 @@ public void setUp() throws Exception { .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()); // Make sure the session pool is empty by default, does not contain any write-prepared sessions, - // and contains at most 2 sessions. + // contains at most 2 sessions, and creates sessions in steps of 1. builder.setSessionPoolOption( SessionPoolOptions.newBuilder() .setMinSessions(0) .setMaxSessions(2) + .setIncStep(1) .setWriteSessionsFraction(0.0f) .build()); spanner = builder.build().getService(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index 60eb151f6e..f7c2f02a59 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -128,7 +128,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { maxAliveSessions = sessions.size(); } SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } } @@ -136,7 +136,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.anyInt(), Mockito.any(SessionConsumer.class)); + .asyncBatchCreateSessions( + Mockito.anyInt(), Mockito.anyBoolean(), Mockito.any(SessionConsumer.class)); } private void setupSession(final Session session) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 6daa36f1d6..a9636dea1b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -136,6 +136,7 @@ public void setUp() throws Exception { SessionPoolOptions.newBuilder() .setMinSessions(minSessions) .setMaxSessions(2) + .setIncStep(1) .setBlockIfPoolExhausted() .build(); } @@ -151,7 +152,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { public void run() { int sessionCount = invocation.getArgumentAt(0, Integer.class); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); for (int i = 0; i < sessionCount; i++) { consumer.onSessionReady(mockSession()); } @@ -161,7 +162,8 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.anyInt(), any(SessionConsumer.class)); + .asyncBatchCreateSessions( + Mockito.anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); } @Test @@ -220,7 +222,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(sessions.pop()); } }); @@ -228,7 +230,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); Session session1 = pool.getReadSession(); // Leaked sessions @@ -278,7 +280,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session1); } }); @@ -296,7 +298,7 @@ public Void call() throws Exception { insideCreation.countDown(); releaseCreation.await(); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session2); return null; } @@ -305,7 +307,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); PooledSession leakedSession = pool.getReadSession(); @@ -336,7 +338,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session1); } }); @@ -354,7 +356,7 @@ public Void call() throws Exception { insideCreation.countDown(); releaseCreation.await(); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session2); return null; } @@ -363,7 +365,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); PooledSession leakedSession = pool.getReadSession(); @@ -394,7 +396,7 @@ public Void call() throws Exception { insideCreation.countDown(); releaseCreation.await(); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionCreateFailure( SpannerExceptionFactory.newSpannerException(new RuntimeException()), 1); return null; @@ -404,7 +406,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); AtomicBoolean failed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); @@ -428,7 +430,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } }); @@ -436,7 +438,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); final CountDownLatch insidePrepare = new CountDownLatch(1); final CountDownLatch releasePrepare = new CountDownLatch(1); doAnswer( @@ -473,7 +475,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } }); @@ -481,7 +483,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); PooledSession leakedSession = pool.getReadSession(); // Suppress expected leakedSession warning. @@ -503,7 +505,7 @@ public void atMostMaxSessionsCreated() { } Uninterruptibles.awaitUninterruptibly(latch); verify(sessionClient, atMost(options.getMaxSessions())) - .asyncBatchCreateSessions(eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); assertThat(failed.get()).isFalse(); } @@ -518,7 +520,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public Void call() throws Exception { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionCreateFailure( SpannerExceptionFactory.newSpannerException(ErrorCode.INTERNAL, ""), 1); return null; @@ -528,7 +530,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); expectedException.expect(isSpannerException(ErrorCode.INTERNAL)); pool.getReadSession(); @@ -545,7 +547,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public Void call() throws Exception { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionCreateFailure( SpannerExceptionFactory.newSpannerException(ErrorCode.INTERNAL, ""), 1); return null; @@ -555,7 +557,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); expectedException.expect(isSpannerException(ErrorCode.INTERNAL)); pool.getReadWriteSession(); @@ -573,7 +575,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } }); @@ -581,7 +583,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.INTERNAL, "")) .when(session) .prepareReadWriteTransaction(); @@ -602,7 +604,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(mockSession); } }); @@ -610,7 +612,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); try (Session session = pool.getReadWriteSession()) { assertThat(session).isNotNull(); @@ -633,7 +635,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(sessions.pop()); } }); @@ -641,7 +643,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); Session session1 = pool.getReadWriteSession(); Session session2 = pool.getReadWriteSession(); @@ -664,7 +666,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } }); @@ -672,7 +674,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); int numSessions = 5; @@ -719,7 +721,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(mockSession1); consumer.onSessionReady(mockSession2); } @@ -728,7 +730,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(2), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(2), Mockito.anyBoolean(), any(SessionConsumer.class)); options = SessionPoolOptions.newBuilder() @@ -770,7 +772,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(mockSession1); } }); @@ -778,7 +780,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); options = SessionPoolOptions.newBuilder() .setMinSessions(minSessions) @@ -810,7 +812,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(mockSession()); } }); @@ -818,7 +820,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); Session session1 = pool.getReadSession(); expectedException.expect(isSpannerException(ErrorCode.RESOURCE_EXHAUSTED)); @@ -847,7 +849,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(sessions.pop()); } }); @@ -855,7 +857,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); assertThat(pool.getReadWriteSession().delegate).isEqualTo(mockSession2); } @@ -866,6 +868,7 @@ public void idleSessionCleanup() throws Exception { SessionPoolOptions.newBuilder() .setMinSessions(1) .setMaxSessions(3) + .setIncStep(1) .setMaxIdleSessions(0) .build(); SessionImpl session1 = mockSession(); @@ -883,7 +886,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(sessions.pop()); } }); @@ -891,7 +894,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); for (Session session : new Session[] {session1, session2, session3}) { doAnswer( new Answer>() { @@ -950,7 +953,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { public void run() { int sessionCount = invocation.getArgumentAt(0, Integer.class); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); for (int i = 0; i < sessionCount; i++) { consumer.onSessionReady(session); } @@ -960,7 +963,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(anyInt(), any(SessionConsumer.class)); + .asyncBatchCreateSessions(anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1066,7 +1069,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1082,7 +1085,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1090,7 +1093,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1122,7 +1125,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1138,7 +1141,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1146,7 +1149,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1244,7 +1247,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1260,7 +1263,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1268,11 +1271,13 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions( + Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); SessionPoolOptions options = SessionPoolOptions.newBuilder() .setMinSessions(0) // The pool should not auto-create any sessions .setMaxSessions(2) + .setIncStep(1) .setBlockIfPoolExhausted() .build(); SpannerOptions spannerOptions = mock(SpannerOptions.class); @@ -1378,7 +1383,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1394,7 +1399,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1402,7 +1407,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1429,7 +1434,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1445,7 +1450,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1453,7 +1458,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); @@ -1481,7 +1486,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1497,7 +1502,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1505,7 +1510,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1532,7 +1537,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1548,7 +1553,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1556,7 +1561,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java index e60522dc1d..eb1bb67e89 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java @@ -166,7 +166,11 @@ public void setUp() throws Exception { .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) - .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()); + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setMinSessions(0) + .setWriteSessionsFraction(0.0f) + .build()); spanner = builder.build().getService(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index cc522f3f45..cd94d82800 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -196,7 +196,7 @@ public void usesPreparedTransaction() { when(transportOptions.getExecutorFactory()).thenReturn(new TestExecutorFactory()); when(options.getTransportOptions()).thenReturn(transportOptions); SessionPoolOptions sessionPoolOptions = - SessionPoolOptions.newBuilder().setMinSessions(0).build(); + SessionPoolOptions.newBuilder().setMinSessions(0).setIncStep(1).build(); when(options.getSessionPoolOptions()).thenReturn(sessionPoolOptions); when(options.getSessionLabels()).thenReturn(Collections.emptyMap()); SpannerRpc rpc = mock(SpannerRpc.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 3dcd523c13..c45f46fbd1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -106,7 +106,7 @@ public void usesPreparedTransaction() { when(transportOptions.getExecutorFactory()).thenReturn(new TestExecutorFactory()); when(options.getTransportOptions()).thenReturn(transportOptions); SessionPoolOptions sessionPoolOptions = - SessionPoolOptions.newBuilder().setMinSessions(0).build(); + SessionPoolOptions.newBuilder().setMinSessions(0).setIncStep(1).build(); when(options.getSessionPoolOptions()).thenReturn(sessionPoolOptions); when(options.getSessionLabels()).thenReturn(Collections.emptyMap()); SpannerRpc rpc = mock(SpannerRpc.class);