Skip to content

Commit

Permalink
performance sth
Browse files Browse the repository at this point in the history
  • Loading branch information
胡贵 committed Jun 13, 2015
1 parent 7bb95cd commit 218ac0c
Show file tree
Hide file tree
Showing 30 changed files with 362 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import static org.junit.Assert.*;

/**
* Created by hugui on 6/12/15.
* @author Robert HG (254963746@qq.com) on 6/12/15.
*/
public class MysqlJobLoggerTest {

Expand Down
13 changes: 2 additions & 11 deletions lts-core/src/main/java/com/lts/core/domain/Job.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.lts.core.domain;


import com.lts.core.exception.JobSubmitException;
import com.lts.core.support.CronExpression;
import com.lts.core.commons.utils.JSONUtils;
import com.lts.core.commons.utils.StringUtils;
import com.lts.core.exception.JobSubmitException;
import com.lts.core.support.CronExpression;
import com.lts.remoting.annotation.NotNull;

import java.util.Date;
Expand All @@ -16,7 +16,6 @@
*/
public class Job {

protected String jobId;
@NotNull
protected String taskId;
/**
Expand Down Expand Up @@ -85,14 +84,6 @@ public void setNeedFeedback(boolean needFeedback) {
this.needFeedback = needFeedback;
}

public String getJobId() {
return jobId;
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public Map<String, String> getExtParams() {
return extParams;
}
Expand Down
4 changes: 2 additions & 2 deletions lts-core/src/main/java/com/lts/core/domain/JobResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import com.lts.core.commons.utils.JSONUtils;

/**
* @author Robert HG (254963746@qq.com) on 8/19/14.
* 任务执行结果
* @author Robert HG (254963746@qq.com) on 6/13/15.
* 发送给客户端的 任务执行结果
*/
public class JobResult {

Expand Down
43 changes: 43 additions & 0 deletions lts-core/src/main/java/com/lts/core/domain/JobWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.lts.core.domain;

/**
* @author Robert HG (254963746@qq.com) on 6/13/15.
*/
public class JobWrapper {

private String jobId;

private Job job;

public JobWrapper(String jobId, Job job) {
this.jobId = jobId;
this.job = job;
}

public JobWrapper() {
}

public String getJobId() {
return jobId;
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public Job getJob() {
return job;
}

public void setJob(Job job) {
this.job = job;
}

@Override
public String toString() {
return "JobWrapper{" +
"jobId='" + jobId + '\'' +
", job=" + job +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.lts.core.domain;

import com.lts.core.commons.utils.JSONUtils;

/**
* @author Robert HG (254963746@qq.com) on 8/19/14.
* TaskTracker 任务执行结果
*/
public class TaskTrackerJobResult {

private JobWrapper jobWrapper;

// 执行成功还是失败
private boolean success;

private String msg;
// 任务完成时间
private Long time;

public JobWrapper getJobWrapper() {
return jobWrapper;
}

public void setJobWrapper(JobWrapper jobWrapper) {
this.jobWrapper = jobWrapper;
}

public boolean isSuccess() {
return success;
}

public void setSuccess(boolean success) {
this.success = success;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}

public Long getTime() {
return time;
}

public void setTime(Long time) {
this.time = time;
}

@Override
public String toString() {
return JSONUtils.toJSONString(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.lts.core.failstore.berkeleydb;

import com.lts.core.cluster.Config;
import com.lts.core.commons.file.FileLock;
import com.lts.core.commons.file.FileUtils;
import com.lts.core.commons.utils.CollectionUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public LeveldbFailStore(String failStorePath) {
dbPath = FileUtils.createDirIfNotExist(failStorePath);
options = new Options();
options.createIfMissing(true);
// options.compressionType(CompressionType.NONE);
options.cacheSize(100 * 1024 * 1024); // 100M
// options.logger(new Logger() {
// @Override
Expand Down Expand Up @@ -107,10 +108,13 @@ public void delete(List<String> keys) throws FailStoreException {

@Override
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
Snapshot snapshot = db.getSnapshot();
DBIterator iterator = null;
try {
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
iterator = db.iterator();
ReadOptions options=new ReadOptions();
options.snapshot(snapshot);
iterator = db.iterator(options);
for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) {
Map.Entry<byte[], byte[]> entry = iterator.peekNext();
String key = new String(entry.getKey(), "UTF-8");
Expand All @@ -132,6 +136,11 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
throw new FailStoreException(e);
}
}
try {
snapshot.close();
} catch (IOException e) {
throw new FailStoreException(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.lts.core.protocol.command;

import com.lts.core.domain.Job;
import com.lts.core.domain.JobWrapper;
import com.lts.remoting.annotation.NotNull;

/**
Expand All @@ -9,13 +9,13 @@
public class JobPushRequest extends AbstractCommandBody{

@NotNull
private Job job;
private JobWrapper jobWrapper;

public Job getJob() {
return job;
public JobWrapper getJobWrapper() {
return jobWrapper;
}

public void setJob(Job job) {
this.job = job;
public void setJobWrapper(JobWrapper jobWrapper) {
this.jobWrapper = jobWrapper;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.lts.core.protocol.command;

import com.lts.core.domain.TaskTrackerJobResult;
import com.lts.remoting.annotation.NotNull;

import java.util.ArrayList;
import java.util.List;

/**
* @author Robert HG (254963746@qq.com) on 8/16/14.
* TaskTracker Job finished request command body
*/
public class TtJobFinishedRequest extends AbstractCommandBody {
/**
* 是否接受新任务
*/
private boolean receiveNewJob = false;

@NotNull
private List<TaskTrackerJobResult> taskTrackerJobResults;

// 是否是重发(重发是批量发)
private boolean reSend = false;

public boolean isReSend() {
return reSend;
}

public void setReSend(boolean reSend) {
this.reSend = reSend;
}

public boolean isReceiveNewJob() {
return receiveNewJob;
}

public void setReceiveNewJob(boolean receiveNewJob) {
this.receiveNewJob = receiveNewJob;
}

public List<TaskTrackerJobResult> getTaskTrackerJobResults() {
return taskTrackerJobResults;
}

public void setTaskTrackerJobResults(List<TaskTrackerJobResult> taskTrackerJobResults) {
this.taskTrackerJobResults = taskTrackerJobResults;
}

public void addJobResult(TaskTrackerJobResult taskTrackerJobResult) {
if (taskTrackerJobResults == null) {
taskTrackerJobResults = new ArrayList<TaskTrackerJobResult>();
}
taskTrackerJobResults.add(taskTrackerJobResult);
}
}
17 changes: 2 additions & 15 deletions lts-core/src/main/java/com/lts/core/support/RetryScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ public void stop() {
*/
private class CheckRunner implements Runnable {

// 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到
private int maxSentSize = 100;

@Override
public void run() {
try {
Expand All @@ -104,8 +101,6 @@ public void run() {
return;
}

int sentSize = 0;

List<KVPair<String, T>> kvPairs = null;
do {
try {
Expand All @@ -124,20 +119,11 @@ public void run() {
values.add(kvPair.getValue());
}
if (retry(values)) {
LOGGER.info("{} local files send success, {}", name, JSONUtils.toJSONString(values));
LOGGER.info("{} local files send success, size: {}, {}", name, values.size(), JSONUtils.toJSONString(values));
failStore.delete(keys);
} else {
break;
}
sentSize += kvPairs.size();
if (sentSize >= maxSentSize) {
// 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到
try {
Thread.sleep(1000L);
} catch (InterruptedException e1) {
LOGGER.warn(e1.getMessage(), e1);
}
}
} finally {
failStore.close();
}
Expand All @@ -155,6 +141,7 @@ public void inSchedule(String key, T value) {
try {
failStore.open();
failStore.put(key, value);
LOGGER.info("{} local files save success, {}", name, JSONUtils.toJSONString(value));
} finally {
failStore.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package com.lts.core.failstore.leveldb;

import com.lts.core.cluster.Config;
import com.lts.core.cluster.NodeType;
import com.lts.core.commons.utils.CollectionUtils;
import com.lts.core.commons.utils.JSONUtils;
import com.lts.core.constant.Constants;
import com.lts.core.domain.Job;
import com.lts.core.domain.KVPair;
import com.lts.core.failstore.FailStore;
Expand All @@ -15,7 +12,7 @@
import java.util.List;

/**
* Created by hugui on 6/12/15.
* @author Robert HG (254963746@qq.com) on 6/12/15.
*/
public class LeveldbFailStoreTest {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.lts.example.support;

import com.lts.core.commons.utils.CollectionUtils;
import com.lts.jobclient.support.JobFinishedHandler;
import com.lts.core.domain.JobResult;
import com.lts.jobclient.support.JobFinishedHandler;
import com.lts.core.domain.TaskTrackerJobResult;

import java.text.SimpleDateFormat;
import java.util.Date;
Expand Down

0 comments on commit 218ac0c

Please sign in to comment.