Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(deps): adopt flatten plugin and google-cloud-shared-dependencies and update ExecutorProvider #302

Merged
merged 4 commits into from Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion google-cloud-spanner/pom.xml
Expand Up @@ -336,7 +336,6 @@
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
</profile>
Expand Down
Expand Up @@ -163,7 +163,8 @@ public class GapicSpannerRpc implements SpannerRpc {
* down when the {@link SpannerRpc} is closed.
*/
private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
private static final int DEFAULT_THREAD_COUNT = 4;
// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new LinkedList<>();
private final ThreadFactory threadFactory;

Expand All @@ -178,8 +179,10 @@ public boolean shouldAutoClose() {

@Override
public ScheduledExecutorService getExecutor() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, threadFactory);
new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
Expand Down Expand Up @@ -298,7 +301,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())
.setExecutorProvider(executorProvider)
.setExecutor(executorProvider.getExecutor())

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Google LLC
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,55 +14,58 @@
* limitations under the License.
*/

package com.google.cloud.spanner.it;
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.InstanceAdminClient;
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.ParallelIntegrationTest;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.api.core.ApiFunction;
import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.connection.AbstractMockServerTest;
import com.google.common.base.Stopwatch;
import com.google.spanner.admin.database.v1.ListDatabasesResponse;
import com.google.spanner.admin.instance.v1.ListInstancesResponse;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@Category(ParallelIntegrationTest.class)
@RunWith(JUnit4.class)
public class ITSpannerOptionsTest {
@ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
private static Database db;

@BeforeClass
public static void setUp() throws Exception {
db = env.getTestHelper().createTestDatabase();
}

@AfterClass
public static void tearDown() throws Exception {
db.drop();
}

public class SpannerOptionsThreadTest extends AbstractMockServerTest {
private static final int NUMBER_OF_TEST_RUNS = 2;
private static final int DEFAULT_NUM_CHANNELS = 4;
private static final int NUM_THREADS_PER_CHANNEL = 4;
private static final int DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT = 4;
private static final int NUM_GAPIC_CLIENTS = 4;
private static final int NUM_THREADS =
Math.max(
DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * NUM_GAPIC_CLIENTS,
Runtime.getRuntime().availableProcessors());
private static final String SPANNER_THREAD_NAME = "Cloud-Spanner-TransportChannel";
private static final String THREAD_PATTERN = "%s-[0-9]+";

private final DatabaseId dbId = DatabaseId.of("p", "i", "d");

@SuppressWarnings("rawtypes")
private SpannerOptions createOptions() {
return SpannerOptions.newBuilder()
.setProjectId("p")
// Set a custom channel configurator to allow http instead of https.
.setChannelConfigurator(
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
@Override
public ManagedChannelBuilder apply(ManagedChannelBuilder input) {
input.usePlaintext();
return input;
}
})
.setHost("http://localhost:" + getPort())
.setCredentials(NoCredentials.getInstance())
.build();
}

@Test
public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException {
int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME);
Expand All @@ -72,27 +75,23 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
// Create Spanner instance.
// We make a copy of the options instance, as SpannerOptions caches any service object
// that has been handed out.
SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
SpannerOptions options = createOptions();
Spanner spanner = options.getService();
// Get a database client and do a query. This should initiate threads for the Spanner service.
DatabaseClient client = spanner.getDatabaseClient(db.getId());
DatabaseClient client = spanner.getDatabaseClient(dbId);
List<ResultSet> resultSets = new ArrayList<>();
// SpannerStub affiliates a channel with a session, so we need to use multiple sessions
// to ensure we also hit multiple channels.
for (int i2 = 0; i2 < options.getSessionPoolOptions().getMaxSessions(); i2++) {
ResultSet rs =
client
.singleUse()
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"));
ResultSet rs = client.singleUse().executeQuery(SELECT_COUNT_STATEMENT);
// Execute ResultSet#next() to send the query to Spanner.
rs.next();
// Delay closing the result set in order to force the use of multiple sessions.
// As each session is linked to one transport channel, using multiple different
// sessions should initialize multiple transport channels.
resultSets.add(rs);
// Check whether the number of expected threads has been reached.
if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME)
== DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount) {
if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) == NUM_THREADS + baseThreadCount) {
break;
}
}
Expand All @@ -102,25 +101,27 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
// Check the number of threads after the query. Doing a request should initialize a thread
// pool for the underlying SpannerClient.
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
.isEqualTo(DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
.isEqualTo(NUM_THREADS + baseThreadCount);

// Then do a request to the InstanceAdmin service and check the number of threads.
// Doing a request should initialize a thread pool for the underlying InstanceAdminClient.
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) {
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) {
InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient();
mockInstanceAdmin.addResponse(ListInstancesResponse.getDefaultInstance());
instanceAdminClient.listInstances();
}
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
.isEqualTo(2 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
.isEqualTo(NUM_THREADS + baseThreadCount);

// Then do a request to the DatabaseAdmin service and check the number of threads.
// Doing a request should initialize a thread pool for the underlying DatabaseAdminClient.
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) {
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) {
DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient();
databaseAdminClient.listDatabases(db.getId().getInstanceId().getInstance());
mockDatabaseAdmin.addResponse(ListDatabasesResponse.getDefaultInstance());
databaseAdminClient.listDatabases(dbId.getInstanceId().getInstance());
}
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
.isEqualTo(3 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
.isEqualTo(NUM_THREADS + baseThreadCount);

// Now close the Spanner instance and check whether the threads are shutdown or not.
spanner.close();
Expand All @@ -138,23 +139,17 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
public void testMultipleSpannersFromSameSpannerOptions() throws InterruptedException {
waitForStartup();
int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME);
SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
SpannerOptions options = createOptions();
try (Spanner spanner1 = options.getService()) {
// Having both in the try-with-resources block is not possible, as it is the same instance.
// One will be closed before the other, and the closing of the second instance would fail.
Spanner spanner2 = options.getService();
assertThat(spanner1).isSameInstanceAs(spanner2);
DatabaseClient client1 = spanner1.getDatabaseClient(db.getId());
DatabaseClient client2 = spanner2.getDatabaseClient(db.getId());
DatabaseClient client1 = spanner1.getDatabaseClient(dbId);
DatabaseClient client2 = spanner2.getDatabaseClient(dbId);
assertThat(client1).isSameInstanceAs(client2);
try (ResultSet rs1 =
client1
.singleUse()
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"));
ResultSet rs2 =
client2
.singleUse()
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2")); ) {
try (ResultSet rs1 = client1.singleUse().executeQuery(SELECT_COUNT_STATEMENT);
ResultSet rs2 = client2.singleUse().executeQuery(SELECT_COUNT_STATEMENT)) {
while (rs1.next() && rs2.next()) {
// Do nothing, just consume the result sets.
}
Expand All @@ -181,15 +176,10 @@ private void waitForStartup() throws InterruptedException {

private int getNumberOfThreadsWithName(String serviceName) {
Pattern pattern = Pattern.compile(String.format(THREAD_PATTERN, serviceName));
ThreadGroup group = Thread.currentThread().getThreadGroup();
while (group.getParent() != null) {
group = group.getParent();
}
Thread[] threads = new Thread[100 * NUMBER_OF_TEST_RUNS];
int numberOfThreads = group.enumerate(threads);
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
int res = 0;
for (int i = 0; i < numberOfThreads; i++) {
if (pattern.matcher(threads[i].getName()).matches()) {
for (Thread thread : threadSet) {
if (pattern.matcher(thread.getName()).matches()) {
res++;
}
}
Expand Down
Expand Up @@ -170,6 +170,10 @@ protected String getBaseUrl() {
server.getPort());
}

protected int getPort() {
return server.getPort();
}

protected ExecuteSqlRequest getLastExecuteSqlRequest() {
List<AbstractMessage> requests = mockSpanner.getRequests();
for (int i = requests.size() - 1; i >= 0; i--) {
Expand Down
1 change: 0 additions & 1 deletion grpc-google-cloud-spanner-v1/pom.xml
Expand Up @@ -53,7 +53,6 @@
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
</profile>
Expand Down
17 changes: 15 additions & 2 deletions pom.xml
Expand Up @@ -106,10 +106,23 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-shared-dependencies</artifactId>
<version>0.4.0</version>
<version>0.8.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -187,7 +200,7 @@
<link>https://developers.google.com/protocol-buffers/docs/reference/java/</link>
<link>https://googleapis.dev/java/google-auth-library/latest/</link>
<link>https://googleapis.dev/java/gax/latest/</link>
<link>https://googleapis.github.io/api-common-java/1.8.1/apidocs/</link>
<link>https://googleapis.github.io/api-common-java/</link>
</links>
</configuration>
</plugin>
Expand Down