diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..86083d15
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,32 @@
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+# intellij project folder
+.idea/
+*.iml
+
+# maven build directory
+target/
+
+.flattened-pom.xml
\ No newline at end of file
diff --git a/.readme-partials.yaml b/.readme-partials.yaml
new file mode 100644
index 00000000..275ee9ae
--- /dev/null
+++ b/.readme-partials.yaml
@@ -0,0 +1,126 @@
+custom_content: |
+  ## Requirements
+
+  ### Enable the PubSub Lite API
+
+  Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#before-you-begin).
+
+  ### Create a new subscription or use an existing subscription
+
+  Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#create_a_lite_subscription) to create a new
+  subscription or use an existing one. If you use an existing subscription, the connector starts reading from the
+  oldest unacknowledged message.
+
+  ### Create a Google Cloud Dataproc cluster (Optional)
+
+  If you do not have an Apache Spark environment, you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.
+
+  ```
+  MY_CLUSTER=...
+  gcloud dataproc clusters create "$MY_CLUSTER"
+  ```
+
+  ## Downloading and Using the Connector
+
+  The latest version of the connector (Scala 2.11) is publicly available in
+  gs://spark-lib/pubsublite/spark-pubsublite-latest.jar.
+
+  The connector is also available from the Maven Central
+  repository. It can be used via the `--packages` option or the
+  `spark.jars.packages` configuration property, using the following value:
+
+  | Scala version | Connector Artifact |
+  | --- | --- |
+  | Scala 2.11 | `com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0` |
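+
+  For example, a Spark session that pulls the connector from Maven Central can be configured programmatically (a sketch; the app name is illustrative):
+
+  ```java
+  import org.apache.spark.sql.SparkSession;
+
+  SparkSession spark =
+      SparkSession.builder()
+          .appName("pubsublite-demo") // illustrative
+          .config(
+              "spark.jars.packages",
+              "com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0")
+          .getOrCreate();
+  ```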
+
+  ## Usage
+
+  ### Reading data from PubSub Lite
+
+  ```
+  df = spark.readStream \
+    .option("pubsublite.subscription", "projects/123456789/locations/us-central1-a/subscriptions/test-spark-subscription") \
+    .format("pubsublite") \
+    .load()
+  ```
+
+  Note that the connector supports both MicroBatch Processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
+
+  ### Properties
+
+  The connector supports a number of options to configure the read:
+
+  | Option | Type | Required | Meaning |
+  | ------ | ---- | -------- | ------- |
+  | pubsublite.subscription | String | Y | Full subscription path that the connector will read from. |
+  | pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | Max number of bytes per partition that will be cached in workers before Spark processes the messages. Defaults to 50000000 bytes. |
+  | pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Max number of messages per partition that will be cached in workers before Spark processes the messages. Defaults to Long.MAX_VALUE. |
+  | gcp.credentials.key | String | N | Service account JSON in base64. Defaults to [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically). |
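+
+  For instance, a read with an explicit flow control limit might look like this in Java (a sketch reusing the `spark` session above; the subscription path and limit are illustrative):
+
+  ```java
+  import org.apache.spark.sql.Dataset;
+  import org.apache.spark.sql.Row;
+
+  Dataset<Row> df =
+      spark
+          .readStream()
+          .format("pubsublite")
+          .option(
+              "pubsublite.subscription",
+              "projects/123456789/locations/us-central1-a/subscriptions/test-spark-subscription")
+          // Cache at most ~10 MB per partition before Spark processes the messages.
+          .option("pubsublite.flowcontrol.byteoutstandingperpartition", 10000000L)
+          .load();
+  ```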
+
+  ### Data Schema
+
+  The connector has a fixed data schema as follows:
+
+  | Data Field | Spark Data Type | Notes |
+  | ---------- | --------------- | ----- |
+  | subscription | StringType | Full subscription path |
+  | partition | LongType | |
+  | offset | LongType | |
+  | key | BinaryType | |
+  | data | BinaryType | |
+  | attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | |
+  | publish_timestamp | TimestampType | |
+  | event_timestamp | TimestampType | Nullable |
+
+  ## Compiling with the connector
+
+  To include the connector in your project:
+
+  ### Maven
+
+  ```xml
+  <dependency>
+    <groupId>com.google.cloud.pubsublite.spark</groupId>
+    <artifactId>pubsublite-spark-sql-streaming-with-dependencies_2.11</artifactId>
+    <version>0.1.0</version>
+  </dependency>
+  ```
+
+  ### SBT
+
+  ```sbt
+  libraryDependencies += "com.google.cloud.pubsublite.spark" % "pubsublite-spark-sql-streaming-with-dependencies_2.11" % "0.1.0"
+  ```
+
+  ## Building the Connector
+
+  The connector is built using Maven. The following command creates a JAR with shaded dependencies:
+
+  ```
+  mvn package
+  ```
+
+  ## FAQ
+
+  ### What is the pricing for PubSub Lite?
+
+  See the [PubSub Lite pricing documentation](https://cloud.google.com/pubsub/lite/pricing).
+
+  ### Can I configure the number of Spark partitions?
+
+  No, the number of Spark partitions is set to the number of PubSub Lite partitions of the topic that the supplied subscription is attached to.
+
+  ### How do I authenticate outside GCE / Dataproc?
+
+  Use a service account JSON key and `GOOGLE_APPLICATION_CREDENTIALS` as described [here](https://cloud.google.com/docs/authentication/getting-started).
+
+  Credentials can be provided with the `gcp.credentials.key` option; the key needs to be passed in as a base64-encoded string directly.
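+
+  For example, one way to produce the base64 string from a service account key file (a sketch using `java.util.Base64`; the file path is illustrative):
+
+  ```java
+  import java.nio.file.Files;
+  import java.nio.file.Paths;
+  import java.util.Base64;
+
+  // Read the JSON key file and base64-encode it (throws IOException).
+  String credentialsKey =
+      Base64.getEncoder()
+          .encodeToString(Files.readAllBytes(Paths.get("/path/to/service-account-key.json")));
+  ```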
+
+  Example:
+  ```
+  spark.readStream.format("pubsublite").option("gcp.credentials.key", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
+  ```
\ No newline at end of file
diff --git a/README.md b/README.md
deleted file mode 100644
index a486d6be..00000000
--- a/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# java-pubsublite-spark
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 00000000..a49376d8
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,272 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>com.google.cloud</groupId>
+    <artifactId>google-cloud-pubsublite-parent</artifactId>
+    <version>0.8.0</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.google.cloud</groupId>
+  <artifactId>pubsublite-spark-sql-streaming</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Pub/Sub Lite Spark SQL Streaming</name>
+  <url>https://github.com/googleapis/java-pubsublite-spark</url>
+  <description>Spark SQL Streaming Connector for Google Cloud Pub/Sub Lite</description>
+  <properties>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <scala.version>2.11.12</scala.version>
+    <scala.version.short>2.11</scala.version.short>
+    <spark.version>2.4.7</spark.version>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.version.short}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.version.short}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-unsafe_${scala.version.short}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-pubsublite</artifactId>
+      <version>0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.api.grpc</groupId>
+      <artifactId>proto-google-cloud-pubsublite-v1</artifactId>
+      <version>0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.api</groupId>
+      <artifactId>gax</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-credentials</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-oauth2-http</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.flogger</groupId>
+      <artifactId>google-extensions</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.api</groupId>
+      <artifactId>api-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.12.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+      <version>2.8.8</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.12.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.13.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.truth</groupId>
+      <artifactId>truth</artifactId>
+      <version>1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>3.7.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava-testlib</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>src/main/java</sourceDirectory>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
+    <outputDirectory>target/classes</outputDirectory>
+    <testOutputDirectory>target/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.2.4</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/maven/**</exclude>
+                    <exclude>META-INF/*.MF</exclude>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>com</pattern>
+                  <shadedPattern>repackaged.com</shadedPattern>
+                  <includes>
+                    <include>com.google.protobuf.**</include>
+                    <include>com.google.common.**</include>
+                  </includes>
+                </relocation>
+                <relocation>
+                  <pattern>io.grpc.netty.shaded</pattern>
+                  <shadedPattern>com.google.cloud.pubsublite.repackaged.io.grpc.netty.shaded</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>io</pattern>
+                  <shadedPattern>com.google.cloud.pubsublite.repackaged.io</shadedPattern>
+                  <includes>
+                    <include>io.grpc.**</include>
+                    <include>io.opencensus.**</include>
+                    <include>io.perfmark.**</include>
+                  </includes>
+                </relocation>
+                <relocation>
+                  <pattern>META-INF/native/io_grpc_netty_shaded_</pattern>
+                  <shadedPattern>META-INF/native/com_google_cloud_pubsublite_repackaged_io_grpc_netty_shaded_</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>META-INF/native/libio_grpc_netty_shaded_</pattern>
+                  <shadedPattern>META-INF/native/libcom_google_cloud_pubsublite_repackaged_io_grpc_netty_shaded_</shadedPattern>
+                </relocation>
+              </relocations>
+              <shadedArtifactAttached>true</shadedArtifactAttached>
+              <shadedClassifierName>with-dependencies</shadedClassifierName>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.sonatype.plugins</groupId>
+        <artifactId>nexus-staging-maven-plugin</artifactId>
+        <configuration>
+          <skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>3.0.0-M3</version>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <configuration>
+              <rules>
+                <requireUpperBoundDeps>
+                  <level>WARN</level>
+                  <excludes>
+                    <exclude>org.checkerframework:checker-compat-qual</exclude>
+                  </excludes>
+                </requireUpperBoundDeps>
+                <banDuplicateClasses>
+                  <ignoreClasses>
+                    <ignoreClass>org.apache.commons.logging.*</ignoreClass>
+                    <ignoreClass>org.apache.commons.collections.*</ignoreClass>
+                    <ignoreClass>org.apache.spark.unused.*</ignoreClass>
+                    <ignoreClass>org.apache.hadoop.yarn.*</ignoreClass>
+                    <ignoreClass>javax.ws.rs.*</ignoreClass>
+                  </ignoreClasses>
+                  <findAllDuplicates>true</findAllDuplicates>
+                </banDuplicateClasses>
+              </rules>
+            </configuration>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
new file mode 100644
index 00000000..bdc8a1bd
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.cloud.pubsublite.internal.wire.PubsubContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class Constants {
+  public static final long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
+  public static final long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
+  public static final int DEFAULT_BATCH_OFFSET_RANGE = 100_000;
+  public static final StructType DEFAULT_SCHEMA =
+      new StructType(
+          new StructField[] {
+            new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
+            new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
+            new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
+            new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
+            new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
+            new StructField(
+                "attributes",
+                DataTypes.createMapType(
+                    DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
+                true,
+                Metadata.empty())
+          });
+
+  public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
+
+  public static final String BYTES_OUTSTANDING_CONFIG_KEY =
+      "pubsublite.flowcontrol.byteoutstandingperpartition";
+  public static final String MESSAGES_OUTSTANDING_CONFIG_KEY =
+      "pubsublite.flowcontrol.messageoutstandingperpartition";
+  public static final String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription";
+  public static final String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key";
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java b/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
new file mode 100644
index 00000000..5954492f
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Rate limited HeadOffsetReader, utilizing a LoadingCache that refreshes all partitions head
+ * offsets for the topic at most once per minute.
+ */
+public class LimitingHeadOffsetReader implements PerTopicHeadOffsetReader {
+
+  private final TopicStatsClient topicStatsClient;
+  private final TopicPath topic;
+  private final long topicPartitionCount;
+  private final AsyncLoadingCache<Partition, Offset> cachedHeadOffsets;
+
+  @VisibleForTesting
+  public LimitingHeadOffsetReader(
+      TopicStatsClient topicStatsClient, TopicPath topic, long topicPartitionCount, Ticker ticker) {
+    this.topicStatsClient = topicStatsClient;
+    this.topic = topic;
+    this.topicPartitionCount = topicPartitionCount;
+    this.cachedHeadOffsets =
+        Caffeine.newBuilder()
+            .ticker(ticker)
+            .expireAfterWrite(1, TimeUnit.MINUTES)
+            .buildAsync(this::loadHeadOffset);
+  }
+
+  private CompletableFuture<Offset> loadHeadOffset(Partition partition, Executor executor) {
+    CompletableFuture<Offset> result = new CompletableFuture<>();
+    ApiFutures.addCallback(
+        topicStatsClient.computeHeadCursor(topic, partition),
+        new ApiFutureCallback<Cursor>() {
+          @Override
+          public void onFailure(Throwable t) {
+            result.completeExceptionally(t);
+          }
+
+          @Override
+          public void onSuccess(Cursor c) {
+            result.complete(Offset.of(c.getOffset()));
+          }
+        },
+        MoreExecutors.directExecutor());
+    return result;
+  }
+
+  @Override
+  public PslSourceOffset getHeadOffset() {
+    Set<Partition> keySet = new HashSet<>();
+    for (int i = 0; i < topicPartitionCount; i++) {
+      keySet.add(Partition.of(i));
+    }
+    CompletableFuture<Map<Partition, Offset>> future = cachedHeadOffsets.getAll(keySet);
+    try {
+      return PslSourceOffset.builder().partitionOffsetMap(future.get()).build();
+    } catch (Throwable t) {
+      throw new IllegalStateException("Unable to compute head offset for topic: " + topic, t);
+    }
+  }
+
+  @Override
+  public void close() {
+    topicStatsClient.close();
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java b/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java
new file mode 100644
index 00000000..d42f33ca
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import java.io.Closeable;
+
+public interface MultiPartitionCommitter extends Closeable {
+
+  interface CommitterFactory {
+    Committer newCommitter(Partition partition);
+  }
+
+  void commit(PslSourceOffset offset);
+
+  void close();
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java b/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java
new file mode 100644
index 00000000..f672242f
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.GoogleLogger;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MultiPartitionCommitterImpl implements MultiPartitionCommitter {
+  private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
+
+  private final Map<Partition, Committer> committerMap = new HashMap<>();
+
+  @VisibleForTesting
+  MultiPartitionCommitterImpl(long topicPartitionCount, CommitterFactory committerFactory) {
+    for (int i = 0; i < topicPartitionCount; i++) {
+      Partition p = Partition.of(i);
+      Committer committer = committerFactory.newCommitter(p);
+      committer.startAsync().awaitRunning();
+      committerMap.put(p, committer);
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    committerMap.values().forEach(c -> c.stopAsync().awaitTerminated());
+  }
+
+  @Override
+  public synchronized void commit(PslSourceOffset offset) {
+    offset
+        .partitionOffsetMap()
+        .forEach(
+            (p, o) -> {
+              // Note we don't need to worry about commit offset disorder here since Committer
+              // guarantees the ordering. Once commitOffset() returns, it's either already
+              // sent to stream, or waiting for next connection to open to be sent in order.
+              ApiFuture<Void> future = committerMap.get(p).commitOffset(o);
+              ApiFutures.addCallback(
+                  future,
+                  new ApiFutureCallback<Void>() {
+                    @Override
+                    public void onFailure(Throwable t) {
+                      if (!future.isCancelled()) {
+                        log.atWarning().withCause(t).log(
+                            "Failed to commit %s,%s.", p.value(), o.value());
+                      }
+                    }
+
+                    @Override
+                    public void onSuccess(Void result) {
+                      log.atInfo().log("Committed %s,%s.", p.value(), o.value());
+                    }
+                  },
+                  MoreExecutors.directExecutor());
+            });
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java b/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java
new file mode 100644
index 00000000..9ea51670
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+public interface PartitionSubscriberFactory extends Serializable {
+  Subscriber newSubscriber(
+      Partition partition, Consumer<ImmutableList<SequencedMessage>> messageConsumer)
+      throws ApiException;
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java
new file mode 100644
index 00000000..21e0bc63
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import java.io.Closeable;
+
+public interface PerTopicHeadOffsetReader extends Closeable {
+
+  // Gets the head offsets for all partitions in the topic. Blocks.
+  PslSourceOffset getHeadOffset();
+
+  @Override
+  void close();
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java
new file mode 100644
index 00000000..93e763f6
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.SeekRequest;
+import java.io.Serializable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
+public class PslContinuousInputPartition
+    implements ContinuousInputPartition<InternalRow>, Serializable {
+
+  private final SubscriberFactory subscriberFactory;
+  private final SparkPartitionOffset startOffset;
+  private final SubscriptionPath subscriptionPath;
+  private final FlowControlSettings flowControlSettings;
+
+  public PslContinuousInputPartition(
+      SubscriberFactory subscriberFactory,
+      SparkPartitionOffset startOffset,
+      SubscriptionPath subscriptionPath,
+      FlowControlSettings flowControlSettings) {
+    this.subscriberFactory = subscriberFactory;
+    this.startOffset = startOffset;
+    this.subscriptionPath = subscriptionPath;
+    this.flowControlSettings = flowControlSettings;
+  }
+
+  @Override
+  public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset offset) {
+    checkArgument(
+        SparkPartitionOffset.class.isAssignableFrom(offset.getClass()),
+        "offset is not assignable to SparkPartitionOffset");
+
+    SparkPartitionOffset sparkPartitionOffset = (SparkPartitionOffset) offset;
+    PslPartitionOffset pslPartitionOffset =
+        PslSparkUtils.toPslPartitionOffset(sparkPartitionOffset);
+
+    BlockingPullSubscriberImpl subscriber;
+    try {
+      subscriber =
+          new BlockingPullSubscriberImpl(
+              subscriberFactory,
+              flowControlSettings,
+              SeekRequest.newBuilder()
+                  .setCursor(
+                      Cursor.newBuilder().setOffset(pslPartitionOffset.offset().value()).build())
+                  .build());
+    } catch (CheckedApiException e) {
+      throw new IllegalStateException(
+          "Unable to create PSL subscriber for " + startOffset.toString(), e);
+    }
+    return new PslContinuousInputPartitionReader(
+        subscriptionPath, sparkPartitionOffset, subscriber);
+  }
+
+  @Override
+  public InputPartitionReader<InternalRow> createPartitionReader() {
+    return createContinuousReader(startOffset);
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java
new file mode 100644
index 00000000..4929ad3e
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
+import com.google.common.flogger.GoogleLogger;
+import java.util.Optional;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
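+/**
+ * A continuous-mode reader for a single Pub/Sub Lite partition. Each {@code next()} call blocks
+ * until one more message is available, and {@code getOffset()} reports the offset of the last
+ * message delivered so Spark can checkpoint progress.
+ */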
+public class PslContinuousInputPartitionReader
+    implements ContinuousInputPartitionReader<InternalRow> {
+  private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
+
+  private final SubscriptionPath subscriptionPath;
+  private final BlockingPullSubscriberImpl subscriber;
+  private SparkPartitionOffset currentOffset;
+  private SequencedMessage currentMsg;
+
+  PslContinuousInputPartitionReader(
+      SubscriptionPath subscriptionPath,
+      SparkPartitionOffset startOffset,
+      BlockingPullSubscriberImpl subscriber) {
+    this.subscriptionPath = subscriptionPath;
+    this.currentOffset = startOffset;
+    this.subscriber = subscriber;
+    this.currentMsg = null;
+  }
+
+  @Override
+  public PartitionOffset getOffset() {
+    return currentOffset;
+  }
+
+  @Override
+  public boolean next() {
+    try {
+      subscriber.onData().get();
+      // since next() will not be called concurrently, we are sure that the message
+      // is available to this thread.
+      Optional<SequencedMessage> msg = subscriber.messageIfAvailable();
+      checkState(msg.isPresent());
+      currentMsg = msg.get();
+      currentOffset =
+          SparkPartitionOffset.builder()
+              .partition(currentOffset.partition())
+              .offset(currentMsg.offset().value())
+              .build();
+      return true;
+    } catch (Throwable t) {
+      throw new IllegalStateException("Failed to retrieve messages.", t);
+    }
+  }
+
+  @Override
+  public InternalRow get() {
+    checkState(currentMsg != null);
+    return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, currentOffset.partition());
+  }
+
+  @Override
+  public void close() {
+    try {
+      subscriber.close();
+    } catch (Exception e) {
+      log.atWarning().log("Subscriber failed to close.");
+    }
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
new file mode 100644
index 00000000..33938594
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+import org.apache.spark.sql.types.StructType;
+
+public class PslContinuousReader implements ContinuousReader {
+
+  private final CursorClient cursorClient;
+  private final MultiPartitionCommitter committer;
+  private final PartitionSubscriberFactory partitionSubscriberFactory;
+  private final SubscriptionPath subscriptionPath;
+  private final FlowControlSettings flowControlSettings;
+  private final long topicPartitionCount;
+  private SparkSourceOffset startOffset;
+
+  @VisibleForTesting
+  public PslContinuousReader(
+      CursorClient cursorClient,
+      MultiPartitionCommitter committer,
+      PartitionSubscriberFactory partitionSubscriberFactory,
+      SubscriptionPath subscriptionPath,
+      FlowControlSettings flowControlSettings,
+      long topicPartitionCount) {
+    this.cursorClient = cursorClient;
+    this.committer = committer;
+    this.partitionSubscriberFactory = partitionSubscriberFactory;
+    this.subscriptionPath = subscriptionPath;
+    this.flowControlSettings = flowControlSettings;
+    this.topicPartitionCount = topicPartitionCount;
+  }
+
+  @Override
+  public Offset mergeOffsets(PartitionOffset[] offsets) {
+    checkArgument(
+        SparkPartitionOffset.class.isAssignableFrom(offsets.getClass().getComponentType()),
+        "PartitionOffset object is not assignable to SparkPartitionOffset.");
+    return SparkSourceOffset.merge(
+        Arrays.copyOf(offsets, offsets.length, SparkPartitionOffset[].class));
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return SparkSourceOffset.fromJson(json);
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    return startOffset;
+  }
+
+  @Override
+  public void setStartOffset(Optional<Offset> start) {
+    if (start.isPresent()) {
+      checkArgument(
+          SparkSourceOffset.class.isAssignableFrom(start.get().getClass()),
+          "start offset is not assignable to SparkSourceOffset.");
+      startOffset = (SparkSourceOffset) start.get();
+      return;
+    }
+    startOffset =
+        PslSparkUtils.getSparkStartOffset(cursorClient, subscriptionPath, topicPartitionCount);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    checkArgument(
+        SparkSourceOffset.class.isAssignableFrom(end.getClass()),
+        "end offset is not assignable to SparkSourceOffset.");
+    committer.commit(PslSparkUtils.toPslSourceOffset((SparkSourceOffset) end));
+  }
+
+  @Override
+  public void stop() {
+    cursorClient.shutdown();
+    committer.close();
+  }
+
+  @Override
+  public StructType readSchema() {
+    return Constants.DEFAULT_SCHEMA;
+  }
+
+  @Override
+  public List<InputPartition<InternalRow>> planInputPartitions() {
+    List<InputPartition<InternalRow>> list = new ArrayList<>();
+    for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
+      PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
+      SubscriberFactory subscriberFactory =
+          (consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
+      list.add(
+          new PslContinuousInputPartition(
+              subscriberFactory,
+              SparkPartitionOffset.builder()
+                  .partition(offset.partition())
+                  .offset(offset.offset())
+                  .build(),
+              subscriptionPath,
+              flowControlSettings));
+    }
+    return list;
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java b/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java
new file mode 100644
index 00000000..6dce5272
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.google.cloud.pubsublite.spark; + +import com.google.api.client.util.Base64; +import com.google.api.gax.core.CredentialsProvider; +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; + +public class PslCredentialsProvider implements CredentialsProvider { + + private final Credentials credentials; + + public PslCredentialsProvider(PslDataSourceOptions options) { + if (options.credentialsKey() != null) { + this.credentials = createCredentialsFromKey(options.credentialsKey()); + } else { + this.credentials = createDefaultCredentials(); + } + } + + private static Credentials createCredentialsFromKey(String key) { + try { + return GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.decodeBase64(key))) + .createScoped("https://www.googleapis.com/auth/cloud-platform"); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create Credentials from key", e); + } + } + + public static Credentials createDefaultCredentials() { + try { + return GoogleCredentials.getApplicationDefault() + .createScoped("https://www.googleapis.com/auth/cloud-platform"); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create default Credentials", e); + } + } + + @Override + public Credentials getCredentials() { + return credentials; + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java new file mode 100644 index 00000000..0ac68727 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java @@ -0,0 +1,103 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.auto.service.AutoService;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+@AutoService(DataSourceRegister.class)
+public final class PslDataSource
+    implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {
+
+  @Override
+  public String shortName() {
+    return "pubsublite";
+  }
+
+  @Override
+  public ContinuousReader createContinuousReader(
+      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+    if (schema.isPresent()) {
+      throw new IllegalArgumentException(
+          "PubSub Lite uses fixed schema and custom schema is not allowed");
+    }
+
+    PslDataSourceOptions pslDataSourceOptions =
+        PslDataSourceOptions.fromSparkDataSourceOptions(options);
+    CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
+    AdminClient adminClient = pslDataSourceOptions.newAdminClient();
+    SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
+    long topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
+    return new PslContinuousReader(
+        cursorClient,
+        pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
+        pslDataSourceOptions.getSubscriberFactory(),
+        subscriptionPath,
+        Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
+        topicPartitionCount);
+  }
+
+  @Override
+  public MicroBatchReader createMicroBatchReader(
+      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+    if (schema.isPresent()) {
+      throw new IllegalArgumentException(
+          "PubSub Lite uses fixed schema and custom schema is not allowed");
+    }
+
+    PslDataSourceOptions pslDataSourceOptions =
+        PslDataSourceOptions.fromSparkDataSourceOptions(options);
+    CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
+    AdminClient adminClient = pslDataSourceOptions.newAdminClient();
+    SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
+    TopicPath topicPath;
+    try {
+      topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
+    } catch (Throwable t) {
+      throw new IllegalStateException(
+          "Unable to get topic for subscription " + subscriptionPath, t);
+    }
+    long topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
+    return new PslMicroBatchReader(
+        cursorClient,
+        pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
+        pslDataSourceOptions.getSubscriberFactory(),
+        new LimitingHeadOffsetReader(
+            pslDataSourceOptions.newTopicStatsClient(),
+            topicPath,
+            topicPartitionCount,
+            Ticker.systemTicker()),
+        subscriptionPath,
+        Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
topicPartitionCount); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java new file mode 100644 index 00000000..8db019fa --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java @@ -0,0 +1,209 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.AdminClientSettings; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; +import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.cloud.pubsublite.internal.CursorClientSettings; +import com.google.cloud.pubsublite.internal.TopicStatsClient; +import com.google.cloud.pubsublite.internal.TopicStatsClientSettings; +import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; +import com.google.cloud.pubsublite.internal.wire.PubsubContext; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; +import com.google.cloud.pubsublite.internal.wire.ServiceClients; +import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; +import com.google.cloud.pubsublite.v1.AdminServiceClient; +import com.google.cloud.pubsublite.v1.AdminServiceSettings; +import com.google.cloud.pubsublite.v1.CursorServiceClient; +import com.google.cloud.pubsublite.v1.CursorServiceSettings; +import com.google.cloud.pubsublite.v1.SubscriberServiceClient; +import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; +import com.google.cloud.pubsublite.v1.TopicStatsServiceClient; +import com.google.cloud.pubsublite.v1.TopicStatsServiceSettings; +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.spark.sql.sources.v2.DataSourceOptions; + +@AutoValue +public abstract class PslDataSourceOptions implements Serializable { + private static final long serialVersionUID = 2680059304693561607L; + + @Nullable + public abstract String credentialsKey(); + + public abstract SubscriptionPath subscriptionPath(); + + @Nullable + public abstract FlowControlSettings flowControlSettings(); + + public abstract long maxBatchOffsetRange(); + + public static Builder builder() { + return new AutoValue_PslDataSourceOptions.Builder() + .setCredentialsKey(null) + // TODO(jiangmichael): Revisit this later about if we need to expose this as a user + // configurable option. Ideally we should expose bytes range/# msgs range not + // offsets range since PSL doesn't guarantee offset = msg. 
+        .setMaxBatchOffsetRange(Constants.DEFAULT_BATCH_OFFSET_RANGE);
+  }
+
+  public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) {
+    if (!options.get(Constants.SUBSCRIPTION_CONFIG_KEY).isPresent()) {
+      throw new IllegalArgumentException(Constants.SUBSCRIPTION_CONFIG_KEY + " is required.");
+    }
+
+    Builder builder = builder();
+    Optional<String> value;
+    if ((value = options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY)).isPresent()) {
+      builder.setCredentialsKey(value.get());
+    }
+    return builder
+        .setSubscriptionPath(
+            SubscriptionPath.parse(options.get(Constants.SUBSCRIPTION_CONFIG_KEY).get()))
+        .setFlowControlSettings(
+            FlowControlSettings.builder()
+                .setMessagesOutstanding(
+                    options.getLong(
+                        Constants.MESSAGES_OUTSTANDING_CONFIG_KEY,
+                        Constants.DEFAULT_MESSAGES_OUTSTANDING))
+                .setBytesOutstanding(
+                    options.getLong(
+                        Constants.BYTES_OUTSTANDING_CONFIG_KEY,
+                        Constants.DEFAULT_BYTES_OUTSTANDING))
+                .build())
+        .build();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setCredentialsKey(String credentialsKey);
+
+    public abstract Builder setSubscriptionPath(SubscriptionPath subscriptionPath);
+
+    public abstract Builder setMaxBatchOffsetRange(long maxBatchOffsetRange);
+
+    public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
+
+    public abstract PslDataSourceOptions build();
+  }
+
+  MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
+    return new MultiPartitionCommitterImpl(
+        topicPartitionCount,
+        (partition) ->
+            CommitterBuilder.newBuilder()
+                .setSubscriptionPath(this.subscriptionPath())
+                .setPartition(partition)
+                .setServiceClient(newCursorServiceClient())
+                .build());
+  }
+
+  PartitionSubscriberFactory getSubscriberFactory() {
+    return (partition, consumer) -> {
+      PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
+      SubscriberServiceSettings.Builder settingsBuilder =
+          SubscriberServiceSettings.newBuilder()
+              .setCredentialsProvider(new PslCredentialsProvider(this));
+      ServiceClients.addDefaultMetadata(
+          context, RoutingMetadata.of(this.subscriptionPath(), partition), settingsBuilder);
+      try {
+        SubscriberServiceClient serviceClient =
+            SubscriberServiceClient.create(
+                addDefaultSettings(this.subscriptionPath().location().region(), settingsBuilder));
+        return SubscriberBuilder.newBuilder()
+            .setSubscriptionPath(this.subscriptionPath())
+            .setPartition(partition)
+            .setContext(context)
+            .setServiceClient(serviceClient)
+            .setMessageConsumer(consumer)
+            .build();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to create subscriber service.", e);
+      }
+    };
+  }
+
+  // TODO(b/jiangmichael): Make XXXClientSettings accept creds so we could simplify below methods.
+ private CursorServiceClient newCursorServiceClient() { + try { + return CursorServiceClient.create( + addDefaultSettings( + this.subscriptionPath().location().region(), + CursorServiceSettings.newBuilder() + .setCredentialsProvider(new PslCredentialsProvider(this)))); + } catch (IOException e) { + throw new IllegalStateException("Unable to create CursorServiceClient."); + } + } + + CursorClient newCursorClient() { + return CursorClient.create( + CursorClientSettings.newBuilder() + .setRegion(this.subscriptionPath().location().region()) + .setServiceClient(newCursorServiceClient()) + .build()); + } + + private AdminServiceClient newAdminServiceClient() { + try { + return AdminServiceClient.create( + addDefaultSettings( + this.subscriptionPath().location().region(), + AdminServiceSettings.newBuilder() + .setCredentialsProvider(new PslCredentialsProvider(this)))); + } catch (IOException e) { + throw new IllegalStateException("Unable to create AdminServiceClient."); + } + } + + AdminClient newAdminClient() { + return AdminClient.create( + AdminClientSettings.newBuilder() + .setRegion(this.subscriptionPath().location().region()) + .setServiceClient(newAdminServiceClient()) + .build()); + } + + private TopicStatsServiceClient newTopicStatsServiceClient() { + try { + return TopicStatsServiceClient.create( + addDefaultSettings( + this.subscriptionPath().location().region(), + TopicStatsServiceSettings.newBuilder() + .setCredentialsProvider(new PslCredentialsProvider(this)))); + } catch (IOException e) { + throw new IllegalStateException("Unable to create TopicStatsServiceClient."); + } + } + + TopicStatsClient newTopicStatsClient() { + return TopicStatsClient.create( + TopicStatsClientSettings.newBuilder() + .setRegion(this.subscriptionPath().location().region()) + .setServiceClient(newTopicStatsServiceClient()) + .build()); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java new file mode 100644 index 00000000..42173680 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.SeekRequest;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+
+public class PslMicroBatchInputPartition implements InputPartition<InternalRow> {
+
+  private final SubscriberFactory subscriberFactory;
+  private final SparkPartitionOffset startOffset;
+  private final SparkPartitionOffset endOffset;
+  private final SubscriptionPath subscriptionPath;
+  private final FlowControlSettings flowControlSettings;
+
+  public PslMicroBatchInputPartition(
+      SubscriptionPath subscriptionPath,
+      FlowControlSettings flowControlSettings,
+      SparkPartitionOffset startOffset,
+      SparkPartitionOffset endOffset,
+      SubscriberFactory subscriberFactory) {
+    this.startOffset = startOffset;
+    this.endOffset = endOffset;
+    this.subscriptionPath = subscriptionPath;
+    this.flowControlSettings = flowControlSettings;
+    this.subscriberFactory = subscriberFactory;
+  }
+
+  @Override
+  public InputPartitionReader<InternalRow> createPartitionReader() {
+    BlockingPullSubscriber subscriber;
+    try {
+      subscriber =
+          new BlockingPullSubscriberImpl(
+              subscriberFactory,
+              flowControlSettings,
+              SeekRequest.newBuilder()
+                  .setCursor(
+                      Cursor.newBuilder()
+                          .setOffset(
+                              PslSparkUtils.toPslPartitionOffset(startOffset).offset().value())
+                          .build())
+                  .build());
+    } catch (CheckedApiException e) {
+      throw new IllegalStateException(
+          "Unable to create PSL subscriber for " + endOffset.partition(), e);
+    }
+    return new PslMicroBatchInputPartitionReader(subscriptionPath, endOffset, subscriber);
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java
new file mode 100644
index 00000000..d2eebd23
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.GoogleLogger;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+
+public class PslMicroBatchInputPartitionReader implements InputPartitionReader<InternalRow> {
+  private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
+
+  private static final Duration SUBSCRIBER_PULL_TIMEOUT = Duration.ofSeconds(10);
+
+  private final SubscriptionPath subscriptionPath;
+  private final SparkPartitionOffset endOffset;
+  private final BlockingPullSubscriber subscriber;
+  @Nullable private SequencedMessage currentMsg = null;
+  private boolean batchFulfilled = false;
+
+  @VisibleForTesting
+  PslMicroBatchInputPartitionReader(
+      SubscriptionPath subscriptionPath,
+      SparkPartitionOffset endOffset,
+      BlockingPullSubscriber subscriber) {
+    this.subscriptionPath = subscriptionPath;
+    this.subscriber = subscriber;
+    this.endOffset = endOffset;
+  }
+
+  @Override
+  public boolean next() {
+    if (batchFulfilled) {
+      return false;
+    }
+    Optional<SequencedMessage> msg;
+    while (true) {
+      try {
+        subscriber.onData().get(SUBSCRIBER_PULL_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
+        msg = subscriber.messageIfAvailable();
+        break;
+      } catch (TimeoutException e) {
+        log.atWarning().log("Unable to get any messages in last %s.", SUBSCRIBER_PULL_TIMEOUT);
+      } catch (Throwable t) {
+        throw new IllegalStateException("Failed to retrieve messages.", t);
+      }
+    }
+    // since next() is only called on one thread at a time, we are sure that the message is
+    // available to this thread.
+    checkState(msg.isPresent());
+    currentMsg = msg.get();
+    if (currentMsg.offset().value() == endOffset.offset()) {
+      // this is the last msg for the batch.
+      batchFulfilled = true;
+    } else if (currentMsg.offset().value() > endOffset.offset()) {
+      batchFulfilled = true;
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public InternalRow get() {
+    checkState(currentMsg != null);
+    return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, endOffset.partition());
+  }
+
+  @Override
+  public void close() {
+    try {
+      subscriber.close();
+    } catch (Exception e) {
+      log.atWarning().log("Subscriber failed to close.");
+    }
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
new file mode 100644
index 00000000..9f71a17c
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.apache.spark.sql.types.StructType;
+
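+/**
+ * A {@link MicroBatchReader} that plans one input partition per Pub/Sub Lite partition, bounding
+ * each micro-batch by the head offsets observed at planning time; partitions with no new
+ * messages are skipped.
+ */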
+public class PslMicroBatchReader implements MicroBatchReader {
+
+  private final CursorClient cursorClient;
+  private final MultiPartitionCommitter committer;
+  private final PartitionSubscriberFactory partitionSubscriberFactory;
+  private final PerTopicHeadOffsetReader headOffsetReader;
+  private final SubscriptionPath subscriptionPath;
+  private final FlowControlSettings flowControlSettings;
+  private final long topicPartitionCount;
+  @Nullable private SparkSourceOffset startOffset = null;
+  private SparkSourceOffset endOffset;
+
+  public PslMicroBatchReader(
+      CursorClient cursorClient,
+      MultiPartitionCommitter committer,
+      PartitionSubscriberFactory partitionSubscriberFactory,
+      PerTopicHeadOffsetReader headOffsetReader,
+      SubscriptionPath subscriptionPath,
+      FlowControlSettings flowControlSettings,
+      long topicPartitionCount) {
+    this.cursorClient = cursorClient;
+    this.committer = committer;
+    this.partitionSubscriberFactory = partitionSubscriberFactory;
+    this.headOffsetReader = headOffsetReader;
+    this.subscriptionPath = subscriptionPath;
+    this.flowControlSettings = flowControlSettings;
+    this.topicPartitionCount = topicPartitionCount;
+  }
+
+  @Override
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    if (start.isPresent()) {
+      checkArgument(
+          SparkSourceOffset.class.isAssignableFrom(start.get().getClass()),
+          "start offset is not assignable to SparkSourceOffset.");
+      startOffset = (SparkSourceOffset) start.get();
+    } else {
+      startOffset =
+          PslSparkUtils.getSparkStartOffset(cursorClient, subscriptionPath, topicPartitionCount);
+    }
+    if (end.isPresent()) {
+      checkArgument(
+          SparkSourceOffset.class.isAssignableFrom(end.get().getClass()),
+          "end offset is not assignable to SparkSourceOffset.");
+      endOffset = (SparkSourceOffset) end.get();
+    } else {
+      endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return SparkSourceOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    checkArgument(
+        SparkSourceOffset.class.isAssignableFrom(end.getClass()),
+        "end offset is not assignable to SparkSourceOffset.");
+    committer.commit(PslSparkUtils.toPslSourceOffset((SparkSourceOffset) end));
+  }
+
+  @Override
+  public void stop() {
+    committer.close();
+  }
+
+  @Override
+  public StructType readSchema() {
+    return Constants.DEFAULT_SCHEMA;
+  }
+
+  @Override
+  public List<InputPartition<InternalRow>> planInputPartitions() {
+    checkState(startOffset != null);
+    List<InputPartition<InternalRow>> list = new ArrayList<>();
+    for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
+      SparkPartitionOffset endPartitionOffset =
+          endOffset.getPartitionOffsetMap().get(offset.partition());
+      if (offset.equals(endPartitionOffset)) {
+        // There is no message to pull for this partition.
+        continue;
+      }
+      PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
+      SubscriberFactory subscriberFactory =
+          (consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
+      list.add(
+          new PslMicroBatchInputPartition(
+              subscriptionPath,
+              flowControlSettings,
+              offset,
+              endPartitionOffset,
+              subscriberFactory));
+    }
+    return list;
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslPartitionOffset.java b/src/main/java/com/google/cloud/pubsublite/spark/PslPartitionOffset.java
new file mode 100644
index 00000000..bd6893c3
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslPartitionOffset.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import java.io.Serializable;
+
+@AutoValue
+public abstract class PslPartitionOffset implements Serializable {
+  private static final long serialVersionUID = -5446439851334065339L;
+
+  public abstract Partition partition();
+
+  public abstract Offset offset();
+
+  public static Builder builder() {
+    return new AutoValue_PslPartitionOffset.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder partition(Partition partition);
+
+    public abstract Builder offset(Offset offset);
+
+    public abstract PslPartitionOffset build();
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSourceOffset.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSourceOffset.java
new file mode 100644
index 00000000..c8b51233
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSourceOffset.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import java.util.Map;
+
+@AutoValue
+public abstract class PslSourceOffset {
+
+  public abstract Map<Partition, Offset> partitionOffsetMap();
+
+  public static Builder builder() {
+    return new AutoValue_PslSourceOffset.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder partitionOffsetMap(Map<Partition, Offset> partitionOffsetMap);
+
+    public abstract PslSourceOffset build();
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
new file mode 100644
index 00000000..962d5dc0
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static scala.collection.JavaConverters.asScalaBufferConverter;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import com.google.common.collect.ListMultimap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.Timestamps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.unsafe.types.ByteArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import java.util.Map;
+
+/** A per-partition map of PSL offsets, each pointing at the next message to be delivered. */
+@AutoValue
+public abstract class PslSourceOffset {
+
+  public abstract Map<Partition, Offset> partitionOffsetMap();
+
+  public static Builder builder() {
+    return new AutoValue_PslSourceOffset.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder partitionOffsetMap(Map<Partition, Offset> partitionOffsetMap);
+
+    public abstract PslSourceOffset build();
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
new file mode 100644
index 00000000..962d5dc0
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static scala.collection.JavaConverters.asScalaBufferConverter;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import com.google.common.collect.ListMultimap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.Timestamps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.unsafe.types.ByteArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class PslSparkUtils {
+  private static ArrayBasedMapData convertAttributesToSparkMap(
+      ListMultimap<String, ByteString> attributeMap) {
+
+    List<UTF8String> keyList = new ArrayList<>();
+    List<GenericArrayData> valueList = new ArrayList<>();
+
+    attributeMap
+        .asMap()
+        .forEach(
+            (key, value) -> {
+              keyList.add(UTF8String.fromString(key));
+              List<byte[]> attributeVals =
+                  value.stream()
+                      .map(v -> ByteArray.concat(v.toByteArray()))
+                      .collect(Collectors.toList());
+              valueList.add(new GenericArrayData(asScalaBufferConverter(attributeVals).asScala()));
+            });
+
+    return new ArrayBasedMapData(
+        new GenericArrayData(asScalaBufferConverter(keyList).asScala()),
+        new GenericArrayData(asScalaBufferConverter(valueList).asScala()));
+  }
+
+  public static InternalRow toInternalRow(
+      SequencedMessage msg, SubscriptionPath subscription, Partition partition) {
+    // Field order must line up with Constants.DEFAULT_SCHEMA.
+    List<Object> list =
+        new ArrayList<>(
+            Arrays.asList(
+                UTF8String.fromString(subscription.toString()),
+                partition.value(),
+                msg.offset().value(),
+                ByteArray.concat(msg.message().key().toByteArray()),
+                ByteArray.concat(msg.message().data().toByteArray()),
+                // Spark's TimestampType stores microseconds since the Unix epoch.
+                Timestamps.toMicros(msg.publishTime()),
+                msg.message().eventTime().isPresent()
+                    ? Timestamps.toMicros(msg.message().eventTime().get())
+                    : null,
+                convertAttributesToSparkMap(msg.message().attributes())));
+    return InternalRow.apply(asScalaBufferConverter(list).asScala());
+  }
+
+  // PSL offsets mark the next message to be delivered, while Spark offsets mark the last message
+  // already processed; hence the -1/+1 adjustments in the conversions below.
+  public static SparkSourceOffset toSparkSourceOffset(PslSourceOffset pslSourceOffset) {
+    return new SparkSourceOffset(
+        pslSourceOffset.partitionOffsetMap().entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    Map.Entry::getKey,
+                    e ->
+                        SparkPartitionOffset.builder()
+                            .partition(Partition.of(e.getKey().value()))
+                            .offset(e.getValue().value() - 1)
+                            .build())));
+  }
+
+  public static PslSourceOffset toPslSourceOffset(SparkSourceOffset sparkSourceOffset) {
+    long partitionCount = sparkSourceOffset.getPartitionOffsetMap().size();
+    Map<Partition, Offset> pslSourceOffsetMap = new HashMap<>();
+    for (long i = 0; i < partitionCount; i++) {
+      Partition p = Partition.of(i);
+      checkArgument(sparkSourceOffset.getPartitionOffsetMap().containsKey(p));
+      pslSourceOffsetMap.put(
+          p, Offset.of(sparkSourceOffset.getPartitionOffsetMap().get(p).offset() + 1));
+    }
+    return PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build();
+  }
+
+  public static PslPartitionOffset toPslPartitionOffset(SparkPartitionOffset sparkPartitionOffset) {
+    return PslPartitionOffset.builder()
+        .partition(sparkPartitionOffset.partition())
+        .offset(Offset.of(sparkPartitionOffset.offset() + 1))
+        .build();
+  }
+
+  public static SparkSourceOffset getSparkStartOffset(
+      CursorClient cursorClient, SubscriptionPath subscriptionPath, long topicPartitionCount) {
+    try {
+      Map<Partition, Offset> pslSourceOffsetMap = new HashMap<>();
+      for (int i = 0; i < topicPartitionCount; i++) {
+        pslSourceOffsetMap.put(Partition.of(i), Offset.of(0));
+      }
+      // Partitions with a committed cursor start from it; all other partitions start from 0.
+      cursorClient
+          .listPartitionCursors(subscriptionPath)
+          .get()
+          .forEach(pslSourceOffsetMap::replace);
+      return PslSparkUtils.toSparkSourceOffset(
+          PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IllegalStateException(
+          "Failed to get information from PSL and construct startOffset", e);
+    }
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/SparkPartitionOffset.java b/src/main/java/com/google/cloud/pubsublite/spark/SparkPartitionOffset.java
new file mode 100644
index 00000000..4f984b94
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/SparkPartitionOffset.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.google.cloud.pubsublite.spark; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.Partition; +import java.io.Serializable; +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset; + +@AutoValue +abstract class SparkPartitionOffset implements PartitionOffset, Serializable { + private static final long serialVersionUID = -3398208694782540866L; + + abstract Partition partition(); + + abstract long offset(); + + public static SparkPartitionOffset create(Partition partition, long offset) { + return builder().partition(partition).offset(offset).build(); + } + + public static Builder builder() { + return new AutoValue_SparkPartitionOffset.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder partition(Partition partition); + + public abstract Builder offset(long offset); + + public abstract SparkPartitionOffset build(); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java b/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java new file mode 100644 index 00000000..4dcfc87f --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java @@ -0,0 +1,121 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.cloud.pubsublite.Partition; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public final class SparkSourceOffset + extends org.apache.spark.sql.sources.v2.reader.streaming.Offset { + private static final ObjectMapper objectMapper = + new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); + + // Using a map to ensure unique partitions. 
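+  // The serialized form (via json()) maps partition number to the last processed offset and is
+  // sorted by partition, e.g. {"1":5,"2":8,"3":10}; see SparkSourceOffsetTest#roundTrip.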
+  private final ImmutableMap<Partition, SparkPartitionOffset> partitionOffsetMap;
+
+  public SparkSourceOffset(Map<Partition, SparkPartitionOffset> map) {
+    validateMap(map);
+    this.partitionOffsetMap = ImmutableMap.copyOf(map);
+  }
+
+  private static void validateMap(Map<Partition, SparkPartitionOffset> map) {
+    map.forEach(
+        (k, v) ->
+            checkArgument(
+                Objects.equals(k, v.partition()),
+                "The key partition and the value's partition don't match."));
+  }
+
+  public static SparkSourceOffset merge(SparkSourceOffset o1, SparkSourceOffset o2) {
+    Map<Partition, SparkPartitionOffset> result = new HashMap<>(o1.partitionOffsetMap);
+    o2.partitionOffsetMap.forEach(
+        (k, v) ->
+            result.merge(
+                k,
+                v,
+                (v1, v2) ->
+                    SparkPartitionOffset.builder()
+                        .partition(Partition.of(k.value()))
+                        .offset(Collections.max(ImmutableList.of(v1.offset(), v2.offset())))
+                        .build()));
+    return new SparkSourceOffset(result);
+  }
+
+  public static SparkSourceOffset merge(SparkPartitionOffset[] offsets) {
+    Map<Partition, SparkPartitionOffset> map = new HashMap<>();
+    for (SparkPartitionOffset po : offsets) {
+      checkArgument(
+          !map.containsKey(po.partition()),
+          "Multiple SparkPartitionOffsets have the same partition.");
+      map.put(
+          po.partition(),
+          SparkPartitionOffset.builder().partition(po.partition()).offset(po.offset()).build());
+    }
+    return new SparkSourceOffset(map);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static SparkSourceOffset fromJson(String json) {
+    Map<String, Number> map;
+    try {
+      // TODO: Use TypeReference instead of Map.class; currently TypeReference breaks Spark with
+      // "java.lang.LinkageError: loader constraint violation: loader previously initiated loading
+      // for a different type".
+      map = objectMapper.readValue(json, Map.class);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to deserialize SparkSourceOffset.", e);
+    }
+    Map<Partition, SparkPartitionOffset> partitionOffsetMap =
+        map.entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    e -> Partition.of(Long.parseLong(e.getKey())),
+                    e ->
+                        SparkPartitionOffset.builder()
+                            .partition(Partition.of(Long.parseLong(e.getKey())))
+                            .offset(e.getValue().longValue())
+                            .build()));
+    return new SparkSourceOffset(partitionOffsetMap);
+  }
+
+  public Map<Partition, SparkPartitionOffset> getPartitionOffsetMap() {
+    return this.partitionOffsetMap;
+  }
+
+  @Override
+  public String json() {
+    try {
+      Map<Long, Long> map =
+          partitionOffsetMap.entrySet().stream()
+              .collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
+      return objectMapper.writeValueAsString(map);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException("Unable to serialize SparkSourceOffset.", e);
+    }
+  }
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java
new file mode 100644
index 00000000..0007dd89
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.TopicStatsClient; +import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; +import com.google.cloud.pubsublite.proto.Cursor; +import com.google.common.testing.FakeTicker; +import java.util.concurrent.TimeUnit; +import org.junit.Test; + +public class LimitingHeadOffsetReaderTest { + + private final FakeTicker ticker = new FakeTicker(); + private final TopicStatsClient topicStatsClient = mock(TopicStatsClient.class); + private final LimitingHeadOffsetReader reader = + new LimitingHeadOffsetReader( + topicStatsClient, UnitTestExamples.exampleTopicPath(), 1, ticker::read); + + @Test + public void testRead() { + Cursor cursor1 = Cursor.newBuilder().setOffset(10).build(); + Cursor cursor2 = Cursor.newBuilder().setOffset(13).build(); + when(topicStatsClient.computeHeadCursor(UnitTestExamples.exampleTopicPath(), Partition.of(0))) + .thenReturn(ApiFutures.immediateFuture(cursor1)); + assertThat(reader.getHeadOffset().partitionOffsetMap()) + .containsExactly(Partition.of(0), Offset.of(10)); + verify(topicStatsClient).computeHeadCursor(any(), any()); + + reset(topicStatsClient); + ticker.advance(59, TimeUnit.SECONDS); + assertThat(reader.getHeadOffset().partitionOffsetMap()) + .containsExactly(Partition.of(0), Offset.of(cursor1.getOffset())); + verify(topicStatsClient, times(0)).computeHeadCursor(any(), any()); + + reset(topicStatsClient); + ticker.advance(2, TimeUnit.SECONDS); + when(topicStatsClient.computeHeadCursor(UnitTestExamples.exampleTopicPath(), Partition.of(0))) + .thenReturn(ApiFutures.immediateFuture(cursor2)); + assertThat(reader.getHeadOffset().partitionOffsetMap()) + .containsExactly(Partition.of(0), Offset.of(cursor2.getOffset())); + verify(topicStatsClient).computeHeadCursor(any(), any()); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java b/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java new file mode 100644 index 00000000..a9fbf3a2 --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.pubsublite.*;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+public class MultiPartitionCommitterImplTest {
+
+  @Test
+  public void testCommit() {
+    Committer committer1 = mock(Committer.class);
+    Committer committer2 = mock(Committer.class);
+    when(committer1.startAsync())
+        .thenReturn(committer1)
+        .thenThrow(new IllegalStateException("should only init once"));
+    when(committer2.startAsync())
+        .thenReturn(committer2)
+        .thenThrow(new IllegalStateException("should only init once"));
+    MultiPartitionCommitterImpl multiCommitter =
+        new MultiPartitionCommitterImpl(
+            2,
+            (p) -> {
+              if (p.value() == 0L) {
+                return committer1;
+              } else {
+                return committer2;
+              }
+            });
+    verify(committer1, times(1)).startAsync();
+    verify(committer2, times(1)).startAsync();
+
+    PslSourceOffset offset =
+        PslSourceOffset.builder()
+            .partitionOffsetMap(
+                ImmutableMap.of(
+                    Partition.of(0), Offset.of(10L),
+                    Partition.of(1), Offset.of(8L)))
+            .build();
+    SettableApiFuture<Void> future1 = SettableApiFuture.create();
+    SettableApiFuture<Void> future2 = SettableApiFuture.create();
+    when(committer1.commitOffset(eq(Offset.of(10L)))).thenReturn(future1);
+    when(committer2.commitOffset(eq(Offset.of(8L)))).thenReturn(future2);
+    multiCommitter.commit(offset);
+    verify(committer1, times(1)).commitOffset(eq(Offset.of(10L)));
+    verify(committer2, times(1)).commitOffset(eq(Offset.of(8L)));
+  }
+
+  @Test
+  public void testClose() {
+    Committer committer = mock(Committer.class);
+    when(committer.startAsync())
+        .thenReturn(committer)
+        .thenThrow(new IllegalStateException("should only init once"));
+    MultiPartitionCommitterImpl multiCommitter =
+        new MultiPartitionCommitterImpl(1, (p) -> committer);
+
+    PslSourceOffset offset =
+        PslSourceOffset.builder()
+            .partitionOffsetMap(ImmutableMap.of(Partition.of(0), Offset.of(10L)))
+            .build();
+    SettableApiFuture<Void> future1 = SettableApiFuture.create();
+    when(committer.commitOffset(eq(Offset.of(10L)))).thenReturn(future1);
+    when(committer.stopAsync()).thenReturn(committer);
+    multiCommitter.commit(offset);
+
+    multiCommitter.close();
+    verify(committer, times(1)).stopAsync();
+  }
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java
new file mode 100644
index 00000000..1e6ee0dc
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.*;
+
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.*;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
+import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.Timestamps;
+import java.util.Optional;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
+import org.junit.Test;
+
+public class PslContinuousInputPartitionReaderTest {
+
+  private final BlockingPullSubscriberImpl subscriber = mock(BlockingPullSubscriberImpl.class);
+  private ContinuousInputPartitionReader<InternalRow> reader;
+
+  private static SequencedMessage newMessage(long offset) {
+    return SequencedMessage.of(
+        Message.builder().setData(ByteString.copyFromUtf8("text")).build(),
+        Timestamps.EPOCH,
+        Offset.of(offset),
+        10000);
+  }
+
+  private static void verifyInternalRow(InternalRow row, long expectedOffset) {
+    assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString());
+    assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value());
+    assertThat(row.getLong(2)).isEqualTo(expectedOffset);
+  }
+
+  private void createReader() {
+    reader =
+        new PslContinuousInputPartitionReader(
+            UnitTestExamples.exampleSubscriptionPath(),
+            SparkPartitionOffset.builder()
+                .partition(UnitTestExamples.examplePartition())
+                .offset(UnitTestExamples.exampleOffset().value())
+                .build(),
+            subscriber);
+  }
+
+  @Test
+  public void testPartitionReader() throws Exception {
+    createReader();
+    SequencedMessage message1 = newMessage(10);
+    SequencedMessage message2 = newMessage(13);
+
+    // Multiple get() calls without an intervening next() should return the same message.
+    when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
+    when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
+    assertThat(reader.next()).isTrue();
+    verifyInternalRow(reader.get(), 10L);
+    verifyInternalRow(reader.get(), 10L);
+    assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(10L);
+
+    // next() advances to the next message.
+    when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
+    when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
+    assertThat(reader.next()).isTrue();
+    verifyInternalRow(reader.get(), 13L);
+    assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(13L);
+  }
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java
new file mode 100644
index 00000000..b4982caa
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; + +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.*; +import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; +import com.google.common.collect.ImmutableMap; +import java.util.Optional; +import org.junit.Test; + +public class PslContinuousReaderTest { + + private static final PslDataSourceOptions OPTIONS = + PslDataSourceOptions.builder() + .setSubscriptionPath(UnitTestExamples.exampleSubscriptionPath()) + .build(); + private final CursorClient cursorClient = mock(CursorClient.class); + private final MultiPartitionCommitter committer = mock(MultiPartitionCommitter.class); + private final PartitionSubscriberFactory partitionSubscriberFactory = + mock(PartitionSubscriberFactory.class); + private final PslContinuousReader reader = + new PslContinuousReader( + cursorClient, + committer, + partitionSubscriberFactory, + UnitTestExamples.exampleSubscriptionPath(), + OPTIONS.flowControlSettings(), + 2); + + @Test + public void testEmptyStartOffset() { + when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())) + .thenReturn( + ApiFutures.immediateFuture( + ImmutableMap.of(Partition.of(1L), UnitTestExamples.exampleOffset()))); + + reader.setStartOffset(Optional.empty()); + assertThat(((SparkSourceOffset) reader.getStartOffset()).getPartitionOffsetMap()) + .containsExactly( + Partition.of(0L), + SparkPartitionOffset.builder().partition(Partition.of(0L)).offset(-1L).build(), + Partition.of(1L), + SparkPartitionOffset.builder() + .partition(Partition.of(1L)) + .offset(UnitTestExamples.exampleOffset().value() - 1) + .build()); + } + + @Test + public void testValidStartOffset() { + SparkSourceOffset offset = + new SparkSourceOffset( + ImmutableMap.of( + Partition.of(1), + SparkPartitionOffset.builder() + .partition(Partition.of(1)) + .offset(UnitTestExamples.exampleOffset().value()) + .build())); + reader.setStartOffset(Optional.of(offset)); + assertThat(reader.getStartOffset()).isEqualTo(offset); + } + + @Test + public void testMergeOffsets() { + SparkPartitionOffset po1 = + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build(); + SparkPartitionOffset po2 = + SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(5L).build(); + assertThat(reader.mergeOffsets(new SparkPartitionOffset[] {po1, po2})) + .isEqualTo(SparkSourceOffset.merge(new SparkPartitionOffset[] {po1, po2})); + } + + @Test + public void testDeserializeOffset() { + SparkSourceOffset offset = + new SparkSourceOffset( + ImmutableMap.of( + Partition.of(1L), + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build())); + assertThat(reader.deserializeOffset(offset.json())).isEqualTo(offset); + } + + @Test + public void testCommit() { + SparkSourceOffset offset = + new SparkSourceOffset( + ImmutableMap.of( + Partition.of(0L), + SparkPartitionOffset.builder().partition(Partition.of(0L)).offset(10L).build(), + Partition.of(1L), + SparkPartitionOffset.builder() + .partition(Partition.of(1L)) + .offset(50L) + .build())); + PslSourceOffset expectedCommitOffset = + PslSourceOffset.builder() + .partitionOffsetMap( + ImmutableMap.of( + Partition.of(0L), Offset.of(11L), + Partition.of(1L), Offset.of(51L))) + .build(); + reader.commit(offset); + verify(committer, 
 times(1)).commit(eq(expectedCommitOffset));
+  }
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.java
new file mode 100644
index 00000000..935e7a63
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
+import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.Timestamps;
+import java.util.Optional;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.junit.Test;
+
+public class PslMicroBatchInputPartitionReaderTest {
+
+  private final BlockingPullSubscriberImpl subscriber = mock(BlockingPullSubscriberImpl.class);
+  private InputPartitionReader<InternalRow> reader;
+
+  private static SequencedMessage newMessage(long offset) {
+    return SequencedMessage.of(
+        Message.builder().setData(ByteString.copyFromUtf8("text")).build(),
+        Timestamps.EPOCH,
+        Offset.of(offset),
+        10000);
+  }
+
+  private static void verifyInternalRow(InternalRow row, long expectedOffset) {
+    assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString());
+    assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value());
+    assertThat(row.getLong(2)).isEqualTo(expectedOffset);
+  }
+
+  private void createReader(long endOffset) {
+    reader =
+        new PslMicroBatchInputPartitionReader(
+            UnitTestExamples.exampleSubscriptionPath(),
+            SparkPartitionOffset.builder()
+                .partition(UnitTestExamples.examplePartition())
+                .offset(endOffset)
+                .build(),
+            subscriber);
+  }
+
+  @Test
+  public void testPartitionReader() throws Exception {
+    long endOffset = 14L;
+    createReader(endOffset);
+    SequencedMessage message1 = newMessage(10L);
+    SequencedMessage message2 = newMessage(endOffset);
+
+    // Multiple get() calls without an intervening next() should return the same message.
+    when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
+    when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
+    assertThat(reader.next()).isTrue();
+    verifyInternalRow(reader.get(), 10L);
+    verifyInternalRow(reader.get(), 10L);
+
+    // next() advances to the next message, which is also the last message in the batch.
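+    // The reader should still return true for this message; it only reports false on the call
+    // after the batch end offset has been consumed.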
+ when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); + when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2)); + assertThat(reader.next()).isTrue(); + verifyInternalRow(reader.get(), 14L); + + // Now it already reached the end of the batch + assertThat(reader.next()).isFalse(); + } + + @Test + public void testPartitionReaderNewMessageExceedsRange() throws Exception { + long endOffset = 14L; + createReader(endOffset); + SequencedMessage message1 = newMessage(10L); + SequencedMessage message2 = newMessage(endOffset + 1); + + // Multiple get w/o next will return same msg. + when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); + when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1)); + assertThat(reader.next()).isTrue(); + verifyInternalRow(reader.get(), 10L); + verifyInternalRow(reader.get(), 10L); + + // Next will advance to next msg, and recognize it's out of the batch range. + when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); + when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2)); + assertThat(reader.next()).isFalse(); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java new file mode 100644 index 00000000..c8f64408 --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; + +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; +import com.google.common.collect.ImmutableMap; +import java.util.Optional; +import org.junit.Test; + +public class PslMicroBatchReaderTest { + private static final PslDataSourceOptions OPTIONS = + PslDataSourceOptions.builder() + .setSubscriptionPath(UnitTestExamples.exampleSubscriptionPath()) + .build(); + private final CursorClient cursorClient = mock(CursorClient.class); + private final MultiPartitionCommitter committer = mock(MultiPartitionCommitter.class); + private final PartitionSubscriberFactory partitionSubscriberFactory = + mock(PartitionSubscriberFactory.class); + private final PerTopicHeadOffsetReader headOffsetReader = mock(PerTopicHeadOffsetReader.class); + private final PslMicroBatchReader reader = + new PslMicroBatchReader( + cursorClient, + committer, + partitionSubscriberFactory, + headOffsetReader, + UnitTestExamples.exampleSubscriptionPath(), + OPTIONS.flowControlSettings(), + 2); + + private PslSourceOffset createPslSourceOffsetTwoPartition(long offset0, long offset1) { + return PslSourceOffset.builder() + .partitionOffsetMap( + ImmutableMap.of( + Partition.of(0L), Offset.of(offset0), Partition.of(1L), Offset.of(offset1))) + .build(); + } + + private SparkSourceOffset createSparkSourceOffsetTwoPartition(long offset0, long offset1) { + return new SparkSourceOffset( + ImmutableMap.of( + Partition.of(0L), + SparkPartitionOffset.create(Partition.of(0L), offset0), + Partition.of(1L), + SparkPartitionOffset.create(Partition.of(1L), offset1))); + } + + @Test + public void testEmptyOffsets() { + when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())) + .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L)))); + when(headOffsetReader.getHeadOffset()).thenReturn(createPslSourceOffsetTwoPartition(301L, 0L)); + reader.setOffsetRange(Optional.empty(), Optional.empty()); + assertThat(((SparkSourceOffset) reader.getStartOffset()).getPartitionOffsetMap()) + .containsExactly( + Partition.of(0L), + SparkPartitionOffset.create(Partition.of(0L), 99L), + Partition.of(1L), + SparkPartitionOffset.create(Partition.of(1L), -1L)); + assertThat(((SparkSourceOffset) reader.getEndOffset()).getPartitionOffsetMap()) + .containsExactly( + Partition.of(0L), + SparkPartitionOffset.create(Partition.of(0L), 300L), + Partition.of(1L), + SparkPartitionOffset.create(Partition.of(1L), -1L)); + } + + @Test + public void testValidOffsets() { + SparkSourceOffset startOffset = createSparkSourceOffsetTwoPartition(10L, 100L); + SparkSourceOffset endOffset = createSparkSourceOffsetTwoPartition(20L, 300L); + reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)); + assertThat(reader.getStartOffset()).isEqualTo(startOffset); + assertThat(reader.getEndOffset()).isEqualTo(endOffset); + } + + @Test + public void testDeserializeOffset() { + SparkSourceOffset offset = + new SparkSourceOffset( + ImmutableMap.of(Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), 10L))); + 
assertThat(reader.deserializeOffset(offset.json())).isEqualTo(offset); + } + + @Test + public void testCommit() { + SparkSourceOffset offset = createSparkSourceOffsetTwoPartition(10L, 50L); + PslSourceOffset expectedCommitOffset = createPslSourceOffsetTwoPartition(11L, 51L); + reader.commit(offset); + verify(committer, times(1)).commit(eq(expectedCommitOffset)); + } + + @Test + public void testPlanInputPartitionNoMessage() { + SparkSourceOffset startOffset = createSparkSourceOffsetTwoPartition(10L, 100L); + SparkSourceOffset endOffset = createSparkSourceOffsetTwoPartition(20L, 100L); + reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)); + assertThat(reader.planInputPartitions()).hasSize(1); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java new file mode 100644 index 00000000..5dbdf65e --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import org.junit.Test; + +public class PslSparkUtilsTest { + + @Test + public void testToInternalRow() { + Message message = + Message.builder() + .setKey(ByteString.copyFromUtf8("key")) + .setData(ByteString.copyFromUtf8("data")) + .setEventTime(Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build()) + .setAttributes( + ImmutableListMultimap.of( + "key1", ByteString.copyFromUtf8("val1"), + "key1", ByteString.copyFromUtf8("val2"), + "key2", ByteString.copyFromUtf8("val3"))) + .build(); + SequencedMessage sequencedMessage = + SequencedMessage.of( + message, + Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build(), + Offset.of(10L), + 10L); + PslSparkUtils.toInternalRow( + sequencedMessage, + UnitTestExamples.exampleSubscriptionPath(), + UnitTestExamples.examplePartition()); + } + + @Test + public void testSourceOffsetConversion() { + PslSourceOffset pslSourceOffset = + PslSourceOffset.builder() + .partitionOffsetMap( + ImmutableMap.of(Partition.of(0L), Offset.of(10), Partition.of(1L), Offset.of(50))) + .build(); + + SparkSourceOffset sparkSourceOffset = PslSparkUtils.toSparkSourceOffset(pslSourceOffset); + assertThat(sparkSourceOffset.getPartitionOffsetMap().get(Partition.of(0L)).offset()) + .isEqualTo(9L); + assertThat(sparkSourceOffset.getPartitionOffsetMap().get(Partition.of(1L)).offset()) + .isEqualTo(49L); + + 
assertThat(PslSparkUtils.toPslSourceOffset(sparkSourceOffset)).isEqualTo(pslSourceOffset); + } + + @Test + public void testToPslPartitionOffset() { + SparkPartitionOffset sparkPartitionOffset = + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build(); + PslPartitionOffset pslPartitionOffset = + PslPartitionOffset.builder().partition(Partition.of(1L)).offset(Offset.of(11L)).build(); + assertThat(PslSparkUtils.toPslPartitionOffset(sparkPartitionOffset)) + .isEqualTo(pslPartitionOffset); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/spark/SparkSourceOffsetTest.java b/src/test/java/com/google/cloud/pubsublite/spark/SparkSourceOffsetTest.java new file mode 100644 index 00000000..78010e07 --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/spark/SparkSourceOffsetTest.java @@ -0,0 +1,130 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.cloud.pubsublite.Partition; +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +public class SparkSourceOffsetTest { + + @Test + public void roundTrip() { + SparkSourceOffset offset = + new SparkSourceOffset( + ImmutableMap.of( + // Intentionally unsorted, the serialization should make it sorted. 
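+                // (SparkSourceOffset's ObjectMapper enables ORDER_MAP_ENTRIES_BY_KEYS, which
+                // guarantees the sorted JSON asserted below.)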
+ Partition.of(3L), + SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(10L).build(), + Partition.of(1L), + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(5L).build(), + Partition.of(2L), + SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(8L).build())); + assertThat(offset.json()).isEqualTo("{\"1\":5,\"2\":8,\"3\":10}"); + assertThat(SparkSourceOffset.fromJson(offset.json())).isEqualTo(offset); + } + + @Test + public void mergeSparkSourceOffsets() { + SparkSourceOffset o1 = + new SparkSourceOffset( + ImmutableMap.of( + Partition.of(1L), + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build(), + Partition.of(2L), + SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(8L).build(), + Partition.of(3L), + SparkPartitionOffset.builder() + .partition(Partition.of(3L)) + .offset(10L) + .build())); + SparkSourceOffset o2 = + new SparkSourceOffset( + ImmutableMap.of( + Partition.of(2L), + SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(8L).build(), + Partition.of(3L), + SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(11L).build(), + Partition.of(4L), + SparkPartitionOffset.builder().partition(Partition.of(4L)).offset(1L).build())); + SparkSourceOffset expected = + new SparkSourceOffset( + ImmutableMap.of( + Partition.of(1L), + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build(), + Partition.of(2L), + SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(8L).build(), + Partition.of(3L), + SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(11L).build(), + Partition.of(4L), + SparkPartitionOffset.builder().partition(Partition.of(4L)).offset(1L).build())); + assertThat(SparkSourceOffset.merge(o1, o2)).isEqualTo(expected); + } + + @Test + public void mergeSparkPartitionOffsetsDuplicatePartition() { + SparkPartitionOffset[] offsets = { + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(5L).build(), + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(4L).build(), + SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(10L).build() + }; + try { + SparkSourceOffset.merge(offsets); + fail(); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageThat().contains("same partition"); + } + } + + @Test + public void mergeSparkPartitionOffsets() { + SparkPartitionOffset[] offsets = { + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(5L).build(), + SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(4L).build(), + SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(10L).build() + }; + SparkSourceOffset expected = + new SparkSourceOffset( + ImmutableMap.of( + Partition.of(1L), + SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(5L).build(), + Partition.of(2L), + SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(4L).build(), + Partition.of(3L), + SparkPartitionOffset.builder() + .partition(Partition.of(3L)) + .offset(10L) + .build())); + assertThat(SparkSourceOffset.merge(offsets)).isEqualTo(expected); + } + + @Test + public void invalidMap() { + try { + new SparkSourceOffset( + ImmutableMap.of( + Partition.of(3L), + SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(10L).build())); + fail(); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageThat().contains("don't match"); + } + } +}