feat: PSL Connector Writer support #121

Merged
merged 16 commits on Apr 2, 2021
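Since this PR adds streaming write support, a minimal usage sketch may help orient reviewers; it is not part of the diff below. It assumes the connector's short name is "pubsublite" (the actual value comes from PslDataSource#shortName, which is not shown in this diff), uses the TOPIC_CONFIG_KEY ("pubsublite.topic") introduced in Constants, and treats the topic path and checkpoint location as placeholders. Only a BinaryType "data" column is assumed here; the full set of accepted columns is defined by the writer classes in this PR.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class PslWriteExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("psl-write-example").getOrCreate();

    // A streaming Dataset with a BinaryType "data" column, built from the
    // built-in rate source purely for illustration.
    Dataset<Row> df =
        spark.readStream().format("rate").load()
            .selectExpr("CAST(CAST(value AS STRING) AS BINARY) AS data");

    StreamingQuery query =
        df.writeStream()
            .format("pubsublite") // assumed short name; see PslDataSource#shortName
            .option("pubsublite.topic",
                "projects/my-project/locations/us-central1-a/topics/my-topic")
            .option("checkpointLocation", "/tmp/psl-writer-checkpoint")
            .start();

    query.awaitTermination();
  }
}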
5 changes: 5 additions & 0 deletions pom.xml
@@ -113,6 +113,11 @@
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.11</artifactId>
<version>0.9.1</version>
</dependency>

<!--test dependencies-->
<dependency>
129 changes: 129 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/CachedPublishers.java
@@ -0,0 +1,129 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.GuardedBy;

/** Cached {@link Publisher}s, so that a publisher with the same settings is reused within the same task. */
public class CachedPublishers {

private final CloseableMonitor monitor = new CloseableMonitor();

private final Executor listenerExecutor = Executors.newSingleThreadExecutor();

@GuardedBy("monitor.monitor")
private final Map<PslWriteDataSourceOptions, Publisher<MessageMetadata>> publishers =
new HashMap<>();

public Publisher<MessageMetadata> getOrCreate(PslWriteDataSourceOptions writeOptions) {
try (CloseableMonitor.Hold h = monitor.enter()) {
Publisher<MessageMetadata> publisher = publishers.get(writeOptions);
if (publisher != null) {
return publisher;
}

publisher = createPublisherInternal(writeOptions);
publishers.put(writeOptions, publisher);
publisher.addListener(
new ApiService.Listener() {
@Override
public void failed(ApiService.State s, Throwable t) {
try (CloseableMonitor.Hold h = monitor.enter()) {
publishers.remove(writeOptions);
}
}
},
listenerExecutor);
publisher.startAsync().awaitRunning();
return publisher;
}
}

private PublisherServiceClient newServiceClient(
PslWriteDataSourceOptions writeOptions, Partition partition) throws ApiException {
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
settingsBuilder = settingsBuilder.setCredentialsProvider(writeOptions.getCredentialProvider());
settingsBuilder =
addDefaultMetadata(
PubsubContext.of(Constants.FRAMEWORK),
RoutingMetadata.of(writeOptions.topicPath(), partition),
settingsBuilder);
try {
return PublisherServiceClient.create(
addDefaultSettings(writeOptions.topicPath().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

private AdminClient getAdminClient(PslWriteDataSourceOptions writeOptions) throws ApiException {
try {
return AdminClient.create(
AdminClientSettings.newBuilder()
.setServiceClient(
AdminServiceClient.create(
addDefaultSettings(
writeOptions.topicPath().location().region(),
AdminServiceSettings.newBuilder()
.setCredentialsProvider(writeOptions.getCredentialProvider()))))
.setRegion(writeOptions.topicPath().location().region())
.build());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

private Publisher<MessageMetadata> createPublisherInternal(
PslWriteDataSourceOptions writeOptions) {
return PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(writeOptions.topicPath())
.setPublisherFactory(
partition ->
SinglePartitionPublisherBuilder.newBuilder()
.setTopic(writeOptions.topicPath())
.setPartition(partition)
.setServiceClient(newServiceClient(writeOptions, partition))
.build())
.setAdminClient(getAdminClient(writeOptions))
.build()
.instantiate();
}
}
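For orientation, here is a minimal sketch (not part of this diff) of how a write task might obtain and use a cached publisher. It assumes PslWriteDataSourceOptions is constructed elsewhere (e.g. via fromSparkDataSourceOptions, as in PslDataSource later in this PR) and that Message.builder() and Publisher#publish come from the Pub/Sub Lite client library.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.protobuf.ByteString;

class PublisherUsageSketch {
  private static final CachedPublishers CACHED_PUBLISHERS = new CachedPublishers();

  static ApiFuture<MessageMetadata> publishOne(
      PslWriteDataSourceOptions writeOptions, byte[] payload) {
    // getOrCreate reuses a running publisher for identical options, or creates one,
    // registers a failure listener that evicts it from the cache, and starts it.
    Publisher<MessageMetadata> publisher = CACHED_PUBLISHERS.getOrCreate(writeOptions);
    return publisher.publish(
        Message.builder().setData(ByteString.copyFrom(payload)).build());
  }
}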
14 changes: 8 additions & 6 deletions src/main/java/com/google/cloud/pubsublite/spark/Constants.java
@@ -17,7 +17,9 @@
package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -26,6 +28,10 @@ public class Constants {
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
DataTypes.createArrayType(DataTypes.BinaryType);
public static MapType ATTRIBUTES_DATATYPE =
DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
public static StructType DEFAULT_SCHEMA =
new StructType(
new StructField[] {
@@ -36,12 +42,7 @@
new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
new StructField(
"attributes",
DataTypes.createMapType(
DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
true,
Metadata.empty())
new StructField("attributes", ATTRIBUTES_DATATYPE, true, Metadata.empty())
});

public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
@@ -52,6 +53,7 @@ public class Constants {
"pubsublite.flowcontrol.byteoutstandingperpartition";
public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
"pubsublite.flowcontrol.messageoutstandingperparition";
public static String TOPIC_CONFIG_KEY = "pubsublite.topic";
public static String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription";
public static String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key";
}
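As a brief illustration of the refactored constants (not part of this diff): the "attributes" column is a MapType(StringType, ArrayType(BinaryType)), so each attribute key maps to one or more binary values. In Java such a column value can be represented as a Map<String, byte[][]>; the key and value below are placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

class AttributesColumnSketch {
  // One attribute key mapped to a single binary value; multiple values per key
  // are allowed because the map's value type is an array of binary.
  static final Map<String, byte[][]> SAMPLE_ATTRIBUTES =
      Collections.singletonMap(
          "event-source", new byte[][] {"device-42".getBytes(StandardCharsets.UTF_8)});
}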
src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java
@@ -28,7 +28,15 @@ public class PslCredentialsProvider implements CredentialsProvider {

private final Credentials credentials;

public PslCredentialsProvider(PslDataSourceOptions options) {
public PslCredentialsProvider(PslReadDataSourceOptions options) {
if (options.credentialsKey() != null) {
this.credentials = createCredentialsFromKey(options.credentialsKey());
} else {
this.credentials = createDefaultCredentials();
}
}

public PslCredentialsProvider(PslWriteDataSourceOptions options) {
if (options.credentialsKey() != null) {
this.credentials = createCredentialsFromKey(options.credentialsKey());
} else {
58 changes: 37 additions & 21 deletions src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
@@ -30,13 +30,20 @@
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

@AutoService(DataSourceRegister.class)
public final class PslDataSource
implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {
implements DataSourceV2,
ContinuousReadSupport,
MicroBatchReadSupport,
StreamWriteSupport,
DataSourceRegister {

@Override
public String shortName() {
@@ -51,23 +58,24 @@ public ContinuousReader createContinuousReader(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
PslReadDataSourceOptions pslReadDataSourceOptions =
PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
TopicPath topicPath;
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
PartitionCountReader partitionCountReader =
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
return new PslContinuousReader(
pslDataSourceOptions.newCursorClient(),
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
pslDataSourceOptions.getSubscriberFactory(),
pslReadDataSourceOptions.newCursorClient(),
pslReadDataSourceOptions.newMultiPartitionCommitter(
partitionCountReader.getPartitionCount()),
pslReadDataSourceOptions.getSubscriberFactory(),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
partitionCountReader);
}

@@ -79,28 +87,36 @@ public MicroBatchReader createMicroBatchReader(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
PslReadDataSourceOptions pslReadDataSourceOptions =
PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
TopicPath topicPath;
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
PartitionCountReader partitionCountReader =
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
return new PslMicroBatchReader(
pslDataSourceOptions.newCursorClient(),
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
pslDataSourceOptions.getSubscriberFactory(),
pslReadDataSourceOptions.newCursorClient(),
pslReadDataSourceOptions.newMultiPartitionCommitter(
partitionCountReader.getPartitionCount()),
pslReadDataSourceOptions.getSubscriberFactory(),
new LimitingHeadOffsetReader(
pslDataSourceOptions.newTopicStatsClient(),
pslReadDataSourceOptions.newTopicStatsClient(),
topicPath,
partitionCountReader,
Ticker.systemTicker()),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
pslDataSourceOptions.maxMessagesPerBatch());
Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
pslReadDataSourceOptions.maxMessagesPerBatch());
}

@Override
public StreamWriter createStreamWriter(
String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
return new PslStreamWriter(
schema, PslWriteDataSourceOptions.fromSparkDataSourceOptions(options));
}
}