Skip to content

Commit

Permalink
Merge pull request #54 from Koed00/dev
Browse files Browse the repository at this point in the history
Adds IronMQ broker
  • Loading branch information
Koed00 committed Sep 8, 2015
2 parents b5c118b + bb7e91e commit f561772
Show file tree
Hide file tree
Showing 15 changed files with 209 additions and 23 deletions.
5 changes: 2 additions & 3 deletions README.rst
Expand Up @@ -20,7 +20,7 @@ Features
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis broker and Disque broker
- Redis, Disque or IronMQ broker
- Python 2 and 3

Requirements
Expand All @@ -37,8 +37,7 @@ Brokers
~~~~~~~
- `Redis <http://redis.io/>`__
- `Disque <https://github.com/antirez/disque>`__
- `Amazon SQS <https://aws.amazon.com/sqs/>`__ (TBA)
- `IronMQ <http://www.iron.io/mq/>`__ (TBA)
- `IronMQ <http://www.iron.io/mq/>`__


Installation
Expand Down
2 changes: 1 addition & 1 deletion django_q/__init__.py
Expand Up @@ -9,6 +9,6 @@
from .cluster import Cluster
from .status import Stat

VERSION = (0, 6, 2)
VERSION = (0, 6, 3)

default_app_config = 'django_q.apps.DjangoQConfig'
4 changes: 4 additions & 0 deletions django_q/brokers/__init__.py
Expand Up @@ -7,6 +7,7 @@ def __init__(self, list_key=Conf.PREFIX):
self.connection = self.get_connection(list_key)
self.list_key = list_key
self.cache = self.get_cache()
self.task_cache = []

