diff --git a/README.rst b/README.rst index 44679ef4..c4f60cf5 100644 --- a/README.rst +++ b/README.rst @@ -127,7 +127,10 @@ Use `async` from your code to quickly offload tasks: task_result = result(task_id) # result returns None if the task has not been executed yet - # so in most cases you will want to use a hook: + # you can wait for it + task_result = result(task_id, 200) + + # but in most cases you will want to use a hook: async('math.modf', 2.5, hook='hooks.print_result') diff --git a/django_q/brokers/aws_sqs.py b/django_q/brokers/aws_sqs.py index 2334c201..111e631f 100644 --- a/django_q/brokers/aws_sqs.py +++ b/django_q/brokers/aws_sqs.py @@ -27,6 +27,9 @@ def acknowledge(self, task_id): def queue_size(self): return int(self.queue.attributes['ApproximateNumberOfMessages']) + def lock_size(self): + return int(self.queue.attributes['ApproximateNumberOfMessagesNotVisible']) + def delete(self, task_id): message = self.sqs.Message(self.queue.url, task_id) message.delete() diff --git a/django_q/monitor.py b/django_q/monitor.py index 96c83086..3671e83a 100644 --- a/django_q/monitor.py +++ b/django_q/monitor.py @@ -83,7 +83,7 @@ def monitor(run_once=False, broker=None): # bottom bar i += 1 queue_size = broker.queue_size() - if Conf.ORM: + if hasattr(broker, 'lock_size'): queue_size = '{}({})'.format(queue_size, broker.lock_size()) print(term.move(i, 0) + term.white_on_cyan(term.center(broker.info(), width=col_width * 2))) print(term.move(i, 2 * col_width) + term.black_on_cyan(term.center(_('Queued'), width=col_width))) @@ -183,4 +183,3 @@ def info(broker=None): term.white('{0:.4f}'.format(exec_time)) ) return True - diff --git a/django_q/tasks.py b/django_q/tasks.py index 17a89527..62da8f1b 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -5,6 +5,7 @@ from django.utils import timezone # local +import time import signing import cluster from django_q.conf import Conf, logger @@ -82,16 +83,25 @@ def schedule(func, *args, **kwargs): ) -def result(task_id): +def result(task_id, wait=0): """ Return the result of the named task. :type task_id: str or uuid :param task_id: the task name or uuid + :type wait: int + :param wait: number of milliseconds to wait for a result :return: the result object of this task :rtype: object """ - return Task.get_result(task_id) + start = time.time() + while True: + r = Task.get_result(task_id) + if r: + return r + if (time.time() - start) * 1000 >= wait: + break + time.sleep(0.01) def result_group(group_id, failures=False): @@ -105,16 +115,25 @@ def result_group(group_id, failures=False): return Task.get_result_group(group_id, failures) -def fetch(task_id): +def fetch(task_id, wait=0): """ Return the processed task. :param task_id: the task name or uuid :type task_id: str or uuid + :param wait: the number of milliseconds to wait for a result + :type wait: int :return: the full task object :rtype: Task """ - return Task.get_task(task_id) + start = time.time() + while True: + t = Task.get_task(task_id) + if t: + return t + if (time.time() - start) * 1000 >= wait: + break + time.sleep(0.01) def fetch_group(group_id, failures=True): diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 6222e340..150adfdb 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -216,6 +216,7 @@ def test_sqs(): broker.acknowledge(task[0]) # duplicate acknowledge broker.acknowledge(task[0]) + assert broker.lock_size() == 0 # delete queue broker.enqueue('test') broker.purge_queue() diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index bc21f979..5f9c9279 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -230,6 +230,8 @@ def test_async(broker, admin_user): assert result_j.group_delete(tasks=True) is None # task k should not have been saved assert fetch(k) is None + assert fetch(k, 100) is None + assert result(k, 100) is None broker.delete_queue() diff --git a/docs/brokers.rst b/docs/brokers.rst index e0de1230..429419ba 100644 --- a/docs/brokers.rst +++ b/docs/brokers.rst @@ -129,6 +129,11 @@ You can override this class if you want to contribute and support your own broke Returns the amount of messages in the brokers queue. + .. py:method:: lock_size() + + Optional method that returns the number of messages currently awaiting acknowledgement. + Only implemented on brokers that support it. + .. py:method:: ping() Returns True if the broker can be reached. diff --git a/docs/tasks.rst b/docs/tasks.rst index 178cc776..d70def41 100644 --- a/docs/tasks.rst +++ b/docs/tasks.rst @@ -25,7 +25,10 @@ Use :func:`async` from your code to quickly offload tasks to the :class:`Cluster task_result = result(task_id) # result returns None if the task has not been executed yet - # so in most cases you will want to use a hook: + # you can wait for it + task_result = result(task_id, 200) + + # but in most cases you will want to use a hook: async('math.modf', 2.5, hook='hooks.print_result') @@ -208,19 +211,21 @@ Reference :returns: The uuid of the task :rtype: str -.. py:function:: result(task_id) +.. py:function:: result(task_id, wait=0) Gets the result of a previously executed task :param str task_id: the uuid or name of the task + :param int wait: optional milliseconds to wait for a result :returns: The result of the executed task -.. py:function:: fetch(task_id) +.. py:function:: fetch(task_id, wait=0) Returns a previously executed task :param str name: the uuid or name of the task - :returns: The task if any + :param int wait: optional milliseconds to wait for a result + :returns: A task object :rtype: Task .. versionchanged:: 0.2.0 diff --git a/requirements.txt b/requirements.txt index fc5aaa53..25a73679 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ arrow==0.6.0 blessed==1.9.5 boto3==1.1.3 -botocore==1.2.2 # via boto3 +botocore==1.2.4 # via boto3 django-picklefield==0.3.2 django-redis==4.2.0 docutils==0.12 # via botocore