feat: Multiple fixes to make continuous mode work (#432)
jiangmichaellll committed Dec 21, 2020
1 parent 34d8d02 commit 9df4ccf
Showing 8 changed files with 166 additions and 122 deletions.
5 changes: 5 additions & 0 deletions pom.xml
@@ -39,6 +39,11 @@
<artifactId>error_prone_annotations</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service-annotations</artifactId>
<version>1.0-rc7</version>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
132 changes: 56 additions & 76 deletions pubsublite-spark-sql-streaming/pom.xml
@@ -14,16 +14,30 @@
<name>Pub/Sub Lite Spark SQL Streaming</name>
<url>https://github.com/googleapis/java-pubsublite</url>
<description>Spark SQL Streaming Connector for Google Cloud Pub/Sub Lite</description>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.7</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<artifactId>spark-catalyst_2.11</artifactId>
<version>2.4.7</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_2.11</artifactId>
<version>2.4.7</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
@@ -47,6 +61,10 @@
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
@@ -89,15 +107,10 @@
<artifactId>jackson-databind</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.12</version>
<version>2.11.12</version>
</dependency>

<!-- test dependencies -->
@@ -121,85 +134,52 @@
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<pluginManagement>
<plugins>
<plugin>
<groupId>com.coveo</groupId>
<artifactId>fmt-maven-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<goals>
<goal>format</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<!-- ignore annotations for "unused but declared" warnings -->
<ignoredUnusedDeclaredDependencies>
<ignoredUnusedDeclaredDependency>com.fasterxml.jackson.core:jackson-annotations</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<compilerArgument>-Xlint:unchecked</compilerArgument>
<annotationProcessorPaths>
<path>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
</path>
<path>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service-annotations</artifactId>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>

</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>com.coveo</groupId>
<artifactId>fmt-maven-plugin</artifactId>
<version>2.9</version>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>format</goal>
<goal>shade</goal>
</goals>
<configuration>
<!-- Used to merge META-INF/services -->
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/maven/**</exclude>
<exclude>META-INF/*.MF</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>with-dependencies</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>
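A note on why the ServicesResourceTransformer is needed here: Spark discovers DataSourceRegister implementations through java.util.ServiceLoader, so the shaded jar must still carry a merged service descriptor. Assuming the @AutoService registration added below, that descriptor would look roughly like this (path and entry inferred from the code, not shown in this diff):

META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
com.google.cloud.pubsublite.spark.PslDataSource

The relocations serve a different purpose: they rewrite the bundled com.google.protobuf.* and com.google.common.* classes under repackaged.* so they cannot clash with the versions Spark ships on its own classpath.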
pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslCredentialsProvider.java

@@ -38,15 +38,17 @@ public PslCredentialsProvider(PslDataSourceOptions options) {

private static Credentials createCredentialsFromKey(String key) {
try {
return GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.decodeBase64(key)));
return GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.decodeBase64(key)))
.createScoped("https://www.googleapis.com/auth/cloud-platform");
} catch (IOException e) {
throw new UncheckedIOException("Failed to create Credentials from key", e);
}
}

public static Credentials createDefaultCredentials() {
try {
return GoogleCredentials.getApplicationDefault();
return GoogleCredentials.getApplicationDefault()
.createScoped("https://www.googleapis.com/auth/cloud-platform");
} catch (IOException e) {
throw new UncheckedIOException("Failed to create default Credentials", e);
}
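The functional change in both factory methods above is the added cloud-platform OAuth scope: service-account credentials loaded from a key generally cannot mint access tokens until they are scoped. A minimal standalone sketch of the same pattern, where the class and method names are illustrative rather than part of the connector:

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.commons.codec.binary.Base64;

final class ScopedCredentialsSketch {
  private static final String CLOUD_PLATFORM = "https://www.googleapis.com/auth/cloud-platform";

  // Decode a base64-encoded service account key and scope the credentials.
  static Credentials fromBase64Key(String key) {
    try {
      return GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.decodeBase64(key)))
          .createScoped(CLOUD_PLATFORM);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create Credentials from key", e);
    }
  }

  // Fall back to Application Default Credentials, scoped the same way.
  static Credentials defaults() {
    try {
      return GoogleCredentials.getApplicationDefault().createScoped(CLOUD_PLATFORM);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create default Credentials", e);
    }
  }
}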
pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java

@@ -16,6 +16,7 @@

package com.google.cloud.pubsublite.spark;

import com.google.auto.service.AutoService;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
@@ -36,7 +37,8 @@
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

public class PslDataSource
@AutoService(DataSourceRegister.class)
public final class PslDataSource
implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {

@Override
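For context on the @AutoService annotation above: Spark resolves format names by loading every DataSourceRegister through ServiceLoader, and @AutoService makes the annotation processor generate the required META-INF/services entry at compile time rather than it being maintained by hand. A minimal sketch of the mechanism, with a hypothetical source class and short name:

import com.google.auto.service.AutoService;
import org.apache.spark.sql.sources.DataSourceRegister;

// Sketch only. At compile time AutoService emits
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// containing this class name; Spark's format lookup reads it at runtime.
@AutoService(DataSourceRegister.class)
public final class ExampleSource implements DataSourceRegister {
  @Override
  public String shortName() {
    return "example"; // hypothetical format name
  }
}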
pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java

@@ -16,32 +16,69 @@

package com.google.cloud.pubsublite.spark;

import static scala.collection.JavaConverters.asScalaBufferConverter;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;

public class PslSparkUtils {
private static ArrayBasedMapData convertAttributesToSparkMap(
ListMultimap<String, ByteString> attributeMap) {

List<UTF8String> keyList = new ArrayList<>();
List<GenericArrayData> valueList = new ArrayList<>();

attributeMap
.asMap()
.forEach(
(key, value) -> {
keyList.add(UTF8String.fromString(key));
List<byte[]> attributeVals =
value.stream()
.map(v -> ByteArray.concat(v.toByteArray()))
.collect(Collectors.toList());
valueList.add(new GenericArrayData(asScalaBufferConverter(attributeVals).asScala()));
});

return new ArrayBasedMapData(
new GenericArrayData(asScalaBufferConverter(keyList).asScala()),
new GenericArrayData(asScalaBufferConverter(valueList).asScala()));
}

public static InternalRow toInternalRow(
SequencedMessage msg, SubscriptionPath subscription, Partition partition) {
return InternalRow.apply(
scala.collection.JavaConverters.asScalaBuffer(
ImmutableList.of(
subscription.toString(),
List<Object> list =
new ArrayList<>(
Arrays.asList(
UTF8String.fromString(subscription.toString()),
partition.value(),
msg.offset().value(),
msg.message().key(),
msg.message().data(),
msg.publishTime(),
msg.message().eventTime(),
msg.message().attributes())));
ByteArray.concat(msg.message().key().toByteArray()),
ByteArray.concat(msg.message().data().toByteArray()),
Timestamps.toMillis(msg.publishTime()),
msg.message().eventTime().isPresent()
? Timestamps.toMillis(msg.message().eventTime().get())
: null,
convertAttributesToSparkMap(msg.message().attributes())));
return InternalRow.apply(asScalaBufferConverter(list).asScala());
}

public static SparkSourceOffset toSparkSourceOffset(PslSourceOffset pslSourceOffset) {
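The row built by toInternalRow above implies a fixed column order: subscription, partition, offset, key, data, publish timestamp, event timestamp, attributes. A plausible matching Spark schema is sketched below; the field names are assumptions, since the connector's actual StructType is not part of this diff. One caution worth flagging: Spark's TimestampType stores microseconds internally, so the Timestamps.toMillis values above only line up if these columns are long-typed (as sketched here) rather than TimestampType.

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical schema mirroring the field order in toInternalRow.
public final class ExampleSchema {
  public static final StructType SCHEMA =
      new StructType(
          new StructField[] {
            new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
            new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
            new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
            new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
            new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
            new StructField("publish_timestamp", DataTypes.LongType, false, Metadata.empty()),
            new StructField("event_timestamp", DataTypes.LongType, true, Metadata.empty()),
            new StructField(
                "attributes",
                DataTypes.createMapType(
                    DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
                true,
                Metadata.empty())
          });
}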
pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java

@@ -43,6 +43,12 @@ private static SequencedMessage newMessage(long offset) {
10000);
}

private static void verifyInternalRow(InternalRow row, long expectedOffset) {
assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString());
assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value());
assertThat(row.getLong(2)).isEqualTo(expectedOffset);
}

private void createReader() {
reader =
new PslContinuousInputPartitionReader(
@@ -58,31 +64,21 @@ private void createReader() {
public void testPartitionReader() throws Exception {
createReader();
SequencedMessage message1 = newMessage(10);
InternalRow expectedRow1 =
PslSparkUtils.toInternalRow(
message1,
UnitTestExamples.exampleSubscriptionPath(),
UnitTestExamples.examplePartition());
SequencedMessage message2 = newMessage(13);
InternalRow expectedRow2 =
PslSparkUtils.toInternalRow(
message2,
UnitTestExamples.exampleSubscriptionPath(),
UnitTestExamples.examplePartition());

// Multiple get() calls without next() return the same message.
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
assertThat(reader.next()).isTrue();
assertThat(reader.get()).isEqualTo(expectedRow1);
assertThat(reader.get()).isEqualTo(expectedRow1);
verifyInternalRow(reader.get(), 10L);
verifyInternalRow(reader.get(), 10L);
assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(10L);

// next() advances to the next message.
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
assertThat(reader.next()).isTrue();
assertThat(reader.get()).isEqualTo(expectedRow2);
verifyInternalRow(reader.get(), 13L);
assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(13L);
}
}
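Taken together, these changes are what let a continuous-trigger query run end to end. A hypothetical usage sketch follows; the format name and option key are assumptions, as neither appears in this diff:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;

public final class ContinuousReadSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("psl-continuous").getOrCreate();

    // Read from a Pub/Sub Lite subscription (option key is illustrative).
    Dataset<Row> messages =
        spark
            .readStream()
            .format("pubsublite") // assumed short name for PslDataSource
            .option(
                "pubsublite.subscription",
                "projects/my-project/locations/us-central1-a/subscriptions/my-sub")
            .load();

    // Continuous processing mode, checkpointing once per second.
    messages
        .writeStream()
        .format("console")
        .trigger(Trigger.Continuous("1 second"))
        .start()
        .awaitTermination();
  }
}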
