From 5aef6c3f6d3e9564cb8728ad51718feb6b64475a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 25 Jun 2020 12:20:21 +0200 Subject: [PATCH] feat(deps): adopt flatten plugin and google-cloud-shared-dependencies and update ExecutorProvider (#302) * feat(deps): adopt flatten plugin and google-cloud-shared-dependencies * fix: change executor after gax update * tests: create a new Thread to get default group * tests: get threads through Thread.getAllStackTraces() Co-authored-by: yangnuoyu --- google-cloud-spanner/pom.xml | 1 - .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 9 +- ...est.java => SpannerOptionsThreadTest.java} | 122 ++++++++---------- .../connection/AbstractMockServerTest.java | 4 + grpc-google-cloud-spanner-v1/pom.xml | 1 - pom.xml | 17 ++- 6 files changed, 81 insertions(+), 73 deletions(-) rename google-cloud-spanner/src/test/java/com/google/cloud/spanner/{it/ITSpannerOptionsTest.java => SpannerOptionsThreadTest.java} (65%) diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index 23118dd6f0..206ed5e316 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -336,7 +336,6 @@ javax.annotation javax.annotation-api - 1.3.2 diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 97f4b5c88a..890aebed2d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -163,7 +163,8 @@ public class GapicSpannerRpc implements SpannerRpc { * down when the {@link SpannerRpc} is closed. */ private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider { - private static final int DEFAULT_THREAD_COUNT = 4; + // 4 Gapic clients * 4 channels per client. + private static final int DEFAULT_MIN_THREAD_COUNT = 16; private final List executors = new LinkedList<>(); private final ThreadFactory threadFactory; @@ -178,8 +179,10 @@ public boolean shouldAutoClose() { @Override public ScheduledExecutorService getExecutor() { + int numCpus = Runtime.getRuntime().availableProcessors(); + int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus); ScheduledExecutorService executor = - new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, threadFactory); + new ScheduledThreadPoolExecutor(numThreads, threadFactory); synchronized (this) { executors.add(executor); } @@ -298,7 +301,7 @@ public GapicSpannerRpc(final SpannerOptions options) { .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) .setMaxInboundMetadataSize(MAX_METADATA_SIZE) .setPoolSize(options.getNumChannels()) - .setExecutorProvider(executorProvider) + .setExecutor(executorProvider.getExecutor()) // Set a keepalive time of 120 seconds to help long running // commit GRPC calls succeed diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITSpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java similarity index 65% rename from google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITSpannerOptionsTest.java rename to google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java index 34e6c43944..8626f5cb5c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITSpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Google LLC + * Copyright 2020 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,55 +14,58 @@ * limitations under the License. */ -package com.google.cloud.spanner.it; +package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; -import com.google.cloud.spanner.Database; -import com.google.cloud.spanner.DatabaseAdminClient; -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.InstanceAdminClient; -import com.google.cloud.spanner.IntegrationTestEnv; -import com.google.cloud.spanner.ParallelIntegrationTest; -import com.google.cloud.spanner.ResultSet; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerOptions; -import com.google.cloud.spanner.Statement; +import com.google.api.core.ApiFunction; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.connection.AbstractMockServerTest; import com.google.common.base.Stopwatch; +import com.google.spanner.admin.database.v1.ListDatabasesResponse; +import com.google.spanner.admin.instance.v1.ListInstancesResponse; +import io.grpc.ManagedChannelBuilder; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -@Category(ParallelIntegrationTest.class) @RunWith(JUnit4.class) -public class ITSpannerOptionsTest { - @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); - private static Database db; - - @BeforeClass - public static void setUp() throws Exception { - db = env.getTestHelper().createTestDatabase(); - } - - @AfterClass - public static void tearDown() throws Exception { - db.drop(); - } - +public class SpannerOptionsThreadTest extends AbstractMockServerTest { private static final int NUMBER_OF_TEST_RUNS = 2; - private static final int DEFAULT_NUM_CHANNELS = 4; - private static final int NUM_THREADS_PER_CHANNEL = 4; + private static final int DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT = 4; + private static final int NUM_GAPIC_CLIENTS = 4; + private static final int NUM_THREADS = + Math.max( + DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * NUM_GAPIC_CLIENTS, + Runtime.getRuntime().availableProcessors()); private static final String SPANNER_THREAD_NAME = "Cloud-Spanner-TransportChannel"; private static final String THREAD_PATTERN = "%s-[0-9]+"; + private final DatabaseId dbId = DatabaseId.of("p", "i", "d"); + + @SuppressWarnings("rawtypes") + private SpannerOptions createOptions() { + return SpannerOptions.newBuilder() + .setProjectId("p") + // Set a custom channel configurator to allow http instead of https. + .setChannelConfigurator( + new ApiFunction() { + @Override + public ManagedChannelBuilder apply(ManagedChannelBuilder input) { + input.usePlaintext(); + return input; + } + }) + .setHost("http://localhost:" + getPort()) + .setCredentials(NoCredentials.getInstance()) + .build(); + } + @Test public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException { int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME); @@ -72,18 +75,15 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException // Create Spanner instance. // We make a copy of the options instance, as SpannerOptions caches any service object // that has been handed out. - SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build(); + SpannerOptions options = createOptions(); Spanner spanner = options.getService(); // Get a database client and do a query. This should initiate threads for the Spanner service. - DatabaseClient client = spanner.getDatabaseClient(db.getId()); + DatabaseClient client = spanner.getDatabaseClient(dbId); List resultSets = new ArrayList<>(); // SpannerStub affiliates a channel with a session, so we need to use multiple sessions // to ensure we also hit multiple channels. for (int i2 = 0; i2 < options.getSessionPoolOptions().getMaxSessions(); i2++) { - ResultSet rs = - client - .singleUse() - .executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2")); + ResultSet rs = client.singleUse().executeQuery(SELECT_COUNT_STATEMENT); // Execute ResultSet#next() to send the query to Spanner. rs.next(); // Delay closing the result set in order to force the use of multiple sessions. @@ -91,8 +91,7 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException // sessions should initialize multiple transport channels. resultSets.add(rs); // Check whether the number of expected threads has been reached. - if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) - == DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount) { + if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) == NUM_THREADS + baseThreadCount) { break; } } @@ -102,25 +101,27 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException // Check the number of threads after the query. Doing a request should initialize a thread // pool for the underlying SpannerClient. assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount); + .isEqualTo(NUM_THREADS + baseThreadCount); // Then do a request to the InstanceAdmin service and check the number of threads. // Doing a request should initialize a thread pool for the underlying InstanceAdminClient. - for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) { + for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) { InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient(); + mockInstanceAdmin.addResponse(ListInstancesResponse.getDefaultInstance()); instanceAdminClient.listInstances(); } assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(2 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount); + .isEqualTo(NUM_THREADS + baseThreadCount); // Then do a request to the DatabaseAdmin service and check the number of threads. // Doing a request should initialize a thread pool for the underlying DatabaseAdminClient. - for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) { + for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) { DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient(); - databaseAdminClient.listDatabases(db.getId().getInstanceId().getInstance()); + mockDatabaseAdmin.addResponse(ListDatabasesResponse.getDefaultInstance()); + databaseAdminClient.listDatabases(dbId.getInstanceId().getInstance()); } assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(3 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount); + .isEqualTo(NUM_THREADS + baseThreadCount); // Now close the Spanner instance and check whether the threads are shutdown or not. spanner.close(); @@ -138,23 +139,17 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException public void testMultipleSpannersFromSameSpannerOptions() throws InterruptedException { waitForStartup(); int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME); - SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build(); + SpannerOptions options = createOptions(); try (Spanner spanner1 = options.getService()) { // Having both in the try-with-resources block is not possible, as it is the same instance. // One will be closed before the other, and the closing of the second instance would fail. Spanner spanner2 = options.getService(); assertThat(spanner1).isSameInstanceAs(spanner2); - DatabaseClient client1 = spanner1.getDatabaseClient(db.getId()); - DatabaseClient client2 = spanner2.getDatabaseClient(db.getId()); + DatabaseClient client1 = spanner1.getDatabaseClient(dbId); + DatabaseClient client2 = spanner2.getDatabaseClient(dbId); assertThat(client1).isSameInstanceAs(client2); - try (ResultSet rs1 = - client1 - .singleUse() - .executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2")); - ResultSet rs2 = - client2 - .singleUse() - .executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2")); ) { + try (ResultSet rs1 = client1.singleUse().executeQuery(SELECT_COUNT_STATEMENT); + ResultSet rs2 = client2.singleUse().executeQuery(SELECT_COUNT_STATEMENT)) { while (rs1.next() && rs2.next()) { // Do nothing, just consume the result sets. } @@ -181,15 +176,10 @@ private void waitForStartup() throws InterruptedException { private int getNumberOfThreadsWithName(String serviceName) { Pattern pattern = Pattern.compile(String.format(THREAD_PATTERN, serviceName)); - ThreadGroup group = Thread.currentThread().getThreadGroup(); - while (group.getParent() != null) { - group = group.getParent(); - } - Thread[] threads = new Thread[100 * NUMBER_OF_TEST_RUNS]; - int numberOfThreads = group.enumerate(threads); + Set threadSet = Thread.getAllStackTraces().keySet(); int res = 0; - for (int i = 0; i < numberOfThreads; i++) { - if (pattern.matcher(threads[i].getName()).matches()) { + for (Thread thread : threadSet) { + if (pattern.matcher(thread.getName()).matches()) { res++; } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java index 3497b42bc7..a54a5b848a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java @@ -170,6 +170,10 @@ protected String getBaseUrl() { server.getPort()); } + protected int getPort() { + return server.getPort(); + } + protected ExecuteSqlRequest getLastExecuteSqlRequest() { List requests = mockSpanner.getRequests(); for (int i = requests.size() - 1; i >= 0; i--) { diff --git a/grpc-google-cloud-spanner-v1/pom.xml b/grpc-google-cloud-spanner-v1/pom.xml index 903e53bf94..aa85115cf4 100644 --- a/grpc-google-cloud-spanner-v1/pom.xml +++ b/grpc-google-cloud-spanner-v1/pom.xml @@ -53,7 +53,6 @@ javax.annotation javax.annotation-api - 1.3.2 diff --git a/pom.xml b/pom.xml index fd23c37515..a899259548 100644 --- a/pom.xml +++ b/pom.xml @@ -106,10 +106,23 @@ com.google.cloud google-cloud-shared-dependencies - 0.4.0 + 0.8.1 pom import + + + junit + junit + 4.13 + test + + + com.google.truth + truth + 1.0.1 + test + @@ -187,7 +200,7 @@ https://developers.google.com/protocol-buffers/docs/reference/java/ https://googleapis.dev/java/google-auth-library/latest/ https://googleapis.dev/java/gax/latest/ - https://googleapis.github.io/api-common-java/1.8.1/apidocs/ + https://googleapis.github.io/api-common-java/