Skip to content

Commit

Permalink
perf: close sessions async
Browse files Browse the repository at this point in the history
Sessions should be closed using an async gRPC call in order
to speed up the closing of a large session pool. Instead of
using its own executor, which is limited to 8 worker threads,
to execute asynchronous delete session calls, the session pool
should use the asynchronous call option in gRPC. This allows a
larger number of asynchronous delete session calls to be executed
in parallel and speeds up closing a session pool with a large
number of sessions.

Testing against a real Cloud Spanner database with a session pool
containing 1,000 sessions shows the following performance for
closing the session pool:

Before (3 runs):
6603ms
8169ms
8367ms

After (3 runs):
1297ms
1710ms
1851ms

Fixes #19.
  • Loading branch information
olavloite committed Jan 9, 2020
1 parent 93c0aed commit d528a0b
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 40 deletions.
Expand Up @@ -16,6 +16,9 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.protobuf.Empty;

/**
* A {@code Session} can be used to perform transactions that read and/or modify data in a Cloud
* Spanner database.
Expand Down Expand Up @@ -54,4 +57,10 @@ public interface Session extends DatabaseClient, AutoCloseable {

@Override
void close();

/**
* Closes the session asynchronously and returns the {@link ApiFuture} that can be used to monitor
* the operation progress.
*/
ApiFuture<Empty> asyncClose();
}
Expand Up @@ -19,14 +19,17 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -196,6 +199,27 @@ public void prepareReadWriteTransaction() {
readyTransactionId = beginTransaction();
}

@Override
public ApiFuture<Empty> asyncClose() {
final Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
final ApiFuture<Empty> res = spanner.getRpc().asyncDeleteSession(name, options);
res.addListener(
new Runnable() {
@Override
public void run() {
try {
// Get the result to trigger an exception if the operation failed.
res.get();
span.end();
} catch (Exception e) {
TraceUtil.endSpanWithFailure(span, e);
}
}
},
MoreExecutors.directExecutor());
return res;
}

