Suggest to support timeout for JobConfiguration #1464

Open
Technoboy- opened this issue Sep 17, 2020 · 10 comments · May be fixed by #1923

Comments

@Technoboy-
Contributor

JobConfiguration should offer a timeout setting to users. If a job execution times out, even at the sharding level, we can trace it and monitor it further.
As for DAG jobs, we can decide how long to wait, at most, for each node.

@Technoboy-
Contributor Author

@terrymanu

@terrymanu
Member

It is a necessary feature; I have just labeled it as a new feature.

@skaic
Contributor

skaic commented May 27, 2021

I can do it. Could you assign it to me? @terrymanu @TeslaCN

@TeslaCN
Member

TeslaCN commented Jun 29, 2021

Hi @skaic
We can discuss how to support it and what should happen when a job times out.

@skaic
Contributor

skaic commented Jun 29, 2021

Thanks @TeslaCN. It is a necessary feature for me. I have an idea; please give me some time to write it up. Thanks.

@skaic
Contributor

skaic commented Jun 29, 2021

Core idea:

We start each task in a new thread and wait for it with a timer. When the timeout expires, we call interrupt() on that thread and close() on the thread's socket.

So our job can detect the timeout by checking the thread's interrupt flag (Thread.currentThread().isInterrupted()) or by catching InterruptedException | IOException, and then terminate itself cleanly.

The code is as follows:

for (int each : items) {
    ...
    // Run each sharding item in its own thread so that it can be interrupted on timeout
    Future<Boolean> future = executorService.submit(() -> {
        try {
            process(jobConfig, shardingContexts, each, jobExecutionEvent);
        } finally {
            latch.countDown();
        }
        return true;
    });
    futures.add(future);
}

for (Future<Boolean> future : futures) {
    try {
        // Each get() waits up to 10 seconds, counted from the moment this get() starts
        future.get(10, TimeUnit.SECONDS);
    } catch (final InterruptedException ex) {
        // Restore the interrupt flag of the waiting thread
        Thread.currentThread().interrupt();
    } catch (final ExecutionException ex) {
        throw new JobSystemException(ex);
    } catch (final TimeoutException ex) {
        // cancel(true) calls interrupt() on the thread running this item
        future.cancel(true);
    }
}
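Because the loop above gives every future its own fresh 10-second wait, N slow items could keep the caller waiting for up to about N x 10 seconds. A minimal sketch, assuming the intent is one shared deadline for all sharding items: the wait for each future is reduced to whatever time is left. `ShardingTimeoutSketch`, `runWithTimeout` and the sleeping job body are hypothetical names for illustration; this uses plain `java.util.concurrent`, not ElasticJob's executor API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ShardingTimeoutSketch {

    // Runs each item in its own thread and waits for all of them against ONE shared deadline.
    // Returns the items whose tasks missed the deadline and were cancelled.
    static List<Integer> runWithTimeout(List<Integer> items, long timeoutMillis) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, items.size()));
        List<Future<?>> futures = new ArrayList<>();
        for (int each : items) {
            futures.add(executor.submit(() -> {
                Thread.sleep(each * 100L); // hypothetical job body: item n takes n * 100 ms
                return null;
            }));
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<Integer> timedOut = new ArrayList<>();
        try {
            for (int i = 0; i < futures.size(); i++) {
                // Wait only for the time left until the shared deadline, never a fresh full timeout
                long remaining = Math.max(0L, deadline - System.nanoTime());
                try {
                    futures.get(i).get(remaining, TimeUnit.NANOSECONDS);
                } catch (ExecutionException ex) {
                    throw new IllegalStateException(ex.getCause());
                } catch (TimeoutException ex) {
                    futures.get(i).cancel(true); // interrupts the thread running this item
                    timedOut.add(items.get(i));
                }
            }
        } finally {
            executor.shutdownNow();
        }
        return timedOut;
    }

    public static void main(String[] args) throws InterruptedException {
        // Items 1 and 2 need about 100 ms and 200 ms; item 50 would need about 5 s
        System.out.println("timed out: " + runWithTimeout(List.of(1, 2, 50), 1_000));
    }
}
```

With a 1-second budget, items 1 and 2 should finish in time and only item 50 is cancelled. The total wait stays close to the budget no matter how many items time out.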

Job:

public class JavaSimpleJob implements SimpleJob {

    private final FooRepository fooRepository = FooRepositoryFactory.getFooRepository();

    @Override
    public void execute(final ShardingContext shardingContext) {
        List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
        for (Foo each : data) {
            // Stop as soon as the executor has interrupted this thread (e.g. on timeout)
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            fooRepository.doSomeThing(each.getId());
        }
    }

}
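The two signals from the core idea (interrupt() on the thread, close() on its socket) can be tried in isolation. A minimal sketch, assuming `Thread.sleep` and `ServerSocket.accept()` stand in for a job's blocking work; `TimeoutSignalDemo` and its methods are hypothetical names, not ElasticJob code. Closing the socket matters because a thread blocked in classic socket I/O is not guaranteed to react to interrupt() alone.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class TimeoutSignalDemo {

    // A worker blocked in sleep() sees interrupt() as an InterruptedException.
    static boolean interruptStopsSleepingWorker() throws InterruptedException {
        final boolean[] stopped = {false};
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // stand-in for a long, blocking step of the job
            } catch (InterruptedException ex) {
                stopped[0] = true; // the job notices the timeout and ends by itself
            }
        });
        worker.start();
        worker.join(100);   // the "timer": wait at most 100 ms for the job
        worker.interrupt(); // timed out: send the interrupt signal
        worker.join();
        return stopped[0];
    }

    // A worker blocked in socket I/O may ignore interrupt(), so the socket is closed as well;
    // the blocked call then fails with an IOException and the worker can end.
    static boolean closeUnblocksSocketWorker() throws IOException, InterruptedException {
        ServerSocket server = new ServerSocket(0); // any free local port
        final boolean[] stopped = {false};
        Thread worker = new Thread(() -> {
            try {
                server.accept(); // blocks: no client ever connects in this demo
            } catch (IOException ex) {
                stopped[0] = true; // typically a SocketException after close()
            }
        });
        worker.start();
        worker.join(100);
        worker.interrupt();
        server.close(); // this is what reliably releases the blocked accept()
        worker.join();
        return stopped[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sleeping worker stopped: " + interruptStopsSleepingWorker());
        System.out.println("socket worker stopped: " + closeUnblocksSocketWorker());
    }
}
```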

@skaic
Contributor

skaic commented Jun 29, 2021

Of course, the timeout-waiting code could be made better.

@TeslaCN
Member

TeslaCN commented Jun 29, 2021

I don't think it is elegant to make users write Thread.currentThread().isInterrupted() in their business code. Besides, users who already write thread or concurrency code could check for timeouts or interrupts themselves with just a little extra code.
We may need a more elegant way to do this.

@skaic
Contributor

skaic commented Jun 29, 2021

Right, Thread.currentThread().isInterrupted() looks stupid.

But Java has deprecated Thread.stop() and Thread.suspend(), and uses Thread.interrupt() to send a signal to the thread instead.
If necessary, the job needs to check for that signal and stop itself.

We are just providing a way to stop the job thread when it times out.
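The point that interrupt() is only a signal, not a forced stop, can be checked with a small sketch. It assumes busy loops as stand-ins for business code; `InterruptIsCooperative` and its methods are hypothetical names. A loop that polls the flag ends promptly, while one that never checks it keeps running after interrupt().

```java
public class InterruptIsCooperative {

    // interrupt() only sets a flag; a loop that polls the flag ends shortly afterwards.
    static boolean pollingWorkerStops() throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.onSpinWait(); // stand-in for one unit of business work
            }
        });
        worker.start();
        worker.interrupt();
        worker.join(1_000);
        return !worker.isAlive();
    }

    // A loop that never checks the flag is not stopped by interrupt(): unlike the
    // deprecated Thread.stop(), interrupt() cannot force a thread to terminate.
    static boolean ignoringWorkerKeepsRunning() throws InterruptedException {
        Thread worker = new Thread(() -> {
            long end = System.nanoTime() + 500_000_000L; // about 500 ms of busy work
            while (System.nanoTime() < end) {
                Thread.onSpinWait();
            }
        });
        worker.start();
        worker.interrupt();
        worker.join(100);
        boolean stillRunning = worker.isAlive();
        worker.join(); // let it finish on its own
        return stillRunning;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("polling worker stopped: " + pollingWorkerStops());
        System.out.println("ignoring worker still running: " + ignoringWorkerKeepsRunning());
    }
}
```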

@skaic
Contributor

skaic commented Jul 6, 2021

sunkai-cai linked a pull request on Jul 7, 2021 that will close this issue
4 participants