Skip to content

Commit

Permalink
Merge pull request #40 from qq254963746/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
qq254963746 committed Jun 13, 2015
2 parents 3bfc6c6 + 218ac0c commit 050a2b0
Show file tree
Hide file tree
Showing 34 changed files with 410 additions and 247 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -54,7 +54,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
###LTS Admin
![Aaron Swartz](https://raw.githubusercontent.com/qq254963746/light-task-schedule/master/doc/LTS_Admin.png)
###调用示例
下面提供的是最简单的配置方式。更多配置请查看 [lts-example](https://github.com/qq254963746/light-task-schedule/tree/master/lts-example/src/main/java/com/lts/job/example/api) 模块下的 API 调用方式例子.
下面提供的是最简单的配置方式。更多配置请查看 [lts-example](https://github.com/qq254963746/light-task-schedule/tree/master/lts-example/src/main/java/com/lts/example/api) 模块下的 API 调用方式例子.

####JobTracker 端
```java
Expand Down Expand Up @@ -85,7 +85,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
public void run(Job job) throws Throwable {
System.out.println("我要执行"+ job);
System.out.println(job.getParam("shopId"));
// TODO 用户自己的业务逻辑
// TODO 用户自己的业务逻辑, 应该保证幂等
try {
Thread.sleep(5*1000L);
} catch (InterruptedException e) {
Expand Down
Expand Up @@ -14,7 +14,7 @@
import static org.junit.Assert.*;

/**
* Created by hugui on 6/12/15.
* @author Robert HG (254963746@qq.com) on 6/12/15.
*/
public class MysqlJobLoggerTest {

Expand Down
13 changes: 2 additions & 11 deletions lts-core/src/main/java/com/lts/core/domain/Job.java
@@ -1,10 +1,10 @@
package com.lts.core.domain;


import com.lts.core.exception.JobSubmitException;
import com.lts.core.support.CronExpression;
import com.lts.core.commons.utils.JSONUtils;
import com.lts.core.commons.utils.StringUtils;
import com.lts.core.exception.JobSubmitException;
import com.lts.core.support.CronExpression;
import com.lts.remoting.annotation.NotNull;

import java.util.Date;
Expand All @@ -16,7 +16,6 @@
*/
public class Job {

protected String jobId;
@NotNull
protected String taskId;
/**
Expand Down Expand Up @@ -85,14 +84,6 @@ public void setNeedFeedback(boolean needFeedback) {
this.needFeedback = needFeedback;
}

public String getJobId() {
return jobId;
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public Map<String, String> getExtParams() {
return extParams;
}
Expand Down
4 changes: 2 additions & 2 deletions lts-core/src/main/java/com/lts/core/domain/JobResult.java
Expand Up @@ -3,8 +3,8 @@
import com.lts.core.commons.utils.JSONUtils;

/**
* @author Robert HG (254963746@qq.com) on 8/19/14.
* 任务执行结果
* @author Robert HG (254963746@qq.com) on 6/13/15.
* 发送给客户端的 任务执行结果
*/
public class JobResult {

Expand Down
43 changes: 43 additions & 0 deletions lts-core/src/main/java/com/lts/core/domain/JobWrapper.java
@@ -0,0 +1,43 @@
package com.lts.core.domain;

/**
* @author Robert HG (254963746@qq.com) on 6/13/15.
*/
public class JobWrapper {

private String jobId;

private Job job;

public JobWrapper(String jobId, Job job) {
this.jobId = jobId;
this.job = job;
}

public JobWrapper() {
}

public String getJobId() {
return jobId;
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public Job getJob() {
return job;
}

public void setJob(Job job) {
this.job = job;
}

@Override
public String toString() {
return "JobWrapper{" +
"jobId='" + jobId + '\'' +
", job=" + job +
'}';
}
}
@@ -0,0 +1,56 @@
package com.lts.core.domain;

import com.lts.core.commons.utils.JSONUtils;

/**
* @author Robert HG (254963746@qq.com) on 8/19/14.
* TaskTracker 任务执行结果
*/
public class TaskTrackerJobResult {

private JobWrapper jobWrapper;

// 执行成功还是失败
private boolean success;

private String msg;
// 任务完成时间
private Long time;

public JobWrapper getJobWrapper() {
return jobWrapper;
}

public void setJobWrapper(JobWrapper jobWrapper) {
this.jobWrapper = jobWrapper;
}

public boolean isSuccess() {
return success;
}

public void setSuccess(boolean success) {
this.success = success;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}

public Long getTime() {
return time;
}

public void setTime(Long time) {
this.time = time;
}

@Override
public String toString() {
return JSONUtils.toJSONString(this);
}
}
@@ -1,6 +1,5 @@
package com.lts.core.failstore.berkeleydb;

import com.lts.core.cluster.Config;
import com.lts.core.commons.file.FileLock;
import com.lts.core.commons.file.FileUtils;
import com.lts.core.commons.utils.CollectionUtils;
Expand Down
Expand Up @@ -7,12 +7,11 @@
import com.lts.core.failstore.FailStore;
import com.lts.core.failstore.FailStoreException;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.*;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -39,13 +38,23 @@ public LeveldbFailStore(String failStorePath) {
failStorePath = failStorePath + "/leveldb/";
dbPath = FileUtils.createDirIfNotExist(failStorePath);
options = new Options();
options.createIfMissing(true);
// options.compressionType(CompressionType.NONE);
options.cacheSize(100 * 1024 * 1024); // 100M
// options.logger(new Logger() {
// @Override
// public void log(String message) {
// System.out.println(message);
// }
// });
lock = new FileLock(failStorePath + "___db.lock");
}

@Override
public void open() throws FailStoreException {
try {
lock.tryLock();
JniDBFactory.factory.repair(dbPath, options);
db = JniDBFactory.factory.open(dbPath, options);
} catch (IOException e) {
throw new FailStoreException(e);
Expand Down Expand Up @@ -79,16 +88,33 @@ public void delete(List<String> keys) throws FailStoreException {
if (keys == null || keys.size() == 0) {
return;
}
for (String key : keys) {
delete(key);
WriteBatch batch = db.createWriteBatch();
try {

for (String key : keys) {
batch.delete(key.getBytes("UTF-8"));
}
db.write(batch);
} catch (UnsupportedEncodingException e) {
throw new FailStoreException(e);
} finally {
try {
batch.close();
} catch (IOException e) {
throw new FailStoreException(e);
}
}
}

@Override
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
Snapshot snapshot = db.getSnapshot();
DBIterator iterator = null;
try {
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
DBIterator iterator = db.iterator();
ReadOptions options=new ReadOptions();
options.snapshot(snapshot);
iterator = db.iterator(options);
for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) {
Map.Entry<byte[], byte[]> entry = iterator.peekNext();
String key = new String(entry.getKey(), "UTF-8");
Expand All @@ -102,6 +128,19 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
return list;
} catch (Exception e) {
throw new FailStoreException(e);
} finally {
if (iterator != null) {
try {
iterator.close();
} catch (IOException e) {
throw new FailStoreException(e);
}
}
try {
snapshot.close();
} catch (IOException e) {
throw new FailStoreException(e);
}
}
}

Expand Down
Expand Up @@ -103,7 +103,7 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
try {
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
iterator = db.newIterator();
for (iterator.seekToLast(); iterator.isValid(); iterator.prev()) {
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
iterator.status();
String key = new String(iterator.key(), "UTF-8");
T value = JSONUtils.parse(new String(iterator.value(), "UTF-8"), type);
Expand Down
@@ -1,6 +1,6 @@
package com.lts.core.protocol.command;

import com.lts.core.domain.Job;
import com.lts.core.domain.JobWrapper;
import com.lts.remoting.annotation.NotNull;

/**
Expand All @@ -9,13 +9,13 @@
public class JobPushRequest extends AbstractCommandBody{

@NotNull
private Job job;
private JobWrapper jobWrapper;

public Job getJob() {
return job;
public JobWrapper getJobWrapper() {
return jobWrapper;
}

public void setJob(Job job) {
this.job = job;
public void setJobWrapper(JobWrapper jobWrapper) {
this.jobWrapper = jobWrapper;
}
}
@@ -0,0 +1,55 @@
package com.lts.core.protocol.command;

import com.lts.core.domain.TaskTrackerJobResult;
import com.lts.remoting.annotation.NotNull;

import java.util.ArrayList;
import java.util.List;

/**
* @author Robert HG (254963746@qq.com) on 8/16/14.
* TaskTracker Job finished request command body
*/
public class TtJobFinishedRequest extends AbstractCommandBody {
/**
* 是否接受新任务
*/
private boolean receiveNewJob = false;

@NotNull
private List<TaskTrackerJobResult> taskTrackerJobResults;

// 是否是重发(重发是批量发)
private boolean reSend = false;

public boolean isReSend() {
return reSend;
}

public void setReSend(boolean reSend) {
this.reSend = reSend;
}

public boolean isReceiveNewJob() {
return receiveNewJob;
}

public void setReceiveNewJob(boolean receiveNewJob) {
this.receiveNewJob = receiveNewJob;
}

public List<TaskTrackerJobResult> getTaskTrackerJobResults() {
return taskTrackerJobResults;
}

public void setTaskTrackerJobResults(List<TaskTrackerJobResult> taskTrackerJobResults) {
this.taskTrackerJobResults = taskTrackerJobResults;
}

public void addJobResult(TaskTrackerJobResult taskTrackerJobResult) {
if (taskTrackerJobResults == null) {
taskTrackerJobResults = new ArrayList<TaskTrackerJobResult>();
}
taskTrackerJobResults.add(taskTrackerJobResult);
}
}

0 comments on commit 050a2b0

Please sign in to comment.