@Override
public void close() {
Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
Expand Down
Expand Up @@ -18,6 +18,8 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand All @@ -34,6 +36,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import io.opencensus.common.Scope;
import io.opencensus.trace.Annotation;
import io.opencensus.trace.AttributeValue;
Expand Down Expand Up @@ -766,6 +769,12 @@ public TransactionRunner readWriteTransaction() {
return new SessionPoolTransactionRunner(SessionPool.this, this);
}

@Override
public ApiFuture<Empty> asyncClose() {
close();
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}

@Override
public void close() {
synchronized (lock) {
Expand Down Expand Up @@ -989,7 +998,7 @@ private void closeIdleSessions(Instant currTime) {
}
for (PooledSession sess : sessionsToClose) {
logger.log(Level.FINE, "Closing session {0}", sess.getName());
closeSession(sess);
closeSessionAsync(sess);
}
}

Expand Down Expand Up @@ -1566,37 +1575,27 @@ int totalSessions() {
}
}

private void closeSessionAsync(final PooledSession sess) {
executor.submit(
private ApiFuture<Empty> closeSessionAsync(final PooledSession sess) {
ApiFuture<Empty> res = sess.delegate.asyncClose();
res.addListener(
new Runnable() {
@Override
public void run() {
closeSession(sess);
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
});
}

private void closeSession(PooledSession sess) {
try {
sess.delegate.close();
} catch (SpannerException e) {
// Backend will delete these sessions after a while even if we fail to close them.
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Failed to close session: " + sess.getName(), e);
}
} finally {
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
},
MoreExecutors.directExecutor());
return res;
}

private void prepareSession(final PooledSession sess) {
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -513,9 +514,14 @@ public Session createSession(
@Override
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException {
get(asyncDeleteSession(sessionName, options));
}

@Override
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = newCallContext(options, sessionName);
get(spannerStub.deleteSessionCallable().futureCall(request, context));
return spannerStub.deleteSessionCallable().futureCall(request, context);
}

@Override
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner.spi.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -219,6 +220,9 @@ Session createSession(

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException;

StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Expand Up @@ -23,8 +23,10 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.protobuf.Empty;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -61,6 +63,7 @@ SessionImpl mockSession() {
when(session.getName())
.thenReturn(
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
sessionIndex++;
return session;
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -55,7 +56,12 @@ public static void startStaticServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
String uniqueName = InProcessServerBuilder.generateName();
server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start();
server =
InProcessServerBuilder.forName(uniqueName)
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

Expand Down
Expand Up @@ -22,9 +22,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -139,6 +142,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
private void setupSession(final Session session) {
ReadContext mockContext = mock(ReadContext.class);
final ResultSet mockResult = mock(ResultSet.class);
//
// when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(session.singleUse(any(TimestampBound.class))).thenReturn(mockContext);
when(mockContext.executeQuery(any(Statement.class)))
.thenAnswer(
Expand All @@ -152,10 +157,10 @@ public ResultSet answer(InvocationOnMock invocation) throws Throwable {
});
when(mockResult.next()).thenReturn(true);
doAnswer(
new Answer<Void>() {
new Answer<ApiFuture<Empty>>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
synchronized (lock) {
if (expiredSessions.contains(session.getName())) {
throw SpannerExceptionFactory.newSpannerException(
Expand All @@ -169,11 +174,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
minSessionsWhenSessionClosed = sessions.size();
}
}
return null;
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
})
.when(session)
.close();
.asyncClose();

doAnswer(
new Answer<Void>() {
Expand Down
Expand Up @@ -32,6 +32,8 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
Expand All @@ -45,6 +47,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand Down Expand Up @@ -217,8 +220,8 @@ public void run() {
leakedSession.clearLeakedException();
session1.close();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
verify(mockSession1).close();
verify(mockSession2).close();
verify(mockSession1).asyncClose();
verify(mockSession2).asyncClose();
}

@Test
Expand Down Expand Up @@ -874,16 +877,16 @@ public void run() {
.asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class));
for (Session session : new Session[] {session1, session2, session3}) {
doAnswer(
new Answer<Void>() {
new Answer<ApiFuture<Empty>>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
numSessionClosed.incrementAndGet();
return null;
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
})
.when(session)
.close();
.asyncClose();
}
FakeClock clock = new FakeClock();
clock.currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -1161,6 +1164,8 @@ public void testSessionNotFoundReadWriteTransaction() {
SpannerRpc.StreamingCall closedStreamingCall = mock(SpannerRpc.StreamingCall.class);
doThrow(sessionNotFound).when(closedStreamingCall).request(Mockito.anyInt());
SpannerRpc rpc = mock(SpannerRpc.class);
when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(rpc.executeQuery(
any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class)))
.thenReturn(closedStreamingCall);
Expand All @@ -1177,13 +1182,17 @@ public void testSessionNotFoundReadWriteTransaction() {
hasPreparedTransaction ? ByteString.copyFromUtf8("test-txn") : null;
final TransactionContextImpl closedTransactionContext =
new TransactionContextImpl(closedSession, preparedTransactionId, rpc, 10);
when(closedSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(closedSession.newTransaction()).thenReturn(closedTransactionContext);
when(closedSession.beginTransaction()).thenThrow(sessionNotFound);
TransactionRunnerImpl closedTransactionRunner =
new TransactionRunnerImpl(closedSession, rpc, 10);
when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner);

final SessionImpl openSession = mock(SessionImpl.class);
when(openSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(openSession.getName())
.thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open");
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
Expand Down
Expand Up @@ -25,12 +25,14 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -198,6 +200,8 @@ public void usesPreparedTransaction() {
when(options.getSessionPoolOptions()).thenReturn(sessionPoolOptions);
when(options.getSessionLabels()).thenReturn(Collections.<String, String>emptyMap());
SpannerRpc rpc = mock(SpannerRpc.class);
when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(rpc.batchCreateSessions(
Mockito.anyString(), Mockito.eq(1), Mockito.anyMap(), Mockito.anyMap()))
.thenAnswer(
Expand Down

0 comments on commit d528a0b

Please sign in to comment.