Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11805] Replace user-agent for spanner #13990

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -422,7 +422,7 @@ class BeamModulePlugin implements Plugin<Project> {
// a dependency version which should match across multiple
// Maven artifacts.
def activemq_version = "5.14.5"
def autovalue_version = "1.7.2"
def autovalue_version = "1.7.4"
def aws_java_sdk_version = "1.11.718"
def aws_java_sdk2_version = "2.13.54"
def cassandra_driver_version = "3.10.2"
Expand All @@ -435,7 +435,7 @@ class BeamModulePlugin implements Plugin<Project> {
def google_code_gson_version = "2.8.6"
def google_oauth_clients_version = "1.31.0"
// Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
def grpc_version = "1.32.2"
def grpc_version = "1.35.0"
def guava_version = "30.1-jre"
def hadoop_version = "2.10.1"
def hamcrest_version = "2.1"
Expand All @@ -447,7 +447,7 @@ class BeamModulePlugin implements Plugin<Project> {
def jsr305_version = "3.0.2"
def kafka_version = "2.4.1"
def nemo_version = "0.1"
def netty_version = "4.1.51.Final"
def netty_version = "4.1.52.Final"
def postgres_version = "42.2.16"
def powermock_version = "2.0.9"
def protobuf_version = "3.12.0"
Expand Down Expand Up @@ -509,7 +509,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version",
google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version",
google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20200719-$google_clients_version",
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20201030-$google_clients_version",
google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200501-$google_clients_version",
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200720-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20200713-$google_clients_version",
Expand All @@ -519,7 +519,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version
google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.8.5",
google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.16.0",
google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.125.2",
google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
Expand All @@ -530,9 +530,9 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub:$google_cloud_pubsub_version",
google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite:$google_cloud_pubsublite_version",
// The GCP Libraries BOM dashboard shows the versions set by the BOM:
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/13.2.0/artifact_details.html
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/16.3.0/artifact_details.html
// Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:13.2.0",
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:16.3.0",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
// google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples
Expand Down Expand Up @@ -601,7 +601,7 @@ class BeamModulePlugin implements Plugin<Project> {
powermock_mockito : "org.powermock:powermock-api-mockito2:$powermock_version",
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",
proto_google_cloud_bigquery_storage_v1beta1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigquerybeta2_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/container/license_scripts/dep_urls_java.yaml
Expand Up @@ -41,7 +41,7 @@ jaxen:
'1.1.6':
type: "3-Clause BSD"
libraries-bom:
'13.2.0':
'16.3.0':
license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE"
type: "Apache License 2.0"
paranamer:
Expand Down
3 changes: 2 additions & 1 deletion sdks/java/io/google-cloud-platform/build.gradle
Expand Up @@ -61,6 +61,7 @@ dependencies {
compile library.java.google_http_client
compile library.java.google_http_client_jackson2
compile library.java.grpc_alts
compile library.java.grpc_api
compile library.java.grpc_auth
compile library.java.grpc_core
compile library.java.grpc_context
Expand All @@ -77,7 +78,7 @@ dependencies {
compile library.java.junit
compile library.java.netty_handler
compile library.java.netty_tcnative_boringssl_static
compile library.java.proto_google_cloud_bigquery_storage_v1beta1
compile library.java.proto_google_cloud_bigquerybeta2_storage_v1
compile library.java.proto_google_cloud_bigtable_v2
compile library.java.proto_google_cloud_datastore_v1
compile library.java.proto_google_cloud_pubsub_v1
Expand Down
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.NoCredentials;
Expand All @@ -29,6 +31,7 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand All @@ -38,11 +41,21 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,7 +67,10 @@
class SpannerAccessor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(SpannerAccessor.class);

// A common user agent token that indicates that this request was originated from Apache Beam.
/* A common user agent token that indicates that this request was originated from
* Apache Beam. Setting the user-agent allows Cloud Spanner to detect that the
* workload is coming from Dataflow and to potentially apply performance optimizations
*/
private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";

// Only create one SpannerAccessor for each different SpannerConfig.
Expand All @@ -72,6 +88,12 @@ class SpannerAccessor implements AutoCloseable {
private final DatabaseAdminClient databaseAdminClient;
private final SpannerConfig spannerConfig;

private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes
private static final int NUM_CHANNELS = 4;
public static final org.threeten.bp.Duration GRPC_KEEP_ALIVE_SECONDS =
org.threeten.bp.Duration.ofSeconds(120);

private SpannerAccessor(
Spanner spanner,
DatabaseClient databaseClient,
Expand Down Expand Up @@ -139,6 +161,23 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(120))
.build());

ManagedInstantiatingExecutorProvider executorProvider =
new ManagedInstantiatingExecutorProvider(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Cloud-Spanner-TransportChannel-%d")
.build());

InstantiatingGrpcChannelProvider.Builder instantiatingGrpcChannelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(NUM_CHANNELS)
.setExecutorProvider(executorProvider)
.setKeepAliveTime(GRPC_KEEP_ALIVE_SECONDS)
.setInterceptorProvider(SpannerInterceptorProvider.createDefault())
.setAttemptDirectPath(true);

ValueProvider<String> projectId = spannerConfig.getProjectId();
if (projectId != null) {
builder.setProjectId(projectId.get());
Expand All @@ -150,14 +189,34 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
ValueProvider<String> host = spannerConfig.getHost();
if (host != null) {
builder.setHost(host.get());
instantiatingGrpcChannelProvider.setEndpoint(getEndpoint(host.get()));
}
ValueProvider<String> emulatorHost = spannerConfig.getEmulatorHost();
if (emulatorHost != null) {
builder.setEmulatorHost(emulatorHost.get());
builder.setCredentials(NoCredentials.getInstance());
} else {
TheNeuralBit marked this conversation as resolved.
Show resolved Hide resolved
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
/* Workaround to setup user-agent string.
* InstantiatingGrpcChannelProvider will override the settings provided.
* The section below and all associated artifacts will be removed once the bug
* that prevents setting user-agent is fixed.
* https://github.com/googleapis/java-spanner/pull/871
*
* Code to be replaced:
* builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
*/
instantiatingGrpcChannelProvider.setHeaderProvider(
new HeaderProvider() {
@Override
public Map<String, String> getHeaders() {
final Map<String, String> headers = new HashMap<>();
headers.put("user-agent", userAgentString);
return headers;
}
});
builder.setChannelProvider(instantiatingGrpcChannelProvider.build());
}
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
SpannerOptions options = builder.build();

Spanner spanner = options.getService();
Expand All @@ -173,6 +232,17 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig);
}

private static String getEndpoint(String host) {
URL url;
try {
url = new URL(host);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid host: " + host, e);
}
return String.format(
"%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort());
}

public DatabaseClient getDatabaseClient() {
return databaseClient;
}
Expand Down Expand Up @@ -221,4 +291,32 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
return next.newCall(method, callOptions);
}
}

private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new ArrayList<>();
private final ThreadFactory threadFactory;

private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public boolean shouldAutoClose() {
return false;
}

@Override
public ScheduledExecutorService getExecutor() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
return executor;
}
}
}