---
output:
  md_document:
    toc: true
    toc_depth: 3
    variant: markdown_strict+backtick_code_blocks
    pandoc_args: ["--atx-headers"]
---
```{r setup, include=FALSE}
library(knitr)
library(reticulate)
use_python('~/envs/mypy36/bin/python', required = TRUE)
knitr::opts_chunk$set(echo=FALSE, warning=FALSE)
py_config()
```
```{python}
import os
import sys
import pprint
print = pprint.pprint
if os.path.exists('./ezpq/__init__.py') and sys.path[0] != os.getcwd():
    sys.path.insert(0, os.getcwd())
import ezpq
```
# `ezpq`: an easy parallel queueing system.
> Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/).
## How to get it
Install from [PyPI](https://pypi.org/project/ezpq/) with:
```shell
pip install ezpq
```
Optional packages:
```shell
pip install pandas    # required for plots
pip install plotnine  # required for plots
pip install tqdm      # required for progress bars
```
## Overview
`ezpq` implements a parallel queueing system consisting of:
1. a priority "waiting" queue in.
2. a lookup table of "working" jobs.
3. a priority "completed" queue out.
The queueing system uses `multiprocessing.Process` by default and can also run jobs with `threading.Thread`.
![](docs/imgs/ezpq.png)
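The three-part flow can be pictured with a small standard-library model. This is only an illustrative sketch, not `ezpq`'s implementation: the `TinyQueue` class, its `pulse()` and `run_all()` methods, and the use of `heapq` are all assumptions made for the example. It uses threads, and treats lower priority numbers as more urgent.

```python
import heapq
import itertools
import threading
import time


class TinyQueue:
    """Toy model of the waiting -> working -> completed flow (not ezpq's code)."""

    def __init__(self, n_workers):
        self.n_workers = n_workers
        self.waiting = []    # min-heap of (priority, job_id, func, arg)
        self.working = {}    # job_id -> running thread
        self.completed = []  # min-heap of (priority, job_id, output)
        self._ids = itertools.count(1)
        self._lock = threading.Lock()

    def put(self, func, arg, priority=100):
        # Lower numbers are served first; job_id breaks ties in FIFO order.
        heapq.heappush(self.waiting, (priority, next(self._ids), func, arg))

    def _run(self, priority, job_id, func, arg):
        output = func(arg)
        with self._lock:
            heapq.heappush(self.completed, (priority, job_id, output))

    def pulse(self):
        """One scheduling pass: reap finished jobs, then start waiting ones."""
        for job_id in [j for j, t in self.working.items() if not t.is_alive()]:
            del self.working[job_id]
        while self.waiting and len(self.working) < self.n_workers:
            priority, job_id, func, arg = heapq.heappop(self.waiting)
            thread = threading.Thread(target=self._run,
                                      args=(priority, job_id, func, arg))
            self.working[job_id] = thread
            thread.start()

    def run_all(self, poll=0.01):
        """Pulse until nothing is waiting or working, then drain the output."""
        while self.waiting or self.working:
            self.pulse()
            time.sleep(poll)
        return [heapq.heappop(self.completed)
                for _ in range(len(self.completed))]


def square(x):
    return x * x


Q = TinyQueue(n_workers=2)
for x in range(6):
    Q.put(square, x, priority=x % 2)  # even inputs get the lower number
results = Q.run_all()
outputs = [output for _, _, output in results]
```

Because the completed side is also a priority queue, `outputs` comes back ordered by priority first and submission order second, regardless of which thread finished first.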
## Features
* Simple interface; pure Python.
* No required dependencies outside of standard library.
* Optional integration with [`tqdm`](https://github.com/tqdm/tqdm) progress bars.
* Compatible with Python 2 & 3.
* Cross-platform: runs on macOS, Linux, and Windows.
* Data remains in-memory.
* Priority Queueing, both in and out and within lanes.
* Synchronous lanes allow dependent jobs to execute in the desired order.
* Easily switch from processes to threads.
* Automatic handling of output.
* Rich job details, easily viewed as a pandas dataframe.
* Built-in logging to CSV.
* Customizable visualizations of queue operations.
## Quickstart
Suppose you wanted to speed up the following code, which runs 60 operations that each take anywhere from 0.5s to 1.5s. With an average job time of ~1s, this operation should take ~60s.
```{python, echo=TRUE}
import time
import random
def random_sleep(x):
    random.seed(x)
    n = random.uniform(0.5, 1.5)
    time.sleep(n)
    return n
```
```{python, echo=TRUE, eval=FALSE}
start = time.time()
output = [random_sleep(x) for x in range(60)]
end = time.time()
print('> Runtime: ' + str(end - start))
```
```
## '> Runtime: 58.932034969329834'
```
Here is the same function run in parallel with an `ezpq` Queue of 6 workers, which reduces the runtime of the above operation from ~60s to ~10s.
```{python, eval=FALSE, echo=TRUE}
import time
import random
import ezpq
start = time.time()
with ezpq.Queue(6) as Q:
    output = Q.map(random_sleep, range(60))
end = time.time()
print('> Runtime: ' + str(end - start))
```
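The ~60s and ~10s figures follow from a back-of-the-envelope estimate, sketched here. It assumes every job takes the mean time and ignores queue overhead, so real runtimes will differ somewhat; the variable names are chosen for this example only.

```python
import math

n_jobs = 60
n_workers = 6
mean_job_time = 1.0  # seconds; midpoint of uniform(0.5, 1.5)

# Serial: jobs run one after another.
serial_estimate = n_jobs * mean_job_time

# Parallel: jobs run in "waves" of n_workers at a time.
n_waves = math.ceil(n_jobs / float(n_workers))
parallel_estimate = n_waves * mean_job_time

speedup = serial_estimate / parallel_estimate
```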
Here is the same scenario, using the `@ezpq.Queue` decorator.
```{python, echo=TRUE}
@ezpq.Queue(6)
def random_sleep(x):
    random.seed(x)
    n = random.uniform(0.5, 1.5)
    time.sleep(n)
    return n

output = random_sleep(iterable=range(60))
```
```{python}
# redefine for future functions
def random_sleep(x):
    random.seed(x)
    n = random.uniform(0.5, 1.5)
    time.sleep(n)
    return n
```
While `map()` and the decorator are useful for quick-n-simple parallelization, the essential functions of an `ezpq` Queue include `put()`, `wait()`, and `get()` (or `collect()`).
```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
    for x in range(60):
        Q.put(random_sleep, args=x)
    Q.wait()
    output = Q.collect()
```
The output is a list of dicts containing verbose information about each job, along with its output and exit code.
```{python, echo=TRUE}
print( output[0] )
```
Easily convert output to a `pandas` dataframe:
```{python, echo=TRUE}
import pandas as pd
df = pd.DataFrame(output)
print( df.head()[['id', 'output', 'runtime', 'exitcode']] )
```
Use `ezpq.Plot` to generate a Gantt chart of the job timings.
```{python, echo=TRUE}
plt = ezpq.Plot(output).build(show_legend=False)
plt.save('docs/imgs/quickstart.png')
```
![](docs/imgs/quickstart.png)
## ezpq.Queue
The `Queue` class implements the queueing system, which is itself a 3-part system composed of the:
1. waiting queue
2. working table
3. completed queue
```{python}
print(help(ezpq.Queue.__init__))
```
## ezpq.Job
An `ezpq` job defines the function to run. It is passed to an `ezpq` queue with a call to `submit()`.
```{python}
print(help(ezpq.Job.__init__))
```
```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
    for x in range(60):
        priority = x % 2  # give even numbers higher priority.
        job = ezpq.Job(random_sleep, args=x, priority=priority)
        Q.submit(job)
    Q.wait()
    output = Q.collect()
```
```{python}
plt = ezpq.Plot(output).build(color_by='priority',
color_pal=['blue', 'green'])
plt.save('docs/imgs/submit.png')
```
![](docs/imgs/submit.png)
### put
The `put` method creates a job and submits it to an `ezpq` queue. All of its arguments are passed to `ezpq.Job()`.
```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
    for x in range(60):
        Q.put(random_sleep, args=x)
    Q.wait()
    output = Q.collect()
```
### size
`size()` returns a count of all items across all three queue components. It accepts three boolean parameters, `waiting`, `working`, and `completed`. If all of these are `False` (default), all jobs are counted. If any combination of these is `True`, only the corresponding queue(s) will be counted. For example:
```{python, echo=TRUE}
def print_sizes(Q):
    msg = 'Total: {0}; Waiting: {1}; Working: {2}; Completed: {3}'.format(
        Q.size(),
        Q.size(waiting=True),
        Q.size(working=True),
        Q.size(completed=True)
    )
    print(msg)
```
```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
    # enqueue jobs
    for x in range(60):
        Q.put(random_sleep, x)

    # repeatedly print sizes until complete.
    while Q.size(waiting=True, working=True):
        print_sizes(Q)
        time.sleep(1)

    print_sizes(Q)
```
### wait
The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter `poll` can be used to adjust how frequently (in seconds) the operation checks for completed jobs.
New in v0.2.0, include `show_progress=True` to show a progress bar while waiting. This is equivalent to a call to `waitpb()`.
![](docs/imgs/tqdm.gif)
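The interplay of `poll`, `timeout`, and the return value can be sketched with plain threads. This is an illustration of the semantics described above, not `ezpq`'s code: `wait_for` is a hypothetical helper, and treating `timeout=0` as "no limit" is an assumption made for this sketch.

```python
import threading
import time


def wait_for(threads, poll=0.1, timeout=0):
    """Block until all threads finish or `timeout` seconds pass.

    Returns the number of threads still running, so a value greater
    than 0 means the timeout was exceeded. timeout=0 means no limit
    (an assumption of this sketch).
    """
    start = time.time()
    remaining = sum(t.is_alive() for t in threads)
    while remaining > 0 and (timeout == 0 or time.time() - start < timeout):
        time.sleep(poll)  # check again after `poll` seconds
        remaining = sum(t.is_alive() for t in threads)
    return remaining


# Two quick jobs and one slow one.
threads = [threading.Thread(target=time.sleep, args=(s,))
           for s in (0.05, 0.05, 1.0)]
for t in threads:
    t.start()

unfinished_early = wait_for(threads, poll=0.05, timeout=0.3)  # slow job still running
unfinished_late = wait_for(threads, poll=0.05)                # waits for everything
```

A smaller `poll` notices completed jobs sooner at the cost of more frequent checks.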
### get
`get()` retrieves and removes ("pops") the highest priority job from the completed queue, if one is available. If the completed queue is empty, `get()` returns `None`. However, `get()` will wait for a completed job if `wait`, `poll`, or `timeout` are specified. If the timeout is exceeded, `None` is returned.
```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
    n_inputs = 60
    output = [None] * n_inputs

    # enqueue jobs
    for x in range(n_inputs):
        Q.put(random_sleep, args=x)

    # repeatedly `get()` until queue is empty.
    for i in range(n_inputs):
        output[i] = Q.get(wait=True)
```
### collect
`collect()` is similar to `get()`, but it will return a list of *all* completed jobs and clear the completed queue. It does not support the `poll` or `timeout` parameters, but you can call `wait()` before `collect()` if desired.
```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
    # enqueue jobs
    for x in range(60):
        Q.put(random_sleep, x)

    # wait and collect all jobs
    print('Queue size before: {0}'.format(Q.size()))
    Q.wait()
    output = Q.collect()
    print('Queue size after: {0}'.format(Q.size()))
    print('Output size: {0}'.format(len(output)))
```
### map
`map` encapsulates the logic of `put`, `wait`, and `collect` in one call. Include `show_progress=True` to display a `tqdm` progress bar.
![](docs/imgs/tqdm_map.gif)
### starmap
`starmap` is similar to `map`, but operates on a list of lists, with each nested list being unpacked as arguments to the function.
```{python, echo=TRUE}
def my_pow(x, k):
    return '{}^{} = {}'.format(x, k, x**k)

# list of lists to iterate over.
args_list = [[x, x%4]  # (x, k)
             for x in range(100)]

# starmap
with ezpq.Queue(10) as Q:
    output = Q.starmap(my_pow, iterable=args_list)

[x['output'] for x in output[:10]]
```
### starmapkw
Same as `starmap`, but operates on a list of *dicts* to be expanded as kwargs to the function.
```{python, echo=TRUE}
def my_pow(x, k):
    return '{}^{} = {}'.format(x, k, x**k)

# list of dicts to iterate over.
kwargs_list = [{ 'x':x, 'k':x%4 }  # (x, k)
               for x in range(100)]

# starmapkw
with ezpq.Queue(10) as Q:
    output = Q.starmapkw(my_pow, iterable=kwargs_list)

[x['output'] for x in output[:10]]
```
### dispose
The queueing operations of `ezpq.Queue` run periodically in a background "pulse" thread. By default, the `poll` parameter for a Queue is `0.1` seconds. The pulse thread will continue firing until the Queue is disposed of.
In the previous examples, use of the context manager (`with ezpq.Queue() as Q:`) results in automatic disposal. If not using the context manager (or decorator), clean up after yourself with `dispose()`.
```{python}
Q = ezpq.Queue(6)
Q.map(random_sleep, range(60))
Q.dispose()
```
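The idea of a periodic pulse that must be stopped explicitly can be sketched with the standard library. This is not `ezpq`'s implementation: the `Pulse` class and its `ticks` counter are hypothetical names for the example, which only shows why disposal matters and how a context manager can handle it automatically.

```python
import threading
import time


class Pulse:
    """Fires every `poll` seconds in the background until disposed."""

    def __init__(self, poll=0.1):
        self.poll = poll
        self.ticks = 0  # stands in for the periodic queue operations
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._loop)
        self._thread.daemon = True
        self._thread.start()

    def _loop(self):
        # Event.wait() returns False on timeout and True once set,
        # so the loop ends promptly after dispose() is called.
        while not self._stopped.wait(self.poll):
            self.ticks += 1

    def dispose(self):
        self._stopped.set()
        self._thread.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.dispose()  # automatic disposal, as with a `with` block


with Pulse(poll=0.01) as pulse:
    time.sleep(0.1)  # the pulse keeps firing while we work

ticks_at_exit = pulse.ticks
time.sleep(0.05)
ticks_later = pulse.ticks  # unchanged: the pulse stopped on exit
```

Without the `with` block, the pulse would keep running until `dispose()` is called by hand.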
## Synchronous Lanes
When you have jobs that are dependent upon one another, you can use "lanes" to execute them in sequence. All that is required is an arbitrary lane name/id passed to the `lane` parameter of `put`. Empty lanes are automatically removed.
![](docs/imgs/lanes.gif)
In the above graphic, notice how same-colored bars never overlap. These bars represent jobs that are in the same lane, which executed synchronously.
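The lane guarantee (parallel across lanes, sequential within a lane) can be sketched with plain threads. This is a simplified illustration, not how `ezpq` schedules lanes: `run_in_lanes` is a hypothetical helper that dedicates one thread to each lane and ignores any worker limit.

```python
import threading
from collections import defaultdict


def run_in_lanes(jobs):
    """Run (lane, func, arg) jobs: lanes in parallel, each lane in order."""
    lanes = defaultdict(list)
    for lane, func, arg in jobs:
        lanes[lane].append((func, arg))

    # lane -> outputs, in the order the lane's jobs executed
    results = {lane: [] for lane in lanes}

    def drain(lane):
        # Jobs in one lane never overlap: each starts after the last ends.
        for func, arg in lanes[lane]:
            results[lane].append(func(arg))

    threads = [threading.Thread(target=drain, args=(lane,)) for lane in lanes]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def times_ten(x):
    return x * 10


jobs = [(x % 3, times_ten, x) for x in range(9)]  # three lanes: 0, 1, 2
lane_outputs = run_in_lanes(jobs)
```

Each lane's outputs appear in submission order, which is the property that lets dependent jobs rely on their predecessors having finished.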
### Lane Error Handling
You may want to short-circuit a synchronous lane if a job in the lane fails. You can do this by specifying `stop_on_lane_error=True` when putting a job in the queue. If specified and the preceding job has a non-zero exit code, this job will not be run.
```{python, echo=TRUE}
def reciprocal(x):
    time.sleep(0.1)  # slow things down
    return 1/x  # raises ZeroDivisionError when x == 0
```
```{python, echo=TRUE}
import random
with ezpq.Queue(6) as Q:
    for i in range(100):
        Q.put(reciprocal, random.randint(0, 10), lane=i%5, suppress_errors=True, stop_on_lane_error=True)
    Q.wait()
    output = Q.collect()
plt = ezpq.Plot(output).build(facet_by='lane', color_by='exitcode', color_pal=['red', 'blue'])
plt.save('docs/imgs/lane_error.png')
```
![](docs/imgs/lane_error.png)
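The short-circuit rule can be sketched for a single lane without `ezpq`. This is a simplified model under stated assumptions: `run_lane` is a hypothetical helper, the `ran` field and the exit code `-1` for skipped jobs are inventions of this example, and a skipped job is assumed to count as a failure for the jobs behind it.

```python
def run_lane(func, inputs, stop_on_lane_error=True):
    """Run jobs in order; skip a job when the preceding one failed."""
    records = []
    previous_failed = False
    for x in inputs:
        if stop_on_lane_error and previous_failed:
            # Not run at all; -1 is a placeholder exit code (assumption).
            records.append({'input': x, 'output': None,
                            'exitcode': -1, 'ran': False})
            continue
        try:
            records.append({'input': x, 'output': func(x),
                            'exitcode': 0, 'ran': True})
            previous_failed = False
        except Exception:
            # Errors are suppressed and recorded as a non-zero exit code.
            records.append({'input': x, 'output': None,
                            'exitcode': 1, 'ran': True})
            previous_failed = True
    return records


def reciprocal_of(x):
    return 1.0 / x


stopped = run_lane(reciprocal_of, [2, 0, 4], stop_on_lane_error=True)
continued = run_lane(reciprocal_of, [2, 0, 4], stop_on_lane_error=False)
```

With `stop_on_lane_error=True` the job after the failure never runs; with `False` the lane carries on and the last job completes normally.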
## ezpq.Plot
The `Plot` class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class is initialized with a list of dicts; exactly what is returned from a call to `collect()` or `map()`.
Arguments given to `build()` control various aspects of the plot, such as coloring and faceting.
```{python}
print(help(ezpq.Plot.build))
```
```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
    for x in range(60):
        lane = x % 5
        Q.put(random_sleep, x, timeout=1, lane=lane)
    Q.wait()
    output = Q.collect()
```
```{python, echo=TRUE}
plt = ezpq.Plot(output).build(facet_by='lane', show_legend=False)
plt.save('docs/imgs/lanes2.png')
```
![](docs/imgs/lanes2.png)
Each horizontal bar represents an independent job id. The start of the gray bar indicates when the job entered the queueing system. The colored bar spans from when the job started running to when it ended. The gray bar that follows (if any) reflects how long it took for the queue operations to recognize the finished job, join the job data with its output, remove it from the working table, and place it in the completed queue.
## More Examples
Many more examples can be found in [docs/examples.ipynb](//github.com/dm3ll3n/ezpq/blob/master/docs/examples.ipynb).