Skip to content

Commit

Permalink
[INLONG-10189][Agent] Handling SDK initialization exceptions (#10190)
Browse files Browse the repository at this point in the history
Co-authored-by: AloysZhang <lofterzhang@gmail.com>
  • Loading branch information
justinwwhuang and aloyszhang committed May 13, 2024
1 parent 06d5e37 commit 61d2b74
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ public abstract class Instance extends AbstractStateWrapper {
* get instance id
*/
public abstract String getInstanceId();

public abstract long getLastHeartbeatTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class InstanceManager extends AbstractDaemon {
private static final int ACTION_QUEUE_CAPACITY = 100;
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;
private long lastPrintTime = 0;
// instance in db
private final InstanceDb instanceDb;
Expand Down Expand Up @@ -240,6 +241,11 @@ private void traverseMemoryTasksToDb() {
if (stateFromDb != InstanceStateEnum.DEFAULT) {
deleteFromMemory(instance.getInstanceId());
}
if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_KEEP_ALIVE_MS) {
LOGGER.error("instance heartbeat timeout, id: {}, will be deleted from memory",
instance.getInstanceId());
deleteFromMemory(instance.getInstanceId());
}
});
}

Expand Down Expand Up @@ -391,6 +397,11 @@ private void addToMemory(InstanceProfile instanceProfile) {
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
if (instanceMap.size() > instanceLimit) {
LOGGER.info("add instance to memory refused because instanceMap size over limit {}",
instanceProfile.getInstanceId());
return;
}
Class<?> taskClass = Class.forName(instanceProfile.getInstanceClass());
Instance instance = (Instance) taskClass.newInstance();
boolean initSuc = instance.init(this, instanceProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class TaskManager extends AbstractDaemon {
public static final int CONFIG_QUEUE_CAPACITY = 1;
public static final int CORE_THREAD_SLEEP_TIME = 1000;
public static final int CORE_THREAD_PRINT_TIME = 10000;
private static final int ACTION_QUEUE_CAPACITY = 100000;
private static final int ACTION_QUEUE_CAPACITY = 1000;
private long lastPrintTime = 0;
// task basic db
private final Db taskBasicDb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.utils.AgentUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -66,6 +67,11 @@ public String getInstanceId() {
return profile.getInstanceId();
}

@Override
public long getLastHeartbeatTime() {
return AgentUtils.getCurrentTime();
}

@Override
public void addCallbacks() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class CommonInstance extends Instance {
private volatile boolean running = false;
private volatile boolean inited = false;
private volatile int checkFinishCount = 0;
private int heartbeatcheckCount = 0;
private int heartbeatCheckCount = 0;
private long heartBeatStartTime = AgentUtils.getCurrentTime();
protected long auditVersion;

Expand All @@ -72,15 +72,15 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) {
profile.getInstanceId(), profile.toJsonStr());
source = (Source) Class.forName(profile.getSourceClass()).newInstance();
source.init(profile);
source.start();
sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
sink.init(profile);
inited = true;
return true;
} catch (Throwable e) {
handleSourceDeleted();
handleDeleted();
doChangeState(State.FATAL);
LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e);
LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(),
e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
return false;
}
Expand Down Expand Up @@ -117,10 +117,17 @@ public void run() {
}

private void doRun() {
source.start();
while (!isFinished()) {
if (!source.sourceExist()) {
handleSourceDeleted();
break;
if (handleDeleted()) {
break;
} else {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
continue;
}
}
Message msg = source.read();
if (msg == null) {
Expand All @@ -144,8 +151,8 @@ private void doRun() {
AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
}
}
heartbeatcheckCount++;
if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) {
heartbeatCheckCount++;
if (heartbeatCheckCount > HEARTBEAT_CHECK_GAP) {
heartbeatStatic();
}
}
Expand All @@ -156,7 +163,7 @@ private void heartbeatStatic() {
if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(),
profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion);
heartbeatcheckCount = 0;
heartbeatCheckCount = 0;
heartBeatStartTime = AgentUtils.getCurrentTime();
}
}
Expand All @@ -169,16 +176,17 @@ private void handleReadEnd() {
}
}

private void handleSourceDeleted() {
private boolean handleDeleted() {
OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
profile.setState(InstanceStateEnum.DELETE);
profile.setModifyTime(AgentUtils.getCurrentTime());
InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
return instanceManager.submitAction(action);
}

@Override
public long getLastHeartbeatTime() {
return heartBeatStartTime;
}

@Override
Expand Down

0 comments on commit 61d2b74

Please sign in to comment.