feat: PSL Connector Writer support (#121)
jiangmichaellll committed Apr 2, 2021
1 parent 2188988 commit 92cfdfd
Showing 32 changed files with 986 additions and 59 deletions.
25 changes: 25 additions & 0 deletions clirr-ignored-differences.xml
@@ -12,4 +12,29 @@
<method>*</method>
<to>*</to>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PartitionSubscriberFactory</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PslCredentialsProvider</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/spark/PslDataSourceOptions*</className>
</difference>

</differences>
5 changes: 5 additions & 0 deletions pom.xml
@@ -113,6 +113,11 @@
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.11</artifactId>
<version>0.9.1</version>
</dependency>

<!--test dependencies-->
<dependency>
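The new scala-java8-compat dependency provides converters between Java 8 and Scala functional types (functions, futures), which is useful when Java connector code has to interoperate with Spark's Scala APIs. A minimal sketch of the future bridging the library offers, independent of the connector code (the FutureBridgeExample class name is ours, not part of this commit):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;

public class FutureBridgeExample {
  public static void main(String[] args) {
    // Java CompletionStage -> Scala Future.
    CompletionStage<String> javaStage = CompletableFuture.completedFuture("done");
    Future<String> scalaFuture = FutureConverters.toScala(javaStage);

    // And back: Scala Future -> Java CompletionStage.
    FutureConverters.toJava(scalaFuture).thenAccept(System.out::println);
  }
}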
31 changes: 24 additions & 7 deletions src/main/java/com/google/cloud/pubsublite/spark/Constants.java
@@ -17,7 +17,12 @@
package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -26,22 +26,33 @@ public class Constants {
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;

public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
DataTypes.createArrayType(DataTypes.BinaryType);
public static MapType ATTRIBUTES_DATATYPE =
DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
public static Map<String, DataType> PUBLISH_FIELD_TYPES =
ImmutableMap.of(
"key", DataTypes.BinaryType,
"data", DataTypes.BinaryType,
"attributes", ATTRIBUTES_DATATYPE,
"event_timestamp", DataTypes.TimestampType);
public static StructType DEFAULT_SCHEMA =
new StructType(
new StructField[] {
new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
new StructField(
"attributes",
DataTypes.createMapType(
DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
"event_timestamp",
PUBLISH_FIELD_TYPES.get("event_timestamp"),
true,
Metadata.empty())
Metadata.empty()),
new StructField(
"attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
});

public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
@@ -52,6 +52,7 @@ public class Constants {
"pubsublite.flowcontrol.byteoutstandingperpartition";
public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
"pubsublite.flowcontrol.messageoutstandingperparition";
public static String TOPIC_CONFIG_KEY = "pubsublite.topic";
public static String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription";
public static String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key";
}
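PUBLISH_FIELD_TYPES above fixes the column types the writer accepts: key and data as binary, attributes as a map from string to an array of binary values, and event_timestamp as a timestamp. A minimal sketch of a matching input schema built with the same Spark types (the nullability flags are our assumption, not taken from the connector):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class WriteSchemaExample {
  public static void main(String[] args) {
    // Shape of a DataFrame intended for the PSL writer, mirroring PUBLISH_FIELD_TYPES.
    StructType writeSchema =
        new StructType(
            new StructField[] {
              new StructField("key", DataTypes.BinaryType, true, Metadata.empty()),
              new StructField("data", DataTypes.BinaryType, true, Metadata.empty()),
              new StructField(
                  "attributes",
                  DataTypes.createMapType(
                      DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
                  true,
                  Metadata.empty()),
              new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
            });
    System.out.println(writeSchema.treeString());
  }
}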
@@ -22,6 +22,9 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
63 changes: 42 additions & 21 deletions src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
@@ -23,20 +23,30 @@
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.spark.internal.CachedPartitionCountReader;
import com.google.cloud.pubsublite.spark.internal.LimitingHeadOffsetReader;
import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
import java.util.Objects;
import java.util.Optional;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

@AutoService(DataSourceRegister.class)
public final class PslDataSource
implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {
implements DataSourceV2,
ContinuousReadSupport,
MicroBatchReadSupport,
StreamWriteSupport,
DataSourceRegister {

@Override
public String shortName() {
@@ -51,23 +61,24 @@ public ContinuousReader createContinuousReader(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
PslReadDataSourceOptions pslReadDataSourceOptions =
PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
TopicPath topicPath;
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
PartitionCountReader partitionCountReader =
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
return new PslContinuousReader(
pslDataSourceOptions.newCursorClient(),
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
pslDataSourceOptions.getSubscriberFactory(),
pslReadDataSourceOptions.newCursorClient(),
pslReadDataSourceOptions.newMultiPartitionCommitter(
partitionCountReader.getPartitionCount()),
pslReadDataSourceOptions.getSubscriberFactory(),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
partitionCountReader);
}

@@ -79,28 +90,38 @@ public MicroBatchReader createMicroBatchReader(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}

PslDataSourceOptions pslDataSourceOptions =
PslDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
PslReadDataSourceOptions pslReadDataSourceOptions =
PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
TopicPath topicPath;
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
PartitionCountReader partitionCountReader =
new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
return new PslMicroBatchReader(
pslDataSourceOptions.newCursorClient(),
pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
pslDataSourceOptions.getSubscriberFactory(),
pslReadDataSourceOptions.newCursorClient(),
pslReadDataSourceOptions.newMultiPartitionCommitter(
partitionCountReader.getPartitionCount()),
pslReadDataSourceOptions.getSubscriberFactory(),
new LimitingHeadOffsetReader(
pslDataSourceOptions.newTopicStatsClient(),
pslReadDataSourceOptions.newTopicStatsClient(),
topicPath,
partitionCountReader,
Ticker.systemTicker()),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
pslDataSourceOptions.maxMessagesPerBatch());
Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
pslReadDataSourceOptions.maxMessagesPerBatch());
}

@Override
public StreamWriter createStreamWriter(
String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
PslSparkUtils.verifyWriteInputSchema(schema);
PslWriteDataSourceOptions pslWriteDataSourceOptions =
PslWriteDataSourceOptions.fromSparkDataSourceOptions(options);
return new PslStreamWriter(schema, pslWriteDataSourceOptions);
}
}
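With createStreamWriter in place, the connector can act as a structured streaming sink. A hedged sketch of what a write might look like from Spark; the "pubsublite" short name, the topic path, and the checkpoint location are illustrative assumptions, while "pubsublite.topic" is TOPIC_CONFIG_KEY from Constants above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class PslWriteExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("psl-write-example").getOrCreate();

    // Any streaming DataFrame shaped like the writer schema works; here the built-in
    // rate source is reshaped so its value becomes the binary "data" column.
    Dataset<Row> df =
        spark.readStream().format("rate").load()
            .selectExpr("CAST(CAST(value AS STRING) AS BINARY) AS data");

    StreamingQuery query =
        df.writeStream()
            .format("pubsublite") // assumed short name of the connector
            .option(
                "pubsublite.topic", // TOPIC_CONFIG_KEY
                "projects/my-project/locations/us-central1-a/topics/my-topic") // illustrative path
            .option("checkpointLocation", "/tmp/psl-write-checkpoint") // illustrative path
            .start();
    query.awaitTermination();
  }
}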
97 changes: 97 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java
@@ -0,0 +1,97 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.annotation.concurrent.GuardedBy;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

public class PslDataWriter implements DataWriter<InternalRow> {

private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

private final long partitionId, taskId, epochId;
private final StructType inputSchema;
private final PublisherFactory publisherFactory;

@GuardedBy("this")
private Optional<Publisher<MessageMetadata>> publisher = Optional.empty();

@GuardedBy("this")
private final List<ApiFuture<MessageMetadata>> futures = new ArrayList<>();

public PslDataWriter(
long partitionId,
long taskId,
long epochId,
StructType schema,
PublisherFactory publisherFactory) {
this.partitionId = partitionId;
this.taskId = taskId;
this.epochId = epochId;
this.inputSchema = schema;
this.publisherFactory = publisherFactory;
}

@Override
public synchronized void write(InternalRow record) {
if (!publisher.isPresent() || publisher.get().state() != ApiService.State.RUNNING) {
publisher = Optional.of(publisherFactory.newPublisher());
}
futures.add(
publisher
.get()
.publish(Objects.requireNonNull(PslSparkUtils.toPubSubMessage(inputSchema, record))));
}

@Override
public synchronized WriterCommitMessage commit() throws IOException {
for (ApiFuture<MessageMetadata> f : futures) {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
publisher = Optional.empty();
throw new IOException(e);
}
}
log.atInfo().log(
"All writes for partitionId:%d, taskId:%d, epochId:%d succeeded, committing...",
partitionId, taskId, epochId);
return PslWriterCommitMessage.create(futures.size());
}

@Override
public synchronized void abort() {
log.atWarning().log(
"One or more writes for partitionId:%d, taskId:%d, epochId:%d failed, aborted.",
partitionId, taskId, epochId);
}
}
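PslDataWriter follows the Spark DataSource V2 writer contract: Spark calls write once per row, then commit on success or abort on failure. Here commit blocks on every outstanding publish future, so a successful commit means every row handled by the task reached Pub/Sub Lite, while abort only logs because already-published messages cannot be retracted. A toy writer illustrating that call order (the CollectingWriter class is ours, not part of the connector):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Toy DataWriter showing the write*/commit-or-abort lifecycle PslDataWriter implements.
class CollectingWriter implements DataWriter<String> {
  private final List<String> buffered = new ArrayList<>();

  @Override
  public void write(String record) throws IOException {
    buffered.add(record); // PslDataWriter publishes here and keeps the ApiFuture instead
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    // PslDataWriter waits on every publish future here before reporting success.
    return new WriterCommitMessage() {};
  }

  @Override
  public void abort() throws IOException {
    buffered.clear(); // PslDataWriter only logs a warning; nothing is retracted
  }
}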
@@ -0,0 +1,45 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.spark.internal.CachedPublishers;
import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.types.StructType;

public class PslDataWriterFactory implements Serializable, DataWriterFactory<InternalRow> {
private static final long serialVersionUID = -6904546364310978844L;

private static final CachedPublishers CACHED_PUBLISHERS = new CachedPublishers();

private final StructType inputSchema;
private final PslWriteDataSourceOptions writeOptions;

public PslDataWriterFactory(StructType inputSchema, PslWriteDataSourceOptions writeOptions) {
this.inputSchema = inputSchema;
this.writeOptions = writeOptions;
}

@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
PublisherFactory pf = () -> CACHED_PUBLISHERS.getOrCreate(writeOptions);
return new PslDataWriter(partitionId, taskId, epochId, inputSchema, pf);
}
}
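CACHED_PUBLISHERS is a static field, so a single publisher cache is shared by every writer task in the same executor JVM and publishers are reused across partitions, tasks, and epochs rather than rebuilt on each createDataWriter call. A minimal sketch of the keyed memoization pattern we assume CachedPublishers uses (the sketch class and its names are ours):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of JVM-wide memoization: one long-lived publisher per distinct write configuration.
final class PublisherCacheSketch<K, P> {
  private final Map<K, P> cache = new ConcurrentHashMap<>();

  P getOrCreate(K writeOptions, Function<K, P> newPublisher) {
    // computeIfAbsent builds the publisher once, then returns the cached instance.
    return cache.computeIfAbsent(writeOptions, newPublisher);
  }
}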
@@ -24,6 +24,9 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
