feat: PSL Connector Writer support #121

Merged
merged 16 commits on Apr 2, 2021
25 changes: 25 additions & 0 deletions clirr-ignored-differences.xml
@@ -12,4 +12,29 @@
<method>*</method>
<to>*</to>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PartitionSubscriberFactory</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PslCredentialsProvider</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PslDataSourceOptions*</className>
</difference>

</differences>
5 changes: 5 additions & 0 deletions pom.xml
@@ -113,6 +113,11 @@
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.11</artifactId>
<version>0.9.1</version>
</dependency>

<!--test dependencies-->
<dependency>
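The new scala-java8-compat dependency provides converters between Scala and Java futures and functions. This diff does not show where the connector calls it, so the following is only a generic sketch of the library's FutureConverters bridge under that assumption, not the connector's actual call site.

import java.util.concurrent.CompletableFuture;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;

public class FutureBridgeSketch {
  public static void main(String[] args) {
    // Wrap a completed Java future so it can be handed to Scala-facing code such as Spark internals.
    CompletableFuture<String> javaFuture = CompletableFuture.completedFuture("ok");
    Future<String> scalaFuture = FutureConverters.toScala(javaFuture);
    System.out.println(scalaFuture.isCompleted()); // prints true
  }
}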
31 changes: 24 additions & 7 deletions src/main/java/com/google/cloud/pubsublite/spark/Constants.java
@@ -17,7 +17,12 @@
package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -26,22 +31,33 @@ public class Constants {
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;

public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
DataTypes.createArrayType(DataTypes.BinaryType);
public static MapType ATTRIBUTES_DATATYPE =
DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
public static Map<String, DataType> PUBLISH_FIELD_TYPES =
ImmutableMap.of(
"key", DataTypes.BinaryType,
"data", DataTypes.BinaryType,
"attributes", ATTRIBUTES_DATATYPE,
"event_timestamp", DataTypes.TimestampType);
public static StructType DEFAULT_SCHEMA =
new StructType(
new StructField[] {
new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
new StructField(
"attributes",
DataTypes.createMapType(
DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
"event_timestamp",
PUBLISH_FIELD_TYPES.get("event_timestamp"),
true,
Metadata.empty())
Metadata.empty()),
new StructField(
"attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
});

public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
@@ -52,6 +68,7 @@ public class Constants {
"pubsublite.flowcontrol.byteoutstandingperpartition";
public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
"pubsublite.flowcontrol.messageoutstandingperparition";
public static String TOPIC_CONFIG_KEY = "pubsublite.topic";
public static String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription";
public static String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key";
}
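For context on the schema changes above: PUBLISH_FIELD_TYPES now defines the Spark SQL types of the writable columns (key, data, attributes, event_timestamp), and DEFAULT_SCHEMA reuses them for the read path. The snippet below is a minimal, illustrative sketch of a Dataset<Row> that conforms to those publish field types; the SparkSession setup and sample values are placeholders, not part of this change.

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class PublishSchemaSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

    // Mirrors Constants.PUBLISH_FIELD_TYPES: key/data are binary, attributes maps a
    // string key to an array of binary values, event_timestamp is a timestamp.
    StructType publishSchema =
        new StructType(
            new StructField[] {
              new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
              new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
              new StructField(
                  "attributes",
                  DataTypes.createMapType(
                      DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
                  true,
                  Metadata.empty()),
              new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty())
            });

    Row row =
        RowFactory.create(
            "key-0".getBytes(),
            "hello pubsub lite".getBytes(),
            Collections.singletonMap("origin", Arrays.asList("spark".getBytes())),
            new Timestamp(System.currentTimeMillis()));

    Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), publishSchema);
    df.printSchema();
    spark.stop();
  }
}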
@@ -22,6 +22,9 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
63 changes: 42 additions & 21 deletions src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
@@ -23,20 +23,30 @@
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.spark.internal.CachedPartitionCountReader;
import com.google.cloud.pubsublite.spark.internal.LimitingHeadOffsetReader;
import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
import java.util.Objects;
import java.util.Optional;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

@AutoService(DataSourceRegister.class)
public final class PslDataSource
implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {
implements DataSourceV2,
ContinuousReadSupport,
MicroBatchReadSupport,
StreamWriteSupport,
DataSourceRegister {

@Override
public String shortName() {
@@ -51,23 +61,24 @@ public ContinuousReader createContinuousReader(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
PslReadDataSourceOptions pslReadDataSourceOptions =
PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
TopicPath topicPath;
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
PartitionCountReader partitionCountReader =
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
return new PslContinuousReader(
pslDataSourceOptions.newCursorClient(),
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
pslDataSourceOptions.getSubscriberFactory(),
pslReadDataSourceOptions.newCursorClient(),
pslReadDataSourceOptions.newMultiPartitionCommitter(
partitionCountReader.getPartitionCount()),
pslReadDataSourceOptions.getSubscriberFactory(),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
partitionCountReader);
}

@@ -79,28 +90,38 @@ public MicroBatchReader createMicroBatchReader(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
PslReadDataSourceOptions pslReadDataSourceOptions =
PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
TopicPath topicPath;
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
PartitionCountReader partitionCountReader =
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
return new PslMicroBatchReader(
pslDataSourceOptions.newCursorClient(),
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
pslDataSourceOptions.getSubscriberFactory(),
pslReadDataSourceOptions.newCursorClient(),
pslReadDataSourceOptions.newMultiPartitionCommitter(
partitionCountReader.getPartitionCount()),
pslReadDataSourceOptions.getSubscriberFactory(),
new LimitingHeadOffsetReader(
pslDataSourceOptions.newTopicStatsClient(),
pslReadDataSourceOptions.newTopicStatsClient(),
topicPath,
partitionCountReader,
Ticker.systemTicker()),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
pslDataSourceOptions.maxMessagesPerBatch());
Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
pslReadDataSourceOptions.maxMessagesPerBatch());
}

@Override
public StreamWriter createStreamWriter(
String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
PslSparkUtils.verifyWriteInputSchema(schema);
PslWriteDataSourceOptions pslWriteDataSourceOptions =
PslWriteDataSourceOptions.fromSparkDataSourceOptions(options);
return new PslStreamWriter(schema, pslWriteDataSourceOptions);
}
}
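With StreamWriteSupport in place, the connector can serve as a Structured Streaming sink. The following is a hedged sketch of how a write might be wired up from user code: the "pubsublite.topic" option comes from the constants in this PR, but the format short name, the topic path, the checkpoint directory, and the rate-source input are all placeholders for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class PslStreamWriteSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

    // Placeholder source; any streaming Dataset whose columns match the publish
    // schema (key, data, optional attributes and event_timestamp) would work.
    Dataset<Row> toWrite =
        spark
            .readStream()
            .format("rate")
            .load()
            .selectExpr(
                "CAST(CAST(value AS STRING) AS BINARY) AS key",
                "CAST(CAST(value AS STRING) AS BINARY) AS data");

    StreamingQuery query =
        toWrite
            .writeStream()
            .format("pubsublite") // assumes the DataSourceRegister short name is "pubsublite"
            .option(
                "pubsublite.topic", // TOPIC_CONFIG_KEY added in this PR
                "projects/my-project/locations/us-central1-a/topics/my-topic") // placeholder path
            .option("checkpointLocation", "/tmp/psl-sink-checkpoint") // placeholder
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .start();

    query.awaitTermination();
  }
}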
97 changes: 97 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java
@@ -0,0 +1,97 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.annotation.concurrent.GuardedBy;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

public class PslDataWriter implements DataWriter<InternalRow> {

private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

private final long partitionId, taskId, epochId;
private final StructType inputSchema;
private final PublisherFactory publisherFactory;

@GuardedBy("this")
private Optional<Publisher<MessageMetadata>> publisher = Optional.empty();

@GuardedBy("this")
private final List<ApiFuture<MessageMetadata>> futures = new ArrayList<>();

public PslDataWriter(
long partitionId,
long taskId,
long epochId,
StructType schema,
PublisherFactory publisherFactory) {
this.partitionId = partitionId;
this.taskId = taskId;
this.epochId = epochId;
this.inputSchema = schema;
this.publisherFactory = publisherFactory;
}

@Override
public synchronized void write(InternalRow record) {
if (!publisher.isPresent() || publisher.get().state() != ApiService.State.RUNNING) {
publisher = Optional.of(publisherFactory.newPublisher());
}
futures.add(
publisher
.get()
.publish(Objects.requireNonNull(PslSparkUtils.toPubSubMessage(inputSchema, record))));
}

@Override
public synchronized WriterCommitMessage commit() throws IOException {
for (ApiFuture<MessageMetadata> f : futures) {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
publisher = Optional.empty();
throw new IOException(e);
}
}
log.atInfo().log(
"All writes for partitionId:%d, taskId:%d, epochId:%d succeeded, committing...",
partitionId, taskId, epochId);
return PslWriterCommitMessage.create(futures.size());
}

@Override
public synchronized void abort() {
log.atWarning().log(
"One or more writes for partitionId:%d, taskId:%d, epochId:%d failed, aborted.",
partitionId, taskId, epochId);
}
}
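For readers less familiar with the DataSource V2 write path: Spark creates one DataWriter per partition/task/epoch, calls write() for every row, then commit() if the task succeeds or abort() if it fails. The toy class below (not part of the connector) mirrors the buffering pattern PslDataWriter uses, where each write enqueues an asynchronous publish and commit blocks until every outstanding future resolves.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Toy stand-in that mirrors PslDataWriter's commit semantics; it is not the connector's code.
public class BufferedCommitSketch {
  private final List<CompletableFuture<String>> futures = new ArrayList<>();

  void write(String record) {
    // In the real writer this is publisher.publish(PslSparkUtils.toPubSubMessage(...)),
    // which returns an ApiFuture<MessageMetadata>.
    futures.add(CompletableFuture.supplyAsync(() -> "ack:" + record));
  }

  int commit() throws IOException {
    for (CompletableFuture<String> f : futures) {
      try {
        f.get(); // surface any publish failure before declaring the task committed
      } catch (InterruptedException | ExecutionException e) {
        throw new IOException(e);
      }
    }
    // Analogous to returning PslWriterCommitMessage.create(futures.size()).
    return futures.size();
  }

  public static void main(String[] args) throws IOException {
    BufferedCommitSketch writer = new BufferedCommitSketch();
    writer.write("a");
    writer.write("b");
    System.out.println("committed " + writer.commit() + " messages");
  }
}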
45 changes: 45 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/PslDataWriterFactory.java
@@ -0,0 +1,45 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.spark.internal.CachedPublishers;
import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.types.StructType;

public class PslDataWriterFactory implements Serializable, DataWriterFactory<InternalRow> {
private static final long serialVersionUID = -6904546364310978844L;

private static final CachedPublishers CACHED_PUBLISHERS = new CachedPublishers();

private final StructType inputSchema;
private final PslWriteDataSourceOptions writeOptions;

public PslDataWriterFactory(StructType inputSchema, PslWriteDataSourceOptions writeOptions) {
this.inputSchema = inputSchema;
this.writeOptions = writeOptions;
}

@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
PublisherFactory pg = () -> CACHED_PUBLISHERS.getOrCreate(writeOptions);
return new PslDataWriter(partitionId, taskId, epochId, inputSchema, pg);
}
}
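PslDataWriterFactory gives every task a PublisherFactory backed by a static, executor-wide cache, so tasks that share write options reuse one publisher instead of opening a new connection each epoch. CachedPublishers itself is not shown in this hunk; the sketch below only illustrates the kind of keyed, lazily-created cache its getOrCreate() call suggests, with simplified key and value types.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Illustrative only: a keyed cache in the spirit of CachedPublishers.getOrCreate(writeOptions).
public class KeyedCacheSketch<K, V> {
  private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
  private final Function<K, V> creator;

  public KeyedCacheSketch(Function<K, V> creator) {
    this.creator = creator;
  }

  public V getOrCreate(K key) {
    // computeIfAbsent yields a single instance per key even under concurrent tasks.
    return cache.computeIfAbsent(key, creator);
  }

  public static void main(String[] args) {
    KeyedCacheSketch<String, Object> publishers = new KeyedCacheSketch<>(k -> new Object());
    Object first = publishers.getOrCreate("topic-A");
    Object second = publishers.getOrCreate("topic-A");
    System.out.println(first == second); // true: the second lookup reuses the cached instance
  }
}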
@@ -24,6 +24,9 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;