Skip to content

Commit

Permalink
feat: use gson instead of jackson (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangmichaellll committed Jan 26, 2021
1 parent 6b2d3f8 commit 64cad24
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 35 deletions.
10 changes: 2 additions & 8 deletions pom.xml
Expand Up @@ -99,20 +99,14 @@
<artifactId>api-common</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.1</version>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand Down
Expand Up @@ -18,23 +18,21 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.cloud.pubsublite.Partition;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;

public final class SparkSourceOffset
extends org.apache.spark.sql.sources.v2.reader.streaming.Offset {
private static final ObjectMapper objectMapper =
new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
private static final Gson gson = new Gson();

// Using a map to ensure unique partitions.
private final ImmutableMap<Partition, SparkPartitionOffset> partitionOffsetMap;
Expand Down Expand Up @@ -79,26 +77,17 @@ public static SparkSourceOffset merge(SparkPartitionOffset[] offsets) {
return new SparkSourceOffset(map);
}

@SuppressWarnings("unchecked")
public static SparkSourceOffset fromJson(String json) {
Map<String, Number> map;
try {
// TODO: Use TypeReference instead of Map.class, currently TypeReference breaks spark with
// java.lang.LinkageError: loader constraint violation: loader previously initiated loading
// for a different type.
map = objectMapper.readValue(json, Map.class);
} catch (IOException e) {
throw new IllegalStateException("Unable to deserialize PslSourceOffset.", e);
}
Map<Long, Long> map = gson.fromJson(json, new TypeToken<Map<Long, Long>>() {}.getType());
Map<Partition, SparkPartitionOffset> partitionOffsetMap =
map.entrySet().stream()
.collect(
Collectors.toMap(
e -> Partition.of(Long.parseLong(e.getKey())),
e -> Partition.of(e.getKey()),
e ->
SparkPartitionOffset.builder()
.partition(Partition.of(Long.parseLong(e.getKey())))
.offset(e.getValue().longValue())
.partition(Partition.of(e.getKey()))
.offset(e.getValue())
.build()));
return new SparkSourceOffset(partitionOffsetMap);
}
Expand All @@ -109,13 +98,9 @@ public Map<Partition, SparkPartitionOffset> getPartitionOffsetMap() {

@Override
public String json() {
try {
Map<Long, Long> map =
partitionOffsetMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
return objectMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new IllegalStateException("Unable to serialize PslSourceOffset.", e);
}
Map<Long, Long> map =
partitionOffsetMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
return gson.toJson(new TreeMap<>(map));
}
}

0 comments on commit 64cad24

Please sign in to comment.