Unable to kill a single process while leaving others running #307

taylordowns2000 opened this issue Mar 29, 2018 · 2 comments

taylordowns2000 commented Mar 29, 2018

Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Elixir 1.5.1
Exq 0.10.1

Is it possible to kill a single process (a busy worker) in Exq? I can generate a list of all the running processes with Exq.Api.processes(Process.whereis(Exq.Api)).

If I get 10 results in that {:ok, processes} tuple, I'd like to be able to pick out the one that has been running for more than 10 minutes, say, and kill it while leaving the others running.
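The selection step described above can be sketched as follows. This is only an illustration: `LongRunning.older_than/3` is a hypothetical helper (not part of Exq), and it assumes each entry exposes `started_at` as a Unix timestamp in seconds, as the `Exq.Support.Process` struct in the state dump further down suggests.

```elixir
defmodule LongRunning do
  # Hypothetical helper, not part of Exq.
  # Assumes each process entry has a `started_at` field holding a Unix
  # timestamp in seconds (integer or float).
  def older_than(processes, max_age_seconds, now \\ System.os_time(:second)) do
    Enum.filter(processes, fn process ->
      now - process.started_at > max_age_seconds
    end)
  end
end

# Possible usage with Exq (assumption: the entries returned here carry
# `started_at` in the shape described above):
#
#   {:ok, processes} = Exq.Api.processes(Exq.Api)
#   stuck = LongRunning.older_than(processes, 10 * 60)
```

Passing `now` explicitly keeps the filter easy to check in isolation; in normal use the default of the current OS time applies.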

Exq.Api.clear_processes(Process.whereis(Exq.Api)) works fine for clearing the whole list of processes, but when I pass in just the pid of the bad/long-running process, it complains:

** (FunctionClauseError) no function clause matching in GenServer.whereis/1    
    
    The following arguments were given to GenServer.whereis/1:
    
        # 1
        "<0.959.0>"
    
    (elixir) lib/gen_server.ex:945: GenServer.whereis/1
    (elixir) lib/gen_server.ex:764: GenServer.call/3

If I manipulate that pid a bit I get:

iex(8)> Exq.Api.clear_processes(pid("0.959.0"))  
** (exit) exited in: GenServer.call(#PID<0.959.0>, :clear_processes, 5000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir) lib/gen_server.ex:774: GenServer.call/3

And going further down this rabbit hole:

iex(4)> Exq.Api.clear_processes(IEx.Helpers.pid(pid |> String.replace(~r/[<>]/, "")))        
** (exit) exited in: GenServer.call(#PID<0.959.0>, :clear_processes, 5000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) attempted to call GenServer #PID<0.959.0> but no handle_call/3 clause was provided
            (exq) lib/gen_server.ex:598: Exq.Worker.Server.handle_call/3
            (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:774: GenServer.call/3
iex(4)> [error] GenServer #PID<0.959.0> terminating
** (RuntimeError) attempted to call GenServer #PID<0.959.0> but no handle_call/3 clause was provided
    (exq) lib/gen_server.ex:598: Exq.Worker.Server.handle_call/3
    (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.907.0>): :clear_processes
State: %Exq.Worker.Server.State{host: "taylor-ThinkPad-X1-Carbon-5th", job_serialized: "{\"retry\":0,\"queue\":\"browser_runs\",\"jid\":\"73b7faf7-9f23-4a0b-9ed8-97264d809cbc\",\"enqueued_at\":1522354133.120197,\"class\":\"OpenFn.JobRunner\",\"args\":[{\"type\":\"ReceiptMatch\",\"trigger_id\":1,\"receipt_id\":341,\"job_id\":2}]}", manager: #PID<0.669.0>, metadata: Exq.Worker.Metadata, middleware: Exq.Middleware.Server, middleware_state: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager, Exq.Middleware.Logger], namespace: "dev_exq", pipeline: %Exq.Middleware.Pipeline{assigns: %{host: "taylor-ThinkPad-X1-Carbon-5th", job: %Exq.Support.Job{args: [%{"job_id" => 2, "receipt_id" => 341, "trigger_id" => 1, "type" => "ReceiptMatch"}], class: "OpenFn.JobRunner", enqueued_at: 1522354133.120197, error_class: nil, error_message: nil, failed_at: nil, finished_at: nil, jid: "73b7faf7-9f23-4a0b-9ed8-97264d809cbc", processor: nil, queue: "browser_runs", retry: 0, retry_count: nil}, job_serialized: "{\"retry\":0,\"queue\":\"browser_runs\",\"jid\":\"73b7faf7-9f23-4a0b-9ed8-97264d809cbc\",\"enqueued_at\":1522354133.120197,\"class\":\"OpenFn.JobRunner\",\"args\":[{\"type\":\"ReceiptMatch\",\"trigger_id\":1,\"receipt_id\":341,\"job_id\":2}]}", manager: #PID<0.669.0>, namespace: "dev_exq", process_info: %Exq.Support.Process{host: "taylor-ThinkPad-X1-Carbon-5th", job: %Exq.Support.Job{args: [%{"job_id" => 2, "receipt_id" => 341, "trigger_id" => 1, "type" => "ReceiptMatch"}], class: "OpenFn.JobRunner", enqueued_at: 1522354133.120197, error_class: nil, error_message: nil, failed_at: nil, finished_at: nil, jid: "73b7faf7-9f23-4a0b-9ed8-97264d809cbc", processor: nil, queue: "browser_runs", retry: 0, retry_count: nil}, pid: #PID<0.959.0>, started_at: 1522354133.142235}, queue: "browser_runs", redis: Exq.Redis.Client, started_at: #DateTime<2018-03-29 20:08:53.156974Z>, stats: Exq.Stats, worker_module: OpenFn.JobRunner}, event: :before_work, halted: false, terminated: false, 
worker_pid: #PID<0.959.0>}, queue: "browser_runs", redis: Exq.Redis.Client, stats: Exq.Stats, work_table: #Reference<0.2669252645.2345271297.158598>}
Client #PID<0.907.0> is alive
    (stdlib) gen.erl:169: :gen.do_call/4
    (elixir) lib/gen_server.ex:771: GenServer.call/3
    (stdlib) erl_eval.erl:670: :erl_eval.do_apply/6
    (elixir) src/elixir.erl:239: :elixir.eval_forms/4
    (iex) lib/iex/evaluator.ex:219: IEx.Evaluator.handle_eval/5
    (iex) lib/iex/evaluator.ex:200: IEx.Evaluator.do_eval/3
    (iex) lib/iex/evaluator.ex:178: IEx.Evaluator.eval/3
    (iex) lib/iex/evaluator.ex:77: IEx.Evaluator.loop/1
    (iex) lib/iex/evaluator.ex:21: IEx.Evaluator.init/4
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

I'm really sorry if I've missed something in the docs, but I haven't been able to figure out how to kill (or configure Exq to automatically kill) processes that have been running for a long time. Ultimately, my objective here is to configure a queue that will kill anything that lasts more than X seconds.

Update: I found @akira's helpful comment on #228 where he suggested :timer.kill_after(:timer.seconds(600)) as a way to auto-kill long-running stuff. (Thank you!) Still wondering if it is possible to kill a single process by hand.
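For reference, a minimal sketch of how the :timer.kill_after approach might be wrapped. `TimeLimited.run/2` is a hypothetical name, and the sketch assumes the job body runs in the process that arms the timer, since `:timer.kill_after/1` kills the calling process once the time elapses.

```elixir
defmodule TimeLimited do
  # Hypothetical wrapper, not part of Exq.
  # Arms a timer that kills the *calling* process after `limit_ms`
  # milliseconds, runs `fun`, then cancels the timer if `fun` returned
  # in time.
  def run(fun, limit_ms) when is_function(fun, 0) do
    {:ok, timer} = :timer.kill_after(limit_ms)
    result = fun.()
    :timer.cancel(timer)
    result
  end
end

# Possible usage inside a worker's perform function (assumption: perform
# runs in the process that should be killed on timeout):
#
#   def perform(args) do
#     TimeLimited.run(fn -> do_the_work(args) end, :timer.seconds(600))
#   end
```

Because the process is killed with reason `:kill`, it cannot trap the exit or run cleanup code, so any bookkeeping has to happen elsewhere.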

Thanks in advance!

Author

taylordowns2000 commented Mar 30, 2018

In case it's useful, this is getting me 90% of the way there, but the process still shows up in the Exq.Api.processes(Process.whereis(Exq.Api)) tuple, and I get the [info] Re-enqueueing job from backup for node_id [taylor...] and queue [runs] problem that others have faced.

  def kill_process(process) do
    Logger.info "Killing process."
    Process.exit(IEx.Helpers.pid(process.pid |> String.replace(~r/[<>]/, "")), :kill)
    Exq.Redis.JobStat.remove_process(Exq.Redis.Client, Application.get_env(:exq, :namespace), process, Exq.Support.Process.encode(process))
    Exq.Stats.Server.process_terminated(Exq.Stats, Application.get_env(:exq, :namespace), process)
    Exq.Stats.Server.record_failure(Exq.Stats, Application.get_env(:exq, :namespace), "I killed it.", process.job)
    # Exq.Redis.Connection.flushdb!(Exq.Redis.Client)
  end

I don't want to flushdb! there at the end because that seems to drop everything from redis, not just the single job I killed. I've been scanning more issues and realizing that this is really similar to #305 .

To be honest, it still feels like I'm missing something obvious—like I'm trying to find a back door for something that should be straightforward! Apologies, again, if I'm just missing it.
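On the pid conversion used in kill_process above: IEx.Helpers.pid/1 is an IEx shell helper, so it may be worth avoiding in application code. One possible alternative, sketched here with a hypothetical `PidString` module, is `:erlang.list_to_pid/1`, which accepts the "<0.959.0>" form directly once the string is converted to a charlist. The Erlang documentation describes this function as intended for debugging, and it only makes sense for pids on the local node.

```elixir
defmodule PidString do
  # Hypothetical helper, not part of Exq.
  # Converts a string such as "<0.959.0>" into a pid on the local node.
  # :erlang.list_to_pid/1 expects a charlist that includes the angle
  # brackets, so no stripping is needed.
  def to_pid(string) when is_binary(string) do
    string
    |> String.to_charlist()
    |> :erlang.list_to_pid()
  end
end

# Possible usage (assumption: process.pid is a string like "<0.959.0>"):
#
#   process.pid |> PidString.to_pid() |> Process.exit(:kill)
```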

Contributor

robobakery commented Apr 4, 2018

You might want to check out Exq.Redis.JobQueue.remove_job_from_backup/5:

  defp extract_pid(process) do
    process.pid |> String.replace(~r/[<>]/, "") |> IEx.Helpers.pid()
  end

  def kill_process(process) do
    process
    |> extract_pid()
    |> Process.exit(:kill)

    job_serialized =
      process.job
      |> Map.take(["queue", "retry", "class", "args", "jid", "enqueued_at"])
      |> Exq.Support.Config.serializer.encode!()

    Exq.Redis.JobQueue.remove_job_from_backup(
      Exq.Redis.Client, Application.get_env(:exq, :namespace), process.host, process.job["queue"], job_serialized)

    process_serialized =
      process
      |> Exq.Support.Config.serializer.encode!()

    Exq.Redis.JobStat.remove_process(
      Exq.Redis.Client, Application.get_env(:exq, :namespace), process, process_serialized)

    Exq.Manager.Server.job_terminated(Exq, Application.get_env(:exq, :namespace), process.job["queue"], job_serialized)
  end
