samples: Spark connector writer support sample and integration test (#…
jiangmichaellll committed Apr 6, 2021
1 parent 4ef4a04 commit 98f5863
Showing 7 changed files with 314 additions and 94 deletions.
52 changes: 35 additions & 17 deletions samples/README.md
@@ -1,24 +1,30 @@
# Pub/Sub Lite Spark Connector Word Count Samples

This directory contains a word count sample for Pub/Sub Lite Spark Connector.
This directory contains a word count sample for the Pub/Sub Lite Spark Connector. The sample reads
single-word messages from Pub/Sub Lite, aggregates them (counts each word) in Spark, and finally
writes the results back to Pub/Sub Lite. Note that the topic/subscription used to read raw words
must be different from the topic/subscription used to write and verify the final word count results.

## Authentication

Please see the [Google Cloud authentication guide](https://cloud.google.com/docs/authentication/).
The recommended approach is to use Application Default Credentials by setting `GOOGLE_APPLICATION_CREDENTIALS`.

## Environment Variables
Set the following environment variables:
Set the following environment variables. <br>
Note that `SOURCE_TOPIC_ID` and `SOURCE_SUBSCRIPTION_ID` are used to read the _raw_ single-word
messages, while `DESTINATION_TOPIC_ID` and `DESTINATION_SUBSCRIPTION_ID` are used for the final
word count results. They must be different.
```
PROJECT_NUMBER=12345 # or your project number
REGION=us-central1 # or your region
ZONE_ID=b # or your zone id
TOPIC_ID=test-topic # or your topic id to create
SUBSCRIPTION_ID=test-subscription # or your subscription to create
PARTITIONS=1 # or your number of partitions to create
SOURCE_TOPIC_ID=test-topic # or your topic id to create
SOURCE_SUBSCRIPTION_ID=test-subscription # or your subscription to create
DESTINATION_TOPIC_ID=test-topic-2 # or your topic id to create; must be different from SOURCE_TOPIC_ID!
DESTINATION_SUBSCRIPTION_ID=test-subscription-2 # or your subscription to create; must be different from SOURCE_SUBSCRIPTION_ID!
CLUSTER_NAME=waprin-spark7 # or your Dataproc cluster name to create
BUCKET=gs://your-gcs-bucket
SUBSCRIPTION_PATH=projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/subscriptions/$SUBSCRIPTION_ID
CONNECTOR_VERSION= # latest pubsublite-spark-sql-streaming release version
PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION= # downloaded pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies jar location
```
@@ -36,14 +42,16 @@ To run the word count sample in a Dataproc cluster, follow these steps:
--non-recursive \
exec:exec)
```
3. Create the topic and subscription, and publish word count messages to the topic.
3. Create both the source and destination topics and subscriptions, and publish word count messages to the _source_
topic.
```sh
PROJECT_NUMBER=$PROJECT_NUMBER \
REGION=$REGION \
ZONE_ID=$ZONE_ID \
TOPIC_ID=$TOPIC_ID \
SUBSCRIPTION_ID=$SUBSCRIPTION_ID \
PARTITIONS=$PARTITIONS \
SOURCE_TOPIC_ID=$SOURCE_TOPIC_ID \
SOURCE_SUBSCRIPTION_ID=$SOURCE_SUBSCRIPTION_ID \
DESTINATION_TOPIC_ID=$DESTINATION_TOPIC_ID \
DESTINATION_SUBSCRIPTION_ID=$DESTINATION_SUBSCRIPTION_ID \
mvn compile exec:java -Dexec.mainClass=pubsublite.spark.PublishWords
```
4. Create a Dataproc cluster
@@ -54,8 +62,7 @@ To run the word count sample in a Dataproc cluster, follow these steps:
```sh
mvn clean package -Dmaven.test.skip=true
```
<!-- TODO: provide link to maven central -->
6. Download `pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar` from Maven Central and set `PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION` environment variable.
6. Download `pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar` from [Maven Central](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming) and set the `PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION` environment variable.
7. Create a GCS bucket and upload both `pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar` and the sample jar to GCS
```sh
gsutil mb $BUCKET
@@ -66,19 +73,30 @@ To run the word count sample in a Dataproc cluster, follow these steps:
```sh
gcloud config set dataproc/region $REGION
```
<!-- TODO: set up bots to update jar version -->
9. Run the sample in Dataproc. You would see the word count result show up in the console output.
9. Run the sample in Dataproc. This performs the word count aggregation and publishes the word count results to Pub/Sub Lite. (A sketch of what the `WordCount` job does appears after these steps.)
```sh
gcloud dataproc jobs submit spark --cluster=$CLUSTER_NAME \
--jars=$BUCKET/pubsublite-spark-snippets-$SAMPLE_VERSION.jar,$BUCKET/pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar \
--class=pubsublite.spark.WordCount -- $SUBSCRIPTION_PATH
--class=pubsublite.spark.WordCount -- \
projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/subscriptions/$SOURCE_SUBSCRIPTION_ID \
projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/topics/$DESTINATION_TOPIC_ID
```
10. Read the word count results from Pub/Sub Lite; you should see lines like `word: count` in the console output.
```sh
PROJECT_NUMBER=$PROJECT_NUMBER \
REGION=$REGION \
ZONE_ID=$ZONE_ID \
DESTINATION_SUBSCRIPTION_ID=$DESTINATION_SUBSCRIPTION_ID \
mvn compile exec:java -Dexec.mainClass=pubsublite.spark.ReadResults
```
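
For orientation, here is a minimal sketch of what a job like `WordCount` does with the connector. It
is not the repository's `WordCount.java`: the `pubsublite.subscription` and `pubsublite.topic` option
names come from the connector's read/write API, while the class layout, the checkpoint path, and the
`word_count` payload encoding (inferred from the `ReadResults` snippet later in this diff) are
assumptions.
```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;

public class WordCountSketch {
  public static void main(String[] args) throws Exception {
    String sourceSubscriptionPath = args[0]; // projects/.../subscriptions/...
    String destinationTopicPath = args[1]; // projects/.../topics/...

    SparkSession spark = SparkSession.builder().appName("WordCount").getOrCreate();

    // Stream raw single-word messages from the source subscription. The
    // connector exposes each message's payload in a binary "data" column.
    Dataset<Row> words =
        spark
            .readStream()
            .format("pubsublite")
            .option("pubsublite.subscription", sourceSubscriptionPath)
            .load()
            .select(col("data").cast("string").as("word"));

    // Count occurrences of each word, then re-encode every row as a
    // "word_count" payload (ReadResults splits on the underscore).
    Dataset<Row> wordCounts =
        words
            .groupBy("word")
            .count()
            .select(
                concat(col("word"), lit("_"), col("count").cast("string"))
                    .cast("binary")
                    .as("data"));

    // Publish the running totals to the destination topic. The checkpoint
    // path below is a placeholder, not taken from the repository.
    wordCounts
        .writeStream()
        .format("pubsublite")
        .option("pubsublite.topic", destinationTopicPath)
        .option("checkpointLocation", "/tmp/pubsublite-spark-checkpoint")
        .outputMode(OutputMode.Complete())
        .start()
        .awaitTermination();
  }
}
```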

## Cleaning up
1. Delete the Pub/Sub Lite topics and subscriptions.
```sh
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-topics delete $TOPIC_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-subscriptions delete $SOURCE_SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-topics delete $SOURCE_TOPIC_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-subscriptions delete $DESTINATION_SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-topics delete $DESTINATION_TOPIC_ID --zone=$REGION-$ZONE_ID
```
2. Delete the GCS bucket.
```sh
79 changes: 71 additions & 8 deletions samples/snippets/src/main/java/pubsublite/spark/AdminUtils.java
@@ -20,6 +20,8 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
@@ -30,16 +32,23 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Durations;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AdminUtils {

Expand Down Expand Up @@ -144,14 +153,7 @@ public static void createSubscriptionExample(
}

public static void deleteSubscriptionExample(
String cloudRegion, char zoneId, long projectNumber, String subscriptionId) throws Exception {
SubscriptionPath subscriptionPath =
SubscriptionPath.newBuilder()
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setProject(ProjectNumber.of(projectNumber))
.setName(SubscriptionName.of(subscriptionId))
.build();

String cloudRegion, SubscriptionPath subscriptionPath) throws Exception {
AdminClientSettings adminClientSettings =
AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

@@ -161,6 +163,16 @@ public static void deleteSubscriptionExample(
}
}

public static void deleteTopicExample(String cloudRegion, TopicPath topicPath) throws Exception {
AdminClientSettings adminClientSettings =
AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
adminClient.deleteTopic(topicPath).get();
System.out.println(topicPath + " deleted successfully.");
}
}

public static void publisherExample(
String cloudRegion, char zoneId, long projectNumber, String topicId, List<String> words)
throws ApiException, ExecutionException, InterruptedException {
@@ -208,4 +220,55 @@ public static void publisherExample(
}
}
}

public static Queue<PubsubMessage> subscriberExample(
String cloudRegion, char zoneId, long projectNumber, String subscriptionId)
throws ApiException {
// The sample has at most 200 messages, so a capacity of 1000 is ample.
Queue<PubsubMessage> result = new ArrayBlockingQueue<>(1000);

SubscriptionPath subscriptionPath =
SubscriptionPath.newBuilder()
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setProject(ProjectNumber.of(projectNumber))
.setName(SubscriptionName.of(subscriptionId))
.build();

MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
result.add(message);
consumer.ack();
};
FlowControlSettings flowControlSettings =
FlowControlSettings.builder()
.setBytesOutstanding(10 * 1024 * 1024L)
.setMessagesOutstanding(1000L)
.build();

SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setReceiver(receiver)
.setPerPartitionFlowControlSettings(flowControlSettings)
.build();

Subscriber subscriber = Subscriber.create(subscriberSettings);

// Start the subscriber. Upon successful starting, its state will become RUNNING.
subscriber.startAsync().awaitRunning();

try {
System.out.println(subscriber.state());
// Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
// unrecoverable errors before then, its state will change to FAILED and an
// IllegalStateException will be thrown.
subscriber.awaitTerminated(90, TimeUnit.SECONDS);
} catch (TimeoutException t) {
// Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
subscriber.stopAsync().awaitTerminated();
System.out.println("Subscriber is shut down: " + subscriber.state());
}

return result;
}
}
38 changes: 26 additions & 12 deletions samples/snippets/src/main/java/pubsublite/spark/PublishWords.java
@@ -34,27 +34,37 @@ public class PublishWords

private static final String REGION = "REGION";
private static final String ZONE_ID = "ZONE_ID";
private static final String TOPIC_ID = "TOPIC_ID";
private static final String SUBSCRIPTION_ID = "SUBSCRIPTION_ID";
private static final String SOURCE_TOPIC_ID = "SOURCE_TOPIC_ID";
private static final String SOURCE_SUBSCRIPTION_ID = "SOURCE_SUBSCRIPTION_ID";
private static final String DESTINATION_TOPIC_ID = "DESTINATION_TOPIC_ID";
private static final String DESTINATION_SUBSCRIPTION_ID = "DESTINATION_SUBSCRIPTION_ID";
private static final String PROJECT_NUMBER = "PROJECT_NUMBER";
private static final String PARTITIONS = "PARTITIONS";

public static void main(String[] args) throws Exception {

Map<String, String> env = System.getenv();
Set<String> missingVars =
Sets.difference(
ImmutableSet.of(REGION, ZONE_ID, TOPIC_ID, SUBSCRIPTION_ID, PROJECT_NUMBER, PARTITIONS),
ImmutableSet.of(
REGION,
ZONE_ID,
SOURCE_TOPIC_ID,
SOURCE_SUBSCRIPTION_ID,
DESTINATION_TOPIC_ID,
DESTINATION_SUBSCRIPTION_ID,
PROJECT_NUMBER),
env.keySet());
Preconditions.checkState(
missingVars.isEmpty(), "Missing required environment variables: " + missingVars);

String cloudRegion = env.get(REGION);
final String cloudRegion = env.get(REGION);
char zoneId = env.get(ZONE_ID).charAt(0);
String topicId = env.get(TOPIC_ID);
String subscriptionId = env.get(SUBSCRIPTION_ID);
final String sourceTopicId = env.get(SOURCE_TOPIC_ID);
final String sourceSubscriptionId = env.get(SOURCE_SUBSCRIPTION_ID);
final String destinationTopicId = env.get(DESTINATION_TOPIC_ID);
final String destinationSubscriptionId = env.get(DESTINATION_SUBSCRIPTION_ID);
long projectNumber = Long.parseLong(env.get(PROJECT_NUMBER));
int partitions = Integer.parseInt(env.get(PARTITIONS));
int partitions = 1;

String snippets =
Resources.toString(Resources.getResource("text_snippets.txt"), Charset.defaultCharset());
@@ -64,12 +74,16 @@ public static void main(String[] args) throws Exception {
.replaceAll("\n", " ")
.replaceAll("\\s+", " ")
.toLowerCase();
List<String> words = Arrays.asList(snippets.split(" "));
final List<String> words = Arrays.asList(snippets.split(" "));

createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions);
createSubscriptionExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
createTopicExample(cloudRegion, zoneId, projectNumber, sourceTopicId, partitions);
createSubscriptionExample(
cloudRegion, zoneId, projectNumber, sourceTopicId, sourceSubscriptionId);
createTopicExample(cloudRegion, zoneId, projectNumber, destinationTopicId, partitions);
createSubscriptionExample(
cloudRegion, zoneId, projectNumber, destinationTopicId, destinationSubscriptionId);

publisherExample(cloudRegion, zoneId, projectNumber, topicId, words);
publisherExample(cloudRegion, zoneId, projectNumber, sourceTopicId, words);

System.exit(0);
}
55 changes: 55 additions & 0 deletions samples/snippets/src/main/java/pubsublite/spark/ReadResults.java
@@ -0,0 +1,55 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsublite.spark;

import static pubsublite.spark.AdminUtils.subscriberExample;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;

public class ReadResults {

private static final String REGION = "REGION";
private static final String ZONE_ID = "ZONE_ID";
private static final String DESTINATION_SUBSCRIPTION_ID = "DESTINATION_SUBSCRIPTION_ID";
private static final String PROJECT_NUMBER = "PROJECT_NUMBER";

public static void main(String[] args) {

Map<String, String> env = System.getenv();
Set<String> missingVars =
Sets.difference(
ImmutableSet.of(REGION, ZONE_ID, DESTINATION_SUBSCRIPTION_ID, PROJECT_NUMBER),
env.keySet());
Preconditions.checkState(
missingVars.isEmpty(), "Missing required environment variables: " + missingVars);

String cloudRegion = env.get(REGION);
char zoneId = env.get(ZONE_ID).charAt(0);
String destinationSubscriptionId = env.get(DESTINATION_SUBSCRIPTION_ID);
long projectNumber = Long.parseLong(env.get(PROJECT_NUMBER));

System.out.println("Word count results:");
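// Each result message's data is "<word>_<count>"; print it as "<word>: <count>".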
subscriberExample(cloudRegion, zoneId, projectNumber, destinationSubscriptionId)
.forEach((m) -> System.out.println(m.getData().toStringUtf8().replace("_", ": ")));

System.exit(0);
}
}
