Skip to content

Commit

Permalink
Merge branch 'master' into readme
Browse files Browse the repository at this point in the history
  • Loading branch information
anguillanneuf committed Jan 27, 2021
2 parents 14cc3ca + 6aa40af commit f6b2b04
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 40 deletions.
7 changes: 7 additions & 0 deletions .kokoro/nightly/java7.cfg
@@ -0,0 +1,7 @@
# Format: //devtools/kokoro/config/proto/build.proto

# Configure the docker image for kokoro-trampoline.
env_vars: {
key: "TRAMPOLINE_IMAGE"
value: "gcr.io/cloud-devrel-kokoro-resources/java7"
}
3 changes: 3 additions & 0 deletions .kokoro/release/publish_javadoc.cfg
Expand Up @@ -27,3 +27,6 @@ before_action {
}
}
}

# Downloads docfx doclet resource. This will be in ${KOKORO_GFILE_DIR}/<doclet name>
gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/docfx"
16 changes: 5 additions & 11 deletions pom.xml
Expand Up @@ -8,7 +8,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-spark-sql-streaming</artifactId>
<version>0.1.0-SNAPSHOT</version><!-- {x-version-update:pubsublite-spark-sql-streaming:current} -->
<version>0.0.1-SNAPSHOT</version><!-- {x-version-update:pubsublite-spark-sql-streaming:current} -->
<packaging>jar</packaging>
<name>Pub/Sub Lite Spark SQL Streaming</name>
<url>https://github.com/googleapis/java-pubsublite-spark</url>
Expand Down Expand Up @@ -99,20 +99,14 @@
<artifactId>api-common</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.1</version>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand All @@ -130,14 +124,14 @@
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.1</version>
<version>1.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
<version>3.7.0</version>
<version>3.7.7</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand Down
55 changes: 55 additions & 0 deletions samples/pom.xml
@@ -0,0 +1,55 @@
<?xml version='1.0' encoding='UTF-8'?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-spark-samples</artifactId>
<version>0.0.1-SNAPSHOT</version><!-- This artifact should not be released -->
<packaging>pom</packaging>
<name>Google Pub/Sub Lite Spark Connector Samples Parent</name>
<url>https://github.com/googleapis/java-pubsublite-spark</url>
<description>
Java idiomatic client for Google Cloud Platform services.
</description>

<!--
The parent pom defines common style checks and testing strategies for our samples.
Removing or replacing it should not affect the execution of the samples in anyway.
-->
<parent>
<groupId>com.google.cloud.samples</groupId>
<artifactId>shared-configuration</artifactId>
<version>1.0.18</version>
</parent>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<modules>
<module>snapshot</module>
<module>snippets</module>
</modules>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.8</version>
<configuration>
<skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
</configuration>
</plugin>
</plugins>
</build>
</project>
83 changes: 83 additions & 0 deletions samples/snapshot/pom.xml
@@ -0,0 +1,83 @@
<?xml version='1.0' encoding='UTF-8'?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-spark-snapshot</artifactId>
<packaging>jar</packaging>
<name>Google Pub/Sub Lite Spark Connector Snapshot Samples</name>
<url>https://github.com/googleapis/java-pubsublite-spark</url>

<!--
The parent pom defines common style checks and testing strategies for our samples.
Removing or replacing it should not affect the execution of the samples in anyway.
-->
<parent>
<groupId>com.google.cloud.samples</groupId>
<artifactId>shared-configuration</artifactId>
<version>1.0.12</version>
</parent>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<!-- {x-version-update-start:pubsublite-spark:current} -->
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-spark-sql-streaming</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- {x-version-update-end} -->

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<!-- compile and run all snippet tests -->
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>add-snippets-source</id>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>../snippets/src/main/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-snippets-tests</id>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>../snippets/src/test/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
51 changes: 51 additions & 0 deletions samples/snippets/pom.xml
@@ -0,0 +1,51 @@
<?xml version='1.0' encoding='UTF-8'?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-spark-snippets</artifactId>
<packaging>jar</packaging>
<name>Google Pub/Sub Lite Spark Connector Snippets</name>
<url>https://github.com/googleapis/java-pubsublite-spark</url>

