Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "perf: close sessions async" #43

Merged
merged 1 commit into from Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,9 +16,6 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.protobuf.Empty;

/**
* A {@code Session} can be used to perform transactions that read and/or modify data in a Cloud
* Spanner database.
Expand Down Expand Up @@ -57,10 +54,4 @@ public interface Session extends DatabaseClient, AutoCloseable {

@Override
void close();

/**
* Closes the session asynchronously and returns the {@link ApiFuture} that can be used to monitor
* the operation progress.
*/
ApiFuture<Empty> asyncClose();
}
Expand Up @@ -19,7 +19,6 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
Expand All @@ -28,7 +27,6 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -198,11 +196,6 @@ public void prepareReadWriteTransaction() {
readyTransactionId = beginTransaction();
}

@Override
public ApiFuture<Empty> asyncClose() {
return spanner.getRpc().asyncDeleteSession(name, options);
}

@Override
public void close() {
Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
Expand Down
Expand Up @@ -18,8 +18,6 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand All @@ -37,7 +35,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import io.opencensus.common.Scope;
import io.opencensus.trace.Annotation;
import io.opencensus.trace.AttributeValue;
Expand Down Expand Up @@ -770,12 +767,6 @@ public TransactionRunner readWriteTransaction() {
return new SessionPoolTransactionRunner(SessionPool.this, this);
}

@Override
public ApiFuture<Empty> asyncClose() {
close();
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}

@Override
public void close() {
synchronized (lock) {
Expand Down Expand Up @@ -1008,7 +999,7 @@ private void closeIdleSessions(Instant currTime) {
}
for (PooledSession sess : sessionsToClose) {
logger.log(Level.FINE, "Closing session {0}", sess.getName());
closeSessionAsync(sess);
closeSession(sess);
}
}

Expand Down Expand Up @@ -1621,27 +1612,37 @@ int totalSessions() {
}
}

private ApiFuture<Empty> closeSessionAsync(final PooledSession sess) {
ApiFuture<Empty> res = sess.delegate.asyncClose();
res.addListener(
private void closeSessionAsync(final PooledSession sess) {
executor.submit(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
closeSession(sess);
}
},
executor);
return res;
});
}

private void closeSession(PooledSession sess) {
try {
sess.delegate.close();
} catch (SpannerException e) {
// Backend will delete these sessions after a while even if we fail to close them.
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Failed to close session: " + sess.getName(), e);
}
} finally {
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
}

private void prepareSession(final PooledSession sess) {
Expand Down
Expand Up @@ -18,7 +18,6 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -524,14 +523,9 @@ public Session createSession(
@Override
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException {
get(asyncDeleteSession(sessionName, options));
}

@Override
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = newCallContext(options, sessionName);
return spannerStub.deleteSessionCallable().futureCall(request, context);
get(spannerStub.deleteSessionCallable().futureCall(request, context));
}

@Override
Expand Down
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.spanner.spi.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -220,9 +219,6 @@ Session createSession(

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException;

StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Expand Up @@ -23,10 +23,8 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.protobuf.Empty;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -63,7 +61,6 @@ SessionImpl mockSession() {
when(session.getName())
.thenReturn(
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
sessionIndex++;
return session;
}
Expand Down
Expand Up @@ -216,9 +216,8 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() throws InterruptedException {
public static void stopServer() {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -219,9 +219,8 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() throws InterruptedException {
public static void stopServer() {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -172,10 +172,9 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() throws InterruptedException {
public static void stopServer() {
spannerClient.close();
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -29,7 +29,6 @@
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -56,19 +55,13 @@ public static void startStaticServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
String uniqueName = InProcessServerBuilder.generateName();
server =
InProcessServerBuilder.forName(uniqueName)
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

@AfterClass
public static void stopServer() throws InterruptedException {
public static void stopServer() {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Expand Up @@ -22,12 +22,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -155,15 +152,14 @@ public ResultSet answer(InvocationOnMock invocation) throws Throwable {
});
when(mockResult.next()).thenReturn(true);
doAnswer(
new Answer<ApiFuture<Empty>>() {
new Answer<Void>() {

@Override
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
public Void answer(InvocationOnMock invocation) throws Throwable {
synchronized (lock) {
if (expiredSessions.contains(session.getName())) {
return ApiFutures.immediateFailedFuture(
SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found"));
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found");
}
if (sessions.remove(session.getName()) == null) {
setFailed(closedSessions.get(session.getName()));
Expand All @@ -173,11 +169,11 @@ public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
minSessionsWhenSessionClosed = sessions.size();
}
}
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
return null;
}
})
.when(session)
.asyncClose();
.close();

doAnswer(
new Answer<Void>() {
Expand Down
Expand Up @@ -32,8 +32,6 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
Expand All @@ -47,7 +45,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand Down Expand Up @@ -220,8 +217,8 @@ public void run() {
leakedSession.clearLeakedException();
session1.close();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
verify(mockSession1).asyncClose();
verify(mockSession2).asyncClose();
verify(mockSession1).close();
verify(mockSession2).close();
}

@Test
Expand Down Expand Up @@ -877,16 +874,16 @@ public void run() {
.asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class));
for (Session session : new Session[] {session1, session2, session3}) {
doAnswer(
new Answer<ApiFuture<Empty>>() {
new Answer<Void>() {

@Override
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
public Void answer(InvocationOnMock invocation) throws Throwable {
numSessionClosed.incrementAndGet();
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
return null;
}
})
.when(session)
.asyncClose();
.close();
}
FakeClock clock = new FakeClock();
clock.currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -1164,8 +1161,6 @@ public void testSessionNotFoundReadWriteTransaction() {
SpannerRpc.StreamingCall closedStreamingCall = mock(SpannerRpc.StreamingCall.class);
doThrow(sessionNotFound).when(closedStreamingCall).request(Mockito.anyInt());
SpannerRpc rpc = mock(SpannerRpc.class);
when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(rpc.executeQuery(
any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class)))
.thenReturn(closedStreamingCall);
Expand All @@ -1182,17 +1177,13 @@ public void testSessionNotFoundReadWriteTransaction() {
hasPreparedTransaction ? ByteString.copyFromUtf8("test-txn") : null;
final TransactionContextImpl closedTransactionContext =
new TransactionContextImpl(closedSession, preparedTransactionId, rpc, 10);
when(closedSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(closedSession.newTransaction()).thenReturn(closedTransactionContext);
when(closedSession.beginTransaction()).thenThrow(sessionNotFound);
TransactionRunnerImpl closedTransactionRunner =
new TransactionRunnerImpl(closedSession, rpc, 10);
when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner);

final SessionImpl openSession = mock(SessionImpl.class);
when(openSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(openSession.getName())
.thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open");
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
Expand Down