diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index b522f68e9ced2..5ebd8f7eac90d 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -422,7 +422,7 @@ class BeamModulePlugin implements Plugin { // a dependency version which should match across multiple // Maven artifacts. def activemq_version = "5.14.5" - def autovalue_version = "1.7.2" + def autovalue_version = "1.7.4" def aws_java_sdk_version = "1.11.718" def aws_java_sdk2_version = "2.13.54" def cassandra_driver_version = "3.10.2" @@ -435,7 +435,7 @@ class BeamModulePlugin implements Plugin { def google_code_gson_version = "2.8.6" def google_oauth_clients_version = "1.31.0" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom - def grpc_version = "1.32.2" + def grpc_version = "1.35.0" def guava_version = "30.1-jre" def hadoop_version = "2.10.1" def hamcrest_version = "2.1" @@ -447,7 +447,7 @@ class BeamModulePlugin implements Plugin { def jsr305_version = "3.0.2" def kafka_version = "2.4.1" def nemo_version = "0.1" - def netty_version = "4.1.51.Final" + def netty_version = "4.1.52.Final" def postgres_version = "42.2.16" def powermock_version = "2.0.9" def protobuf_version = "3.12.0" @@ -509,7 +509,7 @@ class BeamModulePlugin implements Plugin { google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version", google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version", google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version - google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20200719-$google_clients_version", + google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20201030-$google_clients_version", google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200501-$google_clients_version", google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200720-$google_clients_version", google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20200713-$google_clients_version", @@ -519,7 +519,7 @@ class BeamModulePlugin implements Plugin { google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version - google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage", // google_cloud_platform_libraries_bom sets version + google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.8.5", google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.16.0", google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.125.2", google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version @@ -530,9 +530,9 @@ class BeamModulePlugin implements Plugin { google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub:$google_cloud_pubsub_version", google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite:$google_cloud_pubsublite_version", // The GCP Libraries BOM dashboard shows the versions set by the BOM: - // https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/13.2.0/artifact_details.html + // https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/16.3.0/artifact_details.html // Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml - google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:13.2.0", + google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:16.3.0", google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_code_gson : "com.google.code.gson:gson:$google_code_gson_version", // google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples @@ -601,7 +601,7 @@ class BeamModulePlugin implements Plugin { powermock_mockito : "org.powermock:powermock-api-mockito2:$powermock_version", protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version", protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version", - proto_google_cloud_bigquery_storage_v1beta1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1", // google_cloud_platform_libraries_bom sets version + proto_google_cloud_bigquerybeta2_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml b/sdks/java/container/license_scripts/dep_urls_java.yaml index 88c3cde722446..d7d40e6c80cb1 100644 --- a/sdks/java/container/license_scripts/dep_urls_java.yaml +++ b/sdks/java/container/license_scripts/dep_urls_java.yaml @@ -41,7 +41,7 @@ jaxen: '1.1.6': type: "3-Clause BSD" libraries-bom: - '13.2.0': + '16.3.0': license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE" type: "Apache License 2.0" paranamer: diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 7ac6878eae807..02fc34fa61d0e 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -61,6 +61,7 @@ dependencies { compile library.java.google_http_client compile library.java.google_http_client_jackson2 compile library.java.grpc_alts + compile library.java.grpc_api compile library.java.grpc_auth compile library.java.grpc_core compile library.java.grpc_context @@ -77,7 +78,7 @@ dependencies { compile library.java.junit compile library.java.netty_handler compile library.java.netty_tcnative_boringssl_static - compile library.java.proto_google_cloud_bigquery_storage_v1beta1 + compile library.java.proto_google_cloud_bigquerybeta2_storage_v1 compile library.java.proto_google_cloud_bigtable_v2 compile library.java.proto_google_cloud_datastore_v1 compile library.java.proto_google_cloud_pubsub_v1 diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 0a7f3ab07bba4..e97e38ef472cd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -17,8 +17,10 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.NoCredentials; @@ -29,6 +31,7 @@ import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; @@ -38,11 +41,21 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.MethodDescriptor; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.ReleaseInfo; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +67,10 @@ class SpannerAccessor implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(SpannerAccessor.class); - // A common user agent token that indicates that this request was originated from Apache Beam. + /* A common user agent token that indicates that this request was originated from + * Apache Beam. Setting the user-agent allows Cloud Spanner to detect that the + * workload is coming from Dataflow and to potentially apply performance optimizations + */ private static final String USER_AGENT_PREFIX = "Apache_Beam_Java"; // Only create one SpannerAccessor for each different SpannerConfig. @@ -72,6 +88,12 @@ class SpannerAccessor implements AutoCloseable { private final DatabaseAdminClient databaseAdminClient; private final SpannerConfig spannerConfig; + private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; + private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes + private static final int NUM_CHANNELS = 4; + public static final org.threeten.bp.Duration GRPC_KEEP_ALIVE_SECONDS = + org.threeten.bp.Duration.ofSeconds(120); + private SpannerAccessor( Spanner spanner, DatabaseClient databaseClient, @@ -139,6 +161,23 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(120)) .build()); + ManagedInstantiatingExecutorProvider executorProvider = + new ManagedInstantiatingExecutorProvider( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Cloud-Spanner-TransportChannel-%d") + .build()); + + InstantiatingGrpcChannelProvider.Builder instantiatingGrpcChannelProvider = + InstantiatingGrpcChannelProvider.newBuilder() + .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) + .setMaxInboundMetadataSize(MAX_METADATA_SIZE) + .setPoolSize(NUM_CHANNELS) + .setExecutorProvider(executorProvider) + .setKeepAliveTime(GRPC_KEEP_ALIVE_SECONDS) + .setInterceptorProvider(SpannerInterceptorProvider.createDefault()) + .setAttemptDirectPath(true); + ValueProvider projectId = spannerConfig.getProjectId(); if (projectId != null) { builder.setProjectId(projectId.get()); @@ -150,14 +189,34 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { ValueProvider host = spannerConfig.getHost(); if (host != null) { builder.setHost(host.get()); + instantiatingGrpcChannelProvider.setEndpoint(getEndpoint(host.get())); } ValueProvider emulatorHost = spannerConfig.getEmulatorHost(); if (emulatorHost != null) { builder.setEmulatorHost(emulatorHost.get()); builder.setCredentials(NoCredentials.getInstance()); + } else { + String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion(); + /* Workaround to setup user-agent string. + * InstantiatingGrpcChannelProvider will override the settings provided. + * The section below and all associated artifacts will be removed once the bug + * that prevents setting user-agent is fixed. + * https://github.com/googleapis/java-spanner/pull/871 + * + * Code to be replaced: + * builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); + */ + instantiatingGrpcChannelProvider.setHeaderProvider( + new HeaderProvider() { + @Override + public Map getHeaders() { + final Map headers = new HashMap<>(); + headers.put("user-agent", userAgentString); + return headers; + } + }); + builder.setChannelProvider(instantiatingGrpcChannelProvider.build()); } - String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion(); - builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); SpannerOptions options = builder.build(); Spanner spanner = options.getService(); @@ -173,6 +232,17 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig); } + private static String getEndpoint(String host) { + URL url; + try { + url = new URL(host); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid host: " + host, e); + } + return String.format( + "%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort()); + } + public DatabaseClient getDatabaseClient() { return databaseClient; } @@ -221,4 +291,32 @@ public ClientCall interceptCall( return next.newCall(method, callOptions); } } + + private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider { + // 4 Gapic clients * 4 channels per client. + private static final int DEFAULT_MIN_THREAD_COUNT = 16; + private final List executors = new ArrayList<>(); + private final ThreadFactory threadFactory; + + private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + } + + @Override + public boolean shouldAutoClose() { + return false; + } + + @Override + public ScheduledExecutorService getExecutor() { + int numCpus = Runtime.getRuntime().availableProcessors(); + int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus); + ScheduledExecutorService executor = + new ScheduledThreadPoolExecutor(numThreads, threadFactory); + synchronized (this) { + executors.add(executor); + } + return executor; + } + } }