custom_content: |
## Requirements
### Creating a new subscription or using an existing subscription
Follow [the instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#create_a_lite_subscription) to create a new subscription or use an existing subscription. If using an existing subscription, the connector reads from the oldest unacknowledged message in the subscription.
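If you prefer the command line, a subscription can also be created with `gcloud`. This is only a minimal sketch: the subscription name, topic name, and location are placeholders, and the quickstart linked above covers the full set of options.
```sh
# Sketch only: assumes a Lite topic already exists in the chosen zone.
gcloud pubsub lite-subscriptions create my-lite-subscription \
  --location=us-central1-a \
  --topic=my-lite-topic
```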
### Creating a Google Cloud Dataproc cluster (Optional)
If you do not have an Apache Spark environment, you can create a [Cloud Dataproc](https://cloud.google.com/dataproc/docs) cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.
```sh
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
```
## Downloading and Using the Connector
<!--- TODO(jiangmichael): Add jar link for spark-pubsublite-latest.jar -->
The latest version of the connector (Scala 2.11) will be publicly available at `gs://spark-lib/pubsublite/spark-pubsublite-latest.jar`.
<!--- TODO(jiangmichael): Release on Maven Central and add Maven Central link -->
The connector will also be available from the Maven Central repository, where it can be included with the `--packages` option or the `spark.jars.packages` configuration property.
<!--
| Scala version | Connector Artifact |
| --- | --- |
| Scala 2.11 | `com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0` |
-->
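As a rough sketch of how the connector could be attached to a job once the artifacts above are published (the jar path and Maven coordinate follow the locations mentioned above; `my_job.py` is a placeholder for your own script):
```sh
# Option 1 (sketch): point Spark at the released jar in Cloud Storage.
spark-submit \
  --jars gs://spark-lib/pubsublite/spark-pubsublite-latest.jar \
  my_job.py

# Option 2 (sketch): resolve the connector from Maven Central once it is released.
spark-submit \
  --packages com.google.cloud.pubsublite.spark:pubsublite-spark-sql-streaming-with-dependencies_2.11:0.1.0 \
  my_job.py
```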
<!--- TODO(jiangmichael): Add example code and brief description here -->
## Usage
### Reading data from Pub/Sub Lite
```python
df = spark.readStream \
  .format("pubsublite") \
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
  .load()
```
Note that the connector supports both MicroBatch Processing and [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing).
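For example, a query over the DataFrame above could be started as follows. This is only a sketch: the console sink, trigger interval, and checkpoint path are placeholder choices, not part of the connector itself.
```python
# Sketch: start the streaming query in the default micro-batch mode.
query = df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/pubsublite-checkpoint") \
    .trigger(processingTime="2 seconds") \
    .start()

# For Continuous Processing, use a continuous trigger instead,
# e.g. .trigger(continuous="1 second").
query.awaitTermination()
```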
### Properties
The connector supports a number of options to configure the read:
| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.subscription | String | Y | | Full subscription path that the connector will read from. |
| pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | 50_000_000 | Max number of bytes per partition that will be cached in workers before Spark processes the messages. |
| pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Long.MAX | Max number of messages per partition that will be cached in workers before Spark processes the messages. |
| pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in a micro batch. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
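For illustration, the flow-control options from the table above could be set like this. The values are arbitrary examples, not recommendations, and the subscription path is a placeholder.
```python
# Sketch: cap per-partition buffering while reading from Pub/Sub Lite.
df = spark.readStream \
    .format("pubsublite") \
    .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
    .option("pubsublite.flowcontrol.byteoutstandingperpartition", 10 * 1024 * 1024) \
    .option("pubsublite.flowcontrol.maxmessagesperbatch", 100000) \
    .load()
```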
### Data Schema
The connector has a fixed data schema as follows:
| Data Field | Spark Data Type | Notes |
| ---------- | --------------- | ----- |
| subscription | StringType | Full subscription path |
| partition | LongType | |
| offset | LongType | |
| key | BinaryType | |
| data | BinaryType | |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | |
| publish_timestamp | TimestampType | |
| event_timestamp | TimestampType | Nullable |
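Since `key` and `data` are `BinaryType`, they typically need to be cast or decoded before use. A minimal sketch, assuming the payload is UTF-8 text:
```python
from pyspark.sql.functions import col

# Sketch: cast the binary key and payload to strings for downstream processing.
decoded = df.select(
    col("partition"),
    col("offset"),
    col("key").cast("string").alias("key"),
    col("data").cast("string").alias("data"),
    col("publish_timestamp"),
)
```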
## Building the Connector
The connector is built using Maven. The following command creates a JAR file with shaded dependencies:
```sh
mvn package
```
## FAQ
### What is the cost of Pub/Sub Lite?
See the [Pub/Sub Lite pricing documentation](https://cloud.google.com/pubsub/lite/pricing).
### Can I configure the number of Spark partitions?
No, the number of Spark partitions is set to be the number of Pub/Sub Lite partitions of the topic that the subscription is attached to.
### How do I authenticate outside Google Compute Engine / Cloud Dataproc?
Use a service account JSON key and `GOOGLE_APPLICATION_CREDENTIALS` as described [here](https://cloud.google.com/docs/authentication/getting-started).
Credentials can also be provided with the `gcp.credentials.key` option; the value must be passed in as a base64-encoded string.
Example:
```python
spark.readStream.format("pubsublite").option("gcp.credentials.key", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
```
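One way to produce that base64 string from a downloaded key file (the path is a placeholder; the exact `base64` flags differ between GNU coreutils and macOS):
```sh
# GNU coreutils: -w 0 disables line wrapping; on macOS use `base64 -i key.json` instead.
base64 -w 0 /path/to/service-account-key.json
```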
about: |
[Google Cloud Pub/Sub Lite][product-docs] is a zonal, real-time messaging
service that lets you send and receive messages between independent
applications. You can manually configure the throughput and storage capacity
for Pub/Sub Lite systems.

The Pub/Sub Lite Spark connector supports Pub/Sub Lite as an input source to
Apache Spark Structured Streaming in both the default micro-batch processing
mode and the _experimental_ continuous processing mode. The connector works in
all Apache Spark distributions, including [Google Cloud Dataproc](https://cloud.google.com/dataproc/docs/)
and manual Spark installations.