Skip to content

Commit

Permalink
perf: increase sessions in the pool in batches (#134)
Browse files Browse the repository at this point in the history
* perf: increase sessions in the pool in batches

When more sessions are requested by the user application than are available in the session pool,
the session pool will now create new sessions in batches instead of in steps of 1. This reduces
the number of RPCs needed to serve a burst of requests.

A benchmark for the session pool has also been added to be able to compare performance and the
number of RPCs needed before and after this change. This benchmark can also be used for future
changes to verify that the change does not deteriorate performance or increase the number of
RPCs needed.

* fix: remove unused code

* fix: include num rpcs and sessions in benchmark results

* fix: remove commented code

* fix: rename parameter
  • Loading branch information
olavloite committed Apr 8, 2020
1 parent 77c1558 commit 9e5a1cd
Show file tree
Hide file tree
Showing 14 changed files with 567 additions and 86 deletions.
46 changes: 45 additions & 1 deletion google-cloud-spanner/pom.xml
Expand Up @@ -111,7 +111,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredDependencies>io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1</ignoredDependencies>
<ignoredDependencies>io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1</ignoredDependencies>
</configuration>
</plugin>
</plugins>
Expand Down Expand Up @@ -305,6 +305,20 @@
<version>2.2</version>
<scope>test</scope>
</dependency>

<!-- Benchmarking dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand All @@ -320,5 +334,35 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>benchmark</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>run-benchmarks</id>
<phase>test</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<classpathScope>test</classpathScope>
<executable>java</executable>
<arguments>
<argument>-classpath</argument>
<classpath />
<argument>org.openjdk.jmh.Main</argument>
<argument>.*</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Expand Up @@ -231,12 +231,21 @@ SessionImpl createSession() {
* sessions that could not be created.
*
* @param sessionCount The number of sessions to create.
* @param distributeOverChannels Whether to distribute the sessions over all available channels
* (true) or create all for the next channel round robin.
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
*/
void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer) {
// We spread the session creation evenly over all available channels.
int sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels();
int remainder = sessionCount % spanner.getOptions().getNumChannels();
void asyncBatchCreateSessions(
final int sessionCount, boolean distributeOverChannels, SessionConsumer consumer) {
int sessionCountPerChannel;
int remainder;
if (distributeOverChannels) {
sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels();
remainder = sessionCount % spanner.getOptions().getNumChannels();
} else {
sessionCountPerChannel = sessionCount;
remainder = 0;
}
int numBeingCreated = 0;
synchronized (this) {
for (int channelIndex = 0;
Expand All @@ -252,7 +261,7 @@ void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer)
if (channelIndex == 0) {
createCountForChannel = sessionCountPerChannel + remainder;
}
if (createCountForChannel > 0) {
if (createCountForChannel > 0 && numBeingCreated < sessionCount) {
try {
executor.submit(
new BatchCreateSessionsRunnable(
Expand Down
Expand Up @@ -1077,7 +1077,7 @@ private void replenishPool() {
// If we have gone below min pool size, create that many sessions.
int sessionCount = options.getMinSessions() - (totalSessions() + numSessionsBeingCreated);
if (sessionCount > 0) {
createSessions(getAllowedCreateSessions(sessionCount));
createSessions(getAllowedCreateSessions(sessionCount), false);
}
}
}
Expand Down Expand Up @@ -1269,7 +1269,7 @@ private void initPool() {
synchronized (lock) {
poolMaintainer.init();
if (options.getMinSessions() > 0) {
createSessions(options.getMinSessions());
createSessions(options.getMinSessions(), true);
}
}
}
Expand Down Expand Up @@ -1308,7 +1308,7 @@ private void invalidateSession(PooledSession session) {
}
allSessions.remove(session);
// replenish the pool.
createSessions(getAllowedCreateSessions(1));
createSessions(getAllowedCreateSessions(1), false);
}
}

Expand Down Expand Up @@ -1507,7 +1507,7 @@ private void maybeCreateSession() {
if (numWaiters() >= numSessionsBeingCreated) {
if (canCreateSession()) {
span.addAnnotation("Creating sessions");
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated + 1));
createSessions(getAllowedCreateSessions(options.getIncStep()), false);
} else if (options.isFailIfPoolExhausted()) {
span.addAnnotation("Pool exhausted. Failing");
// throw specific exception
Expand Down Expand Up @@ -1732,7 +1732,8 @@ public void run() {
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
createSessions(
getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated), false);
}
}
}
Expand Down Expand Up @@ -1794,7 +1795,7 @@ private boolean canCreateSession() {
}
}

private void createSessions(final int sessionCount) {
private void createSessions(final int sessionCount, boolean distributeOverChannels) {
logger.log(Level.FINE, String.format("Creating %d sessions", sessionCount));
synchronized (lock) {
numSessionsBeingCreated += sessionCount;
Expand All @@ -1803,8 +1804,8 @@ private void createSessions(final int sessionCount) {
// calls and the session consumer consumes the returned sessions as they become available.
// The batchCreateSessions method automatically spreads the sessions evenly over all
// available channels.
sessionClient.asyncBatchCreateSessions(sessionCount, sessionConsumer);
logger.log(Level.FINE, "Sessions created");
sessionClient.asyncBatchCreateSessions(
sessionCount, distributeOverChannels, sessionConsumer);
} catch (Throwable t) {
// Expose this to customer via a metric.
numSessionsBeingCreated -= sessionCount;
Expand Down
Expand Up @@ -24,9 +24,11 @@ public class SessionPoolOptions {
// Default number of channels * 100.
private static final int DEFAULT_MAX_SESSIONS = 400;
private static final int DEFAULT_MIN_SESSIONS = 100;
private static final int DEFAULT_INC_STEP = 25;
private static final ActionOnExhaustion DEFAULT_ACTION = ActionOnExhaustion.BLOCK;
private final int minSessions;
private final int maxSessions;
private final int incStep;
private final int maxIdleSessions;
private final float writeSessionsFraction;
private final ActionOnExhaustion actionOnExhaustion;
Expand All @@ -40,6 +42,7 @@ private SessionPoolOptions(Builder builder) {
// maxSessions value is less than the default for minSessions.
this.minSessions = Math.min(builder.minSessions, builder.maxSessions);
this.maxSessions = builder.maxSessions;
this.incStep = builder.incStep;
this.maxIdleSessions = builder.maxIdleSessions;
this.writeSessionsFraction = builder.writeSessionsFraction;
this.actionOnExhaustion = builder.actionOnExhaustion;
Expand All @@ -56,6 +59,10 @@ public int getMaxSessions() {
return maxSessions;
}

int getIncStep() {
return incStep;
}

public int getMaxIdleSessions() {
return maxIdleSessions;
}
Expand Down Expand Up @@ -105,6 +112,7 @@ public static class Builder {
private boolean minSessionsSet = false;
private int minSessions = DEFAULT_MIN_SESSIONS;
private int maxSessions = DEFAULT_MAX_SESSIONS;
private int incStep = DEFAULT_INC_STEP;
private int maxIdleSessions;
private float writeSessionsFraction = 0.2f;
private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION;
Expand Down Expand Up @@ -135,6 +143,16 @@ public Builder setMaxSessions(int maxSessions) {
return this;
}

/**
* Number of sessions to batch create when the pool needs at least one more session. Defaults to
* 25.
*/
Builder setIncStep(int incStep) {
Preconditions.checkArgument(incStep > 0, "incStep must be > 0");
this.incStep = incStep;
return this;
}

/**
* Maximum number of idle sessions that this pool will maintain. Pool will close any sessions
* beyond this but making sure to always have at least as many sessions as specified by {@link
Expand Down
Expand Up @@ -104,6 +104,7 @@ public static void stopServer() throws InterruptedException {
@Before
public void setUp() throws IOException {
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
}

private Spanner createSpanner(int minSessions, int maxSessions) {
Expand Down Expand Up @@ -245,7 +246,7 @@ public void testPrepareSessionFailPropagatesToUser() throws InterruptedException
int maxSessions = 1000;
DatabaseClientImpl client = null;
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofException(
SimulatedExecutionTime.ofStickyException(
Status.ABORTED.withDescription("BeginTransaction failed").asRuntimeException()));
try (Spanner spanner = createSpanner(minSessions, maxSessions)) {
client =
Expand Down
Expand Up @@ -92,6 +92,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -469,6 +470,7 @@ private static void checkException(Queue<Exception> exceptions, boolean keepExce
private ConcurrentMap<ByteString, Instant> transactionLastUsed = new ConcurrentHashMap<>();
private int maxNumSessionsInOneBatch = 100;
private int maxTotalSessions = Integer.MAX_VALUE;
private AtomicInteger numSessionsCreated = new AtomicInteger();

private SimulatedExecutionTime beginTransactionExecutionTime = NO_EXECUTION_TIME;
private SimulatedExecutionTime commitExecutionTime = NO_EXECUTION_TIME;
Expand Down Expand Up @@ -642,6 +644,7 @@ public void batchCreateSessions(
if (sessions.size() <= maxTotalSessions) {
sessionLastUsed.put(name, Instant.now());
response.addSession(session);
numSessionsCreated.incrementAndGet();
} else {
sessions.remove(name);
}
Expand Down Expand Up @@ -687,6 +690,7 @@ public void createSession(
Session prev = sessions.putIfAbsent(name, session);
if (prev == null) {
sessionLastUsed.put(name, Instant.now());
numSessionsCreated.incrementAndGet();
responseObserver.onNext(session);
responseObserver.onCompleted();
} else {
Expand Down Expand Up @@ -1623,6 +1627,10 @@ private void partition(
}
}

public int numSessionsCreated() {
return numSessionsCreated.get();
}

@Override
public List<AbstractMessage> getRequests() {
return new ArrayList<>(this.requests);
Expand Down Expand Up @@ -1652,6 +1660,7 @@ public ServerServiceDefinition getServiceDefinition() {
public void reset() {
requests.clear();
sessions.clear();
numSessionsCreated.set(0);
sessionLastUsed.clear();
transactions.clear();
isPartitionedDmlTransaction.clear();
Expand Down
Expand Up @@ -182,7 +182,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
};
final int numSessions = 10;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(returnedSessionCount.get()).isEqualTo(numSessions);
assertThat(usedChannels.size()).isEqualTo(spannerOptions.getNumChannels());
Expand Down Expand Up @@ -275,7 +275,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
};
final int numSessions = 10;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(errorCount.get()).isEqualTo(errorOnChannels.size());
assertThat(returnedSessionCount.get())
Expand Down Expand Up @@ -330,7 +330,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
// sessions.
final int numSessions = 100;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(returnedSessionCount.get()).isEqualTo(numSessions);
}
Expand Down

0 comments on commit 9e5a1cd

Please sign in to comment.