Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-7663][Manager] Support agent report new fields and automatically issue tasks #7664

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.common;

import lombok.Data;

@Data
public class FieldTypeMapperInfo {

private String sourceType;
private String targetType;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.sort.util;

import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
import org.springframework.stereotype.Component;

import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Util for field type convert.
*/
@Data
@Component
public class FieldTypeUtils {
fuweng11 marked this conversation as resolved.
Show resolved Hide resolved

private static String FIELD_TYPE_CONFIG_YAML = "FieldMapper.yml";
private static String SOUCE_TYPE = "sourceType";
private static String TARGET_TYPE = "targetType";

public Map<String, String> getTypeConvertMap(String sinkType, boolean isSink) throws Exception {
Map<String, String> typeMap = new HashMap<>();
String yamlName = isSink ? "sink" + FIELD_TYPE_CONFIG_YAML : "stream" + FIELD_TYPE_CONFIG_YAML;
String path = Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("")).getPath()
+ yamlName;
Yaml yaml = new Yaml();
Map<String, Map<String, String>> map = yaml.load(new FileInputStream(path));
List<Map<String, String>> list = (List<Map<String, String>>) map.get(sinkType);
list.forEach(s -> {
typeMap.put(s.get(SOUCE_TYPE), s.get(TARGET_TYPE));
});
return typeMap;
}

public String getField(String sinkType, String fieldType, boolean isSink) throws Exception {
Map<String, String> typeMap = getTypeConvertMap(sinkType, isSink);
String targetFieldType = typeMap.get(fieldType);
return StringUtils.isNotBlank(targetFieldType) ? targetFieldType : "string";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.stream;

import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotEmpty;
import java.util.List;

/**
* Request of add fields
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AddFieldsRequest {

@ApiModelProperty("Task ID, same as the ID of stream_source")
private Integer id;

@ApiModelProperty("Fields need to be added")
@NotEmpty(message = "fields cannot be null")
private List<StreamField> fields;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
import org.apache.inlong.manager.pojo.stream.AddFieldsRequest;

/**
* The service interface for agent
Expand Down Expand Up @@ -57,4 +58,12 @@ public interface AgentService {
* @return Whether succeed.
*/
Boolean bindGroup(AgentClusterNodeBindGroupRequest request);

/**
* adds fields, need re-config the InlongStream and StreamSink.
*
* @param request add fields
* @return success or failure
*/
Boolean addFields(AddFieldsRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -61,8 +62,11 @@
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.AddFieldsRequest;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.elasticsearch.common.util.set.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -72,6 +76,7 @@
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -81,6 +86,12 @@
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -97,6 +108,18 @@ public class AgentServiceImpl implements AgentService {
private static final int TASK_FETCH_SIZE = 2;
private static final Gson GSON = new Gson();

// field change queue (only supports adding fields)
private final LinkedBlockingQueue<AddFieldsRequest> addFieldQueue = new LinkedBlockingQueue<>();

private final ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
10L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("async-add-field-%s").build(),
new CallerRunsPolicy());

@Autowired
private StreamSourceEntityMapper sourceMapper;
@Autowired
Expand All @@ -111,6 +134,20 @@ public class AgentServiceImpl implements AgentService {
private InlongClusterEntityMapper clusterMapper;
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
@Autowired
private InlongStreamService streamService;
@Autowired
private StreamSinkService sinkService;

/**
* start add field task
*/
@PostConstruct
private void startHeartbeatTask() {
AddFieldsTaskRunnable addFieldsTaskRunnable = new AddFieldsTaskRunnable();
this.executorService.execute(addFieldsTaskRunnable);
LOGGER.info("add field task started successfully");
}

@Override
public Boolean reportSnapshot(TaskSnapshotRequest request) {
Expand Down Expand Up @@ -260,6 +297,16 @@ public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) {
return true;
}

@Override
public Boolean addFields(AddFieldsRequest request) {
if (CollectionUtils.isEmpty(request.getFields())) {
LOGGER.warn("add field request is empty, just return");
return true;
}
LOGGER.info("received add fields request: {}", request);
return addFieldQueue.add(request);
}

/**
* Query the tasks that source is waited to be operated.(only clusterName and ip matched it can be operated)
*/
Expand Down Expand Up @@ -590,4 +637,55 @@ private boolean matchGroup(StreamSourceEntity sourceEntity, InlongClusterNodeEnt
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}

/**
* Task for add fields
*/
private class AddFieldsTaskRunnable implements Runnable {

private static final int WAIT_SECONDS = 60 * 1000;

@Override
public void run() {
while (true) {
try {
processFields();
Thread.sleep(WAIT_SECONDS);
} catch (Exception e) {
LOGGER.error("exception occurred when add fields", e);
}
}
}

@Transactional(rollbackFor = Throwable.class)
public void processFields() {
if (addFieldQueue.isEmpty()) {
return;
}
AddFieldsRequest fieldsRequest = addFieldQueue.poll();
Preconditions.expectNotNull(fieldsRequest, "add fields request is null from the queue");
Integer id = fieldsRequest.getId();
Preconditions.expectNotNull(fieldsRequest, "add fields request has no id field " + fieldsRequest);
StreamSourceEntity streamSourceEntity = sourceMapper.selectById(id);
Preconditions.expectNotNull(streamSourceEntity, "stream source not found by id=" + id);

String groupId = streamSourceEntity.getInlongGroupId();
String streamId = streamSourceEntity.getInlongStreamId();
String sourceType = streamSourceEntity.getSourceType();
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
Preconditions.expectNotNull(groupEntity, "not found group info by groupId " + groupId);
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);

try {
streamService.addFieldForStream(fieldsRequest, sourceType, groupEntity, streamEntity);
sinkService.addFieldForSink(fieldsRequest, sourceType, groupEntity, streamEntity);
LOGGER.info("success to add fields for groupId={} streamId={}", groupId, streamId);
} catch (Exception e) {
String errMsg = String.format("failed to add fields for groupId=%s streamId=%s ",
groupId, streamId);
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg + e.getMessage());
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.manager.service.sink;

import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
Expand All @@ -27,6 +29,7 @@
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.AddFieldsRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;

Expand Down Expand Up @@ -253,4 +256,15 @@ public interface StreamSinkService {
* @return list of sink field
*/
List<SinkField> parseFields(ParseFieldRequest parseFieldRequest);

/**
* Add field for stream sink
*
* @param fieldsRequest field info
* @param sourceType stream source type
* @param groupEntity group info
* @param streamEntity stream info
*/
void addFieldForSink(AddFieldsRequest fieldsRequest, String sourceType, InlongGroupEntity groupEntity,
InlongStreamEntity streamEntity);
}