Skip to content

AzatArslanov/multithreading-task

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Пример использования DelayQueue

Есть проблема:

На вход поступают пары (LocalDateTime, Callable). Необходимо реализовать систему, которая будет выполнять Callable для каждого пришедшего события в указанный LocalDateTime. Задачи должны выполняться в порядке согласно значению LocalDateTime. Если на один момент времени указано более одной задачи, то порядок их выполнения определяется порядком поступления. Если система перегружена, то задачи, время выполнения которых оказалось в прошлом, все равно должны выполниться согласно приоритетам описанным выше. Задачи могут приходить в произвольном порядке и из разных потоков.

Для ее решения отлично подойдет DelayQueue из пакета java.util.concurrent. DelayQueue это "неограниченная" (максимальное количество элементов Integer.MAX_VALUE) блокирующая очередь которая содержит элементы реализующие интерфейс Delayed. Особенностью этой очереди является то, что элементы будут доступны только после того, как их метод getDelay будет возвращать значение меньше или раное 0. Внутри себя содержит PriorityQueue поэтому также очень важно правильная реализация метода compareTo для наших элементов, так, чтобы во главе очереди оказывались элементы которые в первую очередь стоит выполнить. Для нашей проблемы реализация класса с задачей будет такой:

public class Task implements Delayed {

    private final Callable callable;
    private final LocalDateTime dateTime;

    public Task(LocalDateTime dateTime, Callable callable) {
        if (dateTime == null || callable == null) {
            throw new NullPointerException("dateTime and callable must be set");
        }
        this.dateTime = dateTime;
        this.callable = callable;
    }

    public LocalDateTime getDateTime() {
        return dateTime;
    }

    public Callable getCallable() {
        return callable;
    }

    @Override
    public int compareTo(Delayed o) {
        if (this == o) {
            return 0;
        }
        if (o instanceof Task) {
            LocalDateTime otherDateTime = ((Task) o).getDateTime();
            return dateTime.compareTo(otherDateTime);
        }
        return 1;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(Duration.between(LocalDateTime.now(), dateTime).getSeconds(), TimeUnit.SECONDS);
    }
}

В очередь задачи передаем так:

public void add(LocalDateTime localDateTime, Callable callable) {
        Task task = new Task(localDateTime, callable);
        log.info("add new task in queue: {}", task);
        try {
            queue.put(task);
        } catch (InterruptedException ignored) {
            log.error("Queue throws interrupted exception", ignored);
            throw new RuntimeException(ignored);
        }
    }

Непосредственно выполнением Callable будет заниматься Executors.newSingleThreadExecutor(), а загружать задачами его будет отдельный поток

manager = new Thread(() -> {
                while (!isStopping) {
                    try {
                        log.info("wait new tasks");
                        Task task = queue.take();
                        log.info("received {}", task);
                        taskWorkerService.execute(task.getCallable());
                    } catch (InterruptedException ignored) {
                        log.error("Queue throws interrupted exception", ignored);
                        throw new RuntimeException(ignored);
                    }
                }

            }, "TaskService.manager");
            manager.start();

About

Пример использования DelayQueue

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages