Skip to content

Commit

Permalink
[GOBBLIN-2068]Make offset range in Gobblin Metadata pipeline configur…
Browse files Browse the repository at this point in the history
…able (#3949)

* address comments

* use connectionmanager when httpclient is not cloesable

* [GOBBLIN-2068]Make offset range in Gobblin Metadata pipeline configurable

* address comments

* change variable name

---------

Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>
  • Loading branch information
ZihanLi58 and Zihan Li committed May 16, 2024
1 parent 2b2769f commit 83c325d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,11 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
}

protected Map<String, String> getPartitionOffsetRange(String offsetKey) {
return state.getPropAsList(offsetKey)
.stream()
.collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> s.split(MAP_DELIMITER_KEY)[1]));
if (state.contains(offsetKey)) {
return state.getPropAsList(offsetKey).stream()
.collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> s.split(MAP_DELIMITER_KEY)[1]));
}
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,17 @@ public static List<KafkaPartition> getPartitions(State state) {
if (!state.contains(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i))) {
break;
}
KafkaPartition partition = new KafkaPartition.Builder().withTopicName(topicName)
.withId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i)))
.withLeaderId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i)))
.withLeaderHostAndPort(state.getProp(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_HOSTANDPORT, i)))
.build();
KafkaPartition.Builder builder = new KafkaPartition.Builder().withTopicName(topicName)
.withId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i)));
String partitionLeaderIdProperName = KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i);
String partitionLeaderHostPortProperName = KafkaUtils.getPartitionPropName(KafkaSource.LEADER_HOSTANDPORT, i);
if (state.contains(partitionLeaderIdProperName)) {
builder = builder.withLeaderId(state.getPropAsInt(partitionLeaderIdProperName));
}
if (state.contains(partitionLeaderHostPortProperName)) {
builder = builder.withLeaderHostAndPort(state.getProp(partitionLeaderHostPortProperName));
}
KafkaPartition partition = builder.build();
partitions.add(partition);
}
if (partitions.isEmpty()) {
Expand Down

0 comments on commit 83c325d

Please sign in to comment.