Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry single dead job type #236

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 52 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,58 @@ func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error {
return nil
}

// Retry retries failed jobs of a specific type from the dead queue.
func (c *Client) RetryDeadOfType(jobType string) error {
// Get queues for job names
queues, err := c.Queues()
if err != nil {
logError("client.retry_all_dead_jobs.queues", err)
return err
}

fmt.Println("Queues: %+V", queues)
// Extract job names
var jobNames []string
for _, q := range queues {
if q.JobName == jobType {
jobNames = append(jobNames, q.JobName)
}
}
fmt.Println("JobNames: %+V", jobNames)

script := redis.NewScript(len(jobNames)+1, redisLuaRequeueAllDeadCmd)

args := make([]interface{}, 0, len(jobNames)+1+3)
args = append(args, redisKeyDead(c.namespace)) // KEY[1]
for _, jobName := range jobNames {
args = append(args, redisKeyJobs(c.namespace, jobName)) // KEY[2, 3, ...]
}
args = append(args, redisKeyJobsPrefix(c.namespace)) // ARGV[1]
args = append(args, nowEpochSeconds())
args = append(args, 1000)

conn := c.pool.Get()
defer conn.Close()

fmt.Println("args: %+V", args)
// Cap iterations for safety (which could reprocess 1k*1k jobs).
// This is conceptually an infinite loop but let's be careful.
for i := 0; i < 1000; i++ {
res, err := redis.Int64(script.Do(conn, args...))
if err != nil {
logError("client.retry_all_dead_jobs.do", err)
return err
}

if res == 0 {
break
}
}

return nil

}

// RetryDeadJob retries a dead job. The job will be re-queued on the normal work queue for eventual processing by a worker.
func (c *Client) RetryDeadJob(diedAt int64, jobID string) error {
// Get queues for job names
Expand Down
7 changes: 7 additions & 0 deletions webui/webui.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func NewServer(namespace string, pool *redis.Pool, hostPort string) *Server {
router.Get("/dead_jobs", (*context).deadJobs)
router.Post("/delete_dead_job/:died_at:\\d.*/:job_id", (*context).deleteDeadJob)
router.Post("/retry_dead_job/:died_at:\\d.*/:job_id", (*context).retryDeadJob)
router.Post("/retry_dead_job_type/:job_type", (*context).retryDeadJobOfType)
router.Post("/delete_all_dead_jobs", (*context).deleteAllDeadJobs)
router.Post("/retry_all_dead_jobs", (*context).retryAllDeadJobs)

Expand Down Expand Up @@ -205,6 +206,12 @@ func (c *context) retryDeadJob(rw web.ResponseWriter, r *web.Request) {
render(rw, map[string]string{"status": "ok"}, err)
}

func (c *context) retryDeadJobOfType(rw web.ResponseWriter, r *web.Request) {
err := c.client.RetryDeadOfType(r.PathParams["job_type"])

render(rw, map[string]string{"status": "ok"}, err)
}

func (c *context) deleteAllDeadJobs(rw web.ResponseWriter, r *web.Request) {
err := c.client.DeleteAllDeadJobs()
render(rw, map[string]string{"status": "ok"}, err)
Expand Down