Skip to content

Commit

Permalink
[BEAM-11831] Parially Revert "[BEAM-11805] Replace user-agent for spa…
Browse files Browse the repository at this point in the history
…nner (apache#13990)" (apache#15591)

This partially reverts commit 4f78ab3.
  • Loading branch information
TheNeuralBit authored and dmitriikuzinepam committed Nov 2, 2021
1 parent f4062f8 commit 9c173d5
Showing 1 changed file with 3 additions and 98 deletions.
Expand Up @@ -17,10 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.NoCredentials;
Expand All @@ -31,7 +29,6 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand All @@ -41,21 +38,11 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,12 +75,6 @@ public class SpannerAccessor implements AutoCloseable {
private final DatabaseAdminClient databaseAdminClient;
private final SpannerConfig spannerConfig;

private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes
private static final int NUM_CHANNELS = 4;
public static final org.threeten.bp.Duration GRPC_KEEP_ALIVE_SECONDS =
org.threeten.bp.Duration.ofSeconds(120);

private SpannerAccessor(
Spanner spanner,
DatabaseClient databaseClient,
Expand Down Expand Up @@ -161,23 +142,6 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(120))
.build());

ManagedInstantiatingExecutorProvider executorProvider =
new ManagedInstantiatingExecutorProvider(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Cloud-Spanner-TransportChannel-%d")
.build());

InstantiatingGrpcChannelProvider.Builder instantiatingGrpcChannelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(NUM_CHANNELS)
.setExecutorProvider(executorProvider)
.setKeepAliveTime(GRPC_KEEP_ALIVE_SECONDS)
.setInterceptorProvider(SpannerInterceptorProvider.createDefault())
.setAttemptDirectPath(true);

ValueProvider<String> projectId = spannerConfig.getProjectId();
if (projectId != null) {
builder.setProjectId(projectId.get());
Expand All @@ -189,34 +153,14 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
ValueProvider<String> host = spannerConfig.getHost();
if (host != null) {
builder.setHost(host.get());
instantiatingGrpcChannelProvider.setEndpoint(getEndpoint(host.get()));
}
ValueProvider<String> emulatorHost = spannerConfig.getEmulatorHost();
if (emulatorHost != null) {
builder.setEmulatorHost(emulatorHost.get());
builder.setCredentials(NoCredentials.getInstance());
} else {
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
/* Workaround to setup user-agent string.
* InstantiatingGrpcChannelProvider will override the settings provided.
* The section below and all associated artifacts will be removed once the bug
* that prevents setting user-agent is fixed.
* https://github.com/googleapis/java-spanner/pull/871
*
* Code to be replaced:
* builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
*/
instantiatingGrpcChannelProvider.setHeaderProvider(
new HeaderProvider() {
@Override
public Map<String, String> getHeaders() {
final Map<String, String> headers = new HashMap<>();
headers.put("user-agent", userAgentString);
return headers;
}
});
builder.setChannelProvider(instantiatingGrpcChannelProvider.build());
}
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
SpannerOptions options = builder.build();

Spanner spanner = options.getService();
Expand All @@ -232,17 +176,6 @@ public Map<String, String> getHeaders() {
spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig);
}

private static String getEndpoint(String host) {
URL url;
try {
url = new URL(host);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid host: " + host, e);
}
return String.format(
"%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort());
}

public DatabaseClient getDatabaseClient() {
return databaseClient;
}
Expand Down Expand Up @@ -291,32 +224,4 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
return next.newCall(method, callOptions);
}
}

private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new ArrayList<>();
private final ThreadFactory threadFactory;

private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public boolean shouldAutoClose() {
return false;
}

@Override
public ScheduledExecutorService getExecutor() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
return executor;
}
}
}

0 comments on commit 9c173d5

Please sign in to comment.