Skip to content

Commit

Permalink
feat: allow session pool settings in connection url (#821)
Browse files Browse the repository at this point in the history
* feat: allow session pool settings in connection url

* fix: use NoCredentials in test
  • Loading branch information
olavloite committed Jan 31, 2021
1 parent facda8a commit e1e9152
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 160 deletions.
Expand Up @@ -18,6 +18,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Objects;
import org.threeten.bp.Duration;

/** Options for the session pool used by {@code DatabaseClient}. */
Expand Down Expand Up @@ -63,6 +64,48 @@ private SessionPoolOptions(Builder builder) {
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof SessionPoolOptions)) {
return false;
}
SessionPoolOptions other = (SessionPoolOptions) o;
return Objects.equals(this.minSessions, other.minSessions)
&& Objects.equals(this.maxSessions, other.maxSessions)
&& Objects.equals(this.incStep, other.incStep)
&& Objects.equals(this.maxIdleSessions, other.maxIdleSessions)
&& Objects.equals(this.writeSessionsFraction, other.writeSessionsFraction)
&& Objects.equals(this.actionOnExhaustion, other.actionOnExhaustion)
&& Objects.equals(this.actionOnSessionNotFound, other.actionOnSessionNotFound)
&& Objects.equals(this.actionOnSessionLeak, other.actionOnSessionLeak)
&& Objects.equals(
this.initialWaitForSessionTimeoutMillis, other.initialWaitForSessionTimeoutMillis)
&& Objects.equals(this.loopFrequency, other.loopFrequency)
&& Objects.equals(this.keepAliveIntervalMinutes, other.keepAliveIntervalMinutes)
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter);
}

@Override
public int hashCode() {
return Objects.hash(
this.minSessions,
this.maxSessions,
this.incStep,
this.maxIdleSessions,
this.writeSessionsFraction,
this.actionOnExhaustion,
this.actionOnSessionNotFound,
this.actionOnSessionLeak,
this.initialWaitForSessionTimeoutMillis,
this.loopFrequency,
this.keepAliveIntervalMinutes,
this.removeInactiveSessionAfter);
}

public Builder toBuilder() {
return new Builder(this);
}

public int getMinSessions() {
return minSessions;
}
Expand Down Expand Up @@ -165,6 +208,24 @@ public static class Builder {
private int keepAliveIntervalMinutes = 30;
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);

public Builder() {}

private Builder(SessionPoolOptions options) {
this.minSessionsSet = true;
this.minSessions = options.minSessions;
this.maxSessions = options.maxSessions;
this.incStep = options.incStep;
this.maxIdleSessions = options.maxIdleSessions;
this.writeSessionsFraction = options.writeSessionsFraction;
this.actionOnExhaustion = options.actionOnExhaustion;
this.initialWaitForSessionTimeoutMillis = options.initialWaitForSessionTimeoutMillis;
this.actionOnSessionNotFound = options.actionOnSessionNotFound;
this.actionOnSessionLeak = options.actionOnSessionLeak;
this.loopFrequency = options.loopFrequency;
this.keepAliveIntervalMinutes = options.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
}

