Skip to content

Commit

Permalink
Fixes another problematic case with queue restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
JobJob committed May 15, 2017
1 parent f94ca37 commit efb2978
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
7 changes: 4 additions & 3 deletions src/core.jl
Expand Up @@ -377,14 +377,15 @@ run_till_now() = run(Base.n_avail(_messages))
function maybe_restart_queue()
global runner_task
if !istaskdone(runner_task)
stop() # calls run_till_now() then puts the break (null) message in _messages
if current_task() == runner_task
break_loop()
if current_task() === runner_task
# will happen if `add_action!` is called while processing a push!
prev_runner = current_task()
runner_task = @async begin
@async begin
# new runner should wait for current runner to process the
# break_loop (null) message
wait(prev_runner)
runner_task = current_task()
run()
end
else
Expand Down
35 changes: 31 additions & 4 deletions test/queue_runner.jl
@@ -1,8 +1,9 @@
Reactive.__init__()
import Reactive: runner_task

facts("Queue runner") do
@fact queue_size() --> 0
context("Queue restarts during push!") do
@fact queue_size() --> 0
bcount = 0
a = Signal(1)
foreach(a; init=nothing) do _
Expand All @@ -16,16 +17,17 @@ facts("Queue runner") do
function test_queue(expected_bcount, orig_runner)
push!(a, 3)
wait(Reactive.runner_task)
@fact queue_size() --> 0 # stop message is in the queue
@fact queue_size() --> 0
@fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner
@fact bcount --> expected_bcount
end
@fact bcount --> 0
test_queue(1, Reactive.runner_task)
test_queue(2, Reactive.runner_task)
end
@fact queue_size() --> 0

context("Multiple queue restarts during a single action") do
@fact queue_size() --> 0
bcount = 0
a = Signal(1)
foreach(a; init=nothing) do _
Expand All @@ -42,14 +44,39 @@ facts("Queue runner") do
function test_queue(expected_bcount, orig_runner)
push!(a, 3)
wait(Reactive.runner_task)
@fact queue_size() --> 0 # stop message is in the queue
@fact queue_size() --> 0
@fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner
@fact bcount --> expected_bcount
end
@fact bcount --> 0
test_queue(1, Reactive.runner_task)
test_queue(2, Reactive.runner_task)
end

context("Queue restarts after more than one push!") do
@fact queue_size() --> 0
bcount = 0
a = Signal(1)
foreach(a; init=nothing) do _
b = Signal(1)
bcount += 1
foreach(b; init=nothing) do _
bcount += 1 #won't get run because of the init=nothing
end
nothing
end
function test_queue(expected_bcount, orig_runner)
push!(a, 3)
push!(a, 4)
wait(Reactive.runner_task)
@fact queue_size() --> 0
@fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner
@fact bcount --> expected_bcount*2
end
@fact bcount --> 0
test_queue(1, Reactive.runner_task)
test_queue(2, Reactive.runner_task)
end
end


Expand Down

0 comments on commit efb2978

Please sign in to comment.