Skip to content

Commit

Permalink
pre-publish
Browse files Browse the repository at this point in the history
  • Loading branch information
Donald Mellenbruch authored and Donald Mellenbruch committed Jun 21, 2023
1 parent d759d7b commit 0101c8d
Show file tree
Hide file tree
Showing 9 changed files with 689 additions and 96 deletions.
39 changes: 20 additions & 19 deletions README.Rmd
@@ -1,9 +1,10 @@
---
title: '`ezpq`: an easy parallel queueing system.'
output:
github_document:
md_document:
toc: true
toc_depth: 3
variant: markdown_strict+backtick_code_blocks
pandoc_args: ["--atx-headers"]
---

```{r setup, include=FALSE}
Expand All @@ -26,6 +27,8 @@ if os.path.exists('./ezpq/__init__.py') and sys.path[0] != os.getcwd():
import ezpq
```

# `ezpq`: an easy parallel queueing system.

Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/).

## Overview
Expand Down Expand Up @@ -98,17 +101,13 @@ print('> Runtime: ' + str(end - start))
```

Here is the function ran in parallel with an `ezpq` Queue of 6 workers. Thus, the runtime of the above operation will be reduced from ~60s to ~10s.

```{python, echo=TRUE}
```{python, eval=FALSE, echo=TRUE}
start = time.time()
```
```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
output = Q.map(random_sleep, range(60))
```
```{python, echo=TRUE}
end = time.time()
print('> Runtime: ' + str(end - start))
```
Expand Down Expand Up @@ -170,19 +169,21 @@ plt.save('docs/imgs/quickstart.png')

![](docs/imgs/quickstart.png)

## `ezpq.Queue`
## ezpq.Queue

The `Queue` class implements the queueing system, which is itself a 3-part system composed of the:


1. waiting queue
2. working table
3. completed queue


```{python}
print(help(ezpq.Queue.__init__))
```

## `ezpq.Job`
## ezpq.Job

A `ezpq` job defines the function to run. It is passed to an `ezpq` queue with a call to `submit()`.

Expand All @@ -209,7 +210,7 @@ plt.save('docs/imgs/submit.png')
![](docs/imgs/submit.png)


### `put`
### put

The `put` method creates a job and submits it to an `ezpq` queue. All of its arguments are passed to `ezpq.Job()`.

Expand All @@ -221,7 +222,7 @@ with ezpq.Queue(6) as Q:
output = Q.collect()
```

### `size`
### size

`size()` returns a count of all items across all three queue components. It accepts three boolean parameters, `waiting`, `working`, and `completed`. If all of these are `False` (default), all jobs are counted. If any combination of these is `True`, only the corresponding queue(s) will be counted. For example:

Expand Down Expand Up @@ -250,7 +251,7 @@ with ezpq.Queue(6) as Q:
print_sizes(Q)
```

### `wait`
### wait

The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter `poll` can be used to adjust how frequently (in seconds) the operation checks for completed jobs (default=0.1).

Expand All @@ -259,7 +260,7 @@ New in v0.2.0, include `show_progress=True` to show a progress bar while waiting
![](docs/imgs/tqdm.gif)


### `get`
### get

`get()` retrieves and deletes ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, `get()` returns `None`. However, `get()` will wait for a completed job if the `poll` frequency is greater than 0. If the timeout is exceeded, `None` is returned.

Expand All @@ -271,12 +272,12 @@ with ezpq.Queue(6) as Q:
for x in range(n_inputs):
Q.put(random_sleep, args=x)
# repeatedly `get()` queue is empty.
# repeatedly `get()` until queue is empty.
for i in range(n_inputs):
output[i] = Q.get(poll=0.1)
```

### `collect`
### collect

`collect()` is similar to `get()`, but it will return a list of *all* completed jobs and clear the completed queue. It does not support the `poll` or `timeout` parameters, but you can call `wait()` before `collect()` if desired.

Expand All @@ -296,13 +297,13 @@ with ezpq.Queue(6) as Q:
print('Output size: {0}'.format(len(output)))
```

### `map`
### map

`map` encapsulates the logic of `put`, `wait`, and `collect` in one call. Include `show_progress=True` to get output `tqdm` progress bar.

![](docs/imgs/tqdm_map.gif)

### `dispose`
### dispose

The queueing operations performed by `ezpq.Queue` are performed on a periodic basis. By default, the `poll` parameter for a Queue is `0.1` seconds. This "pulse" thread will continue firing until the Queue is disposed of.

Expand All @@ -325,7 +326,7 @@ When you have jobs that are dependent upon another, you can use "lanes" to execu

In the above graphic, notice how same-colored bars never overlap. These bars represent jobs that are in the same lane, which executed synchronously.

## `ezpq.Plot`
## ezpq.Plot

The `Plot` class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class is initialized with a list of dicts; exactly what is returned from a call to `collect()` or `map()`.

Expand Down

0 comments on commit 0101c8d

Please sign in to comment.