From 393951f82b815cd8e5ae88c7d84a9cab9d022b73 Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri Date: Wed, 29 Sep 2021 18:08:06 +0530 Subject: [PATCH 1/8] GFE Latency --- google-cloud-spanner/pom.xml | 29 +- .../spanner/spi/v1/HeaderInterceptor.java | 130 +++++++ .../spi/v1/SpannerInterceptorProvider.java | 3 +- .../cloud/spanner/spi/v1/SpannerRpcViews.java | 115 ++++++ .../cloud/spanner/spi/v1/GfeLatencyTest.java | 361 ++++++++++++++++++ 5 files changed, 619 insertions(+), 19 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index 051d41753d..91fadd21bc 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-spanner - 6.12.4 + 6.12.6-SNAPSHOT jar Google Cloud Spanner https://github.com/googleapis/java-spanner @@ -11,7 +11,7 @@ com.google.cloud google-cloud-spanner-parent - 6.12.4 + 6.12.6-SNAPSHOT google-cloud-spanner @@ -74,26 +74,14 @@ gcloud-devel projects/gcloud-devel/locations/us-central1/keyRings/spanner-test-keyring/cryptoKeys/spanner-test-key - 3000 + 6000 default - com.google.cloud.spanner.SerialIntegrationTest - - - - - parallel-integration-test - - integration-test - - - com.google.cloud.spanner.ParallelIntegrationTest - 8 - true + com.google.cloud.spanner.SerialIntegrationTest, com.google.cloud.spanner.ParallelIntegrationTest @@ -110,7 +98,7 @@ org.apache.maven.plugins maven-dependency-plugin - io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1,javax.annotation:javax.annotation-api + io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1,javax.annotation:javax.annotation-api,io.opencensus:opencensus-impl @@ -191,6 +179,11 @@ io.opencensus opencensus-contrib-grpc-util + + io.opencensus + opencensus-impl + test + com.google.auth google-auth-library-oauth2-http @@ -422,7 +415,7 @@ true ipv4 - 3000 + 6000 diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java new file mode 100644 index 0000000000..834a769b0a --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -0,0 +1,130 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.*; + +import com.google.cloud.spanner.DatabaseId; +import com.google.spanner.admin.instance.v1.ProjectName; +import io.grpc.*; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.opencensus.stats.*; +import io.opencensus.tags.*; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Intercepts all gRPC calls to extract server-timing header. Captures GFE Latency and GFE Header + * Missing count metrics. + */ +class HeaderInterceptor implements ClientInterceptor { + + private static final Metadata.Key SERVER_TIMING_HEADER_KEY = + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER); + private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)"); + private static final Metadata.Key GOOGLE_CLOUD_RESOURCE_PREFIX_KEY = + Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER); + private static final Pattern GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN = + Pattern.compile( + ".*projects/(?\\w\\p{ASCII}+)/instances/(?\\w\\p{ASCII}+)/databases/(?\\w\\p{ASCII}+)"); + + // Get the global singleton Tagger object. + private static final Tagger tagger = Tags.getTagger(); + + private static final Logger logger = Logger.getLogger(HeaderInterceptor.class.getName()); + private static final Level level = Level.INFO; + + HeaderInterceptor() {} + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + TagContext tagContext = getTagContext(headers, method.getFullMethodName()); + super.start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onHeaders(Metadata metadata) { + processHeader(metadata, tagContext); + super.onHeaders(metadata); + } + }, + headers); + } + }; + } + + private void processHeader(Metadata metadata, TagContext tagContext) { + MeasureMap measureMap = STATS_RECORDER.newMeasureMap(); + if (metadata.get(SERVER_TIMING_HEADER_KEY) != null) { + String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY); + Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(serverTiming); + if (matcher.find()) { + try { + long latency = Long.parseLong(matcher.group("dur")); + measureMap.put(SPANNER_GFE_LATENCY, latency).record(tagContext); + measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 0L).record(tagContext); + } catch (NumberFormatException e) { + logger.log(level, "invalid server-timing object in header"); + } + } + } else { + measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L).record(tagContext); + } + } + + private TagContext getTagContext( + String method, String projectId, String instanceId, String databaseId) { + return tagger + .currentBuilder() + .putLocal(SpannerRpcViews.PROJECT_ID, TagValue.create(projectId)) + .putLocal(INSTANCE_ID, TagValue.create(instanceId)) + .putLocal(DATABASE_ID, TagValue.create(databaseId)) + .putLocal(SpannerRpcViews.METHOD, TagValue.create(method)) + .build(); + } + + private TagContext getTagContext(String method) { + return tagger + .currentBuilder() + .putLocal(PROJECT_ID, TagValue.create("undefined-project")) + .putLocal(INSTANCE_ID, TagValue.create("undefined-instance")) + .putLocal(DATABASE_ID, TagValue.create("undefined-database")) + .putLocal(SpannerRpcViews.METHOD, TagValue.create(method)) + .build(); + } + + private TagContext getTagContext(Metadata headers, String method) { + if (headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY) != null) { + String googleResourcePrefix = headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY); + try{ + DatabaseId database = DatabaseId.of(googleResourcePrefix); + String databaseId = database.getDatabase(); + String instanceId = database.getInstanceId().getInstance(); + String projectId = database.getInstanceId().getProject(); + return getTagContext(method, projectId, instanceId, databaseId); + }catch(IllegalArgumentException e){ + logger.log(level, e.toString()); + } + } + return getTagContext(method); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java index c262780935..e45702de8d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java @@ -33,7 +33,8 @@ public class SpannerInterceptorProvider implements GrpcInterceptorProvider { private static final List defaultInterceptors = ImmutableList.of( new SpannerErrorInterceptor(), - new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER)); + new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER), + new HeaderInterceptor()); private final List clientInterceptors; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java new file mode 100644 index 0000000000..3cd8867c49 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java @@ -0,0 +1,115 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import io.opencensus.stats.*; +import io.opencensus.stats.Aggregation.Distribution; +import io.opencensus.stats.Aggregation.Sum; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.tags.TagKey; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +@VisibleForTesting +public class SpannerRpcViews { + + /** Unit to represent milliseconds. */ + private static final String MILLISECOND = "ms"; + /** Unit to represent counts. */ + private static final String COUNT = "1"; + + /** TagKeys */ + public static final TagKey METHOD = TagKey.create("method"); + + public static final TagKey PROJECT_ID = TagKey.create("project_id"); + public static final TagKey INSTANCE_ID = TagKey.create("instance_id"); + public static final TagKey DATABASE_ID = TagKey.create("database"); + + public static final StatsRecorder STATS_RECORDER = Stats.getStatsRecorder(); + /** GFE t4t7 latency extracted from server-timing header. */ + public static final MeasureLong SPANNER_GFE_LATENCY = + MeasureLong.create( + "cloud.google.com/java/spanner/gfe_latency", + "Latency between Google's network receiving an RPC and reading back the first byte of the response", + MILLISECOND); + /** Number of responses without the server-timing header. */ + public static final MeasureLong SPANNER_GFE_HEADER_MISSING_COUNT = + MeasureLong.create( + "cloud.google.com/java/spanner/gfe_header_missing_count", + "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", + COUNT); + + static final List RPC_MILLIS_BUCKET_BOUNDARIES = + Collections.unmodifiableList( + Arrays.asList( + 0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0, + 16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, + 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, + 100000.0)); + static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM = + Distribution.create(BucketBoundaries.create(RPC_MILLIS_BUCKET_BOUNDARIES)); + static final View SPANNER_GFE_LATENCY_VIEW = + View.create( + View.Name.create("cloud.google.com/java/spanner/gfe_latency"), + "Latency between Google's network receiving an RPC and reading back the first byte of the response", + SPANNER_GFE_LATENCY, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of(METHOD, PROJECT_ID, INSTANCE_ID, DATABASE_ID)); + + private static final Aggregation SUM = Sum.create(); + static final View SPANNER_GFE_HEADER_MISSING_COUNT_VIEW = + View.create( + View.Name.create("cloud.google.com/java/spanner/gfe_header_missing_count"), + "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", + SPANNER_GFE_HEADER_MISSING_COUNT, + SUM, + ImmutableList.of(METHOD, PROJECT_ID, INSTANCE_ID, DATABASE_ID)); + + public static ViewManager viewManager = Stats.getViewManager(); + + /** + * Register views for GFE metrics, including gfe_latency and gfe_header_missing_count. gfe_latency + * measures the latency between Google's network receives an RPC and reads back the first byte of + * the response. gfe_header_missing_count is a counter of the number of RPC responses without a + * server-timing header. + */ + @VisibleForTesting + public static void registerGfeLatencyAndHeaderMissingCountViews() { + viewManager.registerView(SPANNER_GFE_LATENCY_VIEW); + viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW); + } + + /** + * Register GFE Latency view. gfe_latency measures the latency between Google's network receives + * an RPC and reads back the first byte of the response. + */ + @VisibleForTesting + public static void registerGfeLatencyView() { + viewManager.registerView(SPANNER_GFE_LATENCY_VIEW); + } + + /** + * Register GFE Header Missing Count view. gfe_header_missing_count is a counter of the number of + * RPC responses without a server-timing header. + */ + @VisibleForTesting + public static void registerGfeHeaderMissingCountView() { + viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java new file mode 100644 index 0000000000..3f05c55400 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java @@ -0,0 +1,361 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeFalse; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.cloud.spanner.*; +import com.google.cloud.spanner.MockSpannerServiceImpl; +import com.google.cloud.spanner.testing.EmulatorSpannerHelper; +import com.google.protobuf.ListValue; +import com.google.spanner.v1.ResultSetMetadata; +import com.google.spanner.v1.StructType; +import com.google.spanner.v1.TypeCode; +import io.grpc.*; +import io.grpc.auth.MoreCallCredentials; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.opencensus.stats.*; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class GfeLatencyTest { + + private static final String STATIC_OAUTH_TOKEN = "STATIC_TEST_OAUTH_TOKEN"; + private static final String VARIABLE_OAUTH_TOKEN = "VARIABLE_TEST_OAUTH_TOKEN"; + private static final OAuth2Credentials STATIC_CREDENTIALS = + OAuth2Credentials.create( + new AccessToken( + STATIC_OAUTH_TOKEN, + new java.util.Date( + System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS)))); + private static final OAuth2Credentials VARIABLE_CREDENTIALS = + OAuth2Credentials.create( + new AccessToken( + VARIABLE_OAUTH_TOKEN, + new java.util.Date( + System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS)))); + + private static MockSpannerServiceImpl mockSpanner; + private static Server server; + private static InetSocketAddress address; + private static Spanner spanner; + private static DatabaseClient databaseClient; + + private static final Map optionsMap = new HashMap<>(); + + private static MockSpannerServiceImpl mockSpannerNoHeader; + private static Server serverNoHeader; + private static InetSocketAddress addressNoHeader; + private static Spanner spannerNoHeader; + private static DatabaseClient databaseClientNoHeader; + + private static String instanceId = "fake-instance"; + private static String databaseId = "fake-database"; + private static String projectId = "fake-project"; + + private static final long WAIT_FOR_METRICS_TIME_MS = 1_000; + private static final int MAXIMUM_RETRIES = 5; + + private static AtomicInteger fakeServerTiming = new AtomicInteger(new Random().nextInt(1000) + 1); + + private static final Statement SELECT1AND2 = + Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL1"); + + private static final ResultSetMetadata SELECT1AND2_METADATA = + ResultSetMetadata.newBuilder() + .setRowType( + StructType.newBuilder() + .addFields( + StructType.Field.newBuilder() + .setName("COL1") + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(TypeCode.INT64) + .build()) + .build()) + .build()) + .build(); + private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = + com.google.spanner.v1.ResultSet.newBuilder() + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) + .build()) + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("2").build()) + .build()) + .setMetadata(SELECT1AND2_METADATA) + .build(); + private static final Statement UPDATE_FOO_STATEMENT = + Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); + + @BeforeClass + public static void startServer() throws IOException { + SpannerRpcViews.registerGfeLatencyAndHeaderMissingCountViews(); + assumeFalse(EmulatorSpannerHelper.isUsingEmulator()); + + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult( + MockSpannerServiceImpl.StatementResult.query(SELECT1AND2, SELECT1_RESULTSET)); + mockSpanner.putStatementResult( + MockSpannerServiceImpl.StatementResult.update(UPDATE_FOO_STATEMENT, 1L)); + address = new InetSocketAddress("localhost", 0); + server = + NettyServerBuilder.forAddress(address) + .addService(mockSpanner) + .intercept( + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata headers, + ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall( + new ForwardingServerCall.SimpleForwardingServerCall( + serverCall) { + @Override + public void sendHeaders(Metadata headers) { + headers.put( + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER), + String.format("gfet4t7; dur=%d", fakeServerTiming.get())); + super.sendHeaders(headers); + } + }, + headers); + } + }) + .build() + .start(); + optionsMap.put(SpannerRpc.Option.CHANNEL_HINT, 1L); + spanner = createSpannerOptions(address, server).getService(); + databaseClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); + + mockSpannerNoHeader = new MockSpannerServiceImpl(); + mockSpannerNoHeader.setAbortProbability(0.0D); + mockSpannerNoHeader.putStatementResult( + MockSpannerServiceImpl.StatementResult.query(SELECT1AND2, SELECT1_RESULTSET)); + mockSpannerNoHeader.putStatementResult( + MockSpannerServiceImpl.StatementResult.update(UPDATE_FOO_STATEMENT, 1L)); + addressNoHeader = new InetSocketAddress("localhost", 0); + serverNoHeader = + NettyServerBuilder.forAddress(addressNoHeader) + .addService(mockSpannerNoHeader) + .build() + .start(); + spannerNoHeader = createSpannerOptions(addressNoHeader, serverNoHeader).getService(); + databaseClientNoHeader = + spannerNoHeader.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); + } + + @AfterClass + public static void stopServer() throws InterruptedException { + if (spanner != null) { + spanner.close(); + server.shutdown(); + server.awaitTermination(); + } + + if (spannerNoHeader != null) { + spannerNoHeader.close(); + serverNoHeader.shutdown(); + serverNoHeader.awaitTermination(); + } + } + + @After + public void reset() { + mockSpanner.reset(); + mockSpannerNoHeader.reset(); + } + + @Test + public void testGfeLatencyExecuteStreamingSql() throws InterruptedException { + try (ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + + long latency = + getMetric( + SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, + "google.spanner.v1.Spanner/ExecuteStreamingSql"); + assertEquals(fakeServerTiming.get(), latency); + } + + @Test + public void testGfeLatencyExecuteSql() throws InterruptedException { + databaseClient + .readWriteTransaction() + .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); + + long latency = + getMetric(SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, "google.spanner.v1.Spanner/ExecuteSql"); + assertEquals(fakeServerTiming.get(), latency); + } + + @Test + public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedException { + try (ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + long count = + getMetric( + SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + "google.spanner.v1.Spanner/ExecuteStreamingSql"); + assertEquals(0, count); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + try (ResultSet rs = databaseClientNoHeader.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long count1 = + getMetric( + SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + "google.spanner.v1.Spanner/ExecuteStreamingSql"); + assertEquals(1, count1); + } + + @Test + public void testGfeMissingHeaderExecuteSql() throws InterruptedException { + databaseClient + .readWriteTransaction() + .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); + long count = + getMetric( + SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + "google.spanner.v1.Spanner/ExecuteSql"); + assertEquals(0, count); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + databaseClientNoHeader + .readWriteTransaction() + .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long count1 = + getMetric( + SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + "google.spanner.v1.Spanner/ExecuteSql"); + assertEquals(1, count1); + } + + private static SpannerOptions createSpannerOptions(InetSocketAddress address, Server server) { + String endpoint = address.getHostString() + ":" + server.getPort(); + return SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + // Set a custom channel configurator to allow http instead of https. + .setChannelConfigurator( + input -> { + input.usePlaintext(); + return input; + }) + .setHost("http://" + endpoint) + // Set static credentials that will return the static OAuth test token. + .setCredentials(STATIC_CREDENTIALS) + // Also set a CallCredentialsProvider. These credentials should take precedence above + // the static credentials. + .setCallCredentialsProvider(() -> MoreCallCredentials.from(VARIABLE_CREDENTIALS)) + .build(); + } + + private long getAggregationValueAsLong(AggregationData aggregationData) { + return aggregationData.match( + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataDouble arg) { + return (long) arg.getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataLong arg) { + return arg.getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.CountData arg) { + return arg.getCount(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.DistributionData arg) { + return (long) arg.getMean(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataDouble arg) { + return (long) arg.getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataLong arg) { + return arg.getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData arg) { + throw new UnsupportedOperationException(); + } + }); + } + + private long getMetric(View view, String method) throws InterruptedException { + List tagValues = new java.util.ArrayList<>(); + for (TagKey column : view.getColumns()) { + if (column == SpannerRpcViews.INSTANCE_ID) { + tagValues.add(TagValue.create(instanceId)); + } else if (column == SpannerRpcViews.DATABASE_ID) { + tagValues.add(TagValue.create(databaseId)); + } else if (column == SpannerRpcViews.METHOD) { + tagValues.add(TagValue.create(method)); + } else if (column == SpannerRpcViews.PROJECT_ID) { + tagValues.add(TagValue.create(projectId)); + } + } + for (int i = 0; i < MAXIMUM_RETRIES; i++) { + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + ViewData viewData = SpannerRpcViews.viewManager.getView(view.getName()); + if (viewData.getAggregationMap() != null) { + Map, AggregationData> aggregationMap = viewData.getAggregationMap(); + AggregationData aggregationData = aggregationMap.get(tagValues); + return getAggregationValueAsLong(aggregationData); + } + } + return -1; + } +} From d773f7153ee7c255bc757663c0055c9d11b1135f Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri Date: Wed, 29 Sep 2021 18:17:21 +0530 Subject: [PATCH 2/8] feat: GFE Latency --- .../com/google/cloud/spanner/spi/v1/HeaderInterceptor.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index 834a769b0a..a25880ab98 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -18,7 +18,6 @@ import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.*; import com.google.cloud.spanner.DatabaseId; -import com.google.spanner.admin.instance.v1.ProjectName; import io.grpc.*; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; @@ -115,13 +114,13 @@ private TagContext getTagContext(String method) { private TagContext getTagContext(Metadata headers, String method) { if (headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY) != null) { String googleResourcePrefix = headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY); - try{ + try { DatabaseId database = DatabaseId.of(googleResourcePrefix); String databaseId = database.getDatabase(); String instanceId = database.getInstanceId().getInstance(); String projectId = database.getInstanceId().getProject(); return getTagContext(method, projectId, instanceId, databaseId); - }catch(IllegalArgumentException e){ + } catch (IllegalArgumentException e) { logger.log(level, e.toString()); } } From ee3e9da6b243735264c57ca2e494176c5b0bb0a5 Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri Date: Thu, 30 Sep 2021 16:31:15 +0530 Subject: [PATCH 3/8] Busy waiting for header missing count --- .../cloud/spanner/spi/v1/GfeLatencyTest.java | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java index 3f05c55400..c12723277a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java @@ -234,13 +234,11 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc "google.spanner.v1.Spanner/ExecuteStreamingSql"); assertEquals(0, count); - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); try (ResultSet rs = databaseClientNoHeader.singleUse().executeQuery(SELECT1AND2)) { rs.next(); } - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); long count1 = - getMetric( + getOverriddenHeaderMissingCount( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, "google.spanner.v1.Spanner/ExecuteStreamingSql"); assertEquals(1, count1); @@ -257,13 +255,11 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException { "google.spanner.v1.Spanner/ExecuteSql"); assertEquals(0, count); - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); databaseClientNoHeader .readWriteTransaction() .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); long count1 = - getMetric( + getOverriddenHeaderMissingCount( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, "google.spanner.v1.Spanner/ExecuteSql"); assertEquals(1, count1); @@ -348,14 +344,44 @@ private long getMetric(View view, String method) throws InterruptedException { } } for (int i = 0; i < MAXIMUM_RETRIES; i++) { - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); ViewData viewData = SpannerRpcViews.viewManager.getView(view.getName()); if (viewData.getAggregationMap() != null) { Map, AggregationData> aggregationMap = viewData.getAggregationMap(); AggregationData aggregationData = aggregationMap.get(tagValues); return getAggregationValueAsLong(aggregationData); } + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); } return -1; } + + private long getOverriddenHeaderMissingCount(View view, String method) + throws InterruptedException { + List tagValues = new java.util.ArrayList<>(); + for (TagKey column : view.getColumns()) { + if (column == SpannerRpcViews.INSTANCE_ID) { + tagValues.add(TagValue.create(instanceId)); + } else if (column == SpannerRpcViews.DATABASE_ID) { + tagValues.add(TagValue.create(databaseId)); + } else if (column == SpannerRpcViews.METHOD) { + tagValues.add(TagValue.create(method)); + } else if (column == SpannerRpcViews.PROJECT_ID) { + tagValues.add(TagValue.create(projectId)); + } + } + for (int i = 0; i < MAXIMUM_RETRIES; i++) { + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + ViewData viewData = SpannerRpcViews.viewManager.getView(view.getName()); + if (viewData.getAggregationMap() != null) { + Map, AggregationData> aggregationMap = viewData.getAggregationMap(); + AggregationData aggregationData = aggregationMap.get(tagValues); + long count = getAggregationValueAsLong(aggregationData); + if (count == 0) { + continue; + } + return count; + } + } + return 0; + } } From f166a5211db529775337becdf8662c2b9aac6bfb Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri Date: Thu, 30 Sep 2021 17:05:05 +0530 Subject: [PATCH 4/8] Thread sleep before checking view Data --- .../google/cloud/spanner/spi/v1/GfeLatencyTest.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java index c12723277a..6920412ffe 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java @@ -238,7 +238,7 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc rs.next(); } long count1 = - getOverriddenHeaderMissingCount( + getOverriddenHeaderMissingCount( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, "google.spanner.v1.Spanner/ExecuteStreamingSql"); assertEquals(1, count1); @@ -259,7 +259,7 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException { .readWriteTransaction() .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); long count1 = - getOverriddenHeaderMissingCount( + getOverriddenHeaderMissingCount( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, "google.spanner.v1.Spanner/ExecuteSql"); assertEquals(1, count1); @@ -344,19 +344,18 @@ private long getMetric(View view, String method) throws InterruptedException { } } for (int i = 0; i < MAXIMUM_RETRIES; i++) { + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); ViewData viewData = SpannerRpcViews.viewManager.getView(view.getName()); if (viewData.getAggregationMap() != null) { Map, AggregationData> aggregationMap = viewData.getAggregationMap(); AggregationData aggregationData = aggregationMap.get(tagValues); return getAggregationValueAsLong(aggregationData); } - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); } return -1; } - private long getOverriddenHeaderMissingCount(View view, String method) - throws InterruptedException { + private long getOverriddenHeaderMissingCount(View view, String method) throws InterruptedException { List tagValues = new java.util.ArrayList<>(); for (TagKey column : view.getColumns()) { if (column == SpannerRpcViews.INSTANCE_ID) { @@ -375,8 +374,8 @@ private long getOverriddenHeaderMissingCount(View view, String method) if (viewData.getAggregationMap() != null) { Map, AggregationData> aggregationMap = viewData.getAggregationMap(); AggregationData aggregationData = aggregationMap.get(tagValues); - long count = getAggregationValueAsLong(aggregationData); - if (count == 0) { + long count = getAggregationValueAsLong(aggregationData); + if(count == 0){ continue; } return count; From de6cc25b757bd6b87c91181fed529902d0acf07c Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri Date: Thu, 30 Sep 2021 17:16:29 +0530 Subject: [PATCH 5/8] lint errors and changing method signatures in unit test --- .../cloud/spanner/spi/v1/SpannerRpcViews.java | 6 +++ .../cloud/spanner/spi/v1/GfeLatencyTest.java | 44 ++++++++++++++----- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java index 3cd8867c49..8c112eae1d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java @@ -95,6 +95,12 @@ public static void registerGfeLatencyAndHeaderMissingCountViews() { viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW); } + @VisibleForTesting + public static void registerGfeLatencyAndHeaderMissingCountViews(ViewManager viewManager) { + viewManager.registerView(SPANNER_GFE_LATENCY_VIEW); + viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW); + } + /** * Register GFE Latency view. gfe_latency measures the latency between Google's network receives * an RPC and reads back the first byte of the response. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java index 6920412ffe..3cc53de232 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java @@ -208,6 +208,9 @@ public void testGfeLatencyExecuteStreamingSql() throws InterruptedException { long latency = getMetric( SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, + projectId, + instanceId, + databaseId, "google.spanner.v1.Spanner/ExecuteStreamingSql"); assertEquals(fakeServerTiming.get(), latency); } @@ -219,7 +222,12 @@ public void testGfeLatencyExecuteSql() throws InterruptedException { .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); long latency = - getMetric(SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, "google.spanner.v1.Spanner/ExecuteSql"); + getMetric( + SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, + projectId, + instanceId, + databaseId, + "google.spanner.v1.Spanner/ExecuteSql"); assertEquals(fakeServerTiming.get(), latency); } @@ -231,6 +239,9 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc long count = getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + projectId, + instanceId, + databaseId, "google.spanner.v1.Spanner/ExecuteStreamingSql"); assertEquals(0, count); @@ -238,8 +249,11 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc rs.next(); } long count1 = - getOverriddenHeaderMissingCount( + getOverriddenHeaderMissingCount( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + projectId, + instanceId, + databaseId, "google.spanner.v1.Spanner/ExecuteStreamingSql"); assertEquals(1, count1); } @@ -252,6 +266,9 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException { long count = getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + projectId, + instanceId, + databaseId, "google.spanner.v1.Spanner/ExecuteSql"); assertEquals(0, count); @@ -259,8 +276,11 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException { .readWriteTransaction() .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); long count1 = - getOverriddenHeaderMissingCount( + getOverriddenHeaderMissingCount( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, + projectId, + instanceId, + databaseId, "google.spanner.v1.Spanner/ExecuteSql"); assertEquals(1, count1); } @@ -330,7 +350,9 @@ public Long apply(AggregationData arg) { }); } - private long getMetric(View view, String method) throws InterruptedException { + private long getMetric( + View view, String projectId, String instanceId, String databaseId, String method) + throws InterruptedException { List tagValues = new java.util.ArrayList<>(); for (TagKey column : view.getColumns()) { if (column == SpannerRpcViews.INSTANCE_ID) { @@ -355,17 +377,19 @@ private long getMetric(View view, String method) throws InterruptedException { return -1; } - private long getOverriddenHeaderMissingCount(View view, String method) throws InterruptedException { + private long getOverriddenHeaderMissingCount( + View view, String instanceId, String projectId, String databaseId, String method) + throws InterruptedException { List tagValues = new java.util.ArrayList<>(); for (TagKey column : view.getColumns()) { if (column == SpannerRpcViews.INSTANCE_ID) { - tagValues.add(TagValue.create(instanceId)); + tagValues.add(TagValue.create(GfeLatencyTest.instanceId)); } else if (column == SpannerRpcViews.DATABASE_ID) { - tagValues.add(TagValue.create(databaseId)); + tagValues.add(TagValue.create(GfeLatencyTest.databaseId)); } else if (column == SpannerRpcViews.METHOD) { tagValues.add(TagValue.create(method)); } else if (column == SpannerRpcViews.PROJECT_ID) { - tagValues.add(TagValue.create(projectId)); + tagValues.add(TagValue.create(GfeLatencyTest.projectId)); } } for (int i = 0; i < MAXIMUM_RETRIES; i++) { @@ -374,8 +398,8 @@ private long getOverriddenHeaderMissingCount(View view, String method) throws In if (viewData.getAggregationMap() != null) { Map, AggregationData> aggregationMap = viewData.getAggregationMap(); AggregationData aggregationData = aggregationMap.get(tagValues); - long count = getAggregationValueAsLong(aggregationData); - if(count == 0){ + long count = getAggregationValueAsLong(aggregationData); + if (count == 0) { continue; } return count; From b112c322e7f75eae1554123d007f64dff801d957 Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri Date: Tue, 5 Oct 2021 12:34:11 +0530 Subject: [PATCH 6/8] lint changes --- .../spanner/spi/v1/HeaderInterceptor.java | 47 ++++++---- .../cloud/spanner/spi/v1/SpannerRpcViews.java | 13 ++- .../cloud/spanner/spi/v1/GfeLatencyTest.java | 86 +++++++++---------- 3 files changed, 77 insertions(+), 69 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index a25880ab98..07514cda4f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -15,14 +15,29 @@ */ package com.google.cloud.spanner.spi.v1; -import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.*; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.DATABASE_ID; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.INSTANCE_ID; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.METHOD; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.PROJECT_ID; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT; +import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_LATENCY; import com.google.cloud.spanner.DatabaseId; -import io.grpc.*; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; -import io.opencensus.stats.*; -import io.opencensus.tags.*; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; @@ -39,15 +54,13 @@ class HeaderInterceptor implements ClientInterceptor { private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)"); private static final Metadata.Key GOOGLE_CLOUD_RESOURCE_PREFIX_KEY = Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER); - private static final Pattern GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN = - Pattern.compile( - ".*projects/(?\\w\\p{ASCII}+)/instances/(?\\w\\p{ASCII}+)/databases/(?\\w\\p{ASCII}+)"); // Get the global singleton Tagger object. - private static final Tagger tagger = Tags.getTagger(); + private static final Tagger TAGGER = Tags.getTagger(); + private static final StatsRecorder STATS_RECORDER = Stats.getStatsRecorder(); - private static final Logger logger = Logger.getLogger(HeaderInterceptor.class.getName()); - private static final Level level = Level.INFO; + private static final Logger LOGGER = Logger.getLogger(HeaderInterceptor.class.getName()); + private static final Level LEVEL = Level.INFO; HeaderInterceptor() {} @@ -82,7 +95,7 @@ private void processHeader(Metadata metadata, TagContext tagContext) { measureMap.put(SPANNER_GFE_LATENCY, latency).record(tagContext); measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 0L).record(tagContext); } catch (NumberFormatException e) { - logger.log(level, "invalid server-timing object in header"); + LOGGER.log(LEVEL, "Invalid server-timing object in header", matcher.group("dur")); } } } else { @@ -92,22 +105,22 @@ private void processHeader(Metadata metadata, TagContext tagContext) { private TagContext getTagContext( String method, String projectId, String instanceId, String databaseId) { - return tagger + return TAGGER .currentBuilder() - .putLocal(SpannerRpcViews.PROJECT_ID, TagValue.create(projectId)) + .putLocal(PROJECT_ID, TagValue.create(projectId)) .putLocal(INSTANCE_ID, TagValue.create(instanceId)) .putLocal(DATABASE_ID, TagValue.create(databaseId)) - .putLocal(SpannerRpcViews.METHOD, TagValue.create(method)) + .putLocal(METHOD, TagValue.create(method)) .build(); } private TagContext getTagContext(String method) { - return tagger + return TAGGER .currentBuilder() .putLocal(PROJECT_ID, TagValue.create("undefined-project")) .putLocal(INSTANCE_ID, TagValue.create("undefined-instance")) .putLocal(DATABASE_ID, TagValue.create("undefined-database")) - .putLocal(SpannerRpcViews.METHOD, TagValue.create(method)) + .putLocal(METHOD, TagValue.create(method)) .build(); } @@ -121,7 +134,7 @@ private TagContext getTagContext(Metadata headers, String method) { String projectId = database.getInstanceId().getProject(); return getTagContext(method, projectId, instanceId, databaseId); } catch (IllegalArgumentException e) { - logger.log(level, e.toString()); + LOGGER.log(LEVEL, "Error parsing google cloud resource header", e); } } return getTagContext(method); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java index 8c112eae1d..b1b68e95d1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java @@ -17,10 +17,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import io.opencensus.stats.*; +import io.opencensus.stats.Aggregation; import io.opencensus.stats.Aggregation.Distribution; import io.opencensus.stats.Aggregation.Sum; +import io.opencensus.stats.BucketBoundaries; import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.stats.Stats; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewManager; import io.opencensus.tags.TagKey; import java.util.Arrays; import java.util.Collections; @@ -41,7 +45,6 @@ public class SpannerRpcViews { public static final TagKey INSTANCE_ID = TagKey.create("instance_id"); public static final TagKey DATABASE_ID = TagKey.create("database"); - public static final StatsRecorder STATS_RECORDER = Stats.getStatsRecorder(); /** GFE t4t7 latency extracted from server-timing header. */ public static final MeasureLong SPANNER_GFE_LATENCY = MeasureLong.create( @@ -95,12 +98,6 @@ public static void registerGfeLatencyAndHeaderMissingCountViews() { viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW); } - @VisibleForTesting - public static void registerGfeLatencyAndHeaderMissingCountViews(ViewManager viewManager) { - viewManager.registerView(SPANNER_GFE_LATENCY_VIEW); - viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW); - } - /** * Register GFE Latency view. gfe_latency measures the latency between Google's network receives * an RPC and reads back the first byte of the response. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java index 3cc53de232..48fae49946 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java @@ -21,22 +21,37 @@ import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; -import com.google.cloud.spanner.*; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.MockSpannerServiceImpl; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.testing.EmulatorSpannerHelper; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; import com.google.spanner.v1.TypeCode; -import io.grpc.*; +import io.grpc.ForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; import io.grpc.auth.MoreCallCredentials; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; -import io.opencensus.stats.*; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -211,7 +226,8 @@ public void testGfeLatencyExecuteStreamingSql() throws InterruptedException { projectId, instanceId, databaseId, - "google.spanner.v1.Spanner/ExecuteStreamingSql"); + "google.spanner.v1.Spanner/ExecuteStreamingSql", + false); assertEquals(fakeServerTiming.get(), latency); } @@ -227,7 +243,8 @@ public void testGfeLatencyExecuteSql() throws InterruptedException { projectId, instanceId, databaseId, - "google.spanner.v1.Spanner/ExecuteSql"); + "google.spanner.v1.Spanner/ExecuteSql", + false); assertEquals(fakeServerTiming.get(), latency); } @@ -242,19 +259,21 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc projectId, instanceId, databaseId, - "google.spanner.v1.Spanner/ExecuteStreamingSql"); + "google.spanner.v1.Spanner/ExecuteStreamingSql", + false); assertEquals(0, count); try (ResultSet rs = databaseClientNoHeader.singleUse().executeQuery(SELECT1AND2)) { rs.next(); } long count1 = - getOverriddenHeaderMissingCount( + getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, projectId, instanceId, databaseId, - "google.spanner.v1.Spanner/ExecuteStreamingSql"); + "google.spanner.v1.Spanner/ExecuteStreamingSql", + true); assertEquals(1, count1); } @@ -269,19 +288,21 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException { projectId, instanceId, databaseId, - "google.spanner.v1.Spanner/ExecuteSql"); + "google.spanner.v1.Spanner/ExecuteSql", + false); assertEquals(0, count); databaseClientNoHeader .readWriteTransaction() .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); long count1 = - getOverriddenHeaderMissingCount( + getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, projectId, instanceId, databaseId, - "google.spanner.v1.Spanner/ExecuteSql"); + "google.spanner.v1.Spanner/ExecuteSql", + true); assertEquals(1, count1); } @@ -351,7 +372,12 @@ public Long apply(AggregationData arg) { } private long getMetric( - View view, String projectId, String instanceId, String databaseId, String method) + View view, + String projectId, + String instanceId, + String databaseId, + String method, + boolean withOverride) throws InterruptedException { List tagValues = new java.util.ArrayList<>(); for (TagKey column : view.getColumns()) { @@ -371,40 +397,12 @@ private long getMetric( if (viewData.getAggregationMap() != null) { Map, AggregationData> aggregationMap = viewData.getAggregationMap(); AggregationData aggregationData = aggregationMap.get(tagValues); - return getAggregationValueAsLong(aggregationData); - } - } - return -1; - } - - private long getOverriddenHeaderMissingCount( - View view, String instanceId, String projectId, String databaseId, String method) - throws InterruptedException { - List tagValues = new java.util.ArrayList<>(); - for (TagKey column : view.getColumns()) { - if (column == SpannerRpcViews.INSTANCE_ID) { - tagValues.add(TagValue.create(GfeLatencyTest.instanceId)); - } else if (column == SpannerRpcViews.DATABASE_ID) { - tagValues.add(TagValue.create(GfeLatencyTest.databaseId)); - } else if (column == SpannerRpcViews.METHOD) { - tagValues.add(TagValue.create(method)); - } else if (column == SpannerRpcViews.PROJECT_ID) { - tagValues.add(TagValue.create(GfeLatencyTest.projectId)); - } - } - for (int i = 0; i < MAXIMUM_RETRIES; i++) { - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); - ViewData viewData = SpannerRpcViews.viewManager.getView(view.getName()); - if (viewData.getAggregationMap() != null) { - Map, AggregationData> aggregationMap = viewData.getAggregationMap(); - AggregationData aggregationData = aggregationMap.get(tagValues); - long count = getAggregationValueAsLong(aggregationData); - if (count == 0) { + if (withOverride && getAggregationValueAsLong(aggregationData) == 0) { continue; } - return count; + return getAggregationValueAsLong(aggregationData); } } - return 0; + return -1; } } From 412855dd96962dec5fb23cbc0638831144169253 Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri Date: Wed, 6 Oct 2021 11:17:42 +0530 Subject: [PATCH 7/8] Adding pattern for Admin calls --- .../spanner/spi/v1/HeaderInterceptor.java | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index 07514cda4f..51af36c952 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -22,7 +22,6 @@ import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT; import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_LATENCY; -import com.google.cloud.spanner.DatabaseId; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -54,6 +53,12 @@ class HeaderInterceptor implements ClientInterceptor { private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)"); private static final Metadata.Key GOOGLE_CLOUD_RESOURCE_PREFIX_KEY = Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER); + private static final Pattern GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN = + Pattern.compile( + ".*projects/(?\\w\\p{ASCII}+)/instances/(?\\w\\p{ASCII}+)/databases/(?\\w\\p{ASCII}+)"); + private static final Pattern GOOGLE_CLOUD_RESOURCE_PREFIX_ADMIN_PATTERN = + Pattern.compile( + ".*projects/(?\\w\\p{ASCII}+)/instances/(?\\w\\p{ASCII}+)"); // Get the global singleton Tagger object. private static final Tagger TAGGER = Tags.getTagger(); @@ -75,6 +80,7 @@ public void start(Listener responseListener, Metadata headers) { new SimpleForwardingClientCallListener(responseListener) { @Override public void onHeaders(Metadata metadata) { + processHeader(metadata, tagContext); super.onHeaders(metadata); } @@ -125,18 +131,24 @@ private TagContext getTagContext(String method) { } private TagContext getTagContext(Metadata headers, String method) { + String projectId = "undefined-project"; + String instanceId = "undefined-database"; + String databaseId = "undefined-database"; if (headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY) != null) { String googleResourcePrefix = headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY); - try { - DatabaseId database = DatabaseId.of(googleResourcePrefix); - String databaseId = database.getDatabase(); - String instanceId = database.getInstanceId().getInstance(); - String projectId = database.getInstanceId().getProject(); - return getTagContext(method, projectId, instanceId, databaseId); - } catch (IllegalArgumentException e) { - LOGGER.log(LEVEL, "Error parsing google cloud resource header", e); + Matcher matcher = GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN.matcher(googleResourcePrefix); + Matcher matcher2 = GOOGLE_CLOUD_RESOURCE_PREFIX_ADMIN_PATTERN.matcher(googleResourcePrefix); + if (matcher.find()) { + projectId = matcher.group("project"); + instanceId = matcher.group("instance"); + databaseId = matcher.group("database"); + } else if (matcher2.find()) { + projectId = matcher2.group("project"); + instanceId = matcher2.group("instance"); + } else { + LOGGER.log(LEVEL, "Error parsing google cloud resource header", googleResourcePrefix); } } - return getTagContext(method); + return getTagContext(method, projectId, instanceId, databaseId); } } From eb4c17c08e58edbbd6667b5a17ced12e9072d97f Mon Sep 17 00:00:00 2001 From: Kiranmayi Bhamidimarri Date: Wed, 6 Oct 2021 19:43:33 +0530 Subject: [PATCH 8/8] Correcting regex pattern --- .../spanner/spi/v1/HeaderInterceptor.java | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index 51af36c952..3f4372db84 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -55,10 +55,7 @@ class HeaderInterceptor implements ClientInterceptor { Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER); private static final Pattern GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN = Pattern.compile( - ".*projects/(?\\w\\p{ASCII}+)/instances/(?\\w\\p{ASCII}+)/databases/(?\\w\\p{ASCII}+)"); - private static final Pattern GOOGLE_CLOUD_RESOURCE_PREFIX_ADMIN_PATTERN = - Pattern.compile( - ".*projects/(?\\w\\p{ASCII}+)/instances/(?\\w\\p{ASCII}+)"); + ".*projects/(?\\p{ASCII}[^/]*)(/instances/(?\\p{ASCII}[^/]*))?(/databases/(?\\p{ASCII}[^/]*))?"); // Get the global singleton Tagger object. private static final Tagger TAGGER = Tags.getTagger(); @@ -120,16 +117,6 @@ private TagContext getTagContext( .build(); } - private TagContext getTagContext(String method) { - return TAGGER - .currentBuilder() - .putLocal(PROJECT_ID, TagValue.create("undefined-project")) - .putLocal(INSTANCE_ID, TagValue.create("undefined-instance")) - .putLocal(DATABASE_ID, TagValue.create("undefined-database")) - .putLocal(METHOD, TagValue.create(method)) - .build(); - } - private TagContext getTagContext(Metadata headers, String method) { String projectId = "undefined-project"; String instanceId = "undefined-database"; @@ -137,16 +124,16 @@ private TagContext getTagContext(Metadata headers, String method) { if (headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY) != null) { String googleResourcePrefix = headers.get(GOOGLE_CLOUD_RESOURCE_PREFIX_KEY); Matcher matcher = GOOGLE_CLOUD_RESOURCE_PREFIX_PATTERN.matcher(googleResourcePrefix); - Matcher matcher2 = GOOGLE_CLOUD_RESOURCE_PREFIX_ADMIN_PATTERN.matcher(googleResourcePrefix); if (matcher.find()) { projectId = matcher.group("project"); - instanceId = matcher.group("instance"); - databaseId = matcher.group("database"); - } else if (matcher2.find()) { - projectId = matcher2.group("project"); - instanceId = matcher2.group("instance"); + if (matcher.group("instance") != null) { + instanceId = matcher.group("instance"); + } + if (matcher.group("database") != null) { + databaseId = matcher.group("database"); + } } else { - LOGGER.log(LEVEL, "Error parsing google cloud resource header", googleResourcePrefix); + LOGGER.log(LEVEL, "Error parsing google cloud resource header: " + googleResourcePrefix); } } return getTagContext(method, projectId, instanceId, databaseId);