diff --git a/README.Rmd b/README.Rmd index cf43d36..0b0630e 100644 --- a/README.Rmd +++ b/README.Rmd @@ -1,9 +1,10 @@ --- -title: '`ezpq`: an easy parallel queueing system.' output: - github_document: + md_document: toc: true toc_depth: 3 + variant: markdown_strict+backtick_code_blocks + pandoc_args: ["--atx-headers"] --- ```{r setup, include=FALSE} @@ -26,6 +27,8 @@ if os.path.exists('./ezpq/__init__.py') and sys.path[0] != os.getcwd(): import ezpq ``` +# `ezpq`: an easy parallel queueing system. + Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/). ## Overview @@ -98,17 +101,13 @@ print('> Runtime: ' + str(end - start)) ``` Here is the function ran in parallel with an `ezpq` Queue of 6 workers. Thus, the runtime of the above operation will be reduced from ~60s to ~10s. - -```{python, echo=TRUE} + +```{python, eval=FALSE, echo=TRUE} start = time.time() -``` -```{python, echo=TRUE} with ezpq.Queue(6) as Q: output = Q.map(random_sleep, range(60)) -``` -```{python, echo=TRUE} end = time.time() print('> Runtime: ' + str(end - start)) ``` @@ -170,19 +169,21 @@ plt.save('docs/imgs/quickstart.png') ![](docs/imgs/quickstart.png) -## `ezpq.Queue` +## ezpq.Queue The `Queue` class implements the queueing system, which is itself a 3-part system composed of the: + 1. waiting queue 2. working table 3. completed queue + ```{python} print(help(ezpq.Queue.__init__)) ``` -## `ezpq.Job` +## ezpq.Job A `ezpq` job defines the function to run. It is passed to an `ezpq` queue with a call to `submit()`. @@ -209,7 +210,7 @@ plt.save('docs/imgs/submit.png') ![](docs/imgs/submit.png) -### `put` +### put The `put` method creates a job and submits it to an `ezpq` queue. All of its arguments are passed to `ezpq.Job()`. @@ -221,7 +222,7 @@ with ezpq.Queue(6) as Q: output = Q.collect() ``` -### `size` +### size `size()` returns a count of all items across all three queue components. 
It accepts three boolean parameters, `waiting`, `working`, and `completed`. If all of these are `False` (default), all jobs are counted. If any combination of these is `True`, only the corresponding queue(s) will be counted. For example: @@ -250,7 +251,7 @@ with ezpq.Queue(6) as Q: print_sizes(Q) ``` -### `wait` +### wait The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter `poll` can be used to adjust how frequently (in seconds) the operation checks for completed jobs (default=0.1). @@ -259,7 +260,7 @@ New in v0.2.0, include `show_progress=True` to show a progress bar while waiting ![](docs/imgs/tqdm.gif) -### `get` +### get `get()` retrieves and deletes ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, `get()` returns `None`. However, `get()` will wait for a completed job if the `poll` frequency is greater than 0. If the timeout is exceeded, `None` is returned. @@ -271,12 +272,12 @@ with ezpq.Queue(6) as Q: for x in range(n_inputs): Q.put(random_sleep, args=x) - # repeatedly `get()` queue is empty. + # repeatedly `get()` until queue is empty. for i in range(n_inputs): output[i] = Q.get(poll=0.1) ``` -### `collect` +### collect `collect()` is similar to `get()`, but it will return a list of *all* completed jobs and clear the completed queue. It does not support the `poll` or `timeout` parameters, but you can call `wait()` before `collect()` if desired. @@ -296,13 +297,13 @@ with ezpq.Queue(6) as Q: print('Output size: {0}'.format(len(output))) ``` -### `map` +### map `map` encapsulates the logic of `put`, `wait`, and `collect` in one call. Include `show_progress=True` to get output `tqdm` progress bar. 
![](docs/imgs/tqdm_map.gif) -### `dispose` +### dispose The queueing operations performed by `ezpq.Queue` are performed on a periodic basis. By default, the `poll` parameter for a Queue is `0.1` seconds. This "pulse" thread will continue firing until the Queue is disposed of. @@ -325,7 +326,7 @@ When you have jobs that are dependent upon another, you can use "lanes" to execu In the above graphic, notice how same-colored bars never overlap. These bars represent jobs that are in the same lane, which executed synchronously. -## `ezpq.Plot` +## ezpq.Plot The `Plot` class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class is initialized with a list of dicts; exactly what is returned from a call to `collect()` or `map()`. diff --git a/README.md b/README.md index 1dbce05..3be0e2e 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,24 @@ -`ezpq`: an easy parallel queueing system. -================ - -Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my -site](https://www.donaldmellenbruch.com/project/ezpq/). +# `ezpq`: an easy parallel queueing system. + +Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/). + +* [`ezpq`: an easy parallel queueing system.](#ezpq-an-easy-parallel-queueing-system) + * [Overview](#overview) + * [Features](#features) + * [How to get it](#how-to-get-it) + * [Quickstart](#quickstart) + * [ezpq.Queue](#ezpqqueue) + * [ezpq.Job](#ezpqjob) + * [put](#put) + * [size](#size) + * [wait](#wait) + * [get](#get) + * [collect](#collect) + * [map](#map) + * [dispose](#dispose) + * [Synchronous Lanes](#synchronous-lanes) + * [ezpq.Plot](#ezpqplot) + * [More Examples](#more-examples) ## Overview @@ -19,21 +35,21 @@ also run jobs with `threading.Thread`. ## Features - - Simple interface; pure Python. - - No required dependencies outside of standard library. 
- - Optional integration with [`tqdm`](https://github.com/tqdm/tqdm) +- Simple interface; pure Python. +- No required dependencies outside of standard library. +- Optional integration with [`tqdm`](https://github.com/tqdm/tqdm) progress bars. - - Compatible with Python 2 & 3. - - Cross platform with MacOS, Linux, and Windows. - - Data remains in-memory. - - Priority Queueing, both in and out and within lanes. - - Synchronous lanes allow dependent jobs to execute in the desired +- Compatible with Python 2 & 3. +- Cross platform with MacOS, Linux, and Windows. +- Data remains in-memory. +- Priority Queueing, both in and out and within lanes. +- Synchronous lanes allow dependent jobs to execute in the desired order. - - Easily switch from processes to threads. - - Automatic handling of output. - - Rich job details, easily viewed as pandas dataframe. - - Built-in logging to CSV. - - Customizable visualizations of queue operations. +- Easily switch from processes to threads. +- Automatic handling of output. +- Rich job details, easily viewed as pandas dataframe. +- Built-in logging to CSV. +- Customizable visualizations of queue operations. ## How to get it @@ -74,7 +90,7 @@ end = time.time() print('> Runtime: ' + str(end - start)) ``` - ## '> Runtime: 58.9256477355957' + ## '> Runtime: 58.932034969329834' Here is the function ran in parallel with an `ezpq` Queue of 6 workers. Thus, the runtime of the above operation will be reduced from ~60s to @@ -82,20 +98,12 @@ Thus, the runtime of the above operation will be reduced from ~60s to ``` python start = time.time() -``` - -``` python with ezpq.Queue(6) as Q: output = Q.map(random_sleep, range(60)) -``` - -``` python end = time.time() print('> Runtime: ' + str(end - start)) ``` - ## '> Runtime: 11.134793758392334' - Here is the same scenario, using the `@ezpq.Queue` decorator. ``` python @@ -127,10 +135,10 @@ job, along with its output, and exit code. 
print( output[0] ) ``` - ## {'args': 0, + ## {'args': [0], ## 'callback': None, ## 'cancelled': False, - ## 'ended': datetime.datetime(2019, 1, 25, 16, 21, 2, 691459), + ## 'ended': datetime.datetime(2019, 1, 28, 17, 45, 29, 943860), ## 'exception': None, ## 'exitcode': 0, ## 'function': 'random_sleep', @@ -140,11 +148,11 @@ print( output[0] ) ## 'name': 1, ## 'output': 1.3444218515250481, ## 'priority': 100, - ## 'processed': datetime.datetime(2019, 1, 25, 16, 21, 2, 745552), - ## 'qid': 'e2c223f4', - ## 'runtime': 1.3498365879058838, - ## 'started': datetime.datetime(2019, 1, 25, 16, 21, 1, 341623), - ## 'submitted': datetime.datetime(2019, 1, 25, 16, 21, 1, 236632), + ## 'processed': datetime.datetime(2019, 1, 28, 17, 45, 29, 998175), + ## 'qid': 'd6eaaf93', + ## 'runtime': 1.3502492904663086, + ## 'started': datetime.datetime(2019, 1, 28, 17, 45, 28, 593611), + ## 'submitted': datetime.datetime(2019, 1, 28, 17, 45, 28, 489300), ## 'timeout': 0} Easily convert output to a `pandas` dataframe: @@ -156,11 +164,11 @@ print( df.head()[['id', 'output', 'runtime', 'exitcode']] ) ``` ## id output runtime exitcode - ## 0 1 1.344422 1.349837 0 - ## 1 2 0.634364 0.638510 0 - ## 2 3 1.456034 1.460064 0 - ## 3 4 0.737965 0.740669 0 - ## 4 5 0.736048 0.739299 0 + ## 0 1 1.344422 1.350249 0 + ## 1 2 0.634364 0.638975 0 + ## 2 3 1.456034 1.460431 0 + ## 3 4 0.737965 0.742028 0 + ## 4 5 0.736048 0.740672 0 Use `ezpq.Plot` to generate a Gannt chart of the job timings. @@ -171,7 +179,7 @@ plt.save('docs/imgs/quickstart.png') ![](docs/imgs/quickstart.png) -## `ezpq.Queue` +## ezpq.Queue The `Queue` class implements the queueing system, which is itself a 3-part system composed of the: @@ -180,7 +188,7 @@ The `Queue` class implements the queueing system, which is itself a 2. working table 3. 
completed queue - + ## Help on function __init__ in module ezpq.Queue: ## @@ -221,7 +229,7 @@ The `Queue` class implements the queueing system, which is itself a ## ## None -## `ezpq.Job` +## ezpq.Job A `ezpq` job defines the function to run. It is passed to an `ezpq` queue with a call to `submit()`. @@ -270,7 +278,7 @@ with ezpq.Queue(6) as Q: ![](docs/imgs/submit.png) -### `put` +### put The `put` method creates a job and submits it to an `ezpq` queue. All of its arguments are passed to `ezpq.Job()`. @@ -283,7 +291,7 @@ with ezpq.Queue(6) as Q: output = Q.collect() ``` -### `size` +### size `size()` returns a count of all items across all three queue components. It accepts three boolean parameters, `waiting`, `working`, and @@ -321,14 +329,14 @@ with ezpq.Queue(6) as Q: ## 'Total: 60; Waiting: 34; Working: 6; Completed: 20' ## 'Total: 60; Waiting: 31; Working: 6; Completed: 23' ## 'Total: 60; Waiting: 24; Working: 6; Completed: 30' - ## 'Total: 60; Waiting: 18; Working: 6; Completed: 36' - ## 'Total: 60; Waiting: 12; Working: 6; Completed: 42' + ## 'Total: 60; Waiting: 17; Working: 6; Completed: 37' + ## 'Total: 60; Waiting: 11; Working: 6; Completed: 43' ## 'Total: 60; Waiting: 6; Working: 6; Completed: 48' - ## 'Total: 60; Waiting: 1; Working: 6; Completed: 53' + ## 'Total: 60; Waiting: 0; Working: 5; Completed: 55' ## 'Total: 60; Waiting: 0; Working: 1; Completed: 59' ## 'Total: 60; Waiting: 0; Working: 0; Completed: 60' -### `wait` +### wait The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value @@ -342,7 +350,7 @@ waiting. This is equivalent to a call to `waitpb()`. ![](docs/imgs/tqdm.gif) -### `get` +### get `get()` retrieves and deletes (“pop”) the highest priority job from the completed queue, if one is available. 
If the completed queue is empty, @@ -358,12 +366,12 @@ with ezpq.Queue(6) as Q: for x in range(n_inputs): Q.put(random_sleep, args=x) - # repeatedly `get()` queue is empty. + # repeatedly `get()` until queue is empty. for i in range(n_inputs): output[i] = Q.get(poll=0.1) ``` -### `collect` +### collect `collect()` is similar to `get()`, but it will return a list of *all* completed jobs and clear the completed queue. It does not support the @@ -387,23 +395,24 @@ with ezpq.Queue(6) as Q: ## 'Queue size after: 0' ## 'Output size: 60' -### `map` +### map `map` encapsulates the logic of `put`, `wait`, and `collect` in one call. Include `show_progress=True` to get output `tqdm` progress bar. ![](docs/imgs/tqdm_map.gif) -### `dispose` +### dispose The queueing operations performed by `ezpq.Queue` are performed on a periodic basis. By default, the `poll` parameter for a Queue is `0.1` seconds. This “pulse” thread will continue firing until the Queue is disposed of. -In the previous examples, use of the context manager (`with ezpq.Queue() -as Q:`) results in automatic disposal. If not using the context manager -(or decorator), clean up after yourself with `dispose()`. +In the previous examples, use of the context manager +(`with ezpq.Queue() as Q:`) results in automatic disposal. If not using +the context manager (or decorator), clean up after yourself with +`dispose()`. ## Synchronous Lanes @@ -418,7 +427,7 @@ In the above graphic, notice how same-colored bars never overlap. These bars represent jobs that are in the same lane, which executed synchronously. -## `ezpq.Plot` +## ezpq.Plot The `Plot` class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class is initialized with diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..c1430fe --- /dev/null +++ b/README.rst @@ -0,0 +1,545 @@ +``ezpq``: an easy parallel queueing system. 
+=========================================== + +Read this on `GitHub `__ or `my +site `__. + +- ```ezpq``: an easy parallel queueing + system. <#ezpq-an-easy-parallel-queueing-system>`__ +- `Overview <#overview>`__ +- `Features <#features>`__ +- `How to get it <#how-to-get-it>`__ +- `Quickstart <#quickstart>`__ +- `ezpq.Queue <#ezpqqueue>`__ +- `ezpq.Job <#ezpqjob>`__ + + - `put <#put>`__ + - `size <#size>`__ + - `wait <#wait>`__ + - `get <#get>`__ + - `collect <#collect>`__ + - `map <#map>`__ + - `dispose <#dispose>`__ + +- `Synchronous Lanes <#synchronous-lanes>`__ +- `ezpq.Plot <#ezpqplot>`__ +- `More Examples <#more-examples>`__ + +Overview +-------- + +``ezpq`` implements a parallel queueing system consisting of: + +1. a priority “waiting” queue in. +2. a lookup table of “working” jobs. +3. a priority “completed” queue out. + +The queueing system uses ``multiprocessing.Process`` by default and can +also run jobs with ``threading.Thread``. + +.. figure:: docs/imgs/ezpq.png + :alt: + +Features +-------- + +- Simple interface; pure Python. +- No required dependencies outside of standard library. +- Optional integration with ```tqdm`` `__ + progress bars. +- Compatible with Python 2 & 3. +- Cross platform with MacOS, Linux, and Windows. +- Data remains in-memory. +- Priority Queueing, both in and out and within lanes. +- Synchronous lanes allow dependent jobs to execute in the desired + order. +- Easily switch from processes to threads. +- Automatic handling of output. +- Rich job details, easily viewed as pandas dataframe. +- Built-in logging to CSV. +- Customizable visualizations of queue operations. + +How to get it +------------- + +Install from `PyPI `__ with: + +.. code:: python + + pip install ezpq + +Optional packages: + +.. 
code:: python + + pip install pandas # required for plots + pip install plotnine # required for plots + pip install tqdm # required for progress bars + +Quickstart +---------- + +Suppose you wanted to speed up the following code, which runs 60 +operations that take anywhere from 0s to 2s. With an average job time of +~1s, this operation should take ~60s. + +.. code:: python + + import time + import random + def random_sleep(x): + random.seed(x) + n = random.uniform(0.5, 1.5) + time.sleep(n) + return n + +.. code:: python + + start = time.time() + output = [random_sleep(x) for x in range(60)] + end = time.time() + print('> Runtime: ' + str(end - start)) + +:: + + ## '> Runtime: 58.932034969329834' + +Here is the function ran in parallel with an ``ezpq`` Queue of 6 +workers. Thus, the runtime of the above operation will be reduced from +~60s to ~10s. + +.. code:: python + + start = time.time() + with ezpq.Queue(6) as Q: + output = Q.map(random_sleep, range(60)) + end = time.time() + print('> Runtime: ' + str(end - start)) + +Here is the same scenario, using the ``@ezpq.Queue`` decorator. + +.. code:: python + + @ezpq.Queue(6) + def random_sleep(x): + random.seed(x) + n = random.uniform(0.5, 1.5) + time.sleep(n) + return n + output = random_sleep(iterable=range(60)) + +While ``map()`` and the decorator are useful for quick-n-simple +parallization, the essential functions of an ``ezpq`` Queue include +``put()``, ``wait()``, and ``get()`` (or ``collect()``). + +.. code:: python + + with ezpq.Queue(6) as Q: + for x in range(60): + Q.put(random_sleep, args=x) + Q.wait() + output = Q.collect() + +The output is a list of dicts containing verbose information about each +job, along with its output, and exit code. + +.. 
code:: python + + print( output[0] ) + +:: + + ## {'args': [0], + ## 'callback': None, + ## 'cancelled': False, + ## 'ended': datetime.datetime(2019, 1, 28, 17, 45, 29, 943860), + ## 'exception': None, + ## 'exitcode': 0, + ## 'function': 'random_sleep', + ## 'id': 1, + ## 'kwargs': None, + ## 'lane': None, + ## 'name': 1, + ## 'output': 1.3444218515250481, + ## 'priority': 100, + ## 'processed': datetime.datetime(2019, 1, 28, 17, 45, 29, 998175), + ## 'qid': 'd6eaaf93', + ## 'runtime': 1.3502492904663086, + ## 'started': datetime.datetime(2019, 1, 28, 17, 45, 28, 593611), + ## 'submitted': datetime.datetime(2019, 1, 28, 17, 45, 28, 489300), + ## 'timeout': 0} + +Easily convert output to a ``pandas`` dataframe: + +.. code:: python + + import pandas as pd + df = pd.DataFrame(output) + print( df.head()[['id', 'output', 'runtime', 'exitcode']] ) + +:: + + ## id output runtime exitcode + ## 0 1 1.344422 1.350249 0 + ## 1 2 0.634364 0.638975 0 + ## 2 3 1.456034 1.460431 0 + ## 3 4 0.737965 0.742028 0 + ## 4 5 0.736048 0.740672 0 + +Use ``ezpq.Plot`` to generate a Gannt chart of the job timings. + +.. code:: python + + plt = ezpq.Plot(output).build(show_legend=False) + plt.save('docs/imgs/quickstart.png') + +.. figure:: docs/imgs/quickstart.png + :alt: + +ezpq.Queue +---------- + +The ``Queue`` class implements the queueing system, which is itself a +3-part system composed of the: + +1. waiting queue +2. working table +3. completed queue + + + + +:: + + ## Help on function __init__ in module ezpq.Queue: + ## + ## __init__(self, n_workers=8, max_size=0, job_runner=, auto_remove=False, auto_start=True, auto_stop=False, callback=None, log_file=None, poll=0.1, show_progress=False, qid=None) + ## Implements a parallel queueing system. + ## + ## Args: + ## n_workers: the max number of concurrent jobs. + ## - Accepts: int + ## - Default: cpu_count() + ## max_size: when > 0, will throw an exception the number of enqueued jobs exceeds this value. Otherwise, no limit. 
+ ## - Accepts: int + ## - Default: 0 (unlimited) + ## job_runner: the class to use to invoke new jobs. + ## - Accepts: multiprocessing.Process, threading.Thread + ## - Default: multiprocessing.Process + ## auto_remove: controls whether jobs are discarded of after completion. + ## - Accepts: bool + ## - Default: False + ## auto_start: controls whether the queue system "pulse" is started upon instantiation (default), or manually. + ## - Accepts: bool + ## - Default: True + ## auto_stop: controls whether the queue system "pulse" stops itself after all jobs are complete. + ## - Accepts: bool + ## - Default: False + ## callback: optional function to execute synchronously immediately after a job completes. + ## - Accepts: function object + ## - Default: None + ## log_file: if file path is specified, job data is written to this path in CSV format. + ## - Accepts: str + ## - Default: None + ## poll: controls the pulse frequency; the amount of time slept between operations. + ## - Accepts: float + ## - Default: 0.1 + ## + ## Returns: + ## ezpq.Queue object. + ## + ## None + +ezpq.Job +-------- + +A ``ezpq`` job defines the function to run. It is passed to an ``ezpq`` +queue with a call to ``submit()``. + +:: + + ## Help on function __init__ in module ezpq.Job: + ## + ## __init__(self, function, args=None, kwargs=None, name=None, priority=100, lane=None, timeout=0, suppress_errors=False) + ## Defines what to run within a `ezpq.Queue`, and how to run it. + ## + ## Args: + ## function: the function to run. + ## - Accepts: function object + ## args: optional positional arguments to pass to the function. + ## - Accepts: list, tuple + ## - Default: None + ## kwargs: optional keyword arguments to pass to the function. + ## - Accepts: dict + ## - Default: None + ## name: optional name to give to the job. Does not have to be unique. + ## - Accepts: str + ## - Default: None; assumes same name as job id. + ## priority: priority value to assign. Lower values get processed sooner. 
+ ## - Accepts: int + ## - Default: 100 + ## lane: a sequential lane to place the job in. if it does not already exist, it will be created. + ## - Accepts: int, str; any hashable object + ## - Default: None; no lanes. + ## timeout: When > 0, if this value (in seconds) is exceeded, the job is terminated. Otherwise, no limit enforced. + ## - Accepts: float + ## - Default: 0 (unlimited) + ## + ## Returns: + ## ezpq.Job object + ## + ## None + +.. code:: python + + with ezpq.Queue(6) as Q: + for x in range(60): + priority = x % 2 # give even numbers higher priority. + job = ezpq.Job(random_sleep, args=x, priority=priority) + Q.submit(job) + Q.wait() + output = Q.collect() + +.. figure:: docs/imgs/submit.png + :alt: + +put +~~~ + +The ``put`` method creates a job and submits it to an ``ezpq`` queue. +All of its arguments are passed to ``ezpq.Job()``. + +.. code:: python + + with ezpq.Queue(6) as Q: + for x in range(60): + Q.put(random_sleep, args=x) + Q.wait() + output = Q.collect() + +size +~~~~ + +``size()`` returns a count of all items across all three queue +components. It accepts three boolean parameters, ``waiting``, +``working``, and ``completed``. If all of these are ``False`` (default), +all jobs are counted. If any combination of these is ``True``, only the +corresponding queue(s) will be counted. For example: + +.. code:: python + + def print_sizes(Q): + msg = 'Total: {0}; Waiting: {1}; Working: {2}; Completed: {3}'.format( + Q.size(), + Q.size(waiting=True), + Q.size(working=True), + Q.size(completed=True) + ) + print(msg) + +.. code:: python + + with ezpq.Queue(6) as Q: + # enqueue jobs + for x in range(60): + Q.put(random_sleep, x) + # repeatedly print sizes until complete. 
+ while Q.has_work(): + print_sizes(Q) + time.sleep(1) + print_sizes(Q) + +:: + + ## 'Total: 60; Waiting: 60; Working: 0; Completed: 0' + ## 'Total: 60; Waiting: 51; Working: 6; Completed: 3' + ## 'Total: 60; Waiting: 46; Working: 6; Completed: 8' + ## 'Total: 60; Waiting: 39; Working: 6; Completed: 15' + ## 'Total: 60; Waiting: 34; Working: 6; Completed: 20' + ## 'Total: 60; Waiting: 31; Working: 6; Completed: 23' + ## 'Total: 60; Waiting: 24; Working: 6; Completed: 30' + ## 'Total: 60; Waiting: 17; Working: 6; Completed: 37' + ## 'Total: 60; Waiting: 11; Working: 6; Completed: 43' + ## 'Total: 60; Waiting: 6; Working: 6; Completed: 48' + ## 'Total: 60; Waiting: 0; Working: 5; Completed: 55' + ## 'Total: 60; Waiting: 0; Working: 1; Completed: 59' + ## 'Total: 60; Waiting: 0; Working: 0; Completed: 60' + +wait +~~~~ + +The ``wait()`` method will block execution until all jobs complete. It +also accepts a ``timeout`` parameter, given in seconds. The return value +is the count of jobs that did not complete. Thus, a return value greater +than 0 indicates the timeout was exceeded. The parameter ``poll`` can be +used to adjust how frequently (in seconds) the operation checks for +completed jobs (default=0.1). + +New in v0.2.0, include ``show_progress=True`` to show a progress bar +while waiting. This is equivalent to a call to ``waitpb()``. + +.. figure:: docs/imgs/tqdm.gif + :alt: + +get +~~~ + +``get()`` retrieves and deletes (“pop”) the highest priority job from +the completed queue, if one is available. If the completed queue is +empty, ``get()`` returns ``None``. However, ``get()`` will wait for a +completed job if the ``poll`` frequency is greater than 0. If the +timeout is exceeded, ``None`` is returned. + +.. code:: python + + with ezpq.Queue(6) as Q: + n_inputs = 60 + output = [None] * n_inputs + # enqueue jobs + for x in range(n_inputs): + Q.put(random_sleep, args=x) + + # repeatedly `get()` until queue is empty. 
+ for i in range(n_inputs): + output[i] = Q.get(poll=0.1) + +collect +~~~~~~~ + +``collect()`` is similar to ``get()``, but it will return a list of +*all* completed jobs and clear the completed queue. It does not support +the ``poll`` or ``timeout`` parameters, but you can call ``wait()`` +before ``collect()`` if desired. + +.. code:: python + + with ezpq.Queue(6) as Q: + # enqueue jobs + for x in range(60): + Q.put(random_sleep, x) + # wait and collect all jobs + print('Queue size before: {0}'.format(Q.size())) + Q.wait() + output = Q.collect() + print('Queue size after: {0}'.format(Q.size())) + print('Output size: {0}'.format(len(output))) + +:: + + ## 'Queue size before: 60' + ## 'Queue size after: 0' + ## 'Output size: 60' + +map +~~~ + +``map`` encapsulates the logic of ``put``, ``wait``, and ``collect`` in +one call. Include ``show_progress=True`` to get output ``tqdm`` progress +bar. + +.. figure:: docs/imgs/tqdm_map.gif + :alt: + +dispose +~~~~~~~ + +The queueing operations performed by ``ezpq.Queue`` are performed on a +periodic basis. By default, the ``poll`` parameter for a Queue is +``0.1`` seconds. This “pulse” thread will continue firing until the +Queue is disposed of. + +In the previous examples, use of the context manager +(``with ezpq.Queue() as Q:``) results in automatic disposal. If not +using the context manager (or decorator), clean up after yourself with +``dispose()``. + +Synchronous Lanes +----------------- + +When you have jobs that are dependent upon another, you can use “lanes” +to execute them in sequence. All that is required is an arbitrary lane +name/id passed to the ``lane`` parameter of ``put``. Empty lanes are +automatically removed. + +.. figure:: docs/imgs/lanes.gif + :alt: + +In the above graphic, notice how same-colored bars never overlap. These +bars represent jobs that are in the same lane, which executed +synchronously. 
+ +ezpq.Plot +--------- + +The ``Plot`` class is used to visualize the wait, start, and end times +for each job that entered the queueing system. The class is initialized +with a list of dicts; exactly what is returned from a call to +``collect()`` or ``map()``. + +Arguments given to ``build()`` control various aspects of the plot, from +coloring, to faceting, + +:: + + ## Help on function build in module ezpq.Plot: + ## + ## build(self, color_by='qid', facet_by='qid', facet_scale='fixed', show_legend=True, bar_width=1, title=None, color_pal=None, theme='bw') + ## Produces a plot based on the data and options provided to a `ezpq.Plot()` object. + ## + ## Args: + ## color_by: controls the column to use for coloring the bars. + ## - Accepts: one of 'qid', 'priority', 'lane', 'cancelled', 'exitcode', 'name', 'output' + ## - Default: 'qid' + ## facet_by: controls the column to use for facetting the plot. + ## - Accepts: one of 'qid', 'priority', 'lane', 'cancelled', 'exitcode', 'name', 'output' + ## - Default: 'qid' + ## facet_scale: controls the scale of the x/y axis across facets. + ## - Accepts: one of 'fixed', 'free', 'free_x', 'free_y' + ## - Default: 'fixed' + ## show_legend: controls whether the legend is drawn. + ## - Accepts: bool + ## - Default: True + ## bar_width: controls the bar width + ## - Accepts: float + ## - Default: 1 + ## title: optional title to be drawn above the plot. + ## - Accepts: str, None + ## - Default: None + ## theme: + ## - Accepts: 'bw', 'classic', 'gray', 'grey', 'seaborn', '538', 'dark', 'matplotlib', 'minimal', 'xkcd', 'light' + ## - Default: 'bw' + ## Returns: + ## The plot produced from plotnine.ggplot(). + ## + ## None + +.. code:: python + + with ezpq.Queue(6) as Q: + for x in range(60): + lane = x % 5 + Q.put(random_sleep, x, timeout=1, lane=lane) + Q.wait() + output = Q.collect() + +.. code:: python + + plt = ezpq.Plot(output).build(facet_by='lane', show_legend=False) + plt.save('docs/imgs/lanes2.png') + +.. 
figure:: docs/imgs/lanes2.png + :alt: + +Each horizontal bar represents an independent job id. The start of the +gray bar indicates when the job entered the queuing system. The start of +the colored bar indicates when the job started running, and when it +ended. The gray bar that follows (if any) reflects how long it took for +the queue operations to recognize the finished job, join the job data +with its output, remove it from the working table, and place it in the +completed queue. + +More Examples +------------- + +Many more examples can be found in +`docs/examples.ipynb `__. diff --git a/ezpq/Job.py b/ezpq/Job.py index 5a616f4..1226915 100644 --- a/ezpq/Job.py +++ b/ezpq/Job.py @@ -39,8 +39,19 @@ def __init__(self, function, args=None, kwargs=None, name=None, priority=100, la self.lane = lane self.timeout = timeout self.function = function - self.args = args - self.kwargs = kwargs + + if args is None: + self.args = None + elif not hasattr(args, '__iter__'): + self.args = [args] + else: + self.args = list(args) + + if kwargs is None: + self.kwargs = None + else: + self.kwargs = dict(kwargs) + self.priority = priority self._suppress_errors = suppress_errors self._inner_job = None diff --git a/ezpq/Queue.py b/ezpq/Queue.py index 9834dc0..8d59cd3 100644 --- a/ezpq/Queue.py +++ b/ezpq/Queue.py @@ -481,20 +481,21 @@ def waitpb(self, poll=0.1, timeout=0): def _job_wrap(_job, _output, *args, **kwargs): '''Used internally to wrap a job, capture output and any exception.''' out = None - ex = None + ex_obj = None ex_msg = None code = 0 try: out = _job.function(*args, **kwargs) except Exception as ex: - ex_msg = traceback.format_exc() #ex + ex_obj = ex + ex_msg = traceback.format_exc() code = -1 _output.update({ _job._id: {'_ended':time.time(), '_output':out, '_exception': ex_msg, '_exitcode': code} }) - if ex is not None and not _job._suppress_errors: - raise ex + if ex_obj is not None and not _job._suppress_errors: + raise ex_obj def _start_job(self, job): '''Internal; 
invokes jobs.''' @@ -502,10 +503,10 @@ def _start_job(self, job): job_args = dict() if job.args is not None: - if not isinstance(job.args, list): - job_args['args'] = [job, self._output, job.args] - else: - job_args['args'] = [job, self._output] + job.args + # if not isinstance(job.args, list): + # job_args['args'] = [job, self._output, job.args] + # else: + job_args['args'] = [job, self._output] + job.args if job.kwargs is None: job_args['kwargs'] = dict() diff --git a/ezpq/__init__.py b/ezpq/__init__.py index 81e174e..f65e02e 100644 --- a/ezpq/__init__.py +++ b/ezpq/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.1.0" +__version__ = "0.2.0" from .Job import Job from .Queue import Queue diff --git a/publish.ps1 b/publish.ps1 new file mode 100644 index 0000000..0d8e8bc --- /dev/null +++ b/publish.ps1 @@ -0,0 +1,25 @@ +# https://realpython.com/pypi-publish-python-package + +# pip install setuptools wheel +# pip install twine + +Remove-Item 'build', 'dist', 'ezpq.egg-info' -Recurse -Force -ea 0 + +pandoc --from=markdown --to=commonmark -o README.md README.md +pandoc --from=commonmark --to=rst --output=README.rst README.md + +# fix 'Warning: "raw" directive disabled.' +$rst = Get-Content 'README.rst' | Where-Object { $_ -ne '.. 
raw:: html' } +$rst | Out-File 'README.rst' + +python setup.py sdist bdist_wheel + +twine check dist/* + +twine upload --repository-url https://test.pypi.org/legacy/ dist/* + +python3 -m pip install --index-url https://test.pypi.org/simple/ ezpq + +# twine upload dist/* + + diff --git a/setup.py b/setup.py index 951e767..675f09c 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,3 @@ -# https://realpython.com/pypi-publish-python-package - import pathlib from setuptools import setup @@ -7,16 +5,16 @@ HERE = pathlib.Path(__file__).parent # The text of the README file -README = (HERE / "README.md").read_text() +README = (HERE / "README.rst").read_text() setup( name="ezpq", - version="0.1.0", - download_url = 'https://github.com/dm3ll3n/ezpq/archive/0.1.0.tar.gz', + version="0.2.0", + #download_url = 'https://github.com/dm3ll3n/ezpq/releases/download/0.2.0/ezpq-0.2.0.tar.gz', description="an easy parallel queueing system", long_description=README, - long_description_content_type='text/markdown', + long_description_content_type='text/x-rst', url="https://www.github.com/dm3ll3n/ezpq", author="Donald Mellenbruch", author_email="dmellenbruch@outlook.com", diff --git a/test/test_ezpq.py b/test/test_ezpq.py index 41b489f..9eba33d 100644 --- a/test/test_ezpq.py +++ b/test/test_ezpq.py @@ -2,6 +2,7 @@ import unittest from multiprocessing import Process from threading import Thread +import random from _context import ezpq @@ -12,14 +13,16 @@ class TestEZPQ(unittest.TestCase): def setUp(self): self.Q = ezpq.Queue(job_runner=Process, auto_start=True, n_workers=5) - self.input = tuple(range(100)) + in_list = list(range(100)) + random.shuffle(in_list) + self.input = tuple(in_list) def test_priority(self): self.Q._stop() - for x in self.input: + for i,x in enumerate(self.input): self.Q.put(function=return_me, args=x, - priority=-x) # should result in reversed inputs. + priority=-i) # should result in reversed inputs. self.Q.start() self.Q.wait()
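
Note on the `args` handling: the updated test still passes a bare scalar (`args=x`) to `put()`, which relies on the normalization this diff adds to `Job.__init__`. The sketch below mirrors that logic as standalone functions so the behavior can be checked in isolation. The names `normalize_args` and `normalize_kwargs` are hypothetical helpers introduced only for illustration; they are not part of `ezpq`.

```python
def normalize_args(args):
    """Hypothetical helper mirroring the `args` branch added to Job.__init__."""
    if args is None:
        return None
    if not hasattr(args, '__iter__'):
        # A bare scalar such as 5 is wrapped in a one-element list.
        return [args]
    # Any iterable (tuple, list, range, ...) is copied into a new list.
    return list(args)


def normalize_kwargs(kwargs):
    """Hypothetical helper mirroring the `kwargs` branch: copy into a dict."""
    return None if kwargs is None else dict(kwargs)


if __name__ == '__main__':
    for value in (None, 5, (1, 2), [3], 'ab'):
        print(repr(value), '->', normalize_args(value))
```

One consequence worth keeping in mind: on Python 3 a `str` has `__iter__`, so under this logic a single string argument would be split into characters. Passing it as `args=['ab']` avoids that.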