Skip to content

Commit

Permalink
Add LockJobByID function
Browse files Browse the repository at this point in the history
  • Loading branch information
Delores Diei committed Sep 8, 2021
1 parent e726647 commit b5958c8
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions client.go
Expand Up @@ -139,6 +139,47 @@ LIMIT 1 FOR UPDATE SKIP LOCKED`, queue, now).Scan(
return nil, fmt.Errorf("could not lock a job (rollback result: %v): %w", rbErr, err)
}

// LockJobByID attempts to retrieve a specific Job from the database.
// If the job is found, it will be locked on the transactional level, so other workers
// will be skipping it. If the job is not found, nil will be returned instead of an error.
//
// Because Gue uses transaction-level locks, we have to hold the
// same transaction throughout the process of getting the job, working it,
// deleting it, and releasing the lock.
//
// After the Job has been worked, you must call either Done() or Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockJobByID(ctx context.Context, id int64) (*Job, error) {
tx, err := c.pool.Begin(ctx)
if err != nil {
return nil, err
}

j := Job{pool: c.pool, tx: tx, backoff: c.backoff}

err = tx.QueryRow(ctx, `SELECT job_id, queue, priority, run_at, job_type, args, error_count
FROM gue_jobs
WHERE job_id = $1 FOR UPDATE SKIP LOCKED`, id).Scan(
&j.ID,
&j.Queue,
&j.Priority,
&j.RunAt,
&j.Type,
(*json.RawMessage)(&j.Args),
&j.ErrorCount,
)
if err == nil {
return &j, nil
}

rbErr := tx.Rollback(ctx)
if err == adapter.ErrNoRows {
return nil, rbErr
}

return nil, fmt.Errorf("could not lock the job (rollback result: %v): %w", rbErr, err)
}

func newID() string {
hasher := md5.New()
// nolint:errcheck
Expand Down

0 comments on commit b5958c8

Please sign in to comment.