This directory contains a word count sample for the Pub/Sub Lite Spark Connector.
Please see the Google Cloud authentication guide. The recommended approach is to use Application Default Credentials by setting `GOOGLE_APPLICATION_CREDENTIALS`.
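For example, you can point Application Default Credentials at a service account key file (the path below is illustrative; replace it with your own key file):

```sh
# Illustrative path to a service account key file; substitute your own.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
```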
Set the following environment variables:
```sh
PROJECT_NUMBER=12345 # or your project number
REGION=us-central1 # or your region
ZONE_ID=b # or your zone id
TOPIC_ID=test-topic # or your topic id to create
SUBSCRIPTION_ID=test-subscription # or your subscription id to create
PARTITIONS=1 # or your number of partitions to create
CLUSTER_NAME=waprin-spark7 # or your Dataproc cluster name to create
BUCKET=gs://your-gcs-bucket
SUBSCRIPTION_PATH=projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/subscriptions/$SUBSCRIPTION_ID
CONNECTOR_VERSION= # the latest pubsublite-spark-sql-streaming release version
PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION= # location of the downloaded pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar
```
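`SUBSCRIPTION_PATH` is just the fully qualified Pub/Sub Lite resource name assembled from the other variables. A minimal sketch of that assembly in Java (the helper name is hypothetical, for illustration only):

```java
public class SubscriptionPathExample {
    // Hypothetical helper: builds the fully qualified Pub/Sub Lite subscription
    // path from a project number, region, zone id, and subscription id.
    static String subscriptionPath(long projectNumber, String region, char zoneId,
                                   String subscriptionId) {
        return String.format("projects/%d/locations/%s-%c/subscriptions/%s",
                projectNumber, region, zoneId, subscriptionId);
    }

    public static void main(String[] args) {
        System.out.println(subscriptionPath(12345, "us-central1", 'b', "test-subscription"));
        // prints projects/12345/locations/us-central1-b/subscriptions/test-subscription
    }
}
```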
To run the word count sample in a Dataproc cluster, follow these steps:
- Go to the samples directory.
  ```sh
  cd samples/snippets
  ```
- Set the current sample version.
  ```sh
  SAMPLE_VERSION=$(mvn -q \
    -Dexec.executable=echo \
    -Dexec.args='${project.version}' \
    --non-recursive \
    exec:exec)
  ```
- Create the topic and subscription, and publish word count messages to the topic.
  ```sh
  PROJECT_NUMBER=$PROJECT_NUMBER \
  REGION=$REGION \
  ZONE_ID=$ZONE_ID \
  TOPIC_ID=$TOPIC_ID \
  SUBSCRIPTION_ID=$SUBSCRIPTION_ID \
  PARTITIONS=$PARTITIONS \
  mvn compile exec:java -Dexec.mainClass=pubsublite.spark.PublishWords
  ```
- Create a Dataproc cluster.
  ```sh
  gcloud dataproc clusters create $CLUSTER_NAME \
    --region=$REGION \
    --zone=$REGION-$ZONE_ID \
    --image-version=1.5-debian10 \
    --scopes=cloud-platform
  ```
- Package the sample jar.
  ```sh
  mvn clean package -Dmaven.test.skip=true
  ```
- Download `pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar` from Maven Central and set the `PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION` environment variable.
- Create a GCS bucket and upload both `pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar` and the sample jar to GCS.
  ```sh
  gsutil mb $BUCKET
  gsutil cp target/pubsublite-spark-snippets-$SAMPLE_VERSION.jar $BUCKET
  gsutil cp $PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION $BUCKET
  ```
- Set the Dataproc region.
  ```sh
  gcloud config set dataproc/region $REGION
  ```
- Run the sample in Dataproc. You should see the word count results in the console output.
  ```sh
  gcloud dataproc jobs submit spark --cluster=$CLUSTER_NAME \
    --jars=$BUCKET/pubsublite-spark-snippets-$SAMPLE_VERSION.jar,$BUCKET/pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar \
    --class=pubsublite.spark.WordCount -- $SUBSCRIPTION_PATH
  ```
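Conceptually, the word count job reads word messages from the subscription and aggregates a count per word. The core aggregation can be sketched in plain Java, independent of Spark (names here are illustrative, not the sample's actual code):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {
    // Count occurrences of each word, preserving first-seen order.
    static Map<String, Long> countWords(List<String> words) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String w : words) {
            counts.merge(w, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(List.of("the", "quick", "the")));
        // prints {the=2, quick=1}
    }
}
```

In the actual sample, this aggregation is expressed as a Spark Structured Streaming query over messages delivered by the connector.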
- Delete the Pub/Sub Lite topic and subscription.
  ```sh
  gcloud pubsub lite-subscriptions delete $SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
  gcloud pubsub lite-topics delete $TOPIC_ID --zone=$REGION-$ZONE_ID
  ```
- Delete the GCS bucket.
  ```sh
  gsutil -m rm -rf $BUCKET
  ```
- Delete the Dataproc cluster.
  ```sh
  gcloud dataproc clusters delete $CLUSTER_NAME --region=$REGION
  ```