From 92cfdfdc85449bb2bf745d59cd9b40e5949ba53c Mon Sep 17 00:00:00 2001 From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com> Date: Fri, 2 Apr 2021 15:11:23 -0400 Subject: [PATCH] feat: PSL Connector Writer support (#121) --- clirr-ignored-differences.xml | 25 ++++ pom.xml | 5 + .../cloud/pubsublite/spark/Constants.java | 31 +++- .../pubsublite/spark/PslContinuousReader.java | 3 + .../cloud/pubsublite/spark/PslDataSource.java | 63 ++++++--- .../cloud/pubsublite/spark/PslDataWriter.java | 97 +++++++++++++ .../spark/PslDataWriterFactory.java | 45 ++++++ .../pubsublite/spark/PslMicroBatchReader.java | 3 + ...ons.java => PslReadDataSourceOptions.java} | 20 +-- .../cloud/pubsublite/spark/PslSparkUtils.java | 111 ++++++++++++++- .../pubsublite/spark/PslStreamWriter.java | 65 +++++++++ .../spark/PslWriteDataSourceOptions.java | 133 ++++++++++++++++++ .../spark/PslWriterCommitMessage.java | 30 ++++ .../CachedPartitionCountReader.java | 2 +- .../spark/internal/CachedPublishers.java | 64 +++++++++ .../LimitingHeadOffsetReader.java | 3 +- .../MultiPartitionCommitter.java | 3 +- .../MultiPartitionCommitterImpl.java | 3 +- .../{ => internal}/PartitionCountReader.java | 2 +- .../PartitionSubscriberFactory.java | 2 +- .../PerTopicHeadOffsetReader.java | 3 +- .../PslCredentialsProvider.java | 14 +- .../spark/internal/PublisherFactory.java | 26 ++++ .../spark/PslContinuousReaderTest.java | 7 +- .../pubsublite/spark/PslDataWriterTest.java | 84 +++++++++++ .../spark/PslMicroBatchReaderTest.java | 7 +- ...java => PslReadDataSourceOptionsTest.java} | 4 +- .../pubsublite/spark/PslSparkUtilsTest.java | 100 +++++++++++++ .../pubsublite/spark/PslStreamWriterTest.java | 50 +++++++ .../spark/PslWriteDataSourceOptionsTest.java | 35 +++++ .../LimitingHeadOffsetReaderTest.java | 2 +- .../MultiPartitionCommitterImplTest.java | 3 +- 32 files changed, 986 insertions(+), 59 deletions(-) create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/PslDataWriterFactory.java rename src/main/java/com/google/cloud/pubsublite/spark/{PslDataSourceOptions.java => PslReadDataSourceOptions.java} (92%) create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/PslStreamWriter.java create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/PslWriterCommitMessage.java rename src/main/java/com/google/cloud/pubsublite/spark/{ => internal}/CachedPartitionCountReader.java (96%) create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java rename src/main/java/com/google/cloud/pubsublite/spark/{ => internal}/LimitingHeadOffsetReader.java (97%) rename src/main/java/com/google/cloud/pubsublite/spark/{ => internal}/MultiPartitionCommitter.java (89%) rename src/main/java/com/google/cloud/pubsublite/spark/{ => internal}/MultiPartitionCommitterImpl.java (97%) rename src/main/java/com/google/cloud/pubsublite/spark/{ => internal}/PartitionCountReader.java (93%) rename src/main/java/com/google/cloud/pubsublite/spark/{ => internal}/PartitionSubscriberFactory.java (95%) rename src/main/java/com/google/cloud/pubsublite/spark/{ => internal}/PerTopicHeadOffsetReader.java (88%) rename src/main/java/com/google/cloud/pubsublite/spark/{ => internal}/PslCredentialsProvider.java (85%) create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/internal/PublisherFactory.java create mode 100644 
src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java rename src/test/java/com/google/cloud/pubsublite/spark/{PslDataSourceOptionsTest.java => PslReadDataSourceOptionsTest.java} (89%) create mode 100644 src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java create mode 100644 src/test/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptionsTest.java rename src/test/java/com/google/cloud/pubsublite/spark/{ => internal}/LimitingHeadOffsetReaderTest.java (98%) rename src/test/java/com/google/cloud/pubsublite/spark/{ => internal}/MultiPartitionCommitterImplTest.java (97%) diff --git a/clirr-ignored-differences.xml b/clirr-ignored-differences.xml index 1aa41e4f..6aa9dcf9 100644 --- a/clirr-ignored-differences.xml +++ b/clirr-ignored-differences.xml @@ -12,4 +12,29 @@ <className>*</className> <method>*</method> </difference> + <difference> + <differenceType>8001</differenceType> + <className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className> + </difference> + <difference> + <differenceType>8001</differenceType> + <className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className> + </difference> + <difference> + <differenceType>8001</differenceType> + <className>com/google/cloud/pubsublite/spark/PartitionSubscriberFactory</className> + </difference> + <difference> + <differenceType>8001</differenceType> + <className>com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader</className> + </difference> + <difference> + <differenceType>8001</differenceType> + <className>com/google/cloud/pubsublite/spark/PslCredentialsProvider</className> + </difference> + <difference> + <differenceType>8001</differenceType> + <className>com/google/cloud/pubsublite/spark/PslDataSourceOptions*</className> + </difference> </differences> \ No newline at end of file diff --git a/pom.xml b/pom.xml index c377b164..74123ab4 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,11 @@ <version>${scala.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-java8-compat_2.11</artifactId> + <version>0.9.1</version> + </dependency> diff --git a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java index cac4337a..9ad29b23 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java @@ -17,7 +17,12 @@ package com.google.cloud.pubsublite.spark; import com.google.cloud.pubsublite.internal.wire.PubsubContext; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -26,22 +31,33 @@ public class Constants { public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000; public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE; public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE; + + public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE = + DataTypes.createArrayType(DataTypes.BinaryType); + public static MapType ATTRIBUTES_DATATYPE = + DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE); + public static Map<String, DataType> PUBLISH_FIELD_TYPES = + ImmutableMap.of( + "key", DataTypes.BinaryType, + "data", DataTypes.BinaryType, + "attributes", ATTRIBUTES_DATATYPE, + "event_timestamp", DataTypes.TimestampType); public static StructType DEFAULT_SCHEMA = new StructType( new StructField[] { new StructField("subscription", DataTypes.StringType, false, Metadata.empty()), new StructField("partition", DataTypes.LongType, false, Metadata.empty()), new StructField("offset", DataTypes.LongType, false, Metadata.empty()), - new StructField("key", DataTypes.BinaryType, false, Metadata.empty()), - new StructField("data", DataTypes.BinaryType, false, Metadata.empty()), + new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()), + new StructField("data", 
PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()), new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()), - new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()), new StructField( - "attributes", - DataTypes.createMapType( - DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)), + "event_timestamp", + PUBLISH_FIELD_TYPES.get("event_timestamp"), true, - Metadata.empty()) + Metadata.empty()), + new StructField( + "attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty()) }); public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK"); @@ -52,6 +68,7 @@ public class Constants { "pubsublite.flowcontrol.byteoutstandingperpartition"; public static String MESSAGES_OUTSTANDING_CONFIG_KEY = "pubsublite.flowcontrol.messageoutstandingperparition"; + public static String TOPIC_CONFIG_KEY = "pubsublite.topic"; public static String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription"; public static String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key"; } diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java index 65953031..ad2ca3da 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java @@ -22,6 +22,9 @@ import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; +import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter; +import com.google.cloud.pubsublite.spark.internal.PartitionCountReader; +import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Arrays; diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java index 08a96ee8..2ef2535d 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java @@ -23,6 +23,9 @@ import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.spark.internal.CachedPartitionCountReader; +import com.google.cloud.pubsublite.spark.internal.LimitingHeadOffsetReader; +import com.google.cloud.pubsublite.spark.internal.PartitionCountReader; import java.util.Objects; import java.util.Optional; import org.apache.spark.sql.sources.DataSourceRegister; @@ -30,13 +33,20 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.MicroBatchReadSupport; +import org.apache.spark.sql.sources.v2.StreamWriteSupport; import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader; import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader; +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter; +import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.types.StructType; @AutoService(DataSourceRegister.class) public final class PslDataSource - implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, 
DataSourceRegister { + implements DataSourceV2, + ContinuousReadSupport, + MicroBatchReadSupport, + StreamWriteSupport, + DataSourceRegister { @Override public String shortName() { @@ -51,23 +61,24 @@ public ContinuousReader createContinuousReader( "PubSub Lite uses fixed schema and custom schema is not allowed"); } - PslDataSourceOptions pslDataSourceOptions = - PslDataSourceOptions.fromSparkDataSourceOptions(options); - SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath(); + PslReadDataSourceOptions pslReadDataSourceOptions = + PslReadDataSourceOptions.fromSparkDataSourceOptions(options); + SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath(); TopicPath topicPath; - try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) { + try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) { topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()); } catch (Throwable t) { throw toCanonical(t).underlying; } PartitionCountReader partitionCountReader = - new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath); + new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath); return new PslContinuousReader( - pslDataSourceOptions.newCursorClient(), - pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()), - pslDataSourceOptions.getSubscriberFactory(), + pslReadDataSourceOptions.newCursorClient(), + pslReadDataSourceOptions.newMultiPartitionCommitter( + partitionCountReader.getPartitionCount()), + pslReadDataSourceOptions.getSubscriberFactory(), subscriptionPath, - Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()), + Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()), partitionCountReader); } @@ -79,28 +90,38 @@ public MicroBatchReader createMicroBatchReader( "PubSub Lite uses fixed schema and custom schema is not allowed"); } - PslDataSourceOptions pslDataSourceOptions = - PslDataSourceOptions.fromSparkDataSourceOptions(options); - SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath(); + PslReadDataSourceOptions pslReadDataSourceOptions = + PslReadDataSourceOptions.fromSparkDataSourceOptions(options); + SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath(); TopicPath topicPath; - try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) { + try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) { topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()); } catch (Throwable t) { throw toCanonical(t).underlying; } PartitionCountReader partitionCountReader = - new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath); + new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath); return new PslMicroBatchReader( - pslDataSourceOptions.newCursorClient(), - pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()), - pslDataSourceOptions.getSubscriberFactory(), + pslReadDataSourceOptions.newCursorClient(), + pslReadDataSourceOptions.newMultiPartitionCommitter( + partitionCountReader.getPartitionCount()), + pslReadDataSourceOptions.getSubscriberFactory(), new LimitingHeadOffsetReader( - pslDataSourceOptions.newTopicStatsClient(), + pslReadDataSourceOptions.newTopicStatsClient(), topicPath, partitionCountReader, Ticker.systemTicker()), subscriptionPath, - 
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()), - pslDataSourceOptions.maxMessagesPerBatch()); + Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()), + pslReadDataSourceOptions.maxMessagesPerBatch()); + } + + @Override + public StreamWriter createStreamWriter( + String queryId, StructType schema, OutputMode mode, DataSourceOptions options) { + PslSparkUtils.verifyWriteInputSchema(schema); + PslWriteDataSourceOptions pslWriteDataSourceOptions = + PslWriteDataSourceOptions.fromSparkDataSourceOptions(options); + return new PslStreamWriter(schema, pslWriteDataSourceOptions); } } diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java new file mode 100644 index 00000000..631fb2d3 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java @@ -0,0 +1,97 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiService; +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.spark.internal.PublisherFactory; +import com.google.common.flogger.GoogleLogger; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import javax.annotation.concurrent.GuardedBy; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; + +public class PslDataWriter implements DataWriter<InternalRow> { + + private static final GoogleLogger log = GoogleLogger.forEnclosingClass(); + + private final long partitionId, taskId, epochId; + private final StructType inputSchema; + private final PublisherFactory publisherFactory; + + @GuardedBy("this") + private Optional<Publisher<MessageMetadata>> publisher = Optional.empty(); + + @GuardedBy("this") + private final List<ApiFuture<MessageMetadata>> futures = new ArrayList<>(); + + public PslDataWriter( + long partitionId, + long taskId, + long epochId, + StructType schema, + PublisherFactory publisherFactory) { + this.partitionId = partitionId; + this.taskId = taskId; + this.epochId = epochId; + this.inputSchema = schema; + this.publisherFactory = publisherFactory; + } + + @Override + public synchronized void write(InternalRow record) { + if (!publisher.isPresent() || publisher.get().state() != ApiService.State.RUNNING) { + publisher = Optional.of(publisherFactory.newPublisher()); + } + futures.add( + publisher + .get() + .publish(Objects.requireNonNull(PslSparkUtils.toPubSubMessage(inputSchema, record)))); + } + + @Override + public synchronized WriterCommitMessage commit() throws IOException { + for (ApiFuture<MessageMetadata> f : futures) { + try { + f.get(); + } catch 
(InterruptedException | ExecutionException e) { + publisher = Optional.empty(); + throw new IOException(e); + } + } + log.atInfo().log( + "All writes for partitionId:%d, taskId:%d, epochId:%d succeeded, committing...", + partitionId, taskId, epochId); + return PslWriterCommitMessage.create(futures.size()); + } + + @Override + public synchronized void abort() { + log.atWarning().log( + "One or more writes for partitionId:%d, taskId:%d, epochId:%d failed, aborted.", + partitionId, taskId, epochId); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriterFactory.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriterFactory.java new file mode 100644 index 00000000..12d95921 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriterFactory.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import com.google.cloud.pubsublite.spark.internal.CachedPublishers; +import com.google.cloud.pubsublite.spark.internal.PublisherFactory; +import java.io.Serializable; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; +import org.apache.spark.sql.types.StructType; + +public class PslDataWriterFactory implements Serializable, DataWriterFactory<InternalRow> { + private static final long serialVersionUID = -6904546364310978844L; + + private static final CachedPublishers CACHED_PUBLISHERS = new CachedPublishers(); + + private final StructType inputSchema; + private final PslWriteDataSourceOptions writeOptions; + + public PslDataWriterFactory(StructType inputSchema, PslWriteDataSourceOptions writeOptions) { + this.inputSchema = inputSchema; + this.writeOptions = writeOptions; + } + + @Override + public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) { + PublisherFactory pf = () -> CACHED_PUBLISHERS.getOrCreate(writeOptions); + return new PslDataWriter(partitionId, taskId, epochId, inputSchema, pf); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java index b2a346c0..a0f0dfee 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java @@ -24,6 +24,9 @@ import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; +import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter; +import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory; +import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader; import java.util.ArrayList; import java.util.List; import java.util.Optional; diff --git 
a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java similarity index 92% rename from src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java rename to src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java index 380e022a..f5987788 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java @@ -33,6 +33,10 @@ import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.ServiceClients; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; +import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter; +import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitterImpl; +import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory; +import com.google.cloud.pubsublite.spark.internal.PslCredentialsProvider; import com.google.cloud.pubsublite.v1.AdminServiceClient; import com.google.cloud.pubsublite.v1.AdminServiceSettings; import com.google.cloud.pubsublite.v1.CursorServiceClient; @@ -47,7 +51,7 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions; @AutoValue -public abstract class PslDataSourceOptions implements Serializable { +public abstract class PslReadDataSourceOptions implements Serializable { private static final long serialVersionUID = 2680059304693561607L; @Nullable @@ -60,7 +64,7 @@ public abstract class PslDataSourceOptions implements Serializable { public abstract long maxMessagesPerBatch(); public static Builder builder() { - return new AutoValue_PslDataSourceOptions.Builder() + return new AutoValue_PslReadDataSourceOptions.Builder() .setCredentialsKey(null) .setMaxMessagesPerBatch(Constants.DEFAULT_MAX_MESSAGES_PER_BATCH) .setFlowControlSettings( @@ -70,7 +74,7 @@ public static Builder builder() { .build()); } - public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) { + public static PslReadDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) { if (!options.get(Constants.SUBSCRIPTION_CONFIG_KEY).isPresent()) { throw new IllegalArgumentException(Constants.SUBSCRIPTION_CONFIG_KEY + " is required."); } @@ -115,7 +119,7 @@ public abstract static class Builder { public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); - public abstract PslDataSourceOptions build(); + public abstract PslReadDataSourceOptions build(); } MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) { @@ -135,7 +139,7 @@ PartitionSubscriberFactory getSubscriberFactory() { PubsubContext context = PubsubContext.of(Constants.FRAMEWORK); SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder() - .setCredentialsProvider(new PslCredentialsProvider(this)); + .setCredentialsProvider(new PslCredentialsProvider(credentialsKey())); ServiceClients.addDefaultMetadata( context, RoutingMetadata.of(this.subscriptionPath(), partition), settingsBuilder); try { @@ -161,7 +165,7 @@ private CursorServiceClient newCursorServiceClient() { addDefaultSettings( this.subscriptionPath().location().region(), CursorServiceSettings.newBuilder() - .setCredentialsProvider(new PslCredentialsProvider(this)))); + .setCredentialsProvider(new PslCredentialsProvider(credentialsKey())))); } catch (IOException e) { throw new 
IllegalStateException("Unable to create CursorServiceClient."); } @@ -181,7 +185,7 @@ private AdminServiceClient newAdminServiceClient() { addDefaultSettings( this.subscriptionPath().location().region(), AdminServiceSettings.newBuilder() - .setCredentialsProvider(new PslCredentialsProvider(this)))); + .setCredentialsProvider(new PslCredentialsProvider(credentialsKey())))); } catch (IOException e) { throw new IllegalStateException("Unable to create AdminServiceClient."); } @@ -201,7 +205,7 @@ private TopicStatsServiceClient newTopicStatsServiceClient() { addDefaultSettings( this.subscriptionPath().location().region(), TopicStatsServiceSettings.newBuilder() - .setCredentialsProvider(new PslCredentialsProvider(this)))); + .setCredentialsProvider(new PslCredentialsProvider(credentialsKey())))); } catch (IOException e) { throw new IllegalStateException("Unable to create TopicStatsServiceClient."); } diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java index 1d54fe19..2510315a 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java @@ -19,12 +19,16 @@ import static com.google.common.base.Preconditions.checkArgument; import static scala.collection.JavaConverters.asScalaBufferConverter; +import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ListMultimap; +import com.google.common.flogger.GoogleLogger; import com.google.common.math.LongMath; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; @@ -34,15 +38,29 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; +import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.ByteArray; import org.apache.spark.unsafe.types.UTF8String; +import scala.Option; +import scala.compat.java8.functionConverterImpls.FromJavaBiConsumer; public class PslSparkUtils { - private static ArrayBasedMapData convertAttributesToSparkMap( + + private static final GoogleLogger log = GoogleLogger.forEnclosingClass(); + + @VisibleForTesting + public static ArrayBasedMapData convertAttributesToSparkMap( ListMultimap<String, ByteString> attributeMap) { List<UTF8String> keyList = new ArrayList<>(); @@ -83,6 +101,97 @@ public static InternalRow toInternalRow( return InternalRow.apply(asScalaBufferConverter(list).asScala()); } + @SuppressWarnings("unchecked") + private static <T> void extractVal( + StructType inputSchema, + InternalRow row, + String fieldName, + DataType expectedDataType, + Consumer<T> consumer) { + Option<Object> idxOr = 
inputSchema.getFieldIndex(fieldName); + if (!idxOr.isEmpty()) { + Integer idx = (Integer) idxOr.get(); + // DataType should match and not throw ClassCastException, as we already verified + // the type match on the driver node. + consumer.accept((T) row.get(idx, expectedDataType)); + } + } + + public static Message toPubSubMessage(StructType inputSchema, InternalRow row) { + Message.Builder builder = Message.builder(); + extractVal( + inputSchema, + row, + "key", + Constants.PUBLISH_FIELD_TYPES.get("key"), + (byte[] o) -> builder.setKey(ByteString.copyFrom(o))); + extractVal( + inputSchema, + row, + "data", + Constants.PUBLISH_FIELD_TYPES.get("data"), + (byte[] o) -> builder.setData(ByteString.copyFrom(o))); + extractVal( + inputSchema, + row, + "event_timestamp", + Constants.PUBLISH_FIELD_TYPES.get("event_timestamp"), + (Long o) -> builder.setEventTime(Timestamps.fromMicros(o))); + extractVal( + inputSchema, + row, + "attributes", + Constants.PUBLISH_FIELD_TYPES.get("attributes"), + (MapData o) -> { + ImmutableListMultimap.Builder<String, ByteString> attributeMapBuilder = + ImmutableListMultimap.builder(); + o.foreach( + DataTypes.StringType, + Constants.ATTRIBUTES_PER_KEY_DATATYPE, + new FromJavaBiConsumer<>( + (k, v) -> { + String key = ((UTF8String) k).toString(); + ArrayData values = (ArrayData) v; + values.foreach( + DataTypes.BinaryType, + new FromJavaBiConsumer<>( + (idx, a) -> + attributeMapBuilder.put(key, ByteString.copyFrom((byte[]) a)))); + })); + builder.setAttributes(attributeMapBuilder.build()); + }); + return builder.build(); + } + + /** + * Make sure the data fields to publish have the expected Spark DataType if they exist. + * + * @param inputSchema input table schema to write to Pub/Sub Lite. + * @throws IllegalArgumentException if any DataType mismatch is detected. + */ + public static void verifyWriteInputSchema(StructType inputSchema) { + Constants.PUBLISH_FIELD_TYPES.forEach( + (k, v) -> { + Option<Object> idxOr = inputSchema.getFieldIndex(k); + if (!idxOr.isEmpty()) { + StructField f = inputSchema.apply((int) idxOr.get()); + if (!f.dataType().equals(v)) { + throw new IllegalArgumentException( + String.format( + "Column %s in input schema to write to " + + "Pub/Sub Lite has a wrong DataType. Actual: %s, expected: %s.", + k, f.dataType(), v)); + } + } else { + log.atInfo().atMostEvery(5, TimeUnit.MINUTES).log( + "Input schema to write " + + "to Pub/Sub Lite doesn't contain the %s column; this field will be " + + "left empty for all rows.", + k); + } + }); + } + public static SparkSourceOffset toSparkSourceOffset(PslSourceOffset pslSourceOffset) { return new SparkSourceOffset( pslSourceOffset.partitionOffsetMap().entrySet().stream() diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslStreamWriter.java b/src/main/java/com/google/cloud/pubsublite/spark/PslStreamWriter.java new file mode 100644 index 00000000..b2efaf80 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslStreamWriter.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
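
To make the verifyWriteInputSchema contract above concrete, here is a minimal sketch (not part of this patch; the class and variable names are illustrative) of a schema that passes verification and one that fails it:

```java
import com.google.cloud.pubsublite.spark.PslSparkUtils;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class VerifyWriteSchemaExample {
  public static void main(String[] args) {
    // "key" and "data" are optional columns, but when present they must be BinaryType.
    StructType ok =
        new StructType(
            new StructField[] {
              new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
              new StructField("data", DataTypes.BinaryType, false, Metadata.empty())
            });
    PslSparkUtils.verifyWriteInputSchema(ok); // Passes; absent fields are only logged.

    StructType bad =
        new StructType(
            new StructField[] {
              new StructField("data", DataTypes.StringType, false, Metadata.empty())
            });
    // Throws IllegalArgumentException: "data" must be BinaryType, not StringType.
    PslSparkUtils.verifyWriteInputSchema(bad);
  }
}
```
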
+ */ + +package com.google.cloud.pubsublite.spark; + +import com.google.common.flogger.GoogleLogger; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter; +import org.apache.spark.sql.types.StructType; + +public class PslStreamWriter implements StreamWriter { + + private static final GoogleLogger log = GoogleLogger.forEnclosingClass(); + + private final StructType inputSchema; + private final PslWriteDataSourceOptions writeOptions; + + public PslStreamWriter(StructType inputSchema, PslWriteDataSourceOptions writeOptions) { + this.inputSchema = inputSchema; + this.writeOptions = writeOptions; + } + + @Override + public void commit(long epochId, WriterCommitMessage[] messages) { + log.atInfo().log("Committed %d messages for epochId:%d.", countMessages(messages), epochId); + } + + @Override + public void abort(long epochId, WriterCommitMessage[] messages) { + log.atWarning().log( + "Epoch id: %d is aborted, %d messages might have been published.", + epochId, countMessages(messages)); + } + + private long countMessages(WriterCommitMessage[] messages) { + long cnt = 0; + for (WriterCommitMessage m : messages) { + // Messages are not guaranteed to be of type PslWriterCommitMessage on abort. + if (m instanceof PslWriterCommitMessage) { + cnt += ((PslWriterCommitMessage) m).numMessages(); + } + } + return cnt; + } + + @Override + public DataWriterFactory<InternalRow> createWriterFactory() { + return new PslDataWriterFactory(inputSchema, writeOptions); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java new file mode 100644 index 00000000..44b6d95d --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java @@ -0,0 +1,133 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
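
For context, the streaming write path added above is driven from Spark roughly as follows. This is a hedged sketch rather than code from this patch: it assumes the connector's registered short name is "pubsublite", and the project, location, topic, and checkpoint paths are placeholders:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PslWriteExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("psl-write").getOrCreate();
    // The frame must follow the publish schema checked by verifyWriteInputSchema:
    // optional "key"/"data" (binary), "attributes" (map<string, array<binary>>),
    // and "event_timestamp" (timestamp).
    Dataset<Row> df =
        spark.readStream().format("rate").load()
            .selectExpr("CAST(CAST(value AS STRING) AS BINARY) AS data");
    df.writeStream()
        .format("pubsublite") // short name registered by PslDataSource
        .option("pubsublite.topic",
            "projects/my-project/locations/us-central1-a/topics/my-topic")
        .option("checkpointLocation", "/tmp/psl-write-example")
        .start()
        .awaitTermination();
  }
}
```
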
+ */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; + +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.AdminClientSettings; +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings; +import com.google.cloud.pubsublite.internal.wire.PubsubContext; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; +import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; +import com.google.cloud.pubsublite.spark.internal.PslCredentialsProvider; +import com.google.cloud.pubsublite.v1.AdminServiceClient; +import com.google.cloud.pubsublite.v1.AdminServiceSettings; +import com.google.cloud.pubsublite.v1.PublisherServiceClient; +import com.google.cloud.pubsublite.v1.PublisherServiceSettings; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.spark.sql.sources.v2.DataSourceOptions; + +@AutoValue +public abstract class PslWriteDataSourceOptions implements Serializable { + + @Nullable + public abstract String credentialsKey(); + + public abstract TopicPath topicPath(); + + public static Builder builder() { + return new AutoValue_PslWriteDataSourceOptions.Builder().setCredentialsKey(null); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract PslWriteDataSourceOptions.Builder setCredentialsKey(String credentialsKey); + + public abstract PslWriteDataSourceOptions.Builder setTopicPath(TopicPath topicPath); + + public abstract PslWriteDataSourceOptions build(); + } + + public static PslWriteDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) { + if (!options.get(Constants.TOPIC_CONFIG_KEY).isPresent()) { + throw new IllegalArgumentException(Constants.TOPIC_CONFIG_KEY + " is required."); + } + + Builder builder = builder(); + String topicPathVal = options.get(Constants.TOPIC_CONFIG_KEY).get(); + try { + builder.setTopicPath(TopicPath.parse(topicPathVal)); + } catch (ApiException e) { + throw new IllegalArgumentException("Unable to parse topic path " + topicPathVal, e); + } + options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY).ifPresent(builder::setCredentialsKey); + return builder.build(); + } + + public PslCredentialsProvider getCredentialProvider() { + return new PslCredentialsProvider(credentialsKey()); + } + + public Publisher<MessageMetadata> createNewPublisher() { + return PartitionCountWatchingPublisherSettings.newBuilder() + .setTopic(topicPath()) + .setPublisherFactory( + partition -> + SinglePartitionPublisherBuilder.newBuilder() + .setTopic(topicPath()) + .setPartition(partition) + .setServiceClient(newServiceClient(partition)) + .build()) + .setAdminClient(getAdminClient()) + .build() + .instantiate(); + } + + private PublisherServiceClient newServiceClient(Partition partition) throws ApiException { + PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder(); + settingsBuilder = settingsBuilder.setCredentialsProvider(getCredentialProvider()); + settingsBuilder 
= + addDefaultMetadata( + PubsubContext.of(Constants.FRAMEWORK), + RoutingMetadata.of(topicPath(), partition), + settingsBuilder); + try { + return PublisherServiceClient.create( + addDefaultSettings(topicPath().location().region(), settingsBuilder)); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + + private AdminClient getAdminClient() throws ApiException { + try { + return AdminClient.create( + AdminClientSettings.newBuilder() + .setServiceClient( + AdminServiceClient.create( + addDefaultSettings( + topicPath().location().region(), + AdminServiceSettings.newBuilder() + .setCredentialsProvider(getCredentialProvider())))) + .setRegion(topicPath().location().region()) + .build()); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslWriterCommitMessage.java b/src/main/java/com/google/cloud/pubsublite/spark/PslWriterCommitMessage.java new file mode 100644 index 00000000..9204d169 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslWriterCommitMessage.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import com.google.auto.value.AutoValue; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; + +@AutoValue +public abstract class PslWriterCommitMessage implements WriterCommitMessage { + + public abstract long numMessages(); + + public static PslWriterCommitMessage create(long numMessages) { + return new AutoValue_PslWriterCommitMessage(numMessages); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPartitionCountReader.java similarity index 96% rename from src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java rename to src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPartitionCountReader.java index 35555805..a144d253 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPartitionCountReader.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.google.cloud.pubsublite.spark; +package com.google.cloud.pubsublite.spark.internal; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.PartitionLookupUtils; diff --git a/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java new file mode 100644 index 00000000..711a241a --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
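
A small sketch of building PslWriteDataSourceOptions programmatically, mirroring what fromSparkDataSourceOptions above derives from the pubsublite.topic option (not part of the patch; the topic path is a placeholder):

```java
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions;

public class WriteOptionsExample {
  public static void main(String[] args) {
    PslWriteDataSourceOptions options =
        PslWriteDataSourceOptions.builder()
            .setTopicPath(
                TopicPath.parse(
                    "projects/my-project/locations/us-central1-a/topics/my-topic"))
            .build();
    // What each executor ultimately does when a writer needs a publisher:
    Publisher<MessageMetadata> publisher = options.createNewPublisher();
    publisher.startAsync().awaitRunning();
  }
}
```
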
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark.internal; + +import com.google.api.core.ApiService; +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import javax.annotation.concurrent.GuardedBy; + +/** Caches {@link Publisher}s so that publishers with the same settings are reused within the same task. */ +public class CachedPublishers { + + // TODO(jiangmichaellll): Use com.google.cloud.pubsublite.internal.wire.SystemExecutors + // once new PSL client library is released. + private final Executor listenerExecutor = Executors.newSingleThreadExecutor(); + + @GuardedBy("this") + private final Map<PslWriteDataSourceOptions, Publisher<MessageMetadata>> publishers = + new HashMap<>(); + + public synchronized Publisher<MessageMetadata> getOrCreate( + PslWriteDataSourceOptions writeOptions) { + Publisher<MessageMetadata> publisher = publishers.get(writeOptions); + if (publisher != null && publisher.state() == ApiService.State.RUNNING) { + return publisher; + } + + publisher = writeOptions.createNewPublisher(); + publishers.put(writeOptions, publisher); + publisher.addListener( + new ApiService.Listener() { + @Override + public void failed(ApiService.State s, Throwable t) { + removePublisher(writeOptions); + } + }, + listenerExecutor); + publisher.startAsync().awaitRunning(); + return publisher; + } + + private synchronized void removePublisher(PslWriteDataSourceOptions writeOptions) { + publishers.remove(writeOptions); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/LimitingHeadOffsetReader.java similarity index 97% rename from src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java rename to src/main/java/com/google/cloud/pubsublite/spark/internal/LimitingHeadOffsetReader.java index 7bad0ffc..a974ba23 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/LimitingHeadOffsetReader.java @@ -14,7 +14,7 @@ * limitations under the License. 
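
The intended reuse semantics of CachedPublishers above, as an illustrative sketch (not a test from this patch): repeated getOrCreate calls with equal write options return the same publisher while it is RUNNING, and a failed publisher is evicted and recreated:

```java
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions;
import com.google.cloud.pubsublite.spark.internal.CachedPublishers;

public class PublisherReuseExample {
  static void demo(PslWriteDataSourceOptions writeOptions) {
    CachedPublishers cache = new CachedPublishers();
    Publisher<MessageMetadata> first = cache.getOrCreate(writeOptions);
    Publisher<MessageMetadata> second = cache.getOrCreate(writeOptions);
    // Same instance while the first publisher stays RUNNING; on failure the
    // registered listener evicts it and the next call creates a fresh one.
    assert first == second;
  }
}
```
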
*/ -package com.google.cloud.pubsublite.spark; +package com.google.cloud.pubsublite.spark.internal; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -26,6 +26,7 @@ import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.TopicStatsClient; import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.spark.PslSourceOffset; import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.MoreExecutors; diff --git a/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitter.java similarity index 89% rename from src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java rename to src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitter.java index d42f33ca..bf6441e8 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitter.java @@ -14,10 +14,11 @@ * limitations under the License. */ -package com.google.cloud.pubsublite.spark; +package com.google.cloud.pubsublite.spark.internal; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.spark.PslSourceOffset; import java.io.Closeable; public interface MultiPartitionCommitter extends Closeable { diff --git a/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java similarity index 97% rename from src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java rename to src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java index 7ebec891..4c221f1d 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java @@ -14,13 +14,14 @@ * limitations under the License. */ -package com.google.cloud.pubsublite.spark; +package com.google.cloud.pubsublite.spark.internal; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.spark.PslSourceOffset; import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.MoreExecutors; diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PartitionCountReader.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionCountReader.java similarity index 93% rename from src/main/java/com/google/cloud/pubsublite/spark/PartitionCountReader.java rename to src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionCountReader.java index 934d40be..90991835 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PartitionCountReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionCountReader.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package com.google.cloud.pubsublite.spark; +package com.google.cloud.pubsublite.spark.internal; import java.io.Closeable; diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java similarity index 95% rename from src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java rename to src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java index 9ea51670..d7a16257 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.google.cloud.pubsublite.spark; +package com.google.cloud.pubsublite.spark.internal; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Partition; diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/PerTopicHeadOffsetReader.java similarity index 88% rename from src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java rename to src/main/java/com/google/cloud/pubsublite/spark/internal/PerTopicHeadOffsetReader.java index 21e0bc63..9ccd72c5 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/PerTopicHeadOffsetReader.java @@ -14,8 +14,9 @@ * limitations under the License. */ -package com.google.cloud.pubsublite.spark; +package com.google.cloud.pubsublite.spark.internal; +import com.google.cloud.pubsublite.spark.PslSourceOffset; import java.io.Closeable; public interface PerTopicHeadOffsetReader extends Closeable { diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/PslCredentialsProvider.java similarity index 85% rename from src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java rename to src/main/java/com/google/cloud/pubsublite/spark/internal/PslCredentialsProvider.java index 6dce5272..6022a655 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/PslCredentialsProvider.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.google.cloud.pubsublite.spark; +package com.google.cloud.pubsublite.spark.internal; import com.google.api.client.util.Base64; import com.google.api.gax.core.CredentialsProvider; @@ -23,17 +23,17 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UncheckedIOException; +import javax.annotation.Nullable; public class PslCredentialsProvider implements CredentialsProvider { private final Credentials credentials; - public PslCredentialsProvider(PslDataSourceOptions options) { - if (options.credentialsKey() != null) { - this.credentials = createCredentialsFromKey(options.credentialsKey()); - } else { - this.credentials = createDefaultCredentials(); - } + public PslCredentialsProvider(@Nullable String credentialsKey) { + this.credentials = + credentialsKey != null + ? 
createCredentialsFromKey(credentialsKey) + : createDefaultCredentials(); } private static Credentials createCredentialsFromKey(String key) { diff --git a/src/main/java/com/google/cloud/pubsublite/spark/internal/PublisherFactory.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/PublisherFactory.java new file mode 100644 index 00000000..81750def --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/PublisherFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark.internal; + +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.internal.Publisher; +import java.io.Serializable; + +public interface PublisherFactory extends Serializable { + + Publisher<MessageMetadata> newPublisher(); +} diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java index 36bcdf91..d5cbc30e 100644 --- a/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java @@ -24,14 +24,17 @@ import com.google.cloud.pubsublite.*; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; +import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter; +import com.google.cloud.pubsublite.spark.internal.PartitionCountReader; +import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory; import com.google.common.collect.ImmutableMap; import java.util.Optional; import org.junit.Test; public class PslContinuousReaderTest { - private static final PslDataSourceOptions OPTIONS = - PslDataSourceOptions.builder() + private static final PslReadDataSourceOptions OPTIONS = + PslReadDataSourceOptions.builder() .setSubscriptionPath(UnitTestExamples.exampleSubscriptionPath()) .build(); private final CursorClient cursorClient = mock(CursorClient.class); diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java new file mode 100644 index 00000000..a3f6f1a8 --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslDataWriterTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
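
Because PublisherFactory above is a Serializable single-method interface, a lambda can be shipped to executors (as PslDataWriterFactory does) or substituted in tests; a minimal sketch (illustrative, not from this patch):

```java
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.spark.internal.PublisherFactory;

public class PublisherFactoryStubExample {
  // A lambda satisfies the interface, which keeps PslDataWriter decoupled from
  // CachedPublishers and easy to unit-test with a fake publisher.
  static PublisherFactory stub(Publisher<MessageMetadata> fake) {
    return () -> fake;
  }
}
```
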
+ */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.spark.internal.PublisherFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.Test; + +public class PslDataWriterTest { + + private final InternalRow row = mock(InternalRow.class); + + @SuppressWarnings("unchecked") + private final Publisher<MessageMetadata> publisher = mock(Publisher.class); + + private final PublisherFactory publisherFactory = mock(PublisherFactory.class); + private final StructType keyOnly = + new StructType( + new StructField[] { + new StructField( + "key", Constants.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()), + }); + + private final PslDataWriter writer = new PslDataWriter(1L, 2L, 3L, keyOnly, publisherFactory); + + @Test + public void testAllSuccess() throws IOException { + when(publisherFactory.newPublisher()).thenReturn(publisher); + when(publisher.publish(any())) + .thenReturn( + ApiFutures.immediateFuture(MessageMetadata.of(Partition.of(0L), Offset.of(0L)))); + when(row.get(anyInt(), eq(DataTypes.BinaryType))) + .thenReturn("abc".getBytes(StandardCharsets.UTF_8)); + writer.write(row); + writer.write(row); + assertThat(writer.commit()).isEqualTo(PslWriterCommitMessage.create(2)); + } + + @Test + public void testPartialFail() { + when(publisherFactory.newPublisher()).thenReturn(publisher); + when(publisher.publish(any())) + .thenReturn(ApiFutures.immediateFuture(MessageMetadata.of(Partition.of(0L), Offset.of(0L)))) + .thenReturn(ApiFutures.immediateFailedFuture(new InternalError(""))); + when(row.get(anyInt(), eq(DataTypes.BinaryType))) + .thenReturn("abc".getBytes(StandardCharsets.UTF_8)); + writer.write(row); + writer.write(row); + assertThrows(IOException.class, writer::commit); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java index 3692e7a5..23bee103 100644 --- a/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java @@ -28,13 +28,16 @@ import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; +import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter; +import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory; +import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader; import com.google.common.collect.ImmutableMap; import java.util.Optional; import org.junit.Test; public class PslMicroBatchReaderTest { - private static final 
-      PslDataSourceOptions.builder()
+  private static final PslReadDataSourceOptions OPTIONS =
+      PslReadDataSourceOptions.builder()
           .setSubscriptionPath(UnitTestExamples.exampleSubscriptionPath())
           .build();
   private final CursorClient cursorClient = mock(CursorClient.class);
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslDataSourceOptionsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptionsTest.java
similarity index 89%
rename from src/test/java/com/google/cloud/pubsublite/spark/PslDataSourceOptionsTest.java
rename to src/test/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptionsTest.java
index bc794ead..2db8f705 100644
--- a/src/test/java/com/google/cloud/pubsublite/spark/PslDataSourceOptionsTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptionsTest.java
@@ -22,7 +22,7 @@
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.junit.Test;
 
-public class PslDataSourceOptionsTest {
+public class PslReadDataSourceOptionsTest {
 
   @Test
   public void testInvalidSubPath() {
@@ -30,6 +30,6 @@ public void testInvalidSubPath() {
         new DataSourceOptions(ImmutableMap.of(Constants.SUBSCRIPTION_CONFIG_KEY, "invalid/path"));
     assertThrows(
         IllegalArgumentException.class,
-        () -> PslDataSourceOptions.fromSparkDataSourceOptions(options));
+        () -> PslReadDataSourceOptions.fromSparkDataSourceOptions(options));
   }
 }
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java
index b3b81246..7081082f 100644
--- a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java
@@ -17,6 +17,8 @@
 package com.google.cloud.pubsublite.spark;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static scala.collection.JavaConverters.asScalaBufferConverter;
 
 import com.google.cloud.pubsublite.Message;
 import com.google.cloud.pubsublite.Offset;
@@ -29,10 +31,18 @@
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Timestamps;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.ByteArray;
 import org.junit.Test;
 
 public class PslSparkUtilsTest {
@@ -105,4 +115,94 @@ public void testToPslPartitionOffset() {
     assertThat(PslSparkUtils.toPslPartitionOffset(sparkPartitionOffset))
         .isEqualTo(pslPartitionOffset);
   }
+
+  @Test
+  public void testToPubSubMessage() {
+    Timestamp eventTimestamp = Timestamp.newBuilder().setSeconds(10000000L).build();
+    Message message =
+        Message.builder()
+            .setKey(ByteString.copyFromUtf8("key"))
+            .setData(ByteString.copyFromUtf8("data"))
+            .setEventTime(eventTimestamp)
+            .setAttributes(
+                ImmutableListMultimap.of(
+                    "key1", ByteString.copyFromUtf8("val1"),
+                    "key1", ByteString.copyFromUtf8("val2"),
+                    "key2", ByteString.copyFromUtf8("val3")))
+            .build();
+    List<Object> list =
+        new ArrayList<>(
+            Arrays.asList(
+                ByteArray.concat(message.key().toByteArray()),
+                ByteArray.concat(message.data().toByteArray()),
+                PslSparkUtils.convertAttributesToSparkMap(message.attributes()),
+                Timestamps.toMicros(message.eventTime().get()),
+                "abc".getBytes()));
+    InternalRow row = InternalRow.apply(asScalaBufferConverter(list).asScala());
+
+    StructType structType =
+        new StructType(
+            new StructField[] {
+              new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
+              new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
+              new StructField("attributes", Constants.ATTRIBUTES_DATATYPE, true, Metadata.empty()),
+              new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
+              new StructField("random_extra_field", DataTypes.BinaryType, false, Metadata.empty())
+            });
+
+    assertThat(message).isEqualTo(PslSparkUtils.toPubSubMessage(structType, row));
+  }
+
+  @Test
+  public void testToPubSubMessageLongForEventTimestamp() {
+    Message expectedMsg = Message.builder().setEventTime(Timestamps.fromMicros(100000L)).build();
+
+    StructType structType =
+        new StructType(
+            new StructField[] {
+              new StructField("event_timestamp", DataTypes.LongType, false, Metadata.empty())
+            });
+    List<Object> list = Collections.singletonList(/*Timestamp=*/ 100000L);
+    InternalRow row = InternalRow.apply(asScalaBufferConverter(list).asScala());
+
+    Message message = PslSparkUtils.toPubSubMessage(structType, row);
+    assertThat(message).isEqualTo(expectedMsg);
+  }
+
+  @Test
+  public void testVerifyWriteInputSchema() {
+    PslSparkUtils.verifyWriteInputSchema(Constants.DEFAULT_SCHEMA);
+
+    StructType goodThoughMissing =
+        new StructType(
+            new StructField[] {
+              new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
+              new StructField(
+                  "key", Constants.PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
+              new StructField(
+                  "publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
+              new StructField(
+                  "attributes",
+                  Constants.PUBLISH_FIELD_TYPES.get("attributes"),
+                  true,
+                  Metadata.empty())
+            });
+    PslSparkUtils.verifyWriteInputSchema(goodThoughMissing);
+
+    StructType bad =
+        new StructType(
+            new StructField[] {
+              new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
+              // Key field wrong DataType
+              new StructField("key", DataTypes.StringType, false, Metadata.empty()),
+              new StructField(
+                  "publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
+              new StructField(
+                  "attributes",
+                  Constants.PUBLISH_FIELD_TYPES.get("attributes"),
+                  true,
+                  Metadata.empty())
+            });
+    assertThrows(IllegalArgumentException.class, () -> PslSparkUtils.verifyWriteInputSchema(bad));
+  }
 }
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java
new file mode 100644
index 00000000..35b525d7
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/spark/PslStreamWriterTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.junit.Test;
+
+public class PslStreamWriterTest {
+
+  private final PslStreamWriter writer =
+      new PslStreamWriter(
+          Constants.DEFAULT_SCHEMA,
+          PslWriteDataSourceOptions.builder()
+              .setTopicPath(UnitTestExamples.exampleTopicPath())
+              .build());
+  private final PslWriterCommitMessage message1 = PslWriterCommitMessage.create(10);
+  private final PslWriterCommitMessage message2 = PslWriterCommitMessage.create(5);
+
+  private static class AbortCommitMessage implements WriterCommitMessage {}
+
+  @Test
+  public void testCommit() {
+    writer.commit(100, new WriterCommitMessage[] {message1, message2});
+  }
+
+  @Test
+  public void testAbort() {
+    writer.abort(100, new WriterCommitMessage[] {message1, message2, new AbortCommitMessage()});
+  }
+
+  @Test
+  public void testCreateFactory() {
+    writer.createWriterFactory();
+  }
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptionsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptionsTest.java
new file mode 100644
index 00000000..5cf10f50
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptionsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static org.junit.Assert.assertThrows;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.Test;
+
+public class PslWriteDataSourceOptionsTest {
+
+  @Test
+  public void testInvalidTopicPath() {
+    DataSourceOptions options =
+        new DataSourceOptions(ImmutableMap.of(Constants.TOPIC_CONFIG_KEY, "invalid/path"));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> PslWriteDataSourceOptions.fromSparkDataSourceOptions(options));
+  }
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/internal/LimitingHeadOffsetReaderTest.java
similarity index 98%
rename from src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java
rename to src/test/java/com/google/cloud/pubsublite/spark/internal/LimitingHeadOffsetReaderTest.java
index dcc3025a..944f86b0 100644
--- a/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/spark/internal/LimitingHeadOffsetReaderTest.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.google.cloud.pubsublite.spark;
+package com.google.cloud.pubsublite.spark.internal;
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.ArgumentMatchers.any;
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java b/src/test/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImplTest.java
similarity index 97%
rename from src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java
rename to src/test/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImplTest.java
index 65b4675a..9d801ea2 100644
--- a/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImplTest.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.google.cloud.pubsublite.spark;
+package com.google.cloud.pubsublite.spark.internal;
 
 import static com.google.cloud.pubsublite.spark.TestingUtils.createPslSourceOffset;
 import static org.mockito.ArgumentMatchers.eq;
@@ -23,6 +23,7 @@
 import com.google.api.core.SettableApiFuture;
 import com.google.cloud.pubsublite.*;
 import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.spark.PslSourceOffset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
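
--

Usage note (reviewer sketch, not part of the patch): the tests above show the writer's
internals (PslDataWriter publishes one message per row and commit() waits on all publish
futures; PslSparkUtils.verifyWriteInputSchema tolerates missing or extra fields but rejects
a publish field with the wrong DataType). The sketch below shows how a Spark job might
exercise this path end to end. It assumes the source registers the short name "pubsublite"
and that Constants.TOPIC_CONFIG_KEY is "pubsublite.topic"; the topic path and checkpoint
location are placeholders, so check PslDataSource and Constants for the actual values.

    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class PslWriteExample {
      public static void main(String[] args) throws Exception {
        SparkSession spark =
            SparkSession.builder().appName("psl-write-example").master("local[*]").getOrCreate();

        // The rate source emits (timestamp TIMESTAMP, value LONG); project it into the
        // publish schema from Constants.PUBLISH_FIELD_TYPES, where "key" and "data" are
        // BinaryType. Other publish fields (attributes, event_timestamp) may be omitted.
        Dataset<Row> input =
            spark
                .readStream()
                .format("rate")
                .load()
                .withColumn("key", col("value").cast("STRING").cast("BINARY"))
                .withColumn("data", col("value").cast("STRING").cast("BINARY"))
                .select("key", "data");

        StreamingQuery query =
            input
                .writeStream()
                .format("pubsublite") // assumed short name registered by PslDataSource
                .option(
                    "pubsublite.topic", // assumed value of Constants.TOPIC_CONFIG_KEY
                    "projects/my-project/locations/us-central1-a/topics/my-topic")
                .option("checkpointLocation", "/tmp/psl-write-example")
                .start();

        query.awaitTermination(60_000);
      }
    }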