Skip to content

Commit

Permalink
Revert "perf: close sessions async (#24)" (#43)
Browse files Browse the repository at this point in the history
This reverts commit ab25087.
  • Loading branch information
skuruppu authored and hengfengli committed Jan 23, 2020
1 parent 5a4fc74 commit 809ed88
Show file tree
Hide file tree
Showing 17 changed files with 49 additions and 111 deletions.
Expand Up @@ -16,9 +16,6 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.protobuf.Empty;

/**
* A {@code Session} can be used to perform transactions that read and/or modify data in a Cloud
* Spanner database.
Expand Down Expand Up @@ -57,10 +54,4 @@ public interface Session extends DatabaseClient, AutoCloseable {

@Override
void close();

/**
* Closes the session asynchronously and returns the {@link ApiFuture} that can be used to monitor
* the operation progress.
*/
ApiFuture<Empty> asyncClose();
}
Expand Up @@ -19,7 +19,6 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
Expand All @@ -28,7 +27,6 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -198,11 +196,6 @@ public void prepareReadWriteTransaction() {
readyTransactionId = beginTransaction();
}

@Override
public ApiFuture<Empty> asyncClose() {
return spanner.getRpc().asyncDeleteSession(name, options);
}

@Override
public void close() {
Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
Expand Down
Expand Up @@ -18,8 +18,6 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand All @@ -37,7 +35,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import io.opencensus.common.Scope;
import io.opencensus.trace.Annotation;
import io.opencensus.trace.AttributeValue;
Expand Down Expand Up @@ -770,12 +767,6 @@ public TransactionRunner readWriteTransaction() {
return new SessionPoolTransactionRunner(SessionPool.this, this);
}

@Override
public ApiFuture<Empty> asyncClose() {
close();
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}

@Override
public void close() {
synchronized (lock) {
Expand Down Expand Up @@ -1008,7 +999,7 @@ private void closeIdleSessions(Instant currTime) {
}
for (PooledSession sess : sessionsToClose) {
logger.log(Level.FINE, "Closing session {0}", sess.getName());
closeSessionAsync(sess);
closeSession(sess);
}
}

Expand Down Expand Up @@ -1621,27 +1612,37 @@ int totalSessions() {
}
}

private ApiFuture<Empty> closeSessionAsync(final PooledSession sess) {
ApiFuture<Empty> res = sess.delegate.asyncClose();
res.addListener(
private void closeSessionAsync(final PooledSession sess) {
executor.submit(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
closeSession(sess);
}
},
executor);
return res;
});
}

private void closeSession(PooledSession sess) {
try {
sess.delegate.close();
} catch (SpannerException e) {
// Backend will delete these sessions after a while even if we fail to close them.
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Failed to close session: " + sess.getName(), e);
}
} finally {
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
}

private void prepareSession(final PooledSession sess) {
Expand Down
Expand Up @@ -18,7 +18,6 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -524,14 +523,9 @@ public Session createSession(
@Override
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException {
get(asyncDeleteSession(sessionName, options));
}

@Override
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = newCallContext(options, sessionName);
return spannerStub.deleteSessionCallable().futureCall(request, context);
get(spannerStub.deleteSessionCallable().futureCall(request, context));
}

@Override
Expand Down
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.spanner.spi.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -220,9 +219,6 @@ Session createSession(

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException;

StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Expand Up @@ -23,10 +23,8 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.protobuf.Empty;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -63,7 +61,6 @@ SessionImpl mockSession() {
when(session.getName())
.thenReturn(
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
sessionIndex++;
return session;
}
Expand Down
Expand Up @@ -216,9 +216,8 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() throws InterruptedException {
public static void stopServer() {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -219,9 +219,8 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() throws InterruptedException {
public static void stopServer() {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -172,10 +172,9 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() throws InterruptedException {
public static void stopServer() {
spannerClient.close();
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -29,7 +29,6 @@
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -56,19 +55,13 @@ public static void startStaticServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
String uniqueName = InProcessServerBuilder.generateName();
server =
InProcessServerBuilder.forName(uniqueName)
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

@AfterClass
public static void stopServer() throws InterruptedException {
public static void stopServer() {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -22,12 +22,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -155,15 +152,14 @@ public ResultSet answer(InvocationOnMock invocation) throws Throwable {
});
when(mockResult.next()).thenReturn(true);
doAnswer(
new Answer<ApiFuture<Empty>>() {
new Answer<Void>() {

@Override
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
public Void answer(InvocationOnMock invocation) throws Throwable {
synchronized (lock) {
if (expiredSessions.contains(session.getName())) {
return ApiFutures.immediateFailedFuture(
SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found"));
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found");
}
if (sessions.remove(session.getName()) == null) {
setFailed(closedSessions.get(session.getName()));
Expand All @@ -173,11 +169,11 @@ public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
minSessionsWhenSessionClosed = sessions.size();
}
}
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
return null;
}
})
.when(session)
.asyncClose();
.close();

doAnswer(
new Answer<Void>() {
Expand Down
Expand Up @@ -32,8 +32,6 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
Expand All @@ -47,7 +45,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand Down Expand Up @@ -220,8 +217,8 @@ public void run() {
leakedSession.clearLeakedException();
session1.close();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
verify(mockSession1).asyncClose();
verify(mockSession2).asyncClose();
verify(mockSession1).close();
verify(mockSession2).close();
}

@Test
Expand Down Expand Up @@ -877,16 +874,16 @@ public void run() {
.asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class));
for (Session session : new Session[] {session1, session2, session3}) {
doAnswer(
new Answer<ApiFuture<Empty>>() {
new Answer<Void>() {

@Override
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
public Void answer(InvocationOnMock invocation) throws Throwable {
numSessionClosed.incrementAndGet();
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
return null;
}
})
.when(session)
.asyncClose();
.close();
}
FakeClock clock = new FakeClock();
clock.currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -1164,8 +1161,6 @@ public void testSessionNotFoundReadWriteTransaction() {
SpannerRpc.StreamingCall closedStreamingCall = mock(SpannerRpc.StreamingCall.class);
doThrow(sessionNotFound).when(closedStreamingCall).request(Mockito.anyInt());
SpannerRpc rpc = mock(SpannerRpc.class);
when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(rpc.executeQuery(
any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class)))
.thenReturn(closedStreamingCall);
Expand All @@ -1182,17 +1177,13 @@ public void testSessionNotFoundReadWriteTransaction() {
hasPreparedTransaction ? ByteString.copyFromUtf8("test-txn") : null;
final TransactionContextImpl closedTransactionContext =
new TransactionContextImpl(closedSession, preparedTransactionId, rpc, 10);
when(closedSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(closedSession.newTransaction()).thenReturn(closedTransactionContext);
when(closedSession.beginTransaction()).thenThrow(sessionNotFound);
TransactionRunnerImpl closedTransactionRunner =
new TransactionRunnerImpl(closedSession, rpc, 10);
when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner);

final SessionImpl openSession = mock(SessionImpl.class);
when(openSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(openSession.getName())
.thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open");
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
Expand Down

0 comments on commit 809ed88

Please sign in to comment.