From ab250871cae51b3f496719d579db5bb6e263d5c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 23 Jan 2020 03:51:37 +0100 Subject: [PATCH] perf: close sessions async (#24) * perf: close sessions async Sessions should be closed using an async gRPC call in order to speed up the closing of a large session pool. Instead of using its own executor, which is limited to 8 worker threads, to execute asynchronous delete session calls, the session pool should use the asynchronous call option in gRPC. This allows a larger number of asynchronous delete session calls to be executed in parallel and speeds up closing a session pool with a large number of sessions. Testing against a real Cloud Spanner database with a session pool containing 1,000 sessions shows the following performance for closing the session pool: Before (3 runs): 6603ms 8169ms 8367ms After (3 runs): 1297ms 1710ms 1851ms Fixes #19. * fix: wait for test servers to terminate * remove tracing for async call * do not use directExecutor which could be a gRPC thread * fix: return failed future instead of throwing exception * fix: remove commented code --- .../com/google/cloud/spanner/Session.java | 9 +++ .../com/google/cloud/spanner/SessionImpl.java | 7 +++ .../com/google/cloud/spanner/SessionPool.java | 55 +++++++++---------- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 8 ++- .../cloud/spanner/spi/v1/SpannerRpc.java | 4 ++ .../cloud/spanner/BaseSessionPoolTest.java | 3 + .../cloud/spanner/DatabaseAdminGaxTest.java | 3 +- .../cloud/spanner/InstanceAdminGaxTest.java | 3 +- .../RetryOnInvalidatedSessionTest.java | 3 +- .../cloud/spanner/SessionPoolLeakTest.java | 11 +++- .../cloud/spanner/SessionPoolStressTest.java | 16 ++++-- .../google/cloud/spanner/SessionPoolTest.java | 21 +++++-- .../cloud/spanner/SpannerGaxRetryTest.java | 3 +- .../TransactionManagerAbortedTest.java | 3 +- .../spanner/TransactionManagerImplTest.java | 4 ++ .../spanner/TransactionRunnerImplTest.java | 4 ++ .../spanner/spi/v1/GapicSpannerRpcTest.java | 3 +- 17 files changed, 111 insertions(+), 49 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java index 4eefefd1c5..7dd7803e34 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java @@ -16,6 +16,9 @@ package com.google.cloud.spanner; +import com.google.api.core.ApiFuture; +import com.google.protobuf.Empty; + /** * A {@code Session} can be used to perform transactions that read and/or modify data in a Cloud * Spanner database. @@ -54,4 +57,10 @@ public interface Session extends DatabaseClient, AutoCloseable { @Override void close(); + + /** + * Closes the session asynchronously and returns the {@link ApiFuture} that can be used to monitor + * the operation progress. + */ + ApiFuture asyncClose(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 5df62a9bae..3190286e29 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -19,6 +19,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.core.ApiFuture; import com.google.cloud.Timestamp; import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; @@ -27,6 +28,7 @@ import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; @@ -196,6 +198,11 @@ public void prepareReadWriteTransaction() { readyTransactionId = beginTransaction(); } + @Override + public ApiFuture asyncClose() { + return spanner.getRpc().asyncDeleteSession(name, options); + } + @Override public void close() { Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index cdbf383d00..e501ee37a9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -18,6 +18,8 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; @@ -35,6 +37,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import com.google.protobuf.Empty; import io.opencensus.common.Scope; import io.opencensus.trace.Annotation; import io.opencensus.trace.AttributeValue; @@ -767,6 +770,12 @@ public TransactionRunner readWriteTransaction() { return new SessionPoolTransactionRunner(SessionPool.this, this); } + @Override + public ApiFuture asyncClose() { + close(); + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + @Override public void close() { synchronized (lock) { @@ -999,7 +1008,7 @@ private void closeIdleSessions(Instant currTime) { } for (PooledSession sess : sessionsToClose) { logger.log(Level.FINE, "Closing session {0}", sess.getName()); - closeSession(sess); + closeSessionAsync(sess); } } @@ -1612,37 +1621,27 @@ int totalSessions() { } } - private void closeSessionAsync(final PooledSession sess) { - executor.submit( + private ApiFuture closeSessionAsync(final PooledSession sess) { + ApiFuture res = sess.delegate.asyncClose(); + res.addListener( new Runnable() { @Override public void run() { - closeSession(sess); + synchronized (lock) { + allSessions.remove(sess); + if (isClosed()) { + decrementPendingClosures(1); + return; + } + // Create a new session if needed to unblock some waiter. + if (numWaiters() > numSessionsBeingCreated) { + createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated)); + } + } } - }); - } - - private void closeSession(PooledSession sess) { - try { - sess.delegate.close(); - } catch (SpannerException e) { - // Backend will delete these sessions after a while even if we fail to close them. - if (logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "Failed to close session: " + sess.getName(), e); - } - } finally { - synchronized (lock) { - allSessions.remove(sess); - if (isClosed()) { - decrementPendingClosures(1); - return; - } - // Create a new session if needed to unblock some waiter. - if (numWaiters() > numSessionsBeingCreated) { - createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated)); - } - } - } + }, + executor); + return res; } private void prepareSession(final PooledSession sess) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 19f41618fe..b3a6c9b92b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import com.google.api.core.ApiFuture; import com.google.api.core.NanoClock; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; @@ -523,9 +524,14 @@ public Session createSession( @Override public void deleteSession(String sessionName, @Nullable Map options) throws SpannerException { + get(asyncDeleteSession(sessionName, options)); + } + + @Override + public ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) { DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build(); GrpcCallContext context = newCallContext(options, sessionName); - get(spannerStub.deleteSessionCallable().futureCall(request, context)); + return spannerStub.deleteSessionCallable().futureCall(request, context); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index d608b3730a..ec1a23a0bb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner.spi.v1; +import com.google.api.core.ApiFuture; import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.ServiceRpc; import com.google.cloud.spanner.SpannerException; @@ -219,6 +220,9 @@ Session createSession( void deleteSession(String sessionName, @Nullable Map options) throws SpannerException; + ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) + throws SpannerException; + StreamingCall read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index df4a0bc445..c3f724edea 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -23,8 +23,10 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import com.google.api.core.ApiFutures; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.SessionPool.Clock; +import com.google.protobuf.Empty; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -61,6 +63,7 @@ SessionImpl mockSession() { when(session.getName()) .thenReturn( "projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex); + when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); sessionIndex++; return session; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java index 51f0cb21be..d24d136448 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java @@ -216,8 +216,9 @@ public static void startStaticServer() throws IOException { } @AfterClass - public static void stopServer() { + public static void stopServer() throws InterruptedException { server.shutdown(); + server.awaitTermination(); } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminGaxTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminGaxTest.java index 6c14428f02..c8652051b1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminGaxTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminGaxTest.java @@ -219,8 +219,9 @@ public static void startStaticServer() throws IOException { } @AfterClass - public static void stopServer() { + public static void stopServer() throws InterruptedException { server.shutdown(); + server.awaitTermination(); } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 2a6cc327a8..72d537f11a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -172,9 +172,10 @@ public static void startStaticServer() throws IOException { } @AfterClass - public static void stopServer() { + public static void stopServer() throws InterruptedException { spannerClient.close(); server.shutdown(); + server.awaitTermination(); } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java index dd5ecd1191..0c92f8d461 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java @@ -29,6 +29,7 @@ import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessServerBuilder; import java.io.IOException; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -55,13 +56,19 @@ public static void startStaticServer() throws IOException { mockSpanner = new MockSpannerServiceImpl(); mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. String uniqueName = InProcessServerBuilder.generateName(); - server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); + server = + InProcessServerBuilder.forName(uniqueName) + .scheduledExecutorService(new ScheduledThreadPoolExecutor(1)) + .addService(mockSpanner) + .build() + .start(); channelProvider = LocalChannelProvider.create(uniqueName); } @AfterClass - public static void stopServer() { + public static void stopServer() throws InterruptedException { server.shutdown(); + server.awaitTermination(); } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index 75df0f061a..beb2e35cd1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -22,9 +22,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; import com.google.common.util.concurrent.Uninterruptibles; +import com.google.protobuf.Empty; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -152,14 +155,15 @@ public ResultSet answer(InvocationOnMock invocation) throws Throwable { }); when(mockResult.next()).thenReturn(true); doAnswer( - new Answer() { + new Answer>() { @Override - public Void answer(InvocationOnMock invocation) throws Throwable { + public ApiFuture answer(InvocationOnMock invocation) throws Throwable { synchronized (lock) { if (expiredSessions.contains(session.getName())) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.NOT_FOUND, "Session not found"); + return ApiFutures.immediateFailedFuture( + SpannerExceptionFactory.newSpannerException( + ErrorCode.NOT_FOUND, "Session not found")); } if (sessions.remove(session.getName()) == null) { setFailed(closedSessions.get(session.getName())); @@ -169,11 +173,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable { minSessionsWhenSessionClosed = sessions.size(); } } - return null; + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); } }) .when(session) - .close(); + .asyncClose(); doAnswer( new Answer() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 7ebc1b2923..853256f4ad 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -32,6 +32,8 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; import com.google.cloud.spanner.SessionClient.SessionConsumer; @@ -45,6 +47,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; @@ -217,8 +220,8 @@ public void run() { leakedSession.clearLeakedException(); session1.close(); pool.closeAsync().get(5L, TimeUnit.SECONDS); - verify(mockSession1).close(); - verify(mockSession2).close(); + verify(mockSession1).asyncClose(); + verify(mockSession2).asyncClose(); } @Test @@ -874,16 +877,16 @@ public void run() { .asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class)); for (Session session : new Session[] {session1, session2, session3}) { doAnswer( - new Answer() { + new Answer>() { @Override - public Void answer(InvocationOnMock invocation) throws Throwable { + public ApiFuture answer(InvocationOnMock invocation) throws Throwable { numSessionClosed.incrementAndGet(); - return null; + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); } }) .when(session) - .close(); + .asyncClose(); } FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); @@ -1161,6 +1164,8 @@ public void testSessionNotFoundReadWriteTransaction() { SpannerRpc.StreamingCall closedStreamingCall = mock(SpannerRpc.StreamingCall.class); doThrow(sessionNotFound).when(closedStreamingCall).request(Mockito.anyInt()); SpannerRpc rpc = mock(SpannerRpc.class); + when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap())) + .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(rpc.executeQuery( any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class))) .thenReturn(closedStreamingCall); @@ -1177,6 +1182,8 @@ public void testSessionNotFoundReadWriteTransaction() { hasPreparedTransaction ? ByteString.copyFromUtf8("test-txn") : null; final TransactionContextImpl closedTransactionContext = new TransactionContextImpl(closedSession, preparedTransactionId, rpc, 10); + when(closedSession.asyncClose()) + .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(closedSession.newTransaction()).thenReturn(closedTransactionContext); when(closedSession.beginTransaction()).thenThrow(sessionNotFound); TransactionRunnerImpl closedTransactionRunner = @@ -1184,6 +1191,8 @@ public void testSessionNotFoundReadWriteTransaction() { when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner); final SessionImpl openSession = mock(SessionImpl.class); + when(openSession.asyncClose()) + .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(openSession.getName()) .thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open"); final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java index ef57cf7cb9..54d7ad5a12 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java @@ -122,8 +122,9 @@ public static void startStaticServer() throws IOException { } @AfterClass - public static void stopServer() { + public static void stopServer() throws InterruptedException { server.shutdown(); + server.awaitTermination(); } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerAbortedTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerAbortedTest.java index b926460769..8272ac4aa8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerAbortedTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerAbortedTest.java @@ -160,9 +160,10 @@ public static void startStaticServer() throws IOException { } @AfterClass - public static void stopServer() { + public static void stopServer() throws InterruptedException { spannerClient.close(); server.shutdown(); + server.awaitTermination(); } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 9b0dcdf4a7..4b064a047d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -25,12 +25,14 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.TransactionManager.TransactionState; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; @@ -198,6 +200,8 @@ public void usesPreparedTransaction() { when(options.getSessionPoolOptions()).thenReturn(sessionPoolOptions); when(options.getSessionLabels()).thenReturn(Collections.emptyMap()); SpannerRpc rpc = mock(SpannerRpc.class); + when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap())) + .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(rpc.batchCreateSessions( Mockito.anyString(), Mockito.eq(1), Mockito.anyMap(), Mockito.anyMap())) .thenAnswer( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 35cdefa0bb..e46a74b96b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.core.ApiFutures; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; @@ -32,6 +33,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; +import com.google.protobuf.Empty; import com.google.protobuf.Timestamp; import com.google.rpc.Code; import com.google.rpc.RetryInfo; @@ -107,6 +109,8 @@ public void usesPreparedTransaction() { when(options.getSessionPoolOptions()).thenReturn(sessionPoolOptions); when(options.getSessionLabels()).thenReturn(Collections.emptyMap()); SpannerRpc rpc = mock(SpannerRpc.class); + when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap())) + .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(rpc.batchCreateSessions( Mockito.anyString(), Mockito.eq(1), Mockito.anyMap(), Mockito.anyMap())) .thenAnswer( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 413799bc2c..3d9917ff5c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -163,8 +163,9 @@ public ServerCall.Listener interceptCall( } @After - public void stopServer() { + public void stopServer() throws InterruptedException { server.shutdown(); + server.awaitTermination(); } private static final int NUMBER_OF_TEST_RUNS = 2;