Skip to content

Commit

Permalink
Merge pull request #58 from Koed00/dev
Browse files Browse the repository at this point in the history
Adds Amazon SQS broker
  • Loading branch information
Koed00 committed Sep 10, 2015
2 parents 6306505 + 2efa5ba commit 793c0a0
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 15 deletions.
10 changes: 6 additions & 4 deletions README.rst
Expand Up @@ -20,7 +20,7 @@ Features
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis, Disque or IronMQ broker
- Redis, Disque, IronMQ or SQS
- Python 2 and 3

Requirements
Expand All @@ -38,6 +38,7 @@ Brokers
- `Redis <http://redis.io/>`__
- `Disque <https://github.com/antirez/disque>`__
- `IronMQ <http://www.iron.io/mq/>`__
- `Amazon SQS <https://aws.amazon.com/sqs/>`__


Installation
Expand Down Expand Up @@ -90,9 +91,6 @@ All configuration settings are optional. e.g:
For full configuration options, see the `configuration documentation <https://django-q.readthedocs.org/en/latest/configure.html>`__.


If you are using `django-redis <https://github.com/niwinz/django-redis>`__ , you can `configure <https://django-q.readthedocs.org/en/latest/configure.html#django-redis>`__ Django Q to use its connection pool.

Management Commands
~~~~~~~~~~~~~~~~~~~

Expand All @@ -104,6 +102,10 @@ Monitor your clusters with::

$ python manage.py qmonitor

Check overall statistics with::

$ python manage.py qinfo

Creating Tasks
~~~~~~~~~~~~~~

Expand Down
3 changes: 3 additions & 0 deletions django_q/brokers/__init__.py
Expand Up @@ -157,6 +157,9 @@ def get_broker(list_key=Conf.PREFIX):
elif Conf.IRON_MQ:
from brokers import ironmq
return ironmq.IronMQBroker(list_key=list_key)
elif Conf.SQS:
from brokers import aws_sqs
return aws_sqs.Sqs(list_key=list_key)
# default to redis
else:
from brokers import redis_broker
Expand Down
65 changes: 65 additions & 0 deletions django_q/brokers/aws_sqs.py
@@ -0,0 +1,65 @@
from django_q.conf import Conf
from django_q.brokers import Broker
from boto3 import Session


class Sqs(Broker):
    """Amazon SQS broker.

    Pushes task packages to an SQS queue named after ``list_key`` and pulls
    them back with message receipts: a dequeued message that is never
    acknowledged becomes visible again after ``Conf.RETRY`` seconds.
    """

    def __init__(self, list_key=Conf.PREFIX):
        self.sqs = None
        super(Sqs, self).__init__(list_key)
        self.queue = self.get_queue()

    def enqueue(self, task):
        """Send a task package to the queue; returns the SQS MessageId."""
        response = self.queue.send_message(MessageBody=task)
        return response.get('MessageId')

    def dequeue(self):
        """Get one task; returns ``(receipt_handle, body)`` or ``None``.

        Messages received in bulk beyond the first are cached locally and
        served by subsequent calls without another network round trip.
        """
        # SQS allows at most 10 messages per receive call. Clamp locally
        # instead of writing the capped value back into Conf.BULK — the
        # original mutated shared global configuration as a side effect.
        bulk = min(Conf.BULK, 10)
        t = None
        # NOTE(review): assumes the Broker base class initializes
        # self.task_cache to an empty list — confirm against brokers/__init__.py.
        if len(self.task_cache) > 0:
            t = self.task_cache.pop()
        else:
            # VisibilityTimeout hides received messages until they are
            # acknowledged (deleted) or redelivered after Conf.RETRY seconds.
            tasks = self.queue.receive_messages(MaxNumberOfMessages=bulk,
                                                VisibilityTimeout=Conf.RETRY)
            if tasks:
                t = tasks.pop()
            if tasks:
                self.task_cache = tasks
        if t:
            return t.receipt_handle, t.body

    def acknowledge(self, task_id):
        """Acknowledge a finished task by deleting its message."""
        return self.delete(task_id)

    def queue_size(self):
        """Approximate number of visible messages in the queue."""
        return int(self.queue.attributes['ApproximateNumberOfMessages'])

    def delete(self, task_id):
        """Delete a single message; ``task_id`` is the SQS receipt handle."""
        message = self.sqs.Message(self.queue.url, task_id)
        message.delete()

    def fail(self, task_id):
        """A failed task is simply removed from the queue."""
        self.delete(task_id)

    def delete_queue(self):
        """Delete the underlying SQS queue itself."""
        self.queue.delete()

    def purge_queue(self):
        """Remove all messages from the queue without deleting it."""
        self.queue.purge()

    def ping(self):
        """Return True when the boto3 session exposes the 'sqs' resource."""
        return 'sqs' in self.connection.get_available_resources()

    def info(self):
        """Human-readable broker description for the monitor."""
        return 'AWS SQS'

    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        """Build a boto3 Session from the credentials in the Conf.SQS dict."""
        return Session(aws_access_key_id=Conf.SQS['aws_access_key_id'],
                       aws_secret_access_key=Conf.SQS['aws_secret_access_key'],
                       region_name=Conf.SQS['aws_region'])

    def get_queue(self):
        """Create (or look up, if it already exists) the cluster's SQS queue."""
        self.sqs = self.connection.resource('sqs')
        return self.sqs.create_queue(QueueName=self.list_key)
