Skip to content

Commit

Permalink
[INLONG-10213][SDK] SortSDK support unified sort configuration (#10219)
Browse files Browse the repository at this point in the history
* [INLONG-10213][SDK] SortSDK support unified sort configuration
  • Loading branch information
vernedeng committed May 15, 2024
1 parent c5dbfab commit d770484
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DataFlowConfig implements Serializable {

private String dataflowId;
private Integer version;
private String auditTag;
private String inlongGroupId;
private String inlongStreamId;
private SourceConfig sourceConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ public class PulsarClusterConfig extends MqClusterConfig {

@JsonInclude(JsonInclude.Include.NON_NULL)
private String serviceUrl;

@JsonInclude(JsonInclude.Include.NON_NULL)
private String token;
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
CommonBeanUtils.copyProperties(pulsarCluster, PulsarClusterConfig::new);
pulsarClusterConfig.setVersion(String.valueOf(pulsarCluster.getVersion()));
pulsarClusterConfig.setClusterName(pulsarCluster.getName());
pulsarClusterConfig.setServiceUrl(pulsarCluster.getUrl());
list.add(pulsarClusterConfig);
}
clusterConfigEntity.setConfigParams(objectMapper.writeValueAsString(list));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo, InlongStream
return DataFlowConfig.builder()
.dataflowId(String.valueOf(sink.getId()))
.sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
.auditTag(String.valueOf(sink.getId()))
.sinkConfig(getSinkConfig(sink))
.inlongGroupId(groupInfo.getInlongGroupId())
.inlongStreamId(streamInfo.getInlongStreamId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,29 @@

package org.apache.inlong.sort.standalone.config.holder.v2;

import org.apache.inlong.common.pojo.sort.SortClusterConfig;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.common.pojo.sort.SortTaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
import org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader;
import org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader;
import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;

import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.Collectors;

import static org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;

Expand All @@ -45,9 +52,10 @@ public class SortConfigHolder {
private Timer reloadTimer;
private SortConfigLoader loader;
private SortConfig config;
private Map<String, Map<String, String>> auditTagMap;

private SortConfigHolder() {

this.auditTagMap = new HashMap<>();
}

private static SortConfigHolder get() {
Expand Down Expand Up @@ -110,9 +118,21 @@ public void run() {
private void reload() {
try {
SortConfig newConfig = this.loader.load();
if (newConfig != null) {
this.config = newConfig;
if (newConfig == null) {
return;
}

// <SortTaskName, <InlongId, AuditTag>>
this.auditTagMap = newConfig.getTasks()
.stream()
.collect(Collectors.toMap(SortTaskConfig::getSortTaskName,
v -> v.getClusters()
.stream()
.map(SortClusterConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
.collect(Collectors.toMap(flow -> InlongId.generateUid(flow.getInlongGroupId(),
flow.getInlongStreamId()),
DataFlowConfig::getAuditTag))));
} catch (Throwable e) {
log.error("failed to reload sort config", e);
}
Expand All @@ -133,4 +153,15 @@ public static SortTaskConfig getTaskConfig(String sortTaskName) {
}
return null;
}

public static String getAuditTag(
String sortTaskName,
String inlongGroupId,
String inlongStreamId) {
Map<String, String> taskMap = get().auditTagMap.get(sortTaskName);
if (taskMap == null) {
return null;
}
return taskMap.get(InlongId.generateUid(inlongGroupId, inlongStreamId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.config.loader;

import org.apache.inlong.common.pojo.sort.SortClusterConfig;
import org.apache.inlong.common.pojo.sort.SortTaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public class SortConfigQueryConsumeConfig implements QueryConsumeConfig {

private List<InLongTopic> subscribedTopic = new ArrayList<>();

public void reload() {

}

@Override
public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
SortTaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
List<InLongTopic> topics = taskConfig.getClusters()
.stream()
.map(this::parseTopics)
.flatMap(Collection::stream)
.collect(Collectors.toList());

return new ConsumeConfig(topics);
}

public List<InLongTopic> parseTopics(SortClusterConfig clusterConfig) {
List<InLongTopic> topics = new ArrayList<>();
List<MqClusterConfig> mqClusterConfigs = clusterConfig.getMqClusterConfigs();
List<DataFlowConfig> dataFlowConfigs = clusterConfig.getDataFlowConfigs();
for (MqClusterConfig mq : mqClusterConfigs) {
for (DataFlowConfig flow : dataFlowConfigs) {
InLongTopic topic = new InLongTopic();
topic.setInLongCluster(this.parseCacheZone(mq));
topic.setTopic(flow.getSourceConfig().getTopic());
// only supports pulsar now
topic.setTopicType(InlongTopicTypeEnum.PULSAR.getName());
topic.setProperties(flow.getProperties() != null ? flow.getProperties() : new HashMap<>());
topics.add(topic);
}
}
return topics;
}

public CacheZoneCluster parseCacheZone(MqClusterConfig mqClusterConfig) {
PulsarClusterConfig pulsarClusterConfig = (PulsarClusterConfig) mqClusterConfig;
return new CacheZoneCluster(pulsarClusterConfig.getClusterName(),
pulsarClusterConfig.getServiceUrl(), pulsarClusterConfig.getToken());
}

@Override
public void configure(ClientContext context) {

}
}

0 comments on commit d770484

Please sign in to comment.