From 3d12c1d36e01d7fe4613521cc87d7edcf527df9f Mon Sep 17 00:00:00 2001 From: Ilan Date: Thu, 13 Aug 2015 15:54:57 +0200 Subject: [PATCH 1/4] adds queue_size() returns the size of the current redis queue --- django_q/__init__.py | 2 +- django_q/tasks.py | 13 +++++++++++++ django_q/tests/test_cluster.py | 10 +++++----- django_q/tests/test_scheduler.py | 4 ++-- docs/tasks.rst | 8 ++++++++ 5 files changed, 29 insertions(+), 8 deletions(-) diff --git a/django_q/__init__.py b/django_q/__init__.py index 5a594273..a355c83f 100644 --- a/django_q/__init__.py +++ b/django_q/__init__.py @@ -4,7 +4,7 @@ myPath = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, myPath) -from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group +from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group, queue_size from .models import Task, Schedule, Success, Failure from .cluster import Cluster from .monitor import Stat diff --git a/django_q/tasks.py b/django_q/tasks.py index ea03ffa4..ebbc318e 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -152,6 +152,19 @@ def delete_group(group_id, tasks=False): return Task.delete_group(group_id, tasks) +def queue_size(list_key=Conf.Q_LIST, r=redis_client): + """ + Returns the current queue size. + Note that this doesn't count any tasks currently being processed by workers. + + :param list_key: optional redis key + :param r: optional redis connection + :return: current queue size + :rtype: int + """ + return r.llen(list_key) + + def _sync(pack): """Simulate a package travelling through the cluster.""" task_queue = Queue() diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 684bb863..a5c11547 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -11,7 +11,7 @@ from django_q.cluster import Cluster, Sentinel, pusher, worker, monitor from django_q.humanhash import DEFAULT_WORDLIST -from django_q.tasks import fetch, fetch_group, async, result, result_group, count_group, delete_group +from django_q.tasks import fetch, fetch_group, async, result, result_group, count_group, delete_group, queue_size from django_q.models import Task, Success from django_q.conf import Conf, redis_client from django_q.monitor import Stat @@ -77,7 +77,7 @@ def test_cluster(r): list_key = 'cluster_test:q' r.delete(list_key) task = async('django_q.tests.tasks.count_letters', DEFAULT_WORDLIST, list_key=list_key) - assert r.llen(list_key) == 1 + assert queue_size(list_key=list_key, r=r) == 1 task_queue = Queue() assert task_queue.qsize() == 0 result_queue = Queue() @@ -87,7 +87,7 @@ def test_cluster(r): # Test push pusher(task_queue, event, list_key=list_key, r=r) assert task_queue.qsize() == 1 - assert r.llen(list_key) == 0 + assert queue_size(list_key=list_key, r=r) == 0 # Test work task_queue.put('STOP') worker(task_queue, result_queue, Value('f', -1)) @@ -142,14 +142,14 @@ def test_async(r, admin_user): assert isinstance(k, str) # run the cluster to execute the tasks task_count = 10 - assert r.llen(list_key) == task_count + assert queue_size(list_key=list_key, r=r) == task_count task_queue = Queue() stop_event = Event() stop_event.set() # push the tasks for i in range(task_count): pusher(task_queue, stop_event, list_key=list_key, r=r) - assert r.llen(list_key) == 0 + assert queue_size(list_key=list_key, r=r) == 0 assert task_queue.qsize() == task_count task_queue.put('STOP') # let a worker handle them diff --git a/django_q/tests/test_scheduler.py b/django_q/tests/test_scheduler.py index 074ed848..6a0a2f96 100644 --- a/django_q/tests/test_scheduler.py +++ b/django_q/tests/test_scheduler.py @@ -6,7 +6,7 @@ from django_q.conf import redis_client from django_q.cluster import pusher, worker, monitor, scheduler -from django_q.tasks import Schedule, fetch, schedule as create_schedule +from django_q.tasks import Schedule, fetch, schedule as create_schedule, queue_size @pytest.fixture @@ -34,7 +34,7 @@ def test_scheduler(r): # push it pusher(task_queue, stop_event, list_key=list_key, r=r) assert task_queue.qsize() == 1 - assert r.llen(list_key) == 0 + assert queue_size(list_key=list_key,r=r) == 0 task_queue.put('STOP') # let a worker handle them result_queue = Queue() diff --git a/docs/tasks.rst b/docs/tasks.rst index 1125e952..8e7c9694 100644 --- a/docs/tasks.rst +++ b/docs/tasks.rst @@ -213,6 +213,14 @@ Reference Renamed from get_task +.. py:function:: queue_size() + + Returns the size of the broker queue. + Note that this does not count tasks currently being processed. + + :returns: The amount of task packages in the broker + :rtype: int + .. py:function:: result_group(group_id, failures=False) Returns the results of a task group From 6c37b09151299f46c444ee5eec3cdf6b3a4d14b3 Mon Sep 17 00:00:00 2001 From: Ilan Date: Thu, 13 Aug 2015 18:02:38 +0200 Subject: [PATCH 2/4] adds global `sync` configuration option Setting the `sync` configuration option will force all async() calls to be run with `sync=True`. Useful for testing. --- django_q/conf.py | 3 +++ django_q/tasks.py | 2 +- docs/install.rst | 7 +++++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/django_q/conf.py b/django_q/conf.py index 9c57b566..30460d0e 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -74,6 +74,9 @@ class Conf(object): # Sets the number of processors for each worker, defaults to all. CPU_AFFINITY = conf.get('cpu_affinity', 0) + # Global sync option to for debugging + SYNC = conf.get('sync', False) + # Use the secret key for package signing # Django itself should raise an error if it's not configured SECRET_KEY = settings.SECRET_KEY diff --git a/django_q/tasks.py b/django_q/tasks.py index ebbc318e..b9c73603 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -39,7 +39,7 @@ def async(func, *args, **kwargs): task['save'] = save # sign it pack = signing.SignedPackage.dumps(task) - if sync: + if sync or Conf.SYNC: return _sync(pack) # push it redis.rpush(list_key, pack) diff --git a/docs/install.rst b/docs/install.rst index 5f2ff5ae..87c337d0 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -1,5 +1,6 @@ Installation ============ +.. py:currentmodule:: django_q - Install the latest version with pip:: @@ -93,6 +94,12 @@ Limits the amount of successful tasks saved to Django. .. _queue_limit: +sync +~~~~ + +When set to ``True`` this configuration option forces all :func:`async` calls to be run with ``sync=True``. +Effectively making everything synchronous. Useful for testing. Defaults to ``False``. + queue_limit ~~~~~~~~~~~ From 265ae9abd952d364e152f78297e75670b5bbeb83 Mon Sep 17 00:00:00 2001 From: Ilan Date: Thu, 13 Aug 2015 18:13:45 +0200 Subject: [PATCH 3/4] docs: added sync configuration reference --- docs/install.rst | 6 ++++-- docs/tasks.rst | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/install.rst b/docs/install.rst index 87c337d0..e6b61f6e 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -92,7 +92,7 @@ Limits the amount of successful tasks saved to Django. - Defaults to ``250`` - Failures are always saved. -.. _queue_limit: +.. _sync: sync ~~~~ @@ -100,6 +100,8 @@ sync When set to ``True`` this configuration option forces all :func:`async` calls to be run with ``sync=True``. Effectively making everything synchronous. Useful for testing. Defaults to ``False``. +.. _queue_limit: + queue_limit ~~~~~~~~~~~ @@ -109,7 +111,7 @@ It can also be used to manage the loss of data in case of a cluster failure. Defaults to ``None``, meaning no limit. label -~~~~~ +~~~~~Dev The label used for the Django Admin page. Defaults to ``'Django Q'`` diff --git a/docs/tasks.rst b/docs/tasks.rst index 8e7c9694..10c5a65d 100644 --- a/docs/tasks.rst +++ b/docs/tasks.rst @@ -54,6 +54,7 @@ Overrides the cluster's timeout setting for this task. sync """" Simulates a task execution synchronously. Useful for testing. +Can also be forced globally via the :ref:`sync` configuration option. redis """"" From 12e9e2035e7c0775e2b73e0d2f6518ff090898e7 Mon Sep 17 00:00:00 2001 From: Ilan Date: Thu, 13 Aug 2015 19:49:36 +0200 Subject: [PATCH 4/4] Version 0.5.2 --- django_q/__init__.py | 2 +- docs/conf.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/django_q/__init__.py b/django_q/__init__.py index a355c83f..47c95de2 100644 --- a/django_q/__init__.py +++ b/django_q/__init__.py @@ -9,6 +9,6 @@ from .cluster import Cluster from .monitor import Stat -VERSION = (0, 5, 1) +VERSION = (0, 5, 2) default_app_config = 'django_q.apps.DjangoQConfig' diff --git a/docs/conf.py b/docs/conf.py index 74a22b82..fd9d4f05 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -72,7 +72,7 @@ # The short X.Y version. version = '0.5' # The full version, including alpha/beta/rc tags. -release = '0.5.1' +release = '0.5.2' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/setup.py b/setup.py index 94fb0206..2f3cdfdc 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ def run(self): setup( name='django-q', - version='0.5.1', + version='0.5.2', author='Ilan Steemers', author_email='koed00@gmail.com', keywords='django task queue worker redis multiprocessing',