From de82f7809f8585fcbd13e117a2e29e06f1424de4 Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri <65816936+KiranmayiB@users.noreply.github.com> Date: Thu, 7 Oct 2021 14:35:33 +0530 Subject: [PATCH] feat: expose GFE latency metrics (#1473) * GFE Latency * feat: GFE Latency * Busy waiting for header missing count * Thread sleep before checking view Data * lint errors and changing method signatures in unit test * lint changes * Adding pattern for Admin calls * Correcting regex pattern --- google-cloud-spanner/pom.xml | 7 +- .../spanner/spi/v1/HeaderInterceptor.java | 141 ++++++ .../spi/v1/SpannerInterceptorProvider.java | 3 +- .../cloud/spanner/spi/v1/SpannerRpcViews.java | 118 +++++ .../cloud/spanner/spi/v1/GfeLatencyTest.java | 408 ++++++++++++++++++ 5 files changed, 675 insertions(+), 2 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index 27eb4900fb..91fadd21bc 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -98,7 +98,7 @@ org.apache.maven.plugins maven-dependency-plugin - io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1,javax.annotation:javax.annotation-api + io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1,javax.annotation:javax.annotation-api,io.opencensus:opencensus-impl @@ -179,6 +179,11 @@ io.opencensus opencensus-contrib-grpc-util + + io.opencensus + opencensus-impl + test + com.google.auth google-auth-library-oauth2-http diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java new file mode 100644 index 0000000000..3f4372db84 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -0,0 +1,141 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.DATABASE_ID; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.INSTANCE_ID; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.METHOD; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.PROJECT_ID; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_LATENCY; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Intercepts all gRPC calls to extract server-timing header. Captures GFE Latency and GFE Header + * Missing count metrics. + */ +class HeaderInterceptor implements ClientInterceptor { + + private static final Metadata.Key SERVER_TIMING_HEADER_KEY = + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER); + private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)"); + private static final Metadata.Key GOOGLE_CLOUD_RESOURCE_PREFIX_KEY = + Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER); + private static final Pattern GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN = + Pattern.compile( + ".*projects/(?\\p{ASCII}[^/]*)(/instances/(?\\p{ASCII}[^/]*))?(/databases/(?\\p{ASCII}[^/]*))?"); + + // Get the global singleton Tagger object. + private static final Tagger TAGGER = Tags.getTagger(); + private static final StatsRecorder STATS_RECORDER = Stats.getStatsRecorder(); + + private static final Logger LOGGER = Logger.getLogger(HeaderInterceptor.class.getName()); + private static final Level LEVEL = Level.INFO; + + HeaderInterceptor() {} + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + TagContext tagContext = getTagContext(headers, method.getFullMethodName()); + super.start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onHeaders(Metadata metadata) { + + processHeader(metadata, tagContext); + super.onHeaders(metadata); + } + }, + headers); + } + }; + } + + private void processHeader(Metadata metadata, TagContext tagContext) { + MeasureMap measureMap = STATS_RECORDER.newMeasureMap(); + if (metadata.get(SERVER_TIMING_HEADER_KEY) != null) { + String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY); + Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(serverTiming); + if (matcher.find()) { + try { + long latency = Long.parseLong(matcher.group("dur")); + measureMap.put(SPANNER_GFE_LATENCY, latency).record(tagContext); + measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 0L).record(tagContext); + } catch (NumberFormatException e) { + LOGGER.log(LEVEL, "Invalid server-timing object in header", matcher.group("dur")); + } + } + } else { + measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L).record(tagContext); + } + } + + private TagContext getTagContext( + String method, String projectId, String instanceId, String databaseId) { + return TAGGER + .currentBuilder() + .putLocal(PROJECT_ID, TagValue.create(projectId)) + .putLocal(INSTANCE_ID, TagValue.create(instanceId)) + .putLocal(DATABASE_ID, TagValue.create(databaseId)) + .putLocal(METHOD, TagValue.create(method)) + .build(); + } + + private TagContext getTagContext(Metadata headers, String method) { + String projectId = "undefined-project"; + String instanceId = "undefined-database"; + String databaseId = "undefined-database"; + if (headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY) != null) { + String googleResourcePrefix = headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY); + Matcher matcher = GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN.matcher(googleResourcePrefix); + if (matcher.find()) { + projectId = matcher.group("project"); + if (matcher.group("instance") != null) { + instanceId = matcher.group("instance"); + } + if (matcher.group("database") != null) { + databaseId = matcher.group("database"); + } + } else { + LOGGER.log(LEVEL, "Error parsing google cloud resource header: " + googleResourcePrefix); + } + } + return getTagContext(method, projectId, instanceId, databaseId); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java index c262780935..e45702de8d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java @@ -33,7 +33,8 @@ public class SpannerInterceptorProvider implements GrpcInterceptorProvider { private static final List defaultInterceptors = ImmutableList.of( new SpannerErrorInterceptor(), - new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER)); + new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER), + new HeaderInterceptor()); private final List clientInterceptors; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java new file mode 100644 index 0000000000..b1b68e95d1 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java @@ -0,0 +1,118 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Aggregation.Distribution; +import io.opencensus.stats.Aggregation.Sum; +import io.opencensus.stats.BucketBoundaries; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.stats.Stats; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewManager; +import io.opencensus.tags.TagKey; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +@VisibleForTesting +public class SpannerRpcViews { + + /** Unit to represent milliseconds. */ + private static final String MILLISECOND = "ms"; + /** Unit to represent counts. */ + private static final String COUNT = "1"; + + /** TagKeys */ + public static final TagKey METHOD = TagKey.create("method"); + + public static final TagKey PROJECT_ID = TagKey.create("project_id"); + public static final TagKey INSTANCE_ID = TagKey.create("instance_id"); + public static final TagKey DATABASE_ID = TagKey.create("database"); + + /** GFE t4t7 latency extracted from server-timing header. */ + public static final MeasureLong SPANNER_GFE_LATENCY = + MeasureLong.create( + "cloud.google.com/java/spanner/gfe_latency", + "Latency between Google's network receiving an RPC and reading back the first byte of the response", + MILLISECOND); + /** Number of responses without the server-timing header. */ + public static final MeasureLong SPANNER_GFE_HEADER_MISSING_COUNT = + MeasureLong.create( + "cloud.google.com/java/spanner/gfe_header_missing_count", + "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", + COUNT); + + static final List RPC_MILLIS_BUCKET_BOUNDARIES = + Collections.unmodifiableList( + Arrays.asList( + 0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0, + 16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, + 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, + 100000.0)); + static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM = + Distribution.create(BucketBoundaries.create(RPC_MILLIS_BUCKET_BOUNDARIES)); + static final View SPANNER_GFE_LATENCY_VIEW = + View.create( + View.Name.create("cloud.google.com/java/spanner/gfe_latency"), + "Latency between Google's network receiving an RPC and reading back the first byte of the response", + SPANNER_GFE_LATENCY, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of(METHOD, PROJECT_ID, INSTANCE_ID, DATABASE_ID)); + + private static final Aggregation SUM = Sum.create(); + static final View SPANNER_GFE_HEADER_MISSING_COUNT_VIEW = + View.create( + View.Name.create("cloud.google.com/java/spanner/gfe_header_missing_count"), + "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", + SPANNER_GFE_HEADER_MISSING_COUNT, + SUM, + ImmutableList.of(METHOD, PROJECT_ID, INSTANCE_ID, DATABASE_ID)); + + public static ViewManager viewManager = Stats.getViewManager(); + + /** + * Register views for GFE metrics, including gfe_latency and gfe_header_missing_count. gfe_latency + * measures the latency between Google's network receives an RPC and reads back the first byte of + * the response. gfe_header_missing_count is a counter of the number of RPC responses without a + * server-timing header. + */ + @VisibleForTesting + public static void registerGfeLatencyAndHeaderMissingCountViews() { + viewManager.registerView(SPANNER_GFE_LATENCY_VIEW); + viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW); + } + + /** + * Register GFE Latency view. gfe_latency measures the latency between Google's network receives + * an RPC and reads back the first byte of the response. + */ + @VisibleForTesting + public static void registerGfeLatencyView() { + viewManager.registerView(SPANNER_GFE_LATENCY_VIEW); + } + + /** + * Register GFE Header Missing Count view. gfe_header_missing_count is a counter of the number of + * RPC responses without a server-timing header. + */ + @VisibleForTesting + public static void registerGfeHeaderMissingCountView() { + viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java new file mode 100644 index 0000000000..48fae49946 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java @@ -0,0 +1,408 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeFalse; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.MockSpannerServiceImpl; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.testing.EmulatorSpannerHelper; +import com.google.protobuf.ListValue; +import com.google.spanner.v1.ResultSetMetadata; +import com.google.spanner.v1.StructType; +import com.google.spanner.v1.TypeCode; +import io.grpc.ForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.auth.MoreCallCredentials; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class GfeLatencyTest { + + private static final String STATIC_OAUTH_TOKEN = "STATIC_TEST_OAUTH_TOKEN"; + private static final String VARIABLE_OAUTH_TOKEN = "VARIABLE_TEST_OAUTH_TOKEN"; + private static final OAuth2Credentials STATIC_CREDENTIALS = + OAuth2Credentials.create( + new AccessToken( + STATIC_OAUTH_TOKEN, + new java.util.Date( + System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS)))); + private static final OAuth2Credentials VARIABLE_CREDENTIALS = + OAuth2Credentials.create( + new AccessToken( + VARIABLE_OAUTH_TOKEN, + new java.util.Date( + System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS)))); + + private static MockSpannerServiceImpl mockSpanner; + private static Server server; + private static InetSocketAddress address; + private static Spanner spanner; + private static DatabaseClient databaseClient; + + private static final Map optionsMap = new HashMap<>(); + + private static MockSpannerServiceImpl mockSpannerNoHeader; + private static Server serverNoHeader; + private static InetSocketAddress addressNoHeader; + private static Spanner spannerNoHeader; + private static DatabaseClient databaseClientNoHeader; + + private static String instanceId = "fake-instance"; + private static String databaseId = "fake-database"; + private static String projectId = "fake-project"; + + private static final long WAIT_FOR_METRICS_TIME_MS = 1_000; + private static final int MAXIMUM_RETRIES = 5; + + private static AtomicInteger fakeServerTiming = new AtomicInteger(new Random().nextInt(1000) + 1); + + private static final Statement SELECT1AND2 = + Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL1"); + + private static final ResultSetMetadata SELECT1AND2_METADATA = + ResultSetMetadata.newBuilder() + .setRowType( + StructType.newBuilder() + .addFields( + StructType.Field.newBuilder() + .setName("COL1") + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(TypeCode.INT64) + .build()) + .build()) + .build()) + .build(); + private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = + com.google.spanner.v1.ResultSet.newBuilder() + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) + .build()) + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("2").build()) + .build()) + .setMetadata(SELECT1AND2_METADATA) + .build(); + private static final Statement UPDATE_FOO_STATEMENT = + Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); + + @BeforeClass + public static void startServer() throws IOException { + SpannerRpcViews.registerGfeLatencyAndHeaderMissingCountViews(); + assumeFalse(EmulatorSpannerHelper.isUsingEmulator()); + + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult( + MockSpannerServiceImpl.StatementResult.query(SELECT1AND2, SELECT1_RESULTSET)); + mockSpanner.putStatementResult( + MockSpannerServiceImpl.StatementResult.update(UPDATE_FOO_STATEMENT, 1L)); + address = new InetSocketAddress("localhost", 0); + server = + NettyServerBuilder.forAddress(address) + .addService(mockSpanner) + .intercept( + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata headers, + ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall( + new ForwardingServerCall.SimpleForwardingServerCall( + serverCall) { + @Override + public void sendHeaders(Metadata headers) { + headers.put( + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER), + String.format("gfet4t7; dur=%d", fakeServerTiming.get())); + super.sendHeaders(headers); + } + }, + headers); + } + }) + .build() + .start(); + optionsMap.put(SpannerRpc.Option.CHANNEL_HINT, 1L); + spanner = createSpannerOptions(address, server).getService(); + databaseClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); + + mockSpannerNoHeader = new MockSpannerServiceImpl(); + mockSpannerNoHeader.setAbortProbability(0.0D); + mockSpannerNoHeader.putStatementResult( + MockSpannerServiceImpl.StatementResult.query(SELECT1AND2, SELECT1_RESULTSET)); + mockSpannerNoHeader.putStatementResult( + MockSpannerServiceImpl.StatementResult.update(UPDATE_FOO_STATEMENT, 1L)); + addressNoHeader = new InetSocketAddress("localhost", 0); + serverNoHeader = + NettyServerBuilder.forAddress(addressNoHeader) + .addService(mockSpannerNoHeader) + .build() + .start(); + spannerNoHeader = createSpannerOptions(addressNoHeader, serverNoHeader).getService(); + databaseClientNoHeader = + spannerNoHeader.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); + } + + @AfterClass + public static void stopServer() throws InterruptedException { + if (spanner != null) { + spanner.close(); + server.shutdown(); + server.awaitTermination(); + } + + if (spannerNoHeader != null) { + spannerNoHeader.close(); + serverNoHeader.shutdown(); + serverNoHeader.awaitTermination(); + } + } + + @After + public void reset() { + mockSpanner.reset(); + mockSpannerNoHeader.reset(); + } + + @Test + public void testGfeLatencyExecuteStreamingSql() throws InterruptedException { + try (ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + + long latency = + getMetric( + SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, + projectId, + instanceId, + databaseId, + "google.spanner.v1.Spanner/ExecuteStreamingSql", + false); + assertEquals(fakeServerTiming.get(), latency); + } + + @Test + public void testGfeLatencyExecuteSql() throws InterruptedException { + databaseClient + .readWriteTransaction() + .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); + + long latency = + getMetric( + SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, + projectId, + instanceId, + databaseId, + "google.spanner.v1.Spanner/ExecuteSql", + false); + assertEquals(fakeServerTiming.get(), latency); + } + + @Test + public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedException { + try (ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + long count = + getMetric( + SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + projectId, + instanceId, + databaseId, + "google.spanner.v1.Spanner/ExecuteStreamingSql", + false); + assertEquals(0, count); + + try (ResultSet rs = databaseClientNoHeader.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + long count1 = + getMetric( + SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + projectId, + instanceId, + databaseId, + "google.spanner.v1.Spanner/ExecuteStreamingSql", + true); + assertEquals(1, count1); + } + + @Test + public void testGfeMissingHeaderExecuteSql() throws InterruptedException { + databaseClient + .readWriteTransaction() + .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); + long count = + getMetric( + SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + projectId, + instanceId, + databaseId, + "google.spanner.v1.Spanner/ExecuteSql", + false); + assertEquals(0, count); + + databaseClientNoHeader + .readWriteTransaction() + .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); + long count1 = + getMetric( + SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + projectId, + instanceId, + databaseId, + "google.spanner.v1.Spanner/ExecuteSql", + true); + assertEquals(1, count1); + } + + private static SpannerOptions createSpannerOptions(InetSocketAddress address, Server server) { + String endpoint = address.getHostString() + ":" + server.getPort(); + return SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + // Set a custom channel configurator to allow http instead of https. + .setChannelConfigurator( + input -> { + input.usePlaintext(); + return input; + }) + .setHost("http://" + endpoint) + // Set static credentials that will return the static OAuth test token. + .setCredentials(STATIC_CREDENTIALS) + // Also set a CallCredentialsProvider. These credentials should take precedence above + // the static credentials. + .setCallCredentialsProvider(() -> MoreCallCredentials.from(VARIABLE_CREDENTIALS)) + .build(); + } + + private long getAggregationValueAsLong(AggregationData aggregationData) { + return aggregationData.match( + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataDouble arg) { + return (long) arg.getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataLong arg) { + return arg.getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.CountData arg) { + return arg.getCount(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.DistributionData arg) { + return (long) arg.getMean(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataDouble arg) { + return (long) arg.getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataLong arg) { + return arg.getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData arg) { + throw new UnsupportedOperationException(); + } + }); + } + + private long getMetric( + View view, + String projectId, + String instanceId, + String databaseId, + String method, + boolean withOverride) + throws InterruptedException { + List tagValues = new java.util.ArrayList<>(); + for (TagKey column : view.getColumns()) { + if (column == SpannerRpcViews.INSTANCE_ID) { + tagValues.add(TagValue.create(instanceId)); + } else if (column == SpannerRpcViews.DATABASE_ID) { + tagValues.add(TagValue.create(databaseId)); + } else if (column == SpannerRpcViews.METHOD) { + tagValues.add(TagValue.create(method)); + } else if (column == SpannerRpcViews.PROJECT_ID) { + tagValues.add(TagValue.create(projectId)); + } + } + for (int i = 0; i < MAXIMUM_RETRIES; i++) { + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + ViewData viewData = SpannerRpcViews.viewManager.getView(view.getName()); + if (viewData.getAggregationMap() != null) { + Map, AggregationData> aggregationMap = viewData.getAggregationMap(); + AggregationData aggregationData = aggregationMap.get(tagValues); + if (withOverride && getAggregationValueAsLong(aggregationData) == 0) { + continue; + } + return getAggregationValueAsLong(aggregationData); + } + } + return -1; + } +}