diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml
index 23118dd6f0..206ed5e316 100644
--- a/google-cloud-spanner/pom.xml
+++ b/google-cloud-spanner/pom.xml
@@ -336,7 +336,6 @@
javax.annotationjavax.annotation-api
- 1.3.2
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
index 97f4b5c88a..890aebed2d 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
@@ -163,7 +163,8 @@ public class GapicSpannerRpc implements SpannerRpc {
* down when the {@link SpannerRpc} is closed.
*/
private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
- private static final int DEFAULT_THREAD_COUNT = 4;
+ // 4 Gapic clients * 4 channels per client.
+ private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List executors = new LinkedList<>();
private final ThreadFactory threadFactory;
@@ -178,8 +179,10 @@ public boolean shouldAutoClose() {
@Override
public ScheduledExecutorService getExecutor() {
+ int numCpus = Runtime.getRuntime().availableProcessors();
+ int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
- new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, threadFactory);
+ new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
@@ -298,7 +301,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())
- .setExecutorProvider(executorProvider)
+ .setExecutor(executorProvider.getExecutor())
// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITSpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java
similarity index 65%
rename from google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITSpannerOptionsTest.java
rename to google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java
index 34e6c43944..8626f5cb5c 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITSpannerOptionsTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2019 Google LLC
+ * Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,55 +14,58 @@
* limitations under the License.
*/
-package com.google.cloud.spanner.it;
+package com.google.cloud.spanner;
import static com.google.common.truth.Truth.assertThat;
-import com.google.cloud.spanner.Database;
-import com.google.cloud.spanner.DatabaseAdminClient;
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.InstanceAdminClient;
-import com.google.cloud.spanner.IntegrationTestEnv;
-import com.google.cloud.spanner.ParallelIntegrationTest;
-import com.google.cloud.spanner.ResultSet;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import com.google.cloud.spanner.Statement;
+import com.google.api.core.ApiFunction;
+import com.google.cloud.NoCredentials;
+import com.google.cloud.spanner.connection.AbstractMockServerTest;
import com.google.common.base.Stopwatch;
+import com.google.spanner.admin.database.v1.ListDatabasesResponse;
+import com.google.spanner.admin.instance.v1.ListInstancesResponse;
+import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-@Category(ParallelIntegrationTest.class)
@RunWith(JUnit4.class)
-public class ITSpannerOptionsTest {
- @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
- private static Database db;
-
- @BeforeClass
- public static void setUp() throws Exception {
- db = env.getTestHelper().createTestDatabase();
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- db.drop();
- }
-
+public class SpannerOptionsThreadTest extends AbstractMockServerTest {
private static final int NUMBER_OF_TEST_RUNS = 2;
- private static final int DEFAULT_NUM_CHANNELS = 4;
- private static final int NUM_THREADS_PER_CHANNEL = 4;
+ private static final int DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT = 4;
+ private static final int NUM_GAPIC_CLIENTS = 4;
+ private static final int NUM_THREADS =
+ Math.max(
+ DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * NUM_GAPIC_CLIENTS,
+ Runtime.getRuntime().availableProcessors());
private static final String SPANNER_THREAD_NAME = "Cloud-Spanner-TransportChannel";
private static final String THREAD_PATTERN = "%s-[0-9]+";
+ private final DatabaseId dbId = DatabaseId.of("p", "i", "d");
+
+ @SuppressWarnings("rawtypes")
+ private SpannerOptions createOptions() {
+ return SpannerOptions.newBuilder()
+ .setProjectId("p")
+ // Set a custom channel configurator to allow http instead of https.
+ .setChannelConfigurator(
+ new ApiFunction() {
+ @Override
+ public ManagedChannelBuilder apply(ManagedChannelBuilder input) {
+ input.usePlaintext();
+ return input;
+ }
+ })
+ .setHost("http://localhost:" + getPort())
+ .setCredentials(NoCredentials.getInstance())
+ .build();
+ }
+
@Test
public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException {
int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME);
@@ -72,18 +75,15 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
// Create Spanner instance.
// We make a copy of the options instance, as SpannerOptions caches any service object
// that has been handed out.
- SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
+ SpannerOptions options = createOptions();
Spanner spanner = options.getService();
// Get a database client and do a query. This should initiate threads for the Spanner service.
- DatabaseClient client = spanner.getDatabaseClient(db.getId());
+ DatabaseClient client = spanner.getDatabaseClient(dbId);
List resultSets = new ArrayList<>();
// SpannerStub affiliates a channel with a session, so we need to use multiple sessions
// to ensure we also hit multiple channels.
for (int i2 = 0; i2 < options.getSessionPoolOptions().getMaxSessions(); i2++) {
- ResultSet rs =
- client
- .singleUse()
- .executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"));
+ ResultSet rs = client.singleUse().executeQuery(SELECT_COUNT_STATEMENT);
// Execute ResultSet#next() to send the query to Spanner.
rs.next();
// Delay closing the result set in order to force the use of multiple sessions.
@@ -91,8 +91,7 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
// sessions should initialize multiple transport channels.
resultSets.add(rs);
// Check whether the number of expected threads has been reached.
- if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME)
- == DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount) {
+ if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) == NUM_THREADS + baseThreadCount) {
break;
}
}
@@ -102,25 +101,27 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
// Check the number of threads after the query. Doing a request should initialize a thread
// pool for the underlying SpannerClient.
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
- .isEqualTo(DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
+ .isEqualTo(NUM_THREADS + baseThreadCount);
// Then do a request to the InstanceAdmin service and check the number of threads.
// Doing a request should initialize a thread pool for the underlying InstanceAdminClient.
- for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) {
+ for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) {
InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient();
+ mockInstanceAdmin.addResponse(ListInstancesResponse.getDefaultInstance());
instanceAdminClient.listInstances();
}
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
- .isEqualTo(2 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
+ .isEqualTo(NUM_THREADS + baseThreadCount);
// Then do a request to the DatabaseAdmin service and check the number of threads.
// Doing a request should initialize a thread pool for the underlying DatabaseAdminClient.
- for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) {
+ for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) {
DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient();
- databaseAdminClient.listDatabases(db.getId().getInstanceId().getInstance());
+ mockDatabaseAdmin.addResponse(ListDatabasesResponse.getDefaultInstance());
+ databaseAdminClient.listDatabases(dbId.getInstanceId().getInstance());
}
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME))
- .isEqualTo(3 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount);
+ .isEqualTo(NUM_THREADS + baseThreadCount);
// Now close the Spanner instance and check whether the threads are shutdown or not.
spanner.close();
@@ -138,23 +139,17 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException
public void testMultipleSpannersFromSameSpannerOptions() throws InterruptedException {
waitForStartup();
int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME);
- SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
+ SpannerOptions options = createOptions();
try (Spanner spanner1 = options.getService()) {
// Having both in the try-with-resources block is not possible, as it is the same instance.
// One will be closed before the other, and the closing of the second instance would fail.
Spanner spanner2 = options.getService();
assertThat(spanner1).isSameInstanceAs(spanner2);
- DatabaseClient client1 = spanner1.getDatabaseClient(db.getId());
- DatabaseClient client2 = spanner2.getDatabaseClient(db.getId());
+ DatabaseClient client1 = spanner1.getDatabaseClient(dbId);
+ DatabaseClient client2 = spanner2.getDatabaseClient(dbId);
assertThat(client1).isSameInstanceAs(client2);
- try (ResultSet rs1 =
- client1
- .singleUse()
- .executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"));
- ResultSet rs2 =
- client2
- .singleUse()
- .executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2")); ) {
+ try (ResultSet rs1 = client1.singleUse().executeQuery(SELECT_COUNT_STATEMENT);
+ ResultSet rs2 = client2.singleUse().executeQuery(SELECT_COUNT_STATEMENT)) {
while (rs1.next() && rs2.next()) {
// Do nothing, just consume the result sets.
}
@@ -181,15 +176,10 @@ private void waitForStartup() throws InterruptedException {
private int getNumberOfThreadsWithName(String serviceName) {
Pattern pattern = Pattern.compile(String.format(THREAD_PATTERN, serviceName));
- ThreadGroup group = Thread.currentThread().getThreadGroup();
- while (group.getParent() != null) {
- group = group.getParent();
- }
- Thread[] threads = new Thread[100 * NUMBER_OF_TEST_RUNS];
- int numberOfThreads = group.enumerate(threads);
+ Set threadSet = Thread.getAllStackTraces().keySet();
int res = 0;
- for (int i = 0; i < numberOfThreads; i++) {
- if (pattern.matcher(threads[i].getName()).matches()) {
+ for (Thread thread : threadSet) {
+ if (pattern.matcher(thread.getName()).matches()) {
res++;
}
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java
index 3497b42bc7..a54a5b848a 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java
@@ -170,6 +170,10 @@ protected String getBaseUrl() {
server.getPort());
}
+ protected int getPort() {
+ return server.getPort();
+ }
+
protected ExecuteSqlRequest getLastExecuteSqlRequest() {
List requests = mockSpanner.getRequests();
for (int i = requests.size() - 1; i >= 0; i--) {
diff --git a/grpc-google-cloud-spanner-v1/pom.xml b/grpc-google-cloud-spanner-v1/pom.xml
index 903e53bf94..aa85115cf4 100644
--- a/grpc-google-cloud-spanner-v1/pom.xml
+++ b/grpc-google-cloud-spanner-v1/pom.xml
@@ -53,7 +53,6 @@
javax.annotationjavax.annotation-api
- 1.3.2
diff --git a/pom.xml b/pom.xml
index fd23c37515..a899259548 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,10 +106,23 @@
com.google.cloudgoogle-cloud-shared-dependencies
- 0.4.0
+ 0.8.1pomimport
+
+
+ junit
+ junit
+ 4.13
+ test
+
+
+ com.google.truth
+ truth
+ 1.0.1
+ test
+
@@ -187,7 +200,7 @@
https://developers.google.com/protocol-buffers/docs/reference/java/
https://googleapis.dev/java/google-auth-library/latest/
https://googleapis.dev/java/gax/latest/
- https://googleapis.github.io/api-common-java/1.8.1/apidocs/
+ https://googleapis.github.io/api-common-java/