From 7bb95cdbbc59d5cd959dc0b2771059b72e3a6529 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E8=B4=B5?= Date: Sat, 13 Jun 2015 09:11:50 +0800 Subject: [PATCH 1/2] add retry scheduler name --- README.md | 4 +- .../failstore/leveldb/LeveldbFailStore.java | 42 ++++++++++++++++--- .../failstore/rocksdb/RocksdbFailStore.java | 2 +- .../berkeleydb/BerkeleydbFailStoreTest.java | 7 ++-- .../leveldb/LeveldbFailStoreTest.java | 4 +- .../rocksdb/RocksdbFailStoreTest.java | 4 +- 6 files changed, 49 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 8664e0008..e31438616 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule) ###LTS Admin ![Aaron Swartz](https://raw.githubusercontent.com/qq254963746/light-task-schedule/master/doc/LTS_Admin.png) ###调用示例 -下面提供的是最简单的配置方式。更多配置请查看 [lts-example](https://github.com/qq254963746/light-task-schedule/tree/master/lts-example/src/main/java/com/lts/job/example/api) 模块下的 API 调用方式例子. +下面提供的是最简单的配置方式。更多配置请查看 [lts-example](https://github.com/qq254963746/light-task-schedule/tree/master/lts-example/src/main/java/com/lts/example/api) 模块下的 API 调用方式例子. ####JobTracker 端 ```java @@ -85,7 +85,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule) public void run(Job job) throws Throwable { System.out.println("我要执行"+ job); System.out.println(job.getParam("shopId")); - // TODO 用户自己的业务逻辑 + // TODO 用户自己的业务逻辑, 应该保证幂等 try { Thread.sleep(5*1000L); } catch (InterruptedException e) { diff --git a/lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java b/lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java index 8ec4bc1c7..d45fd9bc7 100644 --- a/lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java +++ b/lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java @@ -7,12 +7,11 @@ import com.lts.core.failstore.FailStore; import com.lts.core.failstore.FailStoreException; import org.fusesource.leveldbjni.JniDBFactory; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.DBIterator; -import org.iq80.leveldb.Options; +import org.iq80.leveldb.*; import java.io.File; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; @@ -39,6 +38,14 @@ public LeveldbFailStore(String failStorePath) { failStorePath = failStorePath + "/leveldb/"; dbPath = FileUtils.createDirIfNotExist(failStorePath); options = new Options(); + options.createIfMissing(true); + options.cacheSize(100 * 1024 * 1024); // 100M +// options.logger(new Logger() { +// @Override +// public void log(String message) { +// System.out.println(message); +// } +// }); lock = new FileLock(failStorePath + "___db.lock"); } @@ -46,6 +53,7 @@ public LeveldbFailStore(String failStorePath) { public void open() throws FailStoreException { try { lock.tryLock(); + JniDBFactory.factory.repair(dbPath, options); db = JniDBFactory.factory.open(dbPath, options); } catch (IOException e) { throw new FailStoreException(e); @@ -79,16 +87,30 @@ public void delete(List keys) throws FailStoreException { if (keys == null || keys.size() == 0) { return; } - for (String key : keys) { - delete(key); + WriteBatch batch = db.createWriteBatch(); + try { + + for (String key : keys) { + batch.delete(key.getBytes("UTF-8")); + } + db.write(batch); + } catch (UnsupportedEncodingException e) { + throw new FailStoreException(e); + } finally { + try { + batch.close(); + } catch (IOException e) { + throw new FailStoreException(e); + } } } @Override public List> fetchTop(int size, Type type) throws FailStoreException { + DBIterator iterator = null; try { List> list = new ArrayList>(size); - DBIterator iterator = db.iterator(); + iterator = db.iterator(); for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) { Map.Entry entry = iterator.peekNext(); String key = new String(entry.getKey(), "UTF-8"); @@ -102,6 +124,14 @@ public List> fetchTop(int size, Type type) throws FailStor return list; } catch (Exception e) { throw new FailStoreException(e); + } finally { + if (iterator != null) { + try { + iterator.close(); + } catch (IOException e) { + throw new FailStoreException(e); + } + } } } diff --git a/lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStore.java b/lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStore.java index 76dd3cf61..094e27839 100644 --- a/lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStore.java +++ b/lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStore.java @@ -103,7 +103,7 @@ public List> fetchTop(int size, Type type) throws FailStor try { List> list = new ArrayList>(size); iterator = db.newIterator(); - for (iterator.seekToLast(); iterator.isValid(); iterator.prev()) { + for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { iterator.status(); String key = new String(iterator.key(), "UTF-8"); T value = JSONUtils.parse(new String(iterator.value(), "UTF-8"), type); diff --git a/lts-core/src/test/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreTest.java b/lts-core/src/test/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreTest.java index 850373630..7868d8f5b 100644 --- a/lts-core/src/test/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreTest.java +++ b/lts-core/src/test/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreTest.java @@ -37,10 +37,11 @@ public void setup() throws FailStoreException { public void put() throws FailStoreException { Job job = new Job(); job.setTaskId("2131232"); - failStore.put(key, job); - failStore.close(); + for (int i = 0; i < 100; i++) { + failStore.put(key + "" + i, job); + } System.out.println("这里debug测试多线程"); -// fetchTop(); + failStore.close(); } @Test diff --git a/lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java b/lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java index 169b47627..8d1994db2 100644 --- a/lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java +++ b/lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java @@ -32,7 +32,9 @@ public void setup() throws FailStoreException { public void put() throws FailStoreException { Job job = new Job(); job.setTaskId("2131232"); - failStore.put(key, job); + for (int i = 0; i < 100; i++) { + failStore.put(key + "" + i, job); + } System.out.println("这里debug测试多线程"); failStore.close(); } diff --git a/lts-core/src/test/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreTest.java b/lts-core/src/test/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreTest.java index 2cdd8612a..d9d69ebeb 100644 --- a/lts-core/src/test/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreTest.java +++ b/lts-core/src/test/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreTest.java @@ -37,7 +37,9 @@ public void setup() throws FailStoreException { public void put() throws FailStoreException { Job job = new Job(); job.setTaskId("2131232"); - failStore.put(key, job); + for (int i = 0; i < 100; i++) { + failStore.put(key + "" + i, job); + } System.out.println("这里debug测试多线程"); failStore.close(); } From 218ac0cae2d6e7b8979ce74a691837576de76839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E8=B4=B5?= Date: Sat, 13 Jun 2015 11:01:36 +0800 Subject: [PATCH 2/2] performance sth --- .../biz/logger/mysql/MysqlJobLoggerTest.java | 2 +- .../main/java/com/lts/core/domain/Job.java | 13 +-- .../java/com/lts/core/domain/JobResult.java | 4 +- .../java/com/lts/core/domain/JobWrapper.java | 43 ++++++++ .../lts/core/domain/TaskTrackerJobResult.java | 56 +++++++++++ .../berkeleydb/BerkeleydbFailStore.java | 1 - .../failstore/leveldb/LeveldbFailStore.java | 11 ++- .../core/protocol/command/JobPushRequest.java | 12 +-- .../command/TtJobFinishedRequest.java | 55 +++++++++++ .../com/lts/core/support/RetryScheduler.java | 17 +--- .../leveldb/LeveldbFailStoreTest.java | 5 +- .../support/JobFinishedHandlerImpl.java | 3 +- .../processor/JobFinishedProcessor.java | 25 +---- .../jobclient/support/JobFinishedHandler.java | 2 +- .../processor/JobFinishedProcessor.java | 99 +++++++++---------- .../jobtracker/support/ClientNotifier.java | 52 ++++++---- .../support/ClientNotifyHandler.java | 4 +- .../support/JobDomainConverter.java | 19 ++-- .../com/lts/jobtracker/support/JobPusher.java | 8 +- .../checker/FeedbackJobSendChecker.java | 30 +++--- .../support/policy/OldDataDeletePolicy.java | 2 +- .../com/lts/queue/domain/JobFeedbackPo.java | 12 +-- .../queue/mongo/MongoJobFeedbackQueue.java | 2 +- .../queue/mysql/MysqlJobFeedbackQueue.java | 8 +- .../main/java/com/lts/tasktracker/Result.java | 2 +- .../com/lts/tasktracker/domain/Response.java | 12 +-- .../processor/JobPushProcessor.java | 60 +++++------ .../tasktracker/runner/JobRunnerDelegate.java | 26 ++--- .../tasktracker/runner/RunnerCallback.java | 4 +- .../lts/tasktracker/runner/RunnerPool.java | 7 +- 30 files changed, 362 insertions(+), 234 deletions(-) create mode 100644 lts-core/src/main/java/com/lts/core/domain/JobWrapper.java create mode 100644 lts-core/src/main/java/com/lts/core/domain/TaskTrackerJobResult.java create mode 100644 lts-core/src/main/java/com/lts/core/protocol/command/TtJobFinishedRequest.java diff --git a/lts-admin/src/test/java/com/lts/biz/logger/mysql/MysqlJobLoggerTest.java b/lts-admin/src/test/java/com/lts/biz/logger/mysql/MysqlJobLoggerTest.java index 1bbd57c45..3296d9080 100644 --- a/lts-admin/src/test/java/com/lts/biz/logger/mysql/MysqlJobLoggerTest.java +++ b/lts-admin/src/test/java/com/lts/biz/logger/mysql/MysqlJobLoggerTest.java @@ -14,7 +14,7 @@ import static org.junit.Assert.*; /** - * Created by hugui on 6/12/15. + * @author Robert HG (254963746@qq.com) on 6/12/15. */ public class MysqlJobLoggerTest { diff --git a/lts-core/src/main/java/com/lts/core/domain/Job.java b/lts-core/src/main/java/com/lts/core/domain/Job.java index 74f4a68dc..3360bf83c 100644 --- a/lts-core/src/main/java/com/lts/core/domain/Job.java +++ b/lts-core/src/main/java/com/lts/core/domain/Job.java @@ -1,10 +1,10 @@ package com.lts.core.domain; -import com.lts.core.exception.JobSubmitException; -import com.lts.core.support.CronExpression; import com.lts.core.commons.utils.JSONUtils; import com.lts.core.commons.utils.StringUtils; +import com.lts.core.exception.JobSubmitException; +import com.lts.core.support.CronExpression; import com.lts.remoting.annotation.NotNull; import java.util.Date; @@ -16,7 +16,6 @@ */ public class Job { - protected String jobId; @NotNull protected String taskId; /** @@ -85,14 +84,6 @@ public void setNeedFeedback(boolean needFeedback) { this.needFeedback = needFeedback; } - public String getJobId() { - return jobId; - } - - public void setJobId(String jobId) { - this.jobId = jobId; - } - public Map getExtParams() { return extParams; } diff --git a/lts-core/src/main/java/com/lts/core/domain/JobResult.java b/lts-core/src/main/java/com/lts/core/domain/JobResult.java index 01dcffb4d..02e7fd77c 100644 --- a/lts-core/src/main/java/com/lts/core/domain/JobResult.java +++ b/lts-core/src/main/java/com/lts/core/domain/JobResult.java @@ -3,8 +3,8 @@ import com.lts.core.commons.utils.JSONUtils; /** - * @author Robert HG (254963746@qq.com) on 8/19/14. - * 任务执行结果 + * @author Robert HG (254963746@qq.com) on 6/13/15. + * 发送给客户端的 任务执行结果 */ public class JobResult { diff --git a/lts-core/src/main/java/com/lts/core/domain/JobWrapper.java b/lts-core/src/main/java/com/lts/core/domain/JobWrapper.java new file mode 100644 index 000000000..7fe2738a3 --- /dev/null +++ b/lts-core/src/main/java/com/lts/core/domain/JobWrapper.java @@ -0,0 +1,43 @@ +package com.lts.core.domain; + +/** + * @author Robert HG (254963746@qq.com) on 6/13/15. + */ +public class JobWrapper { + + private String jobId; + + private Job job; + + public JobWrapper(String jobId, Job job) { + this.jobId = jobId; + this.job = job; + } + + public JobWrapper() { + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public Job getJob() { + return job; + } + + public void setJob(Job job) { + this.job = job; + } + + @Override + public String toString() { + return "JobWrapper{" + + "jobId='" + jobId + '\'' + + ", job=" + job + + '}'; + } +} diff --git a/lts-core/src/main/java/com/lts/core/domain/TaskTrackerJobResult.java b/lts-core/src/main/java/com/lts/core/domain/TaskTrackerJobResult.java new file mode 100644 index 000000000..6eef16654 --- /dev/null +++ b/lts-core/src/main/java/com/lts/core/domain/TaskTrackerJobResult.java @@ -0,0 +1,56 @@ +package com.lts.core.domain; + +import com.lts.core.commons.utils.JSONUtils; + +/** + * @author Robert HG (254963746@qq.com) on 8/19/14. + * TaskTracker 任务执行结果 + */ +public class TaskTrackerJobResult { + + private JobWrapper jobWrapper; + + // 执行成功还是失败 + private boolean success; + + private String msg; + // 任务完成时间 + private Long time; + + public JobWrapper getJobWrapper() { + return jobWrapper; + } + + public void setJobWrapper(JobWrapper jobWrapper) { + this.jobWrapper = jobWrapper; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public Long getTime() { + return time; + } + + public void setTime(Long time) { + this.time = time; + } + + @Override + public String toString() { + return JSONUtils.toJSONString(this); + } +} diff --git a/lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStore.java b/lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStore.java index 27703d22f..b038f2f33 100644 --- a/lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStore.java +++ b/lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStore.java @@ -1,6 +1,5 @@ package com.lts.core.failstore.berkeleydb; -import com.lts.core.cluster.Config; import com.lts.core.commons.file.FileLock; import com.lts.core.commons.file.FileUtils; import com.lts.core.commons.utils.CollectionUtils; diff --git a/lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java b/lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java index d45fd9bc7..7b8e77f6c 100644 --- a/lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java +++ b/lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java @@ -39,6 +39,7 @@ public LeveldbFailStore(String failStorePath) { dbPath = FileUtils.createDirIfNotExist(failStorePath); options = new Options(); options.createIfMissing(true); +// options.compressionType(CompressionType.NONE); options.cacheSize(100 * 1024 * 1024); // 100M // options.logger(new Logger() { // @Override @@ -107,10 +108,13 @@ public void delete(List keys) throws FailStoreException { @Override public List> fetchTop(int size, Type type) throws FailStoreException { + Snapshot snapshot = db.getSnapshot(); DBIterator iterator = null; try { List> list = new ArrayList>(size); - iterator = db.iterator(); + ReadOptions options=new ReadOptions(); + options.snapshot(snapshot); + iterator = db.iterator(options); for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) { Map.Entry entry = iterator.peekNext(); String key = new String(entry.getKey(), "UTF-8"); @@ -132,6 +136,11 @@ public List> fetchTop(int size, Type type) throws FailStor throw new FailStoreException(e); } } + try { + snapshot.close(); + } catch (IOException e) { + throw new FailStoreException(e); + } } } diff --git a/lts-core/src/main/java/com/lts/core/protocol/command/JobPushRequest.java b/lts-core/src/main/java/com/lts/core/protocol/command/JobPushRequest.java index 1157061e1..1cf75f40c 100644 --- a/lts-core/src/main/java/com/lts/core/protocol/command/JobPushRequest.java +++ b/lts-core/src/main/java/com/lts/core/protocol/command/JobPushRequest.java @@ -1,6 +1,6 @@ package com.lts.core.protocol.command; -import com.lts.core.domain.Job; +import com.lts.core.domain.JobWrapper; import com.lts.remoting.annotation.NotNull; /** @@ -9,13 +9,13 @@ public class JobPushRequest extends AbstractCommandBody{ @NotNull - private Job job; + private JobWrapper jobWrapper; - public Job getJob() { - return job; + public JobWrapper getJobWrapper() { + return jobWrapper; } - public void setJob(Job job) { - this.job = job; + public void setJobWrapper(JobWrapper jobWrapper) { + this.jobWrapper = jobWrapper; } } diff --git a/lts-core/src/main/java/com/lts/core/protocol/command/TtJobFinishedRequest.java b/lts-core/src/main/java/com/lts/core/protocol/command/TtJobFinishedRequest.java new file mode 100644 index 000000000..540132398 --- /dev/null +++ b/lts-core/src/main/java/com/lts/core/protocol/command/TtJobFinishedRequest.java @@ -0,0 +1,55 @@ +package com.lts.core.protocol.command; + +import com.lts.core.domain.TaskTrackerJobResult; +import com.lts.remoting.annotation.NotNull; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author Robert HG (254963746@qq.com) on 8/16/14. + * TaskTracker Job finished request command body + */ +public class TtJobFinishedRequest extends AbstractCommandBody { + /** + * 是否接受新任务 + */ + private boolean receiveNewJob = false; + + @NotNull + private List taskTrackerJobResults; + + // 是否是重发(重发是批量发) + private boolean reSend = false; + + public boolean isReSend() { + return reSend; + } + + public void setReSend(boolean reSend) { + this.reSend = reSend; + } + + public boolean isReceiveNewJob() { + return receiveNewJob; + } + + public void setReceiveNewJob(boolean receiveNewJob) { + this.receiveNewJob = receiveNewJob; + } + + public List getTaskTrackerJobResults() { + return taskTrackerJobResults; + } + + public void setTaskTrackerJobResults(List taskTrackerJobResults) { + this.taskTrackerJobResults = taskTrackerJobResults; + } + + public void addJobResult(TaskTrackerJobResult taskTrackerJobResult) { + if (taskTrackerJobResults == null) { + taskTrackerJobResults = new ArrayList(); + } + taskTrackerJobResults.add(taskTrackerJobResult); + } +} diff --git a/lts-core/src/main/java/com/lts/core/support/RetryScheduler.java b/lts-core/src/main/java/com/lts/core/support/RetryScheduler.java index 7d2d6869e..bbef3668f 100644 --- a/lts-core/src/main/java/com/lts/core/support/RetryScheduler.java +++ b/lts-core/src/main/java/com/lts/core/support/RetryScheduler.java @@ -93,9 +93,6 @@ public void stop() { */ private class CheckRunner implements Runnable { - // 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到 - private int maxSentSize = 100; - @Override public void run() { try { @@ -104,8 +101,6 @@ public void run() { return; } - int sentSize = 0; - List> kvPairs = null; do { try { @@ -124,20 +119,11 @@ public void run() { values.add(kvPair.getValue()); } if (retry(values)) { - LOGGER.info("{} local files send success, {}", name, JSONUtils.toJSONString(values)); + LOGGER.info("{} local files send success, size: {}, {}", name, values.size(), JSONUtils.toJSONString(values)); failStore.delete(keys); } else { break; } - sentSize += kvPairs.size(); - if (sentSize >= maxSentSize) { - // 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到 - try { - Thread.sleep(1000L); - } catch (InterruptedException e1) { - LOGGER.warn(e1.getMessage(), e1); - } - } } finally { failStore.close(); } @@ -155,6 +141,7 @@ public void inSchedule(String key, T value) { try { failStore.open(); failStore.put(key, value); + LOGGER.info("{} local files save success, {}", name, JSONUtils.toJSONString(value)); } finally { failStore.close(); } diff --git a/lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java b/lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java index 8d1994db2..5eb3b217d 100644 --- a/lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java +++ b/lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java @@ -1,10 +1,7 @@ package com.lts.core.failstore.leveldb; -import com.lts.core.cluster.Config; -import com.lts.core.cluster.NodeType; import com.lts.core.commons.utils.CollectionUtils; import com.lts.core.commons.utils.JSONUtils; -import com.lts.core.constant.Constants; import com.lts.core.domain.Job; import com.lts.core.domain.KVPair; import com.lts.core.failstore.FailStore; @@ -15,7 +12,7 @@ import java.util.List; /** - * Created by hugui on 6/12/15. + * @author Robert HG (254963746@qq.com) on 6/12/15. */ public class LeveldbFailStoreTest { diff --git a/lts-example/src/main/java/com/lts/example/support/JobFinishedHandlerImpl.java b/lts-example/src/main/java/com/lts/example/support/JobFinishedHandlerImpl.java index aa7793820..b97302e97 100644 --- a/lts-example/src/main/java/com/lts/example/support/JobFinishedHandlerImpl.java +++ b/lts-example/src/main/java/com/lts/example/support/JobFinishedHandlerImpl.java @@ -1,8 +1,9 @@ package com.lts.example.support; import com.lts.core.commons.utils.CollectionUtils; -import com.lts.jobclient.support.JobFinishedHandler; import com.lts.core.domain.JobResult; +import com.lts.jobclient.support.JobFinishedHandler; +import com.lts.core.domain.TaskTrackerJobResult; import java.text.SimpleDateFormat; import java.util.Date; diff --git a/lts-jobclient/src/main/java/com/lts/jobclient/processor/JobFinishedProcessor.java b/lts-jobclient/src/main/java/com/lts/jobclient/processor/JobFinishedProcessor.java index 269db89e9..7acc0c6c7 100644 --- a/lts-jobclient/src/main/java/com/lts/jobclient/processor/JobFinishedProcessor.java +++ b/lts-jobclient/src/main/java/com/lts/jobclient/processor/JobFinishedProcessor.java @@ -1,19 +1,15 @@ package com.lts.jobclient.processor; +import com.lts.core.protocol.command.JobFinishedRequest; import com.lts.jobclient.support.JobFinishedHandler; -import com.lts.core.domain.JobResult; import com.lts.core.logger.Logger; import com.lts.core.logger.LoggerFactory; import com.lts.core.protocol.JobProtos; -import com.lts.core.protocol.command.JobFinishedRequest; import com.lts.core.remoting.RemotingClientDelegate; -import com.lts.core.commons.utils.CollectionUtils; import com.lts.remoting.exception.RemotingCommandException; import com.lts.remoting.protocol.RemotingCommand; import io.netty.channel.ChannelHandlerContext; -import java.util.List; - /** * @author Robert HG (254963746@qq.com) on 8/18/14. */ @@ -27,25 +23,6 @@ public JobFinishedProcessor(RemotingClientDelegate remotingClient, JobFinishedHandler jobFinishedHandler) { super(remotingClient); this.jobFinishedHandler = jobFinishedHandler; - if (this.jobFinishedHandler == null) { - this.jobFinishedHandler = new JobFinishedHandler() { - private final Logger log = LoggerFactory.getLogger("JobFinishedHandler"); - - @Override - public void handle(List jobResults) { - // do nothing - if (CollectionUtils.isNotEmpty(jobResults)) { - for (JobResult jobResult : jobResults) { - if (jobResult.isSuccess()) { - log.info("Job exec successful:" + jobResult); - } else { - log.info("Job exec failed:" + jobResult); - } - } - } - } - }; - } } @Override diff --git a/lts-jobclient/src/main/java/com/lts/jobclient/support/JobFinishedHandler.java b/lts-jobclient/src/main/java/com/lts/jobclient/support/JobFinishedHandler.java index 988fea378..b715baf0a 100644 --- a/lts-jobclient/src/main/java/com/lts/jobclient/support/JobFinishedHandler.java +++ b/lts-jobclient/src/main/java/com/lts/jobclient/support/JobFinishedHandler.java @@ -1,6 +1,7 @@ package com.lts.jobclient.support; import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; import java.util.List; @@ -11,7 +12,6 @@ public interface JobFinishedHandler { /** * 处理返回结果 - * @param jobResults */ public void handle(List jobResults); } diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java index 7f7dca51e..05320013c 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java @@ -3,11 +3,11 @@ import com.lts.biz.logger.domain.JobLogPo; import com.lts.biz.logger.domain.LogType; import com.lts.core.constant.Level; -import com.lts.core.domain.Job; -import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; +import com.lts.core.domain.JobWrapper; import com.lts.core.logger.Logger; import com.lts.core.logger.LoggerFactory; -import com.lts.core.protocol.command.JobFinishedRequest; +import com.lts.core.protocol.command.TtJobFinishedRequest; import com.lts.core.protocol.command.JobPushRequest; import com.lts.core.remoting.RemotingServerDelegate; import com.lts.core.commons.utils.CollectionUtils; @@ -39,27 +39,27 @@ public class JobFinishedProcessor extends AbstractProcessor { public JobFinishedProcessor(RemotingServerDelegate remotingServer, final JobTrackerApplication application) { super(remotingServer, application); - this.clientNotifier = new ClientNotifier(application, new ClientNotifyHandler() { + this.clientNotifier = new ClientNotifier(application, new ClientNotifyHandler() { @Override - public void handleSuccess(List jobResults) { - finishedJob(jobResults); + public void handleSuccess(List taskTrackerJobResults) { + finishedJob(taskTrackerJobResults); } @Override - public void handleFailed(List jobResults) { - if (CollectionUtils.isNotEmpty(jobResults)) { + public void handleFailed(List taskTrackerJobResults) { + if (CollectionUtils.isNotEmpty(taskTrackerJobResults)) { List jobFeedbackPos = - new ArrayList(jobResults.size()); + new ArrayList(taskTrackerJobResults.size()); - for (JobResult jobResult : jobResults) { + for (TaskTrackerJobResult taskTrackerJobResult : taskTrackerJobResults) { JobFeedbackPo jobFeedbackPo = - JobDomainConverter.convert(jobResult); + JobDomainConverter.convert(taskTrackerJobResult); jobFeedbackPos.add(jobFeedbackPo); } // 2. 失败的存储在反馈队列 application.getJobFeedbackQueue().add(jobFeedbackPos); // 3. 完成任务  - finishedJob(jobResults); + finishedJob(taskTrackerJobResults); } } }); @@ -68,40 +68,40 @@ public void handleFailed(List jobResults) { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - JobFinishedRequest requestBody = request.getBody(); + TtJobFinishedRequest requestBody = request.getBody(); - List jobResults = requestBody.getJobResults(); + List taskTrackerJobResults = requestBody.getTaskTrackerJobResults(); // 1. 检验参数 - if (CollectionUtils.isEmpty(jobResults)) { + if (CollectionUtils.isEmpty(taskTrackerJobResults)) { return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_PARAM_ERROR.code(), "JobFinishedRequest.jobResults can not be empty!"); } if (requestBody.isReSend()) { - log(requestBody.getIdentity(), jobResults, LogType.RESEND); + log(requestBody.getIdentity(), taskTrackerJobResults, LogType.RESEND); } else { - log(requestBody.getIdentity(), jobResults, LogType.FINISHED); + log(requestBody.getIdentity(), taskTrackerJobResults, LogType.FINISHED); } - LOGGER.info("job exec finished : {}", jobResults); + LOGGER.info("job exec finished : {}", taskTrackerJobResults); - return finishJob(requestBody, jobResults); + return finishJob(requestBody, taskTrackerJobResults); } /** * 记录日志 * - * @param jobResults + * @param taskTrackerJobResults * @param logType */ - private void log(String taskTrackerIdentity, List jobResults, LogType logType) { + private void log(String taskTrackerIdentity, List taskTrackerJobResults, LogType logType) { try { - for (JobResult jobResult : jobResults) { - JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobResult.getJob()); - jobLogPo.setMsg(jobResult.getMsg()); + for (TaskTrackerJobResult taskTrackerJobResult : taskTrackerJobResults) { + JobLogPo jobLogPo = JobDomainConverter.convertJobLog(taskTrackerJobResult.getJobWrapper()); + jobLogPo.setMsg(taskTrackerJobResult.getMsg()); jobLogPo.setLogType(logType); - jobLogPo.setSuccess(jobResult.isSuccess()); + jobLogPo.setSuccess(taskTrackerJobResult.isSuccess()); jobLogPo.setTaskTrackerIdentity(taskTrackerIdentity); jobLogPo.setLevel(Level.INFO); application.getJobLogger().log(jobLogPo); @@ -117,24 +117,24 @@ private void log(String taskTrackerIdentity, List jobResults, LogType * @param requestBody * @return */ - private RemotingCommand finishJob(JobFinishedRequest requestBody, List jobResults) { + private RemotingCommand finishJob(TtJobFinishedRequest requestBody, List taskTrackerJobResults) { // 过滤出来需要通知客户端的 - List needFeedbackList = null; + List needFeedbackList = null; // 不需要反馈的 - List notNeedFeedbackList = null; + List notNeedFeedbackList = null; - for (JobResult jobResult : jobResults) { - if (jobResult.getJob().isNeedFeedback()) { + for (TaskTrackerJobResult taskTrackerJobResult : taskTrackerJobResults) { + if (taskTrackerJobResult.getJobWrapper().getJob().isNeedFeedback()) { if (needFeedbackList == null) { - needFeedbackList = new ArrayList(); + needFeedbackList = new ArrayList(); } - needFeedbackList.add(jobResult); + needFeedbackList.add(taskTrackerJobResult); } else { if (notNeedFeedbackList == null) { - notNeedFeedbackList = new ArrayList(); + notNeedFeedbackList = new ArrayList(); } - notNeedFeedbackList.add(jobResult); + notNeedFeedbackList.add(taskTrackerJobResult); } } @@ -150,17 +150,16 @@ private RemotingCommand finishJob(JobFinishedRequest requestBody, List jobResults) { + private void notifyClient(final List taskTrackerJobResults) { - if (CollectionUtils.isEmpty(jobResults)) { + if (CollectionUtils.isEmpty(taskTrackerJobResults)) { return; } // 1.发送给客户端 - clientNotifier.send(jobResults); + clientNotifier.send(taskTrackerJobResults); } - private void finishedJob(List jobResults) { - if (CollectionUtils.isEmpty(jobResults)) { + private void finishedJob(List taskTrackerJobResults) { + if (CollectionUtils.isEmpty(taskTrackerJobResults)) { return; } - for (JobResult jobResult : jobResults) { - Job job = jobResult.getJob(); + for (TaskTrackerJobResult taskTrackerJobResult : taskTrackerJobResults) { + JobWrapper jobWrapper = taskTrackerJobResult.getJobWrapper(); // 移除 - application.getExecutingJobQueue().remove(job.getJobId()); + application.getExecutingJobQueue().remove(jobWrapper.getJobId()); - if (job.isSchedule()) { + if (jobWrapper.getJob().isSchedule()) { - JobPo cronJobPo = application.getCronJobQueue().finish(job.getJobId()); + JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId()); if (cronJobPo == null) { // 可能任务队列中改条记录被删除了 return; } Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression()); if (nextTriggerTime == null) { - application.getCronJobQueue().remove(job.getJobId()); + application.getCronJobQueue().remove(jobWrapper.getJobId()); } else { // 表示下次还要执行 try { diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/ClientNotifier.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/ClientNotifier.java index 0fb7c6f7d..704bf48d4 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/ClientNotifier.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/ClientNotifier.java @@ -1,7 +1,10 @@ package com.lts.jobtracker.support; +import com.lts.core.commons.utils.CollectionUtils; +import com.lts.core.commons.utils.Holder; import com.lts.core.constant.Constants; import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; import com.lts.core.exception.RemotingSendException; import com.lts.core.exception.RequestTimeoutException; import com.lts.core.logger.Logger; @@ -9,13 +12,11 @@ import com.lts.core.protocol.JobProtos; import com.lts.core.protocol.command.JobFinishedRequest; import com.lts.core.remoting.RemotingServerDelegate; -import com.lts.core.commons.utils.CollectionUtils; -import com.lts.core.commons.utils.Holder; +import com.lts.jobtracker.domain.JobClientNode; +import com.lts.jobtracker.domain.JobTrackerApplication; import com.lts.remoting.InvokeCallback; import com.lts.remoting.netty.ResponseFuture; import com.lts.remoting.protocol.RemotingCommand; -import com.lts.jobtracker.domain.JobClientNode; -import com.lts.jobtracker.domain.JobTrackerApplication; import java.util.*; import java.util.concurrent.CountDownLatch; @@ -41,7 +42,7 @@ public ClientNotifier(JobTrackerApplication application, ClientNotifyHandler cli * @param jobResults * @return 返回成功的个数 */ - public int send(List jobResults) { + public int send(List jobResults) { if (CollectionUtils.isEmpty(jobResults)) { return 0; } @@ -49,35 +50,35 @@ public int send(List jobResults) { // 单个 就不用 分组了 if (jobResults.size() == 1) { - JobResult jobResult = jobResults.get(0); - if (!send0(jobResult.getJob().getSubmitNodeGroup(), Arrays.asList(jobResult))) { + TaskTrackerJobResult taskTrackerJobResult = jobResults.get(0); + if (!send0(taskTrackerJobResult.getJobWrapper().getJob().getSubmitNodeGroup(), Arrays.asList(taskTrackerJobResult))) { // 如果没有完成就返回 clientNotifyHandler.handleFailed(jobResults); return 0; } } else if (jobResults.size() > 1) { - List failedJobResult = new ArrayList(); + List failedTaskTrackerJobResult = new ArrayList(); // 有多个要进行分组 (出现在 失败重发的时候) - Map> groupMap = new HashMap>(); + Map> groupMap = new HashMap>(); for (T jobResult : jobResults) { - List jobResultList = groupMap.get(jobResult.getJob().getSubmitNodeGroup()); - if (jobResultList == null) { - jobResultList = new ArrayList(); - groupMap.put(jobResult.getJob().getSubmitNodeGroup(), jobResultList); + List taskTrackerJobResultList = groupMap.get(jobResult.getJobWrapper().getJob().getSubmitNodeGroup()); + if (taskTrackerJobResultList == null) { + taskTrackerJobResultList = new ArrayList(); + groupMap.put(jobResult.getJobWrapper().getJob().getSubmitNodeGroup(), taskTrackerJobResultList); } - jobResultList.add(jobResult); + taskTrackerJobResultList.add(jobResult); } - for (Map.Entry> entry : groupMap.entrySet()) { + for (Map.Entry> entry : groupMap.entrySet()) { if (!send0(entry.getKey(), entry.getValue())) { - failedJobResult.addAll(entry.getValue()); + failedTaskTrackerJobResult.addAll(entry.getValue()); } } - clientNotifyHandler.handleFailed(failedJobResult); - return jobResults.size() - failedJobResult.size(); + clientNotifyHandler.handleFailed(failedTaskTrackerJobResult); + return jobResults.size() - failedTaskTrackerJobResult.size(); } return jobResults.size(); } @@ -87,16 +88,25 @@ public int send(List jobResults) { * 返回是否发送成功还是失败 * * @param nodeGroup - * @param jobResults + * @param taskTrackerJobResults * @return */ - private boolean send0(String nodeGroup, final List jobResults) { + private boolean send0(String nodeGroup, final List taskTrackerJobResults) { // 得到 可用的客户端节点 JobClientNode jobClientNode = application.getJobClientManager().getAvailableJobClient(nodeGroup); if (jobClientNode == null) { return false; } + List jobResults = new ArrayList(taskTrackerJobResults.size()); + for (TaskTrackerJobResult taskTrackerJobResult : taskTrackerJobResults) { + JobResult jobResult = new JobResult(); + jobResult.setJob(taskTrackerJobResult.getJobWrapper().getJob()); + jobResult.setSuccess(taskTrackerJobResult.isSuccess()); + jobResult.setMsg(taskTrackerJobResult.getMsg()); + jobResult.setTime(taskTrackerJobResult.getTime()); + jobResults.add(jobResult); + } JobFinishedRequest requestBody = application.getCommandBodyWrapper().wrapper(new JobFinishedRequest()); requestBody.setJobResults(jobResults); @@ -112,7 +122,7 @@ public void operationComplete(ResponseFuture responseFuture) { RemotingCommand commandResponse = responseFuture.getResponseCommand(); if (commandResponse != null && commandResponse.getCode() == JobProtos.ResponseCode.JOB_NOTIFY_SUCCESS.code()) { - clientNotifyHandler.handleSuccess(jobResults); + clientNotifyHandler.handleSuccess(taskTrackerJobResults); result.set(true); } else { result.set(false); diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/ClientNotifyHandler.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/ClientNotifyHandler.java index 1662c1eff..84eefcdda 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/ClientNotifyHandler.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/ClientNotifyHandler.java @@ -1,13 +1,13 @@ package com.lts.jobtracker.support; -import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; import java.util.List; /** * @author Robert HG (254963746@qq.com) on 3/3/15. */ -public interface ClientNotifyHandler { +public interface ClientNotifyHandler { /** * 通知成功的处理 diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobDomainConverter.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobDomainConverter.java index fa9f9a807..44f28fecf 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobDomainConverter.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobDomainConverter.java @@ -2,9 +2,10 @@ import com.lts.biz.logger.domain.JobLogPo; import com.lts.core.domain.Job; -import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; import com.lts.core.commons.utils.DateUtils; import com.lts.core.commons.utils.StringUtils; +import com.lts.core.domain.JobWrapper; import com.lts.queue.domain.JobFeedbackPo; import com.lts.queue.domain.JobPo; @@ -44,7 +45,7 @@ public static JobPo convert(Job job) { * @param jobPo * @return */ - public static Job convert(JobPo jobPo) { + public static JobWrapper convert(JobPo jobPo) { Job job = new Job(); job.setPriority(jobPo.getPriority()); job.setExtParams(jobPo.getExtParams()); @@ -52,22 +53,24 @@ public static Job convert(JobPo jobPo) { job.setTaskId(jobPo.getTaskId()); job.setTaskTrackerNodeGroup(jobPo.getTaskTrackerNodeGroup()); job.setNeedFeedback(jobPo.isNeedFeedback()); - job.setJobId(jobPo.getJobId()); job.setCronExpression(jobPo.getCronExpression()); job.setTriggerTime(jobPo.getTriggerTime()); - return job; + return new JobWrapper(jobPo.getJobId(), job); } - public static JobLogPo convertJobLog(Job job) { + public static JobLogPo convertJobLog(JobWrapper jobWrapper) { JobLogPo jobLogPo = new JobLogPo(); jobLogPo.setTimestamp(DateUtils.currentTimeMillis()); + + Job job = jobWrapper.getJob(); + jobLogPo.setPriority(job.getPriority()); jobLogPo.setExtParams(job.getExtParams()); jobLogPo.setSubmitNodeGroup(job.getSubmitNodeGroup()); jobLogPo.setTaskId(job.getTaskId()); jobLogPo.setTaskTrackerNodeGroup(job.getTaskTrackerNodeGroup()); jobLogPo.setNeedFeedback(job.isNeedFeedback()); - jobLogPo.setJobId(job.getJobId()); + jobLogPo.setJobId(jobWrapper.getJobId()); jobLogPo.setCronExpression(job.getCronExpression()); jobLogPo.setTriggerTime(job.getTriggerTime()); return jobLogPo; @@ -89,9 +92,9 @@ public static JobLogPo convertJobLog(JobPo jobPo) { return jobLogPo; } - public static JobFeedbackPo convert(JobResult jobResult) { + public static JobFeedbackPo convert(TaskTrackerJobResult taskTrackerJobResult) { JobFeedbackPo jobFeedbackPo = new JobFeedbackPo(); - jobFeedbackPo.setJobResult(jobResult); + jobFeedbackPo.setTaskTrackerJobResult(taskTrackerJobResult); jobFeedbackPo.setId(StringUtils.generateUUID()); jobFeedbackPo.setGmtCreated(System.currentTimeMillis()); return jobFeedbackPo; diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java index 86f445486..e916358fc 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java @@ -5,6 +5,7 @@ import com.lts.core.constant.Constants; import com.lts.core.constant.Level; import com.lts.core.domain.Job; +import com.lts.core.domain.JobWrapper; import com.lts.core.exception.RemotingSendException; import com.lts.core.exception.RequestTimeoutException; import com.lts.core.factory.NamedThreadFactory; @@ -116,8 +117,7 @@ private PushResult sendJob(RemotingServerDelegate remotingServer, TaskTrackerNod } JobPushRequest body = application.getCommandBodyWrapper().wrapper(new JobPushRequest()); - Job job = JobDomainConverter.convert(jobPo); - body.setJob(job); + body.setJobWrapper(JobDomainConverter.convert(jobPo)); RemotingCommand commandRequest = RemotingCommand.createRequestCommand(JobProtos.RequestCode.PUSH_JOB.code(), body); // 是否分发推送任务成功 @@ -155,7 +155,7 @@ public void operationComplete(ResponseFuture responseFuture) { if (!pushSuccess.get()) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", job=" + job); + LOGGER.debug("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", job=" + jobPo); } application.getExecutableJobQueue().resume(jobPo); return PushResult.FAILED; @@ -165,7 +165,7 @@ public void operationComplete(ResponseFuture responseFuture) { } catch (DuplicateJobException e) { // ignore } - application.getExecutableJobQueue().remove(job.getTaskTrackerNodeGroup(), job.getJobId()); + application.getExecutableJobQueue().remove(jobPo.getTaskTrackerNodeGroup(), jobPo.getJobId()); // 记录日志 JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo); diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/checker/FeedbackJobSendChecker.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/checker/FeedbackJobSendChecker.java index a0f0bc5b0..1c37791c8 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/checker/FeedbackJobSendChecker.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/checker/FeedbackJobSendChecker.java @@ -1,6 +1,6 @@ package com.lts.jobtracker.support.checker; -import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; import com.lts.core.logger.Logger; import com.lts.core.logger.LoggerFactory; import com.lts.core.commons.utils.CollectionUtils; @@ -45,17 +45,17 @@ private boolean isStart() { public FeedbackJobSendChecker(final JobTrackerApplication application) { this.application = application; - clientNotifier = new ClientNotifier(application, new ClientNotifyHandler() { + clientNotifier = new ClientNotifier(application, new ClientNotifyHandler() { @Override - public void handleSuccess(List jobResults) { - for (JobResultWrapper jobResult : jobResults) { - String submitNodeGroup = jobResult.getJob().getSubmitNodeGroup(); + public void handleSuccess(List jobResults) { + for (TaskTrackerJobResultWrapper jobResult : jobResults) { + String submitNodeGroup = jobResult.getJobWrapper().getJob().getSubmitNodeGroup(); application.getJobFeedbackQueue().remove(submitNodeGroup, jobResult.getId()); } } @Override - public void handleFailed(List jobResults) { + public void handleFailed(List jobResults) { // do nothing } }); @@ -136,7 +136,7 @@ private void check(String jobClientNodeGroup) { return; } - LOGGER.info("{} job need to feedback.", count); + LOGGER.info("{} jobs need to feedback.", count); // 检测是否有可用的客户端 List jobFeedbackPos; @@ -146,12 +146,12 @@ private void check(String jobClientNodeGroup) { if (CollectionUtils.isEmpty(jobFeedbackPos)) { return; } - List jobResults = new ArrayList(jobFeedbackPos.size()); + List jobResults = new ArrayList(jobFeedbackPos.size()); for (JobFeedbackPo jobFeedbackPo : jobFeedbackPos) { // 判断是否是过时的数据,如果是,那么移除 if (oldDataHandler == null || (oldDataHandler != null && !oldDataHandler.handle(application.getJobFeedbackQueue(), jobFeedbackPo, jobFeedbackPo))) { - jobResults.add(new JobResultWrapper(jobFeedbackPo.getId(), jobFeedbackPo.getJobResult())); + jobResults.add(new TaskTrackerJobResultWrapper(jobFeedbackPo.getId(), jobFeedbackPo.getTaskTrackerJobResult())); } } // 返回发送成功的个数 @@ -162,19 +162,19 @@ private void check(String jobClientNodeGroup) { } } - private class JobResultWrapper extends JobResult { + private class TaskTrackerJobResultWrapper extends TaskTrackerJobResult { private String id; public String getId() { return id; } - public JobResultWrapper(String id, JobResult jobResult) { + public TaskTrackerJobResultWrapper(String id, TaskTrackerJobResult taskTrackerJobResult) { this.id = id; - setJob(jobResult.getJob()); - setMsg(jobResult.getMsg()); - setSuccess(jobResult.isSuccess()); - setTime(jobResult.getTime()); + setJobWrapper(taskTrackerJobResult.getJobWrapper()); + setMsg(taskTrackerJobResult.getMsg()); + setSuccess(taskTrackerJobResult.isSuccess()); + setTime(taskTrackerJobResult.getTime()); } } diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/policy/OldDataDeletePolicy.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/policy/OldDataDeletePolicy.java index 5bc9371f0..7046abf19 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/policy/OldDataDeletePolicy.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/policy/OldDataDeletePolicy.java @@ -22,7 +22,7 @@ public boolean handle(JobFeedbackQueue jobFeedbackQueue, JobFeedbackPo jobFeedba if (System.currentTimeMillis() - jobFeedbackPo.getGmtCreated() > expired) { // delete - jobFeedbackQueue.remove(po.getJobResult().getJob().getTaskTrackerNodeGroup(), po.getId()); + jobFeedbackQueue.remove(po.getTaskTrackerJobResult().getJobWrapper().getJob().getTaskTrackerNodeGroup(), po.getId()); return true; } diff --git a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/domain/JobFeedbackPo.java b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/domain/JobFeedbackPo.java index 7c39b85f1..d510b4e8a 100644 --- a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/domain/JobFeedbackPo.java +++ b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/domain/JobFeedbackPo.java @@ -1,7 +1,7 @@ package com.lts.queue.domain; -import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; /** * @author Robert HG (254963746@qq.com) on 3/3/15. @@ -12,14 +12,14 @@ public class JobFeedbackPo{ private Long gmtCreated; - private JobResult jobResult; + private TaskTrackerJobResult taskTrackerJobResult; - public JobResult getJobResult() { - return jobResult; + public TaskTrackerJobResult getTaskTrackerJobResult() { + return taskTrackerJobResult; } - public void setJobResult(JobResult jobResult) { - this.jobResult = jobResult; + public void setTaskTrackerJobResult(TaskTrackerJobResult taskTrackerJobResult) { + this.taskTrackerJobResult = taskTrackerJobResult; } public Long getGmtCreated() { diff --git a/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoJobFeedbackQueue.java b/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoJobFeedbackQueue.java index 874979339..15b49445f 100644 --- a/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoJobFeedbackQueue.java +++ b/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoJobFeedbackQueue.java @@ -47,7 +47,7 @@ public boolean add(List jobFeedbackPos) { } for (JobFeedbackPo jobFeedbackPo : jobFeedbackPos) { String tableName = JobQueueUtils.getFeedbackQueueName( - jobFeedbackPo.getJobResult().getJob().getSubmitNodeGroup()); + jobFeedbackPo.getTaskTrackerJobResult().getJobWrapper().getJob().getSubmitNodeGroup()); try { template.save(tableName, jobFeedbackPo); } catch (DuplicateKeyException e) { diff --git a/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlJobFeedbackQueue.java b/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlJobFeedbackQueue.java index a017313f6..e68b6aafe 100644 --- a/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlJobFeedbackQueue.java +++ b/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlJobFeedbackQueue.java @@ -6,7 +6,7 @@ import com.lts.core.commons.utils.CollectionUtils; import com.lts.core.commons.utils.JSONUtils; import com.lts.core.support.JobQueueUtils; -import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; import com.lts.queue.JobFeedbackQueue; import com.lts.queue.domain.JobFeedbackPo; import com.lts.queue.exception.JobQueueException; @@ -73,9 +73,9 @@ public boolean add(List jobFeedbackPos) { Object[] params = new Object[2]; for (JobFeedbackPo jobFeedbackPo : jobFeedbackPos) { params[0] = jobFeedbackPo.getGmtCreated(); - params[1] = JSONUtils.toJSONString(jobFeedbackPo.getJobResult()); + params[1] = JSONUtils.toJSONString(jobFeedbackPo.getTaskTrackerJobResult()); try { - String jobClientNodeGroup = jobFeedbackPo.getJobResult().getJob().getSubmitNodeGroup(); + String jobClientNodeGroup = jobFeedbackPo.getTaskTrackerJobResult().getJobWrapper().getJob().getSubmitNodeGroup(); getSqlTemplate().update(getRealSql(insertSQL, jobClientNodeGroup), params); } catch (SQLException e) { throw new JobQueueException(e); @@ -125,7 +125,7 @@ public List handle(ResultSet rs) throws SQLException { while (rs.next()) { JobFeedbackPo jobFeedbackPo = new JobFeedbackPo(); jobFeedbackPo.setId(rs.getString("id")); - jobFeedbackPo.setJobResult(JSONUtils.parse(rs.getString("job_result"), new TypeReference() { + jobFeedbackPo.setTaskTrackerJobResult(JSONUtils.parse(rs.getString("job_result"), new TypeReference() { })); jobFeedbackPo.setGmtCreated(rs.getLong("gmt_created")); jobFeedbackPos.add(jobFeedbackPo); diff --git a/lts-tasktracker/src/main/java/com/lts/tasktracker/Result.java b/lts-tasktracker/src/main/java/com/lts/tasktracker/Result.java index 96076a7c0..2937440a5 100644 --- a/lts-tasktracker/src/main/java/com/lts/tasktracker/Result.java +++ b/lts-tasktracker/src/main/java/com/lts/tasktracker/Result.java @@ -1,7 +1,7 @@ package com.lts.tasktracker; /** - * Created by hugui on 6/12/15. + * @author Robert HG (254963746@qq.com) on 6/12/15. */ public class Result { diff --git a/lts-tasktracker/src/main/java/com/lts/tasktracker/domain/Response.java b/lts-tasktracker/src/main/java/com/lts/tasktracker/domain/Response.java index 6536b8693..b4c755437 100644 --- a/lts-tasktracker/src/main/java/com/lts/tasktracker/domain/Response.java +++ b/lts-tasktracker/src/main/java/com/lts/tasktracker/domain/Response.java @@ -1,6 +1,6 @@ package com.lts.tasktracker.domain; -import com.lts.core.domain.Job; +import com.lts.core.domain.JobWrapper; /** * @author Robert HG (254963746@qq.com) on 8/14/14. @@ -11,7 +11,7 @@ public class Response { private String msg; - private Job job; + private JobWrapper jobWrapper; /** * 是否接收新任务 @@ -34,12 +34,12 @@ public void setMsg(String msg) { this.msg = msg; } - public Job getJob() { - return job; + public JobWrapper getJobWrapper() { + return jobWrapper; } - public void setJob(Job job) { - this.job = job; + public void setJobWrapper(JobWrapper jobWrapper) { + this.jobWrapper = jobWrapper; } public boolean isReceiveNewJob() { diff --git a/lts-tasktracker/src/main/java/com/lts/tasktracker/processor/JobPushProcessor.java b/lts-tasktracker/src/main/java/com/lts/tasktracker/processor/JobPushProcessor.java index e8e3e5633..87b6faf77 100644 --- a/lts-tasktracker/src/main/java/com/lts/tasktracker/processor/JobPushProcessor.java +++ b/lts-tasktracker/src/main/java/com/lts/tasktracker/processor/JobPushProcessor.java @@ -1,14 +1,14 @@ package com.lts.tasktracker.processor; import com.lts.core.constant.Constants; -import com.lts.core.domain.Job; -import com.lts.core.domain.JobResult; +import com.lts.core.domain.TaskTrackerJobResult; +import com.lts.core.domain.JobWrapper; import com.lts.core.exception.JobTrackerNotFoundException; import com.lts.core.exception.RequestTimeoutException; import com.lts.core.logger.Logger; import com.lts.core.logger.LoggerFactory; import com.lts.core.protocol.JobProtos; -import com.lts.core.protocol.command.JobFinishedRequest; +import com.lts.core.protocol.command.TtJobFinishedRequest; import com.lts.core.protocol.command.JobPushRequest; import com.lts.core.remoting.RemotingClientDelegate; import com.lts.core.support.RetryScheduler; @@ -40,15 +40,15 @@ public class JobPushProcessor extends AbstractProcessor { protected JobPushProcessor(final RemotingClientDelegate remotingClient, TaskTrackerApplication application) { super(remotingClient, application); - retryScheduler = new RetryScheduler(application, 3) { + retryScheduler = new RetryScheduler(application, 3) { @Override protected boolean isRemotingEnable() { return remotingClient.isServerEnable(); } @Override - protected boolean retry(List jobResults) { - return retrySendJobResults(jobResults); + protected boolean retry(List taskTrackerJobResults) { + return retrySendJobResults(taskTrackerJobResults); } }; retryScheduler.setName("JobPush"); @@ -64,10 +64,10 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, final RemotingC JobPushRequest requestBody = request.getBody(); // JobTracker 分发来的 job - final Job job = requestBody.getJob(); + final JobWrapper jobWrapper = requestBody.getJobWrapper(); try { - application.getRunnerPool().execute(job, jobRunnerCallback); + application.getRunnerPool().execute(jobWrapper, jobRunnerCallback); } catch (NoAvailableJobRunnerException e) { // 任务推送失败 return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code(), @@ -83,15 +83,15 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, final RemotingC */ private class JobRunnerCallback implements RunnerCallback { @Override - public Job runComplete(Response response) { + public JobWrapper runComplete(Response response) { // 发送消息给 JobTracker - final JobResult jobResult = new JobResult(); - jobResult.setTime(System.currentTimeMillis()); - jobResult.setJob(response.getJob()); - jobResult.setSuccess(response.isSuccess()); - jobResult.setMsg(response.getMsg()); - JobFinishedRequest requestBody = application.getCommandBodyWrapper().wrapper(new JobFinishedRequest()); - requestBody.addJobResult(jobResult); + final TaskTrackerJobResult taskTrackerJobResult = new TaskTrackerJobResult(); + taskTrackerJobResult.setTime(System.currentTimeMillis()); + taskTrackerJobResult.setJobWrapper(response.getJobWrapper()); + taskTrackerJobResult.setSuccess(response.isSuccess()); + taskTrackerJobResult.setMsg(response.getMsg()); + TtJobFinishedRequest requestBody = application.getCommandBodyWrapper().wrapper(new TtJobFinishedRequest()); + requestBody.addJobResult(taskTrackerJobResult); requestBody.setReceiveNewJob(response.isReceiveNewJob()); // 设置可以接受新任务 int requestCode = JobProtos.RequestCode.JOB_FINISHED.code(); @@ -111,15 +111,15 @@ public void operationComplete(ResponseFuture responseFuture) { if (commandResponse != null && commandResponse.getCode() == RemotingProtos.ResponseCode.SUCCESS.code()) { JobPushRequest jobPushRequest = commandResponse.getBody(); if (jobPushRequest != null) { - LOGGER.info("Get new job :{}", jobPushRequest.getJob()); - returnResponse.setJob(jobPushRequest.getJob()); + LOGGER.info("Get new job :{}", jobPushRequest.getJobWrapper()); + returnResponse.setJobWrapper(jobPushRequest.getJobWrapper()); } } else { - LOGGER.info("Job feedback failed, save local files。{}", jobResult); + LOGGER.info("Job feedback failed, save local files。{}", taskTrackerJobResult); try { retryScheduler.inSchedule( - jobResult.getJob().getJobId().concat("_") + System.currentTimeMillis(), - jobResult); + taskTrackerJobResult.getJobWrapper().getJobId().concat("_") + System.currentTimeMillis(), + taskTrackerJobResult); } catch (Exception e) { LOGGER.error("Job feedback failed", e); } @@ -139,27 +139,27 @@ public void operationComplete(ResponseFuture responseFuture) { try { LOGGER.warn("No job tracker available! save local files."); retryScheduler.inSchedule( - jobResult.getJob().getJobId().concat("_") + System.currentTimeMillis(), - jobResult); + taskTrackerJobResult.getJobWrapper().getJobId().concat("_") + System.currentTimeMillis(), + taskTrackerJobResult); } catch (Exception e1) { - LOGGER.error("Save files failed, {}", jobResult.getJob(), e1); + LOGGER.error("Save files failed, {}", taskTrackerJobResult.getJobWrapper(), e1); } } - return returnResponse.getJob(); + return returnResponse.getJobWrapper(); } } /** * 发送JobResults * - * @param jobResults + * @param taskTrackerJobResults * @return */ - private boolean retrySendJobResults(List jobResults) { + private boolean retrySendJobResults(List taskTrackerJobResults) { // 发送消息给 JobTracker - JobFinishedRequest requestBody = application.getCommandBodyWrapper().wrapper(new JobFinishedRequest()); - requestBody.setJobResults(jobResults); + TtJobFinishedRequest requestBody = application.getCommandBodyWrapper().wrapper(new TtJobFinishedRequest()); + requestBody.setTaskTrackerJobResults(taskTrackerJobResults); requestBody.setReSend(true); int requestCode = JobProtos.RequestCode.JOB_FINISHED.code(); @@ -175,7 +175,7 @@ private boolean retrySendJobResults(List jobResults) { return false; } } catch (JobTrackerNotFoundException e) { - LOGGER.error("Retry send job result failed! jobResults={}", jobResults, e); + LOGGER.error("Retry send job result failed! jobResults={}", taskTrackerJobResults, e); } return false; } diff --git a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java index 3498f8f7a..35ca71251 100644 --- a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java +++ b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java @@ -1,6 +1,6 @@ package com.lts.tasktracker.runner; -import com.lts.core.domain.Job; +import com.lts.core.domain.JobWrapper; import com.lts.core.logger.Logger; import com.lts.core.logger.LoggerFactory; import com.lts.tasktracker.Result; @@ -19,14 +19,14 @@ public class JobRunnerDelegate implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger("LTS.TaskTracker"); - private Job job; + private JobWrapper jobWrapper; private RunnerCallback callback; private BizLoggerImpl logger; private TaskTrackerApplication application; public JobRunnerDelegate(TaskTrackerApplication application, - Job job, RunnerCallback callback) { - this.job = job; + JobWrapper jobWrapper, RunnerCallback callback) { + this.jobWrapper = jobWrapper; this.callback = callback; this.application = application; this.logger = (BizLoggerImpl) BizLoggerFactory.getLogger( @@ -39,32 +39,32 @@ public void run() { try { LtsLoggerFactory.setLogger(logger); - while (job != null) { + while (jobWrapper != null) { // 设置当前context中的jobId - logger.setId(job.getJobId(), job.getTaskId()); + logger.setId(jobWrapper.getJobId(), jobWrapper.getJob().getTaskId()); Response response = new Response(); - response.setJob(job); + response.setJobWrapper(jobWrapper); try { - application.getRunnerPool().getRunningJobManager().in(job.getJobId()); - Result result = application.getRunnerPool().getRunnerFactory().newRunner().run(job); + application.getRunnerPool().getRunningJobManager().in(jobWrapper.getJobId()); + Result result = application.getRunnerPool().getRunnerFactory().newRunner().run(jobWrapper.getJob()); if (result == null) { response.setSuccess(true); } else { response.setSuccess(result.isSuccess()); response.setMsg(result.getMsg()); } - LOGGER.info("Job exec success : {}", job); + LOGGER.info("Job exec success : {}", jobWrapper); } catch (Throwable t) { StringWriter sw = new StringWriter(); t.printStackTrace(new PrintWriter(sw)); response.setSuccess(false); response.setMsg(sw.toString()); - LOGGER.info("Job exec error : {} {}", job, t.getMessage(), t); + LOGGER.info("Job exec error : {} {}", jobWrapper, t.getMessage(), t); } finally { logger.removeId(); - application.getRunnerPool().getRunningJobManager().out(job.getJobId()); + application.getRunnerPool().getRunningJobManager().out(jobWrapper.getJobId()); } - job = callback.runComplete(response); + jobWrapper = callback.runComplete(response); } } finally { LtsLoggerFactory.remove(); diff --git a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerCallback.java b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerCallback.java index 43031989d..203d8d482 100644 --- a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerCallback.java +++ b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerCallback.java @@ -1,6 +1,6 @@ package com.lts.tasktracker.runner; -import com.lts.core.domain.Job; +import com.lts.core.domain.JobWrapper; import com.lts.tasktracker.domain.Response; /** @@ -13,6 +13,6 @@ public interface RunnerCallback { * @param response * @return 如果有新的任务, 那么返回新的任务过来 */ - public Job runComplete(Response response); + public JobWrapper runComplete(Response response); } diff --git a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerPool.java b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerPool.java index cd79ea50e..30df25adf 100644 --- a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerPool.java +++ b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerPool.java @@ -2,6 +2,7 @@ import com.lts.core.constant.EcTopic; import com.lts.core.domain.Job; +import com.lts.core.domain.JobWrapper; import com.lts.core.logger.Logger; import com.lts.core.logger.LoggerFactory; import com.lts.core.commons.collect.ConcurrentHashSet; @@ -57,12 +58,12 @@ public void onObserved(EventInfo eventInfo) { }), EcTopic.WORK_THREAD_CHANGE); } - public void execute(Job job, RunnerCallback callback) throws NoAvailableJobRunnerException { + public void execute(JobWrapper jobWrapper, RunnerCallback callback) throws NoAvailableJobRunnerException { try { threadPoolExecutor.execute( - new JobRunnerDelegate(application, job, callback)); + new JobRunnerDelegate(application, jobWrapper, callback)); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Receive job success ! " + job); + LOGGER.debug("Receive job success ! " + jobWrapper); } } catch (RejectedExecutionException e) { LOGGER.warn("No more thread to run job .");