From 98f5863245584bf517d4817610dcca0c3979a470 Mon Sep 17 00:00:00 2001
From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com>
Date: Tue, 6 Apr 2021 15:54:00 -0400
Subject: [PATCH] samples: Spark connector writer support sample and
integration test (#122)
---
samples/README.md | 52 ++++--
.../java/pubsublite/spark/AdminUtils.java | 79 ++++++++-
.../java/pubsublite/spark/PublishWords.java | 38 +++--
.../java/pubsublite/spark/ReadResults.java | 55 +++++++
.../main/java/pubsublite/spark/WordCount.java | 31 +++-
.../spark/SampleIntegrationTest.java | 151 +++++++++++-------
.../spark/PslWriteDataSourceOptions.java | 2 +
7 files changed, 314 insertions(+), 94 deletions(-)
create mode 100644 samples/snippets/src/main/java/pubsublite/spark/ReadResults.java
diff --git a/samples/README.md b/samples/README.md
index a8747527..3091db75 100644
--- a/samples/README.md
+++ b/samples/README.md
@@ -1,6 +1,9 @@
# Pub/Sub Lite Spark Connector Word Count Samples
-This directory contains a word count sample for Pub/Sub Lite Spark Connector.
+This directory contains a word count sample for the Pub/Sub Lite Spark Connector. The sample reads
+single word count messages from Pub/Sub Lite, aggregates them (counts words) in Spark, and finally
+writes the results back to Pub/Sub Lite. Note that the topic/subscription used for reading is different
+from the topic/subscription used for writing and verifying the final word count results.
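Throughout the sample, each message's `data` payload is a UTF-8 string of the form `<word>_<count>`: the source topic carries single-count messages such as `the_1`, and the destination topic carries aggregated results such as `the_24`. A minimal sketch of this encoding follows; the class and helper names are illustrative, not part of the sample:

```java
import com.google.protobuf.ByteString;

/** Illustrative helpers (not part of the sample) for the word-count payload format. */
public class WordCountFormat {

  /** Encodes a word and its count into a message payload, e.g. encode("the", 1) -> "the_1". */
  static ByteString encode(String word, long count) {
    return ByteString.copyFromUtf8(word + "_" + count);
  }

  /** Decodes a payload into the "word: count" line that ReadResults prints, e.g. "the: 24". */
  static String decode(ByteString data) {
    return data.toStringUtf8().replace("_", ": ");
  }
}
```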
## Authentication
@@ -8,17 +11,20 @@ Please see the [Google cloud authentication guide](https://cloud.google.com/docs
The recommended approach is to use Application Default Credentials by setting `GOOGLE_APPLICATION_CREDENTIALS`.
## Environment Variables
-Set the following environment variables:
+Set the following environment variables.
+Note that `SOURCE_TOPIC_ID` and `SOURCE_SUBSCRIPTION_ID` are used to read the _raw_ single word count messages,
+while `DESTINATION_TOPIC_ID` and `DESTINATION_SUBSCRIPTION_ID` are used for the final word count results. The
+source and destination must be different.
```
PROJECT_NUMBER=12345 # or your project number
REGION=us-central1 # or your region
ZONE_ID=b # or your zone id
-TOPIC_ID=test-topic # or your topic id to create
-SUBSCRIPTION_ID=test-subscrciption # or your subscription to create
-PARTITIONS=1 # or your number of partitions to create
+SOURCE_TOPIC_ID=test-topic # or your topic id to create
+SOURCE_SUBSCRIPTION_ID=test-subscription # or your subscription to create
+DESTINATION_TOPIC_ID=test-topic-2 # or your topic id to create, this is different from SOURCE_TOPIC_ID!
+DESTINATION_SUBSCRIPTION_ID=test-subscription-2 # or your subscription to create, this is different from SOURCE_SUBSCRIPTION_ID!
CLUSTER_NAME=waprin-spark7 # or your Dataproc cluster name to create
BUCKET=gs://your-gcs-bucket
-SUBSCRIPTION_PATH=projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/subscriptions/$SUBSCRIPTION_ID
CONNECTOR_VERSION= # latest pubsublite-spark-sql-streaming release version
PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION= # downloaded pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies jar location
```
@@ -36,14 +42,16 @@ To run the word count sample in Dataproc cluster, follow the steps:
--non-recursive \
exec:exec)
```
-3. Create the topic and subscription, and publish word count messages to the topic.
+3. Create both the source and destination topics and subscriptions, and publish word count messages to the _source_
+ topic.
```sh
PROJECT_NUMBER=$PROJECT_NUMBER \
REGION=$REGION \
ZONE_ID=$ZONE_ID \
- TOPIC_ID=$TOPIC_ID \
- SUBSCRIPTION_ID=$SUBSCRIPTION_ID \
- PARTITIONS=$PARTITIONS \
+ SOURCE_TOPIC_ID=$SOURCE_TOPIC_ID \
+ SOURCE_SUBSCRIPTION_ID=$SOURCE_SUBSCRIPTION_ID \
+ DESTINATION_TOPIC_ID=$DESTINATION_TOPIC_ID \
+ DESTINATION_SUBSCRIPTION_ID=$DESTINATION_SUBSCRIPTION_ID \
mvn compile exec:java -Dexec.mainClass=pubsublite.spark.PublishWords
```
4. Create a Dataproc cluster
@@ -54,8 +62,7 @@ To run the word count sample in Dataproc cluster, follow the steps:
```sh
mvn clean package -Dmaven.test.skip=true
```
-
-6. Download `pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar` from Maven Central and set `PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION` environment variable.
+6. Download `pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar` from [Maven Central](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming) and set the `PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION` environment variable.
7. Create GCS bucket and upload both `pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar` and the sample jar onto GCS
```sh
gsutil mb $BUCKET
@@ -66,19 +73,30 @@ To run the word count sample in Dataproc cluster, follow the steps:
```sh
gcloud config set dataproc/region $REGION
```
-
-9. Run the sample in Dataproc. You would see the word count result show up in the console output.
+9. Run the sample in Dataproc. This will perform the word count aggregation and publish the word count results to Pub/Sub Lite.
```sh
gcloud dataproc jobs submit spark --cluster=$CLUSTER_NAME \
--jars=$BUCKET/pubsublite-spark-snippets-$SAMPLE_VERSION.jar,$BUCKET/pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar \
- --class=pubsublite.spark.WordCount -- $SUBSCRIPTION_PATH
+ --class=pubsublite.spark.WordCount -- \
+ projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/subscriptions/$SOURCE_SUBSCRIPTION_ID \
+ projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/topics/$DESTINATION_TOPIC_ID
```
+10. Read the word count results from Pub/Sub Lite; you should see the results in the console output.
+ ```sh
+ PROJECT_NUMBER=$PROJECT_NUMBER \
+ REGION=$REGION \
+ ZONE_ID=$ZONE_ID \
+ DESTINATION_SUBSCRIPTION_ID=$DESTINATION_SUBSCRIPTION_ID \
+ mvn compile exec:java -Dexec.mainClass=pubsublite.spark.ReadResults
+ ```
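With the bundled text snippets, the top of the console output should look roughly like this (counts taken from the integration test's expected results):
```
the: 24
of: 16
and: 14
i: 13
my: 10
```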
## Cleaning up
-1. Delete Pub/Sub Lite topic and subscription.
+1. Delete the Pub/Sub Lite topics and subscriptions.
```sh
- gcloud pubsub lite-subscriptions delete $SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
- gcloud pubsub lite-topics delete $TOPIC_ID --zone=$REGION-$ZONE_ID
+ gcloud pubsub lite-subscriptions delete $SOURCE_SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
+ gcloud pubsub lite-topics delete $SOURCE_TOPIC_ID --zone=$REGION-$ZONE_ID
+ gcloud pubsub lite-subscriptions delete $DESTINATION_SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
+ gcloud pubsub lite-topics delete $DESTINATION_TOPIC_ID --zone=$REGION-$ZONE_ID
```
2. Delete GCS bucket.
```sh
diff --git a/samples/snippets/src/main/java/pubsublite/spark/AdminUtils.java b/samples/snippets/src/main/java/pubsublite/spark/AdminUtils.java
index 09bac836..b6f712f6 100644
--- a/samples/snippets/src/main/java/pubsublite/spark/AdminUtils.java
+++ b/samples/snippets/src/main/java/pubsublite/spark/AdminUtils.java
@@ -20,6 +20,8 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
@@ -30,8 +32,11 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
+import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
+import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.protobuf.ByteString;
@@ -39,7 +44,11 @@
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
public class AdminUtils {
@@ -144,14 +153,7 @@ public static void createSubscriptionExample(
}
public static void deleteSubscriptionExample(
- String cloudRegion, char zoneId, long projectNumber, String subscriptionId) throws Exception {
- SubscriptionPath subscriptionPath =
- SubscriptionPath.newBuilder()
- .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
- .setProject(ProjectNumber.of(projectNumber))
- .setName(SubscriptionName.of(subscriptionId))
- .build();
-
+ String cloudRegion, SubscriptionPath subscriptionPath) throws Exception {
AdminClientSettings adminClientSettings =
AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();
@@ -161,6 +163,16 @@ public static void deleteSubscriptionExample(
}
}
+ public static void deleteTopicExample(String cloudRegion, TopicPath topicPath) throws Exception {
+ AdminClientSettings adminClientSettings =
+ AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();
+
+ try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
+ adminClient.deleteTopic(topicPath).get();
+ System.out.println(topicPath + " deleted successfully.");
+ }
+ }
+
public static void publisherExample(
String cloudRegion, char zoneId, long projectNumber, String topicId, List<String> words)
throws ApiException, ExecutionException, InterruptedException {
@@ -208,4 +220,55 @@ public static void publisherExample(
}
}
}
+
+ public static Queue<PubsubMessage> subscriberExample(
+ String cloudRegion, char zoneId, long projectNumber, String subscriptionId)
+ throws ApiException {
+ // Sample has at most 200 messages.
+ Queue<PubsubMessage> result = new ArrayBlockingQueue<>(1000);
+
+ SubscriptionPath subscriptionPath =
+ SubscriptionPath.newBuilder()
+ .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
+ .setProject(ProjectNumber.of(projectNumber))
+ .setName(SubscriptionName.of(subscriptionId))
+ .build();
+
+ MessageReceiver receiver =
+ (PubsubMessage message, AckReplyConsumer consumer) -> {
+ result.add(message);
+ consumer.ack();
+ };
+ FlowControlSettings flowControlSettings =
+ FlowControlSettings.builder()
+ .setBytesOutstanding(10 * 1024 * 1024L)
+ .setMessagesOutstanding(1000L)
+ .build();
+
+ SubscriberSettings subscriberSettings =
+ SubscriberSettings.newBuilder()
+ .setSubscriptionPath(subscriptionPath)
+ .setReceiver(receiver)
+ .setPerPartitionFlowControlSettings(flowControlSettings)
+ .build();
+
+ Subscriber subscriber = Subscriber.create(subscriberSettings);
+
+ // Start the subscriber. Upon successful starting, its state will become RUNNING.
+ subscriber.startAsync().awaitRunning();
+
+ try {
+ System.out.println(subscriber.state());
+ // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
+ // unrecoverable errors before then, its state will change to FAILED and an
+ // IllegalStateException will be thrown.
+ subscriber.awaitTerminated(90, TimeUnit.SECONDS);
+ } catch (TimeoutException t) {
+ // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
+ subscriber.stopAsync().awaitTerminated();
+ System.out.println("Subscriber is shut down: " + subscriber.state());
+ }
+
+ return result;
+ }
}
diff --git a/samples/snippets/src/main/java/pubsublite/spark/PublishWords.java b/samples/snippets/src/main/java/pubsublite/spark/PublishWords.java
index 5845d2d6..17152f67 100644
--- a/samples/snippets/src/main/java/pubsublite/spark/PublishWords.java
+++ b/samples/snippets/src/main/java/pubsublite/spark/PublishWords.java
@@ -34,27 +34,37 @@ public class PublishWords {
private static final String REGION = "REGION";
private static final String ZONE_ID = "ZONE_ID";
- private static final String TOPIC_ID = "TOPIC_ID";
- private static final String SUBSCRIPTION_ID = "SUBSCRIPTION_ID";
+ private static final String SOURCE_TOPIC_ID = "SOURCE_TOPIC_ID";
+ private static final String SOURCE_SUBSCRIPTION_ID = "SOURCE_SUBSCRIPTION_ID";
+ private static final String DESTINATION_TOPIC_ID = "DESTINATION_TOPIC_ID";
+ private static final String DESTINATION_SUBSCRIPTION_ID = "DESTINATION_SUBSCRIPTION_ID";
private static final String PROJECT_NUMBER = "PROJECT_NUMBER";
- private static final String PARTITIONS = "PARTITIONS";
public static void main(String[] args) throws Exception {
Map<String, String> env = System.getenv();
Set<String> missingVars =
Sets.difference(
- ImmutableSet.of(REGION, ZONE_ID, TOPIC_ID, SUBSCRIPTION_ID, PROJECT_NUMBER, PARTITIONS),
+ ImmutableSet.of(
+ REGION,
+ ZONE_ID,
+ SOURCE_TOPIC_ID,
+ SOURCE_SUBSCRIPTION_ID,
+ DESTINATION_TOPIC_ID,
+ DESTINATION_SUBSCRIPTION_ID,
+ PROJECT_NUMBER),
env.keySet());
Preconditions.checkState(
missingVars.isEmpty(), "Missing required environment variables: " + missingVars);
- String cloudRegion = env.get(REGION);
+ final String cloudRegion = env.get(REGION);
char zoneId = env.get(ZONE_ID).charAt(0);
- String topicId = env.get(TOPIC_ID);
- String subscriptionId = env.get(SUBSCRIPTION_ID);
+ final String sourceTopicId = env.get(SOURCE_TOPIC_ID);
+ final String sourceSubscriptionId = env.get(SOURCE_SUBSCRIPTION_ID);
+ final String destinationTopicId = env.get(DESTINATION_TOPIC_ID);
+ final String destinationSubscriptionId = env.get(DESTINATION_SUBSCRIPTION_ID);
long projectNumber = Long.parseLong(env.get(PROJECT_NUMBER));
- int partitions = Integer.parseInt(env.get(PARTITIONS));
+ int partitions = 1;
String snippets =
Resources.toString(Resources.getResource("text_snippets.txt"), Charset.defaultCharset());
@@ -64,12 +74,16 @@ public static void main(String[] args) throws Exception {
.replaceAll("\n", " ")
.replaceAll("\\s+", " ")
.toLowerCase();
- List<String> words = Arrays.asList(snippets.split(" "));
+ final List<String> words = Arrays.asList(snippets.split(" "));
- createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions);
- createSubscriptionExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
+ createTopicExample(cloudRegion, zoneId, projectNumber, sourceTopicId, partitions);
+ createSubscriptionExample(
+ cloudRegion, zoneId, projectNumber, sourceTopicId, sourceSubscriptionId);
+ createTopicExample(cloudRegion, zoneId, projectNumber, destinationTopicId, partitions);
+ createSubscriptionExample(
+ cloudRegion, zoneId, projectNumber, destinationTopicId, destinationSubscriptionId);
- publisherExample(cloudRegion, zoneId, projectNumber, topicId, words);
+ publisherExample(cloudRegion, zoneId, projectNumber, sourceTopicId, words);
System.exit(0);
}
diff --git a/samples/snippets/src/main/java/pubsublite/spark/ReadResults.java b/samples/snippets/src/main/java/pubsublite/spark/ReadResults.java
new file mode 100644
index 00000000..e93d08e0
--- /dev/null
+++ b/samples/snippets/src/main/java/pubsublite/spark/ReadResults.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pubsublite.spark;
+
+import static pubsublite.spark.AdminUtils.subscriberExample;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+public class ReadResults {
+
+ private static final String REGION = "REGION";
+ private static final String ZONE_ID = "ZONE_ID";
+ private static final String DESTINATION_SUBSCRIPTION_ID = "DESTINATION_SUBSCRIPTION_ID";
+ private static final String PROJECT_NUMBER = "PROJECT_NUMBER";
+
+ public static void main(String[] args) {
+
+ Map<String, String> env = System.getenv();
+ Set<String> missingVars =
+ Sets.difference(
+ ImmutableSet.of(REGION, ZONE_ID, DESTINATION_SUBSCRIPTION_ID, PROJECT_NUMBER),
+ env.keySet());
+ Preconditions.checkState(
+ missingVars.isEmpty(), "Missing required environment variables: " + missingVars);
+
+ String cloudRegion = env.get(REGION);
+ char zoneId = env.get(ZONE_ID).charAt(0);
+ String destinationSubscriptionId = env.get(DESTINATION_SUBSCRIPTION_ID);
+ long projectNumber = Long.parseLong(env.get(PROJECT_NUMBER));
+
+ System.out.println("Word count results:");
+ subscriberExample(cloudRegion, zoneId, projectNumber, destinationSubscriptionId)
+ .forEach((m) -> System.out.println(m.getData().toStringUtf8().replace("_", ": ")));
+
+ System.exit(0);
+ }
+}
diff --git a/samples/snippets/src/main/java/pubsublite/spark/WordCount.java b/samples/snippets/src/main/java/pubsublite/spark/WordCount.java
index c6c7ce5f..4696bc69 100644
--- a/samples/snippets/src/main/java/pubsublite/spark/WordCount.java
+++ b/samples/snippets/src/main/java/pubsublite/spark/WordCount.java
@@ -16,8 +16,11 @@
package pubsublite.spark;
+import static org.apache.spark.sql.functions.concat;
+import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.split;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
@@ -31,12 +34,25 @@
public class WordCount {
public static void main(String[] args) throws Exception {
+ final String appId = UUID.randomUUID().toString();
+ final String sourceSubscriptionPath = args[0];
+ final String destinationTopicPath = args[1];
- SparkSession spark = SparkSession.builder().appName("Word count").master("yarn").getOrCreate();
+ SparkSession spark =
+ SparkSession.builder()
+ .appName(String.format("Word count (ID: %s)", appId))
+ .master("yarn")
+ .getOrCreate();
+ // Read messages from Pub/Sub Lite
Dataset<Row> df =
- spark.readStream().format("pubsublite").option("pubsublite.subscription", args[0]).load();
+ spark
+ .readStream()
+ .format("pubsublite")
+ .option("pubsublite.subscription", sourceSubscriptionPath)
+ .load();
+ // Aggregate word counts
Column splitCol = split(df.col("data"), "_");
df =
df.withColumn("word", splitCol.getItem(0))
@@ -44,9 +60,18 @@ public static void main(String[] args) throws Exception {
df = df.groupBy("word").sum("word_count");
df = df.orderBy(df.col("sum(word_count)").desc(), df.col("word").asc());
+ // Add Pub/Sub Lite message data field
+ df =
+ df.withColumn(
+ "data",
+ concat(df.col("word"), lit("_"), df.col("sum(word_count)")).cast(DataTypes.BinaryType));
+
+ // Write word count results to Pub/Sub Lite
StreamingQuery query =
df.writeStream()
- .format("console")
+ .format("pubsublite")
+ .option("pubsublite.topic", destinationTopicPath)
+ .option("checkpointLocation", String.format("/tmp/checkpoint-%s", appId))
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(1, TimeUnit.SECONDS))
.start();
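A note on the checkpoint choice above: `/tmp/checkpoint-<appId>` is unique per run and not meant to outlive the cluster, which suits a throwaway sample. A durable variant of the same write stream, as a sketch (the GCS bucket path is illustrative, not part of the sample):

```java
// Sketch: the same write stream, but with a durable GCS checkpoint so the
// query could recover across restarts. The bucket path is illustrative.
StreamingQuery query =
    df.writeStream()
        .format("pubsublite")
        .option("pubsublite.topic", destinationTopicPath)
        .option("checkpointLocation", String.format("gs://your-gcs-bucket/checkpoints/%s", appId))
        .outputMode(OutputMode.Complete())
        .trigger(Trigger.ProcessingTime(1, TimeUnit.SECONDS))
        .start();
```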
diff --git a/samples/snippets/src/test/java/pubsublite/spark/SampleIntegrationTest.java b/samples/snippets/src/test/java/pubsublite/spark/SampleIntegrationTest.java
index d3011e6b..27afa147 100644
--- a/samples/snippets/src/test/java/pubsublite/spark/SampleIntegrationTest.java
+++ b/samples/snippets/src/test/java/pubsublite/spark/SampleIntegrationTest.java
@@ -18,7 +18,10 @@
import static com.google.common.truth.Truth.assertThat;
import static pubsublite.spark.AdminUtils.createSubscriptionExample;
+import static pubsublite.spark.AdminUtils.createTopicExample;
import static pubsublite.spark.AdminUtils.deleteSubscriptionExample;
+import static pubsublite.spark.AdminUtils.deleteTopicExample;
+import static pubsublite.spark.AdminUtils.subscriberExample;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Job;
@@ -34,24 +37,26 @@
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
-import com.google.cloud.storage.Blob;
+import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import com.google.pubsub.v1.PubsubMessage;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.maven.shared.invoker.DefaultInvocationRequest;
import org.apache.maven.shared.invoker.DefaultInvoker;
@@ -63,7 +68,6 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.spark_project.guava.collect.ImmutableList;
public class SampleIntegrationTest {
@@ -82,9 +86,13 @@ public class SampleIntegrationTest {
private CloudZone cloudZone;
private ProjectNumber projectNumber;
private ProjectId projectId;
- private TopicName topicId;
- private SubscriptionName subscriptionName;
- private SubscriptionPath subscriptionPath;
+ private TopicName sourceTopicId;
+ private SubscriptionName sourceSubscriptionName;
+ private SubscriptionPath sourceSubscriptionPath;
+ private TopicName destinationTopicId;
+ private TopicPath destinationTopicPath;
+ private SubscriptionName destinationSubscriptionName;
+ private SubscriptionPath destinationSubscriptionPath;
private String clusterName;
private String bucketName;
private String workingDir;
@@ -143,7 +151,8 @@ private Job runDataprocJob() throws Exception {
.addJarFileUris(String.format("gs://%s/%s", bucketName, sampleJarNameInGCS))
.addJarFileUris(String.format("gs://%s/%s", bucketName, connectorJarNameInGCS))
.setMainClass("pubsublite.spark.WordCount")
- .addArgs(subscriptionPath.toString())
+ .addArgs(sourceSubscriptionPath.toString())
+ .addArgs(destinationTopicPath.toString())
.build();
Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
@@ -153,39 +162,40 @@ private Job runDataprocJob() throws Exception {
}
}
- private void verifyDataprocOutput(Storage storage, Job job) {
- Matcher matches = Pattern.compile("gs://(.*?)/(.*)").matcher(job.getDriverOutputResourceUri());
- assertThat(matches.matches()).isTrue();
-
- Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
- String sparkJobOutput = new String(blob.getContent());
- String expectedWordCountResult =
- "+-----+---------------+\n"
- + "| word|sum(word_count)|\n"
- + "+-----+---------------+\n"
- + "| the| 24|\n"
- + "| of| 16|\n"
- + "| and| 14|\n"
- + "| i| 13|\n"
- + "| my| 10|\n"
- + "| a| 6|\n"
- + "| in| 5|\n"
- + "| that| 5|\n"
- + "| soul| 4|\n"
- + "| with| 4|\n"
- + "| as| 3|\n"
- + "| feel| 3|\n"
- + "| like| 3|\n"
- + "| me| 3|\n"
- + "| so| 3|\n"
- + "| then| 3|\n"
- + "| us| 3|\n"
- + "| when| 3|\n"
- + "|which| 3|\n"
- + "| am| 2|\n"
- + "+-----+---------------+\n"
- + "only showing top 20 rows";
- assertThat(sparkJobOutput).contains(expectedWordCountResult);
+ private void verifyWordCountResult() {
+ Map<String, Integer> expected = new HashMap<>();
+ expected.put("the", 24);
+ expected.put("of", 16);
+ expected.put("and", 14);
+ expected.put("i", 13);
+ expected.put("my", 10);
+ expected.put("a", 6);
+ expected.put("in", 5);
+ expected.put("that", 5);
+ expected.put("soul", 4);
+ expected.put("with", 4);
+ expected.put("as", 3);
+ expected.put("feel", 3);
+ expected.put("like", 3);
+ expected.put("me", 3);
+ expected.put("so", 3);
+ expected.put("then", 3);
+ expected.put("us", 3);
+ expected.put("when", 3);
+ expected.put("which", 3);
+ expected.put("am", 2);
+ Map<String, Integer> actual = new HashMap<>();
+ Queue<PubsubMessage> results =
+ subscriberExample(
+ cloudRegion.value(),
+ cloudZone.zoneId(),
+ projectNumber.value(),
+ destinationSubscriptionName.value());
+ for (PubsubMessage m : results) {
+ String[] pair = m.getData().toStringUtf8().split("_");
+ actual.put(pair[0], Integer.parseInt(pair[1]));
+ }
+ assertThat(actual).containsAtLeastEntriesIn(expected);
}
private void setUpVariables() {
@@ -208,13 +218,28 @@ private void setUpVariables() {
cloudZone = CloudZone.of(cloudRegion, env.get(CLOUD_ZONE).charAt(0));
projectId = ProjectId.of(env.get(PROJECT_ID));
projectNumber = ProjectNumber.of(Long.parseLong(env.get(PROJECT_NUMBER)));
- topicId = TopicName.of(env.get(TOPIC_ID));
- subscriptionName = SubscriptionName.of("sample-integration-sub-" + runId);
- subscriptionPath =
+ sourceTopicId = TopicName.of(env.get(TOPIC_ID));
+ sourceSubscriptionName = SubscriptionName.of("sample-integration-sub-source-" + runId);
+ sourceSubscriptionPath =
+ SubscriptionPath.newBuilder()
+ .setProject(projectId)
+ .setLocation(cloudZone)
+ .setName(sourceSubscriptionName)
+ .build();
+ destinationTopicId = TopicName.of("sample-integration-topic-destination-" + runId);
+ destinationTopicPath =
+ TopicPath.newBuilder()
+ .setProject(projectId)
+ .setLocation(cloudZone)
+ .setName(destinationTopicId)
+ .build();
+ destinationSubscriptionName =
+ SubscriptionName.of("sample-integration-sub-destination-" + runId);
+ destinationSubscriptionPath =
SubscriptionPath.newBuilder()
.setProject(projectId)
.setLocation(cloudZone)
- .setName(subscriptionName)
+ .setName(destinationSubscriptionName)
.build();
clusterName = env.get(CLUSTER_NAME);
bucketName = env.get(BUCKET_NAME);
@@ -240,22 +265,38 @@ public void setUp() throws Exception {
setUpVariables();
findMavenHome();
- // Create a subscription
+ // Create a subscription to read source word messages
createSubscriptionExample(
cloudRegion.value(),
cloudZone.zoneId(),
projectNumber.value(),
- topicId.value(),
- subscriptionName.value());
+ sourceTopicId.value(),
+ sourceSubscriptionName.value());
+
+ // Create a topic and subscription for the final word count results
+ createTopicExample(
+ cloudRegion.value(),
+ cloudZone.zoneId(),
+ projectNumber.value(),
+ destinationTopicId.value(),
+ /*partitions=*/ 1);
+ createSubscriptionExample(
+ cloudRegion.value(),
+ cloudZone.zoneId(),
+ projectNumber.value(),
+ destinationTopicId.value(),
+ destinationSubscriptionName.value());
}
@After
public void tearDown() throws Exception {
- // Cleanup the subscription
- deleteSubscriptionExample(
- cloudRegion.value(), cloudZone.zoneId(), projectNumber.value(), subscriptionName.value());
+ // Clean up the topics and subscriptions
+ deleteSubscriptionExample(cloudRegion.value(), sourceSubscriptionPath);
+ deleteSubscriptionExample(cloudRegion.value(), destinationSubscriptionPath);
+ deleteTopicExample(cloudRegion.value(), destinationTopicPath);
}
+ /** Note that the source single word messages have already been published to a permanent topic. */
@Test
public void test() throws Exception {
// Maven package into jars
@@ -268,8 +309,10 @@ public void test() throws Exception {
uploadGCS(storage, sampleJarNameInGCS, sampleJarLoc);
uploadGCS(storage, connectorJarNameInGCS, connectorJarLoc);
- // Run Dataproc job and verify output
- Job jobResponse = runDataprocJob();
- verifyDataprocOutput(storage, jobResponse);
+ // Run the Dataproc job and block until it finishes
+ runDataprocJob();
+
+ // Verify final destination messages in Pub/Sub Lite
+ verifyWordCountResult();
}
}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java
index 44b6d95d..d36844a0 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslWriteDataSourceOptions.java
@@ -27,6 +27,7 @@
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
@@ -92,6 +93,7 @@ public Publisher<MessageMetadata> createNewPublisher() {
.setTopic(topicPath())
.setPartition(partition)
.setServiceClient(newServiceClient(partition))
+ .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
.build())
.setAdminClient(getAdminClient())
.build()
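The added `setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)` call gives the per-partition wire publisher an explicit batching policy rather than leaving it unset. If custom batching were ever needed, a gax `BatchingSettings` could be assembled along these lines (the thresholds are illustrative assumptions, not the connector's defaults):

```java
import com.google.api.gax.batching.BatchingSettings;
import org.threeten.bp.Duration;

/** Sketch of a custom batching policy; all thresholds are illustrative. */
final class CustomBatching {
  static final BatchingSettings SETTINGS =
      BatchingSettings.newBuilder()
          .setElementCountThreshold(1000L)          // flush after 1000 messages,
          .setRequestByteThreshold(1024 * 1024L)    // or after 1 MiB of data,
          .setDelayThreshold(Duration.ofMillis(50)) // or after 50 ms, whichever comes first
          .build();
}
```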