Merge pull request #46 from Koed00/dev
Adds global `sync` configuration option
Koed00 committed Aug 13, 2015
2 parents 1017757 + 12e9e20 commit 0676574
Showing 9 changed files with 47 additions and 13 deletions.
4 changes: 2 additions & 2 deletions django_q/__init__.py
@@ -4,11 +4,11 @@
myPath = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, myPath)

from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group
from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group, queue_size
from .models import Task, Schedule, Success, Failure
from .cluster import Cluster
from .monitor import Stat

VERSION = (0, 5, 1)
VERSION = (0, 5, 2)

default_app_config = 'django_q.apps.DjangoQConfig'
3 changes: 3 additions & 0 deletions django_q/conf.py
@@ -74,6 +74,9 @@ class Conf(object):
# Sets the number of processors for each worker, defaults to all.
CPU_AFFINITY = conf.get('cpu_affinity', 0)

# Global sync option for debugging
SYNC = conf.get('sync', False)

# Use the secret key for package signing
# Django itself should raise an error if it's not configured
SECRET_KEY = settings.SECRET_KEY
15 changes: 14 additions & 1 deletion django_q/tasks.py
@@ -39,7 +39,7 @@ def async(func, *args, **kwargs):
task['save'] = save
# sign it
pack = signing.SignedPackage.dumps(task)
if sync:
if sync or Conf.SYNC:
return _sync(pack)
# push it
redis.rpush(list_key, pack)
@@ -152,6 +152,19 @@ def delete_group(group_id, tasks=False):
return Task.delete_group(group_id, tasks)


def queue_size(list_key=Conf.Q_LIST, r=redis_client):
"""
Returns the current queue size.
Note that this doesn't count any tasks currently being processed by workers.
:param list_key: optional redis key
:param r: optional redis connection
:return: current queue size
:rtype: int
"""
return r.llen(list_key)


def _sync(pack):
"""Simulate a package travelling through the cluster."""
task_queue = Queue()
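
As a hypothetical aside (not part of the diff): with the new flag set, `async` takes the `_sync` path above no matter what the caller passes. Flipping `Conf.SYNC` directly is a test-style shortcut for the `sync` setting documented in install.rst below.

    # illustrative sketch only; normally set via Q_CLUSTER = {'sync': True}
    from django_q.conf import Conf
    from django_q.tasks import async, result

    Conf.SYNC = True                       # force the _sync() branch globally
    tid = async('math.copysign', 2, -2)    # runs inline; nothing is pushed to Redis
    assert result(tid) == -2               # the simulated run saves its result
    Conf.SYNC = False                      # restore the default
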
10 changes: 5 additions & 5 deletions django_q/tests/test_cluster.py
@@ -11,7 +11,7 @@

from django_q.cluster import Cluster, Sentinel, pusher, worker, monitor
from django_q.humanhash import DEFAULT_WORDLIST
from django_q.tasks import fetch, fetch_group, async, result, result_group, count_group, delete_group
from django_q.tasks import fetch, fetch_group, async, result, result_group, count_group, delete_group, queue_size
from django_q.models import Task, Success
from django_q.conf import Conf, redis_client
from django_q.monitor import Stat
@@ -77,7 +77,7 @@ def test_cluster(r):
list_key = 'cluster_test:q'
r.delete(list_key)
task = async('django_q.tests.tasks.count_letters', DEFAULT_WORDLIST, list_key=list_key)
assert r.llen(list_key) == 1
assert queue_size(list_key=list_key, r=r) == 1
task_queue = Queue()
assert task_queue.qsize() == 0
result_queue = Queue()
@@ -87,7 +87,7 @@ def test_cluster(r):
# Test push
pusher(task_queue, event, list_key=list_key, r=r)
assert task_queue.qsize() == 1
assert r.llen(list_key) == 0
assert queue_size(list_key=list_key, r=r) == 0
# Test work
task_queue.put('STOP')
worker(task_queue, result_queue, Value('f', -1))
@@ -142,14 +142,14 @@ def test_async(r, admin_user):
assert isinstance(k, str)
# run the cluster to execute the tasks
task_count = 10
assert r.llen(list_key) == task_count
assert queue_size(list_key=list_key, r=r) == task_count
task_queue = Queue()
stop_event = Event()
stop_event.set()
# push the tasks
for i in range(task_count):
pusher(task_queue, stop_event, list_key=list_key, r=r)
assert r.llen(list_key) == 0
assert queue_size(list_key=list_key, r=r) == 0
assert task_queue.qsize() == task_count
task_queue.put('STOP')
# let a worker handle them
4 changes: 2 additions & 2 deletions django_q/tests/test_scheduler.py
@@ -6,7 +6,7 @@

from django_q.conf import redis_client
from django_q.cluster import pusher, worker, monitor, scheduler
from django_q.tasks import Schedule, fetch, schedule as create_schedule
from django_q.tasks import Schedule, fetch, schedule as create_schedule, queue_size


@pytest.fixture
@@ -34,7 +34,7 @@ def test_scheduler(r):
# push it
pusher(task_queue, stop_event, list_key=list_key, r=r)
assert task_queue.qsize() == 1
assert r.llen(list_key) == 0
assert queue_size(list_key=list_key, r=r) == 0
task_queue.put('STOP')
# let a worker handle them
result_queue = Queue()
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -72,7 +72,7 @@
# The short X.Y version.
version = '0.5'
# The full version, including alpha/beta/rc tags.
release = '0.5.1'
release = '0.5.2'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
11 changes: 10 additions & 1 deletion docs/install.rst
@@ -1,5 +1,6 @@
Installation
============
.. py:currentmodule:: django_q
- Install the latest version with pip::

@@ -91,6 +92,14 @@ Limits the amount of successful tasks saved to Django.
- Defaults to ``250``
- Failures are always saved.

.. _sync:

sync
~~~~

When set to ``True``, this configuration option forces all :func:`async` calls to run with ``sync=True``, effectively making everything synchronous. Useful for testing. Defaults to ``False``.
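
A minimal sketch of a test settings module with the option enabled (project name hypothetical)::

    # settings.py -- run the whole suite inline, without a cluster
    Q_CLUSTER = {
        'name': 'myproject',
        'sync': True
    }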

.. _queue_limit:

queue_limit
@@ -102,7 +111,7 @@ It can also be used to manage the loss of data in case of a cluster failure.
Defaults to ``None``, meaning no limit.

label
~~~~~

The label used for the Django Admin page. Defaults to ``'Django Q'``

9 changes: 9 additions & 0 deletions docs/tasks.rst
@@ -54,6 +54,7 @@ Overrides the cluster's timeout setting for this task.
sync
""""
Simulates a task execution synchronously. Useful for testing.
Can also be forced globally via the :ref:`sync` configuration option.
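
A quick hypothetical sketch of the per-call form, assuming ``async`` and ``result`` are imported from ``django_q.tasks``::

    # executes immediately in the current process and saves the result
    task_id = async('math.copysign', 2, -2, sync=True)
    assert result(task_id) == -2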

redis
"""""
@@ -213,6 +214,14 @@ Reference

Renamed from get_task

.. py:function:: queue_size()
Returns the size of the broker queue.
Note that this does not count tasks currently being processed.

:returns: The number of task packages in the broker
:rtype: int
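
A hypothetical spot check, assuming the Redis broker configured for Django Q is reachable::

    from django_q.tasks import async, queue_size

    async('math.copysign', 2, -2)
    print(queue_size())  # e.g. 1, until a cluster pops the package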

.. py:function:: result_group(group_id, failures=False)
Returns the results of a task group
2 changes: 1 addition & 1 deletion setup.py
@@ -26,7 +26,7 @@ def run(self):

setup(
name='django-q',
version='0.5.1',
version='0.5.2',
author='Ilan Steemers',
author_email='koed00@gmail.com',
keywords='django task queue worker redis multiprocessing',
