Skip to content

Commit

Permalink
perf: close sessions async (#24)
Browse files Browse the repository at this point in the history
* perf: close sessions async

Sessions should be closed using an async gRPC call in order
to speed up the closing of a large session pool. Instead of
using its own executor, which is limited to 8 worker threads,
to execute asynchronous delete session calls, the session pool
should use the asynchronous call option in gRPC. This allows a
larger number of asynchronous delete session calls to be executed
in parallel and speeds up closing a session pool with a large
number of sessions.

Testing against a real Cloud Spanner database with a session pool
containing 1,000 sessions shows the following performance for
closing the session pool:

Before (3 runs):
6603ms
8169ms
8367ms

After (3 runs):
1297ms
1710ms
1851ms

Fixes #19.

* fix: wait for test servers to terminate

* remove tracing for async call

* do not use directExecutor which could be a gRPC thread

* fix: return failed future instead of throwing exception

* fix: remove commented code
  • Loading branch information
olavloite authored and skuruppu committed Jan 23, 2020
1 parent 11e4a90 commit ab25087
Show file tree
Hide file tree
Showing 17 changed files with 111 additions and 49 deletions.
Expand Up @@ -16,6 +16,9 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.protobuf.Empty;

/**
* A {@code Session} can be used to perform transactions that read and/or modify data in a Cloud
* Spanner database.
Expand Down Expand Up @@ -54,4 +57,10 @@ public interface Session extends DatabaseClient, AutoCloseable {

@Override
void close();

/**
* Closes the session asynchronously and returns the {@link ApiFuture} that can be used to monitor
* the operation progress.
*/
ApiFuture<Empty> asyncClose();
}
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
Expand All @@ -27,6 +28,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -196,6 +198,11 @@ public void prepareReadWriteTransaction() {
readyTransactionId = beginTransaction();
}

@Override
public ApiFuture<Empty> asyncClose() {
return spanner.getRpc().asyncDeleteSession(name, options);
}

@Override
public void close() {
Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
Expand Down
Expand Up @@ -18,6 +18,8 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand All @@ -35,6 +37,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import io.opencensus.common.Scope;
import io.opencensus.trace.Annotation;
import io.opencensus.trace.AttributeValue;
Expand Down Expand Up @@ -767,6 +770,12 @@ public TransactionRunner readWriteTransaction() {
return new SessionPoolTransactionRunner(SessionPool.this, this);
}

@Override
public ApiFuture<Empty> asyncClose() {
close();
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}

@Override
public void close() {
synchronized (lock) {
Expand Down Expand Up @@ -999,7 +1008,7 @@ private void closeIdleSessions(Instant currTime) {
}
for (PooledSession sess : sessionsToClose) {
logger.log(Level.FINE, "Closing session {0}", sess.getName());
closeSession(sess);
closeSessionAsync(sess);
}
}

Expand Down Expand Up @@ -1612,37 +1621,27 @@ int totalSessions() {
}
}

private void closeSessionAsync(final PooledSession sess) {
executor.submit(
private ApiFuture<Empty> closeSessionAsync(final PooledSession sess) {
ApiFuture<Empty> res = sess.delegate.asyncClose();
res.addListener(
new Runnable() {
@Override
public void run() {
closeSession(sess);
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
});
}

private void closeSession(PooledSession sess) {
try {
sess.delegate.close();
} catch (SpannerException e) {
// Backend will delete these sessions after a while even if we fail to close them.
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Failed to close session: " + sess.getName(), e);
}
} finally {
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
},
executor);
return res;
}

private void prepareSession(final PooledSession sess) {
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -523,9 +524,14 @@ public Session createSession(
@Override
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException {
get(asyncDeleteSession(sessionName, options));
}

@Override
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = newCallContext(options, sessionName);
get(spannerStub.deleteSessionCallable().futureCall(request, context));
return spannerStub.deleteSessionCallable().futureCall(request, context);
}

@Override
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner.spi.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -219,6 +220,9 @@ Session createSession(

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException;

StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Expand Up @@ -23,8 +23,10 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.protobuf.Empty;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -61,6 +63,7 @@ SessionImpl mockSession() {
when(session.getName())
.thenReturn(
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
sessionIndex++;
return session;
}
Expand Down
Expand Up @@ -216,8 +216,9 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -219,8 +219,9 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -172,9 +172,10 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
spannerClient.close();
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -55,13 +56,19 @@ public static void startStaticServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
String uniqueName = InProcessServerBuilder.generateName();
server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start();
server =
InProcessServerBuilder.forName(uniqueName)
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -22,9 +22,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -152,14 +155,15 @@ public ResultSet answer(InvocationOnMock invocation) throws Throwable {
});
when(mockResult.next()).thenReturn(true);
doAnswer(
new Answer<Void>() {
new Answer<ApiFuture<Empty>>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
synchronized (lock) {
if (expiredSessions.contains(session.getName())) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found");
return ApiFutures.immediateFailedFuture(
SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found"));
}
if (sessions.remove(session.getName()) == null) {
setFailed(closedSessions.get(session.getName()));
Expand All @@ -169,11 +173,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
minSessionsWhenSessionClosed = sessions.size();
}
}
return null;
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
})
.when(session)
.close();
.asyncClose();

doAnswer(
new Answer<Void>() {
Expand Down
Expand Up @@ -32,6 +32,8 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
Expand All @@ -45,6 +47,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand Down Expand Up @@ -217,8 +220,8 @@ public void run() {
leakedSession.clearLeakedException();
session1.close();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
verify(mockSession1).close();
verify(mockSession2).close();
verify(mockSession1).asyncClose();
verify(mockSession2).asyncClose();
}

@Test
Expand Down Expand Up @@ -874,16 +877,16 @@ public void run() {
.asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class));
for (Session session : new Session[] {session1, session2, session3}) {
doAnswer(
new Answer<Void>() {
new Answer<ApiFuture<Empty>>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
numSessionClosed.incrementAndGet();
return null;
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
})
.when(session)
.close();
.asyncClose();
}
FakeClock clock = new FakeClock();
clock.currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -1161,6 +1164,8 @@ public void testSessionNotFoundReadWriteTransaction() {
SpannerRpc.StreamingCall closedStreamingCall = mock(SpannerRpc.StreamingCall.class);
doThrow(sessionNotFound).when(closedStreamingCall).request(Mockito.anyInt());
SpannerRpc rpc = mock(SpannerRpc.class);
when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(rpc.executeQuery(
any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class)))
.thenReturn(closedStreamingCall);
Expand All @@ -1177,13 +1182,17 @@ public void testSessionNotFoundReadWriteTransaction() {
hasPreparedTransaction ? ByteString.copyFromUtf8("test-txn") : null;
final TransactionContextImpl closedTransactionContext =
new TransactionContextImpl(closedSession, preparedTransactionId, rpc, 10);
when(closedSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(closedSession.newTransaction()).thenReturn(closedTransactionContext);
when(closedSession.beginTransaction()).thenThrow(sessionNotFound);
TransactionRunnerImpl closedTransactionRunner =
new TransactionRunnerImpl(closedSession, rpc, 10);
when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner);

final SessionImpl openSession = mock(SessionImpl.class);
when(openSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(openSession.getName())
.thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open");
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
Expand Down

0 comments on commit ab25087

Please sign in to comment.