Skip to content

Commit

Permalink
feat(deps): adopt flatten plugin and google-cloud-shared-dependencies…
Browse files Browse the repository at this point in the history
… and update ExecutorProvider (#302)

* feat(deps): adopt flatten plugin and google-cloud-shared-dependencies

* fix: change executor after gax update

* tests: create a new Thread to get default group

* tests: get threads through Thread.getAllStackTraces()

Co-authored-by: yangnuoyu <yangnuoyu@google.com>
  • Loading branch information
olavloite and yangnuoyu committed Jun 25, 2020
1 parent 8536a3e commit 5aef6c3
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 73 deletions.
1 change: 0 additions & 1 deletion google-cloud-spanner/pom.xml
Expand Up @@ -336,7 +336,6 @@
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
</profile>
Expand Down
Expand Up @@ -163,7 +163,8 @@ public class GapicSpannerRpc implements SpannerRpc {
* down when the {@link SpannerRpc} is closed.
*/
private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
private static final int DEFAULT_THREAD_COUNT = 4;
// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new LinkedList<>();
private final ThreadFactory threadFactory;

Expand All @@ -178,8 +179,10 @@ public boolean shouldAutoClose() {

@Override
public ScheduledExecutorService getExecutor() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, threadFactory);
new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
Expand Down Expand Up @@ -298,7 +301,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())
.setExecutorProvider(executorProvider)
.setExecutor(executorProvider.getExecutor())

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Google LLC
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,55 +14,58 @@
* limitations under the License.
*/

package com.google.cloud.spanner.it;
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.InstanceAdminClient;
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.ParallelIntegrationTest;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.api.core.ApiFunction;
import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.connection.AbstractMockServerTest;
import com.google.common.base.Stopwatch;
import com.google.spanner.admin.database.v1.ListDatabasesResponse;
import com.google.spanner.admin.instance.v1.ListInstancesResponse;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@Category(ParallelIntegrationTest.class)
@RunWith(JUnit4.class)
public class ITSpannerOptionsTest {
@ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
private static Database db;

@BeforeClass
public static void setUp() throws Exception {
db = env.getTestHelper().createTestDatabase();
}

@AfterClass
public static void tearDown() throws Exception {
db.drop();
}

public class SpannerOptionsThreadTest extends AbstractMockServerTest {
private static final int NUMBER_OF_TEST_RUNS = 2;
private static final int DEFAULT_NUM_CHANNELS = 4;
private static final int NUM_THREADS_PER_CHANNEL = 4;
private static final int DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT = 4;
private static final int NUM_GAPIC_CLIENTS = 4;
private static final int NUM_THREADS =
Math.max(
DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * NUM_GAPIC_CLIENTS,
Runtime.getRuntime().availableProcessors());
private static final String SPANNER_THREAD_NAME = "Cloud-Spanner-TransportChannel";
private static final String THREAD_PATTERN = "%s-[0-9]+";

private final DatabaseId dbId = DatabaseId.of("p", "i", "d");

@SuppressWarnings("rawtypes")
private SpannerOptions createOptions() {
return SpannerOptions.newBuilder()
.setProjectId("p")
// Set a custom channel configurator to allow http instead of https.
.setChannelConfigurator(
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
@Override
public ManagedChannelBuilder apply(ManagedChannelBuilder input) {
input.usePlaintext();
return input;
}
})
.setHost("http://localhost:" + getPort())
.setCredentials(NoCredentials.getInstance())
.build();
}

@Test
public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException {
int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME);
Expand All @@ -72,27 +75,23 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
// Create Spanner instance.
// We make a copy of the options instance, as SpannerOptions caches any service object
// that has been handed out.
SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
SpannerOptions options = createOptions();
Spanner spanner = options.getService();
// Get a database client and do a query. This should initiate threads for the Spanner service.
DatabaseClient client = spanner.getDatabaseClient(db.getId());
DatabaseClient client = spanner.getDatabaseClient(dbId);
List<ResultSet> resultSets = new ArrayList<>();
// SpannerStub affiliates a channel with a session, so we need to use multiple sessions
// to ensure we also hit multiple channels.
for (int i2 = 0; i2 < options.getSessionPoolOptions().getMaxSessions(); i2++) {
ResultSet rs =
client
.singleUse()
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"));
ResultSet rs = client.singleUse().executeQuery(SELECT_COUNT_STATEMENT);
// Execute ResultSet#next() to send the query to Spanner.
rs.next();
// Delay closing the result set in order to force the use of multiple sessions.
// As each session is linked to one transport channel, using multiple different
// sessions should initialize multiple transport channels.
resultSets.add(rs);
// Check whether the number of expected threads has been reached.
if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME)
== DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount) {
if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) == NUM_THREADS + baseThreadCount) {
break;
}
}
Expand All @@ -102,25 +101,27 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
// Check the number of threads after the query. Doing a request should initialize a thread
// pool for the underlying SpannerClient.
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
.isEqualTo(DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
.isEqualTo(NUM_THREADS + baseThreadCount);

// Then do a request to the InstanceAdmin service and check the number of threads.
// Doing a request should initialize a thread pool for the underlying InstanceAdminClient.
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) {
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) {
InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient();
mockInstanceAdmin.addResponse(ListInstancesResponse.getDefaultInstance());
instanceAdminClient.listInstances();
}
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
.isEqualTo(2 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
.isEqualTo(NUM_THREADS + baseThreadCount);

// Then do a request to the DatabaseAdmin service and check the number of threads.
// Doing a request should initialize a thread pool for the underlying DatabaseAdminClient.
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) {
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) {
DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient();
databaseAdminClient.listDatabases(db.getId().getInstanceId().getInstance());
mockDatabaseAdmin.addResponse(ListDatabasesResponse.getDefaultInstance());
databaseAdminClient.listDatabases(dbId.getInstanceId().getInstance());
}
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
.isEqualTo(3 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
.isEqualTo(NUM_THREADS + baseThreadCount);

// Now close the Spanner instance and check whether the threads are shutdown or not.
spanner.close();
Expand All @@ -138,23 +139,17 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
public void testMultipleSpannersFromSameSpannerOptions() throws InterruptedException {
waitForStartup();
int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME);
SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
SpannerOptions options = createOptions();
try (Spanner spanner1 = options.getService()) {
// Having both in the try-with-resources block is not possible, as it is the same instance.
// One will be closed before the other, and the closing of the second instance would fail.
Spanner spanner2 = options.getService();
assertThat(spanner1).isSameInstanceAs(spanner2);
DatabaseClient client1 = spanner1.getDatabaseClient(db.getId());
DatabaseClient client2 = spanner2.getDatabaseClient(db.getId());
DatabaseClient client1 = spanner1.getDatabaseClient(dbId);
DatabaseClient client2 = spanner2.getDatabaseClient(dbId);
assertThat(client1).isSameInstanceAs(client2);
try (ResultSet rs1 =
client1
.singleUse()
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"));
ResultSet rs2 =
client2
.singleUse()
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2")); ) {
try (ResultSet rs1 = client1.singleUse().executeQuery(SELECT_COUNT_STATEMENT);
ResultSet rs2 = client2.singleUse().executeQuery(SELECT_COUNT_STATEMENT)) {
while (rs1.next() && rs2.next()) {
// Do nothing, just consume the result sets.
}
Expand All @@ -181,15 +176,10 @@ private void waitForStartup() throws InterruptedException {

private int getNumberOfThreadsWithName(String serviceName) {
Pattern pattern = Pattern.compile(String.format(THREAD_PATTERN, serviceName));
ThreadGroup group = Thread.currentThread().getThreadGroup();
while (group.getParent() != null) {
group = group.getParent();
}
Thread[] threads = new Thread[100 * NUMBER_OF_TEST_RUNS];
int numberOfThreads = group.enumerate(threads);
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
int res = 0;
for (int i = 0; i < numberOfThreads; i++) {
if (pattern.matcher(threads[i].getName()).matches()) {
for (Thread thread : threadSet) {
if (pattern.matcher(thread.getName()).matches()) {
res++;
}
}
Expand Down
Expand Up @@ -170,6 +170,10 @@ protected String getBaseUrl() {
server.getPort());
}

protected int getPort() {
return server.getPort();
}

protected ExecuteSqlRequest getLastExecuteSqlRequest() {
List<AbstractMessage> requests = mockSpanner.getRequests();
for (int i = requests.size() - 1; i >= 0; i--) {
Expand Down
1 change: 0 additions & 1 deletion grpc-google-cloud-spanner-v1/pom.xml
Expand Up @@ -53,7 +53,6 @@
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
</profile>
Expand Down
17 changes: 15 additions & 2 deletions pom.xml
Expand Up @@ -106,10 +106,23 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-shared-dependencies</artifactId>
<version>0.4.0</version>
<version>0.8.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -187,7 +200,7 @@
<link>https://developers.google.com/protocol-buffers/docs/reference/java/</link>
<link>https://googleapis.dev/java/google-auth-library/latest/</link>
<link>https://googleapis.dev/java/gax/latest/</link>
<link>https://googleapis.github.io/api-common-java/1.8.1/apidocs/</link>
<link>https://googleapis.github.io/api-common-java/</link>
</links>
</configuration>
</plugin>
Expand Down

0 comments on commit 5aef6c3

Please sign in to comment.