/**
* Minimum number of sessions that this pool will always maintain. These will be created eagerly
* in parallel. Defaults to 100.
Expand Down
Expand Up @@ -150,6 +150,8 @@ public String[] getValidValues() {
static final boolean DEFAULT_RETRY_ABORTS_INTERNALLY = true;
private static final String DEFAULT_CREDENTIALS = null;
private static final String DEFAULT_OAUTH_TOKEN = null;
private static final String DEFAULT_MIN_SESSIONS = null;
private static final String DEFAULT_MAX_SESSIONS = null;
private static final String DEFAULT_NUM_CHANNELS = null;
private static final String DEFAULT_USER_AGENT = null;
private static final String DEFAULT_OPTIMIZER_VERSION = "";
Expand All @@ -172,6 +174,10 @@ public String[] getValidValues() {
* OAuth token to use for authentication. Cannot be used in combination with a credentials file.
*/
public static final String OAUTH_TOKEN_PROPERTY_NAME = "oauthToken";
/** Name of the 'minSessions' connection property. */
public static final String MIN_SESSIONS_PROPERTY_NAME = "minSessions";
/** Name of the 'numChannels' connection property. */
public static final String MAX_SESSIONS_PROPERTY_NAME = "maxSessions";
/** Name of the 'numChannels' connection property. */
public static final String NUM_CHANNELS_PROPERTY_NAME = "numChannels";
/** Custom user agent string is only for other Google libraries. */
Expand Down Expand Up @@ -204,6 +210,12 @@ public String[] getValidValues() {
ConnectionProperty.createStringProperty(
OAUTH_TOKEN_PROPERTY_NAME,
"A valid pre-existing OAuth token to use for authentication for this connection. Setting this property will take precedence over any value set for a credentials file."),
ConnectionProperty.createStringProperty(
MIN_SESSIONS_PROPERTY_NAME,
"The minimum number of sessions in the backing session pool. The default is 100."),
ConnectionProperty.createStringProperty(
MAX_SESSIONS_PROPERTY_NAME,
"The maximum number of sessions in the backing session pool. The default is 400."),
ConnectionProperty.createStringProperty(
NUM_CHANNELS_PROPERTY_NAME,
"The number of gRPC channels to use to communicate with Cloud Spanner. The default is 4."),
Expand Down Expand Up @@ -327,6 +339,9 @@ private boolean isValidUri(String uri) {
* true.
* <li>readonly (boolean): Sets the initial readonly mode for the connection. Default is
* false.
* <li>minSessions (int): Sets the minimum number of sessions in the backing session pool.
* <li>maxSessions (int): Sets the maximum number of sessions in the backing session pool.
* <li>numChannels (int): Sets the number of gRPC channels to use for the connection.
* <li>retryAbortsInternally (boolean): Sets the initial retryAbortsInternally mode for the
* connection. Default is true.
* <li>optimizerVersion (string): Sets the query optimizer version to use for the connection.
Expand Down Expand Up @@ -437,6 +452,8 @@ public static Builder newBuilder() {
private final Credentials credentials;
private final SessionPoolOptions sessionPoolOptions;
private final Integer numChannels;
private final Integer minSessions;
private final Integer maxSessions;
private final String userAgent;
private final QueryOptions queryOptions;

Expand All @@ -453,7 +470,6 @@ private ConnectionOptions(Builder builder) {
this.warnings = checkValidProperties(builder.uri);

this.uri = builder.uri;
this.sessionPoolOptions = builder.sessionPoolOptions;
this.credentialsUrl =
builder.credentialsUrl != null ? builder.credentialsUrl : parseCredentials(builder.uri);
this.oauthToken =
Expand Down Expand Up @@ -492,19 +508,12 @@ private ConnectionOptions(Builder builder) {
} else {
this.credentials = getCredentialsService().createCredentials(this.credentialsUrl);
}
String numChannelsValue = parseNumChannels(builder.uri);
if (numChannelsValue != null) {
try {
this.numChannels = Integer.valueOf(numChannelsValue);
} catch (NumberFormatException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
"Invalid numChannels value specified: " + numChannelsValue,
e);
}
} else {
this.numChannels = null;
}
this.minSessions =
parseIntegerProperty(MIN_SESSIONS_PROPERTY_NAME, parseMinSessions(builder.uri));
this.maxSessions =
parseIntegerProperty(MAX_SESSIONS_PROPERTY_NAME, parseMaxSessions(builder.uri));
this.numChannels =
parseIntegerProperty(NUM_CHANNELS_PROPERTY_NAME, parseNumChannels(builder.uri));

String projectId = matcher.group(Builder.PROJECT_GROUP);
if (Builder.DEFAULT_PROJECT_ID_PLACEHOLDER.equalsIgnoreCase(projectId)) {
Expand All @@ -518,6 +527,36 @@ private ConnectionOptions(Builder builder) {
this.statementExecutionInterceptors =
Collections.unmodifiableList(builder.statementExecutionInterceptors);
this.configurator = builder.configurator;

if (this.minSessions != null || this.maxSessions != null) {
SessionPoolOptions.Builder sessionPoolOptionsBuilder =
builder.sessionPoolOptions == null
? SessionPoolOptions.newBuilder()
: builder.sessionPoolOptions.toBuilder();
if (this.minSessions != null) {
sessionPoolOptionsBuilder.setMinSessions(this.minSessions);
}
if (this.maxSessions != null) {
sessionPoolOptionsBuilder.setMaxSessions(this.maxSessions);
}
this.sessionPoolOptions = sessionPoolOptionsBuilder.build();
} else {
this.sessionPoolOptions = builder.sessionPoolOptions;
}
}

private static Integer parseIntegerProperty(String propertyName, String value) {
if (value != null) {
try {
return Integer.valueOf(value);
} catch (NumberFormatException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
String.format("Invalid %s value specified: %s", propertyName, value),
e);
}
}
return null;
}

SpannerOptionsConfigurator getConfigurator() {
Expand Down Expand Up @@ -565,6 +604,18 @@ static String parseOAuthToken(String uri) {
return value != null ? value : DEFAULT_OAUTH_TOKEN;
}

@VisibleForTesting
static String parseMinSessions(String uri) {
String value = parseUriProperty(uri, MIN_SESSIONS_PROPERTY_NAME);
return value != null ? value : DEFAULT_MIN_SESSIONS;
}

@VisibleForTesting
static String parseMaxSessions(String uri) {
String value = parseUriProperty(uri, MAX_SESSIONS_PROPERTY_NAME);
return value != null ? value : DEFAULT_MAX_SESSIONS;
}

@VisibleForTesting
static String parseNumChannels(String uri) {
String value = parseUriProperty(uri, NUM_CHANNELS_PROPERTY_NAME);
Expand Down Expand Up @@ -671,6 +722,24 @@ public SessionPoolOptions getSessionPoolOptions() {
return sessionPoolOptions;
}

/**
* The minimum number of sessions in the backing session pool of this connection. The session pool
* is shared between all connections in the same JVM that connect to the same Cloud Spanner
* database using the same connection settings.
*/
public Integer getMinSessions() {
return minSessions;
}

/**
* The maximum number of sessions in the backing session pool of this connection. The session pool
* is shared between all connections in the same JVM that connect to the same Cloud Spanner
* database using the same connection settings.
*/
public Integer getMaxSessions() {
return maxSessions;
}

/** The number of channels to use for the connection. */
public Integer getNumChannels() {
return numChannels;
Expand Down
Expand Up @@ -148,15 +148,19 @@ static class SpannerPoolKey {
private final boolean usePlainText;
private final String userAgent;

private static SpannerPoolKey of(ConnectionOptions options) {
@VisibleForTesting
static SpannerPoolKey of(ConnectionOptions options) {
return new SpannerPoolKey(options);
}

private SpannerPoolKey(ConnectionOptions options) {
this.host = options.getHost();
this.projectId = options.getProjectId();
this.credentialsKey = CredentialsKey.create(options);
this.sessionPoolOptions = options.getSessionPoolOptions();
this.sessionPoolOptions =
options.getSessionPoolOptions() == null
? SessionPoolOptions.newBuilder().build()
: options.getSessionPoolOptions();
this.numChannels = options.getNumChannels();
this.usePlainText = options.isUsePlainText();
this.userAgent = options.getUserAgent();
Expand Down
Expand Up @@ -421,4 +421,28 @@ public void testLenient() {
assertThat(e.getMessage()).contains("bar");
}
}

@Test
public void testMinSessions() {
ConnectionOptions options =
ConnectionOptions.newBuilder()
.setUri(
"cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database?minSessions=400")
.setCredentialsUrl(FILE_TEST_PATH)
.build();
assertThat(options.getMinSessions()).isEqualTo(400);
assertThat(options.getSessionPoolOptions().getMinSessions()).isEqualTo(400);
}

@Test
public void testMaxSessions() {
ConnectionOptions options =
ConnectionOptions.newBuilder()
.setUri(
"cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database?maxSessions=4000")
.setCredentialsUrl(FILE_TEST_PATH)
.build();
assertThat(options.getMaxSessions()).isEqualTo(4000);
assertThat(options.getSessionPoolOptions().getMaxSessions()).isEqualTo(4000);
}
}

0 comments on commit e1e9152

Please sign in to comment.