From 72d830f187e60e7bf3612607d75e7320b03706a7 Mon Sep 17 00:00:00 2001 From: Joel Mason Date: Sat, 13 May 2017 22:15:47 +1000 Subject: [PATCH 1/3] Fix queue restarting when add_action! called while processing push! --- src/core.jl | 16 ++++++++++++---- test/queue_runner.jl | 33 +++++++++++++++++++++++++++++++++ test/runtests.jl | 1 + 3 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 test/queue_runner.jl diff --git a/src/core.jl b/src/core.jl index 3aa7523..c3bc26e 100644 --- a/src/core.jl +++ b/src/core.jl @@ -247,7 +247,6 @@ end function break_loop() put!(_messages, Nullable{Message}()) - yield() end function stop() @@ -372,11 +371,20 @@ run_till_now() = run(Base.n_avail(_messages)) function maybe_restart_queue() global runner_task if !istaskdone(runner_task) - stop() - if Base.n_avail(_messages) > 0 + stop() # calls run_till_now() then puts the break (null) message in _messages + if current_task() == runner_task + # will happen if `add_action!` is called while processing a push! + prev_runner = current_task() + runner_task = @async begin + # new runner should wait for current runner to process the + # break_loop (null) message + wait(prev_runner) + run() + end + else wait(runner_task) + runner_task = @async run() end - runner_task = @async run() end end diff --git a/test/queue_runner.jl b/test/queue_runner.jl new file mode 100644 index 0000000..23d60c2 --- /dev/null +++ b/test/queue_runner.jl @@ -0,0 +1,33 @@ +Reactive.__init__() + +facts("Queue runner") do + @fact queue_size() --> 0 + context("Queue restarts during push!") do + bcount = 0 + a = Signal(1) + foreach(a; init=nothing) do _ + b = Signal(1) + bcount += 1 + foreach(b; init=nothing) do _ + bcount += 1 #won't get run because of the init=nothing + end + nothing + end + function test_queue(expected_bcount, orig_runner) + push!(a, 3) + wait(Reactive.runner_task) + @fact queue_size() --> 0 # stop message is in the queue + @fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner + @fact bcount --> expected_bcount + end + @fact bcount --> 0 + test_queue(1, Reactive.runner_task) + test_queue(2, Reactive.runner_task) + end +end + + +if !istaskdone(Reactive.runner_task) + Reactive.stop() + wait(Reactive.runner_task) +end diff --git a/test/runtests.jl b/test/runtests.jl index 373fe63..f0ae2cc 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -20,4 +20,5 @@ include("call_count.jl") include("flatten.jl") include("time.jl") include("async.jl") +include("queue_runner.jl") FactCheck.exitstatus() From f94ca376f06c2697bf43e5f641723fba9e887728 Mon Sep 17 00:00:00 2001 From: Joel Mason Date: Sun, 14 May 2017 00:58:06 +1000 Subject: [PATCH 2/3] Fixes restart issues when there are multiple add_actions when processing a push And adds a test for this case --- src/core.jl | 11 +++++++++-- test/queue_runner.jl | 26 ++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/core.jl b/src/core.jl index c3bc26e..73837b1 100644 --- a/src/core.jl +++ b/src/core.jl @@ -134,10 +134,14 @@ eltype{T}(::Signal{T}) = T eltype{T}(::Type{Signal{T}}) = T ##### Connections ##### - +const restart_queue = Ref(false) function add_action!(f, node) push!(node.actions, f) - maybe_restart_queue() + if current_task() !== runner_task + maybe_restart_queue() + else + restart_queue[] = true + end f end @@ -260,10 +264,12 @@ Processes `n` messages from the Reactive event queue. function run(n::Int=typemax(Int)) for i=1:n message = take!(_messages) + isnull(message) && break # break on null messages msgval = get(message) run_push(msgval.node, msgval.value, msgval.onerror) + restart_queue[] && maybe_restart_queue() end end @@ -386,6 +392,7 @@ function maybe_restart_queue() runner_task = @async run() end end + restart_queue[] = false end function __init__() diff --git a/test/queue_runner.jl b/test/queue_runner.jl index 23d60c2..48b3170 100644 --- a/test/queue_runner.jl +++ b/test/queue_runner.jl @@ -24,6 +24,32 @@ facts("Queue runner") do test_queue(1, Reactive.runner_task) test_queue(2, Reactive.runner_task) end + @fact queue_size() --> 0 + context("Multiple queue restarts during a single action") do + bcount = 0 + a = Signal(1) + foreach(a; init=nothing) do _ + b = Signal(1) + bcount += 1 + foreach(b; init=nothing) do _ + bcount += 1 #won't get run because of the init=nothing + end + foreach(b; init=nothing) do _ + bcount += 1 #won't get run because of the init=nothing + end + nothing + end + function test_queue(expected_bcount, orig_runner) + push!(a, 3) + wait(Reactive.runner_task) + @fact queue_size() --> 0 # stop message is in the queue + @fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner + @fact bcount --> expected_bcount + end + @fact bcount --> 0 + test_queue(1, Reactive.runner_task) + test_queue(2, Reactive.runner_task) + end end From efb2978f414b4134d01f6adca1db8f6074e54646 Mon Sep 17 00:00:00 2001 From: Joel Mason Date: Mon, 15 May 2017 21:10:21 +1000 Subject: [PATCH 3/3] Fixes another problematic case with queue restarts --- src/core.jl | 7 ++++--- test/queue_runner.jl | 35 +++++++++++++++++++++++++++++++---- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/core.jl b/src/core.jl index 73837b1..3c1e122 100644 --- a/src/core.jl +++ b/src/core.jl @@ -377,14 +377,15 @@ run_till_now() = run(Base.n_avail(_messages)) function maybe_restart_queue() global runner_task if !istaskdone(runner_task) - stop() # calls run_till_now() then puts the break (null) message in _messages - if current_task() == runner_task + break_loop() + if current_task() === runner_task # will happen if `add_action!` is called while processing a push! prev_runner = current_task() - runner_task = @async begin + @async begin # new runner should wait for current runner to process the # break_loop (null) message wait(prev_runner) + runner_task = current_task() run() end else diff --git a/test/queue_runner.jl b/test/queue_runner.jl index 48b3170..0431f42 100644 --- a/test/queue_runner.jl +++ b/test/queue_runner.jl @@ -1,8 +1,9 @@ Reactive.__init__() +import Reactive: runner_task facts("Queue runner") do - @fact queue_size() --> 0 context("Queue restarts during push!") do + @fact queue_size() --> 0 bcount = 0 a = Signal(1) foreach(a; init=nothing) do _ @@ -16,7 +17,7 @@ facts("Queue runner") do function test_queue(expected_bcount, orig_runner) push!(a, 3) wait(Reactive.runner_task) - @fact queue_size() --> 0 # stop message is in the queue + @fact queue_size() --> 0 @fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner @fact bcount --> expected_bcount end @@ -24,8 +25,9 @@ facts("Queue runner") do test_queue(1, Reactive.runner_task) test_queue(2, Reactive.runner_task) end - @fact queue_size() --> 0 + context("Multiple queue restarts during a single action") do + @fact queue_size() --> 0 bcount = 0 a = Signal(1) foreach(a; init=nothing) do _ @@ -42,7 +44,7 @@ facts("Queue runner") do function test_queue(expected_bcount, orig_runner) push!(a, 3) wait(Reactive.runner_task) - @fact queue_size() --> 0 # stop message is in the queue + @fact queue_size() --> 0 @fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner @fact bcount --> expected_bcount end @@ -50,6 +52,31 @@ facts("Queue runner") do test_queue(1, Reactive.runner_task) test_queue(2, Reactive.runner_task) end + + context("Queue restarts after more than one push!") do + @fact queue_size() --> 0 + bcount = 0 + a = Signal(1) + foreach(a; init=nothing) do _ + b = Signal(1) + bcount += 1 + foreach(b; init=nothing) do _ + bcount += 1 #won't get run because of the init=nothing + end + nothing + end + function test_queue(expected_bcount, orig_runner) + push!(a, 3) + push!(a, 4) + wait(Reactive.runner_task) + @fact queue_size() --> 0 + @fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner + @fact bcount --> expected_bcount*2 + end + @fact bcount --> 0 + test_queue(1, Reactive.runner_task) + test_queue(2, Reactive.runner_task) + end end