Skip to content

Commit

Permalink
fix: only register metrics once
Browse files Browse the repository at this point in the history
The default MetricsRegistry used by OpenCensus is a singleton that does
not allow the same metrics to be registered multiple times. The session
pool metrics gathering did not take this into account, and when multiple
session pools were created within the same class loader, it would throw
an InvalidArgumentException for the second session pool.

Fixes #106.
  • Loading branch information
olavloite committed Mar 15, 2020
1 parent ea279c4 commit 625d1fc
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 74 deletions.
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.spanner;

import com.google.api.gax.core.GaxProperties;
import com.google.common.collect.ImmutableList;
import io.opencensus.metrics.LabelKey;
import io.opencensus.metrics.LabelValue;
Expand All @@ -23,20 +24,13 @@
class MetricRegistryConstants {

// The label keys are used to uniquely identify timeseries.
private static final LabelKey DATABASE = LabelKey.create("database", "Target database");
private static final LabelKey INSTANCE_ID =
LabelKey.create("instance_id", "Name of the instance");
private static final LabelKey LIBRARY_VERSION =
LabelKey.create("library_version", "Library version");

/** The label value is used to represent missing value. */
private static final LabelValue UNSET_LABEL = LabelValue.create(null);

static final ImmutableList<LabelKey> SPANNER_LABEL_KEYS =
ImmutableList.of(DATABASE, INSTANCE_ID, LIBRARY_VERSION);
static final ImmutableList<LabelKey> SPANNER_LABEL_KEYS = ImmutableList.of(LIBRARY_VERSION);

static final ImmutableList<LabelValue> SPANNER_DEFAULT_LABEL_VALUES =
ImmutableList.of(UNSET_LABEL, UNSET_LABEL, UNSET_LABEL);
ImmutableList.of(LabelValue.create(GaxProperties.getLibraryVersion(SpannerImpl.class)));

/** Unit to represent counts. */
static final String COUNT = "1";
Expand Down
Expand Up @@ -69,10 +69,13 @@
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -1088,6 +1091,11 @@ private static enum Position {
RANDOM;
}

private static final Object POOLS_LOCK = new Object();

@GuardedBy("POOLS_LOCK")
private static final Map<MetricRegistry, List<SessionPool>> REGISTERED_POOLS = new HashMap<>();

private final SessionPoolOptions options;
private final SessionClient sessionClient;
private final ScheduledExecutorService executor;
Expand Down Expand Up @@ -1150,15 +1158,12 @@ private static enum Position {
* Return pool is immediately ready for use, though getting a session might block for sessions to
* be created.
*/
static SessionPool createPool(
SpannerOptions spannerOptions, SessionClient sessionClient, List<LabelValue> labelValues) {
static SessionPool createPool(SpannerOptions spannerOptions, SessionClient sessionClient) {
return createPool(
spannerOptions.getSessionPoolOptions(),
((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(),
sessionClient,
new Clock(),
Metrics.getMetricRegistry(),
labelValues);
new Clock());
}

static SessionPool createPool(
Expand Down Expand Up @@ -1210,6 +1215,14 @@ private SessionPool(
Clock clock,
MetricRegistry metricRegistry,
List<LabelValue> labelValues) {
synchronized (POOLS_LOCK) {
if (!REGISTERED_POOLS.containsKey(metricRegistry)) {
initMetricsCollection(metricRegistry, labelValues);
REGISTERED_POOLS.put(metricRegistry, new LinkedList<>(Arrays.asList(this)));
} else {
REGISTERED_POOLS.get(metricRegistry).add(this);
}
}
this.options = options;
this.executorFactory = executorFactory;
this.executor = executor;
Expand All @@ -1229,7 +1242,6 @@ private SessionPool(
this.sessionClient = sessionClient;
this.clock = clock;
this.poolMaintainer = new PoolMaintainer();
this.initMetricsCollection(metricRegistry, labelValues);
}

@VisibleForTesting
Expand Down Expand Up @@ -1862,11 +1874,36 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
}
}

private static final class Sum implements ToLongFunction<Void> {
private final MetricRegistry registry;
private final Function<SessionPool, Long> function;

static Sum of(MetricRegistry registry, Function<SessionPool, Long> function) {
return new Sum(registry, function);
}

private Sum(MetricRegistry registry, Function<SessionPool, Long> function) {
this.registry = registry;
this.function = function;
}

@Override
public long applyAsLong(Void input) {
long res = 0L;
synchronized (POOLS_LOCK) {
for (SessionPool pool : REGISTERED_POOLS.get(registry)) {
res += function.apply(pool);
}
}
return res;
}
};

/**
* Initializes and creates Spanner session relevant metrics. When coupled with an exporter, it
* allows users to monitor client behavior.
*/
private void initMetricsCollection(MetricRegistry metricRegistry, List<LabelValue> labelValues) {
static void initMetricsCollection(MetricRegistry metricRegistry, List<LabelValue> labelValues) {
DerivedLongGauge maxInUseSessionsMetric =
metricRegistry.addDerivedLongGauge(
MAX_IN_USE_SESSIONS,
Expand Down Expand Up @@ -1925,68 +1962,80 @@ private void initMetricsCollection(MetricRegistry metricRegistry, List<LabelValu
// invoked whenever metrics are collected.
maxInUseSessionsMetric.createTimeSeries(
labelValues,
this,
new ToLongFunction<SessionPool>() {
@Override
public long applyAsLong(SessionPool sessionPool) {
return sessionPool.maxSessionsInUse;
}
});
null,
Sum.of(
metricRegistry,
new Function<SessionPool, Long>() {
@Override
public Long apply(SessionPool input) {
return Long.valueOf(input.maxSessionsInUse);
}
}));

// The value of a maxSessions is observed from a callback function. This function is invoked
// whenever metrics are collected.
maxAllowedSessionsMetric.createTimeSeries(
labelValues,
options,
new ToLongFunction<SessionPoolOptions>() {
@Override
public long applyAsLong(SessionPoolOptions options) {
return options.getMaxSessions();
}
});
null,
Sum.of(
metricRegistry,
new Function<SessionPool, Long>() {
@Override
public Long apply(SessionPool input) {
return Long.valueOf(input.options.getMaxSessions());
}
}));

// The value of a numSessionsInUse is observed from a callback function. This function is
// invoked whenever metrics are collected.
numInUseSessionsMetric.createTimeSeries(
labelValues,
this,
new ToLongFunction<SessionPool>() {
@Override
public long applyAsLong(SessionPool sessionPool) {
return sessionPool.numSessionsInUse;
}
});
null,
Sum.of(
metricRegistry,
new Function<SessionPool, Long>() {
@Override
public Long apply(SessionPool input) {
return Long.valueOf(input.numSessionsInUse);
}
}));

// The value of a numWaiterTimeouts is observed from a callback function. This function is
// invoked whenever metrics are collected.
sessionsTimeouts.createTimeSeries(
labelValues,
this,
new ToLongFunction<SessionPool>() {
@Override
public long applyAsLong(SessionPool sessionPool) {
return sessionPool.getNumWaiterTimeouts();
}
});
null,
Sum.of(
metricRegistry,
new Function<SessionPool, Long>() {
@Override
public Long apply(SessionPool input) {
return input.getNumWaiterTimeouts();
}
}));

numAcquiredSessionsMetric.createTimeSeries(
labelValues,
this,
new ToLongFunction<SessionPool>() {
@Override
public long applyAsLong(SessionPool sessionPool) {
return sessionPool.numSessionsAcquired;
}
});
null,
Sum.of(
metricRegistry,
new Function<SessionPool, Long>() {
@Override
public Long apply(SessionPool input) {
return input.numSessionsAcquired;
}
}));

numReleasedSessionsMetric.createTimeSeries(
labelValues,
this,
new ToLongFunction<SessionPool>() {
@Override
public long applyAsLong(SessionPool sessionPool) {
return sessionPool.numSessionsReleased;
}
});
null,
Sum.of(
metricRegistry,
new Function<SessionPool, Long>() {
@Override
public Long apply(SessionPool input) {
return input.numSessionsReleased;
}
}));
}
}
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.spanner;

