Commit

add poll strategy
Del-sama committed Oct 11, 2021
1 parent be53e70 commit 34fc6fc
Showing 5 changed files with 253 additions and 21 deletions.
45 changes: 45 additions & 0 deletions client.go
@@ -177,6 +177,51 @@ WHERE job_id = $1 FOR UPDATE SKIP LOCKED`, id).Scan(
return nil, fmt.Errorf("could not lock the job (rollback result: %v): %w", rbErr, err)
}

// LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue.
// If a job is found, it will be locked on the transactional level, so other workers
// will skip it. If no job is found, nil is returned instead of an error.
//
// Because Gue uses transaction-level locks, we have to hold the
// same transaction throughout the process of getting a job, working it,
// deleting it, and releasing the lock.
//
// After the Job has been worked, you must call either Done() or Error() on it
// in order to commit the transaction and persist the Job changes (remove or update it).
func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error) {
tx, err := c.pool.Begin(ctx)
if err != nil {
return nil, err
}

now := time.Now().UTC()

j := Job{pool: c.pool, tx: tx, backoff: c.backoff}

err = tx.QueryRow(ctx, `SELECT job_id, queue, priority, run_at, job_type, args, error_count
FROM gue_jobs
WHERE queue = $1 AND run_at <= $2
ORDER BY run_at, priority ASC
LIMIT 1 FOR UPDATE SKIP LOCKED`, queue, now).Scan(
&j.ID,
&j.Queue,
&j.Priority,
&j.RunAt,
&j.Type,
(*json.RawMessage)(&j.Args),
&j.ErrorCount,
)
if err == nil {
return &j, nil
}

rbErr := tx.Rollback(ctx)
if err == adapter.ErrNoRows {
return nil, rbErr
}

return nil, fmt.Errorf("could not lock a job (rollback result: %v): %w", rbErr, err)
}

func newID() string {
hasher := md5.New()
// nolint:errcheck
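
For orientation, a minimal usage sketch of the new method, following the Done()/Error() contract from its doc comment. The queue name "email" and the handleJob helper are hypothetical placeholders; in practice the Worker (see worker.go below) drives this for you and also removes finished jobs:

func workOneScheduled(ctx context.Context, c *Client) error {
	// Lock the earliest scheduled job that is due; nil means nothing is due yet.
	j, err := c.LockNextScheduledJob(ctx, "email")
	if err != nil {
		return err
	}
	if j == nil {
		return nil
	}
	if wErr := handleJob(ctx, j); wErr != nil {
		// Error records the failure and commits the transaction, releasing the lock.
		return j.Error(ctx, wErr.Error())
	}
	// Done commits the transaction and releases the transaction-level lock.
	return j.Done(ctx)
}
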
108 changes: 108 additions & 0 deletions client_test.go
@@ -271,6 +271,114 @@ func testLockJobByIDNoJob(t *testing.T, connPool adapter.ConnPool) {
require.Nil(t, j)
}

func TestLockNextScheduledJob(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockNextScheduledJob(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockNextScheduledJob(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockNextScheduledJob(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockNextScheduledJob(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockNextScheduledJob(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

newJob := &Job{
Type: "MyJob",
RunAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
}
err := c.Enqueue(ctx, newJob)
require.NoError(t, err)
require.Greater(t, newJob.ID, int64(0))

j, err := c.LockNextScheduledJob(ctx, "")
require.NoError(t, err)

require.NotNil(t, j.tx)
require.NotNil(t, j.pool)
defer func() {
err := j.Done(ctx)
assert.NoError(t, err)
}()

// check values of returned Job
assert.Equal(t, newJob.ID, j.ID)
assert.Equal(t, defaultQueueName, j.Queue)
assert.Equal(t, int16(0), j.Priority)
assert.False(t, j.RunAt.IsZero())
assert.Equal(t, newJob.Type, j.Type)
assert.Equal(t, []byte(`[]`), j.Args)
assert.Equal(t, int32(0), j.ErrorCount)
assert.NotEqual(t, pgtype.Present, j.LastError.Status)
}

func TestLockNextScheduledJobAlreadyLocked(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockNextScheduledJobAlreadyLocked(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockNextScheduledJobAlreadyLocked(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockNextScheduledJobAlreadyLocked(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockNextScheduledJobAlreadyLocked(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockNextScheduledJobAlreadyLocked(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

err := c.Enqueue(ctx, &Job{Type: "MyJob"})
require.NoError(t, err)

j, err := c.LockNextScheduledJob(ctx, "")
require.NoError(t, err)

defer func() {
err := j.Done(ctx)
assert.NoError(t, err)
}()
require.NotNil(t, j)

j2, err := c.LockNextScheduledJob(ctx, "")
require.NoError(t, err)
require.Nil(t, j2)
}

func TestLockNextScheduledJobNoJob(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockNextScheduledJobNoJob(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockNextScheduledJobNoJob(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockNextScheduledJobNoJob(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockNextScheduledJobNoJob(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockNextScheduledJobNoJob(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

j, err := c.LockNextScheduledJob(ctx, "")
require.NoError(t, err)
require.Nil(t, j)
}

func TestJobTx(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testJobTx(t, adapterTesting.OpenTestPoolPGXv3(t))
62 changes: 41 additions & 21 deletions worker.go
@@ -14,8 +14,10 @@ import (
)

const (
defaultPollInterval = 5 * time.Second
defaultQueueName = ""
defaultPollInterval = 5 * time.Second
defaultQueueName = ""
defaultPollStrategy = "OrderByPriority"
nextScheduledPollStrategy = "OrderByRunAtPriority"
)

// WorkFunc is a function that performs a Job. If an error is returned, the job
@@ -29,14 +31,15 @@ type WorkMap map[string]WorkFunc
// Worker is a single worker that pulls jobs off the specified queue. If no Job
// is found, the Worker will sleep for interval seconds.
type Worker struct {
wm WorkMap
interval time.Duration
queue string
c *Client
id string
logger adapter.Logger
mu sync.Mutex
running bool
wm WorkMap
interval time.Duration
queue string
c *Client
id string
logger adapter.Logger
mu sync.Mutex
running bool
pollStrategy string
}

// NewWorker returns a Worker that fetches Jobs from the Client and executes
@@ -64,6 +67,10 @@ func NewWorker(c *Client, wm WorkMap, options ...WorkerOption) *Worker {
w.id = newID()
}

if w.pollStrategy == "" {
w.pollStrategy = defaultPollStrategy
}

w.logger = w.logger.With(adapter.F("worker-id", w.id))

return &w
@@ -150,7 +157,15 @@ func (w *Worker) runLoop(ctx context.Context) error {

// WorkOne tries to consume a single message from the queue.
func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {
j, err := w.c.LockJob(ctx, w.queue)
var j *Job
var err error
switch w.pollStrategy {
case nextScheduledPollStrategy:
j, err = w.c.LockNextScheduledJob(ctx, w.queue)
default:
j, err = w.c.LockJob(ctx, w.queue)
}

if err != nil {
w.logger.Error("Worker failed to lock a job", adapter.Err(err))
return
@@ -217,15 +232,16 @@ func recoverPanic(ctx context.Context, logger adapter.Logger, j *Job) {
// WorkerPool is a pool of Workers, each working jobs from the specified queue
// at the specified interval using the WorkMap.
type WorkerPool struct {
wm WorkMap
interval time.Duration
queue string
c *Client
workers []*Worker
id string
logger adapter.Logger
mu sync.Mutex
running bool
wm WorkMap
interval time.Duration
queue string
c *Client
workers []*Worker
id string
logger adapter.Logger
mu sync.Mutex
running bool
pollStrategy string
}

// NewWorkerPool creates a new WorkerPool with count workers using the Client c.
@@ -251,6 +267,10 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
w.id = newID()
}

if w.pollStrategy == "" {
w.pollStrategy = defaultPollStrategy
}

w.logger = w.logger.With(adapter.F("worker-pool-id", w.id))

for i := range w.workers {
Expand All @@ -261,9 +281,9 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
WithWorkerQueue(w.queue),
WithWorkerID(fmt.Sprintf("%s/worker-%d", w.id, i)),
WithWorkerLogger(w.logger),
setWorkerPollStrategy(w.pollStrategy),
)
}

return &w
}

26 changes: 26 additions & 0 deletions worker_option.go
@@ -41,6 +41,25 @@ func WithWorkerLogger(logger adapter.Logger) WorkerOption {
}
}

// WithWorkerNextScheduledPollStrategy sets the worker pollStrategy to nextScheduledPollStrategy,
// so the worker locks the earliest scheduled job (ordered by run_at, then priority).
func WithWorkerNextScheduledPollStrategy() WorkerOption {
return func(w *Worker) {
w.pollStrategy = nextScheduledPollStrategy
}
}

// setWorkerPollStrategy returns a WorkerOption that sets the worker pollStrategy.
func setWorkerPollStrategy(strategy string) WorkerOption {
switch strategy {
case nextScheduledPollStrategy:
return WithWorkerNextScheduledPollStrategy()
default:
return func(w *Worker) {
w.pollStrategy = defaultPollStrategy
}
}
}

// WithPoolPollInterval overrides default poll interval with the given value.
// Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollInterval(d time.Duration) WorkerPoolOption {
@@ -69,3 +88,10 @@ func WithPoolLogger(logger adapter.Logger) WorkerPoolOption {
w.logger = logger
}
}

// WithPoolNextScheduledPollStrategy sets the pool pollStrategy to nextScheduledPollStrategy;
// each worker in the pool inherits it.
func WithPoolNextScheduledPollStrategy() WorkerPoolOption {
return func(w *WorkerPool) {
w.pollStrategy = nextScheduledPollStrategy
}
}
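
As a brief usage sketch (assuming an existing Client c and WorkMap wm; the pool size 4 is arbitrary), enabling the new strategy is a single option at construction time, while workers built without it keep the default OrderByPriority behaviour:

// Single worker that locks the earliest scheduled job first.
w := NewWorker(c, wm, WithWorkerNextScheduledPollStrategy())

// Worker pool; each worker inherits the strategy via setWorkerPollStrategy.
pool := NewWorkerPool(c, wm, 4, WithPoolNextScheduledPollStrategy())
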
33 changes: 33 additions & 0 deletions worker_option_test.go
@@ -100,6 +100,29 @@ func TestWithWorkerLogger(t *testing.T) {
l.AssertExpectations(t)
}

func TestWithWorkerNextScheduledPollStrategy(t *testing.T) {
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
return nil
},
}
workerWithNextScheduledPollStrategy := NewWorker(nil, wm, WithWorkerNextScheduledPollStrategy())
assert.Equal(t, nextScheduledPollStrategy, workerWithNextScheduledPollStrategy.pollStrategy)
}

func TestSetWorkerPollStrategy(t *testing.T) {
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
return nil
},
}
workerWithNextScheduledPollStrategy := NewWorker(nil, wm, setWorkerPollStrategy(nextScheduledPollStrategy))
assert.Equal(t, nextScheduledPollStrategy, workerWithNextScheduledPollStrategy.pollStrategy)

workerWithDefaultPollStrategy := NewWorker(nil, wm, setWorkerPollStrategy(defaultPollStrategy))
assert.Equal(t, defaultPollStrategy, workerWithDefaultPollStrategy.pollStrategy)
}

func TestWithPoolPollInterval(t *testing.T) {
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
@@ -167,3 +190,13 @@ func TestWithPoolLogger(t *testing.T) {

l.AssertExpectations(t)
}

func TestWithPoolNextScheduledPollStrategy(t *testing.T) {
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
return nil
},
}
workerPoolWithNextScheduledPollStrategy := NewWorkerPool(nil, wm, 2, WithPoolNextScheduledPollStrategy())
assert.Equal(t, nextScheduledPollStrategy, workerPoolWithNextScheduledPollStrategy.pollStrategy)
}
