From b56cc9da8f69a9d3d8cb2cad0849ceb9954cd59c Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 11:20:31 +0200 Subject: [PATCH 01/12] IronMQ broker --- django_q/__init__.py | 2 +- django_q/brokers/__init__.py | 3 +++ django_q/brokers/ironmq.py | 44 +++++++++++++++++++++++++++++++ django_q/conf.py | 5 +++- django_q/tests/test_brokers.py | 48 +++++++++++++++++++++++++++++++++- docs/conf.py | 2 +- requirements.in | 1 + requirements.txt | 5 +++- setup.py | 2 +- 9 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 django_q/brokers/ironmq.py diff --git a/django_q/__init__.py b/django_q/__init__.py index 4a9d8ad2..ecb80d9f 100644 --- a/django_q/__init__.py +++ b/django_q/__init__.py @@ -9,6 +9,6 @@ from .cluster import Cluster from .status import Stat -VERSION = (0, 6, 2) +VERSION = (0, 6, 3) default_app_config = 'django_q.apps.DjangoQConfig' diff --git a/django_q/brokers/__init__.py b/django_q/brokers/__init__.py index c247bb1e..90c8d4ba 100644 --- a/django_q/brokers/__init__.py +++ b/django_q/brokers/__init__.py @@ -153,6 +153,9 @@ def get_broker(list_key=Conf.PREFIX): if Conf.DISQUE_NODES: from brokers import disque return disque.Disque(list_key=list_key) + elif Conf.IRON_MQ: + from brokers import ironmq + return ironmq.IronMQBroker(list_key=list_key) # default to redis else: from brokers import redis_broker diff --git a/django_q/brokers/ironmq.py b/django_q/brokers/ironmq.py new file mode 100644 index 00000000..6388bf60 --- /dev/null +++ b/django_q/brokers/ironmq.py @@ -0,0 +1,44 @@ +from django_q.conf import Conf +from django_q.brokers import Broker +from iron_mq import IronMQ + + +class IronMQBroker(Broker): + + def enqueue(self, task): + return self.connection.post(task)['ids'][0] + + def dequeue(self): + timeout = Conf.RETRY or None + task = self.connection.get(timeout=timeout, wait=1)['messages'] + if task: + return task[0]['id'], task[0]['body'] + + def ping(self): + return self.connection.name == self.list_key + + def info(self): + return 'IronMQ' + + def queue_size(self): + return self.connection.size() + + def delete_queue(self): + return self.connection.delete_queue()['msg'] + + def purge_queue(self): + return self.connection.clear() + + def delete(self, task_id): + return self.connection.delete(task_id)['msg'] + + def fail(self, task_id): + self.delete(task_id) + + def acknowledge(self, task_id): + return self.delete(task_id) + + @staticmethod + def get_connection(list_key=Conf.PREFIX): + ironmq = IronMQ(name=None, **Conf.IRON_MQ) + return ironmq.queue(queue_name=list_key) \ No newline at end of file diff --git a/django_q/conf.py b/django_q/conf.py index ebd28422..6893702e 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -37,6 +37,9 @@ class Conf(object): # Optional Authentication DISQUE_AUTH = conf.get('disque_auth', None) + # IronMQ broker + IRON_MQ = conf.get('iron_mq', None) + # Name of the cluster or site. For when you run multiple sites on one redis server PREFIX = conf.get('name', 'default') @@ -62,7 +65,7 @@ class Conf(object): WORKERS = 4 # Maximum number of tasks that each cluster can work on - QUEUE_LIMIT = conf.get('queue_limit', int(WORKERS)**2) + QUEUE_LIMIT = conf.get('queue_limit', int(WORKERS) ** 2) # Sets compression of redis packages COMPRESSED = conf.get('compress', False) diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 836e59fd..09ce743d 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -74,7 +74,7 @@ def test_disque(): broker.delete(task_id) assert broker.queue_size() == 0 # fail - task_id=broker.enqueue('test') + task_id = broker.enqueue('test') broker.fail(task_id) # delete queue broker.enqueue('test') @@ -83,3 +83,49 @@ def test_disque(): assert broker.queue_size() == 0 # back to django-redis Conf.DISQUE_NODES = None + + +@pytest.mark.skipif(not os.getenv('IRON_MQ_TOKEN'), + reason="requires IronMQ credentials") +def test_ironmq(): + Conf.IRON_MQ = {'host': os.getenv('IRON_MQ_HOST'), + 'token': os.getenv('IRON_MQ_TOKEN'), + 'project_id': os.getenv('IRON_MQ_PROJECT_ID')} + # check broker + broker = get_broker(list_key='djangoQ') + assert broker.ping() is True + assert broker.info() is not None + # clear before we start + broker.purge_queue() + # enqueue + broker.enqueue('test') + assert broker.queue_size() == 1 + # dequeue + task = broker.dequeue() + assert task[1] == 'test' + broker.acknowledge(task[0]) + assert broker.queue_size() == 0 + # Retry test + Conf.RETRY = 1 + broker.enqueue('test') + assert broker.queue_size() == 1 + assert broker.dequeue() is not None + sleep(1.5) + task = broker.dequeue() + assert len(task) > 0 + broker.acknowledge(task[0]) + sleep(1.5) + # delete job + task_id = broker.enqueue('test') + broker.delete(task_id) + assert broker.queue_size() == 0 + # fail + task_id = broker.enqueue('test') + broker.fail(task_id) + # delete queue + broker.enqueue('test') + broker.enqueue('test') + broker.purge_queue() + assert broker.queue_size() == 0 + # back to django-redis + Conf.IRON_MQ = None diff --git a/docs/conf.py b/docs/conf.py index bc9e9b65..191eb06b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -72,7 +72,7 @@ # The short X.Y version. version = '0.6' # The full version, including alpha/beta/rc tags. -release = '0.6.2' +release = '0.6.3' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/requirements.in b/requirements.in index 1ab7ca1f..c3021434 100644 --- a/requirements.in +++ b/requirements.in @@ -6,3 +6,4 @@ hiredis redis psutil django-redis +iron-mq diff --git a/requirements.txt b/requirements.txt index 9fce9599..48590742 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,9 +10,12 @@ django-picklefield==0.3.2 django-redis==4.2.0 future==0.15.0 hiredis==0.2.0 +iron-core==1.1.9 # via iron-mq +iron-mq==0.7 msgpack-python==0.4.6 # via django-redis psutil==3.2.1 -python-dateutil==2.4.2 # via arrow +python-dateutil==2.4.2 # via arrow, iron-core redis==2.10.3 +requests==2.7.0 # via iron-core six==1.9.0 # via python-dateutil wcwidth==0.1.4 # via blessed diff --git a/setup.py b/setup.py index d9ab8b07..ec25e3b6 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ def run(self): setup( name='django-q', - version='0.6.2', + version='0.6.3', author='Ilan Steemers', author_email='koed00@gmail.com', keywords='django task queue worker redis disque multiprocessing', From 0ffae4519b6d41d0235958a82831ad36504fc8bf Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 13:37:33 +0200 Subject: [PATCH 02/12] Adds bulk get to ironmq --- django_q/brokers/__init__.py | 1 + django_q/brokers/ironmq.py | 21 ++++++++++++++------- django_q/conf.py | 1 + django_q/tests/test_brokers.py | 1 + 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/django_q/brokers/__init__.py b/django_q/brokers/__init__.py index 90c8d4ba..626717c5 100644 --- a/django_q/brokers/__init__.py +++ b/django_q/brokers/__init__.py @@ -7,6 +7,7 @@ def __init__(self, list_key=Conf.PREFIX): self.connection = self.get_connection(list_key) self.list_key = list_key self.cache = self.get_cache() + self.task_cache = [] def enqueue(self, task): """ diff --git a/django_q/brokers/ironmq.py b/django_q/brokers/ironmq.py index 6388bf60..d8b92ecb 100644 --- a/django_q/brokers/ironmq.py +++ b/django_q/brokers/ironmq.py @@ -1,18 +1,25 @@ -from django_q.conf import Conf +from django_q.conf import Conf, logger from django_q.brokers import Broker from iron_mq import IronMQ class IronMQBroker(Broker): - def enqueue(self, task): return self.connection.post(task)['ids'][0] def dequeue(self): - timeout = Conf.RETRY or None - task = self.connection.get(timeout=timeout, wait=1)['messages'] - if task: - return task[0]['id'], task[0]['body'] + t = None + if len(self.task_cache) > 0: + t = self.task_cache.pop() + else: + timeout = Conf.RETRY or None + tasks = self.connection.get(timeout=timeout, wait=1, max=Conf.IRON_MQ_MAX)['messages'] + if tasks: + t = tasks.pop() + if tasks: + self.task_cache = tasks + if t: + return t['id'], t['body'] def ping(self): return self.connection.name == self.list_key @@ -41,4 +48,4 @@ def acknowledge(self, task_id): @staticmethod def get_connection(list_key=Conf.PREFIX): ironmq = IronMQ(name=None, **Conf.IRON_MQ) - return ironmq.queue(queue_name=list_key) \ No newline at end of file + return ironmq.queue(queue_name=list_key) diff --git a/django_q/conf.py b/django_q/conf.py index 6893702e..6128875e 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -39,6 +39,7 @@ class Conf(object): # IronMQ broker IRON_MQ = conf.get('iron_mq', None) + IRON_MQ_MAX = conf.get('iron_mq_max', 1) # Name of the cluster or site. For when you run multiple sites on one redis server PREFIX = conf.get('name', 'default') diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 09ce743d..d1d8d7e4 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -129,3 +129,4 @@ def test_ironmq(): assert broker.queue_size() == 0 # back to django-redis Conf.IRON_MQ = None + Conf.DJANGO_REDIS = 'default' From 83548dcfbb7aebc2dd75fe1a7583451ab16eea2c Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 13:42:15 +0200 Subject: [PATCH 03/12] Makes bulk a general setting --- django_q/brokers/ironmq.py | 2 +- django_q/conf.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/django_q/brokers/ironmq.py b/django_q/brokers/ironmq.py index d8b92ecb..c79b88f9 100644 --- a/django_q/brokers/ironmq.py +++ b/django_q/brokers/ironmq.py @@ -13,7 +13,7 @@ def dequeue(self): t = self.task_cache.pop() else: timeout = Conf.RETRY or None - tasks = self.connection.get(timeout=timeout, wait=1, max=Conf.IRON_MQ_MAX)['messages'] + tasks = self.connection.get(timeout=timeout, wait=1, max=Conf.BULK)['messages'] if tasks: t = tasks.pop() if tasks: diff --git a/django_q/conf.py b/django_q/conf.py index 6128875e..94638064 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -39,7 +39,6 @@ class Conf(object): # IronMQ broker IRON_MQ = conf.get('iron_mq', None) - IRON_MQ_MAX = conf.get('iron_mq_max', 1) # Name of the cluster or site. For when you run multiple sites on one redis server PREFIX = conf.get('name', 'default') @@ -81,6 +80,10 @@ class Conf(object): # Only works with brokers that guarantee delivery. Defaults to 60 seconds. RETRY = conf.get('retry', 60) + # Sets the amount of tasks the cluster will try to pop off the broker. + # If it supports bulk gets. + BULK = conf.get('bulk', 1) + # The Django Admin label for this app LABEL = conf.get('label', 'Django Q') From d849774c8647c8a61fed408536602eafde1c55f1 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 14:06:38 +0200 Subject: [PATCH 04/12] Adds bulk option to disque broker adds tests --- django_q/brokers/disque.py | 15 ++++++++++++--- django_q/tests/test_brokers.py | 21 +++++++++++++++++++-- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/django_q/brokers/disque.py b/django_q/brokers/disque.py index 7e43ccd3..bd9c20b3 100644 --- a/django_q/brokers/disque.py +++ b/django_q/brokers/disque.py @@ -11,9 +11,18 @@ def enqueue(self, task): 'ADDJOB {} {} 500 RETRY {}'.format(self.list_key, task, retry)).decode() def dequeue(self): - task = self.connection.execute_command('GETJOB TIMEOUT 1000 FROM {}'.format(self.list_key)) - if task: - return task[0][1].decode(), task[0][2].decode() + t = None + if len(self.task_cache) > 0: + t = self.task_cache.pop() + else: + tasks = self.connection.execute_command( + 'GETJOB COUNT {} TIMEOUT 1000 FROM {}'.format(Conf.BULK, self.list_key)) + if tasks: + t = tasks.pop() + if tasks: + self.task_cache = tasks + if t: + return t[1].decode(), t[2].decode() def queue_size(self): return self.connection.execute_command('QLEN {}'.format(self.list_key)) diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index d1d8d7e4..553788dc 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -76,6 +76,14 @@ def test_disque(): # fail task_id = broker.enqueue('test') broker.fail(task_id) + # bulk test + for i in range(5): + broker.enqueue('test') + Conf.BULK = 5 + for i in range(5): + task = broker.dequeue() + assert task is not None + broker.acknowledge(task[0]) # delete queue broker.enqueue('test') broker.enqueue('test') @@ -89,8 +97,8 @@ def test_disque(): reason="requires IronMQ credentials") def test_ironmq(): Conf.IRON_MQ = {'host': os.getenv('IRON_MQ_HOST'), - 'token': os.getenv('IRON_MQ_TOKEN'), - 'project_id': os.getenv('IRON_MQ_PROJECT_ID')} + 'token': os.getenv('IRON_MQ_TOKEN'), + 'project_id': os.getenv('IRON_MQ_PROJECT_ID')} # check broker broker = get_broker(list_key='djangoQ') assert broker.ping() is True @@ -122,11 +130,20 @@ def test_ironmq(): # fail task_id = broker.enqueue('test') broker.fail(task_id) + # bulk test + for i in range(5): + broker.enqueue('test') + Conf.BULK = 5 + for i in range(5): + task = broker.dequeue() + assert task is not None + broker.acknowledge(task[0]) # delete queue broker.enqueue('test') broker.enqueue('test') broker.purge_queue() assert broker.queue_size() == 0 + broker.delete_queue() # back to django-redis Conf.IRON_MQ = None Conf.DJANGO_REDIS = 'default' From 7fb12fce49ed36f5656a46bdee6f41834c98626f Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 15:19:40 +0200 Subject: [PATCH 05/12] fixes iron-mq test purging a queue that never had a message in it raises a not found error. So we queue one message to initialize. --- django_q/tests/test_brokers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 553788dc..0a08bd01 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -96,13 +96,14 @@ def test_disque(): @pytest.mark.skipif(not os.getenv('IRON_MQ_TOKEN'), reason="requires IronMQ credentials") def test_ironmq(): - Conf.IRON_MQ = {'host': os.getenv('IRON_MQ_HOST'), - 'token': os.getenv('IRON_MQ_TOKEN'), + Conf.IRON_MQ = {'token': os.getenv('IRON_MQ_TOKEN'), 'project_id': os.getenv('IRON_MQ_PROJECT_ID')} # check broker broker = get_broker(list_key='djangoQ') assert broker.ping() is True assert broker.info() is not None + # initialize the queue + broker.enqueue('test') # clear before we start broker.purge_queue() # enqueue From c946a16b99fca41937f4d99b3bcea01a0a155e23 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 15:38:00 +0200 Subject: [PATCH 06/12] changes py 2.7 import for ironmq --- django_q/brokers/__init__.py | 4 ++-- django_q/brokers/{ironmq.py => ironmq_broker.py} | 2 +- django_q/tests/test_cluster.py | 4 ++++ 3 files changed, 7 insertions(+), 3 deletions(-) rename django_q/brokers/{ironmq.py => ironmq_broker.py} (97%) diff --git a/django_q/brokers/__init__.py b/django_q/brokers/__init__.py index 626717c5..c1c3d084 100644 --- a/django_q/brokers/__init__.py +++ b/django_q/brokers/__init__.py @@ -155,8 +155,8 @@ def get_broker(list_key=Conf.PREFIX): from brokers import disque return disque.Disque(list_key=list_key) elif Conf.IRON_MQ: - from brokers import ironmq - return ironmq.IronMQBroker(list_key=list_key) + from brokers import ironmq_broker + return ironmq_broker.IronMQBroker(list_key=list_key) # default to redis else: from brokers import redis_broker diff --git a/django_q/brokers/ironmq.py b/django_q/brokers/ironmq_broker.py similarity index 97% rename from django_q/brokers/ironmq.py rename to django_q/brokers/ironmq_broker.py index c79b88f9..fd47667e 100644 --- a/django_q/brokers/ironmq.py +++ b/django_q/brokers/ironmq_broker.py @@ -1,4 +1,4 @@ -from django_q.conf import Conf, logger +from django_q.conf import Conf from django_q.brokers import Broker from iron_mq import IronMQ diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 5df7bcbe..e2ecd940 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -18,6 +18,10 @@ from django_q.brokers import get_broker from .tasks import multiply +Conf.DISQUE_NODES = None +Conf.IRON_MQ = None +Conf.DJANGO_REDIS = 'default' + class WordClass(object): def __init__(self): From 8eb14314133462fd576b9df92192ff3e80cca58f Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 15:50:32 +0200 Subject: [PATCH 07/12] Resets broker configuration in case a third party broker crashes --- django_q/brokers/__init__.py | 4 ++-- django_q/brokers/{ironmq_broker.py => ironmq.py} | 0 django_q/tests/test_cluster.py | 8 +++----- 3 files changed, 5 insertions(+), 7 deletions(-) rename django_q/brokers/{ironmq_broker.py => ironmq.py} (100%) diff --git a/django_q/brokers/__init__.py b/django_q/brokers/__init__.py index c1c3d084..626717c5 100644 --- a/django_q/brokers/__init__.py +++ b/django_q/brokers/__init__.py @@ -155,8 +155,8 @@ def get_broker(list_key=Conf.PREFIX): from brokers import disque return disque.Disque(list_key=list_key) elif Conf.IRON_MQ: - from brokers import ironmq_broker - return ironmq_broker.IronMQBroker(list_key=list_key) + from brokers import ironmq + return ironmq.IronMQBroker(list_key=list_key) # default to redis else: from brokers import redis_broker diff --git a/django_q/brokers/ironmq_broker.py b/django_q/brokers/ironmq.py similarity index 100% rename from django_q/brokers/ironmq_broker.py rename to django_q/brokers/ironmq.py diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index e2ecd940..3aa09c64 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -18,11 +18,6 @@ from django_q.brokers import get_broker from .tasks import multiply -Conf.DISQUE_NODES = None -Conf.IRON_MQ = None -Conf.DJANGO_REDIS = 'default' - - class WordClass(object): def __init__(self): self.word_list = DEFAULT_WORDLIST @@ -33,6 +28,9 @@ def get_words(self): @pytest.fixture def broker(): + Conf.DISQUE_NODES = None + Conf.IRON_MQ = None + Conf.DJANGO_REDIS = 'default' return get_broker() From 661987b3689ea5efdf03dad08b3576e2592d4235 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 15:56:53 +0200 Subject: [PATCH 08/12] updated delete job test --- django_q/tests/test_brokers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 0a08bd01..ac1ea9c8 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -72,7 +72,7 @@ def test_disque(): # delete job task_id = broker.enqueue('test') broker.delete(task_id) - assert broker.queue_size() == 0 + assert broker.dequeue() is None # fail task_id = broker.enqueue('test') broker.fail(task_id) From 52f353fb37f94cd726574cd7c6ae7ffa88aacda7 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 16:11:27 +0200 Subject: [PATCH 09/12] random queue names for ironmq test --- django_q/tests/test_brokers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index ac1ea9c8..6b38b01a 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -4,6 +4,7 @@ import redis from django_q.conf import Conf from django_q.brokers import get_broker, Broker +from django_q.humanhash import uuid def test_broker(): @@ -99,7 +100,7 @@ def test_ironmq(): Conf.IRON_MQ = {'token': os.getenv('IRON_MQ_TOKEN'), 'project_id': os.getenv('IRON_MQ_PROJECT_ID')} # check broker - broker = get_broker(list_key='djangoQ') + broker = get_broker(list_key=uuid()[0]) assert broker.ping() is True assert broker.info() is not None # initialize the queue From 69df45ea4ec5a5941cca24cf79d64b782fa58803 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 16:22:38 +0200 Subject: [PATCH 10/12] queue size is not atomic on ironmq --- django_q/tests/test_brokers.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 6b38b01a..3086613d 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -107,18 +107,17 @@ def test_ironmq(): broker.enqueue('test') # clear before we start broker.purge_queue() + assert broker.queue_size() == 0 # enqueue broker.enqueue('test') - assert broker.queue_size() == 1 # dequeue task = broker.dequeue() assert task[1] == 'test' broker.acknowledge(task[0]) - assert broker.queue_size() == 0 + assert broker.dequeue() is None # Retry test Conf.RETRY = 1 broker.enqueue('test') - assert broker.queue_size() == 1 assert broker.dequeue() is not None sleep(1.5) task = broker.dequeue() @@ -128,7 +127,7 @@ def test_ironmq(): # delete job task_id = broker.enqueue('test') broker.delete(task_id) - assert broker.queue_size() == 0 + assert broker.dequeue() is None # fail task_id = broker.enqueue('test') broker.fail(task_id) @@ -144,7 +143,7 @@ def test_ironmq(): broker.enqueue('test') broker.enqueue('test') broker.purge_queue() - assert broker.queue_size() == 0 + assert broker.dequeue() is None broker.delete_queue() # back to django-redis Conf.IRON_MQ = None From 5f4334d85ad84035b338d548e5eb209926a420eb Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 17:17:54 +0200 Subject: [PATCH 11/12] docs: adds IronMQ and bulk config --- README.rst | 5 ++--- docs/brokers.rst | 20 +++++++++++++++----- docs/configure.rst | 34 ++++++++++++++++++++++++++++++++++ docs/index.rst | 2 +- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/README.rst b/README.rst index 0909c8dc..af1cfa55 100644 --- a/README.rst +++ b/README.rst @@ -20,7 +20,7 @@ Features - Django Admin integration - PaaS compatible with multiple instances - Multi cluster monitor -- Redis broker and Disque broker +- Redis, Disque or IronMQ broker - Python 2 and 3 Requirements @@ -37,8 +37,7 @@ Brokers ~~~~~~~ - `Redis `__ - `Disque `__ -- `Amazon SQS `__ (TBA) -- `IronMQ `__ (TBA) +- `IronMQ `__ Installation diff --git a/docs/brokers.rst b/docs/brokers.rst index c285e15c..a062e069 100644 --- a/docs/brokers.rst +++ b/docs/brokers.rst @@ -2,9 +2,8 @@ Brokers ======= The broker sits between your Django instances and your Django Q cluster instances, accepting and delivering task packages. -Currently we only support `Redis `__ and `Disque `__, but support for other brokers is being worked on. - -Clients for `Amazon SQS `__ and `IronMQ `__ are TBA. +Currently we support `Redis `__ , `Disque `__ and `IronMQ `__. +Support for more brokers is being worked on. Redis @@ -12,9 +11,9 @@ Redis The default broker for Django Q clusters. * Atomic -* Does not need separate cache framework for monitoring -* Does not support receipts * Requires `Redis-py `__ client library: ``pip install redis`` +* Does not need cache framework for monitoring +* Does not support receipts * Can use existing :ref:`django_redis` connections. * Configure with :ref:`redis_configuration`-py compatible configuration @@ -28,9 +27,20 @@ You can control the amount of time Disque should wait for completion of a task b * Needs Django's `Cache framework `__ configured for monitoring * Compatible with `Tynd `__ Disque addon on `Heroku `__ * Still considered Alpha software +* Supports bulk dequeue * Requires `Redis-py `__ client library: ``pip install redis`` * See the :ref:`disque_configuration` configuration section for more info. +IronMQ +------ +This HTTP based queue service is both available directly via `Iron.io `__ and as an add-on on Heroku. + +* Delivery receipts +* Supports bulk dequeue +* Needs Django's `Cache framework `__ configured for monitoring +* Requires the `iron-mq `__ client library: ``pip install iron-mq`` +* See the :ref:`ironmq_configuration` configuration section for options. + Reference --------- The :class:`Broker` class is used internally to communicate with the different types of brokers. diff --git a/docs/configure.rst b/docs/configure.rst index 423af597..dfc34f97 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -187,6 +187,40 @@ disque_auth Optional Disque password for servers that require authentication. +.. _ironmq_configuration: + +iron_mq +~~~~~~~ +Connection settings for IronMQ:: + + # example IronMQ connection + + Q_CLUSTER = { + 'name': 'IronBroker', + 'workers': 8, + 'timeout': 30, + 'retry': 60, + 'queue_limit': 50, + 'bulk': 10, + 'iron_mq': { + 'host': 'mq-aws-us-east-1.iron.io', + 'token': 'Et1En7.....0LuW39Q', + 'project_id': '500f7b....b0f302e9' + } + } + + +All connection keywords are supported. See the `iron-mq `__ library for more info + +bulk +~~~~ +Sets the number of messages each cluster tries to get from the broker per call. Setting this on supported brokers can improve performance. +Especially HTTP based or very high latency servers can benefit from bulk dequeue. +Keep in mind however that settings this too high can degrade performance with multiple clusters or very large task packages. + +Not supported by the default Redis broker. +Defaults to 1. + cpu_affinity ~~~~~~~~~~~~ diff --git a/docs/index.rst b/docs/index.rst index 5d38a220..fb698c74 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -20,7 +20,7 @@ Features - Django Admin integration - PaaS compatible with multiple instances - Multi cluster monitor -- Redis or Disque broker +- Redis, Disque or IronMQ broker - Python 2 and 3 From bb7e91e8753fe297bab8585e548dcec70e62afdd Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Tue, 8 Sep 2015 17:30:09 +0200 Subject: [PATCH 12/12] docs: minor changes --- docs/configure.rst | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/configure.rst b/docs/configure.rst index dfc34f97..7e15784b 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -136,7 +136,7 @@ django_redis ~~~~~~~~~~~~ If you are already using `django-redis `__ for your caching, you can take advantage of its excellent connection backend by supplying the name -of the cache connection you want to use:: +of the cache connection you want to use instead of a direct Redis connection:: # example django-redis connection Q_CLUSTER = { @@ -167,9 +167,9 @@ If you want to use Disque as your broker, set this to a list of available Disque } -Django Q is also compatible with the `Tynd `__ addon on `Heroku `__:: +Django Q is also compatible with the `Tynd Disque `__ addon on `Heroku `__:: - # example Tynd connection + # example Tynd Disque connection import os Q_CLUSTER = { @@ -177,6 +177,7 @@ Django Q is also compatible with the `Tynd `__ addon o 'workers': 8, 'timeout': 30, 'retry': 60, + 'bulk': 10, 'disque_nodes': os.environ['TYND_DISQUE_NODES'].split(','), 'disque_auth': os.environ['TYND_DISQUE_AUTH'] }