Skip to content

Commit

Permalink
Merge pull request #138 from JuliaGizmos/queue-restart-issues
Browse files Browse the repository at this point in the history
Fixes issue when runner task is restarted during a push
  • Loading branch information
shashi committed May 16, 2017
2 parents 73e1065 + efb2978 commit cd44368
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 6 deletions.
28 changes: 22 additions & 6 deletions src/core.jl
Expand Up @@ -134,10 +134,14 @@ eltype{T}(::Signal{T}) = T
eltype{T}(::Type{Signal{T}}) = T

##### Connections #####

const restart_queue = Ref(false)
function add_action!(f, node)
push!(node.actions, f)
maybe_restart_queue()
if current_task() !== runner_task
maybe_restart_queue()
else
restart_queue[] = true
end
f
end

Expand Down Expand Up @@ -247,7 +251,6 @@ end

function break_loop()
put!(_messages, Nullable{Message}())
yield()
end

function stop()
Expand All @@ -261,10 +264,12 @@ Processes `n` messages from the Reactive event queue.
function run(n::Int=typemax(Int))
for i=1:n
message = take!(_messages)

isnull(message) && break # break on null messages

msgval = get(message)
run_push(msgval.node, msgval.value, msgval.onerror)
restart_queue[] && maybe_restart_queue()
end
end

Expand Down Expand Up @@ -372,12 +377,23 @@ run_till_now() = run(Base.n_avail(_messages))
function maybe_restart_queue()
global runner_task
if !istaskdone(runner_task)
stop()
if Base.n_avail(_messages) > 0
break_loop()
if current_task() === runner_task
# will happen if `add_action!` is called while processing a push!
prev_runner = current_task()
@async begin
# new runner should wait for current runner to process the
# break_loop (null) message
wait(prev_runner)
runner_task = current_task()
run()
end
else
wait(runner_task)
runner_task = @async run()
end
runner_task = @async run()
end
restart_queue[] = false
end

function __init__()
Expand Down
86 changes: 86 additions & 0 deletions test/queue_runner.jl
@@ -0,0 +1,86 @@
Reactive.__init__()
import Reactive: runner_task

facts("Queue runner") do
context("Queue restarts during push!") do
@fact queue_size() --> 0
bcount = 0
a = Signal(1)
foreach(a; init=nothing) do _
b = Signal(1)
bcount += 1
foreach(b; init=nothing) do _
bcount += 1 #won't get run because of the init=nothing
end
nothing
end
function test_queue(expected_bcount, orig_runner)
push!(a, 3)
wait(Reactive.runner_task)
@fact queue_size() --> 0
@fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner
@fact bcount --> expected_bcount
end
@fact bcount --> 0
test_queue(1, Reactive.runner_task)
test_queue(2, Reactive.runner_task)
end

context("Multiple queue restarts during a single action") do
@fact queue_size() --> 0
bcount = 0
a = Signal(1)
foreach(a; init=nothing) do _
b = Signal(1)
bcount += 1
foreach(b; init=nothing) do _
bcount += 1 #won't get run because of the init=nothing
end
foreach(b; init=nothing) do _
bcount += 1 #won't get run because of the init=nothing
end
nothing
end
function test_queue(expected_bcount, orig_runner)
push!(a, 3)
wait(Reactive.runner_task)
@fact queue_size() --> 0
@fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner
@fact bcount --> expected_bcount
end
@fact bcount --> 0
test_queue(1, Reactive.runner_task)
test_queue(2, Reactive.runner_task)
end

context("Queue restarts after more than one push!") do
@fact queue_size() --> 0
bcount = 0
a = Signal(1)
foreach(a; init=nothing) do _
b = Signal(1)
bcount += 1
foreach(b; init=nothing) do _
bcount += 1 #won't get run because of the init=nothing
end
nothing
end
function test_queue(expected_bcount, orig_runner)
push!(a, 3)
push!(a, 4)
wait(Reactive.runner_task)
@fact queue_size() --> 0
@fact orig_runner --> not(Reactive.runner_task) # we should have a new queue runner
@fact bcount --> expected_bcount*2
end
@fact bcount --> 0
test_queue(1, Reactive.runner_task)
test_queue(2, Reactive.runner_task)
end
end


if !istaskdone(Reactive.runner_task)
Reactive.stop()
wait(Reactive.runner_task)
end
1 change: 1 addition & 0 deletions test/runtests.jl
Expand Up @@ -20,4 +20,5 @@ include("call_count.jl")
include("flatten.jl")
include("time.jl")
include("async.jl")
include("queue_runner.jl")
FactCheck.exitstatus()

0 comments on commit cd44368

Please sign in to comment.