Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: Backup queued json for unique jobs #181

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ require (
github.com/garyburd/redigo v1.6.0 // indirect
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0
github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b
github.com/gocraft/work v0.5.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb
github.com/kr/pretty v0.2.0 // indirect
github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.5.1
github.com/youtube/vitess v2.1.1+incompatible // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7V
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y=
github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b h1:g2Qcs0B+vOQE1L3a7WQ/JUUSzJnHbTz14qkJSqEWcF4=
github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak=
github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34=
github.com/gocraft/work v0.5.1/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
Expand All @@ -41,8 +39,6 @@ github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7q
github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
5 changes: 5 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Job struct {
FailedAt int64 `json:"failed_at,omitempty"`

rawJSON []byte
rawQueueJSON []byte // the JSON stored in in-progress queue, which may changed by redisLuaZremLpushCmd in requeuer
dequeuedFrom []byte
inProgQueue []byte
argError error
Expand All @@ -45,6 +46,10 @@ func newJob(rawJSON, dequeuedFrom, inProgQueue []byte) (*Job, error) {
return &job, nil
}

func (j *Job) setQueueJSON(json []byte) {
j.rawQueueJSON = json
}

func (j *Job) serialize() ([]byte, error) {
return json.Marshal(j)
}
Expand Down
8 changes: 7 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ func (w *worker) processJob(job *Job) {
// Going forward the job on the queue will always be just a placeholder, and we will be replacing it with the
// updated job extracted here
if updatedJob != nil {
// Keep json content from in-progress queue for further cleanup
updatedJob.setQueueJSON(job.rawJSON)
job = updatedJob
}
}
Expand Down Expand Up @@ -268,8 +270,12 @@ func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) {
conn := w.pool.Get()
defer conn.Close()

rawJSON := job.rawJSON
if job.rawQueueJSON != nil {
rawJSON = job.rawQueueJSON
}
conn.Send("MULTI")
conn.Send("LREM", job.inProgQueue, 1, job.rawJSON)
conn.Send("LREM", job.inProgQueue, 1, rawJSON)
conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name))
conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1)
fate(conn)
Expand Down