Merge pull request #27 from Koed00/dev
Adds `save` override options for tasks
Koed00 committed Jul 21, 2015
2 parents 61b859b + ad3a76b commit 9e33174
Showing 14 changed files with 131 additions and 55 deletions.
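The headline change: ``async`` (and ``schedule``) now accept a ``q_options`` dict, including a per-task ``save`` flag that overrides the cluster-wide ``SAVE_LIMIT`` behaviour. A minimal sketch of the new call, assuming a running cluster and a hypothetical ``demo.tasks.multiply`` task::

    from django_q.tasks import async, fetch

    # opt out of saving this task's result, whatever SAVE_LIMIT says
    task_id = async('demo.tasks.multiply', 5, 4, q_options={'save': False})

    # once the cluster has processed it, there is nothing to fetch:
    # fetch() returns None for tasks that were never saved
    fetch(task_id)  # None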
1 change: 0 additions & 1 deletion MANIFEST.in
@@ -2,5 +2,4 @@ include LICENSE
 include README.rst
 include django_q/management/*.py
 include django_q/management/commands/*.py
-exclude django_q/management/commands/qtest.py
 include django_q/migrations/*.py
1 change: 1 addition & 0 deletions django_q/__init__.py
@@ -7,6 +7,7 @@
 from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group
 from .models import Task, Schedule, Success, Failure
 from .cluster import Cluster
+from .monitor import Stat

 VERSION = (0, 4, 0)

11 changes: 6 additions & 5 deletions django_q/cluster.py
@@ -394,12 +394,11 @@ def save_task(task):
     Saves the task package to Django
     """
     # SAVE LIMIT < 0 : Don't save success
-    if Conf.SAVE_LIMIT < 0 and task['success']:
+    if not task.get('save', Conf.SAVE_LIMIT > 0) and task['success']:
         return
     # SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
     if task['success'] and 0 < Conf.SAVE_LIMIT < Success.objects.count():
         Success.objects.last().delete()
-
     try:
         Task.objects.create(id=task['id'],
                             name=task['name'],
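The new condition lets a task's own ``save`` flag take precedence, falling back to the ``SAVE_LIMIT`` default when the flag is absent. How the expression resolves, spelled out (a reading of the line above, not extra code in the commit)::

    # task.get('save', Conf.SAVE_LIMIT > 0) evaluates as:
    #   task['save'] is True   -> keep the success, even when SAVE_LIMIT < 0
    #   task['save'] is False  -> drop the success without saving
    #   no 'save' key          -> save successes only while SAVE_LIMIT > 0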
@@ -435,8 +434,9 @@ def scheduler(list_key=Conf.Q_LIST):
         # single value won't eval to tuple, so:
         if type(args) != tuple:
             args = (args,)
+        q_options = kwargs.get('q_options', {})
         if s.hook:
-            kwargs['hook'] = s.hook
+            q_options['hook'] = s.hook
         # set up the next run time
         if not s.schedule_type == s.ONCE:
             next_run = arrow.get(s.next_run)
@@ -455,8 +455,9 @@
             s.next_run = next_run.datetime
             s.repeats += -1
         # send it to the cluster
-        kwargs['list_key'] = list_key
-        kwargs['group'] = s.name or s.id
+        q_options['list_key'] = list_key
+        q_options['group'] = s.name or s.id
+        kwargs['q_options'] = q_options
         s.task = tasks.async(s.func, *args, **kwargs)
         # log it
         if not s.task:
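With this change the scheduler routes its bookkeeping options through ``q_options`` instead of mixing them into the scheduled function's own keyword arguments. Conceptually it now issues a call like this (illustrative, using the names from the hunk above)::

    tasks.async(s.func, *args,
                q_options={'hook': s.hook,
                           'list_key': list_key,
                           'group': s.name or s.id},
                **kwargs)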
20 changes: 10 additions & 10 deletions django_q/tasks.py
@@ -20,16 +20,14 @@ def async(func, *args, **kwargs):
"""
Sends a task to the cluster
"""
# optional hook
hook = kwargs.pop('hook', None)
# optional list_key
list_key = kwargs.pop('list_key', Conf.Q_LIST)
# optional redis connection
redis = kwargs.pop('redis', redis_client)
# optional sync mode
sync = kwargs.pop('sync', False)
# optional group
group = kwargs.pop('group', None)
# get options from q_options dict or direct from kwargs
options = kwargs.pop('q_options', kwargs)
hook = options.pop('hook', None)
list_key = options.pop('list_key', Conf.Q_LIST)
redis = options.pop('redis', redis_client)
sync = options.pop('sync', False)
group = options.pop('group', None)
save = options.pop('save', None)
# get an id
tag = uuid()
# build the task package
@@ -40,6 +38,8 @@
         task['hook'] = hook
     if group:
         task['group'] = group
+    if save is not None:
+        task['save'] = save
     # sign it
     pack = signing.SignedPackage.dumps(task)
     if sync:
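Note the fallback in ``options = kwargs.pop('q_options', kwargs)``: when a ``q_options`` dict is supplied, every Django Q option is popped from it and the remaining keyword arguments reach the task function untouched; without it, options are popped straight from ``kwargs`` as before. A sketch with a hypothetical ``demo.tasks.send_mail`` task::

    # 'subject' is a keyword argument for the task function itself;
    # 'group' and 'save' are Django Q options, kept safely apart
    async('demo.tasks.send_mail', 'user@example.com',
          subject='hello',
          q_options={'group': 'mail', 'save': False})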
9 changes: 7 additions & 2 deletions django_q/tests/test_cluster.py
@@ -124,6 +124,9 @@ def test_async(r, admin_user):
     h = async('django_q.tests.tasks.word_multiply', 2, word='django', hook='fail.me', list_key=list_key, redis=r)
     # args unpickle test
     j = async('django_q.tests.tasks.get_user_id', admin_user, list_key=list_key, group='test_j', redis=r)
+    # q_options and save opt_out test
+    k = async('django_q.tests.tasks.get_user_id', admin_user,
+              q_options={'list_key': list_key, 'group': 'test_k', 'redis': r, 'save': False, 'timeout': 90})
     # check if everything has a task id
     assert isinstance(a, str)
     assert isinstance(b, str)
@@ -134,8 +137,9 @@
     assert isinstance(g, str)
     assert isinstance(h, str)
     assert isinstance(j, str)
+    assert isinstance(k, str)
     # run the cluster to execute the tasks
-    task_count = 9
+    task_count = 10
     assert r.llen(list_key) == task_count
     task_queue = Queue()
     stop_event = Event()
@@ -210,7 +214,8 @@ def test_async(r, admin_user):
     assert count_group('test_j', failures=True) == 0
     assert delete_group('test_j') == 1
     assert delete_group('test_j', tasks=True) is None
-
+    # task k should not have been saved
+    assert fetch(k) is None
     r.delete(list_key)


4 changes: 2 additions & 2 deletions docs/admin.rst
@@ -41,14 +41,14 @@ Repeats
 If you want a schedule to only run a finite number of times, e.g. every hour for the next 24 hours, you can do that using the :attr:`Schedule.repeats` attribute.
 In this case you would set the schedule type to :attr:`Schedule.HOURLY` and the repeats to `24`. Every time the schedule runs, the repeats count down until they hit zero and the schedule is no longer run.

-When you set repeats to `-1` the schedule will continue indefinitely and the repeats will still count down. This can be used as an indicator of how many times the schedule has been executed.
+When you set repeats to ``-1`` the schedule will continue indefinitely and the repeats will still count down. This can be used as an indicator of how many times the schedule has been executed.

 An exception to this is schedules of type :attr:`Schedule.ONCE`. Negative repeats for this schedule type will cause it to be deleted from the database.
 This behavior is useful if you have many delayed actions which you do not necessarily need a result for. A positive number will keep the ONCE schedule, but it will not run again.

 .. note::

-    To run a `Once` schedule again, change the repeats to something other than `0`. Set a new run time before you do this or let it execute immediately.
+    To run a ``ONCE`` schedule again, change the repeats to something other than `0`. Set a new run time before you do this or let it execute immediately.


Next run
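For illustration, the hourly-for-24-runs case from the admin docs could be set up like this (a sketch using the documented ``schedule`` API; ``math.hypot`` is an arbitrary stand-in function)::

    from django_q.tasks import schedule
    from django_q.models import Schedule

    schedule('math.hypot', 3, 4,
             schedule_type=Schedule.HOURLY,
             repeats=24)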
8 changes: 4 additions & 4 deletions docs/cluster.rst
@@ -4,7 +4,7 @@ Cluster
 .. py:currentmodule:: django_q

 Django Q uses Python's multiprocessing module to manage a pool of workers that will handle your tasks.
-Start your cluster using Django's `manage.py` command::
+Start your cluster using Django's ``manage.py`` command::

     $ python manage.py qcluster

@@ -26,7 +26,7 @@ You should see the cluster starting ::
     10:57:40 [Q] INFO Q Cluster-31781 running.


-Stopping the cluster with ctrl-c or either the `SIGTERM` and `SIGKILL` signals, will initiate the :ref:`stop_procedure`::
+Stopping the cluster with ctrl-c, or with the ``SIGTERM`` or ``SIGKILL`` signal, will initiate the :ref:`stop_procedure`::

     16:44:12 [Q] INFO Q Cluster-31781 stopping.
     16:44:12 [Q] INFO Process-1 stopping cluster processes
@@ -50,7 +50,7 @@ You can have multiple clusters on multiple machines, working on the same queue a

 - They connect to the same Redis server.
 - They use the same cluster name. See :ref:`configuration`
-- They share the same `SECRET_KEY`
+- They share the same ``SECRET_KEY``

 Using a Procfile
 ----------------
@@ -80,7 +80,7 @@ An example :file:`circus.ini` ::


 Note that we only start one process. It is not a good idea to run multiple instances of the cluster in the same environment since this does nothing to increase performance and in all likelihood will diminish it.
-Control your cluster using the `workers`, `recycle` and `timeout` settings in your :ref:`configuration`
+Control your cluster using the ``workers``, ``recycle`` and ``timeout`` settings in your :ref:`configuration`

 Architecture
 ------------
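For the Procfile route mentioned above, a single worker entry is all that is needed (an illustrative line, matching the ``manage.py`` command shown earlier)::

    worker: python manage.py qcluster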
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -72,7 +72,7 @@
 # The short X.Y version.
 version = '0.4'
 # The full version, including alpha/beta/rc tags.
-release = '0.4.0'
+release = '0.4.1'

 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.
2 changes: 1 addition & 1 deletion docs/index.rst
@@ -16,7 +16,7 @@ Features
 - Scheduled and repeated tasks
 - Encrypted and compressed packages
 - Failure and success database
-- Result hooks
+- Result hooks and groups
 - Django Admin integration
 - PaaS compatible with multiple instances
 - Multi cluster monitor
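Groups, now called out in the feature list, let related tasks share a label so their results can be collected together. A sketch using the group helpers imported in ``django_q/__init__.py`` above::

    from django_q.tasks import async, result_group, delete_group

    # fire off a batch of related tasks under one group label
    for n in (4, 9, 16):
        async('math.sqrt', n, group='roots')

    # later, once the cluster has saved all three results:
    result_group('roots')   # e.g. [2.0, 3.0, 4.0]
    delete_group('roots')   # clean up the stored results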
13 changes: 7 additions & 6 deletions docs/install.rst
@@ -6,7 +6,7 @@ Installation
     $ pip install django-q


-- Add :mod:`django_q` to `INSTALLED_APPS` in your projects :file:`settings.py`::
+- Add :mod:`django_q` to ``INSTALLED_APPS`` in your project's :file:`settings.py`::

     INSTALLED_APPS = (
         # other apps
@@ -18,14 +18,14 @@ Installation
     $ python manage.py migrate

 - Make sure you have a `Redis <http://redis.io/>`__ server running
-  somewhere
+  somewhere and know how to connect to it.

 .. _configuration:

 Configuration
 -------------

-Configuration is handled via the `Q_CLUSTER` dictionary in your :file:`settings.py`
+Configuration is handled via the ``Q_CLUSTER`` dictionary in your :file:`settings.py`

 .. code:: python
@@ -133,7 +133,7 @@ of the cache connection you want to use::


 .. tip::
-    Django Q uses your `SECRET_KEY` to encrypt task packages and prevent task crossover. So make sure you have it set up in your Django settings.
+    Django Q uses your ``SECRET_KEY`` to encrypt task packages and prevent task crossover. So make sure you have it set up in your Django settings.

 cpu_affinity
 ~~~~~~~~~~~~
@@ -175,7 +175,7 @@ As a rule of thumb; cpu_affinity 1 favors repetitive short running tasks, while

 .. note::

-    The `cpu_affinity` setting requires the optional :ref:`psutil <psutil>` module.
+    The ``cpu_affinity`` setting requires the optional :ref:`psutil <psutil>` module.

 Requirements
 ------------
@@ -185,7 +185,7 @@ Django Q is tested for Python 2.7 and 3.4
 - `Django <https://www.djangoproject.com>`__

 Django Q aims to use as much of Django's standard offerings as possible
-The code is tested against Django version `1.7.8` and `1.8.2`.
+The code is tested against Django versions `1.7.9` and `1.8.3`.

 - `Django-picklefield <https://github.com/gintas/django-picklefield>`__

@@ -207,6 +207,7 @@

 Django Q uses Redis as a centralized hub between your Django instances and your Q clusters.

+
 Optional
 ~~~~~~~~
 .. _psutil:
8 changes: 8 additions & 0 deletions docs/schedules.rst
@@ -25,6 +25,13 @@ You can manage them through the :ref:`admin_page` or directly from your code wit
         schedule_type=Schedule.DAILY
     )
+
+    # In case you want to use async options
+    schedule('math.sqrt',
+             9,
+             hook='hooks.print_result',
+             q_options={'timeout': 30},
+             schedule_type=Schedule.HOURLY)

 Management Commands
 -------------------
@@ -59,6 +66,7 @@ Reference
     :param str schedule_type: (O)nce, (H)ourly, (D)aily, (W)eekly, (M)onthly, (Q)uarterly, (Y)early or :attr:`Schedule.TYPE`
     :param int repeats: Number of times to repeat the schedule. -1=Always, 0=Never, n=n times.
     :param datetime next_run: Next or first scheduled execution datetime.
+    :param dict q_options: async options to use for this schedule
     :param kwargs: optional keyword arguments for the scheduled function.

 .. class:: Schedule
