Skip to content
This repository has been archived by the owner on May 7, 2020. It is now read-only.

Commit

Permalink
Synchronous Execution of Astro Startup Jobs and minor fixes (#3746)
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Kumar Mondal <admin@amitinside.com>
  • Loading branch information
amitjoy authored and kaikreuzer committed Jun 26, 2017
1 parent 0bae793 commit 04a064c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 60 deletions.
Expand Up @@ -21,8 +21,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -71,11 +69,9 @@ public abstract class AstroThingHandler extends BaseThingHandler {
protected AstroThingConfig thingConfig;
private final Lock monitor = new ReentrantLock();
private final Queue<Job> scheduledJobs;
private final List<Future<?>> futures;

public AstroThingHandler(Thing thing) {
super(thing);
futures = new CopyOnWriteArrayList<>();
scheduledExecutor = ExpressionThreadPoolManager.getExpressionScheduledPool("astro");
scheduledJobs = new LinkedBlockingQueue<>(MAX_SCHEDULED_JOBS);
}
Expand Down Expand Up @@ -194,8 +190,8 @@ private void restartJobs() {
scheduledExecutor.schedule(dailyJob, midNightExpression);
logger.info("Scheduled astro job-daily-{} at midnight for thing {}", typeId, thingUID);
}
// Execute startup job immediately
futures.add(scheduler.submit(dailyJob));
// Execute daily startup job immediately
dailyJob.run();

// Repeat scheduled job every configured seconds
LocalDateTime currentTime = LocalDateTime.now();
Expand All @@ -210,8 +206,8 @@ private void restartJobs() {
logger.info("Scheduled astro job-positional with interval of {} seconds for thing {}",
thingConfig.getInterval(), thingUID);
}
// Execute positional job immediately
futures.add(scheduler.submit(positionalJob));
// Execute positional startup job immediately
positionalJob.run();
}
}
} catch (ParseException ex) {
Expand All @@ -228,7 +224,6 @@ private void stopJobs() {
ThingUID thingUID = getThing().getUID();
monitor.lock();
try {
cancelFutures();
if (scheduledExecutor != null) {
List<Job> jobsToRemove = scheduledJobs.stream().filter(this::isJobAssociatedWithThing)
.collect(toList());
Expand All @@ -248,27 +243,6 @@ private void stopJobs() {
}
}

/**
* Cancels all submitted futures
*/
private void cancelFutures() {
ThingUID thingUID = getThing().getUID();
monitor.lock();
try {
if (futures.isEmpty()) {
return;
}
logger.debug("Cancelling futures for thing {}", thingUID);
futures.forEach(f -> f.cancel(true));
futures.clear();
logger.debug("Cancelled futures for thing {}", thingUID);
} catch (Exception ex) {
logger.error("{}", ex.getMessage(), ex);
} finally {
monitor.unlock();
}
}

private boolean isJobAssociatedWithThing(Job job) {
String thingUID = getThing().getUID().getAsString();
String jobThingUID = job.getThingUID();
Expand Down
Expand Up @@ -21,36 +21,6 @@ public String getThingUID() {
return thingUID;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (thingUID == null ? 0 : thingUID.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
AbstractJob other = (AbstractJob) obj;
if (thingUID == null) {
if (other.thingUID != null) {
return false;
}
} else if (!thingUID.equals(other.thingUID)) {
return false;
}
return true;
}

/**
* Ensures the truth of an expression involving one or more parameters to the
* calling method.
Expand Down

0 comments on commit 04a064c

Please sign in to comment.