From 9c173d513ec4fe438f66edd220536431c4b4ce84 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 5 Oct 2021 11:57:12 -0700 Subject: [PATCH] [BEAM-11831] Parially Revert "[BEAM-11805] Replace user-agent for spanner (#13990)" (#15591) This partially reverts commit 4f78ab33535a4139819d86bfb070b683c05d380a. --- .../sdk/io/gcp/spanner/SpannerAccessor.java | 101 +----------------- 1 file changed, 3 insertions(+), 98 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index c34e21c3e171f..faff06e9a7972 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -17,10 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.spanner; -import com.google.api.gax.core.ExecutorProvider; -import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.NoCredentials; @@ -31,7 +29,6 @@ import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; -import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; @@ -41,21 +38,11 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.MethodDescriptor; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.ReleaseInfo; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,12 +75,6 @@ public class SpannerAccessor implements AutoCloseable { private final DatabaseAdminClient databaseAdminClient; private final SpannerConfig spannerConfig; - private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; - private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes - private static final int NUM_CHANNELS = 4; - public static final org.threeten.bp.Duration GRPC_KEEP_ALIVE_SECONDS = - org.threeten.bp.Duration.ofSeconds(120); - private SpannerAccessor( Spanner spanner, DatabaseClient databaseClient, @@ -161,23 +142,6 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(120)) .build()); - ManagedInstantiatingExecutorProvider executorProvider = - new ManagedInstantiatingExecutorProvider( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Cloud-Spanner-TransportChannel-%d") - .build()); - - InstantiatingGrpcChannelProvider.Builder instantiatingGrpcChannelProvider = - InstantiatingGrpcChannelProvider.newBuilder() - .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) - .setMaxInboundMetadataSize(MAX_METADATA_SIZE) - .setPoolSize(NUM_CHANNELS) - .setExecutorProvider(executorProvider) - .setKeepAliveTime(GRPC_KEEP_ALIVE_SECONDS) - .setInterceptorProvider(SpannerInterceptorProvider.createDefault()) - .setAttemptDirectPath(true); - ValueProvider projectId = spannerConfig.getProjectId(); if (projectId != null) { builder.setProjectId(projectId.get()); @@ -189,34 +153,14 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { ValueProvider host = spannerConfig.getHost(); if (host != null) { builder.setHost(host.get()); - instantiatingGrpcChannelProvider.setEndpoint(getEndpoint(host.get())); } ValueProvider emulatorHost = spannerConfig.getEmulatorHost(); if (emulatorHost != null) { builder.setEmulatorHost(emulatorHost.get()); builder.setCredentials(NoCredentials.getInstance()); - } else { - String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion(); - /* Workaround to setup user-agent string. - * InstantiatingGrpcChannelProvider will override the settings provided. - * The section below and all associated artifacts will be removed once the bug - * that prevents setting user-agent is fixed. - * https://github.com/googleapis/java-spanner/pull/871 - * - * Code to be replaced: - * builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); - */ - instantiatingGrpcChannelProvider.setHeaderProvider( - new HeaderProvider() { - @Override - public Map getHeaders() { - final Map headers = new HashMap<>(); - headers.put("user-agent", userAgentString); - return headers; - } - }); - builder.setChannelProvider(instantiatingGrpcChannelProvider.build()); } + String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion(); + builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString)); SpannerOptions options = builder.build(); Spanner spanner = options.getService(); @@ -232,17 +176,6 @@ public Map getHeaders() { spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig); } - private static String getEndpoint(String host) { - URL url; - try { - url = new URL(host); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Invalid host: " + host, e); - } - return String.format( - "%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort()); - } - public DatabaseClient getDatabaseClient() { return databaseClient; } @@ -291,32 +224,4 @@ public ClientCall interceptCall( return next.newCall(method, callOptions); } } - - private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider { - // 4 Gapic clients * 4 channels per client. - private static final int DEFAULT_MIN_THREAD_COUNT = 16; - private final List executors = new ArrayList<>(); - private final ThreadFactory threadFactory; - - private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) { - this.threadFactory = threadFactory; - } - - @Override - public boolean shouldAutoClose() { - return false; - } - - @Override - public ScheduledExecutorService getExecutor() { - int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus); - ScheduledExecutorService executor = - new ScheduledThreadPoolExecutor(numThreads, threadFactory); - synchronized (this) { - executors.add(executor); - } - return executor; - } - } }