Skip to content

Commit

Permalink
[INLONG-8035][Manager] Fix Non-file tasks cannot be recovered from the heartbeat timeout state (#8037)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 authored and dockerzhang committed May 16, 2023
1 parent 3963987 commit d0f950e
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# InLong Changelog

<!---
Expand Down Expand Up @@ -48,6 +49,7 @@
### Manager
| ISSUE | Summary |
|:-----------------------------------------------------------:|:-----------------------------------------------------------------------------------------------|
| [INLONG-8035](https://github.com/apache/inlong/issues/8035) | [Bug][Manager] Non-file tasks cannot be recovered from the heartbeat timeout state |
| [INLONG-8021](https://github.com/apache/inlong/issues/8021) | [Improve][Manager] Periodically delete sources with inconsistent states |
| [INLONG-8006](https://github.com/apache/inlong/issues/8006) | [Improve][Manager] Set displayname for the auto-registered cluster |
| [INLONG-7999](https://github.com/apache/inlong/issues/7999) | [Improve][Manager] Support PostgreSQL data node |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ private void preProcessLabelFileTasks(TaskRequest taskRequest) {

private void preTimeoutTasks(TaskRequest taskRequest) {
// If the agent report succeeds, restore the source status
List<Integer> needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(Lists.newArrayList(SourceType.FILE),
taskRequest.getAgentIp(), taskRequest.getClusterName());
List<Integer> needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(null, taskRequest.getAgentIp(),
taskRequest.getClusterName());
// restore state for all sources matching the agent ip and cluster name
if (CollectionUtils.isNotEmpty(needUpdateIds)) {
sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.SneakyThrows;
Expand All @@ -35,7 +34,6 @@
import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterStatus;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
Expand Down Expand Up @@ -180,8 +178,8 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
// If the agent report succeeds, restore the source status
if (Objects.equals(clusterNode.getType(), ClusterType.AGENT)) {
// If the agent report succeeds, restore the source status
List<Integer> needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(
Lists.newArrayList(SourceType.FILE), heartbeat.getIp(), heartbeat.getClusterName());
List<Integer> needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(null, heartbeat.getIp(),
heartbeat.getClusterName());
// restore state for all sources matching the agent ip and cluster name
if (CollectionUtils.isNotEmpty(needUpdateIds)) {
sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ data.cleansing.batchSize=100
sort.enable.zookeeper=false

# If turned on, the source status is updated synchronously when the agent heartbeat times out
source.update.enabled=true
source.update.enabled=false
source.update.before.seconds=60
source.update.interval=60

# If turned on, tasks in the incorrect state are periodically deleted
source.cleansing.enabled=true
source.cleansing.enabled=false
source.cleansing.interval=600


0 comments on commit d0f950e

Please sign in to comment.