
docs: Add write support documentations (#132)
jiangmichaellll committed Apr 9, 2021
1 parent 1e20e48 commit b5edda6
Showing 1 changed file with 57 additions and 3 deletions.
60 changes: 57 additions & 3 deletions .readme-partials.yaml
@@ -28,20 +28,56 @@ custom_content: |
## Usage
### Samples
There are 3 Java samples (word count, simple write, simple read) under [samples](https://github.com/googleapis/java-pubsublite-spark/tree/master/samples) that show how to use the connector inside Dataproc.
### Reading data from Pub/Sub Lite
Here is an example in Python:
```python
df = spark.readStream \
.format("pubsublite") \
.option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
.load()
```
Here is an example in Java:
```java
Dataset<Row> df = spark
.readStream()
.format("pubsublite")
.option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID"t )
.load();
```
Note that the connector supports both Micro-Batch Processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
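For illustration only (the console sink and the one-second checkpoint interval below are arbitrary choices, not taken from the samples), switching to continuous processing in PySpark is just a different trigger on the write side:
```python
# Sketch: run the Pub/Sub Lite read with a continuous trigger instead of
# micro-batches. The console sink and interval are arbitrary example choices.
query = spark.readStream \
    .format("pubsublite") \
    .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
    .load() \
    .writeStream \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()
```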
### Writing data to Pub/Sub Lite
Here is an example in Python:
```python
df.writeStream \
.format("pubsublite") \
.option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
.option("checkpointLocation", "path/to/HDFS/dir")
.outputMode("complete") \
.trigger(processingTime="2 seconds") \
.start()
```
Here is an example in Java:
```java
df.writeStream()
.format("pubsublite")
.option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
.option("checkpointLocation", "path/to/HDFS/dir")
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(2, TimeUnit.SECONDS))
.start();
```
### Properties
When reading from Pub/Sub Lite, the connector supports a number of configuration options:
| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
@@ -51,9 +87,16 @@ custom_content: |
| pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in micro batch. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
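For instance, here is a sketch of a read that caps each micro-batch using the flow-control option from the table above (the value of 100000 messages is an arbitrary example):
```python
# Sketch: limit how many messages each micro-batch pulls from the subscription.
df = spark.readStream \
    .format("pubsublite") \
    .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
    .option("pubsublite.flowcontrol.maxmessagesperbatch", "100000") \
    .load()
```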
When writing to Pub/Sub Lite, the connector supports a number of configuration options:
| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.topic | String | Y | | Full topic path that the connector will write to. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
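As a sketch of the `gcp.credentials.key` option (the key file path below is hypothetical), a service account key can be base64-encoded and passed explicitly instead of relying on Application Default Credentials:
```python
import base64

# Sketch: pass explicit credentials as base64-encoded service account JSON.
# The key file path is a hypothetical example.
with open("/path/to/service-account.json", "rb") as f:
    credentials_key = base64.b64encode(f.read()).decode("utf-8")

df.writeStream \
    .format("pubsublite") \
    .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
    .option("gcp.credentials.key", credentials_key) \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .start()
```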
### Data Schema
When reading from Pub/Sub Lite, the connector has a fixed data schema as follows:
| Data Field | Spark Data Type | Notes |
| ---------- | --------------- | ----- |
@@ -66,6 +109,17 @@ custom_content: |
| publish_timestamp | TimestampType | |
| event_timestamp | TimestampType | Nullable |
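Since payload columns arrive as BinaryType, a common first step is casting them to strings. This sketch assumes the read schema also exposes `key` and `data` columns of BinaryType, matching the write schema below:
```python
from pyspark.sql.functions import col

# Sketch: decode binary key/data payloads into strings and keep the publish
# timestamp. Column names follow the schema described above.
decoded = df.select(
    col("key").cast("string").alias("key"),
    col("data").cast("string").alias("data"),
    col("publish_timestamp"),
)
```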
When writing to Pub/Sub Lite, the connector expects the following data fields and data types:
| Data Field | Spark Data Type | Required |
| ---------- | --------------- | ----- |
| key | BinaryType | N |
| data | BinaryType | N |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | N |
| event_timestamp | TimestampType | N |
Note that if a data field is present in the table but its data type does not match, the connector throws an IllegalArgumentException that terminates the query.
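As a sketch (column names come from the table above; the source DataFrame `events` and its columns `user_id`, `payload`, and `created_at` are hypothetical), the input can be shaped to the expected names and types before writing:
```python
from pyspark.sql.functions import col, to_timestamp

# Sketch: cast a hypothetical `events` DataFrame into the field names and
# Spark types that the write schema above expects.
out = events.select(
    col("user_id").cast("binary").alias("key"),
    col("payload").cast("binary").alias("data"),
    to_timestamp(col("created_at")).alias("event_timestamp"),
)

out.writeStream \
    .format("pubsublite") \
    .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .start()
```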
## Building the Connector
The connector is built using Maven. The following command creates a JAR file with shaded dependencies:
