diff --git a/README.rst b/README.rst index 0909c8dc..af1cfa55 100644 --- a/README.rst +++ b/README.rst @@ -20,7 +20,7 @@ Features - Django Admin integration - PaaS compatible with multiple instances - Multi cluster monitor -- Redis broker and Disque broker +- Redis, Disque or IronMQ broker - Python 2 and 3 Requirements @@ -37,8 +37,7 @@ Brokers ~~~~~~~ - `Redis `__ - `Disque `__ -- `Amazon SQS `__ (TBA) -- `IronMQ `__ (TBA) +- `IronMQ `__ Installation diff --git a/django_q/__init__.py b/django_q/__init__.py index 4a9d8ad2..ecb80d9f 100644 --- a/django_q/__init__.py +++ b/django_q/__init__.py @@ -9,6 +9,6 @@ from .cluster import Cluster from .status import Stat -VERSION = (0, 6, 2) +VERSION = (0, 6, 3) default_app_config = 'django_q.apps.DjangoQConfig' diff --git a/django_q/brokers/__init__.py b/django_q/brokers/__init__.py index c247bb1e..626717c5 100644 --- a/django_q/brokers/__init__.py +++ b/django_q/brokers/__init__.py @@ -7,6 +7,7 @@ def __init__(self, list_key=Conf.PREFIX): self.connection = self.get_connection(list_key) self.list_key = list_key self.cache = self.get_cache() + self.task_cache = [] def enqueue(self, task): """ @@ -153,6 +154,9 @@ def get_broker(list_key=Conf.PREFIX): if Conf.DISQUE_NODES: from brokers import disque return disque.Disque(list_key=list_key) + elif Conf.IRON_MQ: + from brokers import ironmq + return ironmq.IronMQBroker(list_key=list_key) # default to redis else: from brokers import redis_broker diff --git a/django_q/brokers/disque.py b/django_q/brokers/disque.py index 7e43ccd3..bd9c20b3 100644 --- a/django_q/brokers/disque.py +++ b/django_q/brokers/disque.py @@ -11,9 +11,18 @@ def enqueue(self, task): 'ADDJOB {} {} 500 RETRY {}'.format(self.list_key, task, retry)).decode() def dequeue(self): - task = self.connection.execute_command('GETJOB TIMEOUT 1000 FROM {}'.format(self.list_key)) - if task: - return task[0][1].decode(), task[0][2].decode() + t = None + if len(self.task_cache) > 0: + t = self.task_cache.pop() + else: + tasks = self.connection.execute_command( + 'GETJOB COUNT {} TIMEOUT 1000 FROM {}'.format(Conf.BULK, self.list_key)) + if tasks: + t = tasks.pop() + if tasks: + self.task_cache = tasks + if t: + return t[1].decode(), t[2].decode() def queue_size(self): return self.connection.execute_command('QLEN {}'.format(self.list_key)) diff --git a/django_q/brokers/ironmq.py b/django_q/brokers/ironmq.py new file mode 100644 index 00000000..fd47667e --- /dev/null +++ b/django_q/brokers/ironmq.py @@ -0,0 +1,51 @@ +from django_q.conf import Conf +from django_q.brokers import Broker +from iron_mq import IronMQ + + +class IronMQBroker(Broker): + def enqueue(self, task): + return self.connection.post(task)['ids'][0] + + def dequeue(self): + t = None + if len(self.task_cache) > 0: + t = self.task_cache.pop() + else: + timeout = Conf.RETRY or None + tasks = self.connection.get(timeout=timeout, wait=1, max=Conf.BULK)['messages'] + if tasks: + t = tasks.pop() + if tasks: + self.task_cache = tasks + if t: + return t['id'], t['body'] + + def ping(self): + return self.connection.name == self.list_key + + def info(self): + return 'IronMQ' + + def queue_size(self): + return self.connection.size() + + def delete_queue(self): + return self.connection.delete_queue()['msg'] + + def purge_queue(self): + return self.connection.clear() + + def delete(self, task_id): + return self.connection.delete(task_id)['msg'] + + def fail(self, task_id): + self.delete(task_id) + + def acknowledge(self, task_id): + return self.delete(task_id) + + @staticmethod + def get_connection(list_key=Conf.PREFIX): + ironmq = IronMQ(name=None, **Conf.IRON_MQ) + return ironmq.queue(queue_name=list_key) diff --git a/django_q/conf.py b/django_q/conf.py index ebd28422..94638064 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -37,6 +37,9 @@ class Conf(object): # Optional Authentication DISQUE_AUTH = conf.get('disque_auth', None) + # IronMQ broker + IRON_MQ = conf.get('iron_mq', None) + # Name of the cluster or site. For when you run multiple sites on one redis server PREFIX = conf.get('name', 'default') @@ -62,7 +65,7 @@ class Conf(object): WORKERS = 4 # Maximum number of tasks that each cluster can work on - QUEUE_LIMIT = conf.get('queue_limit', int(WORKERS)**2) + QUEUE_LIMIT = conf.get('queue_limit', int(WORKERS) ** 2) # Sets compression of redis packages COMPRESSED = conf.get('compress', False) @@ -77,6 +80,10 @@ class Conf(object): # Only works with brokers that guarantee delivery. Defaults to 60 seconds. RETRY = conf.get('retry', 60) + # Sets the amount of tasks the cluster will try to pop off the broker. + # If it supports bulk gets. + BULK = conf.get('bulk', 1) + # The Django Admin label for this app LABEL = conf.get('label', 'Django Q') diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 836e59fd..3086613d 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -4,6 +4,7 @@ import redis from django_q.conf import Conf from django_q.brokers import get_broker, Broker +from django_q.humanhash import uuid def test_broker(): @@ -72,10 +73,18 @@ def test_disque(): # delete job task_id = broker.enqueue('test') broker.delete(task_id) - assert broker.queue_size() == 0 + assert broker.dequeue() is None # fail - task_id=broker.enqueue('test') + task_id = broker.enqueue('test') broker.fail(task_id) + # bulk test + for i in range(5): + broker.enqueue('test') + Conf.BULK = 5 + for i in range(5): + task = broker.dequeue() + assert task is not None + broker.acknowledge(task[0]) # delete queue broker.enqueue('test') broker.enqueue('test') @@ -83,3 +92,59 @@ def test_disque(): assert broker.queue_size() == 0 # back to django-redis Conf.DISQUE_NODES = None + + +@pytest.mark.skipif(not os.getenv('IRON_MQ_TOKEN'), + reason="requires IronMQ credentials") +def test_ironmq(): + Conf.IRON_MQ = {'token': os.getenv('IRON_MQ_TOKEN'), + 'project_id': os.getenv('IRON_MQ_PROJECT_ID')} + # check broker + broker = get_broker(list_key=uuid()[0]) + assert broker.ping() is True + assert broker.info() is not None + # initialize the queue + broker.enqueue('test') + # clear before we start + broker.purge_queue() + assert broker.queue_size() == 0 + # enqueue + broker.enqueue('test') + # dequeue + task = broker.dequeue() + assert task[1] == 'test' + broker.acknowledge(task[0]) + assert broker.dequeue() is None + # Retry test + Conf.RETRY = 1 + broker.enqueue('test') + assert broker.dequeue() is not None + sleep(1.5) + task = broker.dequeue() + assert len(task) > 0 + broker.acknowledge(task[0]) + sleep(1.5) + # delete job + task_id = broker.enqueue('test') + broker.delete(task_id) + assert broker.dequeue() is None + # fail + task_id = broker.enqueue('test') + broker.fail(task_id) + # bulk test + for i in range(5): + broker.enqueue('test') + Conf.BULK = 5 + for i in range(5): + task = broker.dequeue() + assert task is not None + broker.acknowledge(task[0]) + # delete queue + broker.enqueue('test') + broker.enqueue('test') + broker.purge_queue() + assert broker.dequeue() is None + broker.delete_queue() + # back to django-redis + Conf.IRON_MQ = None + Conf.DJANGO_REDIS = 'default' diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 5df7bcbe..3aa09c64 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -18,7 +18,6 @@ from django_q.brokers import get_broker from .tasks import multiply - class WordClass(object): def __init__(self): self.word_list = DEFAULT_WORDLIST @@ -29,6 +28,9 @@ def get_words(self): @pytest.fixture def broker(): + Conf.DISQUE_NODES = None + Conf.IRON_MQ = None + Conf.DJANGO_REDIS = 'default' return get_broker() diff --git a/docs/brokers.rst b/docs/brokers.rst index c285e15c..a062e069 100644 --- a/docs/brokers.rst +++ b/docs/brokers.rst @@ -2,9 +2,8 @@ Brokers ======= The broker sits between your Django instances and your Django Q cluster instances, accepting and delivering task packages. -Currently we only support `Redis `__ and `Disque `__, but support for other brokers is being worked on. - -Clients for `Amazon SQS `__ and `IronMQ `__ are TBA. +Currently we support `Redis `__ , `Disque `__ and `IronMQ `__. +Support for more brokers is being worked on. Redis @@ -12,9 +11,9 @@ Redis The default broker for Django Q clusters. * Atomic -* Does not need separate cache framework for monitoring -* Does not support receipts * Requires `Redis-py `__ client library: ``pip install redis`` +* Does not need cache framework for monitoring +* Does not support receipts * Can use existing :ref:`django_redis` connections. * Configure with :ref:`redis_configuration`-py compatible configuration @@ -28,9 +27,20 @@ You can control the amount of time Disque should wait for completion of a task b * Needs Django's `Cache framework `__ configured for monitoring * Compatible with `Tynd `__ Disque addon on `Heroku `__ * Still considered Alpha software +* Supports bulk dequeue * Requires `Redis-py `__ client library: ``pip install redis`` * See the :ref:`disque_configuration` configuration section for more info. +IronMQ +------ +This HTTP based queue service is both available directly via `Iron.io `__ and as an add-on on Heroku. + +* Delivery receipts +* Supports bulk dequeue +* Needs Django's `Cache framework `__ configured for monitoring +* Requires the `iron-mq `__ client library: ``pip install iron-mq`` +* See the :ref:`ironmq_configuration` configuration section for options. + Reference --------- The :class:`Broker` class is used internally to communicate with the different types of brokers. diff --git a/docs/conf.py b/docs/conf.py index bc9e9b65..191eb06b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -72,7 +72,7 @@ # The short X.Y version. version = '0.6' # The full version, including alpha/beta/rc tags. -release = '0.6.2' +release = '0.6.3' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/docs/configure.rst b/docs/configure.rst index 423af597..7e15784b 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -136,7 +136,7 @@ django_redis ~~~~~~~~~~~~ If you are already using `django-redis `__ for your caching, you can take advantage of its excellent connection backend by supplying the name -of the cache connection you want to use:: +of the cache connection you want to use instead of a direct Redis connection:: # example django-redis connection Q_CLUSTER = { @@ -167,9 +167,9 @@ If you want to use Disque as your broker, set this to a list of available Disque } -Django Q is also compatible with the `Tynd `__ addon on `Heroku `__:: +Django Q is also compatible with the `Tynd Disque `__ addon on `Heroku `__:: - # example Tynd connection + # example Tynd Disque connection import os Q_CLUSTER = { @@ -177,6 +177,7 @@ Django Q is also compatible with the `Tynd `__ addon o 'workers': 8, 'timeout': 30, 'retry': 60, + 'bulk': 10, 'disque_nodes': os.environ['TYND_DISQUE_NODES'].split(','), 'disque_auth': os.environ['TYND_DISQUE_AUTH'] } @@ -187,6 +188,40 @@ disque_auth Optional Disque password for servers that require authentication. +.. _ironmq_configuration: + +iron_mq +~~~~~~~ +Connection settings for IronMQ:: + + # example IronMQ connection + + Q_CLUSTER = { + 'name': 'IronBroker', + 'workers': 8, + 'timeout': 30, + 'retry': 60, + 'queue_limit': 50, + 'bulk': 10, + 'iron_mq': { + 'host': 'mq-aws-us-east-1.iron.io', + 'token': 'Et1En7.....0LuW39Q', + 'project_id': '500f7b....b0f302e9' + } + } + + +All connection keywords are supported. See the `iron-mq `__ library for more info + +bulk +~~~~ +Sets the number of messages each cluster tries to get from the broker per call. Setting this on supported brokers can improve performance. +Especially HTTP based or very high latency servers can benefit from bulk dequeue. +Keep in mind however that settings this too high can degrade performance with multiple clusters or very large task packages. + +Not supported by the default Redis broker. +Defaults to 1. + cpu_affinity ~~~~~~~~~~~~ diff --git a/docs/index.rst b/docs/index.rst index 5d38a220..fb698c74 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -20,7 +20,7 @@ Features - Django Admin integration - PaaS compatible with multiple instances - Multi cluster monitor -- Redis or Disque broker +- Redis, Disque or IronMQ broker - Python 2 and 3 diff --git a/requirements.in b/requirements.in index 1ab7ca1f..c3021434 100644 --- a/requirements.in +++ b/requirements.in @@ -6,3 +6,4 @@ hiredis redis psutil django-redis +iron-mq diff --git a/requirements.txt b/requirements.txt index 9fce9599..48590742 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,9 +10,12 @@ django-picklefield==0.3.2 django-redis==4.2.0 future==0.15.0 hiredis==0.2.0 +iron-core==1.1.9 # via iron-mq +iron-mq==0.7 msgpack-python==0.4.6 # via django-redis psutil==3.2.1 -python-dateutil==2.4.2 # via arrow +python-dateutil==2.4.2 # via arrow, iron-core redis==2.10.3 +requests==2.7.0 # via iron-core six==1.9.0 # via python-dateutil wcwidth==0.1.4 # via blessed diff --git a/setup.py b/setup.py index d9ab8b07..ec25e3b6 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ def run(self): setup( name='django-q', - version='0.6.2', + version='0.6.3', author='Ilan Steemers', author_email='koed00@gmail.com', keywords='django task queue worker redis disque multiprocessing',