import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.paging.Page;
import com.google.cloud.BaseService;
import com.google.cloud.PageImpl;
Expand All @@ -28,11 +27,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import io.opencensus.metrics.LabelValue;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
Expand Down Expand Up @@ -153,14 +150,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
if (dbClients.containsKey(db)) {
return dbClients.get(db);
} else {
List<LabelValue> labelValues =
ImmutableList.of(
LabelValue.create(db.getDatabase()),
LabelValue.create(db.getInstanceId().getName()),
LabelValue.create(GaxProperties.getLibraryVersion(getOptions().getClass())));
SessionPool pool =
SessionPool.createPool(
getOptions(), SpannerImpl.this.getSessionClient(db), labelValues);
SessionPool.createPool(getOptions(), SpannerImpl.this.getSessionClient(db));
DatabaseClientImpl dbClient = createDatabaseClient(pool);
dbClients.put(db, dbClient);
return dbClient;
Expand Down
Expand Up @@ -1577,11 +1577,7 @@ public void testSessionMetrics() throws Exception {
FakeClock clock = new FakeClock();
clock.currentTimeMillis = System.currentTimeMillis();
FakeMetricRegistry metricRegistry = new FakeMetricRegistry();
List<LabelValue> labelValues =
Arrays.asList(
LabelValue.create("database1"),
LabelValue.create("instance1"),
LabelValue.create("1.0.0"));
List<LabelValue> labelValues = Arrays.asList(LabelValue.create("1.0.0"));

setupMockSessionCreation();
pool = createPool(clock, metricRegistry, labelValues);
Expand All @@ -1590,7 +1586,10 @@ public void testSessionMetrics() throws Exception {

MetricsRecord record = metricRegistry.pollRecord();
assertThat(record.getMetrics().size()).isEqualTo(6);
assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.IN_USE_SESSIONS, 2L);
assertThat(record.getMetrics()).containsKey(MetricRegistryConstants.IN_USE_SESSIONS);
assertThat(record.getMetrics().get(MetricRegistryConstants.IN_USE_SESSIONS)).isEqualTo(2L);
// assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.IN_USE_SESSIONS,
// 2L);
assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.MAX_IN_USE_SESSIONS, 2L);
assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.GET_SESSION_TIMEOUTS, 0L);
assertThat(record.getMetrics())
Expand Down

0 comments on commit 625d1fc

Please sign in to comment.