docs: Add write support documentations #132

Merged
merged 5 commits on Apr 9, 2021

58 changes: 55 additions & 3 deletions .readme-partials.yaml

## Usage

### Samples

There are 3 Java samples (word count, simple write, simple read) under [samples](https://github.com/googleapis/java-pubsublite-spark/tree/master/samples) that show how to use the connector inside Dataproc.

### Reading data from Pub/Sub Lite

```python
df = spark.readStream \
  .format("pubsublite") \
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
  .load()
```

```java
Dataset<Row> df = spark
  .readStream()
  .format("pubsublite")
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
  .load();
```

Note that the connector supports both micro-batch processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
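
For example, a minimal sketch (reusing `df` from the read snippet above) that runs the query with a continuous trigger instead of the default micro-batch trigger; the console sink, checkpoint path, and 1-second interval are placeholders chosen for illustration:

```python
# A sketch assuming `df` was created by the read example above.
# The checkpoint path and the 1-second interval are placeholder values.
query = df.writeStream \
  .format("console") \
  .option("checkpointLocation", "/tmp/pubsublite-continuous-checkpoint") \
  .trigger(continuous="1 second") \
  .start()

query.awaitTermination()
```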

### Writing data to Pub/Sub Lite

```python
df.writeStream \
  .format("pubsublite") \
  .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
  .option("checkpointLocation", "path/to/HDFS/dir") \
  .outputMode("complete") \
  .trigger(processingTime="2 seconds") \
  .start()
```

```java
df.writeStream()
  .format("pubsublite")
  .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .outputMode(OutputMode.Complete())
  .trigger(Trigger.ProcessingTime(2, TimeUnit.SECONDS))
  .start();
```

### Properties

When reading from Pub/Sub Lite, the connector supports a number of configuration options:

| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in micro batch. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
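
For example, a minimal sketch (reusing the read snippet above) that caps each micro batch; the 100000 cap is purely illustrative, not a recommended value:

```python
# A sketch of tuning flow control on the read; 100000 is an arbitrary
# illustrative cap on the number of messages per micro batch.
df = spark.readStream \
  .format("pubsublite") \
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
  .option("pubsublite.flowcontrol.maxmessagesperbatch", 100000) \
  .load()
```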

When writing to Pub/Sub Lite, the connector supports a number of configuration options:

| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.topic | String | Y | | Full topic path that the connector will write to. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
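
For example, a minimal sketch of passing explicit credentials when writing; the key file path is hypothetical, and if `gcp.credentials.key` is omitted the connector falls back to Application Default Credentials:

```python
import base64

# Hypothetical path to a service account key file; the connector expects the
# JSON key encoded as base64 in the "gcp.credentials.key" option.
with open("/path/to/service-account-key.json", "rb") as f:
    credentials_key = base64.b64encode(f.read()).decode("utf-8")

df.writeStream \
  .format("pubsublite") \
  .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
  .option("gcp.credentials.key", credentials_key) \
  .option("checkpointLocation", "path/to/HDFS/dir") \
  .start()
```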

### Data Schema

When reading from Pub/Sub Lite, the connector has a fixed data schema as follows:

| Data Field | Spark Data Type | Notes |
| ---------- | --------------- | ----- |
| publish_timestamp | TimestampType | |
| event_timestamp | TimestampType | Nullable |
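
For example, a minimal sketch (assuming the message payloads are UTF-8 text, which is an assumption about your data) that casts the binary `key` and `data` columns to strings for inspection on the console sink:

```python
from pyspark.sql.functions import col

# Assumes the payloads are UTF-8 text; casting BinaryType to string makes the
# key and data columns human-readable. Other fields keep their types.
decoded = df \
  .withColumn("key", col("key").cast("string")) \
  .withColumn("data", col("data").cast("string"))

decoded.writeStream \
  .format("console") \
  .start()
```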

When writing to Pub/Sub Lite, the connector expects the following data fields and data types:

| Data Field | Spark Data Type | Required |
| ---------- | --------------- | ----- |
| key | BinaryType | N |
| data | BinaryType | N |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | N |
| event_timestamp | TimestampType | N |

Note that if a data field is present in the table but its data type does not match, the connector will throw an IllegalArgumentException that terminates the query.
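
For example, a minimal sketch of shaping a DataFrame to the expected fields before writing; the source columns `id` and `payload` are hypothetical string columns, cast to `BinaryType` so the types match:

```python
from pyspark.sql.functions import col

# Hypothetical source columns "id" and "payload" (strings) are cast to binary
# and renamed to the "key" and "data" fields expected by the connector.
out = df.select(
    col("id").cast("binary").alias("key"),
    col("payload").cast("binary").alias("data"),
)

out.writeStream \
  .format("pubsublite") \
  .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
  .option("checkpointLocation", "path/to/HDFS/dir") \
  .start()
```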

## Building the Connector

The connector is built using Maven. The following command creates a JAR file with shaded dependencies: