Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: increase sessions in the pool in batches #134

Merged
merged 5 commits into from Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 45 additions & 1 deletion google-cloud-spanner/pom.xml
Expand Up @@ -111,7 +111,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredDependencies>io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1</ignoredDependencies>
<ignoredDependencies>io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1</ignoredDependencies>
</configuration>
</plugin>
</plugins>
Expand Down Expand Up @@ -305,6 +305,20 @@
<version>2.2</version>
<scope>test</scope>
</dependency>

<!-- Benchmarking dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand All @@ -320,5 +334,35 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>benchmark</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>run-benchmarks</id>
<phase>test</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<classpathScope>test</classpathScope>
<executable>java</executable>
<arguments>
<argument>-classpath</argument>
<classpath />
<argument>org.openjdk.jmh.Main</argument>
<argument>.*</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Expand Up @@ -231,12 +231,21 @@ SessionImpl createSession() {
* sessions that could not be created.
*
* @param sessionCount The number of sessions to create.
* @param distributeOverChannels Whether to distribute the sessions over all available channels
* (true) or create all for the next channel round robin.
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
*/
void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer) {
// We spread the session creation evenly over all available channels.
int sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels();
int remainder = sessionCount % spanner.getOptions().getNumChannels();
void asyncBatchCreateSessions(
final int sessionCount, boolean distributeOverChannels, SessionConsumer consumer) {
int sessionCountPerChannel;
int remainder;
if (distributeOverChannels) {
sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels();
remainder = sessionCount % spanner.getOptions().getNumChannels();
} else {
sessionCountPerChannel = sessionCount;
remainder = 0;
}
int numBeingCreated = 0;
synchronized (this) {
for (int channelIndex = 0;
Expand All @@ -252,7 +261,7 @@ void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer)
if (channelIndex == 0) {
createCountForChannel = sessionCountPerChannel + remainder;
}
if (createCountForChannel > 0) {
if (createCountForChannel > 0 && numBeingCreated < sessionCount) {
try {
executor.submit(
new BatchCreateSessionsRunnable(
Expand Down
Expand Up @@ -1077,7 +1077,7 @@ private void replenishPool() {
// If we have gone below min pool size, create that many sessions.
int sessionCount = options.getMinSessions() - (totalSessions() + numSessionsBeingCreated);
if (sessionCount > 0) {
createSessions(getAllowedCreateSessions(sessionCount));
createSessions(getAllowedCreateSessions(sessionCount), false);
olavloite marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -1269,7 +1269,7 @@ private void initPool() {
synchronized (lock) {
poolMaintainer.init();
if (options.getMinSessions() > 0) {
createSessions(options.getMinSessions());
createSessions(options.getMinSessions(), true);
}
}
}
Expand Down Expand Up @@ -1308,7 +1308,7 @@ private void invalidateSession(PooledSession session) {
}
allSessions.remove(session);
// replenish the pool.
createSessions(getAllowedCreateSessions(1));
createSessions(getAllowedCreateSessions(1), false);
}
}

Expand Down Expand Up @@ -1507,7 +1507,7 @@ private void maybeCreateSession() {
if (numWaiters() >= numSessionsBeingCreated) {
if (canCreateSession()) {
span.addAnnotation("Creating sessions");
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated + 1));
createSessions(getAllowedCreateSessions(options.getIncStep()), false);
} else if (options.isFailIfPoolExhausted()) {
span.addAnnotation("Pool exhausted. Failing");
// throw specific exception
Expand Down Expand Up @@ -1732,7 +1732,8 @@ public void run() {
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
createSessions(
getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated), false);
}
}
}
Expand Down Expand Up @@ -1794,7 +1795,7 @@ private boolean canCreateSession() {
}
}

private void createSessions(final int sessionCount) {
private void createSessions(final int sessionCount, boolean distributeOverChannels) {
logger.log(Level.FINE, String.format("Creating %d sessions", sessionCount));
synchronized (lock) {
numSessionsBeingCreated += sessionCount;
Expand All @@ -1803,8 +1804,8 @@ private void createSessions(final int sessionCount) {
// calls and the session consumer consumes the returned sessions as they become available.
// The batchCreateSessions method automatically spreads the sessions evenly over all
// available channels.
sessionClient.asyncBatchCreateSessions(sessionCount, sessionConsumer);
logger.log(Level.FINE, "Sessions created");
sessionClient.asyncBatchCreateSessions(
sessionCount, distributeOverChannels, sessionConsumer);
} catch (Throwable t) {
// Expose this to customer via a metric.
numSessionsBeingCreated -= sessionCount;
Expand Down
Expand Up @@ -24,9 +24,11 @@ public class SessionPoolOptions {
// Default number of channels * 100.
private static final int DEFAULT_MAX_SESSIONS = 400;
private static final int DEFAULT_MIN_SESSIONS = 100;
private static final int DEFAULT_INC_STEP = 25;
private static final ActionOnExhaustion DEFAULT_ACTION = ActionOnExhaustion.BLOCK;
private final int minSessions;
private final int maxSessions;
private final int incStep;
private final int maxIdleSessions;
private final float writeSessionsFraction;
private final ActionOnExhaustion actionOnExhaustion;
Expand All @@ -40,6 +42,7 @@ private SessionPoolOptions(Builder builder) {
// maxSessions value is less than the default for minSessions.
this.minSessions = Math.min(builder.minSessions, builder.maxSessions);
this.maxSessions = builder.maxSessions;
this.incStep = builder.incStep;
olavloite marked this conversation as resolved.
Show resolved Hide resolved
this.maxIdleSessions = builder.maxIdleSessions;
this.writeSessionsFraction = builder.writeSessionsFraction;
this.actionOnExhaustion = builder.actionOnExhaustion;
Expand All @@ -56,6 +59,10 @@ public int getMaxSessions() {
return maxSessions;
}

int getIncStep() {
return incStep;
}

public int getMaxIdleSessions() {
return maxIdleSessions;
}
Expand Down Expand Up @@ -105,6 +112,7 @@ public static class Builder {
private boolean minSessionsSet = false;
private int minSessions = DEFAULT_MIN_SESSIONS;
private int maxSessions = DEFAULT_MAX_SESSIONS;
private int incStep = DEFAULT_INC_STEP;
private int maxIdleSessions;
private float writeSessionsFraction = 0.2f;
private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION;
Expand Down Expand Up @@ -135,6 +143,16 @@ public Builder setMaxSessions(int maxSessions) {
return this;
}

/**
* Number of sessions to batch create when the pool needs at least one more session. Defaults to
* 25.
*/
Builder setIncStep(int incStep) {
Preconditions.checkArgument(incStep > 0, "incStep must be > 0");
this.incStep = incStep;
return this;
}

/**
* Maximum number of idle sessions that this pool will maintain. Pool will close any sessions
* beyond this but making sure to always have at least as many sessions as specified by {@link
Expand Down
Expand Up @@ -104,6 +104,7 @@ public static void stopServer() throws InterruptedException {
@Before
public void setUp() throws IOException {
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
}

private Spanner createSpanner(int minSessions, int maxSessions) {
Expand Down Expand Up @@ -245,7 +246,7 @@ public void testPrepareSessionFailPropagatesToUser() throws InterruptedException
int maxSessions = 1000;
DatabaseClientImpl client = null;
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofException(
SimulatedExecutionTime.ofStickyException(
Status.ABORTED.withDescription("BeginTransaction failed").asRuntimeException()));
try (Spanner spanner = createSpanner(minSessions, maxSessions)) {
client =
Expand Down
Expand Up @@ -92,6 +92,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -469,6 +470,7 @@ private static void checkException(Queue<Exception> exceptions, boolean keepExce
private ConcurrentMap<ByteString, Instant> transactionLastUsed = new ConcurrentHashMap<>();
private int maxNumSessionsInOneBatch = 100;
private int maxTotalSessions = Integer.MAX_VALUE;
private AtomicInteger numSessionsCreated = new AtomicInteger();

private SimulatedExecutionTime beginTransactionExecutionTime = NO_EXECUTION_TIME;
private SimulatedExecutionTime commitExecutionTime = NO_EXECUTION_TIME;
Expand Down Expand Up @@ -642,6 +644,7 @@ public void batchCreateSessions(
if (sessions.size() <= maxTotalSessions) {
sessionLastUsed.put(name, Instant.now());
response.addSession(session);
numSessionsCreated.incrementAndGet();
} else {
sessions.remove(name);
}
Expand Down Expand Up @@ -687,6 +690,7 @@ public void createSession(
Session prev = sessions.putIfAbsent(name, session);
if (prev == null) {
sessionLastUsed.put(name, Instant.now());
numSessionsCreated.incrementAndGet();
responseObserver.onNext(session);
responseObserver.onCompleted();
} else {
Expand Down Expand Up @@ -1623,6 +1627,10 @@ private void partition(
}
}

public int numSessionsCreated() {
return numSessionsCreated.get();
}

@Override
public List<AbstractMessage> getRequests() {
return new ArrayList<>(this.requests);
Expand Down Expand Up @@ -1652,6 +1660,7 @@ public ServerServiceDefinition getServiceDefinition() {
public void reset() {
requests.clear();
sessions.clear();
numSessionsCreated.set(0);
sessionLastUsed.clear();
transactions.clear();
isPartitionedDmlTransaction.clear();
Expand Down
Expand Up @@ -182,7 +182,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
};
final int numSessions = 10;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(returnedSessionCount.get()).isEqualTo(numSessions);
assertThat(usedChannels.size()).isEqualTo(spannerOptions.getNumChannels());
Expand Down Expand Up @@ -275,7 +275,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
};
final int numSessions = 10;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(errorCount.get()).isEqualTo(errorOnChannels.size());
assertThat(returnedSessionCount.get())
Expand Down Expand Up @@ -330,7 +330,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
// sessions.
final int numSessions = 100;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(returnedSessionCount.get()).isEqualTo(numSessions);
}
Expand Down