diff --git a/.readme-partials.yaml b/.readme-partials.yaml
index 9c84e179..260d7a64 100644
--- a/.readme-partials.yaml
+++ b/.readme-partials.yaml
@@ -28,20 +28,56 @@ custom_content: |
   ## Usage
 
+  ### Samples
+
+  There are 3 Java samples (word count, simple write, simple read) under [samples](https://github.com/googleapis/java-pubsublite-spark/tree/master/samples) that show how to use the connector inside Dataproc.
+
   ### Reading data from Pub/Sub Lite
 
+  Here is an example in Python:
   ```python
   df = spark.readStream \
-    .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
     .format("pubsublite") \
+    .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
-    .load
+    .load()
   ```
+  Here is an example in Java:
+  ```java
+  Dataset<Row> df = spark
+    .readStream()
+    .format("pubsublite")
+    .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
+    .load();
+  ```
 
   Note that the connector supports both MicroBatch Processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
 
+  ### Writing data to Pub/Sub Lite
+
+  Here is an example in Python:
+  ```python
+  df.writeStream \
+    .format("pubsublite") \
+    .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
+    .option("checkpointLocation", "path/to/HDFS/dir") \
+    .outputMode("complete") \
+    .trigger(processingTime="2 seconds") \
+    .start()
+  ```
+  Here is an example in Java:
+  ```java
+  df.writeStream()
+    .format("pubsublite")
+    .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
+    .option("checkpointLocation", "path/to/HDFS/dir")
+    .outputMode(OutputMode.Complete())
+    .trigger(Trigger.ProcessingTime(2, TimeUnit.SECONDS))
+    .start();
+  ```
+
   ### Properties
 
-  The connector supports a number of options to configure the read:
+  When reading from Pub/Sub Lite, the connector supports a number of configuration options:
 
   | Option | Type | Required | Default Value | Meaning |
   | ------ | ---- | -------- | ------------- | ------- |
@@ -51,9 +87,16 @@ custom_content: |
   | pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in micro batch. |
   | gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
 
+  When writing to Pub/Sub Lite, the connector supports a number of configuration options:
+
+  | Option | Type | Required | Default Value | Meaning |
+  | ------ | ---- | -------- | ------------- | ------- |
+  | pubsublite.topic | String | Y | | Full topic path that the connector will write to. |
+  | gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
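+
+  Both option tables above accept `gcp.credentials.key` as a base64-encoded service account JSON key. As a minimal sketch (not part of the samples; it assumes the option takes the standard Base64 encoding of the key file's contents, and the file path is a placeholder), the key can be prepared and passed in Java like this:
+  ```java
+  import java.nio.file.Files;
+  import java.nio.file.Paths;
+  import java.util.Base64;
+
+  // Read the service account JSON key from disk and Base64-encode it.
+  // "/path/to/service-account.json" is a placeholder path.
+  String credentialsKey = Base64.getEncoder().encodeToString(
+      Files.readAllBytes(Paths.get("/path/to/service-account.json")));
+
+  // Pass the encoded key to the connector instead of relying on
+  // Application Default Credentials.
+  df.writeStream()
+    .format("pubsublite")
+    .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
+    .option("checkpointLocation", "path/to/HDFS/dir")
+    .option("gcp.credentials.key", credentialsKey)
+    .start();
+  ```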
+
 
   ### Data Schema
 
-  The connector has fixed data schema as follows:
+  When reading from Pub/Sub Lite, the connector has a fixed data schema as follows:
 
   | Data Field | Spark Data Type | Notes |
   | ---------- | --------------- | ----- |
@@ -66,6 +109,17 @@ custom_content: |
   | publish_timestamp | TimestampType | |
   | event_timestamp | TimestampType | Nullable |
 
+  When writing to Pub/Sub Lite, the connector matches data fields and data types as follows:
+
+  | Data Field | Spark Data Type | Required |
+  | ---------- | --------------- | -------- |
+  | key | BinaryType | N |
+  | data | BinaryType | N |
+  | attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | N |
+  | event_timestamp | TimestampType | N |
+
+  Note that if a data field is present in the table but its data type does not match, the connector will throw an IllegalArgumentException that terminates the query.
+
   ## Building the Connector
 
   The connector is built using Maven. Following command creates a JAR file with shaded dependencies: