Skip to content

Commit

Permalink
feat: Add non google default creds provider to subscribers in Spark C…
Browse files Browse the repository at this point in the history
…onnector (#440)
  • Loading branch information
jiangmichaellll committed Jan 9, 2021
1 parent 5190989 commit 2099751
Show file tree
Hide file tree
Showing 17 changed files with 150 additions and 62 deletions.
Expand Up @@ -16,7 +16,7 @@

package com.google.cloud.pubsublite;

import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
Expand Down
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Expand Up @@ -17,7 +17,8 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
Expand All @@ -32,8 +33,6 @@
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -110,19 +109,14 @@ public Publisher<Offset> build() throws ApiException {
serviceClient = autoBuilt.serviceClient().get();
} else {
try {
Map<String, String> metadata = autoBuilt.context().getMetadata();
Map<String, String> routingMetadata =
RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition());
Map<String, String> allMetadata =
ImmutableMap.<String, String>builder()
.putAll(metadata)
.putAll(routingMetadata)
.build();
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
addDefaultMetadata(
autoBuilt.context(),
RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition()),
settingsBuilder);
serviceClient =
PublisherServiceClient.create(
addDefaultSettings(
autoBuilt.topic().location().region(),
PublisherServiceSettings.newBuilder().setHeaderProvider(() -> allMetadata)));
addDefaultSettings(autoBuilt.topic().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand Down
Expand Up @@ -28,31 +28,43 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class RoutingMetadata {
private RoutingMetadata() {}
public final class RoutingMetadata {

static final String PARAMS_HEADER = "x-goog-request-params";
private static final String PARAMS_HEADER = "x-goog-request-params";
private final Map<String, String> metadata;

static Map<String, String> of(TopicPath topic, Partition partition) throws ApiException {
public static RoutingMetadata of(TopicPath topic, Partition partition) throws ApiException {
return new RoutingMetadata(topic, partition);
}

public static RoutingMetadata of(SubscriptionPath subscription, Partition partition)
throws ApiException {
return new RoutingMetadata(subscription, partition);
}

private RoutingMetadata(TopicPath topic, Partition partition) {
try {
String topic_value = URLEncoder.encode(topic.toString(), StandardCharsets.UTF_8.toString());
String params = String.format("partition=%s&topic=%s", partition.value(), topic_value);
return ImmutableMap.of(PARAMS_HEADER, params);
this.metadata = ImmutableMap.of(PARAMS_HEADER, params);
} catch (UnsupportedEncodingException e) {
throw toCanonical(e).underlying;
}
}

static Map<String, String> of(SubscriptionPath subscription, Partition partition)
throws ApiException {
private RoutingMetadata(SubscriptionPath subscription, Partition partition) {
try {
String subscription_value =
URLEncoder.encode(subscription.toString(), StandardCharsets.UTF_8.toString());
String params =
String.format("partition=%s&subscription=%s", partition.value(), subscription_value);
return ImmutableMap.of(PARAMS_HEADER, params);
this.metadata = ImmutableMap.of(PARAMS_HEADER, params);
} catch (UnsupportedEncodingException e) {
throw toCanonical(e).underlying;
}
}

public Map<String, String> getMetadata() {
return metadata;
}
}
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

Expand All @@ -25,6 +25,8 @@
import com.google.api.gax.rpc.ClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.internal.Lazy;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -60,4 +62,18 @@ Settings addDefaultSettings(CloudRegion target, Builder builder) throws ApiExcep
throw toCanonical(t).underlying;
}
}

// Adds context routing metadata for publisher or subscriber.
public static <
Settings extends ClientSettings<Settings>,
Builder extends ClientSettings.Builder<Settings, Builder>>
Builder addDefaultMetadata(
PubsubContext context, RoutingMetadata routingMetadata, Builder builder) {
return builder.setHeaderProvider(
() ->
ImmutableMap.<String, String>builder()
.putAll(context.getMetadata())
.putAll(routingMetadata.getMetadata())
.build());
}
}
Expand Up @@ -17,7 +17,8 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand All @@ -28,8 +29,6 @@
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

Expand Down Expand Up @@ -77,19 +76,16 @@ public Subscriber build() throws ApiException {
serviceClient = autoBuilt.serviceClient().get();
} else {
try {
Map<String, String> metadata = autoBuilt.context().getMetadata();
Map<String, String> routingMetadata =
RoutingMetadata.of(autoBuilt.subscriptionPath(), autoBuilt.partition());
Map<String, String> allMetadata =
ImmutableMap.<String, String>builder()
.putAll(metadata)
.putAll(routingMetadata)
.build();
SubscriberServiceSettings.Builder settingsBuilder =
SubscriberServiceSettings.newBuilder();
addDefaultMetadata(
autoBuilt.context(),
RoutingMetadata.of(autoBuilt.subscriptionPath(), autoBuilt.partition()),
settingsBuilder);
serviceClient =
SubscriberServiceClient.create(
addDefaultSettings(
autoBuilt.subscriptionPath().location().region(),
SubscriberServiceSettings.newBuilder().setHeaderProvider(() -> allMetadata)));
autoBuilt.subscriptionPath().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand Down
@@ -0,0 +1,31 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.function.Consumer;

public interface PartitionSubscriberFactory extends Serializable {
Subscriber newSubscriber(
Partition partition, Consumer<ImmutableList<SequencedMessage>> message_consumer)
throws ApiException;
}
Expand Up @@ -20,8 +20,7 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import java.io.Serializable;
Expand All @@ -33,14 +32,17 @@
public class PslContinuousInputPartition
implements ContinuousInputPartition<InternalRow>, Serializable {

private final SubscriberFactory subscriberFactory;
private final SparkPartitionOffset startOffset;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;

public PslContinuousInputPartition(
SubscriberFactory subscriberFactory,
SparkPartitionOffset startOffset,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings) {
this.subscriberFactory = subscriberFactory;
this.startOffset = startOffset;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
Expand All @@ -59,14 +61,7 @@ public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset
try {
subscriber =
new BlockingPullSubscriberImpl(
// TODO(jiangmichael): Pass credentials settings here.
(consumer) ->
SubscriberBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(pslPartitionOffset.partition())
.setContext(PubsubContext.of(Constants.FRAMEWORK))
.setMessageConsumer(consumer)
.build(),
subscriberFactory,
flowControlSettings,
SeekRequest.newBuilder()
.setCursor(
Expand Down
Expand Up @@ -35,6 +35,7 @@ public class PslContinuousReader implements ContinuousReader {

private final CursorClient cursorClient;
private final MultiPartitionCommitter committer;
private final PartitionSubscriberFactory partitionSubscriberFactory;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;
private final long topicPartitionCount;
Expand All @@ -44,11 +45,13 @@ public class PslContinuousReader implements ContinuousReader {
public PslContinuousReader(
CursorClient cursorClient,
MultiPartitionCommitter committer,
PartitionSubscriberFactory partitionSubscriberFactory,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings,
long topicPartitionCount) {
this.cursorClient = cursorClient;
this.committer = committer;
this.partitionSubscriberFactory = partitionSubscriberFactory;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
this.topicPartitionCount = topicPartitionCount;
Expand Down Expand Up @@ -104,10 +107,12 @@ public StructType readSchema() {

@Override
public List<InputPartition<InternalRow>> planInputPartitions() {

return startOffset.getPartitionOffsetMap().values().stream()
.map(
v ->
new PslContinuousInputPartition(
(consumer) -> partitionSubscriberFactory.newSubscriber(v.partition(), consumer),
SparkPartitionOffset.builder()
.partition(v.partition())
.offset(v.offset())
Expand Down

0 comments on commit 2099751

Please sign in to comment.