Skip to content

Commit

Permalink
Merge pull request #213 from vgarvardt/feat/expose-job-created-at
Browse files Browse the repository at this point in the history
feat: expose CreatedAt field for the Job
  • Loading branch information
vgarvardt committed Aug 31, 2023
2 parents 8d88c31 + 4847e23 commit acb5101
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
uses: ./.github/actions/setup-deps
with:
go-version: '1.19'
token: ${{secrets.GITHUB_TOKEN}}
token: ${{ secrets.GITHUB_TOKEN }}

- name: Lint Golang
uses: golangci/golangci-lint-action@v3
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ '1.19', '1.20', 'stable' ]
go-version: [ '1.19', '1.20', '1.21', 'stable' ]
timeout-minutes: 10
steps:
- name: Check out code
Expand All @@ -25,8 +25,8 @@ jobs:
- name: Setup dependencies
uses: ./.github/actions/setup-deps
with:
go-version: '1.19'
token: ${{secrets.GITHUB_TOKEN}}
go-version: ${{ matrix.go-version }}
token: ${{ secrets.GITHUB_TOKEN }}

- name: Run tests
run: task test
Expand Down
25 changes: 13 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,20 @@ func (c *Client) EnqueueBatchTx(ctx context.Context, jobs []*Job, tx adapter.Tx)
return nil
}

func (c *Client) execEnqueueWithID(ctx context.Context, j *Job, q adapter.Queryable, ulid ulid.ULID) (err error) {
func (c *Client) execEnqueueWithID(ctx context.Context, j *Job, q adapter.Queryable, jobID ulid.ULID) (err error) {
if j.Type == "" {
return ErrMissingType
}

now := time.Now().UTC()
j.CreatedAt = time.Now().UTC()

runAt := j.RunAt
if runAt.IsZero() {
j.RunAt = now
j.RunAt = j.CreatedAt
}

j.ID = ulid
idAsString := ulid.String()
j.ID = jobID
idAsString := jobID.String()

if j.Args == nil {
j.Args = []byte{}
Expand All @@ -140,7 +140,7 @@ func (c *Client) execEnqueueWithID(ctx context.Context, j *Job, q adapter.Querya
(job_id, queue, priority, run_at, job_type, args, created_at, updated_at)
VALUES
($1, $2, $3, $4, $5, $6, $7, $7)
`, idAsString, j.Queue, j.Priority, j.RunAt, j.Type, j.Args, now)
`, idAsString, j.Queue, j.Priority, j.RunAt, j.Type, j.Args, j.CreatedAt)

c.logger.Debug(
"Tried to enqueue a job",
Expand All @@ -155,12 +155,12 @@ VALUES
}

func (c *Client) execEnqueue(ctx context.Context, j *Job, q adapter.Queryable) error {
ulid, err := ulid.New(ulid.Now(), c.entropy)
jobID, err := ulid.New(ulid.Now(), c.entropy)
if err != nil {
return fmt.Errorf("could not generate new Job ULID ID: %w", err)
}

return c.execEnqueueWithID(ctx, j, q, ulid)
return c.execEnqueueWithID(ctx, j, q, jobID)
}

// LockJob attempts to retrieve a Job from the database in the specified queue.
Expand All @@ -177,7 +177,7 @@ func (c *Client) execEnqueue(ctx context.Context, j *Job, q adapter.Queryable) e
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
FROM gue_jobs
WHERE queue = $1 AND run_at <= $2
ORDER BY priority ASC
Expand All @@ -197,7 +197,7 @@ LIMIT 1 FOR UPDATE SKIP LOCKED`
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockJobByID(ctx context.Context, id ulid.ULID) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
FROM gue_jobs
WHERE job_id = $1 FOR UPDATE SKIP LOCKED`

Expand All @@ -218,7 +218,7 @@ WHERE job_id = $1 FOR UPDATE SKIP LOCKED`
// After the Job has been worked, you must call either Job.Done() or Job.Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error) {
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error
sql := `SELECT job_id, queue, priority, run_at, job_type, args, error_count, last_error, created_at
FROM gue_jobs
WHERE queue = $1 AND run_at <= $2
ORDER BY run_at, priority ASC
Expand All @@ -245,14 +245,15 @@ func (c *Client) execLockJob(ctx context.Context, handleErrNoRows bool, sql stri
&j.Args,
&j.ErrorCount,
&j.LastError,
&j.CreatedAt,
)
if err == nil {
c.mLockJob.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(true)))
return &j, nil
}

rbErr := tx.Rollback(ctx)
if handleErrNoRows && err == adapter.ErrNoRows {
if handleErrNoRows && errors.Is(err, adapter.ErrNoRows) {
return nil, rbErr
}

Expand Down
5 changes: 3 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func findOneJob(t testing.TB, q adapter.Queryable) *Job {
j := new(Job)
err := q.QueryRow(
context.Background(),
`SELECT priority, run_at, job_id, job_type, args, error_count, last_error, queue FROM gue_jobs LIMIT 1`,
`SELECT priority, run_at, job_id, job_type, args, error_count, last_error, queue, created_at FROM gue_jobs LIMIT 1`,
).Scan(
&j.Priority,
&j.RunAt,
Expand All @@ -652,8 +652,9 @@ func findOneJob(t testing.TB, q adapter.Queryable) *Job {
&j.ErrorCount,
&j.LastError,
&j.Queue,
&j.CreatedAt,
)
if err == adapter.ErrNoRows {
if errors.Is(err, adapter.ErrNoRows) {
return nil
}
require.NoError(t, err)
Expand Down
10 changes: 10 additions & 0 deletions enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func testEnqueueOnlyType(t *testing.T, connPool adapter.ConnPool) {
j, err := c.LockJobByID(ctx, job.ID)
require.NoError(t, err)
require.NotNil(t, j)
require.False(t, j.CreatedAt.IsZero())

t.Cleanup(func() {
err := j.Done(ctx)
Expand All @@ -49,6 +50,15 @@ func testEnqueueOnlyType(t *testing.T, connPool adapter.ConnPool) {
assert.Equal(t, []byte(``), j.Args)
assert.Equal(t, int32(0), j.ErrorCount)
assert.False(t, j.LastError.Valid)

assert.False(t, j.CreatedAt.IsZero())
assert.True(t, time.Now().After(j.CreatedAt))
assert.True(
t,
job.CreatedAt.Round(time.Second).Equal(j.CreatedAt.Round(time.Second)),
job.CreatedAt.Round(time.Second).String(),
j.CreatedAt.Round(time.Second).String(),
)
}

func TestEnqueueWithPriority(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type Job struct {
// being updated when the current Job run errored. This field supposed to be used mostly for the debug reasons.
LastError sql.NullString

// CreatedAt is the job creation time.
// This field is initialised only when the Job is being retrieved from the DB and is not
// being updated when the current Job run errored. This field can be used as a decision parameter in some handlers
// whether it makes sense to retry the job or it can be dropped.
CreatedAt time.Time

mu sync.Mutex
deleted bool
tx adapter.Tx
Expand Down

0 comments on commit acb5101

Please sign in to comment.