Skip to content

Commit

Permalink
feat: PSL spark sql streaming utility classes (#391)
Browse files Browse the repository at this point in the history
Spark sql streaming POJOs and utility classes.

Also making com.google.cloud.pubsublite.Offset serializable, as it's needed for serialization into executor later.
  • Loading branch information
jiangmichaellll committed Dec 3, 2020
1 parent 74a8504 commit 0e4cb93
Show file tree
Hide file tree
Showing 10 changed files with 712 additions and 1 deletion.
Expand Up @@ -17,10 +17,11 @@
package com.google.cloud.pubsublite;

import com.google.auto.value.AutoValue;
import java.io.Serializable;

/** An offset in the partition. */
@AutoValue
public abstract class Offset implements Comparable<Offset> {
public abstract class Offset implements Comparable<Offset>, Serializable {
/** Create an offset. */
public static Offset of(long offset) {
return new AutoValue_Offset(offset);
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -116,6 +116,7 @@
<module>google-cloud-pubsublite</module>
<module>grpc-google-cloud-pubsublite-v1</module>
<module>proto-google-cloud-pubsublite-v1</module>
<module>pubsublite-spark-sql-streaming</module>
<module>pubsublite-beam-io</module>
</modules>

Expand Down
223 changes: 223 additions & 0 deletions pubsublite-spark-sql-streaming/pom.xml
@@ -0,0 +1,223 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>0.6.6-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-spark-sql-streaming</artifactId>
<version>0.6.6-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<packaging>jar</packaging>
<name>Pub/Sub Lite Spark SQL Streaming</name>
<url>https://github.com/googleapis/java-pubsublite</url>
<description>Spark SQL Streaming Connector for Google Cloud Pub/Sub Lite</description>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.6.6-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.12</version>
</dependency>

<!--test dependencies-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.16</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<pluginManagement>
<plugins>
<plugin>
<groupId>com.coveo</groupId>
<artifactId>fmt-maven-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<goals>
<goal>format</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<!-- ignore annotations for "unused but declared" warnings -->
<ignoredUnusedDeclaredDependencies>
<ignoredUnusedDeclaredDependency>com.fasterxml.jackson.core:jackson-annotations</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<compilerArgument>-Xlint:unchecked</compilerArgument>
<annotationProcessorPaths>
<path>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
</path>
<path>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service-annotations</artifactId>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>

</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>com.coveo</groupId>
<artifactId>fmt-maven-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<goals>
<goal>format</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<configuration>
<!-- TODO(jiangmichael): Remove this to deploy. -->
<skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M3</version>
<executions>
<execution>
<id>enforce</id>
<configuration>
<rules>
<requireUpperBoundDeps>
<level>WARN</level>
<excludes>
<exclude>org.checkerframework:checker-compat-qual</exclude>
</excludes>
</requireUpperBoundDeps>
<banDuplicateClasses>
<ignoreClasses>
<!-- Ignore duplicate classes due to diamond dependencies-->
<ignoreClass>org.apache.commons.logging.*</ignoreClass>
<ignoreClass>org.apache.commons.collections.*</ignoreClass>
<ignoreClass>org.apache.spark.unused.*</ignoreClass>
<ignoreClass>org.apache.hadoop.yarn.*</ignoreClass>
<ignoreClass>javax.ws.rs.*</ignoreClass>
</ignoreClasses>
<ignoreWhenIdentical>true</ignoreWhenIdentical>
</banDuplicateClasses>
</rules>
</configuration>
<goals>
<goal>enforce</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,55 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Shared constants for the Pub/Sub Lite Spark connector: default numeric settings, the default
 * row schema, the framework tag, and the option keys used to configure the source.
 *
 * <p>All fields are {@code static final} so that these shared values cannot be reassigned at
 * runtime.
 */
public class Constants {
  /** Default byte limit; presumably paired with {@link #BYTES_OUTSTANDING_CONFIG_KEY}. */
  public static final long DEFAULT_BYTES_OUTSTANDING = 50_000_000;

  /** Default message limit; {@link Long#MAX_VALUE} effectively leaves the count unbounded. */
  public static final long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;

  /** Default size of a batch offset range. */
  public static final int DEFAULT_BATCH_OFFSET_RANGE = 100_000;

  /**
   * Default schema of rows produced by the connector.
   *
   * <p>Only {@code event_timestamp} and {@code attributes} are nullable. {@code attributes} maps
   * a string key to an array of binary values.
   */
  public static final StructType DEFAULT_SCHEMA =
      new StructType(
          new StructField[] {
            new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
            new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
            new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
            new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
            new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
            new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
            new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
            new StructField(
                "attributes",
                DataTypes.createMapType(
                    DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
                true,
                Metadata.empty())
          });

  /** Framework tag identifying this connector as Spark. */
  public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");

  /** Option key for the bytes-outstanding setting. */
  public static final String BYTES_OUTSTANDING_CONFIG_KEY =
      "pubsublite.flowcontrol.byteoutstandingperpartition";

  /**
   * Option key for the messages-outstanding setting.
   *
   * <p>NOTE(review): "parition" in the literal looks like a misspelling of "partition". The value
   * is intentionally left unchanged here because existing configurations may already depend on
   * this exact string; confirm before renaming.
   */
  public static final String MESSAGES_OUTSTANDING_CONFIG_KEY =
      "pubsublite.flowcontrol.messageoutstandingperparition";

  /** Option key for the subscription setting. */
  public static final String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription";

  /** Option key for the credentials key setting. */
  public static final String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key";
}
@@ -0,0 +1,59 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.api.client.util.Base64;
import com.google.api.gax.core.CredentialsProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * A {@link CredentialsProvider} whose {@link Credentials} are resolved once, in the constructor,
 * and then returned unchanged from {@link #getCredentials()}.
 *
 * <p>If the supplied options carry a credentials key, that key is Base64-decoded and handed to
 * {@link GoogleCredentials#fromStream}; otherwise the application default credentials are used.
 */
public class PslCredentialsProvider implements CredentialsProvider {

  private final Credentials credentials;

  /**
   * Resolves credentials from the given options.
   *
   * @param options source options; a {@code null} credentials key selects the application default
   *     credentials
   * @throws UncheckedIOException if the credentials cannot be created
   */
  public PslCredentialsProvider(PslDataSourceOptions options) {
    this.credentials =
        options.credentialsKey() == null
            ? createDefaultCredentials()
            : createCredentialsFromKey(options.credentialsKey());
  }

  /** Returns the credentials resolved at construction time. */
  @Override
  public Credentials getCredentials() {
    return credentials;
  }

  /**
   * Loads the application default credentials.
   *
   * @return the application default {@link GoogleCredentials}
   * @throws UncheckedIOException wrapping the {@link IOException} raised while loading them
   */
  public static Credentials createDefaultCredentials() {
    final Credentials applicationDefault;
    try {
      applicationDefault = GoogleCredentials.getApplicationDefault();
    } catch (IOException cause) {
      throw new UncheckedIOException("Failed to create default Credentials", cause);
    }
    return applicationDefault;
  }

  /**
   * Builds credentials from a Base64-encoded key.
   *
   * @param key Base64 text whose decoded bytes are read by {@link GoogleCredentials#fromStream}
   * @throws UncheckedIOException wrapping the {@link IOException} raised while reading the key
   */
  private static Credentials createCredentialsFromKey(String key) {
    try {
      byte[] decodedKey = Base64.decodeBase64(key);
      ByteArrayInputStream keyStream = new ByteArrayInputStream(decodedKey);
      return GoogleCredentials.fromStream(keyStream);
    } catch (IOException cause) {
      throw new UncheckedIOException("Failed to create Credentials from key", cause);
    }
  }
}

0 comments on commit 0e4cb93

Please sign in to comment.