diff --git a/django_q/__init__.py b/django_q/__init__.py
index 5a594273..47c95de2 100644
--- a/django_q/__init__.py
+++ b/django_q/__init__.py
@@ -4,11 +4,11 @@
 myPath = os.path.dirname(os.path.abspath(__file__))
 sys.path.insert(0, myPath)
 
-from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group
+from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group, queue_size
 from .models import Task, Schedule, Success, Failure
 from .cluster import Cluster
 from .monitor import Stat
 
-VERSION = (0, 5, 1)
+VERSION = (0, 5, 2)
 
 default_app_config = 'django_q.apps.DjangoQConfig'
diff --git a/django_q/conf.py b/django_q/conf.py
index 9c57b566..30460d0e 100644
--- a/django_q/conf.py
+++ b/django_q/conf.py
@@ -74,6 +74,9 @@ class Conf(object):
     # Sets the number of processors for each worker, defaults to all.
     CPU_AFFINITY = conf.get('cpu_affinity', 0)
 
+    # Global sync option for debugging
+    SYNC = conf.get('sync', False)
+
     # Use the secret key for package signing
     # Django itself should raise an error if it's not configured
     SECRET_KEY = settings.SECRET_KEY
diff --git a/django_q/tasks.py b/django_q/tasks.py
index ea03ffa4..b9c73603 100644
--- a/django_q/tasks.py
+++ b/django_q/tasks.py
@@ -39,7 +39,7 @@ def async(func, *args, **kwargs):
     task['save'] = save
     # sign it
     pack = signing.SignedPackage.dumps(task)
-    if sync:
+    if sync or Conf.SYNC:
         return _sync(pack)
     # push it
     redis.rpush(list_key, pack)
@@ -152,6 +152,19 @@ def delete_group(group_id, tasks=False):
     return Task.delete_group(group_id, tasks)
 
 
+def queue_size(list_key=Conf.Q_LIST, r=redis_client):
+    """
+    Returns the current queue size.
+    Note that this doesn't count any tasks currently being processed by workers.
+
+    :param list_key: optional redis key
+    :param r: optional redis connection
+    :return: current queue size
+    :rtype: int
+    """
+    return r.llen(list_key)
+
+
 def _sync(pack):
     """Simulate a package travelling through the cluster."""
     task_queue = Queue()
diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py
index 684bb863..a5c11547 100644
--- a/django_q/tests/test_cluster.py
+++ b/django_q/tests/test_cluster.py
@@ -11,7 +11,7 @@
 from django_q.cluster import Cluster, Sentinel, pusher, worker, monitor
 from django_q.humanhash import DEFAULT_WORDLIST
 
-from django_q.tasks import fetch, fetch_group, async, result, result_group, count_group, delete_group
+from django_q.tasks import fetch, fetch_group, async, result, result_group, count_group, delete_group, queue_size
 from django_q.models import Task, Success
 from django_q.conf import Conf, redis_client
 from django_q.monitor import Stat
@@ -77,7 +77,7 @@ def test_cluster(r):
     list_key = 'cluster_test:q'
     r.delete(list_key)
     task = async('django_q.tests.tasks.count_letters', DEFAULT_WORDLIST, list_key=list_key)
-    assert r.llen(list_key) == 1
+    assert queue_size(list_key=list_key, r=r) == 1
     task_queue = Queue()
     assert task_queue.qsize() == 0
     result_queue = Queue()
@@ -87,7 +87,7 @@ def test_cluster(r):
     # Test push
     pusher(task_queue, event, list_key=list_key, r=r)
     assert task_queue.qsize() == 1
-    assert r.llen(list_key) == 0
+    assert queue_size(list_key=list_key, r=r) == 0
     # Test work
     task_queue.put('STOP')
     worker(task_queue, result_queue, Value('f', -1))
@@ -142,14 +142,14 @@ def test_async(r, admin_user):
     assert isinstance(k, str)
     # run the cluster to execute the tasks
     task_count = 10
-    assert r.llen(list_key) == task_count
+    assert queue_size(list_key=list_key, r=r) == task_count
     task_queue = Queue()
     stop_event = Event()
     stop_event.set()
     # push the tasks
     for i in range(task_count):
         pusher(task_queue, stop_event, list_key=list_key, r=r)
-    assert r.llen(list_key) == 0
+    assert queue_size(list_key=list_key, r=r) == 0
     assert task_queue.qsize() == task_count
     task_queue.put('STOP')
     # let a worker handle them
diff --git a/django_q/tests/test_scheduler.py b/django_q/tests/test_scheduler.py
index 074ed848..6a0a2f96 100644
--- a/django_q/tests/test_scheduler.py
+++ b/django_q/tests/test_scheduler.py
@@ -6,7 +6,7 @@
 
 from django_q.conf import redis_client
 from django_q.cluster import pusher, worker, monitor, scheduler
-from django_q.tasks import Schedule, fetch, schedule as create_schedule
+from django_q.tasks import Schedule, fetch, schedule as create_schedule, queue_size
 
 
 @pytest.fixture
@@ -34,7 +34,7 @@ def test_scheduler(r):
     # push it
     pusher(task_queue, stop_event, list_key=list_key, r=r)
     assert task_queue.qsize() == 1
-    assert r.llen(list_key) == 0
+    assert queue_size(list_key=list_key, r=r) == 0
     task_queue.put('STOP')
     # let a worker handle them
     result_queue = Queue()
diff --git a/docs/conf.py b/docs/conf.py
index 74a22b82..fd9d4f05 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -72,7 +72,7 @@
 # The short X.Y version.
 version = '0.5'
 # The full version, including alpha/beta/rc tags.
-release = '0.5.1'
+release = '0.5.2'
 
 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.
diff --git a/docs/install.rst b/docs/install.rst
index 5f2ff5ae..e6b61f6e 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -1,5 +1,6 @@
 Installation
 ============
+.. py:currentmodule:: django_q
 
 -  Install the latest version with pip::
 
@@ -91,6 +92,14 @@
 Limits the amount of successful tasks saved to Django.
 
 - Defaults to ``250``
 - Failures are always saved.
 
+.. _sync:
+
+sync
+~~~~
+
+When set to ``True``, this configuration option forces all :func:`async` calls to be run with ``sync=True``,
+effectively making everything synchronous. Useful for testing. Defaults to ``False``.
+
 .. _queue_limit:
@@ -102,7 +111,7 @@ queue_limit
 It can also be used to manage the loss of data in case of a cluster failure.
 Defaults to ``None``, meaning no limit.
 
 label
-~~~~~Dev
+~~~~~
 
 The label used for the Django Admin page. Defaults to ``'Django Q'``
diff --git a/docs/tasks.rst b/docs/tasks.rst
index 1125e952..10c5a65d 100644
--- a/docs/tasks.rst
+++ b/docs/tasks.rst
@@ -54,6 +54,7 @@ Overrides the cluster's timeout setting for this task.
 sync
 """"
 Simulates a task execution synchronously. Useful for testing.
+Can also be forced globally via the :ref:`sync` configuration option.
 
 redis
 """""
@@ -213,6 +214,14 @@ Reference
 
    Renamed from get_task
 
+.. py:function:: queue_size()
+
+   Returns the size of the broker queue.
+   Note that this does not count tasks currently being processed.
+
+   :returns: The number of task packages in the broker
+   :rtype: int
+
 .. py:function:: result_group(group_id, failures=False)
 
    Returns the results of a task group
diff --git a/setup.py b/setup.py
index 94fb0206..2f3cdfdc 100644
--- a/setup.py
+++ b/setup.py
@@ -26,7 +26,7 @@ def run(self):
 
 setup(
     name='django-q',
-    version='0.5.1',
+    version='0.5.2',
     author='Ilan Steemers',
     author_email='koed00@gmail.com',
     keywords='django task queue worker redis multiprocessing',
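For context, a minimal sketch of how the new global ``sync`` option is switched on. Only the ``sync`` key is introduced by this patch; the surrounding ``Q_CLUSTER`` dict and the ``'myproject'`` name are illustrative:

```python
# settings.py (sketch) -- Conf reads the 'sync' key via
# conf.get('sync', False) and exposes it as Conf.SYNC.
Q_CLUSTER = {
    'name': 'myproject',  # illustrative cluster name
    'sync': True,         # every async() call now short-circuits into _sync()
}
```

With this set, ``async()`` never touches Redis: the signed package is run inline through the same worker/monitor path the cluster would use, which is what makes it useful in test settings.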
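And a quick usage sketch for the new ``queue_size()`` helper, assuming a reachable Redis broker, the default list key starting out empty, and no running cluster draining the queue (``'math.copysign'`` is just a convenient importable function, not part of the patch):

```python
from django_q import async, queue_size

# push one signed task package onto the default broker list
async('math.copysign', 2, -2)

# queue_size() wraps r.llen(list_key): it counts packages still waiting
# on the Redis list, not tasks already picked up by cluster workers
assert queue_size() == 1
```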