Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to mark a task as completed externally #862

Open
ScribeSavant opened this issue Apr 14, 2024 · 1 comment
Open

How to mark a task as completed externally #862

ScribeSavant opened this issue Apr 14, 2024 · 1 comment
Labels
question Further information is requested

Comments

@ScribeSavant
Copy link

ScribeSavant commented Apr 14, 2024

Hello, How can I mark a process as completed externally, for example I have an http server and I create a job with it and I want to stop the job with it, I tried the 'Inspector.CancelProcessing()' option but the job falls into the retry section and then starts again.

When I set MaxRetry to 0, the work goes to the archive section on restarts, and it does not start again.

Here is my full code

Controller

// create job
func (s *Server) createEvm() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		payload, err := io.ReadAll(r.Body)

		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}

		now := time.Now()
		fiveHours := now.Add(5 * time.Hour)
		t1 := workers.NewEvmTask(payload, asynq.MaxRetry(0), asynq.Deadline(fiveHours))
		info, err := s.workerServer.Enqueue(t1)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
		w.Header().Set("content-type", "application/json")
		json.NewEncoder(w).Encode(info)
	}
}

// stop job
func (s *Server) stopTask() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var payload *CancelTaskPayload

		if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}

		if err := s.workerServer.Inspector.CancelProcessing(payload.JobID); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
		w.Header().Set("content-type", "application/json")
		json.NewEncoder(w).Encode(map[string]bool{"stopped": true})
	}
}

Evm Worker

package workers

import (
	"context"
	"fmt"
	"time"

	"github.com/hibiken/asynq"
)

type EvmTaskPayload struct {
	UserID int `json:"userId"`
}

type EvmJob struct {
}

func NewEvmTask(payload []byte, opts ...asynq.Option) *asynq.Task {
	return asynq.NewTask("evm:task", payload, opts...)
}

func (e *EvmJob) Processor(ctx context.Context, t *asynq.Task) error {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Stopped")
			return nil
		default:
			fmt.Println("Still working")
			time.Sleep(3 * time.Second)
		}
	}
}

Worker server

package workers

import (
	"context"

	"github.com/hibiken/asynq"
)

type WorkerServer struct {
	*asynq.Server
	*asynq.ServeMux
	*asynq.Client
	*asynq.Inspector
}

func CreateWorkerServer() *WorkerServer {
	s := &WorkerServer{
		Server: asynq.NewServer(
			asynq.RedisClientOpt{Addr: "localhost:6379"},
			asynq.Config{Concurrency: 1000},
		),
		ServeMux: asynq.NewServeMux(),
		Client: asynq.NewClient(
			asynq.RedisClientOpt{Addr: "localhost:6379"},
		),
		Inspector: asynq.NewInspector(
			asynq.RedisClientOpt{Addr: "localhost:6379"},
		),
	}
	
	s.routes()
	return s
}

func (w *WorkerServer) routes() {
	w.ServeMux.HandleFunc("evm:task", func(ctx context.Context, t *asynq.Task) error {
		job := &EvmJob{}
		return job.Processor(ctx, t)
	})
}

func (w *WorkerServer) StopJob(jobId string) {
	w.Inspector.CancelProcessing(jobId)
	w.Inspector.DeleteAllArchivedTasks("default")
}

In this example, when the process is stopped, it falls into the retry section, but since the maximum Retry is 0, it is automatically archived, but the problem is that if the server or the entire code is restarted, the jobs fall into the archive in the same way.

@kamikazechaser kamikazechaser added the question Further information is requested label Apr 15, 2024
@thucnq
Copy link

thucnq commented May 13, 2024

Maybe you can try method DeleteTask

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

3 participants