Skip to content

Commit

Permalink
Merge pull request #39 from qq254963746/develop
Browse files Browse the repository at this point in the history
add retry scheduler name
  • Loading branch information
qq254963746 committed Jun 12, 2015
2 parents 9f6929f + 96a5a28 commit 3bfc6c6
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 8 deletions.
21 changes: 14 additions & 7 deletions lts-core/src/main/java/com/lts/core/support/RetryScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public abstract class RetryScheduler<T> {
private ScheduledFuture<?> scheduledFuture;
private AtomicBoolean start = new AtomicBoolean(false);
private FailStore failStore;
// 名称主要是用来记录日志
private String name;

// 批量发送的消息数
private int batchSize = 5;
Expand All @@ -47,6 +49,7 @@ public RetryScheduler(Application application, String storePath) {
FailStoreFactory failStoreFactory = ExtensionLoader.getExtensionLoader(FailStoreFactory.class).getAdaptiveExtension();
failStore = failStoreFactory.getFailStore(application.getConfig(), storePath);
}

public RetryScheduler(Application application, String storePath, int batchSize) {
this(application, storePath);
this.batchSize = batchSize;
Expand All @@ -57,15 +60,19 @@ protected RetryScheduler(Application application, int batchSize) {
this.batchSize = batchSize;
}

public void setName(String name) {
this.name = name;
}

public void start() {
try {
if (start.compareAndSet(false, true)) {
// 这个时间后面再去优化
scheduledFuture = RETRY_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CheckRunner(), 10, 30, TimeUnit.SECONDS);
LOGGER.error("Start retry scheduler success!");
LOGGER.info("Start {} retry scheduler success!", name);
}
} catch (Throwable t) {
LOGGER.error("Start retry scheduler failed!", t);
LOGGER.error("Start {} retry scheduler failed!", name, t);
}
}

Expand All @@ -74,10 +81,10 @@ public void stop() {
if (start.compareAndSet(false, true)) {
scheduledFuture.cancel(true);
RETRY_EXECUTOR_SERVICE.shutdown();
LOGGER.error("Stop retry scheduler success!");
LOGGER.info("Stop {} retry scheduler success!", name);
}
} catch (Throwable t) {
LOGGER.error("Stop retry scheduler failed!", t);
LOGGER.error("Stop {} retry scheduler failed!", name, t);
}
}

Expand Down Expand Up @@ -117,7 +124,7 @@ public void run() {
values.add(kvPair.getValue());
}
if (retry(values)) {
LOGGER.info("本地数据发送成功, {}", JSONUtils.toJSONString(values));
LOGGER.info("{} local files send success, {}", name, JSONUtils.toJSONString(values));
failStore.delete(keys);
} else {
break;
Expand All @@ -137,7 +144,7 @@ public void run() {
} while (CollectionUtils.isNotEmpty(kvPairs));

} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
LOGGER.error("Run {} retry scheduler error.", name, e);
}
}

Expand All @@ -152,7 +159,7 @@ public void inSchedule(String key, T value) {
failStore.close();
}
} catch (FailStoreException e) {
LOGGER.error(e.getMessage(), e);
LOGGER.error("{} in schedule error. ", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ protected boolean retry(List<Job> jobs) {
return false;
}
};
retryScheduler.setName(RetryJobClient.class.getSimpleName());
super.innerStart();
retryScheduler.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ protected boolean retry(List<BizLog> list) {
return sendBizLog(list);
}
};
retryScheduler.setName(BizLogger.class.getSimpleName());
this.retryScheduler.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected boolean retry(List<JobResult> jobResults) {
return retrySendJobResults(jobResults);
}
};

retryScheduler.setName("JobPush");
retryScheduler.start();

// 线程安全的
Expand Down

0 comments on commit 3bfc6c6

Please sign in to comment.