Skip to content

Passive queue

uwiger edited this page Sep 30, 2011 · 1 revision

This example illustrates the combination of a producer queue and a passive queue. The producer queue is configured with 3 workers picking jobs from the passive queue as quickly as they can. This pattern is quite similar to a worker pool of gen_tcp acceptors.

Fun = fun() ->
	      io:fwrite("~p starting...~n",[self()]),
	      Res = jobs:dequeue(q, 3),
	      io:fwrite("Res = ~p~n", [Res])
     end,
application:set_env(jobs, queues,
		    [{pool, producer, Fun,
		      [{regulators, [
				     {counter,[{limit,3}]}
				    ]}
		      ]},
		     {q, passive,[]}]).

Here is a shell printout. The setup creates a pool of size 3, where the producer processes call jobs:dequeue(q, N). This function blocks if the queue is empty. When I put something in the queue, it is immediately forwarded to one of the waiting processes in the pool, which completes, terminates, letting the jobs_server spawn a new worker.

If it is important to reuse worker processes, this could be done simply by letting the producer fun loop and repeatedly call dequeue/2. If it dies, a new worker is started, to ensure the pool is filled.

You can only enqueue/dequeue to a passive queue. Any other type of queue will cause a badarg exception in the client.

=PROGRESS REPORT==== 13-Jan-2011::10:34:32 ===
        application: sasl
         started_at: nonode@nohost
Eshell V5.8.1  (abort with ^G)
1> application:set_env(jobs, queues, [{pool, producer, fun() -> io:fwrite("~p starting...~n",[self()]), Res = jobs:dequeue(q, 3), io:fwrite("Res = ~p~n", [Res]) end, [{regulators, [{counter,[{limit,3}]}]}]}, {q, passive,[]}]).
ok
2> application:start(jobs).

=PROGRESS REPORT==== 13-Jan-2011::10:34:44 ===
         supervisor: {local,jobs_app}
            started: [{pid,<0.47.0>},
                      {name,jobs_server},
                      {mfargs,{jobs_server,start_link,[]}},
                      {restart_type,permanent},
                      {shutdown,3000},
                      {child_type,worker}]
<0.48.0> starting...
<0.49.0> starting...
<0.50.0> starting...
ok
3> 
=PROGRESS REPORT==== 13-Jan-2011::10:34:44 ===
         supervisor: {local,kernel_safe_sup}
            started: [{pid,<0.52.0>},
                      {name,timer_server},
                      {mfargs,{timer,start_link,[]}},
                      {restart_type,permanent},
                      {shutdown,1000},
                      {child_type,worker}]

=PROGRESS REPORT==== 13-Jan-2011::10:34:44 ===
         supervisor: {local,jobs_app}
            started: [{pid,<0.51.0>},
                      {name,jobs_sampler},
                      {mfargs,{jobs_sampler,start_link,[]}},
                      {restart_type,permanent},
                      {shutdown,3000},
                      {child_type,worker}]

=PROGRESS REPORT==== 13-Jan-2011::10:34:44 ===
        application: jobs
         started_at: nonode@nohost

3> jobs:enqueue(q, job1).
ok
Res = [{36911304349436,job1}]
4> <0.55.0> starting...
4> jobs:enqueue(q, job2).
ok
Res = [{36911308613434,job2}]
5> <0.58.0> starting...
Clone this wiki locally