From 64cad24dab014ae9bd64abc833c8f744b039e95a Mon Sep 17 00:00:00 2001
From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com>
Date: Tue, 26 Jan 2021 17:42:35 -0500
Subject: [PATCH] feat: use gson instead of jackson (#25)

---
 pom.xml                                       | 10 +----
 .../pubsublite/spark/SparkSourceOffset.java   | 39 ++++++-------------
 2 files changed, 14 insertions(+), 35 deletions(-)

diff --git a/pom.xml b/pom.xml
index d34eefe6..b753bd77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,20 +99,14 @@
       <artifactId>api-common</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-      <version>2.12.1</version>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
     </dependency>
     <dependency>
       <groupId>com.github.ben-manes.caffeine</groupId>
       <artifactId>caffeine</artifactId>
       <version>2.8.8</version>
     </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>2.12.1</version>
-    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java b/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java
index 4dcfc87f..98a37b2c 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/SparkSourceOffset.java
@@ -18,23 +18,21 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
 import com.google.cloud.pubsublite.Partition;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 public final class SparkSourceOffset
     extends org.apache.spark.sql.sources.v2.reader.streaming.Offset {
-  private static final ObjectMapper objectMapper =
-      new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+  private static final Gson gson = new Gson();
 
   // Using a map to ensure unique partitions.
   private final ImmutableMap<Partition, SparkPartitionOffset> partitionOffsetMap;
@@ -79,26 +77,17 @@ public static SparkSourceOffset merge(SparkPartitionOffset[] offsets) {
     return new SparkSourceOffset(map);
   }
 
-  @SuppressWarnings("unchecked")
   public static SparkSourceOffset fromJson(String json) {
-    Map<String, Number> map;
-    try {
-      // TODO: Use TypeReference instead of Map.class, currently TypeReference breaks spark with
-      // java.lang.LinkageError: loader constraint violation: loader previously initiated loading
-      // for a different type.
-      map = objectMapper.readValue(json, Map.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to deserialize PslSourceOffset.", e);
-    }
+    Map<Long, Long> map = gson.fromJson(json, new TypeToken<Map<Long, Long>>() {}.getType());
     Map<Partition, SparkPartitionOffset> partitionOffsetMap =
         map.entrySet().stream()
             .collect(
                 Collectors.toMap(
-                    e -> Partition.of(Long.parseLong(e.getKey())),
+                    e -> Partition.of(e.getKey()),
                     e ->
                         SparkPartitionOffset.builder()
-                            .partition(Partition.of(Long.parseLong(e.getKey())))
-                            .offset(e.getValue().longValue())
+                            .partition(Partition.of(e.getKey()))
+                            .offset(e.getValue())
                             .build()));
     return new SparkSourceOffset(partitionOffsetMap);
   }
@@ -109,13 +98,9 @@ public Map<Partition, SparkPartitionOffset> getPartitionOffsetMap() {
 
   @Override
   public String json() {
-    try {
-      Map<Long, Long> map =
-          partitionOffsetMap.entrySet().stream()
-              .collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
-      return objectMapper.writeValueAsString(map);
-    } catch (JsonProcessingException e) {
-      throw new IllegalStateException("Unable to serialize PslSourceOffset.", e);
-    }
+    Map<Long, Long> map =
+        partitionOffsetMap.entrySet().stream()
+            .collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
+    return gson.toJson(new TreeMap<>(map));
   }
 }