Skip to content

Commit

Permalink
[GOBBLIN-2040] Abstract comparable watermark (#3919)
Browse files Browse the repository at this point in the history
* address comments

* use connection manager when HttpClient is not closeable

* [GOBBLIN-2040] Abstract comparable watermark

* address comments

---------

Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>
  • Loading branch information
ZihanLi58 and Zihan Li committed Apr 19, 2024
1 parent 62f645c commit 36cdf2b
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
/**
* {@link Watermark} that is also {@link Comparable}.
*/
public interface ComparableWatermark extends Watermark, Comparable<ComparableWatermark>{
public interface ComparableWatermark<V extends Comparable<V>> extends Watermark, Comparable<ComparableWatermark>{

V getValue();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,29 @@
import org.apache.gobblin.source.extractor.Watermark;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;


/**
* Long based {@link ComparableWatermark} implementation.
*/
@ToString
@EqualsAndHashCode
public class LongWatermark implements ComparableWatermark {
public class LongWatermark implements ComparableWatermark<Long> {

private static final Gson GSON = new Gson();

@Getter
@Setter
private long value;

/**
* Returns the current watermark value boxed as a {@link Long}.
*
* <p>The boxed return type is needed because the generic type parameter of
* {@link ComparableWatermark} must be an object type; the backing field itself
* stays a primitive {@code long}.
*
* @return the value of this watermark as a {@link Long}
*/
@Override
public Long getValue() {
return Long.valueOf(this.value);
}

/**
* Creates a watermark holding the given value.
*
* @param value the initial value stored in this watermark
*/
public LongWatermark(long value) {
this.value = value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private static void verifyCommitables(FineGrainedWatermarkTracker tracker, Sorte
} else {
Assert.assertEquals(uncommitted.size(), 1);
CheckpointableWatermark uncommitable = uncommitted.get("default");
Assert.assertEquals(((LongWatermark) uncommitable.getWatermark()).getValue(), (long) holes.first());
Assert.assertEquals((long)((LongWatermark) uncommitable.getWatermark()).getValue(), (long)holes.first());
}

Map<String, CheckpointableWatermark> commitables = tracker.getCommittableWatermarks();
Expand All @@ -109,9 +109,9 @@ private static void verifyCommitables(FineGrainedWatermarkTracker tracker, Sorte
Assert.assertEquals(commitables.size(), 1);
CheckpointableWatermark commitable = commitables.get("default");
if (holes.isEmpty()) {
Assert.assertEquals(((LongWatermark) commitable.getWatermark()).getValue(), maxWatermark);
Assert.assertEquals((long)((LongWatermark) commitable.getWatermark()).getValue(), maxWatermark);
} else {
Assert.assertEquals(((LongWatermark) commitable.getWatermark()).getValue(), holes.first() - 1);
Assert.assertEquals((long)((LongWatermark) commitable.getWatermark()).getValue(), holes.first() - 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testSingleSource() {
commits(watermarkTracker, "default", 0, 4, 5, 6);

Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(), "default");
Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("default")
Assert.assertEquals((long)((LongWatermark) watermarkTracker.getCommittableWatermark("default")
.get().getWatermark()).getValue(), 6L);
}

Expand All @@ -54,10 +54,10 @@ public void testMultiSource() {
commits(watermarkTracker, "other", 1, 3, 5, 7);

Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(), "default");
Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("default")
Assert.assertEquals((long)((LongWatermark) watermarkTracker.getCommittableWatermark("default")
.get().getWatermark()).getValue(), 6L);
Assert.assertEquals(watermarkTracker.getCommittableWatermark("other").get().getSource(), "other");
Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("other")
Assert.assertEquals((long)((LongWatermark) watermarkTracker.getCommittableWatermark("other")
.get().getWatermark()).getValue(), 7L);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@
import com.google.common.base.Preconditions;
import com.google.gson.JsonElement;

import lombok.Getter;
import org.apache.gobblin.source.extractor.ComparableWatermark;
import org.apache.gobblin.source.extractor.Watermark;
import org.apache.gobblin.source.extractor.WatermarkSerializerHelper;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;


/**
* String based {@link ComparableWatermark} implementation.
*/
@AllArgsConstructor
@EqualsAndHashCode
public class StringWatermark implements ComparableWatermark {
public class StringWatermark implements ComparableWatermark<String> {

@Getter
String value;
private String value;

@Override
public int compareTo(ComparableWatermark other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ public void testExpectedHighWatermarkNoPreviousState() throws Exception {
List<WorkUnit> workunits = Lists.newArrayList();
watermarker.onGetWorkunitsEnd(workunits);

Assert.assertEquals(watermarker.getPreviousHighWatermark(part1).getValue(), 0l);
Assert.assertEquals(watermarker.getPreviousHighWatermark(table).getValue(), 0l);
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(part1).getValue(), 0l);
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(table).getValue(), 0l);

Assert.assertEquals(watermarker.getPreviousHighWatermark(part2).getValue(), 0l);
Assert.assertEquals(watermarker.getPreviousHighWatermark(table2).getValue(), 0l);
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(part2).getValue(), 0l);
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(table2).getValue(), 0l);

Assert.assertEquals(workunits.size(), 2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,17 @@ public void testNonDrilldownDatasetState()
List<LongWatermark> watermarks1 = new ArrayList<>();
List<Dataset> datasets1 = new ArrayList<>();
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset1");
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(dataset1);

Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2");
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(dataset2);

Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3");
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(dataset3);

Expand Down Expand Up @@ -244,12 +244,12 @@ public void testNonDrilldownDatasetState()

Assert.assertEquals(workUnits.size(), 3);
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset4");
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(dataset4);

Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset5");
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(dataset5);

Expand Down Expand Up @@ -327,17 +327,17 @@ public void testDrilldownDatasetState()

Assert.assertEquals(workUnits.size(), 4);
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset1");
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(dataset1);

Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p1");
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(new SimpleDatasetForTesting("dataset2@p1"));

Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p2");
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(new SimpleDatasetForTesting("dataset2@p2"));

Expand Down Expand Up @@ -367,17 +367,17 @@ public void testDrilldownDatasetState()

Assert.assertEquals(workUnits.size(), 4);
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p3");
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(new SimpleDatasetForTesting("dataset2@p3"));

Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p1");
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(new SimpleDatasetForTesting("dataset3@p1"));

Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p2");
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks2.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(new SimpleDatasetForTesting("dataset3@p2"));

Expand All @@ -404,7 +404,7 @@ public void testDrilldownDatasetState()

Assert.assertEquals(workUnits.size(), 2);
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p3");
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
watermarks3.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets3.add(new SimpleDatasetForTesting("dataset3@p3"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testPersistWatermarkStateToZk() throws IOException {
ImmutableList.of("source"));

Assert.assertEquals(watermarkMap.size(), 1);
Assert.assertEquals(((LongWatermark) watermarkMap.get("source").getWatermark()).getValue(), startTime);
Assert.assertEquals((long)((LongWatermark) watermarkMap.get("source").getWatermark()).getValue(), startTime);
}

@AfterClass
Expand Down

0 comments on commit 36cdf2b

Please sign in to comment.