Merge pull request #308 from cschleiden/fix-timer-cancel-race
Fix timer cancel race
cschleiden committed Jan 20, 2024
2 parents e756c08 + 962756c commit 9fbfb31
Showing 4 changed files with 71 additions and 19 deletions.
27 changes: 14 additions & 13 deletions backend/redis/events_future.go
@@ -23,24 +23,25 @@ import (
 // Note: this does not work with Redis Cluster since not all keys are passed into the script.
 var futureEventsCmd = redis.NewScript(`
 	-- Find events which should become visible now
-	local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE")
+	local now = ARGV[1]
+	local events = redis.call("ZRANGE", KEYS[1], "-inf", now, "BYSCORE")
 	for i = 1, #events do
 		local instanceSegment = redis.call("HGET", events[i], "instance")
 
-		-- Add event to pending event stream
-		local eventData = redis.call("HGET", events[i], "event")
-		local pending_events_key = "pending-events:" .. instanceSegment
-		redis.call("XADD", pending_events_key, "*", "event", eventData)
-
-		-- Try to queue workflow task
-		local already_queued = redis.call("SADD", KEYS[3], instanceSegment)
-		if already_queued ~= 0 then
+		-- Try to queue workflow task. If a workflow task is already queued, ignore this event for now.
+		local added = redis.call("SADD", KEYS[3], instanceSegment)
+		if added == 1 then
 			redis.call("XADD", KEYS[2], "*", "id", instanceSegment, "data", "")
-		end
 
-		-- Delete event hash data
-		redis.call("DEL", events[i])
-		redis.call("ZREM", KEYS[1], events[i])
+			-- Add event to pending event stream
+			local eventData = redis.call("HGET", events[i], "event")
+			local pending_events_key = "pending-events:" .. instanceSegment
+			redis.call("XADD", pending_events_key, "*", "event", eventData)
+
+			-- Delete event hash data
+			redis.call("DEL", events[i])
+			redis.call("ZREM", KEYS[1], events[i])
+		end
 	end
 
 	return #events
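
The fix above only consumes a due future event when SADD reports that the instance was not already queued; otherwise the event stays in the zset and is picked up on a later pass. For context, here is a condensed, self-contained sketch of how such a script can be invoked from Go with go-redis. The key names, the Unix-second score cutoff, the client version, and the surrounding main function are assumptions for illustration, not code from this repository.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9" // client version assumed
)

// Condensed from the fixed script above: move due events, but only queue a
// workflow task (and consume the event) when the instance is not already queued.
var futureEventsSketch = redis.NewScript(`
	local now = ARGV[1]
	local events = redis.call("ZRANGE", KEYS[1], "-inf", now, "BYSCORE")
	for i = 1, #events do
		local instanceSegment = redis.call("HGET", events[i], "instance")
		local added = redis.call("SADD", KEYS[3], instanceSegment)
		if added == 1 then
			redis.call("XADD", KEYS[2], "*", "id", instanceSegment, "data", "")
			local eventData = redis.call("HGET", events[i], "event")
			redis.call("XADD", "pending-events:" .. instanceSegment, "*", "event", eventData)
			redis.call("DEL", events[i])
			redis.call("ZREM", KEYS[1], events[i])
		end
	end
	return #events
`)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// KEYS[1] = future-event zset, KEYS[2] = workflow task stream,
	// KEYS[3] = set of queued instances; ARGV[1] = "now" as the score cutoff.
	// Key names and score encoding are placeholders for this sketch.
	due, err := futureEventsSketch.Run(ctx, rdb,
		[]string{"future-events", "task-stream", "task-set"},
		time.Now().Unix(),
	).Int()
	if err != nil {
		panic(err)
	}
	fmt.Println("due future events found:", due)
}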
16 changes: 10 additions & 6 deletions backend/redis/scripts/complete_workflow_task.lua
@@ -91,12 +91,16 @@ local timersToCancel = tonumber(getArgv())
 for i = 1, timersToCancel do
 	local futureEventKey = getKey()
 
-	redis.call("ZREM", futureEventZSetKey, futureEventKey)
-	-- remove payload
-	local eventId = redis.call("HGET", futureEventKey, "id")
-	redis.call("HDEL", payloadHashKey, eventId)
-	-- remove event hash
-	redis.call("DEL", futureEventKey)
+	local eventRemoved = redis.call("ZREM", futureEventZSetKey, futureEventKey)
+	-- Event might've become visible while this task was being processed, in that
+	-- case it would be already removed from futureEventZSetKey
+	if eventRemoved == 1 then
+		-- remove payload
+		local eventId = redis.call("HGET", futureEventKey, "id")
+		redis.call("HDEL", payloadHashKey, eventId)
+		-- remove event hash
+		redis.call("DEL", futureEventKey)
+	end
 end
 
 -- Schedule timers
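
This is the other side of the race: while the workflow task that cancels a timer is being processed, the future-events script above may already have made the timer event visible and cleaned it up, so the cancel path now only deletes the payload and event hash when its own ZREM actually removed the member. Below is a rough, non-atomic go-redis sketch of the same guard with placeholder key and field names; the real logic lives in the Lua script so that it runs atomically.

package sketch

import (
	"context"

	"github.com/redis/go-redis/v9" // client version assumed
)

// cancelFutureEvent mirrors the guarded cleanup above: clean up only when this
// caller removed the member from the future-event zset. Unlike the Lua script,
// these calls are not atomic; this is purely to illustrate the guard.
func cancelFutureEvent(ctx context.Context, rdb *redis.Client, futureEventZSetKey, payloadHashKey, futureEventKey string) error {
	removed, err := rdb.ZRem(ctx, futureEventZSetKey, futureEventKey).Result()
	if err != nil {
		return err
	}
	if removed == 1 {
		// The event was still pending, so the cancel path owns the cleanup.
		eventID, err := rdb.HGet(ctx, futureEventKey, "id").Result()
		if err != nil {
			return err
		}
		if err := rdb.HDel(ctx, payloadHashKey, eventID).Err(); err != nil {
			return err
		}
		if err := rdb.Del(ctx, futureEventKey).Err(); err != nil {
			return err
		}
	}
	// removed == 0: the event already became visible and was consumed; nothing to do.
	return nil
}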
1 change: 1 addition & 0 deletions backend/test/e2e.go
@@ -779,6 +779,7 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
 	}
 
 	tests = append(tests, e2eActivityTests...)
+	tests = append(tests, e2eTimerTests...)
 	tests = append(tests, e2eStatsTests...)
 
 	run := func(suffix string, workerOptions worker.Options) {
46 changes: 46 additions & 0 deletions backend/test/e2e_timer.go
@@ -0,0 +1,46 @@
package test

import (
	"context"
	"testing"
	"time"

	"github.com/cschleiden/go-workflows/client"
	"github.com/cschleiden/go-workflows/worker"
	"github.com/cschleiden/go-workflows/workflow"
	"github.com/stretchr/testify/require"
)

var e2eTimerTests = []backendTest{
	{
		name: "Timer_ScheduleCancelRace",
		f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
			wf := func(ctx workflow.Context) error {
				// 1) Start of first execution slice
				tctx, cancel := workflow.WithCancel(ctx)

				// 1) Schedule timer
				x := workflow.ScheduleTimer(tctx, time.Millisecond*200)

				// 1) Force an end to the execution slice
				workflow.Sleep(ctx, time.Millisecond)

				// 2) Start of second execution slice
				// 2) Cancel timer. It should not have fired at this point,
				// we only waited for a millisecond
				cancel()

				x.Get(ctx)

				// 2) Force the execution slice to be active for as long as it takes to fire the timer
				time.Sleep(time.Millisecond * 200)

				return nil
			}
			register(t, ctx, w, []interface{}{wf}, nil)

			_, err := runWorkflowWithResult[any](t, ctx, c, wf)
			require.NoError(t, err)
		},
	},
}
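
The numbered comments mark execution slices: the timer is scheduled in one slice, the cancellation is recorded in the next, and the final sleep keeps that slice busy long enough for the 200ms timer to fire in the background, which is exactly the window the two script changes above close. For application code, the same schedule-then-cancel pattern might look like the sketch below, with the errors the test deliberately ignores checked; the exact behavior of Get for a canceled timer depends on the library version, so treat this as illustrative rather than as the library's documented contract.

package sketch

import (
	"time"

	"github.com/cschleiden/go-workflows/workflow"
)

// CancelTimerWorkflow shows the schedule-then-cancel pattern from the test
// above with error handling; it is a sketch, not code from this repository.
func CancelTimerWorkflow(ctx workflow.Context) error {
	tctx, cancel := workflow.WithCancel(ctx)

	// Schedule the timer on the cancelable child context.
	timer := workflow.ScheduleTimer(tctx, 200*time.Millisecond)

	// Yield so the schedule is recorded before the cancellation.
	if err := workflow.Sleep(ctx, time.Millisecond); err != nil {
		return err
	}

	// Cancel the timer and observe the result.
	cancel()
	if _, err := timer.Get(ctx); err != nil {
		// Cancellation surfacing as an error from Get is expected in this pattern.
	}

	return nil
}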
