Skip to content

Commit

Permalink
Merge pull request #199 from vgarvardt/feat/job-ttl
Browse files Browse the repository at this point in the history
feat: added job ttl option
  • Loading branch information
vgarvardt committed Jul 3, 2023
2 parents d6527b0 + bb51cc2 commit 12c8f88
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 1 deletion.
12 changes: 11 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Worker struct {
running bool
pollStrategy PollStrategy
pollFunc pollFunc
jobTTL time.Duration

graceful bool
gracefulCtx func() context.Context
Expand Down Expand Up @@ -256,7 +257,14 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {
return
}

if err = wf(ctx, j); err != nil {
handlerCtx := ctx
cancel := context.CancelFunc(func() {})
if w.jobTTL > 0 {
handlerCtx, cancel = context.WithTimeout(ctx, w.jobTTL)
}
defer cancel()

if err = wf(handlerCtx, j); err != nil {
w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(false)))

for _, hook := range w.hooksJobDone {
Expand Down Expand Up @@ -351,6 +359,7 @@ type WorkerPool struct {
mu sync.Mutex
running bool
pollStrategy PollStrategy
jobTTL time.Duration

graceful bool
gracefulCtx func() context.Context
Expand Down Expand Up @@ -410,6 +419,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
WithWorkerHooksJobDone(w.hooksJobDone...),
WithWorkerPanicStackBufSize(w.panicStackBufSize),
WithWorkerSpanWorkOneNoJob(w.spanWorkOneNoJob),
WithWorkerJobTTL(w.jobTTL),
)

if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions worker_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ func WithWorkerSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerOption {
}
}

// WithWorkerJobTTL sets max time a job can run. Implementation-wise the job runs with the timeout context,
// so it is up to the job implementation to handle context cancellation properly.
func WithWorkerJobTTL(d time.Duration) WorkerOption {
return func(w *Worker) {
w.jobTTL = d
}
}

// WithPoolPollInterval overrides default poll interval with the given value.
// Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollInterval(d time.Duration) WorkerPoolOption {
Expand Down Expand Up @@ -225,3 +233,11 @@ func WithPoolSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerPoolOption {
w.spanWorkOneNoJob = spanWorkOneNoJob
}
}

// WithPoolJobTTL sets max time a job can run. Implementation-wise the job runs with the timeout context,
// so it is up to the job implementation to handle context cancellation properly.
func WithPoolJobTTL(d time.Duration) WorkerPoolOption {
return func(w *WorkerPool) {
w.jobTTL = d
}
}
26 changes: 26 additions & 0 deletions worker_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,16 @@ func TestWithWorkerSpanWorkOneNoJob(t *testing.T) {
assert.True(t, workerWithSpanWorkOneNoJob.spanWorkOneNoJob)
}

func TestWithWorkerJobTTL(t *testing.T) {
workerWOutJobTTL, err := NewWorker(nil, dummyWM)
require.NoError(t, err)
assert.Equal(t, time.Duration(0), workerWOutJobTTL.jobTTL)

workerWithJobTTL, err := NewWorker(nil, dummyWM, WithWorkerJobTTL(5*time.Minute))
require.NoError(t, err)
assert.Equal(t, 5*time.Minute, workerWithJobTTL.jobTTL)
}

func TestWithPoolHooksJobLocked(t *testing.T) {
ctx := context.Background()
hook := new(dummyHook)
Expand Down Expand Up @@ -416,3 +426,19 @@ func TestWithPoolSpanWorkOneNoJob(t *testing.T) {
assert.True(t, w.spanWorkOneNoJob)
}
}

func TestWithPoolJobTTL(t *testing.T) {
poolWOutJobTTL, err := NewWorkerPool(nil, dummyWM, 2)
require.NoError(t, err)
assert.Equal(t, time.Duration(0), poolWOutJobTTL.jobTTL)
for _, w := range poolWOutJobTTL.workers {
assert.Equal(t, time.Duration(0), w.jobTTL)
}

poolWithJobTTL, err := NewWorkerPool(nil, dummyWM, 2, WithPoolJobTTL(10*time.Minute))
require.NoError(t, err)
assert.Equal(t, 10*time.Minute, poolWithJobTTL.jobTTL)
for _, w := range poolWithJobTTL.workers {
assert.Equal(t, 10*time.Minute, w.jobTTL)
}
}
43 changes: 43 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,10 @@ func TestNewWorker_GracefulShutdown(t *testing.T) {
require.NoError(t, err)

chDone := make(chan bool)
t.Cleanup(func() {
close(chDone)
})

go func() {
err := wNonGraceful.Run(ctxNonGraceful)
assert.NoError(t, err)
Expand Down Expand Up @@ -628,6 +632,45 @@ func TestNewWorker_GracefulShutdown(t *testing.T) {
require.False(t, jobCancelled)
}

func TestNewWorker_JobTTL(t *testing.T) {
connPool := adapterTesting.OpenTestPoolLibPQ(t)

c, err := NewClient(connPool)
require.NoError(t, err)

var jobCancelled bool
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
select {
case <-ctx.Done():
jobCancelled = true
case <-time.After(5 * time.Second):
jobCancelled = false
}

return nil
},
}

wNoJobTTL, err := NewWorker(c, wm)
require.NoError(t, err)
wWithJobTTL, err := NewWorker(c, wm, WithWorkerJobTTL(2*time.Second))
require.NoError(t, err)

err = c.Enqueue(context.Background(), &Job{Type: "MyJob"})
require.NoError(t, err)
err = c.Enqueue(context.Background(), &Job{Type: "MyJob"})
require.NoError(t, err)

didWork := wNoJobTTL.WorkOne(context.Background())
require.True(t, didWork)
require.False(t, jobCancelled)

didWork = wWithJobTTL.WorkOne(context.Background())
require.True(t, didWork)
require.True(t, jobCancelled)
}

func TestNewWorkerPool_GracefulShutdown(t *testing.T) {
connPool := adapterTesting.OpenTestPoolLibPQ(t)

Expand Down

0 comments on commit 12c8f88

Please sign in to comment.