11 changes: 9 additions & 2 deletions django_q/brokers/ironmq.py
@@ -1,3 +1,4 @@
from requests.exceptions import HTTPError
from django_q.conf import Conf
from django_q.brokers import Broker
from iron_mq import IronMQ
Expand Down Expand Up @@ -31,13 +32,19 @@ def queue_size(self):
return self.connection.size()

def delete_queue(self):
    """Delete the IronMQ queue; returns its response message, or False
    when the HTTP call fails (e.g. the queue does not exist)."""
    try:
        response = self.connection.delete_queue()
    except HTTPError:
        return False
    return response['msg']

def purge_queue(self):
    # Remove all messages from the queue without deleting the queue itself.
    return self.connection.clear()

def delete(self, task_id):
    """Delete a single message by id; returns the response message, or
    False when the HTTP call fails (e.g. the message is already gone)."""
    try:
        response = self.connection.delete(task_id)
    except HTTPError:
        return False
    return response['msg']

def fail(self, task_id):
    # A failed task is acknowledged the same way as a successful one:
    # by deleting its message from the queue.
    self.delete(task_id)
Expand Down
3 changes: 3 additions & 0 deletions django_q/conf.py
Expand Up @@ -40,6 +40,9 @@ class Conf(object):
# IronMQ broker
IRON_MQ = conf.get('iron_mq', None)

# SQS broker
SQS = conf.get('sqs', None)

# Name of the cluster or site. For when you run multiple sites on one redis server
PREFIX = conf.get('name', 'default')

Expand Down
66 changes: 66 additions & 0 deletions django_q/tests/test_brokers.py
Expand Up @@ -85,6 +85,13 @@ def test_disque():
task = broker.dequeue()
assert task is not None
broker.acknowledge(task[0])
# test duplicate acknowledge
# broker.acknowledge(task[0])
#
# this crashes Disque when followed by a JSCAN
# https://github.com/antirez/disque/issues/113
# confirmed fix and merge is on the way
#
# delete queue
broker.enqueue('test')
broker.enqueue('test')
Expand Down Expand Up @@ -139,6 +146,8 @@ def test_ironmq():
task = broker.dequeue()
assert task is not None
broker.acknowledge(task[0])
# duplicate acknowledge
broker.acknowledge(task[0])
# delete queue
broker.enqueue('test')
broker.enqueue('test')
Expand All @@ -148,3 +157,60 @@ def test_ironmq():
# back to django-redis
Conf.IRON_MQ = None
Conf.DJANGO_REDIS = 'default'


@pytest.mark.skipif(not os.getenv('AWS_ACCESS_KEY_ID'),
                    reason="requires AWS credentials")
