From ae82cbe716655417203cd545fda2e78f23d2584c Mon Sep 17 00:00:00 2001 From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com> Date: Tue, 26 Jan 2021 17:00:58 -0500 Subject: [PATCH] feat: remove Spark connector module from this repo (#467) The source code now lives in https://github.com/googleapis/java-pubsublite-spark --- pom.xml | 1 - pubsublite-spark-sql-streaming/README.md | 134 --------- .../examples/simple_read.py | 33 --- pubsublite-spark-sql-streaming/pom.xml | 273 ------------------ .../cloud/pubsublite/spark/Constants.java | 55 ---- .../spark/LimitingHeadOffsetReader.java | 100 ------- .../spark/MultiPartitionCommitter.java | 32 -- .../spark/MultiPartitionCommitterImpl.java | 78 ----- .../spark/PartitionSubscriberFactory.java | 31 -- .../spark/PerTopicHeadOffsetReader.java | 28 -- .../spark/PslContinuousInputPartition.java | 85 ------ .../PslContinuousInputPartitionReader.java | 88 ------ .../pubsublite/spark/PslContinuousReader.java | 133 --------- .../spark/PslCredentialsProvider.java | 61 ---- .../cloud/pubsublite/spark/PslDataSource.java | 103 ------- .../spark/PslDataSourceOptions.java | 209 -------------- .../spark/PslMicroBatchInputPartition.java | 73 ----- .../PslMicroBatchInputPartitionReader.java | 100 ------- .../pubsublite/spark/PslMicroBatchReader.java | 143 --------- .../pubsublite/spark/PslPartitionOffset.java | 45 --- .../pubsublite/spark/PslSourceOffset.java | 40 --- .../cloud/pubsublite/spark/PslSparkUtils.java | 135 --------- .../spark/SparkPartitionOffset.java | 48 --- .../pubsublite/spark/SparkSourceOffset.java | 121 -------- .../spark/LimitingHeadOffsetReaderTest.java | 69 ----- .../MultiPartitionCommitterImplTest.java | 90 ------ ...PslContinuousInputPartitionReaderTest.java | 84 ------ .../spark/PslContinuousReaderTest.java | 125 -------- ...PslMicroBatchInputPartitionReaderTest.java | 109 ------- .../spark/PslMicroBatchReaderTest.java | 122 -------- .../pubsublite/spark/PslSparkUtilsTest.java | 85 ------ .../spark/SparkSourceOffsetTest.java | 130 --------- 32 files changed, 2963 deletions(-) delete mode 100644 pubsublite-spark-sql-streaming/README.md delete mode 100644 pubsublite-spark-sql-streaming/examples/simple_read.py delete mode 100644 pubsublite-spark-sql-streaming/pom.xml delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/Constants.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java delete 
mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslPartitionOffset.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSourceOffset.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/SparkPartitionOffset.java delete mode 100644 pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java delete mode 100644 pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java delete mode 100644 pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java delete mode 100644 pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java delete mode 100644 pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java delete mode 100644 pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.java delete mode 100644 pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java delete mode 100644 pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java delete mode 100644 pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/SparkSourceOffsetTest.java diff --git a/pom.xml b/pom.xml index e278da134..80de096fd 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,6 @@ google-cloud-pubsublite grpc-google-cloud-pubsublite-v1 proto-google-cloud-pubsublite-v1 - pubsublite-spark-sql-streaming pubsublite-beam-io diff --git a/pubsublite-spark-sql-streaming/README.md b/pubsublite-spark-sql-streaming/README.md deleted file mode 100644 index bd5ca9c27..000000000 --- a/pubsublite-spark-sql-streaming/README.md +++ /dev/null @@ -1,134 +0,0 @@ -# Apache Spark SQL Streaming connector for Google PubSub Lite (Unreleased) - -The connector is a custom implementation of [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) -that supports reading messages from [Google PubSub Lite](https://cloud.google.com/pubsub/lite/docs) subscriptions into Spark. - -## Disclaimer - -This connector is still being worked on. Backwards-incompatible changes may be made until version 1.0.0 is released. - -## Requirements - -### Enable the PubSub Lite API - -Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#before-you-begin). 
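-
-The API can also be enabled from the command line. A minimal sketch, assuming the `gcloud` CLI is installed and a default project is already configured:
-
-```
-gcloud services enable pubsublite.googleapis.com
-```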
-
-### Create a new subscription or use an existing subscription
-
-Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#create_a_lite_subscription) to create a new
-subscription or use an existing one. When reading from an existing subscription, the connector starts from the oldest
-unacknowledged message.
-
-### Create a Google Cloud Dataproc cluster (Optional)
-
-If you do not have an Apache Spark environment, you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.
-
-```
-MY_CLUSTER=...
-gcloud dataproc clusters create "$MY_CLUSTER"
-```
-
-## Downloading and Using the Connector
-
-The latest version of the connector (Scala 2.11) is publicly available at
-gs://spark-lib/pubsublite/spark-pubsublite-latest.jar.
-
-The connector is also available from the Maven Central repository. It can be added with the `--packages` option or the
-`spark.jars.packages` configuration property, using the following value:
-
-| Scala version | Connector Artifact |
-| --- | --- |
-| Scala 2.11 | `com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0` |
-
-## Usage
-
-### Reading data from PubSub Lite
-
-```
-df = spark.readStream \
-  .format("pubsublite") \
-  .option("pubsublite.subscription", "projects/123456789/locations/us-central1-a/subscriptions/test-spark-subscription") \
-  .load()
-```
-
-Note that the connector supports both MicroBatch Processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
-
-### Properties
-
-The connector supports a number of options to configure the read:
-
-| Option | Type | Required | Meaning |
-| ------ | ---- | -------- | ------- |
-| pubsublite.subscription | String | Y | Full subscription path that the connector will read from. |
-| pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | Max number of bytes per partition that will be cached in workers before Spark processes the messages. Defaults to 50000000 bytes. |
-| pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Max number of messages per partition that will be cached in workers before Spark processes the messages. Defaults to Long.MAX_VALUE. |
-| gcp.credentials.key | String | N | Service account JSON key in base64. Defaults to [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically). |
-
-### Data Schema
-
-The connector has a fixed data schema, as follows:
-
-| Data Field | Spark Data Type | Notes |
-| ---------- | --------------- | ----- |
-| subscription | StringType | Full subscription path |
-| partition | LongType | |
-| offset | LongType | |
-| key | BinaryType | |
-| data | BinaryType | |
-| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | |
-| publish_timestamp | TimestampType | |
-| event_timestamp | TimestampType | Nullable |
-
-## Compiling with the connector
-
-To include the connector in your project:
-
-### Maven
-
-```xml
-<dependency>
-  <groupId>com.google.cloud.pubsublite.spark</groupId>
-  <artifactId>pubsublite-spark-sql-streaming-with-dependencies_2.11</artifactId>
-  <version>0.1.0</version>
-</dependency>
-```
-
-### SBT
-
-```sbt
-libraryDependencies += "com.google.cloud.pubsublite.spark" % "pubsublite-spark-sql-streaming-with-dependencies_2.11" % "0.1.0"
-```
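-
-Alternatively, the published artifact can be pulled in at submission time instead of being compiled into your project. A sketch, where `my_streaming_job.py` is a placeholder for your own application:
-
-```
-spark-submit \
-  --packages com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0 \
-  my_streaming_job.py
-```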
-
-## Building the Connector
-
-The connector is built using Maven. The following command creates a jar with shaded dependencies:
-
-```
-mvn package
-```
-
-## FAQ
-
-### What is the pricing for PubSub Lite?
-
-See the [PubSub Lite pricing documentation](https://cloud.google.com/pubsub/lite/pricing).
-
-### Can I configure the number of Spark partitions?
-
-No. The number of Spark partitions is set to the number of PubSub Lite partitions of the topic that the supplied subscription is attached to.
-
-### How do I authenticate outside GCE / Dataproc?
-
-Use a service account JSON key and `GOOGLE_APPLICATION_CREDENTIALS` as described [here](https://cloud.google.com/docs/authentication/getting-started).
-
-Credentials can also be provided directly with the `gcp.credentials.key` option; the key must be passed in as a base64-encoded string.
-
-Example:
-```
-spark.readStream.format("pubsublite").option("gcp.credentials.key", "")
-```
diff --git a/pubsublite-spark-sql-streaming/examples/simple_read.py b/pubsublite-spark-sql-streaming/examples/simple_read.py
deleted file mode 100644
index de9887d68..000000000
--- a/pubsublite-spark-sql-streaming/examples/simple_read.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2020 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from pyspark.sql import SparkSession
-import sys
-
-full_subscription_path = sys.argv[1]
-
-spark = SparkSession.builder.appName('Simple PubSub Lite Read').master('yarn').getOrCreate()
-
-spark \
-    .readStream \
-    .format('pubsublite') \
-    .option('pubsublite.subscription', full_subscription_path) \
-    .load() \
-    .writeStream \
-    .format('console') \
-    .outputMode('append') \
-    .trigger(processingTime='1 second') \
-    .start() \
-    .awaitTermination()
\ No newline at end of file
diff --git a/pubsublite-spark-sql-streaming/pom.xml b/pubsublite-spark-sql-streaming/pom.xml
deleted file mode 100644
index 113deb5bd..000000000
--- a/pubsublite-spark-sql-streaming/pom.xml
+++ /dev/null
@@ -1,273 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <groupId>com.google.cloud</groupId>
-    <artifactId>google-cloud-pubsublite-parent</artifactId>
-    <version>0.8.1-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>com.google.cloud</groupId>
-  <artifactId>pubsublite-spark-sql-streaming</artifactId>
-  <version>0.8.1-SNAPSHOT</version>
-  <packaging>jar</packaging>
-  <name>Pub/Sub Lite Spark SQL Streaming</name>
-  <url>https://github.com/googleapis/java-pubsublite</url>
-  <description>Spark SQL Streaming Connector for Google Cloud Pub/Sub Lite</description>
-  <properties>
-    <maven.compiler.source>1.8</maven.compiler.source>
-    <maven.compiler.target>1.8</maven.compiler.target>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <scala.version>2.11.12</scala.version>
-    <scala.version.short>2.11</scala.version.short>
-    <spark.version>2.4.7</spark.version>
-  </properties>
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.version.short}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-catalyst_${scala.version.short}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-unsafe_${scala.version.short}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.cloud</groupId>
-      <artifactId>google-cloud-pubsublite</artifactId>
-      <version>0.8.1-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>com.google.api.grpc</groupId>
-      <artifactId>proto-google-cloud-pubsublite-v1</artifactId>
-      <version>0.8.1-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.api</groupId>
-      <artifactId>gax</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.auto.value</groupId>
-      <artifactId>auto-value-annotations</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service-annotations</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.auth</groupId>
-      <artifactId>google-auth-library-credentials</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.auth</groupId>
-      <artifactId>google-auth-library-oauth2-http</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.http-client</groupId>
-      <artifactId>google-http-client</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java-util</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.flogger</groupId>
-      <artifactId>google-extensions</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.api</groupId>
-      <artifactId>api-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-      <version>2.12.1</version>
-    </dependency>
-    <dependency>
-      <groupId>com.github.ben-manes.caffeine</groupId>
-      <artifactId>caffeine</artifactId>
-      <version>2.8.8</version>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>2.12.1</version>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.13.1</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.truth</groupId>
-      <artifactId>truth</artifactId>
-      <version>1.1.2</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <version>3.7.0</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava-testlib</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-  <build>
-    <sourceDirectory>src/main/java</sourceDirectory>
-    <testSourceDirectory>src/test/java</testSourceDirectory>
-    <outputDirectory>target/classes</outputDirectory>
-    <testOutputDirectory>target/test-classes</testOutputDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <version>3.2.4</version>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-              </transformers>
-              <filters>
-                <filter>
-                  <artifact>*:*</artifact>
-                  <excludes>
-                    <exclude>META-INF/maven/**</exclude>
-                    <exclude>META-INF/*.MF</exclude>
-                    <exclude>META-INF/*.SF</exclude>
-                    <exclude>META-INF/*.DSA</exclude>
-                    <exclude>META-INF/*.RSA</exclude>
-                  </excludes>
-                </filter>
-              </filters>
-              <relocations>
-                <relocation>
-                  <pattern>com</pattern>
-                  <shadedPattern>repackaged.com</shadedPattern>
-                  <includes>
-                    <include>com.google.protobuf.**</include>
-                    <include>com.google.common.**</include>
-                  </includes>
-                </relocation>
-                <relocation>
-                  <pattern>io.grpc.netty.shaded</pattern>
-                  <shadedPattern>com.google.cloud.pubsublite.repackaged.io.grpc.netty.shaded</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>io</pattern>
-                  <shadedPattern>com.google.cloud.pubsublite.repackaged.io</shadedPattern>
-                  <includes>
-                    <include>io.grpc.**</include>
-                    <include>io.opencensus.**</include>
-                    <include>io.perfmark.**</include>
-                  </includes>
-                </relocation>
-                <relocation>
-                  <pattern>META-INF/native/io_grpc_netty_shaded_</pattern>
-                  <shadedPattern>META-INF/native/com_google_cloud_pubsublite_repackaged_io_grpc_netty_shaded_</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>META-INF/native/libio_grpc_netty_shaded_</pattern>
-                  <shadedPattern>META-INF/native/libcom_google_cloud_pubsublite_repackaged_io_grpc_netty_shaded_</shadedPattern>
-                </relocation>
-              </relocations>
-              <shadedArtifactAttached>true</shadedArtifactAttached>
-              <shadedClassifierName>with-dependencies</shadedClassifierName>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.sonatype.plugins</groupId>
-        <artifactId>nexus-staging-maven-plugin</artifactId>
-        <configuration>
-          <skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-enforcer-plugin</artifactId>
-        <version>3.0.0-M3</version>
-        <executions>
-          <execution>
-            <id>enforce</id>
-            <configuration>
-              <rules>
-                <requireUpperBoundDeps>
-                  <level>WARN</level>
-                  <excludes>
-                    <exclude>org.checkerframework:checker-compat-qual</exclude>
-                  </excludes>
-                </requireUpperBoundDeps>
-                <banDuplicateClasses>
-                  <ignoreClasses>
-                    <ignoreClass>org.apache.commons.logging.*</ignoreClass>
-                    <ignoreClass>org.apache.commons.collections.*</ignoreClass>
-                    <ignoreClass>org.apache.spark.unused.*</ignoreClass>
-                    <ignoreClass>org.apache.hadoop.yarn.*</ignoreClass>
-                    <ignoreClass>javax.ws.rs.*</ignoreClass>
-                  </ignoreClasses>
-                  <findAllDuplicates>true</findAllDuplicates>
-                </banDuplicateClasses>
-              </rules>
-            </configuration>
-            <goals>
-              <goal>enforce</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/Constants.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
deleted file mode 100644
index bdc8a1bd4..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import com.google.cloud.pubsublite.internal.wire.PubsubContext;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-
-public class Constants {
-  public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
-  public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
-  public static int DEFAULT_BATCH_OFFSET_RANGE = 100_000;
-  public static StructType DEFAULT_SCHEMA =
-      new StructType(
-          new StructField[] {
-            new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
-            new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
-            new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
-            new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
-            new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
-            new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
-            new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
-            new StructField(
-                "attributes",
-                DataTypes.createMapType(
-                    DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
-                true,
-                Metadata.empty())
-          });
-
-  public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
-
-  public static String BYTES_OUTSTANDING_CONFIG_KEY =
-      "pubsublite.flowcontrol.byteoutstandingperpartition";
-  // Must match the option name documented in the README.
-  public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
-      "pubsublite.flowcontrol.messageoutstandingperpartition";
-  public static String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription";
-  public static String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key";
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
deleted file mode 100644
index 5954492f8..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.Ticker;
-import com.google.api.core.ApiFutureCallback;
-import com.google.api.core.ApiFutures;
-import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.internal.TopicStatsClient;
-import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Rate limited HeadOffsetReader, utilizing a LoadingCache that refreshes all partitions' head
- * offsets for the topic at most once per minute.
- */
-public class LimitingHeadOffsetReader implements PerTopicHeadOffsetReader {
-
-  private final TopicStatsClient topicStatsClient;
-  private final TopicPath topic;
-  private final long topicPartitionCount;
-  private final AsyncLoadingCache<Partition, Offset> cachedHeadOffsets;
-
-  @VisibleForTesting
-  public LimitingHeadOffsetReader(
-      TopicStatsClient topicStatsClient, TopicPath topic, long topicPartitionCount, Ticker ticker) {
-    this.topicStatsClient = topicStatsClient;
-    this.topic = topic;
-    this.topicPartitionCount = topicPartitionCount;
-    this.cachedHeadOffsets =
-        Caffeine.newBuilder()
-            .ticker(ticker)
-            .expireAfterWrite(1, TimeUnit.MINUTES)
-            .buildAsync(this::loadHeadOffset);
-  }
-
-  private CompletableFuture<Offset> loadHeadOffset(Partition partition, Executor executor) {
-    CompletableFuture<Offset> result = new CompletableFuture<>();
-    ApiFutures.addCallback(
-        topicStatsClient.computeHeadCursor(topic, partition),
-        new ApiFutureCallback<Cursor>() {
-          @Override
-          public void onFailure(Throwable t) {
-            result.completeExceptionally(t);
-          }
-
-          @Override
-          public void onSuccess(Cursor c) {
-            result.complete(Offset.of(c.getOffset()));
-          }
-        },
-        MoreExecutors.directExecutor());
-    return result;
-  }
-
-  @Override
-  public PslSourceOffset getHeadOffset() {
-    Set<Partition> keySet = new HashSet<>();
-    for (int i = 0; i < topicPartitionCount; i++) {
-      keySet.add(Partition.of(i));
-    }
-    CompletableFuture<Map<Partition, Offset>> future = cachedHeadOffsets.getAll(keySet);
-    try {
-      return PslSourceOffset.builder().partitionOffsetMap(future.get()).build();
-    } catch (Throwable t) {
-      throw new IllegalStateException("Unable to compute head offset for topic: " + topic, t);
-    }
-  }
-
-  @Override
-  public void close() {
-    topicStatsClient.close();
-  }
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java
deleted file mode 100644
index d42f33cab..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.internal.wire.Committer;
-import java.io.Closeable;
-
-public interface MultiPartitionCommitter extends Closeable {
-
-  interface CommitterFactory {
-    Committer newCommitter(Partition partition);
-  }
-
-  void commit(PslSourceOffset offset);
-
-  void close();
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java
deleted file mode 100644
index f672242f8..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutureCallback;
-import com.google.api.core.ApiFutures;
-import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.internal.wire.Committer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.flogger.GoogleLogger;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MultiPartitionCommitterImpl implements MultiPartitionCommitter {
-  private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
-
-  private final Map<Partition, Committer> committerMap = new HashMap<>();
-
-  @VisibleForTesting
-  MultiPartitionCommitterImpl(long topicPartitionCount, CommitterFactory committerFactory) {
-    for (int i = 0; i < topicPartitionCount; i++) {
-      Partition p = Partition.of(i);
-      Committer committer = committerFactory.newCommitter(p);
-      committer.startAsync().awaitRunning();
-      committerMap.put(p, committer);
-    }
-  }
-
-  @Override
-  public synchronized void close() {
-    committerMap.values().forEach(c -> c.stopAsync().awaitTerminated());
-  }
-
-  @Override
-  public synchronized void commit(PslSourceOffset offset) {
-    offset
-        .partitionOffsetMap()
-        .forEach(
-            (p, o) -> {
-              // Note we don't need to worry about commit offset disorder here since Committer
-              // guarantees the ordering. Once commitOffset() returns, it's either already
-              // sent to stream, or waiting for next connection to open to be sent in order.
-              ApiFuture<Void> future = committerMap.get(p).commitOffset(o);
-              ApiFutures.addCallback(
-                  future,
-                  new ApiFutureCallback<Void>() {
-                    @Override
-                    public void onFailure(Throwable t) {
-                      if (!future.isCancelled()) {
-                        log.atWarning().withCause(t).log(
-                            "Failed to commit %s,%s.", p.value(), o.value());
-                      }
-                    }
-
-                    @Override
-                    public void onSuccess(Void result) {
-                      log.atInfo().log("Committed %s,%s.", p.value(), o.value());
-                    }
-                  },
-                  MoreExecutors.directExecutor());
-            });
-  }
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java
deleted file mode 100644
index 9ea516706..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PartitionSubscriberFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import com.google.api.gax.rpc.ApiException;
-import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SequencedMessage;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
-import java.util.function.Consumer;
-
-public interface PartitionSubscriberFactory extends Serializable {
-  Subscriber newSubscriber(
-      Partition partition, Consumer<ImmutableList<SequencedMessage>> messageConsumer)
-      throws ApiException;
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java
deleted file mode 100644
index 21e0bc634..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import java.io.Closeable;
-
-public interface PerTopicHeadOffsetReader extends Closeable {
-
-  // Gets the head offsets for all partitions in the topic. Blocks.
- PslSourceOffset getHeadOffset(); - - @Override - void close(); -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java deleted file mode 100644 index 93e763f67..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; -import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl; -import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; -import com.google.cloud.pubsublite.proto.Cursor; -import com.google.cloud.pubsublite.proto.SeekRequest; -import java.io.Serializable; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition; -import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; -import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset; - -public class PslContinuousInputPartition - implements ContinuousInputPartition, Serializable { - - private final SubscriberFactory subscriberFactory; - private final SparkPartitionOffset startOffset; - private final SubscriptionPath subscriptionPath; - private final FlowControlSettings flowControlSettings; - - public PslContinuousInputPartition( - SubscriberFactory subscriberFactory, - SparkPartitionOffset startOffset, - SubscriptionPath subscriptionPath, - FlowControlSettings flowControlSettings) { - this.subscriberFactory = subscriberFactory; - this.startOffset = startOffset; - this.subscriptionPath = subscriptionPath; - this.flowControlSettings = flowControlSettings; - } - - @Override - public InputPartitionReader createContinuousReader(PartitionOffset offset) { - checkArgument( - SparkPartitionOffset.class.isAssignableFrom(offset.getClass()), - "offset is not assignable to SparkPartitionOffset"); - - SparkPartitionOffset sparkPartitionOffset = (SparkPartitionOffset) offset; - PslPartitionOffset pslPartitionOffset = - PslSparkUtils.toPslPartitionOffset(sparkPartitionOffset); - - BlockingPullSubscriberImpl subscriber; - try { - subscriber = - new BlockingPullSubscriberImpl( - subscriberFactory, - flowControlSettings, - SeekRequest.newBuilder() - .setCursor( - Cursor.newBuilder().setOffset(pslPartitionOffset.offset().value()).build()) - .build()); - } catch (CheckedApiException e) { - throw new IllegalStateException( - "Unable to create PSL subscriber for " + startOffset.toString(), e); - } - return new PslContinuousInputPartitionReader( - subscriptionPath, 
sparkPartitionOffset, subscriber); - } - - @Override - public InputPartitionReader createPartitionReader() { - return createContinuousReader(startOffset); - } -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java deleted file mode 100644 index 4929ad3e9..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.cloud.pubsublite.SequencedMessage; -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl; -import com.google.common.flogger.GoogleLogger; -import java.util.Optional; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader; -import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset; - -public class PslContinuousInputPartitionReader - implements ContinuousInputPartitionReader { - private static final GoogleLogger log = GoogleLogger.forEnclosingClass(); - - private final SubscriptionPath subscriptionPath; - private final BlockingPullSubscriberImpl subscriber; - private SparkPartitionOffset currentOffset; - private SequencedMessage currentMsg; - - PslContinuousInputPartitionReader( - SubscriptionPath subscriptionPath, - SparkPartitionOffset startOffset, - BlockingPullSubscriberImpl subscriber) { - this.subscriptionPath = subscriptionPath; - this.currentOffset = startOffset; - this.subscriber = subscriber; - this.currentMsg = null; - } - - @Override - public PartitionOffset getOffset() { - return currentOffset; - } - - @Override - public boolean next() { - try { - subscriber.onData().get(); - // since next() will not be called concurrently, we are sure that the message - // is available to this thread. 
- Optional msg = subscriber.messageIfAvailable(); - checkState(msg.isPresent()); - currentMsg = msg.get(); - currentOffset = - SparkPartitionOffset.builder() - .partition(currentOffset.partition()) - .offset(currentMsg.offset().value()) - .build(); - return true; - } catch (Throwable t) { - throw new IllegalStateException("Failed to retrieve messages.", t); - } - } - - @Override - public InternalRow get() { - checkState(currentMsg != null); - return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, currentOffset.partition()); - } - - @Override - public void close() { - try { - subscriber.close(); - } catch (Exception e) { - log.atWarning().log("Subscriber failed to close."); - } - } -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java deleted file mode 100644 index 339385948..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; -import com.google.cloud.pubsublite.internal.CursorClient; -import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; -import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader; -import org.apache.spark.sql.sources.v2.reader.streaming.Offset; -import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset; -import org.apache.spark.sql.types.StructType; - -public class PslContinuousReader implements ContinuousReader { - - private final CursorClient cursorClient; - private final MultiPartitionCommitter committer; - private final PartitionSubscriberFactory partitionSubscriberFactory; - private final SubscriptionPath subscriptionPath; - private final FlowControlSettings flowControlSettings; - private final long topicPartitionCount; - private SparkSourceOffset startOffset; - - @VisibleForTesting - public PslContinuousReader( - CursorClient cursorClient, - MultiPartitionCommitter committer, - PartitionSubscriberFactory partitionSubscriberFactory, - SubscriptionPath subscriptionPath, - FlowControlSettings flowControlSettings, - long topicPartitionCount) { - this.cursorClient = cursorClient; - this.committer = committer; - this.partitionSubscriberFactory = partitionSubscriberFactory; - this.subscriptionPath = subscriptionPath; - this.flowControlSettings = 
        flowControlSettings;
-    this.topicPartitionCount = topicPartitionCount;
-  }
-
-  @Override
-  public Offset mergeOffsets(PartitionOffset[] offsets) {
-    checkArgument(
-        SparkPartitionOffset.class.isAssignableFrom(offsets.getClass().getComponentType()),
-        "PartitionOffset object is not assignable to SparkPartitionOffset.");
-    return SparkSourceOffset.merge(
-        Arrays.copyOf(offsets, offsets.length, SparkPartitionOffset[].class));
-  }
-
-  @Override
-  public Offset deserializeOffset(String json) {
-    return SparkSourceOffset.fromJson(json);
-  }
-
-  @Override
-  public Offset getStartOffset() {
-    return startOffset;
-  }
-
-  @Override
-  public void setStartOffset(Optional<Offset> start) {
-    if (start.isPresent()) {
-      checkArgument(
-          SparkSourceOffset.class.isAssignableFrom(start.get().getClass()),
-          "start offset is not assignable to SparkSourceOffset.");
-      startOffset = (SparkSourceOffset) start.get();
-      return;
-    }
-    startOffset =
-        PslSparkUtils.getSparkStartOffset(cursorClient, subscriptionPath, topicPartitionCount);
-  }
-
-  @Override
-  public void commit(Offset end) {
-    checkArgument(
-        SparkSourceOffset.class.isAssignableFrom(end.getClass()),
-        "end offset is not assignable to SparkSourceOffset.");
-    committer.commit(PslSparkUtils.toPslSourceOffset((SparkSourceOffset) end));
-  }
-
-  @Override
-  public void stop() {
-    cursorClient.shutdown();
-    committer.close();
-  }
-
-  @Override
-  public StructType readSchema() {
-    return Constants.DEFAULT_SCHEMA;
-  }
-
-  @Override
-  public List<InputPartition<InternalRow>> planInputPartitions() {
-    List<InputPartition<InternalRow>> list = new ArrayList<>();
-    for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
-      PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
-      SubscriberFactory subscriberFactory =
-          (consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
-      list.add(
-          new PslContinuousInputPartition(
-              subscriberFactory,
-              SparkPartitionOffset.builder()
-                  .partition(offset.partition())
-                  .offset(offset.offset())
-                  .build(),
-              subscriptionPath,
-              flowControlSettings));
-    }
-    return list;
-  }
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java
deleted file mode 100644
index 6dce52729..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package com.google.cloud.pubsublite.spark; - -import com.google.api.client.util.Base64; -import com.google.api.gax.core.CredentialsProvider; -import com.google.auth.Credentials; -import com.google.auth.oauth2.GoogleCredentials; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.UncheckedIOException; - -public class PslCredentialsProvider implements CredentialsProvider { - - private final Credentials credentials; - - public PslCredentialsProvider(PslDataSourceOptions options) { - if (options.credentialsKey() != null) { - this.credentials = createCredentialsFromKey(options.credentialsKey()); - } else { - this.credentials = createDefaultCredentials(); - } - } - - private static Credentials createCredentialsFromKey(String key) { - try { - return GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.decodeBase64(key))) - .createScoped("https://www.googleapis.com/auth/cloud-platform"); - } catch (IOException e) { - throw new UncheckedIOException("Failed to create Credentials from key", e); - } - } - - public static Credentials createDefaultCredentials() { - try { - return GoogleCredentials.getApplicationDefault() - .createScoped("https://www.googleapis.com/auth/cloud-platform"); - } catch (IOException e) { - throw new UncheckedIOException("Failed to create default Credentials", e); - } - } - - @Override - public Credentials getCredentials() { - return credentials; - } -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java deleted file mode 100644 index 0ac687271..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.google.cloud.pubsublite.spark; - -import com.github.benmanes.caffeine.cache.Ticker; -import com.google.auto.service.AutoService; -import com.google.cloud.pubsublite.AdminClient; -import com.google.cloud.pubsublite.PartitionLookupUtils; -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.internal.CursorClient; -import java.util.Objects; -import java.util.Optional; -import org.apache.spark.sql.sources.DataSourceRegister; -import org.apache.spark.sql.sources.v2.ContinuousReadSupport; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.MicroBatchReadSupport; -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader; -import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader; -import org.apache.spark.sql.types.StructType; - -@AutoService(DataSourceRegister.class) -public final class PslDataSource - implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister { - - @Override - public String shortName() { - return "pubsublite"; - } - - @Override - public ContinuousReader createContinuousReader( - Optional schema, String checkpointLocation, DataSourceOptions options) { - if (schema.isPresent()) { - throw new IllegalArgumentException( - "PubSub Lite uses fixed schema and custom schema is not allowed"); - } - - PslDataSourceOptions pslDataSourceOptions = - PslDataSourceOptions.fromSparkDataSourceOptions(options); - CursorClient cursorClient = pslDataSourceOptions.newCursorClient(); - AdminClient adminClient = pslDataSourceOptions.newAdminClient(); - SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath(); - long topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient); - return new PslContinuousReader( - cursorClient, - pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount), - pslDataSourceOptions.getSubscriberFactory(), - subscriptionPath, - Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()), - topicPartitionCount); - } - - @Override - public MicroBatchReader createMicroBatchReader( - Optional schema, String checkpointLocation, DataSourceOptions options) { - if (schema.isPresent()) { - throw new IllegalArgumentException( - "PubSub Lite uses fixed schema and custom schema is not allowed"); - } - - PslDataSourceOptions pslDataSourceOptions = - PslDataSourceOptions.fromSparkDataSourceOptions(options); - CursorClient cursorClient = pslDataSourceOptions.newCursorClient(); - AdminClient adminClient = pslDataSourceOptions.newAdminClient(); - SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath(); - TopicPath topicPath; - try { - topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()); - } catch (Throwable t) { - throw new IllegalStateException( - "Unable to get topic for subscription " + subscriptionPath, t); - } - long topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient); - return new PslMicroBatchReader( - cursorClient, - pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount), - pslDataSourceOptions.getSubscriberFactory(), - new LimitingHeadOffsetReader( - pslDataSourceOptions.newTopicStatsClient(), - topicPath, - topicPartitionCount, - Ticker.systemTicker()), - subscriptionPath, - Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()), - 
topicPartitionCount); - } -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java deleted file mode 100644 index 8db019fa2..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; - -import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.AdminClient; -import com.google.cloud.pubsublite.AdminClientSettings; -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; -import com.google.cloud.pubsublite.internal.CursorClient; -import com.google.cloud.pubsublite.internal.CursorClientSettings; -import com.google.cloud.pubsublite.internal.TopicStatsClient; -import com.google.cloud.pubsublite.internal.TopicStatsClientSettings; -import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; -import com.google.cloud.pubsublite.internal.wire.PubsubContext; -import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; -import com.google.cloud.pubsublite.internal.wire.ServiceClients; -import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; -import com.google.cloud.pubsublite.v1.AdminServiceClient; -import com.google.cloud.pubsublite.v1.AdminServiceSettings; -import com.google.cloud.pubsublite.v1.CursorServiceClient; -import com.google.cloud.pubsublite.v1.CursorServiceSettings; -import com.google.cloud.pubsublite.v1.SubscriberServiceClient; -import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; -import com.google.cloud.pubsublite.v1.TopicStatsServiceClient; -import com.google.cloud.pubsublite.v1.TopicStatsServiceSettings; -import java.io.IOException; -import java.io.Serializable; -import java.util.Optional; -import javax.annotation.Nullable; -import org.apache.spark.sql.sources.v2.DataSourceOptions; - -@AutoValue -public abstract class PslDataSourceOptions implements Serializable { - private static final long serialVersionUID = 2680059304693561607L; - - @Nullable - public abstract String credentialsKey(); - - public abstract SubscriptionPath subscriptionPath(); - - @Nullable - public abstract FlowControlSettings flowControlSettings(); - - public abstract long maxBatchOffsetRange(); - - public static Builder builder() { - return new AutoValue_PslDataSourceOptions.Builder() - .setCredentialsKey(null) - // TODO(jiangmichael): Revisit this later about if we need to expose this as a user - // configurable option. Ideally we should expose bytes range/# msgs range not - // offsets range since PSL doesn't guarantee offset = msg. 
- .setMaxBatchOffsetRange(Constants.DEFAULT_BATCH_OFFSET_RANGE); - } - - public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) { - if (!options.get(Constants.SUBSCRIPTION_CONFIG_KEY).isPresent()) { - throw new IllegalArgumentException(Constants.SUBSCRIPTION_CONFIG_KEY + " is required."); - } - - Builder builder = builder(); - Optional value; - if ((value = options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY)).isPresent()) { - builder.setCredentialsKey(value.get()); - } - return builder - .setSubscriptionPath( - SubscriptionPath.parse(options.get(Constants.SUBSCRIPTION_CONFIG_KEY).get())) - .setFlowControlSettings( - FlowControlSettings.builder() - .setMessagesOutstanding( - options.getLong( - Constants.MESSAGES_OUTSTANDING_CONFIG_KEY, - Constants.DEFAULT_MESSAGES_OUTSTANDING)) - .setBytesOutstanding( - options.getLong( - Constants.BYTES_OUTSTANDING_CONFIG_KEY, - Constants.DEFAULT_BYTES_OUTSTANDING)) - .build()) - .build(); - } - - @AutoValue.Builder - public abstract static class Builder { - - public abstract Builder setCredentialsKey(String credentialsKey); - - public abstract Builder setSubscriptionPath(SubscriptionPath subscriptionPath); - - public abstract Builder setMaxBatchOffsetRange(long maxBatchOffsetRange); - - public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); - - public abstract PslDataSourceOptions build(); - } - - MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) { - return new MultiPartitionCommitterImpl( - topicPartitionCount, - (partition) -> - CommitterBuilder.newBuilder() - .setSubscriptionPath(this.subscriptionPath()) - .setPartition(partition) - .setServiceClient(newCursorServiceClient()) - .build()); - } - - PartitionSubscriberFactory getSubscriberFactory() { - return (partition, consumer) -> { - PubsubContext context = PubsubContext.of(Constants.FRAMEWORK); - SubscriberServiceSettings.Builder settingsBuilder = - SubscriberServiceSettings.newBuilder() - .setCredentialsProvider(new PslCredentialsProvider(this)); - ServiceClients.addDefaultMetadata( - context, RoutingMetadata.of(this.subscriptionPath(), partition), settingsBuilder); - try { - SubscriberServiceClient serviceClient = - SubscriberServiceClient.create( - addDefaultSettings(this.subscriptionPath().location().region(), settingsBuilder)); - return SubscriberBuilder.newBuilder() - .setSubscriptionPath(this.subscriptionPath()) - .setPartition(partition) - .setContext(context) - .setServiceClient(serviceClient) - .setMessageConsumer(consumer) - .build(); - } catch (IOException e) { - throw new IllegalStateException("Failed to create subscriber service.", e); - } - }; - } - - // TODO(b/jiangmichael): Make XXXClientSettings accept creds so we could simplify below methods. 
- private CursorServiceClient newCursorServiceClient() { - try { - return CursorServiceClient.create( - addDefaultSettings( - this.subscriptionPath().location().region(), - CursorServiceSettings.newBuilder() - .setCredentialsProvider(new PslCredentialsProvider(this)))); - } catch (IOException e) { - throw new IllegalStateException("Unable to create CursorServiceClient."); - } - } - - CursorClient newCursorClient() { - return CursorClient.create( - CursorClientSettings.newBuilder() - .setRegion(this.subscriptionPath().location().region()) - .setServiceClient(newCursorServiceClient()) - .build()); - } - - private AdminServiceClient newAdminServiceClient() { - try { - return AdminServiceClient.create( - addDefaultSettings( - this.subscriptionPath().location().region(), - AdminServiceSettings.newBuilder() - .setCredentialsProvider(new PslCredentialsProvider(this)))); - } catch (IOException e) { - throw new IllegalStateException("Unable to create AdminServiceClient."); - } - } - - AdminClient newAdminClient() { - return AdminClient.create( - AdminClientSettings.newBuilder() - .setRegion(this.subscriptionPath().location().region()) - .setServiceClient(newAdminServiceClient()) - .build()); - } - - private TopicStatsServiceClient newTopicStatsServiceClient() { - try { - return TopicStatsServiceClient.create( - addDefaultSettings( - this.subscriptionPath().location().region(), - TopicStatsServiceSettings.newBuilder() - .setCredentialsProvider(new PslCredentialsProvider(this)))); - } catch (IOException e) { - throw new IllegalStateException("Unable to create TopicStatsServiceClient."); - } - } - - TopicStatsClient newTopicStatsClient() { - return TopicStatsClient.create( - TopicStatsClientSettings.newBuilder() - .setRegion(this.subscriptionPath().location().region()) - .setServiceClient(newTopicStatsServiceClient()) - .build()); - } -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java deleted file mode 100644 index 421736808..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
-import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.cloud.pubsublite.proto.SeekRequest;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-
-public class PslMicroBatchInputPartition implements InputPartition<InternalRow> {
-
-  private final SubscriberFactory subscriberFactory;
-  private final SparkPartitionOffset startOffset;
-  private final SparkPartitionOffset endOffset;
-  private final SubscriptionPath subscriptionPath;
-  private final FlowControlSettings flowControlSettings;
-
-  public PslMicroBatchInputPartition(
-      SubscriptionPath subscriptionPath,
-      FlowControlSettings flowControlSettings,
-      SparkPartitionOffset startOffset,
-      SparkPartitionOffset endOffset,
-      SubscriberFactory subscriberFactory) {
-    this.startOffset = startOffset;
-    this.endOffset = endOffset;
-    this.subscriptionPath = subscriptionPath;
-    this.flowControlSettings = flowControlSettings;
-    this.subscriberFactory = subscriberFactory;
-  }
-
-  @Override
-  public InputPartitionReader<InternalRow> createPartitionReader() {
-    BlockingPullSubscriber subscriber;
-    try {
-      subscriber =
-          new BlockingPullSubscriberImpl(
-              subscriberFactory,
-              flowControlSettings,
-              SeekRequest.newBuilder()
-                  .setCursor(
-                      Cursor.newBuilder()
-                          .setOffset(
-                              PslSparkUtils.toPslPartitionOffset(startOffset).offset().value())
-                          .build())
-                  .build());
-    } catch (CheckedApiException e) {
-      throw new IllegalStateException(
-          "Unable to create PSL subscriber for " + endOffset.partition(), e);
-    }
-    return new PslMicroBatchInputPartitionReader(subscriptionPath, endOffset, subscriber);
-  }
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java
deleted file mode 100644
index d2eebd23c..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.pubsublite.SequencedMessage;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.flogger.GoogleLogger;
-import java.time.Duration;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.Nullable;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-
-public class PslMicroBatchInputPartitionReader implements InputPartitionReader<InternalRow> {
-  private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
-
-  private static final Duration SUBSCRIBER_PULL_TIMEOUT = Duration.ofSeconds(10);
-
-  private final SubscriptionPath subscriptionPath;
-  private final SparkPartitionOffset endOffset;
-  private final BlockingPullSubscriber subscriber;
-  @Nullable private SequencedMessage currentMsg = null;
-  private boolean batchFulfilled = false;
-
-  @VisibleForTesting
-  PslMicroBatchInputPartitionReader(
-      SubscriptionPath subscriptionPath,
-      SparkPartitionOffset endOffset,
-      BlockingPullSubscriber subscriber) {
-    this.subscriptionPath = subscriptionPath;
-    this.subscriber = subscriber;
-    this.endOffset = endOffset;
-  }
-
-  @Override
-  public boolean next() {
-    if (batchFulfilled) {
-      return false;
-    }
-    Optional<SequencedMessage> msg;
-    while (true) {
-      try {
-        subscriber.onData().get(SUBSCRIBER_PULL_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
-        msg = subscriber.messageIfAvailable();
-        break;
-      } catch (TimeoutException e) {
-        log.atWarning().log("Unable to get any messages in the last %s.", SUBSCRIBER_PULL_TIMEOUT);
-      } catch (Throwable t) {
-        throw new IllegalStateException("Failed to retrieve messages.", t);
-      }
-    }
-    // Since next() is only called by one thread at a time, the message is guaranteed to still be
-    // available to this thread.
-    checkState(msg.isPresent());
-    currentMsg = msg.get();
-    if (currentMsg.offset().value() == endOffset.offset()) {
-      // This is the last message in the batch.
-      batchFulfilled = true;
-    } else if (currentMsg.offset().value() > endOffset.offset()) {
-      batchFulfilled = true;
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public InternalRow get() {
-    checkState(currentMsg != null);
-    return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, endOffset.partition());
-  }
-
-  @Override
-  public void close() {
-    try {
-      subscriber.close();
-    } catch (Exception e) {
-      log.atWarning().withCause(e).log("Subscriber failed to close.");
-    }
-  }
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
deleted file mode 100644
index 9f71a17c1..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CursorClient;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import javax.annotation.Nullable;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
-import org.apache.spark.sql.types.StructType;
-
-public class PslMicroBatchReader implements MicroBatchReader {
-
-  private final CursorClient cursorClient;
-  private final MultiPartitionCommitter committer;
-  private final PartitionSubscriberFactory partitionSubscriberFactory;
-  private final PerTopicHeadOffsetReader headOffsetReader;
-  private final SubscriptionPath subscriptionPath;
-  private final FlowControlSettings flowControlSettings;
-  private final long topicPartitionCount;
-  @Nullable private SparkSourceOffset startOffset = null;
-  private SparkSourceOffset endOffset;
-
-  public PslMicroBatchReader(
-      CursorClient cursorClient,
-      MultiPartitionCommitter committer,
-      PartitionSubscriberFactory partitionSubscriberFactory,
-      PerTopicHeadOffsetReader headOffsetReader,
-      SubscriptionPath subscriptionPath,
-      FlowControlSettings flowControlSettings,
-      long topicPartitionCount) {
-    this.cursorClient = cursorClient;
-    this.committer = committer;
-    this.partitionSubscriberFactory = partitionSubscriberFactory;
-    this.headOffsetReader = headOffsetReader;
-    this.subscriptionPath = subscriptionPath;
-    this.flowControlSettings = flowControlSettings;
-    this.topicPartitionCount = topicPartitionCount;
-  }
-
-  @Override
-  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
-    if (start.isPresent()) {
-      checkArgument(
-          SparkSourceOffset.class.isAssignableFrom(start.get().getClass()),
-          "start offset is not assignable to SparkSourceOffset.");
-      startOffset = (SparkSourceOffset) start.get();
-    } else {
-      startOffset =
-          PslSparkUtils.getSparkStartOffset(cursorClient, subscriptionPath, topicPartitionCount);
-    }
-    if (end.isPresent()) {
-      checkArgument(
-          SparkSourceOffset.class.isAssignableFrom(end.get().getClass()),
-          "end offset is not assignable to SparkSourceOffset.");
-      endOffset = (SparkSourceOffset) end.get();
-    } else {
-      endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
-    }
-  }
-
-  @Override
-  public Offset getStartOffset() {
-    return startOffset;
-  }
-
-  @Override
-  public Offset getEndOffset() {
-    return endOffset;
-  }
-
-  @Override
-  public Offset deserializeOffset(String json) {
-    return SparkSourceOffset.fromJson(json);
-  }
-
-  @Override
-  public void commit(Offset end) {
-    checkArgument(
-        SparkSourceOffset.class.isAssignableFrom(end.getClass()),
-        "end offset is not assignable to SparkSourceOffset.");
-    committer.commit(PslSparkUtils.toPslSourceOffset((SparkSourceOffset) end));
-  }
-
-  @Override
-  public void stop() {
-    committer.close();
-  }
-
-  @Override
-  public StructType readSchema() {
-    return Constants.DEFAULT_SCHEMA;
-  }
-
-  @Override
-  public List<InputPartition<InternalRow>> planInputPartitions() {
-    checkState(startOffset != null);
-    List<InputPartition<InternalRow>> list = new ArrayList<>();
-    for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
-      SparkPartitionOffset endPartitionOffset =
-          endOffset.getPartitionOffsetMap().get(offset.partition());
-      if (offset.equals(endPartitionOffset)) {
-        // There is no message to pull for this partition.
-        continue;
-      }
-      PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
-      SubscriberFactory subscriberFactory =
-          (consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
-      list.add(
-          new PslMicroBatchInputPartition(
-              subscriptionPath,
-              flowControlSettings,
-              offset,
-              endPartitionOffset,
-              subscriberFactory));
-    }
-    return list;
-  }
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslPartitionOffset.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslPartitionOffset.java
deleted file mode 100644
index bd6893c36..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslPartitionOffset.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.spark;
-
-import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.Partition;
-import java.io.Serializable;
-
-@AutoValue
-public abstract class PslPartitionOffset implements Serializable {
-  private static final long serialVersionUID = -5446439851334065339L;
-
-  public abstract Partition partition();
-
-  public abstract Offset offset();
-
-  public static Builder builder() {
-    return new AutoValue_PslPartitionOffset.Builder();
-  }
-
-  @AutoValue.Builder
-  public abstract static class Builder {
-
-    public abstract Builder partition(Partition partition);
-
-    public abstract Builder offset(Offset offset);
-
-    public abstract PslPartitionOffset build();
-  }
-}
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSourceOffset.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSourceOffset.java
deleted file mode 100644
index c8b51233f..000000000
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSourceOffset.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.Partition; -import java.util.Map; - -@AutoValue -public abstract class PslSourceOffset { - - public abstract Map partitionOffsetMap(); - - public static Builder builder() { - return new AutoValue_PslSourceOffset.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - - public abstract Builder partitionOffsetMap(Map partitionOffsetMap); - - public abstract PslSourceOffset build(); - } -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java deleted file mode 100644 index 962d5dc06..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.base.Preconditions.checkArgument; -import static scala.collection.JavaConverters.asScalaBufferConverter; - -import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SequencedMessage; -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.internal.CursorClient; -import com.google.common.collect.ListMultimap; -import com.google.protobuf.ByteString; -import com.google.protobuf.util.Timestamps; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; -import org.apache.spark.sql.catalyst.util.GenericArrayData; -import org.apache.spark.unsafe.types.ByteArray; -import org.apache.spark.unsafe.types.UTF8String; - -public class PslSparkUtils { - private static ArrayBasedMapData convertAttributesToSparkMap( - ListMultimap attributeMap) { - - List keyList = new ArrayList<>(); - List valueList = new ArrayList<>(); - - attributeMap - .asMap() - .forEach( - (key, value) -> { - keyList.add(UTF8String.fromString(key)); - List attributeVals = - value.stream() - .map(v -> ByteArray.concat(v.toByteArray())) - .collect(Collectors.toList()); - valueList.add(new GenericArrayData(asScalaBufferConverter(attributeVals).asScala())); - }); - - return new ArrayBasedMapData( - new GenericArrayData(asScalaBufferConverter(keyList).asScala()), - new GenericArrayData(asScalaBufferConverter(valueList).asScala())); - } - - public static InternalRow toInternalRow( - SequencedMessage msg, SubscriptionPath subscription, Partition partition) { - List list = - new ArrayList<>( - Arrays.asList( - UTF8String.fromString(subscription.toString()), - partition.value(), - msg.offset().value(), - ByteArray.concat(msg.message().key().toByteArray()), - ByteArray.concat(msg.message().data().toByteArray()), - Timestamps.toMillis(msg.publishTime()), - msg.message().eventTime().isPresent() - ? 
Timestamps.toMillis(msg.message().eventTime().get()) - : null, - convertAttributesToSparkMap(msg.message().attributes()))); - return InternalRow.apply(asScalaBufferConverter(list).asScala()); - } - - public static SparkSourceOffset toSparkSourceOffset(PslSourceOffset pslSourceOffset) { - return new SparkSourceOffset( - pslSourceOffset.partitionOffsetMap().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> - SparkPartitionOffset.builder() - .partition(Partition.of(e.getKey().value())) - .offset(e.getValue().value() - 1) - .build()))); - } - - public static PslSourceOffset toPslSourceOffset(SparkSourceOffset sparkSourceOffset) { - long partitionCount = sparkSourceOffset.getPartitionOffsetMap().size(); - Map pslSourceOffsetMap = new HashMap<>(); - for (long i = 0; i < partitionCount; i++) { - Partition p = Partition.of(i); - checkArgument(sparkSourceOffset.getPartitionOffsetMap().containsKey(p)); - pslSourceOffsetMap.put( - p, Offset.of(sparkSourceOffset.getPartitionOffsetMap().get(p).offset() + 1)); - } - return PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build(); - } - - public static PslPartitionOffset toPslPartitionOffset(SparkPartitionOffset sparkPartitionOffset) { - return PslPartitionOffset.builder() - .partition(sparkPartitionOffset.partition()) - .offset(Offset.of(sparkPartitionOffset.offset() + 1)) - .build(); - } - - public static SparkSourceOffset getSparkStartOffset( - CursorClient cursorClient, SubscriptionPath subscriptionPath, long topicPartitionCount) { - try { - Map pslSourceOffsetMap = new HashMap<>(); - for (int i = 0; i < topicPartitionCount; i++) { - pslSourceOffsetMap.put(Partition.of(i), com.google.cloud.pubsublite.Offset.of(0)); - } - cursorClient - .listPartitionCursors(subscriptionPath) - .get() - .forEach(pslSourceOffsetMap::replace); - return PslSparkUtils.toSparkSourceOffset( - PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build()); - } catch (InterruptedException | ExecutionException e) { - throw new IllegalStateException( - "Failed to get information from PSL and construct startOffset", e); - } - } -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/SparkPartitionOffset.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/SparkPartitionOffset.java deleted file mode 100644 index 4f984b94f..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/SparkPartitionOffset.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.google.cloud.pubsublite.spark; - -import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.Partition; -import java.io.Serializable; -import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset; - -@AutoValue -abstract class SparkPartitionOffset implements PartitionOffset, Serializable { - private static final long serialVersionUID = -3398208694782540866L; - - abstract Partition partition(); - - abstract long offset(); - - public static SparkPartitionOffset create(Partition partition, long offset) { - return builder().partition(partition).offset(offset).build(); - } - - public static Builder builder() { - return new AutoValue_SparkPartitionOffset.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder partition(Partition partition); - - public abstract Builder offset(long offset); - - public abstract SparkPartitionOffset build(); - } -} diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java deleted file mode 100644 index 4dcfc87f4..000000000 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.google.cloud.pubsublite.Partition; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -public final class SparkSourceOffset - extends org.apache.spark.sql.sources.v2.reader.streaming.Offset { - private static final ObjectMapper objectMapper = - new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); - - // Using a map to ensure unique partitions. 
-  private final ImmutableMap<Partition, SparkPartitionOffset> partitionOffsetMap;
-
-  public SparkSourceOffset(Map<Partition, SparkPartitionOffset> map) {
-    validateMap(map);
-    this.partitionOffsetMap = ImmutableMap.copyOf(map);
-  }
-
-  private static void validateMap(Map<Partition, SparkPartitionOffset> map) {
-    map.forEach(
-        (k, v) ->
-            checkArgument(
-                Objects.equals(k, v.partition()),
-                "Key (Partition) and value (SparkPartitionOffset) partitions don't match."));
-  }
-
-  public static SparkSourceOffset merge(SparkSourceOffset o1, SparkSourceOffset o2) {
-    Map<Partition, SparkPartitionOffset> result = new HashMap<>(o1.partitionOffsetMap);
-    o2.partitionOffsetMap.forEach(
-        (k, v) ->
-            result.merge(
-                k,
-                v,
-                (v1, v2) ->
-                    SparkPartitionOffset.builder()
-                        .partition(Partition.of(k.value()))
-                        .offset(Collections.max(ImmutableList.of(v1.offset(), v2.offset())))
-                        .build()));
-    return new SparkSourceOffset(result);
-  }
-
-  public static SparkSourceOffset merge(SparkPartitionOffset[] offsets) {
-    Map<Partition, SparkPartitionOffset> map = new HashMap<>();
-    for (SparkPartitionOffset po : offsets) {
-      checkArgument(
-          !map.containsKey(po.partition()),
-          "Multiple SparkPartitionOffsets have the same partition.");
-      map.put(
-          po.partition(),
-          SparkPartitionOffset.builder().partition(po.partition()).offset(po.offset()).build());
-    }
-    return new SparkSourceOffset(map);
-  }
-
-  @SuppressWarnings("unchecked")
-  public static SparkSourceOffset fromJson(String json) {
-    Map<String, Number> map;
-    try {
-      // TODO: Use TypeReference instead of Map.class, currently TypeReference breaks spark with
-      // java.lang.LinkageError: loader constraint violation: loader previously initiated loading
-      // for a different type.
-      map = objectMapper.readValue(json, Map.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to deserialize SparkSourceOffset.", e);
-    }
-    Map<Partition, SparkPartitionOffset> partitionOffsetMap =
-        map.entrySet().stream()
-            .collect(
-                Collectors.toMap(
-                    e -> Partition.of(Long.parseLong(e.getKey())),
-                    e ->
-                        SparkPartitionOffset.builder()
-                            .partition(Partition.of(Long.parseLong(e.getKey())))
-                            .offset(e.getValue().longValue())
-                            .build()));
-    return new SparkSourceOffset(partitionOffsetMap);
-  }
-
-  public Map<Partition, SparkPartitionOffset> getPartitionOffsetMap() {
-    return this.partitionOffsetMap;
-  }
-
-  @Override
-  public String json() {
-    try {
-      Map<Long, Long> map =
-          partitionOffsetMap.entrySet().stream()
-              .collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
-      return objectMapper.writeValueAsString(map);
-    } catch (JsonProcessingException e) {
-      throw new IllegalStateException("Unable to serialize SparkSourceOffset.", e);
-    }
-  }
-}
diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java
deleted file mode 100644
index 0007dd896..000000000
--- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReaderTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.api.core.ApiFutures; -import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.internal.TopicStatsClient; -import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; -import com.google.cloud.pubsublite.proto.Cursor; -import com.google.common.testing.FakeTicker; -import java.util.concurrent.TimeUnit; -import org.junit.Test; - -public class LimitingHeadOffsetReaderTest { - - private final FakeTicker ticker = new FakeTicker(); - private final TopicStatsClient topicStatsClient = mock(TopicStatsClient.class); - private final LimitingHeadOffsetReader reader = - new LimitingHeadOffsetReader( - topicStatsClient, UnitTestExamples.exampleTopicPath(), 1, ticker::read); - - @Test - public void testRead() { - Cursor cursor1 = Cursor.newBuilder().setOffset(10).build(); - Cursor cursor2 = Cursor.newBuilder().setOffset(13).build(); - when(topicStatsClient.computeHeadCursor(UnitTestExamples.exampleTopicPath(), Partition.of(0))) - .thenReturn(ApiFutures.immediateFuture(cursor1)); - assertThat(reader.getHeadOffset().partitionOffsetMap()) - .containsExactly(Partition.of(0), Offset.of(10)); - verify(topicStatsClient).computeHeadCursor(any(), any()); - - reset(topicStatsClient); - ticker.advance(59, TimeUnit.SECONDS); - assertThat(reader.getHeadOffset().partitionOffsetMap()) - .containsExactly(Partition.of(0), Offset.of(cursor1.getOffset())); - verify(topicStatsClient, times(0)).computeHeadCursor(any(), any()); - - reset(topicStatsClient); - ticker.advance(2, TimeUnit.SECONDS); - when(topicStatsClient.computeHeadCursor(UnitTestExamples.exampleTopicPath(), Partition.of(0))) - .thenReturn(ApiFutures.immediateFuture(cursor2)); - assertThat(reader.getHeadOffset().partitionOffsetMap()) - .containsExactly(Partition.of(0), Offset.of(cursor2.getOffset())); - verify(topicStatsClient).computeHeadCursor(any(), any()); - } -} diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java deleted file mode 100644 index a9fbf3a2e..000000000 --- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImplTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.google.cloud.pubsublite.spark; - -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; - -import com.google.api.core.SettableApiFuture; -import com.google.cloud.pubsublite.*; -import com.google.cloud.pubsublite.internal.wire.Committer; -import com.google.common.collect.ImmutableMap; -import org.junit.Test; - -public class MultiPartitionCommitterImplTest { - - @Test - public void testCommit() { - Committer committer1 = mock(Committer.class); - Committer committer2 = mock(Committer.class); - when(committer1.startAsync()) - .thenReturn(committer1) - .thenThrow(new IllegalStateException("should only init once")); - when(committer2.startAsync()) - .thenReturn(committer2) - .thenThrow(new IllegalStateException("should only init once")); - MultiPartitionCommitterImpl multiCommitter = - new MultiPartitionCommitterImpl( - 2, - (p) -> { - if (p.value() == 0L) { - return committer1; - } else { - return committer2; - } - }); - verify(committer1, times(1)).startAsync(); - verify(committer2, times(1)).startAsync(); - - PslSourceOffset offset = - PslSourceOffset.builder() - .partitionOffsetMap( - ImmutableMap.of( - Partition.of(0), Offset.of(10L), - Partition.of(1), Offset.of(8L))) - .build(); - SettableApiFuture future1 = SettableApiFuture.create(); - SettableApiFuture future2 = SettableApiFuture.create(); - when(committer1.commitOffset(eq(Offset.of(10L)))).thenReturn(future1); - when(committer2.commitOffset(eq(Offset.of(8L)))).thenReturn(future2); - multiCommitter.commit(offset); - verify(committer1, times(1)).commitOffset(eq(Offset.of(10L))); - verify(committer2, times(1)).commitOffset(eq(Offset.of(8L))); - } - - @Test - public void testClose() { - Committer committer = mock(Committer.class); - when(committer.startAsync()) - .thenReturn(committer) - .thenThrow(new IllegalStateException("should only init once")); - MultiPartitionCommitterImpl multiCommitter = - new MultiPartitionCommitterImpl(1, (p) -> committer); - - PslSourceOffset offset = - PslSourceOffset.builder() - .partitionOffsetMap(ImmutableMap.of(Partition.of(0), Offset.of(10L))) - .build(); - SettableApiFuture future1 = SettableApiFuture.create(); - when(committer.commitOffset(eq(Offset.of(10L)))).thenReturn(future1); - when(committer.stopAsync()).thenReturn(committer); - multiCommitter.commit(offset); - - multiCommitter.close(); - verify(committer, times(1)).stopAsync(); - } -} diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java deleted file mode 100644 index 1e6ee0dcf..000000000 --- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Mockito.*; - -import com.google.api.core.ApiFutures; -import com.google.cloud.pubsublite.*; -import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl; -import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; -import com.google.protobuf.ByteString; -import com.google.protobuf.util.Timestamps; -import java.util.Optional; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader; -import org.junit.Test; - -public class PslContinuousInputPartitionReaderTest { - - private final BlockingPullSubscriberImpl subscriber = mock(BlockingPullSubscriberImpl.class); - private ContinuousInputPartitionReader reader; - - private static SequencedMessage newMessage(long offset) { - return SequencedMessage.of( - Message.builder().setData(ByteString.copyFromUtf8("text")).build(), - Timestamps.EPOCH, - Offset.of(offset), - 10000); - } - - private static void verifyInternalRow(InternalRow row, long expectedOffset) { - assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString()); - assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value()); - assertThat(row.getLong(2)).isEqualTo(expectedOffset); - } - - private void createReader() { - reader = - new PslContinuousInputPartitionReader( - UnitTestExamples.exampleSubscriptionPath(), - SparkPartitionOffset.builder() - .partition(UnitTestExamples.examplePartition()) - .offset(UnitTestExamples.exampleOffset().value()) - .build(), - subscriber); - } - - @Test - public void testPartitionReader() throws Exception { - createReader(); - SequencedMessage message1 = newMessage(10); - SequencedMessage message2 = newMessage(13); - - // Multiple get w/o next will return same msg. - when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); - when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1)); - assertThat(reader.next()).isTrue(); - verifyInternalRow(reader.get(), 10L); - verifyInternalRow(reader.get(), 10L); - assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(10L); - - // Next will advance to next msg. - when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); - when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2)); - assertThat(reader.next()).isTrue(); - verifyInternalRow(reader.get(), 13L); - assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(13L); - } -} diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java deleted file mode 100644 index b4982caa8..000000000 --- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousReaderTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Mockito.*; -import static org.mockito.Mockito.mock; - -import com.google.api.core.ApiFutures; -import com.google.cloud.pubsublite.*; -import com.google.cloud.pubsublite.internal.CursorClient; -import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; -import com.google.common.collect.ImmutableMap; -import java.util.Optional; -import org.junit.Test; - -public class PslContinuousReaderTest { - - private static final PslDataSourceOptions OPTIONS = - PslDataSourceOptions.builder() - .setSubscriptionPath(UnitTestExamples.exampleSubscriptionPath()) - .build(); - private final CursorClient cursorClient = mock(CursorClient.class); - private final MultiPartitionCommitter committer = mock(MultiPartitionCommitter.class); - private final PartitionSubscriberFactory partitionSubscriberFactory = - mock(PartitionSubscriberFactory.class); - private final PslContinuousReader reader = - new PslContinuousReader( - cursorClient, - committer, - partitionSubscriberFactory, - UnitTestExamples.exampleSubscriptionPath(), - OPTIONS.flowControlSettings(), - 2); - - @Test - public void testEmptyStartOffset() { - when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())) - .thenReturn( - ApiFutures.immediateFuture( - ImmutableMap.of(Partition.of(1L), UnitTestExamples.exampleOffset()))); - - reader.setStartOffset(Optional.empty()); - assertThat(((SparkSourceOffset) reader.getStartOffset()).getPartitionOffsetMap()) - .containsExactly( - Partition.of(0L), - SparkPartitionOffset.builder().partition(Partition.of(0L)).offset(-1L).build(), - Partition.of(1L), - SparkPartitionOffset.builder() - .partition(Partition.of(1L)) - .offset(UnitTestExamples.exampleOffset().value() - 1) - .build()); - } - - @Test - public void testValidStartOffset() { - SparkSourceOffset offset = - new SparkSourceOffset( - ImmutableMap.of( - Partition.of(1), - SparkPartitionOffset.builder() - .partition(Partition.of(1)) - .offset(UnitTestExamples.exampleOffset().value()) - .build())); - reader.setStartOffset(Optional.of(offset)); - assertThat(reader.getStartOffset()).isEqualTo(offset); - } - - @Test - public void testMergeOffsets() { - SparkPartitionOffset po1 = - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build(); - SparkPartitionOffset po2 = - SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(5L).build(); - assertThat(reader.mergeOffsets(new SparkPartitionOffset[] {po1, po2})) - .isEqualTo(SparkSourceOffset.merge(new SparkPartitionOffset[] {po1, po2})); - } - - @Test - public void testDeserializeOffset() { - SparkSourceOffset offset = - new SparkSourceOffset( - ImmutableMap.of( - Partition.of(1L), - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build())); - assertThat(reader.deserializeOffset(offset.json())).isEqualTo(offset); - } - - @Test - public void testCommit() { - SparkSourceOffset offset = - new SparkSourceOffset( - ImmutableMap.of( - Partition.of(0L), - SparkPartitionOffset.builder().partition(Partition.of(0L)).offset(10L).build(), - Partition.of(1L), - SparkPartitionOffset.builder() - .partition(Partition.of(1L)) - .offset(50L) - .build())); - PslSourceOffset expectedCommitOffset = - PslSourceOffset.builder() - .partitionOffsetMap( - ImmutableMap.of( - Partition.of(0L), Offset.of(11L), - 
Partition.of(1L), Offset.of(51L))) - .build(); - reader.commit(offset); - verify(committer, times(1)).commit(eq(expectedCommitOffset)); - } -} diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.java deleted file mode 100644 index 935e7a63c..000000000 --- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.google.api.core.ApiFutures; -import com.google.cloud.pubsublite.Message; -import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.SequencedMessage; -import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl; -import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; -import com.google.protobuf.ByteString; -import com.google.protobuf.util.Timestamps; -import java.util.Optional; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; -import org.junit.Test; - -public class PslMicroBatchInputPartitionReaderTest { - - private final BlockingPullSubscriberImpl subscriber = mock(BlockingPullSubscriberImpl.class); - private InputPartitionReader reader; - - private static SequencedMessage newMessage(long offset) { - return SequencedMessage.of( - Message.builder().setData(ByteString.copyFromUtf8("text")).build(), - Timestamps.EPOCH, - Offset.of(offset), - 10000); - } - - private static void verifyInternalRow(InternalRow row, long expectedOffset) { - assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString()); - assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value()); - assertThat(row.getLong(2)).isEqualTo(expectedOffset); - } - - private void createReader(long endOffset) { - reader = - new PslMicroBatchInputPartitionReader( - UnitTestExamples.exampleSubscriptionPath(), - SparkPartitionOffset.builder() - .partition(UnitTestExamples.examplePartition()) - .offset(endOffset) - .build(), - subscriber); - } - - @Test - public void testPartitionReader() throws Exception { - long endOffset = 14L; - createReader(endOffset); - SequencedMessage message1 = newMessage(10L); - SequencedMessage message2 = newMessage(endOffset); - - // Multiple get w/o next will return same msg. 
- when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); - when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1)); - assertThat(reader.next()).isTrue(); - verifyInternalRow(reader.get(), 10L); - verifyInternalRow(reader.get(), 10L); - - // Next will advance to next msg which is also the last msg in the batch. - when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); - when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2)); - assertThat(reader.next()).isTrue(); - verifyInternalRow(reader.get(), 14L); - - // Now it already reached the end of the batch - assertThat(reader.next()).isFalse(); - } - - @Test - public void testPartitionReaderNewMessageExceedsRange() throws Exception { - long endOffset = 14L; - createReader(endOffset); - SequencedMessage message1 = newMessage(10L); - SequencedMessage message2 = newMessage(endOffset + 1); - - // Multiple get w/o next will return same msg. - when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); - when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1)); - assertThat(reader.next()).isTrue(); - verifyInternalRow(reader.get(), 10L); - verifyInternalRow(reader.get(), 10L); - - // Next will advance to next msg, and recognize it's out of the batch range. - when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); - when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2)); - assertThat(reader.next()).isFalse(); - } -} diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java deleted file mode 100644 index c8f644088..000000000 --- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; -import static org.mockito.Mockito.times; - -import com.google.api.core.ApiFutures; -import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.internal.CursorClient; -import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; -import com.google.common.collect.ImmutableMap; -import java.util.Optional; -import org.junit.Test; - -public class PslMicroBatchReaderTest { - private static final PslDataSourceOptions OPTIONS = - PslDataSourceOptions.builder() - .setSubscriptionPath(UnitTestExamples.exampleSubscriptionPath()) - .build(); - private final CursorClient cursorClient = mock(CursorClient.class); - private final MultiPartitionCommitter committer = mock(MultiPartitionCommitter.class); - private final PartitionSubscriberFactory partitionSubscriberFactory = - mock(PartitionSubscriberFactory.class); - private final PerTopicHeadOffsetReader headOffsetReader = mock(PerTopicHeadOffsetReader.class); - private final PslMicroBatchReader reader = - new PslMicroBatchReader( - cursorClient, - committer, - partitionSubscriberFactory, - headOffsetReader, - UnitTestExamples.exampleSubscriptionPath(), - OPTIONS.flowControlSettings(), - 2); - - private PslSourceOffset createPslSourceOffsetTwoPartition(long offset0, long offset1) { - return PslSourceOffset.builder() - .partitionOffsetMap( - ImmutableMap.of( - Partition.of(0L), Offset.of(offset0), Partition.of(1L), Offset.of(offset1))) - .build(); - } - - private SparkSourceOffset createSparkSourceOffsetTwoPartition(long offset0, long offset1) { - return new SparkSourceOffset( - ImmutableMap.of( - Partition.of(0L), - SparkPartitionOffset.create(Partition.of(0L), offset0), - Partition.of(1L), - SparkPartitionOffset.create(Partition.of(1L), offset1))); - } - - @Test - public void testEmptyOffsets() { - when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())) - .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L)))); - when(headOffsetReader.getHeadOffset()).thenReturn(createPslSourceOffsetTwoPartition(301L, 0L)); - reader.setOffsetRange(Optional.empty(), Optional.empty()); - assertThat(((SparkSourceOffset) reader.getStartOffset()).getPartitionOffsetMap()) - .containsExactly( - Partition.of(0L), - SparkPartitionOffset.create(Partition.of(0L), 99L), - Partition.of(1L), - SparkPartitionOffset.create(Partition.of(1L), -1L)); - assertThat(((SparkSourceOffset) reader.getEndOffset()).getPartitionOffsetMap()) - .containsExactly( - Partition.of(0L), - SparkPartitionOffset.create(Partition.of(0L), 300L), - Partition.of(1L), - SparkPartitionOffset.create(Partition.of(1L), -1L)); - } - - @Test - public void testValidOffsets() { - SparkSourceOffset startOffset = createSparkSourceOffsetTwoPartition(10L, 100L); - SparkSourceOffset endOffset = createSparkSourceOffsetTwoPartition(20L, 300L); - reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)); - assertThat(reader.getStartOffset()).isEqualTo(startOffset); - assertThat(reader.getEndOffset()).isEqualTo(endOffset); - } - - @Test - public void testDeserializeOffset() { - SparkSourceOffset offset = - new SparkSourceOffset( - ImmutableMap.of(Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), 10L))); - 
assertThat(reader.deserializeOffset(offset.json())).isEqualTo(offset); - } - - @Test - public void testCommit() { - SparkSourceOffset offset = createSparkSourceOffsetTwoPartition(10L, 50L); - PslSourceOffset expectedCommitOffset = createPslSourceOffsetTwoPartition(11L, 51L); - reader.commit(offset); - verify(committer, times(1)).commit(eq(expectedCommitOffset)); - } - - @Test - public void testPlanInputPartitionNoMessage() { - SparkSourceOffset startOffset = createSparkSourceOffsetTwoPartition(10L, 100L); - SparkSourceOffset endOffset = createSparkSourceOffsetTwoPartition(20L, 100L); - reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)); - assertThat(reader.planInputPartitions()).hasSize(1); - } -} diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java deleted file mode 100644 index 5dbdf65ec..000000000 --- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.cloud.pubsublite.Message; -import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SequencedMessage; -import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; -import com.google.common.collect.ImmutableListMultimap; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; -import org.junit.Test; - -public class PslSparkUtilsTest { - - @Test - public void testToInternalRow() { - Message message = - Message.builder() - .setKey(ByteString.copyFromUtf8("key")) - .setData(ByteString.copyFromUtf8("data")) - .setEventTime(Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build()) - .setAttributes( - ImmutableListMultimap.of( - "key1", ByteString.copyFromUtf8("val1"), - "key1", ByteString.copyFromUtf8("val2"), - "key2", ByteString.copyFromUtf8("val3"))) - .build(); - SequencedMessage sequencedMessage = - SequencedMessage.of( - message, - Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build(), - Offset.of(10L), - 10L); - PslSparkUtils.toInternalRow( - sequencedMessage, - UnitTestExamples.exampleSubscriptionPath(), - UnitTestExamples.examplePartition()); - } - - @Test - public void testSourceOffsetConversion() { - PslSourceOffset pslSourceOffset = - PslSourceOffset.builder() - .partitionOffsetMap( - ImmutableMap.of(Partition.of(0L), Offset.of(10), Partition.of(1L), Offset.of(50))) - .build(); - - SparkSourceOffset sparkSourceOffset = PslSparkUtils.toSparkSourceOffset(pslSourceOffset); - assertThat(sparkSourceOffset.getPartitionOffsetMap().get(Partition.of(0L)).offset()) - .isEqualTo(9L); - 
assertThat(sparkSourceOffset.getPartitionOffsetMap().get(Partition.of(1L)).offset()) - .isEqualTo(49L); - - assertThat(PslSparkUtils.toPslSourceOffset(sparkSourceOffset)).isEqualTo(pslSourceOffset); - } - - @Test - public void testToPslPartitionOffset() { - SparkPartitionOffset sparkPartitionOffset = - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build(); - PslPartitionOffset pslPartitionOffset = - PslPartitionOffset.builder().partition(Partition.of(1L)).offset(Offset.of(11L)).build(); - assertThat(PslSparkUtils.toPslPartitionOffset(sparkPartitionOffset)) - .isEqualTo(pslPartitionOffset); - } -} diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/SparkSourceOffsetTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/SparkSourceOffsetTest.java deleted file mode 100644 index 78010e07d..000000000 --- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/SparkSourceOffsetTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.spark; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.fail; - -import com.google.cloud.pubsublite.Partition; -import com.google.common.collect.ImmutableMap; -import org.junit.Test; - -public class SparkSourceOffsetTest { - - @Test - public void roundTrip() { - SparkSourceOffset offset = - new SparkSourceOffset( - ImmutableMap.of( - // Intentionally unsorted, the serialization should make it sorted. 
- Partition.of(3L), - SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(10L).build(), - Partition.of(1L), - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(5L).build(), - Partition.of(2L), - SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(8L).build())); - assertThat(offset.json()).isEqualTo("{\"1\":5,\"2\":8,\"3\":10}"); - assertThat(SparkSourceOffset.fromJson(offset.json())).isEqualTo(offset); - } - - @Test - public void mergeSparkSourceOffsets() { - SparkSourceOffset o1 = - new SparkSourceOffset( - ImmutableMap.of( - Partition.of(1L), - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build(), - Partition.of(2L), - SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(8L).build(), - Partition.of(3L), - SparkPartitionOffset.builder() - .partition(Partition.of(3L)) - .offset(10L) - .build())); - SparkSourceOffset o2 = - new SparkSourceOffset( - ImmutableMap.of( - Partition.of(2L), - SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(8L).build(), - Partition.of(3L), - SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(11L).build(), - Partition.of(4L), - SparkPartitionOffset.builder().partition(Partition.of(4L)).offset(1L).build())); - SparkSourceOffset expected = - new SparkSourceOffset( - ImmutableMap.of( - Partition.of(1L), - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(10L).build(), - Partition.of(2L), - SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(8L).build(), - Partition.of(3L), - SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(11L).build(), - Partition.of(4L), - SparkPartitionOffset.builder().partition(Partition.of(4L)).offset(1L).build())); - assertThat(SparkSourceOffset.merge(o1, o2)).isEqualTo(expected); - } - - @Test - public void mergeSparkPartitionOffsetsDuplicatePartition() { - SparkPartitionOffset[] offsets = { - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(5L).build(), - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(4L).build(), - SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(10L).build() - }; - try { - SparkSourceOffset.merge(offsets); - fail(); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessageThat().contains("same partition"); - } - } - - @Test - public void mergeSparkPartitionOffsets() { - SparkPartitionOffset[] offsets = { - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(5L).build(), - SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(4L).build(), - SparkPartitionOffset.builder().partition(Partition.of(3L)).offset(10L).build() - }; - SparkSourceOffset expected = - new SparkSourceOffset( - ImmutableMap.of( - Partition.of(1L), - SparkPartitionOffset.builder().partition(Partition.of(1L)).offset(5L).build(), - Partition.of(2L), - SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(4L).build(), - Partition.of(3L), - SparkPartitionOffset.builder() - .partition(Partition.of(3L)) - .offset(10L) - .build())); - assertThat(SparkSourceOffset.merge(offsets)).isEqualTo(expected); - } - - @Test - public void invalidMap() { - try { - new SparkSourceOffset( - ImmutableMap.of( - Partition.of(3L), - SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(10L).build())); - fail(); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessageThat().contains("don't match"); - } - } -}
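// Illustrative sketch (assumed usage, mirroring the roundTrip test above): the offset
// wire format serializes a map from partition number to last-processed offset, with
// keys sorted by the ObjectMapper configured in SparkSourceOffset.
//
//   SparkSourceOffset offset = SparkSourceOffset.fromJson("{\"1\":5,\"2\":8,\"3\":10}");
//   assert offset.json().equals("{\"1\":5,\"2\":8,\"3\":10}");
//   assert offset.getPartitionOffsetMap().get(Partition.of(1L)).offset() == 5L;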