<!--
The parent pom defines common style checks and testing strategies for our samples.
Removing or replacing it should not affect the execution of the samples in anyway.
-->
<parent>
<groupId>com.google.cloud.samples</groupId>
<artifactId>shared-configuration</artifactId>
<version>1.0.12</version>
</parent>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<!-- TODO: switch to libraries-bom after this artifact is included -->
<!-- [START pubsublite-spark_install_without_bom] -->
<!-- [START pubsublite-spark_java_dependencies] -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-spark-sql-streaming</artifactId>
<version>0.0.0</version>
</dependency>
<!-- [END pubsublite-spark_java_dependencies] -->
<!-- [END pubsublite-spark_install_without_bom] -->

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -18,23 +18,21 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.cloud.pubsublite.Partition;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;

public final class SparkSourceOffset
extends org.apache.spark.sql.sources.v2.reader.streaming.Offset {
private static final ObjectMapper objectMapper =
new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
private static final Gson gson = new Gson();

// Using a map to ensure unique partitions.
private final ImmutableMap<Partition, SparkPartitionOffset> partitionOffsetMap;
Expand Down Expand Up @@ -79,26 +77,17 @@ public static SparkSourceOffset merge(SparkPartitionOffset[] offsets) {
return new SparkSourceOffset(map);
}

@SuppressWarnings("unchecked")
public static SparkSourceOffset fromJson(String json) {
Map<String, Number> map;
try {
// TODO: Use TypeReference instead of Map.class, currently TypeReference breaks spark with
// java.lang.LinkageError: loader constraint violation: loader previously initiated loading
// for a different type.
map = objectMapper.readValue(json, Map.class);
} catch (IOException e) {
throw new IllegalStateException("Unable to deserialize PslSourceOffset.", e);
}
Map<Long, Long> map = gson.fromJson(json, new TypeToken<Map<Long, Long>>() {}.getType());
Map<Partition, SparkPartitionOffset> partitionOffsetMap =
map.entrySet().stream()
.collect(
Collectors.toMap(
e -> Partition.of(Long.parseLong(e.getKey())),
e -> Partition.of(e.getKey()),
e ->
SparkPartitionOffset.builder()
.partition(Partition.of(Long.parseLong(e.getKey())))
.offset(e.getValue().longValue())
.partition(Partition.of(e.getKey()))
.offset(e.getValue())
.build()));
return new SparkSourceOffset(partitionOffsetMap);
}
Expand All @@ -109,13 +98,9 @@ public Map<Partition, SparkPartitionOffset> getPartitionOffsetMap() {

@Override
public String json() {
try {
Map<Long, Long> map =
partitionOffsetMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
return objectMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new IllegalStateException("Unable to serialize PslSourceOffset.", e);
}
Map<Long, Long> map =
partitionOffsetMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
return gson.toJson(new TreeMap<>(map));
}
}
2 changes: 1 addition & 1 deletion synth.metadata
Expand Up @@ -4,7 +4,7 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-pubsublite-spark.git",
"sha": "fd0572c74e21fd119dc8c34d1377e4c2a53228e2"
"sha": "6b2d3f80ff3485f747cc28be07383c4f25cbf958"
}
},
{
Expand Down
5 changes: 4 additions & 1 deletion synth.py
Expand Up @@ -16,4 +16,7 @@

import synthtool.languages.java as java

java.common_templates()
java.common_templates(excludes=[
# TODO: allow when pubsublite-spark is available in libraries-bom
'samples/install-without-bom/*',
])
4 changes: 4 additions & 0 deletions versions.txt
@@ -0,0 +1,4 @@
# Format:
# module:released-version:current-version

pubsublite-spark-sql-streaming:0.0.0:0.0.1-SNAPSHOT

0 comments on commit f6b2b04

Please sign in to comment.