Kill busy jobs without restoring #305

Open
robobakery opened this issue Mar 6, 2018 · 1 comment

Comments

@robobakery
Contributor

In Exq.Api, I couldn't find any API to kill a busy job.
remove_job exists, but it can only remove a job that is still listed in a queue.
clear_processes also exists, but nothing happens when I call it.

If I force-kill a busy job with Process.exit and then restart my app,
Exq re-enqueues the job I killed.

In most cases I think restoring is very helpful, and I love it.
But in my case I need to kill the job without it being restored.

Is there any way to do this?

short snippet:

defmodule App.SampleWorker do
  def perform() do
    IO.puts "SampleWorker start"
    :timer.sleep(20_000)
    IO.puts "SampleWorker end"
  end
end

iex:

> Exq.enqueue(Exq, "sample", App.SampleWorker, [])

> {:ok, processes} = Exq.Api.processes(Exq.Api)
> Enum.each(processes, fn process ->
>   Process.exit(IEx.Helpers.pid(process.pid |> String.replace(~r/[<>]/, "")), :kill)
> end)

after restart:

[info] Re-enqueueing job from backup for node_id [Jeyui-MacBook-Pro] and queue [sample]

> Exq.Api.processes(Exq.Api)

{:ok,
 [
   %Exq.Support.Process{
     host: "Jeyui-MacBook-Pro",
     job: %{
       "args" => [],
       "class" => "App.SampleWorker",
       "enqueued_at" => 1520341921.517253,
       "error_class" => nil,
       "error_message" => nil,
       "failed_at" => nil,
       "finished_at" => nil,
       "jid" => "597d2a71-38ec-4827-a78e-7358f662ad30",
       "processor" => nil,
       "queue" => "sample",
       "retry" => 1,
       "retry_count" => nil
     },
     pid: "<0.291.0>",
     started_at: 1520341928.386247
   }
 ]}

thank you.

@robobakery
Contributor Author

robobakery commented Mar 7, 2018

I don't think this is pretty, but I finally got it working by running flushdb after killing all the busy jobs.

def factory_reset() do
  with {:ok, processes} <- Exq.Api.processes(Exq.Api) do
    processes
    |> Enum.map(&extract_pid/1)
    |> Enum.each(&kill_process/1)
  end

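  # FLUSHDB deletes every key in the currently selected Redis database,
  # not only the keys that belong to Exq.
  Exq.Redis.Connection.flushdb!(Exq.Redis.Client)

  # Re-subscribe to each configured queue after the flush.
  Application.get_env(:exq, :queues)
  |> Enum.each(fn {queue, concurrency} -> Exq.subscribe(Exq, queue, concurrency) end)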
end

defp kill_process(pid) do
  Process.exit(pid, :kill)
end

defp extract_pid(process) do
  process.pid |> String.replace(~r/[<>]/, "") |> IEx.Helpers.pid()
end
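
A side note on extract_pid above: IEx.Helpers.pid/1 is meant for the interactive shell, so it may not be the best fit for application code. A minimal sketch of an alternative, assuming the pid string always has the "<0.291.0>" shape shown in the Exq.Api.processes output, is to hand it to :erlang.list_to_pid/1, which accepts the angle brackets as they are. The PidString module name is hypothetical and not part of Exq.

```elixir
defmodule PidString do
  @moduledoc """
  Hypothetical helper (not part of Exq): turns a printed pid such as
  "<0.291.0>" back into a pid without depending on IEx.
  """

  # :erlang.list_to_pid/1 expects a charlist that includes the angle
  # brackets, so no regex stripping is needed. It raises ArgumentError
  # when the string is not a well-formed pid.
  def to_pid(string) when is_binary(string) do
    string
    |> String.to_charlist()
    |> :erlang.list_to_pid()
  end
end

# Round trip: print the current process's pid in the "<x.y.z>" form,
# then parse it back and check that it is the same pid.
printed = self() |> :erlang.pid_to_list() |> List.to_string()
^printed = printed
true = PidString.to_pid(printed) == self()
```

Keep in mind that a pid string is only meaningful inside the VM instance that produced it. An entry that was recorded before a restart may point at nothing, or at an unrelated process, so it seems safer to kill only pids taken from the current run.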
