diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 2c68fd317e..c7432ea40d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.Objects; import org.threeten.bp.Duration; /** Options for the session pool used by {@code DatabaseClient}. */ @@ -63,6 +64,48 @@ private SessionPoolOptions(Builder builder) { this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter; } + @Override + public boolean equals(Object o) { + if (!(o instanceof SessionPoolOptions)) { + return false; + } + SessionPoolOptions other = (SessionPoolOptions) o; + return Objects.equals(this.minSessions, other.minSessions) + && Objects.equals(this.maxSessions, other.maxSessions) + && Objects.equals(this.incStep, other.incStep) + && Objects.equals(this.maxIdleSessions, other.maxIdleSessions) + && Objects.equals(this.writeSessionsFraction, other.writeSessionsFraction) + && Objects.equals(this.actionOnExhaustion, other.actionOnExhaustion) + && Objects.equals(this.actionOnSessionNotFound, other.actionOnSessionNotFound) + && Objects.equals(this.actionOnSessionLeak, other.actionOnSessionLeak) + && Objects.equals( + this.initialWaitForSessionTimeoutMillis, other.initialWaitForSessionTimeoutMillis) + && Objects.equals(this.loopFrequency, other.loopFrequency) + && Objects.equals(this.keepAliveIntervalMinutes, other.keepAliveIntervalMinutes) + && Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter); + } + + @Override + public int hashCode() { + return Objects.hash( + this.minSessions, + this.maxSessions, + this.incStep, + this.maxIdleSessions, + this.writeSessionsFraction, + this.actionOnExhaustion, + this.actionOnSessionNotFound, + this.actionOnSessionLeak, + this.initialWaitForSessionTimeoutMillis, + this.loopFrequency, + this.keepAliveIntervalMinutes, + this.removeInactiveSessionAfter); + } + + public Builder toBuilder() { + return new Builder(this); + } + public int getMinSessions() { return minSessions; } @@ -165,6 +208,24 @@ public static class Builder { private int keepAliveIntervalMinutes = 30; private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L); + public Builder() {} + + private Builder(SessionPoolOptions options) { + this.minSessionsSet = true; + this.minSessions = options.minSessions; + this.maxSessions = options.maxSessions; + this.incStep = options.incStep; + this.maxIdleSessions = options.maxIdleSessions; + this.writeSessionsFraction = options.writeSessionsFraction; + this.actionOnExhaustion = options.actionOnExhaustion; + this.initialWaitForSessionTimeoutMillis = options.initialWaitForSessionTimeoutMillis; + this.actionOnSessionNotFound = options.actionOnSessionNotFound; + this.actionOnSessionLeak = options.actionOnSessionLeak; + this.loopFrequency = options.loopFrequency; + this.keepAliveIntervalMinutes = options.keepAliveIntervalMinutes; + this.removeInactiveSessionAfter = options.removeInactiveSessionAfter; + } + /** * Minimum number of sessions that this pool will always maintain. These will be created eagerly * in parallel. Defaults to 100. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java index a668342f1b..a688454f30 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java @@ -150,6 +150,8 @@ public String[] getValidValues() { static final boolean DEFAULT_RETRY_ABORTS_INTERNALLY = true; private static final String DEFAULT_CREDENTIALS = null; private static final String DEFAULT_OAUTH_TOKEN = null; + private static final String DEFAULT_MIN_SESSIONS = null; + private static final String DEFAULT_MAX_SESSIONS = null; private static final String DEFAULT_NUM_CHANNELS = null; private static final String DEFAULT_USER_AGENT = null; private static final String DEFAULT_OPTIMIZER_VERSION = ""; @@ -172,6 +174,10 @@ public String[] getValidValues() { * OAuth token to use for authentication. Cannot be used in combination with a credentials file. */ public static final String OAUTH_TOKEN_PROPERTY_NAME = "oauthToken"; + /** Name of the 'minSessions' connection property. */ + public static final String MIN_SESSIONS_PROPERTY_NAME = "minSessions"; + /** Name of the 'numChannels' connection property. */ + public static final String MAX_SESSIONS_PROPERTY_NAME = "maxSessions"; /** Name of the 'numChannels' connection property. */ public static final String NUM_CHANNELS_PROPERTY_NAME = "numChannels"; /** Custom user agent string is only for other Google libraries. */ @@ -204,6 +210,12 @@ public String[] getValidValues() { ConnectionProperty.createStringProperty( OAUTH_TOKEN_PROPERTY_NAME, "A valid pre-existing OAuth token to use for authentication for this connection. Setting this property will take precedence over any value set for a credentials file."), + ConnectionProperty.createStringProperty( + MIN_SESSIONS_PROPERTY_NAME, + "The minimum number of sessions in the backing session pool. The default is 100."), + ConnectionProperty.createStringProperty( + MAX_SESSIONS_PROPERTY_NAME, + "The maximum number of sessions in the backing session pool. The default is 400."), ConnectionProperty.createStringProperty( NUM_CHANNELS_PROPERTY_NAME, "The number of gRPC channels to use to communicate with Cloud Spanner. The default is 4."), @@ -327,6 +339,9 @@ private boolean isValidUri(String uri) { * true. *
  • readonly (boolean): Sets the initial readonly mode for the connection. Default is * false. + *
  • minSessions (int): Sets the minimum number of sessions in the backing session pool. + *
  • maxSessions (int): Sets the maximum number of sessions in the backing session pool. + *
  • numChannels (int): Sets the number of gRPC channels to use for the connection. *
  • retryAbortsInternally (boolean): Sets the initial retryAbortsInternally mode for the * connection. Default is true. *
  • optimizerVersion (string): Sets the query optimizer version to use for the connection. @@ -437,6 +452,8 @@ public static Builder newBuilder() { private final Credentials credentials; private final SessionPoolOptions sessionPoolOptions; private final Integer numChannels; + private final Integer minSessions; + private final Integer maxSessions; private final String userAgent; private final QueryOptions queryOptions; @@ -453,7 +470,6 @@ private ConnectionOptions(Builder builder) { this.warnings = checkValidProperties(builder.uri); this.uri = builder.uri; - this.sessionPoolOptions = builder.sessionPoolOptions; this.credentialsUrl = builder.credentialsUrl != null ? builder.credentialsUrl : parseCredentials(builder.uri); this.oauthToken = @@ -492,19 +508,12 @@ private ConnectionOptions(Builder builder) { } else { this.credentials = getCredentialsService().createCredentials(this.credentialsUrl); } - String numChannelsValue = parseNumChannels(builder.uri); - if (numChannelsValue != null) { - try { - this.numChannels = Integer.valueOf(numChannelsValue); - } catch (NumberFormatException e) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.INVALID_ARGUMENT, - "Invalid numChannels value specified: " + numChannelsValue, - e); - } - } else { - this.numChannels = null; - } + this.minSessions = + parseIntegerProperty(MIN_SESSIONS_PROPERTY_NAME, parseMinSessions(builder.uri)); + this.maxSessions = + parseIntegerProperty(MAX_SESSIONS_PROPERTY_NAME, parseMaxSessions(builder.uri)); + this.numChannels = + parseIntegerProperty(NUM_CHANNELS_PROPERTY_NAME, parseNumChannels(builder.uri)); String projectId = matcher.group(Builder.PROJECT_GROUP); if (Builder.DEFAULT_PROJECT_ID_PLACEHOLDER.equalsIgnoreCase(projectId)) { @@ -518,6 +527,36 @@ private ConnectionOptions(Builder builder) { this.statementExecutionInterceptors = Collections.unmodifiableList(builder.statementExecutionInterceptors); this.configurator = builder.configurator; + + if (this.minSessions != null || this.maxSessions != null) { + SessionPoolOptions.Builder sessionPoolOptionsBuilder = + builder.sessionPoolOptions == null + ? SessionPoolOptions.newBuilder() + : builder.sessionPoolOptions.toBuilder(); + if (this.minSessions != null) { + sessionPoolOptionsBuilder.setMinSessions(this.minSessions); + } + if (this.maxSessions != null) { + sessionPoolOptionsBuilder.setMaxSessions(this.maxSessions); + } + this.sessionPoolOptions = sessionPoolOptionsBuilder.build(); + } else { + this.sessionPoolOptions = builder.sessionPoolOptions; + } + } + + private static Integer parseIntegerProperty(String propertyName, String value) { + if (value != null) { + try { + return Integer.valueOf(value); + } catch (NumberFormatException e) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INVALID_ARGUMENT, + String.format("Invalid %s value specified: %s", propertyName, value), + e); + } + } + return null; } SpannerOptionsConfigurator getConfigurator() { @@ -565,6 +604,18 @@ static String parseOAuthToken(String uri) { return value != null ? value : DEFAULT_OAUTH_TOKEN; } + @VisibleForTesting + static String parseMinSessions(String uri) { + String value = parseUriProperty(uri, MIN_SESSIONS_PROPERTY_NAME); + return value != null ? value : DEFAULT_MIN_SESSIONS; + } + + @VisibleForTesting + static String parseMaxSessions(String uri) { + String value = parseUriProperty(uri, MAX_SESSIONS_PROPERTY_NAME); + return value != null ? value : DEFAULT_MAX_SESSIONS; + } + @VisibleForTesting static String parseNumChannels(String uri) { String value = parseUriProperty(uri, NUM_CHANNELS_PROPERTY_NAME); @@ -671,6 +722,24 @@ public SessionPoolOptions getSessionPoolOptions() { return sessionPoolOptions; } + /** + * The minimum number of sessions in the backing session pool of this connection. The session pool + * is shared between all connections in the same JVM that connect to the same Cloud Spanner + * database using the same connection settings. + */ + public Integer getMinSessions() { + return minSessions; + } + + /** + * The maximum number of sessions in the backing session pool of this connection. The session pool + * is shared between all connections in the same JVM that connect to the same Cloud Spanner + * database using the same connection settings. + */ + public Integer getMaxSessions() { + return maxSessions; + } + /** The number of channels to use for the connection. */ public Integer getNumChannels() { return numChannels; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java index de351c87c9..be5b16e393 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java @@ -148,7 +148,8 @@ static class SpannerPoolKey { private final boolean usePlainText; private final String userAgent; - private static SpannerPoolKey of(ConnectionOptions options) { + @VisibleForTesting + static SpannerPoolKey of(ConnectionOptions options) { return new SpannerPoolKey(options); } @@ -156,7 +157,10 @@ private SpannerPoolKey(ConnectionOptions options) { this.host = options.getHost(); this.projectId = options.getProjectId(); this.credentialsKey = CredentialsKey.create(options); - this.sessionPoolOptions = options.getSessionPoolOptions(); + this.sessionPoolOptions = + options.getSessionPoolOptions() == null + ? SessionPoolOptions.newBuilder().build() + : options.getSessionPoolOptions(); this.numChannels = options.getNumChannels(); this.usePlainText = options.isUsePlainText(); this.userAgent = options.getUserAgent(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java index b2f4ea086c..46452d3fc0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java @@ -421,4 +421,28 @@ public void testLenient() { assertThat(e.getMessage()).contains("bar"); } } + + @Test + public void testMinSessions() { + ConnectionOptions options = + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database?minSessions=400") + .setCredentialsUrl(FILE_TEST_PATH) + .build(); + assertThat(options.getMinSessions()).isEqualTo(400); + assertThat(options.getSessionPoolOptions().getMinSessions()).isEqualTo(400); + } + + @Test + public void testMaxSessions() { + ConnectionOptions options = + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database?maxSessions=4000") + .setCredentialsUrl(FILE_TEST_PATH) + .build(); + assertThat(options.getMaxSessions()).isEqualTo(4000); + assertThat(options.getSessionPoolOptions().getMaxSessions()).isEqualTo(4000); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java index de820ccbcc..4a26721fe8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java @@ -19,156 +19,241 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.spanner.AbortedException; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; +import com.google.protobuf.AbstractMessage; +import com.google.spanner.v1.BatchCreateSessionsRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; +import java.util.Arrays; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.AfterClass; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; -public class ConnectionTest extends AbstractMockServerTest { - @Test - public void testDefaultOptimizerVersion() { - try (Connection connection = createConnection()) { - try (ResultSet rs = - connection.executeQuery(Statement.of("SHOW VARIABLE OPTIMIZER_VERSION"))) { - assertThat(rs.next()).isTrue(); - assertThat(rs.getString("OPTIMIZER_VERSION")).isEqualTo(""); - assertThat(rs.next()).isFalse(); - } - } - } +@RunWith(Enclosed.class) +public class ConnectionTest { - @Test - public void testUseOptimizerVersionFromEnvironment() { - try { - SpannerOptions.useEnvironment( - new SpannerOptions.SpannerEnvironment() { - @Override - public String getOptimizerVersion() { - return "20"; - } - }); + public static class DefaultConnectionOptionsTest extends AbstractMockServerTest { + @Test + public void testDefaultOptimizerVersion() { try (Connection connection = createConnection()) { - // Do a query and verify that the version from the environment is used. - try (ResultSet rs = connection.executeQuery(SELECT_COUNT_STATEMENT)) { - assertThat(rs.next()).isTrue(); - assertThat(rs.getLong(0)).isEqualTo(COUNT_BEFORE_INSERT); - assertThat(rs.next()).isFalse(); - // Verify query options from the environment. - ExecuteSqlRequest request = getLastExecuteSqlRequest(); - assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("20"); - } - // Now set one of the query options on the connection. That option should be used in - // combination with the other option from the environment. - connection.execute(Statement.of("SET OPTIMIZER_VERSION='30'")); - try (ResultSet rs = connection.executeQuery(SELECT_COUNT_STATEMENT)) { - assertThat(rs.next()).isTrue(); - assertThat(rs.getLong(0)).isEqualTo(COUNT_BEFORE_INSERT); - assertThat(rs.next()).isFalse(); - - ExecuteSqlRequest request = getLastExecuteSqlRequest(); - // Optimizer version should come from the connection. - assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("30"); - } - // Now specify options directly for the query. These should override both the environment - // and what is set on the connection. try (ResultSet rs = - connection.executeQuery( - Statement.newBuilder(SELECT_COUNT_STATEMENT.getSql()) - .withQueryOptions( - QueryOptions.newBuilder() - .setOptimizerVersion("user-defined-version") - .build()) - .build())) { + connection.executeQuery(Statement.of("SHOW VARIABLE OPTIMIZER_VERSION"))) { assertThat(rs.next()).isTrue(); - assertThat(rs.getLong(0)).isEqualTo(COUNT_BEFORE_INSERT); + assertThat(rs.getString("OPTIMIZER_VERSION")).isEqualTo(""); assertThat(rs.next()).isFalse(); - - ExecuteSqlRequest request = getLastExecuteSqlRequest(); - // Optimizer version should come from the query. - assertThat(request.getQueryOptions().getOptimizerVersion()) - .isEqualTo("user-defined-version"); } } - } finally { - SpannerOptions.useDefaultEnvironment(); } - } - @Test - public void testExecuteInvalidBatchUpdate() { - try (Connection connection = createConnection()) { + @Test + public void testUseOptimizerVersionFromEnvironment() { try { - connection.executeBatchUpdate(ImmutableList.of(INSERT_STATEMENT, SELECT_RANDOM_STATEMENT)); - fail("Missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + SpannerOptions.useEnvironment( + new SpannerOptions.SpannerEnvironment() { + @Override + public String getOptimizerVersion() { + return "20"; + } + }); + try (Connection connection = createConnection()) { + // Do a query and verify that the version from the environment is used. + try (ResultSet rs = connection.executeQuery(SELECT_COUNT_STATEMENT)) { + assertThat(rs.next()).isTrue(); + assertThat(rs.getLong(0)).isEqualTo(COUNT_BEFORE_INSERT); + assertThat(rs.next()).isFalse(); + // Verify query options from the environment. + ExecuteSqlRequest request = getLastExecuteSqlRequest(); + assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("20"); + } + // Now set one of the query options on the connection. That option should be used in + // combination with the other option from the environment. + connection.execute(Statement.of("SET OPTIMIZER_VERSION='30'")); + try (ResultSet rs = connection.executeQuery(SELECT_COUNT_STATEMENT)) { + assertThat(rs.next()).isTrue(); + assertThat(rs.getLong(0)).isEqualTo(COUNT_BEFORE_INSERT); + assertThat(rs.next()).isFalse(); + + ExecuteSqlRequest request = getLastExecuteSqlRequest(); + // Optimizer version should come from the connection. + assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("30"); + } + // Now specify options directly for the query. These should override both the environment + // and what is set on the connection. + try (ResultSet rs = + connection.executeQuery( + Statement.newBuilder(SELECT_COUNT_STATEMENT.getSql()) + .withQueryOptions( + QueryOptions.newBuilder() + .setOptimizerVersion("user-defined-version") + .build()) + .build())) { + assertThat(rs.next()).isTrue(); + assertThat(rs.getLong(0)).isEqualTo(COUNT_BEFORE_INSERT); + assertThat(rs.next()).isFalse(); + + ExecuteSqlRequest request = getLastExecuteSqlRequest(); + // Optimizer version should come from the query. + assertThat(request.getQueryOptions().getOptimizerVersion()) + .isEqualTo("user-defined-version"); + } + } + } finally { + SpannerOptions.useDefaultEnvironment(); } } - } - @Test - public void testQueryAborted() { - try (Connection connection = createConnection()) { - connection.setRetryAbortsInternally(false); - for (boolean abort : new Boolean[] {true, false}) { + @Test + public void testExecuteInvalidBatchUpdate() { + try (Connection connection = createConnection()) { try { - if (abort) { - mockSpanner.abortNextStatement(); + connection.executeBatchUpdate( + ImmutableList.of(INSERT_STATEMENT, SELECT_RANDOM_STATEMENT)); + fail("Missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + } + } + } + + @Test + public void testQueryAborted() { + try (Connection connection = createConnection()) { + connection.setRetryAbortsInternally(false); + for (boolean abort : new Boolean[] {true, false}) { + try { + if (abort) { + mockSpanner.abortNextStatement(); + } + connection.executeQuery(SELECT_RANDOM_STATEMENT); + assertThat(abort).isFalse(); + connection.commit(); + } catch (AbortedException e) { + assertThat(abort).isTrue(); + connection.rollback(); } - connection.executeQuery(SELECT_RANDOM_STATEMENT); - assertThat(abort).isFalse(); - connection.commit(); - } catch (AbortedException e) { - assertThat(abort).isTrue(); - connection.rollback(); } } } - } - @Test - public void testUpdateAborted() { - try (Connection connection = createConnection()) { - connection.setRetryAbortsInternally(false); - for (boolean abort : new Boolean[] {true, false}) { - try { - if (abort) { - mockSpanner.abortNextStatement(); + @Test + public void testUpdateAborted() { + try (Connection connection = createConnection()) { + connection.setRetryAbortsInternally(false); + for (boolean abort : new Boolean[] {true, false}) { + try { + if (abort) { + mockSpanner.abortNextStatement(); + } + connection.executeUpdate(INSERT_STATEMENT); + assertThat(abort).isFalse(); + connection.commit(); + } catch (AbortedException e) { + assertThat(abort).isTrue(); + connection.rollback(); } - connection.executeUpdate(INSERT_STATEMENT); - assertThat(abort).isFalse(); - connection.commit(); - } catch (AbortedException e) { - assertThat(abort).isTrue(); - connection.rollback(); } } } - } - @Test - public void testBatchUpdateAborted() { - try (Connection connection = createConnection()) { - connection.setRetryAbortsInternally(false); - for (boolean abort : new Boolean[] {true, false}) { - try { - if (abort) { - mockSpanner.abortNextStatement(); + @Test + public void testBatchUpdateAborted() { + try (Connection connection = createConnection()) { + connection.setRetryAbortsInternally(false); + for (boolean abort : new Boolean[] {true, false}) { + try { + if (abort) { + mockSpanner.abortNextStatement(); + } + connection.executeBatchUpdate(ImmutableList.of(INSERT_STATEMENT, INSERT_STATEMENT)); + assertThat(abort).isFalse(); + connection.commit(); + } catch (AbortedException e) { + assertThat(abort).isTrue(); + connection.rollback(); } - connection.executeBatchUpdate(ImmutableList.of(INSERT_STATEMENT, INSERT_STATEMENT)); - assertThat(abort).isFalse(); - connection.commit(); - } catch (AbortedException e) { - assertThat(abort).isTrue(); - connection.rollback(); } } } } + + public static class ConnectionMinSessionsTest extends AbstractMockServerTest { + + @AfterClass + public static void reset() { + mockSpanner.reset(); + } + + protected String getBaseUrl() { + return super.getBaseUrl() + ";minSessions=1"; + } + + @Test + public void testMinSessions() throws InterruptedException, TimeoutException { + try (Connection connection = createConnection()) { + mockSpanner.waitForRequestsToContain( + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + return input instanceof BatchCreateSessionsRequest + && ((BatchCreateSessionsRequest) input).getSessionCount() == 1; + } + }, + 5000L); + } + } + } + + public static class ConnectionMaxSessionsTest extends AbstractMockServerTest { + + @AfterClass + public static void reset() { + mockSpanner.reset(); + } + + protected String getBaseUrl() { + return super.getBaseUrl() + ";maxSessions=1"; + } + + @Test + public void testMaxSessions() + throws InterruptedException, TimeoutException, ExecutionException { + try (Connection connection1 = createConnection(); + Connection connection2 = createConnection()) { + connection1.beginTransactionAsync(); + connection2.beginTransactionAsync(); + + ApiFuture count1 = connection1.executeUpdateAsync(INSERT_STATEMENT); + ApiFuture count2 = connection2.executeUpdateAsync(INSERT_STATEMENT); + + // Commit the transactions. Both should be able to finish, but both used the same session. + ApiFuture commit1 = connection1.commitAsync(); + ApiFuture commit2 = connection2.commitAsync(); + + // At least one transaction must wait until the other has finished before it can get a + // session. + assertThat(count1.isDone() && count2.isDone()).isFalse(); + assertThat(commit1.isDone() && commit2.isDone()).isFalse(); + + // Wait until both finishes. + ApiFutures.allAsList(Arrays.asList(commit1, commit2)).get(5L, TimeUnit.SECONDS); + + assertThat(count1.isDone()).isTrue(); + assertThat(count2.isDone()).isTrue(); + } + assertThat(mockSpanner.numSessionsCreated()).isEqualTo(1); + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java index afc0512b4e..19d6dfddcd 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java @@ -16,10 +16,7 @@ package com.google.cloud.spanner.connection; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.MatcherAssert.assertThat; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -33,6 +30,7 @@ import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.connection.ConnectionImpl.LeakedConnectionException; import com.google.cloud.spanner.connection.SpannerPool.CheckAndCloseSpannersMode; +import com.google.cloud.spanner.connection.SpannerPool.SpannerPoolKey; import com.google.common.base.Ticker; import com.google.common.testing.FakeTicker; import java.io.ByteArrayOutputStream; @@ -108,40 +106,40 @@ public void testGetSpanner() { // assert equal spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options1, connection2); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); spanner1 = pool.getSpanner(options2, connection1); spanner2 = pool.getSpanner(options2, connection2); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); spanner1 = pool.getSpanner(options3, connection1); spanner2 = pool.getSpanner(options3, connection2); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); spanner1 = pool.getSpanner(options4, connection1); spanner2 = pool.getSpanner(options4, connection2); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); // Options 5 and 6 both use default credentials. spanner1 = pool.getSpanner(options5, connection1); spanner2 = pool.getSpanner(options6, connection2); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); // assert not equal spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options2, connection2); - assertThat(spanner1, not(equalTo(spanner2))); + assertThat(spanner1).isNotEqualTo(spanner2); spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options3, connection2); - assertThat(spanner1, not(equalTo(spanner2))); + assertThat(spanner1).isNotEqualTo(spanner2); spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options4, connection2); - assertThat(spanner1, not(equalTo(spanner2))); + assertThat(spanner1).isNotEqualTo(spanner2); spanner1 = pool.getSpanner(options2, connection1); spanner2 = pool.getSpanner(options3, connection2); - assertThat(spanner1, not(equalTo(spanner2))); + assertThat(spanner1).isNotEqualTo(spanner2); spanner1 = pool.getSpanner(options2, connection1); spanner2 = pool.getSpanner(options4, connection2); - assertThat(spanner1, not(equalTo(spanner2))); + assertThat(spanner1).isNotEqualTo(spanner2); spanner1 = pool.getSpanner(options3, connection1); spanner2 = pool.getSpanner(options4, connection2); - assertThat(spanner1, not(equalTo(spanner2))); + assertThat(spanner1).isNotEqualTo(spanner2); } @Test @@ -153,17 +151,17 @@ public void testRemoveConnection() { // assert equal spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options1, connection2); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); // one connection removed, assert that we would still get the same Spanner pool.removeConnection(options1, connection1); spanner1 = pool.getSpanner(options1, connection1); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); // remove two connections, assert that we would still get the same Spanner, as Spanners are not // directly closed and removed. pool.removeConnection(options1, connection1); pool.removeConnection(options1, connection2); spanner1 = pool.getSpanner(options1, connection1); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); // remove the last connection again pool.removeConnection(options1, connection1); } @@ -200,7 +198,7 @@ public void testRemoveConnectionOptionsNotRegistered() { pool.getSpanner(options1, connection1); pool.removeConnection(options2, connection1); String capturedLog = getTestCapturedLog(); - assertThat(capturedLog.contains(expectedLogPart), is(true)); + assertThat(capturedLog.contains(expectedLogPart)).isTrue(); } @Test @@ -211,7 +209,7 @@ public void testRemoveConnectionConnectionNotRegistered() { pool.getSpanner(options1, connection1); pool.removeConnection(options1, connection2); String capturedLog = getTestCapturedLog(); - assertThat(capturedLog.contains(expectedLogPart), is(true)); + assertThat(capturedLog.contains(expectedLogPart)).isTrue(); } @Test @@ -223,7 +221,7 @@ public void testRemoveConnectionConnectionAlreadyRemoved() { pool.removeConnection(options1, connection1); pool.removeConnection(options1, connection1); String capturedLog = getTestCapturedLog(); - assertThat(capturedLog.contains(expectedLogPart), is(true)); + assertThat(capturedLog.contains(expectedLogPart)).isTrue(); } @Test @@ -237,7 +235,7 @@ public void testCloseSpanner() { } catch (SpannerException e) { exception = e.getErrorCode() == ErrorCode.FAILED_PRECONDITION; } - assertThat(exception, is(true)); + assertThat(exception).isTrue(); // remove the connection and verify that it is possible to close pool.removeConnection(options1, connection1); @@ -249,7 +247,7 @@ public void testCloseSpanner() { Spanner spanner2 = pool.getSpanner(options1, connection1); pool.checkAndCloseSpanners(CheckAndCloseSpannersMode.WARN); String capturedLog = getTestCapturedLog(); - assertThat(capturedLog.contains(expectedLogPart), is(true)); + assertThat(capturedLog.contains(expectedLogPart)).isTrue(); verify(spanner2, never()).close(); // remove the connection and verify that it is possible to close @@ -273,11 +271,11 @@ public void testLeakedConnection() { ConnectionOptions.closeSpanner(); fail("missing expected exception"); } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.FAILED_PRECONDITION))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); } String capturedLog = getTestCapturedLog(); - assertThat(capturedLog.contains(LeakedConnectionException.class.getName()), is(true)); - assertThat(capturedLog.contains("testLeakedConnection"), is(true)); + assertThat(capturedLog.contains(LeakedConnectionException.class.getName())).isTrue(); + assertThat(capturedLog.contains("testLeakedConnection")).isTrue(); // Now close the connection to avoid trouble with other test cases. connection.close(); } @@ -292,7 +290,7 @@ public void testCloseUnusedSpanners() { // create two connections that use the same Spanner spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options1, connection2); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); // all spanners are in use, this should have no effect pool.closeUnusedSpanners(-1L); @@ -312,8 +310,8 @@ public void testCloseUnusedSpanners() { spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options2, connection2); spanner3 = pool.getSpanner(options2, connection3); - assertThat(spanner1, not(equalTo(spanner2))); - assertThat(spanner2, is(equalTo(spanner3))); + assertThat(spanner1).isNotEqualTo(spanner2); + assertThat(spanner2).isEqualTo(spanner3); // all spanners are in use, this should have no effect pool.closeUnusedSpanners(-1L); @@ -359,7 +357,7 @@ public void testAutomaticCloser() throws InterruptedException { // create two connections that use the same Spanner spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options1, connection2); - assertThat(spanner1, is(equalTo(spanner2))); + assertThat(spanner1).isEqualTo(spanner2); // all spanners are in use, this should have no effect ticker.advance(TEST_AUTOMATIC_CLOSE_TIMEOUT_NANOS + MILLISECOND); @@ -382,8 +380,8 @@ public void testAutomaticCloser() throws InterruptedException { spanner1 = pool.getSpanner(options1, connection1); spanner2 = pool.getSpanner(options2, connection2); spanner3 = pool.getSpanner(options2, connection3); - assertThat(spanner1, not(equalTo(spanner2))); - assertThat(spanner2, is(equalTo(spanner3))); + assertThat(spanner1).isNotEqualTo(spanner2); + assertThat(spanner2).isEqualTo(spanner3); // all spanners are in use, this should have no effect ticker.advance(TEST_AUTOMATIC_CLOSE_TIMEOUT_NANOS + MILLISECOND); @@ -416,4 +414,35 @@ public void testAutomaticCloser() throws InterruptedException { verify(spanner2).close(); verify(spanner3).close(); } + + @Test + public void testSpannerPoolKeyEquality() { + ConnectionOptions options1 = + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/p/instances/i/databases/d?minSessions=200;maxSessions=400") + .setCredentials(NoCredentials.getInstance()) + .build(); + // options2 equals the default session pool options, and is therefore equal to ConnectionOptions + // without any session pool configuration. + ConnectionOptions options2 = + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/p/instances/i/databases/d?minSessions=100;maxSessions=400") + .setCredentials(NoCredentials.getInstance()) + .build(); + ConnectionOptions options3 = + ConnectionOptions.newBuilder() + .setUri("cloudspanner:/projects/p/instances/i/databases/d") + .setCredentials(NoCredentials.getInstance()) + .build(); + + SpannerPoolKey key1 = SpannerPoolKey.of(options1); + SpannerPoolKey key2 = SpannerPoolKey.of(options2); + SpannerPoolKey key3 = SpannerPoolKey.of(options3); + + assertThat(key1).isNotEqualTo(key2); + assertThat(key2).isEqualTo(key3); + assertThat(key1).isNotEqualTo(key3); + } }