def enqueue(self, task):
"""
Expand Down Expand Up @@ -153,6 +154,9 @@ def get_broker(list_key=Conf.PREFIX):
if Conf.DISQUE_NODES:
from brokers import disque
return disque.Disque(list_key=list_key)
elif Conf.IRON_MQ:
from brokers import ironmq
return ironmq.IronMQBroker(list_key=list_key)
# default to redis
else:
from brokers import redis_broker
Expand Down
15 changes: 12 additions & 3 deletions django_q/brokers/disque.py
Expand Up @@ -11,9 +11,18 @@ def enqueue(self, task):
'ADDJOB {} {} 500 RETRY {}'.format(self.list_key, task, retry)).decode()

def dequeue(self):
task = self.connection.execute_command('GETJOB TIMEOUT 1000 FROM {}'.format(self.list_key))
if task:
return task[0][1].decode(), task[0][2].decode()
t = None
if len(self.task_cache) > 0:
t = self.task_cache.pop()
else:
tasks = self.connection.execute_command(
'GETJOB COUNT {} TIMEOUT 1000 FROM {}'.format(Conf.BULK, self.list_key))
if tasks:
t = tasks.pop()
if tasks:
self.task_cache = tasks
if t:
return t[1].decode(), t[2].decode()

def queue_size(self):
return self.connection.execute_command('QLEN {}'.format(self.list_key))
Expand Down
51 changes: 51 additions & 0 deletions django_q/brokers/ironmq.py
@@ -0,0 +1,51 @@
from django_q.conf import Conf
from django_q.brokers import Broker
from iron_mq import IronMQ


class IronMQBroker(Broker):
def enqueue(self, task):
return self.connection.post(task)['ids'][0]

def dequeue(self):
t = None
if len(self.task_cache) > 0:
t = self.task_cache.pop()
else:
timeout = Conf.RETRY or None
tasks = self.connection.get(timeout=timeout, wait=1, max=Conf.BULK)['messages']
if tasks:
t = tasks.pop()
if tasks:
self.task_cache = tasks
if t:
return t['id'], t['body']

def ping(self):
return self.connection.name == self.list_key

def info(self):
return 'IronMQ'

def queue_size(self):
return self.connection.size()

def delete_queue(self):
return self.connection.delete_queue()['msg']

def purge_queue(self):
return self.connection.clear()

def delete(self, task_id):
return self.connection.delete(task_id)['msg']

def fail(self, task_id):
self.delete(task_id)

def acknowledge(self, task_id):
return self.delete(task_id)

@staticmethod
def get_connection(list_key=Conf.PREFIX):
ironmq = IronMQ(name=None, **Conf.IRON_MQ)
return ironmq.queue(queue_name=list_key)
9 changes: 8 additions & 1 deletion django_q/conf.py
Expand Up @@ -37,6 +37,9 @@ class Conf(object):
# Optional Authentication
DISQUE_AUTH = conf.get('disque_auth', None)

# IronMQ broker
IRON_MQ = conf.get('iron_mq', None)

# Name of the cluster or site. For when you run multiple sites on one redis server
PREFIX = conf.get('name', 'default')

Expand All @@ -62,7 +65,7 @@ class Conf(object):
WORKERS = 4

# Maximum number of tasks that each cluster can work on
QUEUE_LIMIT = conf.get('queue_limit', int(WORKERS)**2)
QUEUE_LIMIT = conf.get('queue_limit', int(WORKERS) ** 2)

# Sets compression of redis packages
COMPRESSED = conf.get('compress', False)
Expand All @@ -77,6 +80,10 @@ class Conf(object):
# Only works with brokers that guarantee delivery. Defaults to 60 seconds.
RETRY = conf.get('retry', 60)

# Sets the amount of tasks the cluster will try to pop off the broker.
# If it supports bulk gets.
BULK = conf.get('bulk', 1)

# The Django Admin label for this app
LABEL = conf.get('label', 'Django Q')

Expand Down
69 changes: 67 additions & 2 deletions django_q/tests/test_brokers.py
Expand Up @@ -4,6 +4,7 @@
import redis
from django_q.conf import Conf
from django_q.brokers import get_broker, Broker
from django_q.humanhash import uuid


def test_broker():
Expand Down Expand Up @@ -72,14 +73,78 @@ def test_disque():
# delete job
task_id = broker.enqueue('test')
broker.delete(task_id)
assert broker.queue_size() == 0
assert broker.dequeue() is None
# fail
task_id=broker.enqueue('test')
task_id = broker.enqueue('test')
broker.fail(task_id)
# bulk test
for i in range(5):
broker.enqueue('test')
Conf.BULK = 5
for i in range(5):
task = broker.dequeue()
assert task is not None
broker.acknowledge(task[0])
# delete queue
broker.enqueue('test')
broker.enqueue('test')
broker.delete_queue()
assert broker.queue_size() == 0
# back to django-redis
Conf.DISQUE_NODES = None


@pytest.mark.skipif(not os.getenv('IRON_MQ_TOKEN'),
reason="requires IronMQ credentials")
def test_ironmq():
Conf.IRON_MQ = {'token': os.getenv('IRON_MQ_TOKEN'),
'project_id': os.getenv('IRON_MQ_PROJECT_ID')}
# check broker
broker = get_broker(list_key=uuid()[0])
assert broker.ping() is True
assert broker.info() is not None
# initialize the queue
broker.enqueue('test')
# clear before we start
broker.purge_queue()
assert broker.queue_size() == 0
# enqueue
broker.enqueue('test')
# dequeue
task = broker.dequeue()
assert task[1] == 'test'
broker.acknowledge(task[0])
assert broker.dequeue() is None
# Retry test
Conf.RETRY = 1
broker.enqueue('test')
assert broker.dequeue() is not None
sleep(1.5)
task = broker.dequeue()
assert len(task) > 0
broker.acknowledge(task[0])
sleep(1.5)
# delete job
task_id = broker.enqueue('test')
broker.delete(task_id)
assert broker.dequeue() is None
# fail
task_id = broker.enqueue('test')
broker.fail(task_id)
# bulk test
for i in range(5):
broker.enqueue('test')
Conf.BULK = 5
for i in range(5):
task = broker.dequeue()
assert task is not None
broker.acknowledge(task[0])
# delete queue
broker.enqueue('test')
broker.enqueue('test')
broker.purge_queue()
assert broker.dequeue() is None
broker.delete_queue()
# back to django-redis
Conf.IRON_MQ = None
Conf.DJANGO_REDIS = 'default'
4 changes: 3 additions & 1 deletion django_q/tests/test_cluster.py
Expand Up @@ -18,7 +18,6 @@
from django_q.brokers import get_broker
from .tasks import multiply


class WordClass(object):
def __init__(self):
self.word_list = DEFAULT_WORDLIST
Expand All @@ -29,6 +28,9 @@ def get_words(self):

@pytest.fixture
def broker():
Conf.DISQUE_NODES = None
Conf.IRON_MQ = None
Conf.DJANGO_REDIS = 'default'
return get_broker()


Expand Down
20 changes: 15 additions & 5 deletions docs/brokers.rst
Expand Up @@ -2,19 +2,18 @@ Brokers
=======

The broker sits between your Django instances and your Django Q cluster instances, accepting and delivering task packages.
Currently we only support `Redis <http://redis.io/>`__ and `Disque <https://github.com/antirez/disque>`__, but support for other brokers is being worked on.

Clients for `Amazon SQS <https://aws.amazon.com/sqs/>`__ and `IronMQ <http://www.iron.io/mq/>`__ are TBA.
Currently we support `Redis <http://redis.io/>`__ , `Disque <https://github.com/antirez/disque>`__ and `IronMQ <http://www.iron.io/mq/>`__.
Support for more brokers is being worked on.


Redis
-----
The default broker for Django Q clusters.

* Atomic
* Does not need separate cache framework for monitoring
* Does not support receipts
* Requires `Redis-py <https://github.com/andymccurdy/redis-py>`__ client library: ``pip install redis``
* Does not need cache framework for monitoring
* Does not support receipts
* Can use existing :ref:`django_redis` connections.
* Configure with :ref:`redis_configuration`-py compatible configuration

Expand All @@ -28,9 +27,20 @@ You can control the amount of time Disque should wait for completion of a task b
* Needs Django's `Cache framework <https://docs.djangoproject.com/en/1.8/topics/cache/#setting-up-the-cache>`__ configured for monitoring
* Compatible with `Tynd <https://disque.tynd.co/>`__ Disque addon on `Heroku <https://heroku.com>`__
* Still considered Alpha software
* Supports bulk dequeue
* Requires `Redis-py <https://github.com/andymccurdy/redis-py>`__ client library: ``pip install redis``
* See the :ref:`disque_configuration` configuration section for more info.

IronMQ
------
This HTTP based queue service is both available directly via `Iron.io <http://www.iron.io/mq/>`__ and as an add-on on Heroku.

* Delivery receipts
* Supports bulk dequeue
* Needs Django's `Cache framework <https://docs.djangoproject.com/en/1.8/topics/cache/#setting-up-the-cache>`__ configured for monitoring
* Requires the `iron-mq <https://github.com/iron-io/iron_mq_python>`__ client library: ``pip install iron-mq``
* See the :ref:`ironmq_configuration` configuration section for options.

Reference
---------
The :class:`Broker` class is used internally to communicate with the different types of brokers.
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Expand Up @@ -72,7 +72,7 @@
# The short X.Y version.
version = '0.6'
# The full version, including alpha/beta/rc tags.
release = '0.6.2'
release = '0.6.3'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
41 changes: 38 additions & 3 deletions docs/configure.rst
Expand Up @@ -136,7 +136,7 @@ django_redis
~~~~~~~~~~~~

If you are already using `django-redis <https://github.com/niwinz/django-redis>`__ for your caching, you can take advantage of its excellent connection backend by supplying the name
of the cache connection you want to use::
of the cache connection you want to use instead of a direct Redis connection::

# example django-redis connection
Q_CLUSTER = {
Expand Down Expand Up @@ -167,16 +167,17 @@ If you want to use Disque as your broker, set this to a list of available Disque
}


Django Q is also compatible with the `Tynd <https://disque.tynd.co/>`__ addon on `Heroku <https://heroku.com>`__::
Django Q is also compatible with the `Tynd Disque <https://disque.tynd.co/>`__ addon on `Heroku <https://heroku.com>`__::

# example Tynd connection
# example Tynd Disque connection
import os

Q_CLUSTER = {
'name': 'TyndBroker',
'workers': 8,
'timeout': 30,
'retry': 60,
'bulk': 10,
'disque_nodes': os.environ['TYND_DISQUE_NODES'].split(','),
'disque_auth': os.environ['TYND_DISQUE_AUTH']
}
Expand All @@ -187,6 +188,40 @@ disque_auth

Optional Disque password for servers that require authentication.

.. _ironmq_configuration:

iron_mq
~~~~~~~
Connection settings for IronMQ::

# example IronMQ connection

Q_CLUSTER = {
'name': 'IronBroker',
'workers': 8,
'timeout': 30,
'retry': 60,
'queue_limit': 50,
'bulk': 10,
'iron_mq': {
'host': 'mq-aws-us-east-1.iron.io',
'token': 'Et1En7.....0LuW39Q',
'project_id': '500f7b....b0f302e9'
}
}


All connection keywords are supported. See the `iron-mq <https://github.com/iron-io/iron_mq_python#configure>`__ library for more info

bulk
~~~~
Sets the number of messages each cluster tries to get from the broker per call. Setting this on supported brokers can improve performance.
Especially HTTP based or very high latency servers can benefit from bulk dequeue.
Keep in mind however that settings this too high can degrade performance with multiple clusters or very large task packages.

Not supported by the default Redis broker.
Defaults to 1.

cpu_affinity
~~~~~~~~~~~~

Expand Down
2 changes: 1 addition & 1 deletion docs/index.rst
Expand Up @@ -20,7 +20,7 @@ Features
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis or Disque broker
- Redis, Disque or IronMQ broker
- Python 2 and 3


Expand Down
1 change: 1 addition & 0 deletions requirements.in
Expand Up @@ -6,3 +6,4 @@ hiredis
redis
psutil
django-redis
iron-mq

0 comments on commit f561772

Please sign in to comment.