def test_sqs():
    """Round-trip integration test for the SQS broker (needs live AWS creds)."""
    Conf.SQS = {'aws_region': os.getenv('AWS_REGION'),
                'aws_access_key_id': os.getenv('AWS_ACCESS_KEY_ID'),
                'aws_secret_access_key': os.getenv('AWS_SECRET_ACCESS_KEY')}
    # check broker
    broker = get_broker(list_key=uuid()[0])
    assert broker.ping() is True
    assert broker.info() is not None
    assert broker.queue_size() == 0
    # enqueue
    broker.enqueue('test')
    # dequeue
    task = broker.dequeue()
    assert task[1] == 'test'
    broker.acknowledge(task[0])
    assert broker.dequeue() is None
    # Retry test
    Conf.RETRY = 1
    broker.enqueue('test')
    assert broker.dequeue() is not None
    sleep(1.5)
    task = broker.dequeue()
    assert len(task) > 0
    broker.acknowledge(task[0])
    sleep(1.5)
    # delete job
    broker.enqueue('test')
    task_id = broker.dequeue()[0]
    broker.delete(task_id)
    assert broker.dequeue() is None
    # fail
    broker.enqueue('test')
    # Fix: reset task so the polling loop actually runs. Previously `task`
    # still held the (non-None) value from the retry section, the loop was
    # skipped, and fail() was called with a stale, already-acknowledged
    # receipt handle instead of the freshly enqueued message.
    task = None
    while task is None:
        task = broker.dequeue()
    broker.fail(task[0])
    # bulk test
    for i in range(10):
        broker.enqueue('test')
    Conf.BULK = 12
    for i in range(10):
        task = broker.dequeue()
        assert task is not None
        broker.acknowledge(task[0])
    # duplicate acknowledge
    broker.acknowledge(task[0])
    # delete queue
    broker.enqueue('test')
    broker.purge_queue()
    broker.delete_queue()
    # back to django-redis
    Conf.SQS = None
    Conf.BULK = 1
    Conf.DJANGO_REDIS = 'default'
2 changes: 2 additions & 0 deletions django_q/tests/test_cluster.py
Expand Up @@ -18,6 +18,7 @@
from django_q.brokers import get_broker
from .tasks import multiply


class WordClass(object):
def __init__(self):
self.word_list = DEFAULT_WORDLIST
Expand All @@ -30,6 +31,7 @@ def get_words(self):
def broker():
Conf.DISQUE_NODES = None
Conf.IRON_MQ = None
Conf.SQS = None
Conf.DJANGO_REDIS = 'default'
return get_broker()

Expand Down
4 changes: 4 additions & 0 deletions django_q/tests/test_scheduler.py
Expand Up @@ -14,6 +14,10 @@

@pytest.fixture
def broker():
    """Clear all alternative broker settings so the scheduler tests
    run against the default django-redis broker."""
    Conf.DISQUE_NODES = None
    Conf.IRON_MQ = None
    Conf.SQS = None
    Conf.DJANGO_REDIS = 'default'
    return get_broker()


Expand Down
41 changes: 37 additions & 4 deletions docs/brokers.rst
@@ -1,8 +1,26 @@
Brokers
=======

The broker sits between your Django instances and your Django Q cluster instances, accepting and delivering task packages.
Currently we support `Redis <http://redis.io/>`__ , `Disque <https://github.com/antirez/disque>`__ and `IronMQ <http://www.iron.io/mq/>`__.
The broker sits between your Django instances and your Django Q cluster instances; accepting, saving and delivering task packages.
Currently we support a variety of brokers, from the default Redis and the bleeding-edge Disque to the convenient Amazon SQS.

The default Redis broker does not support message receipts.
This means that in case of a catastrophic failure of the cluster server or worker timeouts, tasks that were being executed get lost.
Keep in mind this is not the same as a failing task. If a task's code crashes, this should only lead to a failed task status.

Even though this might be acceptable in some use cases, you might prefer brokers with message receipts support.
These guarantee delivery by waiting for the cluster to send a receipt after the task has been processed.
In case a receipt has not been received after a set time, the task package is put back in the queue.
Django Q supports this behavior by setting the :ref:`retry` timer on brokers that support message receipts.

Some pointers:

* Don't set the :ref:`retry` timer to a lower or equal number than the task timeout.
* Retry time includes the time a task spends waiting in the cluster's internal queue.
* Don't set the :ref:`queue_limit` so high that tasks time out while waiting to be processed.
* In case a task is worked on twice, you will see a duplicate key error in the cluster logs.
* Duplicate tasks do generate additional receipt messages, but the result is discarded in favor of the first result.

Support for more brokers is being worked on.


Expand All @@ -19,8 +37,10 @@ The default broker for Django Q clusters.

Disque
------
Unlike Redis, Disque supports message receipts which make delivery to the cluster workers guaranteed. If a task never produces a failed or successful result, it will automatically be sent to the cluster again for a retry.
Unlike Redis, Disque supports message receipts which make delivery to the cluster workers guaranteed.
In our tests it is as fast or faster than the Redis broker.
You can control the amount of time Disque should wait for completion of a task by configuring the :ref:`retry` setting.
Bulk task retrieval is supported via the :ref:`bulk` option.

