custom_content: |
## Requirements
### Creating a new subscription or using an existing subscription
Follow [the instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#create_a_lite_subscription) to create a new subscription or use an existing subscription. If using an existing subscription, the connector will read from the oldest unacknowledged message in the subscription.
### Creating a Google Cloud Dataproc cluster (Optional)
If you do not have an Apache Spark environment, you can create a [Cloud Dataproc](https://cloud.google.com/dataproc/docs) cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.
```sh
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
```
## Downloading and Using the Connector
The latest version of the connector is publicly available at the following link:
| Connector version | Link |
| --- | --- |
| 0.1.0 | `gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-0.1.0-with-dependencies.jar`([HTTP link](https://storage.googleapis.com/spark-lib/pubsublite/pubsublite-spark-sql-streaming-0.1.0-with-dependencies.jar)) |
The connector is also available from the [Maven Central repository](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming). You can download it and pass it to the `--jars` option when using the `spark-submit` command.
## Compatibility
| Connector version | Spark version |
| --- | --- |
| 0.1.0 | 2.4.X |
## Usage
### Samples
There are three Java samples (word count, simple write, simple read) under [samples](https://github.com/googleapis/java-pubsublite-spark/tree/master/samples) that show how to use the connector inside Dataproc.
### Reading data from Pub/Sub Lite
Here is an example in Python:
```python
df = spark.readStream \
.format("pubsublite") \
.option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
.load()
```
Here is an example in Java:
```java
Dataset<Row> df = spark
.readStream()
.format("pubsublite")
.option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID"t )
.load();
```
Note that the connector supports both the default micro-batch processing mode and the experimental [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing) mode.
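For example, to opt into continuous processing you could pass a continuous trigger when starting the query. The sketch below assumes a `spark` session is available and simply echoes messages to the console sink; the trigger interval and checkpoint path are placeholders:
```java
import org.apache.spark.sql.streaming.Trigger;

// Sketch: read from Pub/Sub Lite in Continuous Processing mode and print
// rows to the console. The subscription path, checkpoint location, and
// one-second checkpoint interval are placeholders.
spark
  .readStream()
  .format("pubsublite")
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
  .load()
  .writeStream()
  .format("console")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .trigger(Trigger.Continuous("1 second"))
  .start();
```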
### Writing data to Pub/Sub Lite
Here is an example in Python:
```python
df.writeStream \
.format("pubsublite") \
.option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
.option("checkpointLocation", "path/to/HDFS/dir")
.outputMode("complete") \
.trigger(processingTime="2 seconds") \
.start()
```
Here is an example in Java:
```java
df.writeStream()
.format("pubsublite")
.option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
.option("checkpointLocation", "path/to/HDFS/dir")
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(2, TimeUnit.SECONDS))
.start();
```
### Properties
When reading from Pub/Sub Lite, the connector supports a number of configuration options:
| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.subscription | String | Y | | Full subscription path that the connector will read from. |
| pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | 50_000_000 | Max number of bytes per partition that will be cached in workers before Spark processes the messages. |
| pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Long.MAX | Max number of messages per partition that will be cached in workers before Spark processes the messages. |
| pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in a micro batch. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
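For example, here is a sketch of a reader that sets the flow control options alongside the subscription; the limits below are arbitrary illustrative values, not recommendations:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch with illustrative limits: cap the bytes and messages cached per
// partition, and cap each micro batch at 100,000 messages.
Dataset<Row> df = spark
  .readStream()
  .format("pubsublite")
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
  .option("pubsublite.flowcontrol.byteoutstandingperpartition", 10_000_000L)
  .option("pubsublite.flowcontrol.messageoutstandingperpartition", 100_000L)
  .option("pubsublite.flowcontrol.maxmessagesperbatch", 100_000L)
  .load();
```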
When writing to Pub/Sub Lite, the connector supports a number of configuration options:
| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.topic | String | Y | | Full topic path that the connector will write to. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
### Data Schema
When reading from Pub/Sub Lite, the connector has a fixed data schema as follows:
| Data Field | Spark Data Type | Notes |
| ---------- | --------------- | ----- |
| subscription | StringType | Full subscription path |
| partition | LongType | |
| offset | LongType | |
| key | BinaryType | |
| data | BinaryType | |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | |
| publish_timestamp | TimestampType | |
| event_timestamp | TimestampType | Nullable |
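As a minimal sketch of consuming this schema, the snippet below selects a few of the columns above and decodes the binary `data` payload, assuming the payload is UTF-8 text:
```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch: project a few columns from the fixed read schema and decode the
// binary payload into a UTF-8 string. Assumes `df` was loaded as shown above.
Dataset<Row> decoded = df.select(
    col("partition"),
    col("offset"),
    col("publish_timestamp"),
    col("data").cast("string").alias("text"));
```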
When writing to Pub/Sub Lite, the connector matches data fields and data types as follows:
| Data Field | Spark Data Type | Required |
| ---------- | --------------- | ----- |
| key | BinaryType | N |
| data | BinaryType | N |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | N |
| event_timestamp | TimestampType | N |
Note that when a data field is present in the table but its data type does not match, the connector will throw an `IllegalArgumentException` that terminates the query.
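As an illustration, the sketch below shapes a DataFrame into this schema before starting the write; the source DataFrame `words` and its `word` column are hypothetical, and the key and payload are cast to binary to match the expected types:
```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch: `words` and its "word" column are hypothetical. Cast the key and
// payload to binary so they match the data types expected by the sink.
Dataset<Row> toWrite = words
    .withColumn("key", lit("example-key").cast("binary"))
    .withColumn("data", col("word").cast("binary"))
    .select("key", "data");
```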
## Building the Connector
The connector is built using Maven. The following command creates a JAR file with shaded dependencies:
```sh
mvn package
```
## FAQ
### What is the cost of Pub/Sub Lite?
See the [Pub/Sub Lite pricing documentation](https://cloud.google.com/pubsub/lite/pricing).
### Can I configure the number of Spark partitions?
No, the number of Spark partitions is set to be the number of Pub/Sub Lite partitions of the topic that the subscription is attached to.
### How do I authenticate outside Cloud Compute Engine / Cloud Dataproc?
Use a service account JSON key and `GOOGLE_APPLICATION_CREDENTIALS` as described [here](https://cloud.google.com/docs/authentication/getting-started).
Credentials can also be provided directly with the `gcp.credentials.key` option; the service account JSON key must be passed in as a base64-encoded string.
Example:
```java
spark.readStream().format("pubsublite").option("gcp.credentials.key", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
```
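One way to produce that base64 string is sketched below; the key file path is a placeholder:
```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

// Sketch: read a service account JSON key file (placeholder path) and
// base64-encode it for use with the "gcp.credentials.key" option.
String credentialsKey = Base64.getEncoder()
    .encodeToString(Files.readAllBytes(Paths.get("/path/to/service-account-key.json")));
```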
about: |
[Google Cloud Pub/Sub Lite][product-docs] is a zonal, real-time messaging
service that lets you send and receive messages between independent
applications. You can manually configure the throughput and storage capacity
for Pub/Sub Lite systems.
The Pub/Sub Lite Spark connector supports Pub/Sub Lite as an input source to
Apache Spark Structured Streaming in both the default micro-batch processing
mode and the _experimental_ continuous processing mode. The connector works in
all Apache Spark distributions, including [Google Cloud Dataproc](https://cloud.google.com/dataproc/docs/)
and manual Spark installations.