From 88bcbe31d04ce3fceeebf3bbde0eb9984c5487ea Mon Sep 17 00:00:00 2001 From: Allen Pradeep Xavier Date: Fri, 12 Feb 2021 18:55:27 -0800 Subject: [PATCH 1/5] [BEAM-11805] Replace user-agent for spanner --- .../sdk/io/gcp/spanner/SpannerAccessor.java | 92 ++++++++++++++++++- 1 file changed, 89 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 0a7f3ab07bba4..fcdf14ad2c521 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -17,8 +17,10 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.NoCredentials; @@ -29,6 +31,8 @@ import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; @@ -38,7 +42,14 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.MethodDescriptor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.options.ValueProvider; @@ -54,7 +65,10 @@ class SpannerAccessor implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(SpannerAccessor.class); - // A common user agent token that indicates that this request was originated from Apache Beam. + /* A common user agent token that indicates that this request was originated from + * Apache Beam. Setting the user-agent allows Cloud Spanner to detect that the + * workload is coming from Dataflow and to potentially apply performance optimizations + */ private static final String USER_AGENT_PREFIX = "Apache_Beam_Java"; // Only create one SpannerAccessor for each different SpannerConfig. @@ -72,6 +86,12 @@ class SpannerAccessor implements AutoCloseable { private final DatabaseAdminClient databaseAdminClient; private final SpannerConfig spannerConfig; + private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; + private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes + private static final int NUM_CHANNELS = 4; + public static final org.threeten.bp.Duration GRPC_KEEP_ALIVE_SECONDS = + org.threeten.bp.Duration.ofSeconds(120); + private SpannerAccessor( Spanner spanner, DatabaseClient databaseClient, @@ -139,6 +159,23 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(120)) .build()); + ManagedInstantiatingExecutorProvider executorProvider = + new ManagedInstantiatingExecutorProvider( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Cloud-Spanner-TransportChannel-%d") + .build()); + + InstantiatingGrpcChannelProvider.Builder instantiatingGrpcChannelProvider = + InstantiatingGrpcChannelProvider.newBuilder() + .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) + .setMaxInboundMetadataSize(MAX_METADATA_SIZE) + .setPoolSize(NUM_CHANNELS) + .setExecutorProvider(executorProvider) + .setKeepAliveTime(GRPC_KEEP_ALIVE_SECONDS) + .setInterceptorProvider(SpannerInterceptorProvider.createDefault()) + .setAttemptDirectPath(true); + ValueProvider projectId = spannerConfig.getProjectId(); if (projectId != null) { builder.setProjectId(projectId.get()); @@ -150,14 +187,35 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { ValueProvider host = spannerConfig.getHost(); if (host != null) { builder.setHost(host.get()); + instantiatingGrpcChannelProvider.setEndpoint(host.get()); } ValueProvider emulatorHost = spannerConfig.getEmulatorHost(); if (emulatorHost != null) { builder.setEmulatorHost(emulatorHost.get()); builder.setCredentials(NoCredentials.getInstance()); } + String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion(); - builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); + /* Workaround to setup user-agent string. + * InstantiatingGrpcChannelProvider will override the settings provided. + * The section below and all associated artifacts will be removed once the bug + * that prevents setting user-agent is fixed. + * https://github.com/googleapis/java-spanner/pull/747 + * + * Code to be replaced: + * builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); + */ + instantiatingGrpcChannelProvider.setHeaderProvider( + new HeaderProvider() { + @Override + public Map getHeaders() { + final Map headers = new HashMap<>(); + headers.put("user-agent", userAgentString); + return headers; + } + }); + builder.setChannelProvider(instantiatingGrpcChannelProvider.build()); + SpannerOptions options = builder.build(); Spanner spanner = options.getService(); @@ -221,4 +279,32 @@ public ClientCall interceptCall( return next.newCall(method, callOptions); } } + + private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider { + // 4 Gapic clients * 4 channels per client. + private static final int DEFAULT_MIN_THREAD_COUNT = 16; + private final List executors = new ArrayList<>(); + private final ThreadFactory threadFactory; + + private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + } + + @Override + public boolean shouldAutoClose() { + return false; + } + + @Override + public ScheduledExecutorService getExecutor() { + int numCpus = Runtime.getRuntime().availableProcessors(); + int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus); + ScheduledExecutorService executor = + new ScheduledThreadPoolExecutor(numThreads, threadFactory); + synchronized (this) { + executors.add(executor); + } + return executor; + } + } } From 5fb9bebeda3d7874d835c09c49f500b4113cbe4c Mon Sep 17 00:00:00 2001 From: Allen Pradeep Xavier Date: Sun, 14 Feb 2021 16:22:19 -0800 Subject: [PATCH 2/5] [BEAM-11805] Fix checkstyle and emulator issues --- .../sdk/io/gcp/spanner/SpannerAccessor.java | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index fcdf14ad2c521..2ccf48039f89b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -32,7 +32,6 @@ import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; @@ -54,6 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.ReleaseInfo; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,29 +193,28 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { if (emulatorHost != null) { builder.setEmulatorHost(emulatorHost.get()); builder.setCredentials(NoCredentials.getInstance()); + } else { + String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion(); + /* Workaround to setup user-agent string. + * InstantiatingGrpcChannelProvider will override the settings provided. + * The section below and all associated artifacts will be removed once the bug + * that prevents setting user-agent is fixed. + * https://github.com/googleapis/java-spanner/pull/747 + * + * Code to be replaced: + * builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); + */ + instantiatingGrpcChannelProvider.setHeaderProvider( + new HeaderProvider() { + @Override + public Map getHeaders() { + final Map headers = new HashMap<>(); + headers.put("user-agent", userAgentString); + return headers; + } + }); + builder.setChannelProvider(instantiatingGrpcChannelProvider.build()); } - - String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion(); - /* Workaround to setup user-agent string. - * InstantiatingGrpcChannelProvider will override the settings provided. - * The section below and all associated artifacts will be removed once the bug - * that prevents setting user-agent is fixed. - * https://github.com/googleapis/java-spanner/pull/747 - * - * Code to be replaced: - * builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); - */ - instantiatingGrpcChannelProvider.setHeaderProvider( - new HeaderProvider() { - @Override - public Map getHeaders() { - final Map headers = new HashMap<>(); - headers.put("user-agent", userAgentString); - return headers; - } - }); - builder.setChannelProvider(instantiatingGrpcChannelProvider.build()); - SpannerOptions options = builder.build(); Spanner spanner = options.getService(); From 4d6c0aba1040a25bf733ae474ae93d239ce5ed4c Mon Sep 17 00:00:00 2001 From: Allen Pradeep Xavier Date: Tue, 16 Feb 2021 17:39:51 -0800 Subject: [PATCH 3/5] [BEAM-11805] Fix spanner test issues --- .../sdk/io/gcp/spanner/SpannerAccessor.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 2ccf48039f89b..e97e38ef472cd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -41,6 +41,8 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.MethodDescriptor; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -187,7 +189,7 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { ValueProvider host = spannerConfig.getHost(); if (host != null) { builder.setHost(host.get()); - instantiatingGrpcChannelProvider.setEndpoint(host.get()); + instantiatingGrpcChannelProvider.setEndpoint(getEndpoint(host.get())); } ValueProvider emulatorHost = spannerConfig.getEmulatorHost(); if (emulatorHost != null) { @@ -199,7 +201,7 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { * InstantiatingGrpcChannelProvider will override the settings provided. * The section below and all associated artifacts will be removed once the bug * that prevents setting user-agent is fixed. - * https://github.com/googleapis/java-spanner/pull/747 + * https://github.com/googleapis/java-spanner/pull/871 * * Code to be replaced: * builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); @@ -230,6 +232,17 @@ public Map getHeaders() { spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig); } + private static String getEndpoint(String host) { + URL url; + try { + url = new URL(host); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid host: " + host, e); + } + return String.format( + "%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort()); + } + public DatabaseClient getDatabaseClient() { return databaseClient; } From b9e6491ffbec4ee3baaddc9763f02c68b1c0d224 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Fri, 15 Jan 2021 16:25:45 -0800 Subject: [PATCH 4/5] update versions --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 14 +++++++------- .../container/license_scripts/dep_urls_java.yaml | 2 +- sdks/java/io/google-cloud-platform/build.gradle | 3 ++- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index b522f68e9ced2..d6b5cb583ca8f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -435,7 +435,7 @@ class BeamModulePlugin implements Plugin { def google_code_gson_version = "2.8.6" def google_oauth_clients_version = "1.31.0" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom - def grpc_version = "1.32.2" + def grpc_version = "1.35.0" def guava_version = "30.1-jre" def hadoop_version = "2.10.1" def hamcrest_version = "2.1" @@ -447,7 +447,7 @@ class BeamModulePlugin implements Plugin { def jsr305_version = "3.0.2" def kafka_version = "2.4.1" def nemo_version = "0.1" - def netty_version = "4.1.51.Final" + def netty_version = "4.1.52.Final" def postgres_version = "42.2.16" def powermock_version = "2.0.9" def protobuf_version = "3.12.0" @@ -509,7 +509,7 @@ class BeamModulePlugin implements Plugin { google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version", google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version", google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version - google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20200719-$google_clients_version", + google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20201030-$google_clients_version", google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200501-$google_clients_version", google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200720-$google_clients_version", google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20200713-$google_clients_version", @@ -519,7 +519,7 @@ class BeamModulePlugin implements Plugin { google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version - google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage", // google_cloud_platform_libraries_bom sets version + google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.8.5", google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.16.0", google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.125.2", google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version @@ -530,9 +530,9 @@ class BeamModulePlugin implements Plugin { google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub:$google_cloud_pubsub_version", google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite:$google_cloud_pubsublite_version", // The GCP Libraries BOM dashboard shows the versions set by the BOM: - // https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/13.2.0/artifact_details.html + // https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/16.3.0/artifact_details.html // Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml - google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:13.2.0", + google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:16.3.0", google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_code_gson : "com.google.code.gson:gson:$google_code_gson_version", // google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples @@ -601,7 +601,7 @@ class BeamModulePlugin implements Plugin { powermock_mockito : "org.powermock:powermock-api-mockito2:$powermock_version", protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version", protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version", - proto_google_cloud_bigquery_storage_v1beta1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1", // google_cloud_platform_libraries_bom sets version + proto_google_cloud_bigquerybeta2_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml b/sdks/java/container/license_scripts/dep_urls_java.yaml index 88c3cde722446..d7d40e6c80cb1 100644 --- a/sdks/java/container/license_scripts/dep_urls_java.yaml +++ b/sdks/java/container/license_scripts/dep_urls_java.yaml @@ -41,7 +41,7 @@ jaxen: '1.1.6': type: "3-Clause BSD" libraries-bom: - '13.2.0': + '16.3.0': license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE" type: "Apache License 2.0" paranamer: diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 7ac6878eae807..02fc34fa61d0e 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -61,6 +61,7 @@ dependencies { compile library.java.google_http_client compile library.java.google_http_client_jackson2 compile library.java.grpc_alts + compile library.java.grpc_api compile library.java.grpc_auth compile library.java.grpc_core compile library.java.grpc_context @@ -77,7 +78,7 @@ dependencies { compile library.java.junit compile library.java.netty_handler compile library.java.netty_tcnative_boringssl_static - compile library.java.proto_google_cloud_bigquery_storage_v1beta1 + compile library.java.proto_google_cloud_bigquerybeta2_storage_v1 compile library.java.proto_google_cloud_bigtable_v2 compile library.java.proto_google_cloud_datastore_v1 compile library.java.proto_google_cloud_pubsub_v1 From a88a034e666235e6204f800c5f238358d8ac3b57 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 10 Feb 2021 17:29:10 -0800 Subject: [PATCH 5/5] update autovalue version --- .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index d6b5cb583ca8f..5ebd8f7eac90d 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -422,7 +422,7 @@ class BeamModulePlugin implements Plugin { // a dependency version which should match across multiple // Maven artifacts. def activemq_version = "5.14.5" - def autovalue_version = "1.7.2" + def autovalue_version = "1.7.4" def aws_java_sdk_version = "1.11.718" def aws_java_sdk2_version = "2.13.54" def cassandra_driver_version = "3.10.2"