diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index 61fba21c4c..d048e1d848 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -111,7 +111,7 @@ org.apache.maven.plugins maven-dependency-plugin - io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1 + io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1 @@ -305,6 +305,20 @@ 2.2 test + + + + org.openjdk.jmh + jmh-core + 1.23 + test + + + org.openjdk.jmh + jmh-generator-annprocess + 1.23 + test + @@ -320,5 +334,35 @@ + + benchmark + + + + org.codehaus.mojo + exec-maven-plugin + + + run-benchmarks + test + + exec + + + test + java + + -classpath + + org.openjdk.jmh.Main + .* + + + + + + + + \ No newline at end of file diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index d77a95f48b..b80d26ffba 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -231,12 +231,21 @@ SessionImpl createSession() { * sessions that could not be created. * * @param sessionCount The number of sessions to create. + * @param distributeOverChannels Whether to distribute the sessions over all available channels + * (true) or create all for the next channel round robin. * @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available. */ - void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer) { - // We spread the session creation evenly over all available channels. - int sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels(); - int remainder = sessionCount % spanner.getOptions().getNumChannels(); + void asyncBatchCreateSessions( + final int sessionCount, boolean distributeOverChannels, SessionConsumer consumer) { + int sessionCountPerChannel; + int remainder; + if (distributeOverChannels) { + sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels(); + remainder = sessionCount % spanner.getOptions().getNumChannels(); + } else { + sessionCountPerChannel = sessionCount; + remainder = 0; + } int numBeingCreated = 0; synchronized (this) { for (int channelIndex = 0; @@ -252,7 +261,7 @@ void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer) if (channelIndex == 0) { createCountForChannel = sessionCountPerChannel + remainder; } - if (createCountForChannel > 0) { + if (createCountForChannel > 0 && numBeingCreated < sessionCount) { try { executor.submit( new BatchCreateSessionsRunnable( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 5023f0fafb..0582f4b1f5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1077,7 +1077,7 @@ private void replenishPool() { // If we have gone below min pool size, create that many sessions. int sessionCount = options.getMinSessions() - (totalSessions() + numSessionsBeingCreated); if (sessionCount > 0) { - createSessions(getAllowedCreateSessions(sessionCount)); + createSessions(getAllowedCreateSessions(sessionCount), false); } } } @@ -1269,7 +1269,7 @@ private void initPool() { synchronized (lock) { poolMaintainer.init(); if (options.getMinSessions() > 0) { - createSessions(options.getMinSessions()); + createSessions(options.getMinSessions(), true); } } } @@ -1308,7 +1308,7 @@ private void invalidateSession(PooledSession session) { } allSessions.remove(session); // replenish the pool. - createSessions(getAllowedCreateSessions(1)); + createSessions(getAllowedCreateSessions(1), false); } } @@ -1507,7 +1507,7 @@ private void maybeCreateSession() { if (numWaiters() >= numSessionsBeingCreated) { if (canCreateSession()) { span.addAnnotation("Creating sessions"); - createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated + 1)); + createSessions(getAllowedCreateSessions(options.getIncStep()), false); } else if (options.isFailIfPoolExhausted()) { span.addAnnotation("Pool exhausted. Failing"); // throw specific exception @@ -1732,7 +1732,8 @@ public void run() { } // Create a new session if needed to unblock some waiter. if (numWaiters() > numSessionsBeingCreated) { - createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated)); + createSessions( + getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated), false); } } } @@ -1794,7 +1795,7 @@ private boolean canCreateSession() { } } - private void createSessions(final int sessionCount) { + private void createSessions(final int sessionCount, boolean distributeOverChannels) { logger.log(Level.FINE, String.format("Creating %d sessions", sessionCount)); synchronized (lock) { numSessionsBeingCreated += sessionCount; @@ -1803,8 +1804,8 @@ private void createSessions(final int sessionCount) { // calls and the session consumer consumes the returned sessions as they become available. // The batchCreateSessions method automatically spreads the sessions evenly over all // available channels. - sessionClient.asyncBatchCreateSessions(sessionCount, sessionConsumer); - logger.log(Level.FINE, "Sessions created"); + sessionClient.asyncBatchCreateSessions( + sessionCount, distributeOverChannels, sessionConsumer); } catch (Throwable t) { // Expose this to customer via a metric. numSessionsBeingCreated -= sessionCount; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 45289fb3cd..31bebac32d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -24,9 +24,11 @@ public class SessionPoolOptions { // Default number of channels * 100. private static final int DEFAULT_MAX_SESSIONS = 400; private static final int DEFAULT_MIN_SESSIONS = 100; + private static final int DEFAULT_INC_STEP = 25; private static final ActionOnExhaustion DEFAULT_ACTION = ActionOnExhaustion.BLOCK; private final int minSessions; private final int maxSessions; + private final int incStep; private final int maxIdleSessions; private final float writeSessionsFraction; private final ActionOnExhaustion actionOnExhaustion; @@ -40,6 +42,7 @@ private SessionPoolOptions(Builder builder) { // maxSessions value is less than the default for minSessions. this.minSessions = Math.min(builder.minSessions, builder.maxSessions); this.maxSessions = builder.maxSessions; + this.incStep = builder.incStep; this.maxIdleSessions = builder.maxIdleSessions; this.writeSessionsFraction = builder.writeSessionsFraction; this.actionOnExhaustion = builder.actionOnExhaustion; @@ -56,6 +59,10 @@ public int getMaxSessions() { return maxSessions; } + int getIncStep() { + return incStep; + } + public int getMaxIdleSessions() { return maxIdleSessions; } @@ -105,6 +112,7 @@ public static class Builder { private boolean minSessionsSet = false; private int minSessions = DEFAULT_MIN_SESSIONS; private int maxSessions = DEFAULT_MAX_SESSIONS; + private int incStep = DEFAULT_INC_STEP; private int maxIdleSessions; private float writeSessionsFraction = 0.2f; private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION; @@ -135,6 +143,16 @@ public Builder setMaxSessions(int maxSessions) { return this; } + /** + * Number of sessions to batch create when the pool needs at least one more session. Defaults to + * 25. + */ + Builder setIncStep(int incStep) { + Preconditions.checkArgument(incStep > 0, "incStep must be > 0"); + this.incStep = incStep; + return this; + } + /** * Maximum number of idle sessions that this pool will maintain. Pool will close any sessions * beyond this but making sure to always have at least as many sessions as specified by {@link diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java index 5cbd163fae..222b393113 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java @@ -104,6 +104,7 @@ public static void stopServer() throws InterruptedException { @Before public void setUp() throws IOException { mockSpanner.reset(); + mockSpanner.removeAllExecutionTimes(); } private Spanner createSpanner(int minSessions, int maxSessions) { @@ -245,7 +246,7 @@ public void testPrepareSessionFailPropagatesToUser() throws InterruptedException int maxSessions = 1000; DatabaseClientImpl client = null; mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofException( + SimulatedExecutionTime.ofStickyException( Status.ABORTED.withDescription("BeginTransaction failed").asRuntimeException())); try (Spanner spanner = createSpanner(minSessions, maxSessions)) { client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 118b2c57fe..e3d002948e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -92,6 +92,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -469,6 +470,7 @@ private static void checkException(Queue exceptions, boolean keepExce private ConcurrentMap transactionLastUsed = new ConcurrentHashMap<>(); private int maxNumSessionsInOneBatch = 100; private int maxTotalSessions = Integer.MAX_VALUE; + private AtomicInteger numSessionsCreated = new AtomicInteger(); private SimulatedExecutionTime beginTransactionExecutionTime = NO_EXECUTION_TIME; private SimulatedExecutionTime commitExecutionTime = NO_EXECUTION_TIME; @@ -642,6 +644,7 @@ public void batchCreateSessions( if (sessions.size() <= maxTotalSessions) { sessionLastUsed.put(name, Instant.now()); response.addSession(session); + numSessionsCreated.incrementAndGet(); } else { sessions.remove(name); } @@ -687,6 +690,7 @@ public void createSession( Session prev = sessions.putIfAbsent(name, session); if (prev == null) { sessionLastUsed.put(name, Instant.now()); + numSessionsCreated.incrementAndGet(); responseObserver.onNext(session); responseObserver.onCompleted(); } else { @@ -1623,6 +1627,10 @@ private void partition( } } + public int numSessionsCreated() { + return numSessionsCreated.get(); + } + @Override public List getRequests() { return new ArrayList<>(this.requests); @@ -1652,6 +1660,7 @@ public ServerServiceDefinition getServiceDefinition() { public void reset() { requests.clear(); sessions.clear(); + numSessionsCreated.set(0); sessionLastUsed.clear(); transactions.clear(); isPartitionedDmlTransaction.clear(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTest.java index ce8b8d4409..11a39334ca 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTest.java @@ -182,7 +182,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount }; final int numSessions = 10; try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { - client.asyncBatchCreateSessions(numSessions, consumer); + client.asyncBatchCreateSessions(numSessions, true, consumer); } assertThat(returnedSessionCount.get()).isEqualTo(numSessions); assertThat(usedChannels.size()).isEqualTo(spannerOptions.getNumChannels()); @@ -275,7 +275,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount }; final int numSessions = 10; try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { - client.asyncBatchCreateSessions(numSessions, consumer); + client.asyncBatchCreateSessions(numSessions, true, consumer); } assertThat(errorCount.get()).isEqualTo(errorOnChannels.size()); assertThat(returnedSessionCount.get()) @@ -330,7 +330,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount // sessions. final int numSessions = 100; try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { - client.asyncBatchCreateSessions(numSessions, consumer); + client.asyncBatchCreateSessions(numSessions, true, consumer); } assertThat(returnedSessionCount.get()).isEqualTo(numSessions); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java new file mode 100644 index 0000000000..0e89c23015 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java @@ -0,0 +1,388 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.ListValue; +import com.google.spanner.v1.BatchCreateSessionsRequest; +import com.google.spanner.v1.ResultSetMetadata; +import com.google.spanner.v1.StructType; +import com.google.spanner.v1.StructType.Field; +import com.google.spanner.v1.TypeCode; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessServerBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmarks for common session pool scenarios. The simulated execution times are based on + * reasonable estimates and are primarily intended to keep the benchmarks comparable with each other + * before and after changes have been made to the pool. The benchmarks are bound to the build + * profile `benchmark` and can be executed like this: `mvn test -Pbenchmark` + */ +@BenchmarkMode(Mode.AverageTime) +@Fork(value = 1, warmups = 0) +@Measurement(batchSize = 1, iterations = 1, timeUnit = TimeUnit.MILLISECONDS) +@Warmup(batchSize = 0, iterations = 0) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SessionPoolBenchmark { + private static final String TEST_PROJECT = "my-project"; + private static final String TEST_INSTANCE = "my-instance"; + private static final String TEST_DATABASE = "my-database"; + private static final int HOLD_SESSION_TIME = 100; + private static final int RND_WAIT_TIME_BETWEEN_REQUESTS = 10; + private static final Random RND = new Random(); + + public static void main(String[] args) throws Exception { + org.openjdk.jmh.Main.main(args); + } + + @State(Scope.Thread) + @AuxCounters(org.openjdk.jmh.annotations.AuxCounters.Type.EVENTS) + public static class MockServer { + private static final int NETWORK_LATENCY_TIME = 10; + private static final int BATCH_CREATE_SESSIONS_MIN_TIME = 10; + private static final int BATCH_CREATE_SESSIONS_RND_TIME = 10; + private static final int BEGIN_TRANSACTION_MIN_TIME = 1; + private static final int BEGIN_TRANSACTION_RND_TIME = 1; + private static final int COMMIT_TRANSACTION_MIN_TIME = 5; + private static final int COMMIT_TRANSACTION_RND_TIME = 5; + private static final int ROLLBACK_TRANSACTION_MIN_TIME = 1; + private static final int ROLLBACK_TRANSACTION_RND_TIME = 1; + private static final int EXECUTE_STREAMING_SQL_MIN_TIME = 10; + private static final int EXECUTE_STREAMING_SQL_RND_TIME = 10; + private static final int EXECUTE_SQL_MIN_TIME = 10; + private static final int EXECUTE_SQL_RND_TIME = 10; + + private static final Statement UPDATE_STATEMENT = + Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); + private static final Statement INVALID_UPDATE_STATEMENT = + Statement.of("UPDATE NON_EXISTENT_TABLE SET BAR=1 WHERE BAZ=2"); + private static final long UPDATE_COUNT = 1L; + private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); + private static final ResultSetMetadata SELECT1_METADATA = + ResultSetMetadata.newBuilder() + .setRowType( + StructType.newBuilder() + .addFields( + Field.newBuilder() + .setName("COL1") + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(TypeCode.INT64) + .build()) + .build()) + .build()) + .build(); + private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = + com.google.spanner.v1.ResultSet.newBuilder() + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) + .build()) + .setMetadata(SELECT1_METADATA) + .build(); + private MockSpannerServiceImpl mockSpanner; + private Server server; + private LocalChannelProvider channelProvider; + + private Spanner spanner; + private DatabaseClientImpl client; + + @Param({"100"}) + int minSessions; + + @Param({"400"}) + int maxSessions; + + @Param({"1", "10", "20", "25", "30", "40", "50", "100"}) + int incStep; + + @Param({"4"}) + int numChannels; + + @Param({"0.2"}) + float writeFraction; + + /** AuxCounter for number of RPCs. */ + public int numBatchCreateSessionsRpcs() { + return countRequests(BatchCreateSessionsRequest.class); + } + + /** AuxCounter for number of sessions created. */ + public int sessionsCreated() { + return mockSpanner.numSessionsCreated(); + } + + @Setup(Level.Invocation) + public void setup() throws Exception { + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability( + 0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET)); + mockSpanner.putStatementResult( + StatementResult.exception( + INVALID_UPDATE_STATEMENT, + Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); + + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + BATCH_CREATE_SESSIONS_MIN_TIME, + BATCH_CREATE_SESSIONS_RND_TIME)); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + BEGIN_TRANSACTION_MIN_TIME, BEGIN_TRANSACTION_RND_TIME)); + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + COMMIT_TRANSACTION_MIN_TIME, COMMIT_TRANSACTION_RND_TIME)); + mockSpanner.setRollbackExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + ROLLBACK_TRANSACTION_MIN_TIME, ROLLBACK_TRANSACTION_RND_TIME)); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + EXECUTE_STREAMING_SQL_MIN_TIME, + EXECUTE_STREAMING_SQL_RND_TIME)); + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + EXECUTE_SQL_MIN_TIME, EXECUTE_SQL_RND_TIME)); + + String uniqueName = InProcessServerBuilder.generateName(); + server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); + channelProvider = LocalChannelProvider.create(uniqueName); + + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelProvider(channelProvider) + .setNumChannels(numChannels) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setMinSessions(minSessions) + .setMaxSessions(maxSessions) + .setIncStep(incStep) + .setWriteSessionsFraction(writeFraction) + .build()) + .build(); + + spanner = options.getService(); + client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + // Wait until the session pool has initialized. + while (client.pool.getNumberOfSessionsInPool() + < spanner.getOptions().getSessionPoolOptions().getMinSessions()) { + Thread.sleep(1L); + } + } + + @TearDown(Level.Invocation) + public void teardown() throws Exception { + spanner.close(); + server.shutdown(); + server.awaitTermination(); + } + + int expectedStepsToMax() { + int remainder = (maxSessions - minSessions) % incStep == 0 ? 0 : 1; + return numChannels + ((maxSessions - minSessions) / incStep) + remainder; + } + + int countRequests(final Class type) { + return Collections2.filter( + mockSpanner.getRequests(), + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + return input.getClass().equals(type); + } + }) + .size(); + } + } + + /** Measures the time needed to execute a burst of read requests. */ + @Benchmark + public void burstRead(final MockServer server) throws Exception { + int totalQueries = server.maxSessions * 8; + int parallelThreads = server.maxSessions * 2; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(server.minSessions); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalQueries); + for (int i = 0; i < totalQueries; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + try (ResultSet rs = client.singleUse().executeQuery(MockServer.SELECT1)) { + while (rs.next()) { + Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); + } + return null; + } + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time needed to execute a burst of write requests. */ + @Benchmark + public void burstWrite(final MockServer server) throws Exception { + int totalWrites = server.maxSessions * 8; + int parallelThreads = server.maxSessions * 2; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(server.minSessions); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalWrites); + for (int i = 0; i < totalWrites; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + TransactionRunner runner = client.readWriteTransaction(); + return runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(MockServer.UPDATE_STATEMENT); + } + }); + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time needed to execute a burst of read and write requests. */ + @Benchmark + public void burstReadAndWrite(final MockServer server) throws Exception { + int totalWrites = server.maxSessions * 4; + int totalReads = server.maxSessions * 4; + int parallelThreads = server.maxSessions * 2; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(server.minSessions); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalReads + totalWrites); + for (int i = 0; i < totalWrites; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + TransactionRunner runner = client.readWriteTransaction(); + return runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(MockServer.UPDATE_STATEMENT); + } + }); + } + })); + } + for (int i = 0; i < totalReads; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + try (ResultSet rs = client.singleUse().executeQuery(MockServer.SELECT1)) { + while (rs.next()) { + Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); + } + return null; + } + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time needed to acquire MaxSessions session sequentially. */ + @Benchmark + public void steadyIncrease(MockServer server) throws Exception { + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(server.minSessions); + + // Checkout maxSessions sessions by starting maxSessions read-only transactions sequentially. + List transactions = new ArrayList<>(server.maxSessions); + for (int i = 0; i < server.maxSessions; i++) { + transactions.add(client.readOnlyTransaction()); + } + for (ReadOnlyTransaction tx : transactions) { + tx.close(); + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java index 0c92f8d461..dbafb9dd01 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java @@ -81,11 +81,12 @@ public void setUp() throws Exception { .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()); // Make sure the session pool is empty by default, does not contain any write-prepared sessions, - // and contains at most 2 sessions. + // contains at most 2 sessions, and creates sessions in steps of 1. builder.setSessionPoolOption( SessionPoolOptions.newBuilder() .setMinSessions(0) .setMaxSessions(2) + .setIncStep(1) .setWriteSessionsFraction(0.0f) .build()); spanner = builder.build().getService(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index 60eb151f6e..f7c2f02a59 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -128,7 +128,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { maxAliveSessions = sessions.size(); } SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } } @@ -136,7 +136,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.anyInt(), Mockito.any(SessionConsumer.class)); + .asyncBatchCreateSessions( + Mockito.anyInt(), Mockito.anyBoolean(), Mockito.any(SessionConsumer.class)); } private void setupSession(final Session session) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 6daa36f1d6..a9636dea1b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -136,6 +136,7 @@ public void setUp() throws Exception { SessionPoolOptions.newBuilder() .setMinSessions(minSessions) .setMaxSessions(2) + .setIncStep(1) .setBlockIfPoolExhausted() .build(); } @@ -151,7 +152,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { public void run() { int sessionCount = invocation.getArgumentAt(0, Integer.class); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); for (int i = 0; i < sessionCount; i++) { consumer.onSessionReady(mockSession()); } @@ -161,7 +162,8 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.anyInt(), any(SessionConsumer.class)); + .asyncBatchCreateSessions( + Mockito.anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); } @Test @@ -220,7 +222,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(sessions.pop()); } }); @@ -228,7 +230,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); Session session1 = pool.getReadSession(); // Leaked sessions @@ -278,7 +280,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session1); } }); @@ -296,7 +298,7 @@ public Void call() throws Exception { insideCreation.countDown(); releaseCreation.await(); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session2); return null; } @@ -305,7 +307,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); PooledSession leakedSession = pool.getReadSession(); @@ -336,7 +338,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session1); } }); @@ -354,7 +356,7 @@ public Void call() throws Exception { insideCreation.countDown(); releaseCreation.await(); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session2); return null; } @@ -363,7 +365,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); PooledSession leakedSession = pool.getReadSession(); @@ -394,7 +396,7 @@ public Void call() throws Exception { insideCreation.countDown(); releaseCreation.await(); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionCreateFailure( SpannerExceptionFactory.newSpannerException(new RuntimeException()), 1); return null; @@ -404,7 +406,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); AtomicBoolean failed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); @@ -428,7 +430,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } }); @@ -436,7 +438,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); final CountDownLatch insidePrepare = new CountDownLatch(1); final CountDownLatch releasePrepare = new CountDownLatch(1); doAnswer( @@ -473,7 +475,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } }); @@ -481,7 +483,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); PooledSession leakedSession = pool.getReadSession(); // Suppress expected leakedSession warning. @@ -503,7 +505,7 @@ public void atMostMaxSessionsCreated() { } Uninterruptibles.awaitUninterruptibly(latch); verify(sessionClient, atMost(options.getMaxSessions())) - .asyncBatchCreateSessions(eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); assertThat(failed.get()).isFalse(); } @@ -518,7 +520,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public Void call() throws Exception { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionCreateFailure( SpannerExceptionFactory.newSpannerException(ErrorCode.INTERNAL, ""), 1); return null; @@ -528,7 +530,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); expectedException.expect(isSpannerException(ErrorCode.INTERNAL)); pool.getReadSession(); @@ -545,7 +547,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public Void call() throws Exception { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionCreateFailure( SpannerExceptionFactory.newSpannerException(ErrorCode.INTERNAL, ""), 1); return null; @@ -555,7 +557,7 @@ public Void call() throws Exception { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); expectedException.expect(isSpannerException(ErrorCode.INTERNAL)); pool.getReadWriteSession(); @@ -573,7 +575,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } }); @@ -581,7 +583,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.INTERNAL, "")) .when(session) .prepareReadWriteTransaction(); @@ -602,7 +604,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(mockSession); } }); @@ -610,7 +612,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); try (Session session = pool.getReadWriteSession()) { assertThat(session).isNotNull(); @@ -633,7 +635,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(sessions.pop()); } }); @@ -641,7 +643,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); Session session1 = pool.getReadWriteSession(); Session session2 = pool.getReadWriteSession(); @@ -664,7 +666,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(session); } }); @@ -672,7 +674,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); int numSessions = 5; @@ -719,7 +721,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(mockSession1); consumer.onSessionReady(mockSession2); } @@ -728,7 +730,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(2), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(2), Mockito.anyBoolean(), any(SessionConsumer.class)); options = SessionPoolOptions.newBuilder() @@ -770,7 +772,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(mockSession1); } }); @@ -778,7 +780,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); options = SessionPoolOptions.newBuilder() .setMinSessions(minSessions) @@ -810,7 +812,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(mockSession()); } }); @@ -818,7 +820,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); Session session1 = pool.getReadSession(); expectedException.expect(isSpannerException(ErrorCode.RESOURCE_EXHAUSTED)); @@ -847,7 +849,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(sessions.pop()); } }); @@ -855,7 +857,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); pool = createPool(); assertThat(pool.getReadWriteSession().delegate).isEqualTo(mockSession2); } @@ -866,6 +868,7 @@ public void idleSessionCleanup() throws Exception { SessionPoolOptions.newBuilder() .setMinSessions(1) .setMaxSessions(3) + .setIncStep(1) .setMaxIdleSessions(0) .build(); SessionImpl session1 = mockSession(); @@ -883,7 +886,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(sessions.pop()); } }); @@ -891,7 +894,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); for (Session session : new Session[] {session1, session2, session3}) { doAnswer( new Answer>() { @@ -950,7 +953,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { public void run() { int sessionCount = invocation.getArgumentAt(0, Integer.class); SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); for (int i = 0; i < sessionCount; i++) { consumer.onSessionReady(session); } @@ -960,7 +963,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(anyInt(), any(SessionConsumer.class)); + .asyncBatchCreateSessions(anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1066,7 +1069,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1082,7 +1085,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1090,7 +1093,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1122,7 +1125,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1138,7 +1141,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1146,7 +1149,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1244,7 +1247,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1260,7 +1263,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1268,11 +1271,13 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions( + Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); SessionPoolOptions options = SessionPoolOptions.newBuilder() .setMinSessions(0) // The pool should not auto-create any sessions .setMaxSessions(2) + .setIncStep(1) .setBlockIfPoolExhausted() .build(); SpannerOptions spannerOptions = mock(SpannerOptions.class); @@ -1378,7 +1383,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1394,7 +1399,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1402,7 +1407,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1429,7 +1434,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1445,7 +1450,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1453,7 +1458,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); @@ -1481,7 +1486,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1497,7 +1502,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1505,7 +1510,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); @@ -1532,7 +1537,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(closedSession); } }); @@ -1548,7 +1553,7 @@ public Void answer(final InvocationOnMock invocation) throws Throwable { @Override public void run() { SessionConsumerImpl consumer = - invocation.getArgumentAt(1, SessionConsumerImpl.class); + invocation.getArgumentAt(2, SessionConsumerImpl.class); consumer.onSessionReady(openSession); } }); @@ -1556,7 +1561,7 @@ public void run() { } }) .when(sessionClient) - .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java index e60522dc1d..eb1bb67e89 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java @@ -166,7 +166,11 @@ public void setUp() throws Exception { .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) - .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()); + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setMinSessions(0) + .setWriteSessionsFraction(0.0f) + .build()); spanner = builder.build().getService(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index cc522f3f45..cd94d82800 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -196,7 +196,7 @@ public void usesPreparedTransaction() { when(transportOptions.getExecutorFactory()).thenReturn(new TestExecutorFactory()); when(options.getTransportOptions()).thenReturn(transportOptions); SessionPoolOptions sessionPoolOptions = - SessionPoolOptions.newBuilder().setMinSessions(0).build(); + SessionPoolOptions.newBuilder().setMinSessions(0).setIncStep(1).build(); when(options.getSessionPoolOptions()).thenReturn(sessionPoolOptions); when(options.getSessionLabels()).thenReturn(Collections.emptyMap()); SpannerRpc rpc = mock(SpannerRpc.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 3dcd523c13..c45f46fbd1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -106,7 +106,7 @@ public void usesPreparedTransaction() { when(transportOptions.getExecutorFactory()).thenReturn(new TestExecutorFactory()); when(options.getTransportOptions()).thenReturn(transportOptions); SessionPoolOptions sessionPoolOptions = - SessionPoolOptions.newBuilder().setMinSessions(0).build(); + SessionPoolOptions.newBuilder().setMinSessions(0).setIncStep(1).build(); when(options.getSessionPoolOptions()).thenReturn(sessionPoolOptions); when(options.getSessionLabels()).thenReturn(Collections.emptyMap()); SpannerRpc rpc = mock(SpannerRpc.class);