* Delivery receipts
* Atomic
Expand All @@ -41,6 +61,19 @@ This HTTP based queue service is both available directly via `Iron.io <http://ww
* Requires the `iron-mq <https://github.com/iron-io/iron_mq_python>`__ client library: ``pip install iron-mq``
* See the :ref:`ironmq_configuration` configuration section for options.

Amazon SQS
----------
Amazon's Simple Queue Service is another HTTP based message queue.
Although `SQS <https://aws.amazon.com/sqs/>`__ is not the fastest, it is stable, cheap and convenient if you already use AWS.

* Delivery receipts
* Maximum message size is 256Kb
* Supports bulk dequeue up to 10 messages with a maximum total size of 256Kb
* Needs Django's `Cache framework <https://docs.djangoproject.com/en/1.8/topics/cache/#setting-up-the-cache>`__ configured for monitoring
* Requires the `boto3 <https://github.com/boto/boto3>`__ client library: ``pip install boto3``
* See the :ref:`sqs_configuration` configuration section for options.


Reference
---------
The :class:`Broker` class is used internally to communicate with the different types of brokers.
Expand All @@ -54,7 +87,7 @@ You can override this class if you want to contribute and support your own broke

.. py:method:: dequeue()
Gets a task package from the broker.
Gets a task package from the broker and returns a tuple with a tracking id and the package.

.. py:method:: acknowledge(id)
Expand Down
29 changes: 29 additions & 0 deletions docs/configure.rst
Expand Up @@ -214,6 +214,35 @@ Connection settings for IronMQ::

All connection keywords are supported. See the `iron-mq <https://github.com/iron-io/iron_mq_python#configure>`__ library for more info

.. _sqs_configuration:

sqs
~~~
To use Amazon SQS as a broker you need to provide the AWS region and credentials::

# example SQS broker connection

Q_CLUSTER = {
'name': 'SQSExample',
'workers': 4,
'timeout': 60,
'retry': 90,
'queue_limit': 100,
'bulk': 5,
'sqs': {
'aws_region': 'us-east-1',
'aws_access_key_id': 'ac-Idr.....YwflZBaaxI',
'aws_secret_access_key': '500f7b....b0f302e9'
}
}


Please make sure these credentials have proper SQS access.

Amazon SQS only supports a bulk setting between 1 and 10, with the total payload not exceeding 256kb.

.. _bulk:

bulk
~~~~
Sets the number of messages each cluster tries to get from the broker per call. Setting this on supported brokers can improve performance.
Expand Down
2 changes: 1 addition & 1 deletion docs/index.rst
Expand Up @@ -20,7 +20,7 @@ Features
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis, Disque or IronMQ broker
- Redis, Disque, IronMQ or SQS
- Python 2 and 3


Expand Down
10 changes: 8 additions & 2 deletions docs/install.rst
Expand Up @@ -60,8 +60,14 @@ Optional

$ pip install hiredis

- `Redis <http://redis.io/>`__ server is the default broker for Django Q. It provides the best performance and does not require Django's cache framework for monitoring.
- `Boto3 <https://github.com/boto/boto3>`__ is used for the Amazon SQS broker in favor of the now-deprecated boto library::

- `Disque <https://github.com/antirez/disque>`__ server is based on Redis by the same author, but focuses on reliable queues. Currently in Alpha, but highly recommended. You can either build it from source or use it on Heroku through the `Tynd <https://disque.tynd.co/>`__ beta.
$ pip install boto3

- `Iron-mq <https://github.com/iron-io/iron_mq_python>`_ is the official python binding for the IronMQ broker::

$ pip install iron-mq

- `Redis <http://redis.io/>`__ server is the default broker for Django Q. It provides the best performance and does not require Django's cache framework for monitoring.

- `Disque <https://github.com/antirez/disque>`__ server is based on Redis by the same author, but focuses on reliable queues. Currently in Alpha, but highly recommended. You can either build it from source or use it on Heroku through the `Tynd <https://disque.tynd.co/>`__ beta.
1 change: 1 addition & 0 deletions requirements.in
Expand Up @@ -7,3 +7,4 @@ redis
psutil
django-redis
iron-mq
boto3

0 comments on commit 793c0a0

Please sign in to comment.