Skip to content

Commit

Permalink
Merge pull request #75 from Koed00/dev
Browse files Browse the repository at this point in the history
Adds MongoDB broker
  • Loading branch information
Koed00 committed Sep 26, 2015
2 parents 6c81840 + 08104ef commit 96543e4
Show file tree
Hide file tree
Showing 17 changed files with 215 additions and 12 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -2,6 +2,7 @@ language: python

services:
- redis-server
- mongodb

python:
- "2.7"
Expand Down
3 changes: 2 additions & 1 deletion README.rst
Expand Up @@ -20,7 +20,7 @@ Features
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis, Disque, IronMQ, SQS or ORM
- Redis, Disque, IronMQ, SQS, MongoDB or ORM
- Python 2 and 3

Requirements
Expand All @@ -39,6 +39,7 @@ Brokers
- `Disque <https://django-q.readthedocs.org/en/latest/brokers.html#disque>`__
- `IronMQ <https://django-q.readthedocs.org/en/latest/brokers.html#ironmq>`__
- `Amazon SQS <https://django-q.readthedocs.org/en/latest/brokers.html#amazon-sqs>`__
- `MongoDB <https://django-q.readthedocs.org/en/latest/brokers.html#mongodb>`__
- `Django ORM <https://django-q.readthedocs.org/en/latest/brokers.html#django-orm>`__

Installation
Expand Down
6 changes: 5 additions & 1 deletion django_q/brokers/__init__.py
Expand Up @@ -7,6 +7,7 @@ def __init__(self, list_key=Conf.PREFIX):
self.connection = self.get_connection(list_key)
self.list_key = list_key
self.cache = self.get_cache()
self._info = None

def enqueue(self, task):
"""
Expand Down Expand Up @@ -78,7 +79,7 @@ def info(self):
"""
Shows the broker type
"""
pass
return self._info

def set_stat(self, key, value, timeout):
"""
Expand Down Expand Up @@ -167,6 +168,9 @@ def get_broker(list_key=Conf.PREFIX):
elif Conf.ORM:
from brokers import orm
return orm.ORM(list_key=list_key)
elif Conf.MONGO:
from brokers import mongo
return mongo.Mongo(list_key=list_key)
# default to redis
else:
from brokers import redis_broker
Expand Down
6 changes: 4 additions & 2 deletions django_q/brokers/disque.py
Expand Up @@ -40,8 +40,10 @@ def delete_queue(self):
return len(jobs)

def info(self):
info = self.connection.info('server')
return 'Disque {}'.format(info['disque_version'])
if not self._info:
info = self.connection.info('server')
self._info= 'Disque {}'.format(info['disque_version'])
return self._info

@staticmethod
def get_connection(list_key=Conf.PREFIX):
Expand Down
73 changes: 73 additions & 0 deletions django_q/brokers/mongo.py
@@ -0,0 +1,73 @@
from datetime import timedelta
from time import sleep

from bson import ObjectId
from django.utils import timezone
from pymongo import MongoClient

from pymongo.errors import ConfigurationError

from django_q.brokers import Broker
from django_q.conf import Conf


def _timeout():
    """Return the retry cutoff: tasks whose lock is at or before this moment
    are considered expired and may be (re)dequeued."""
    retry_window = timedelta(seconds=Conf.RETRY)
    return timezone.now() - retry_window


class Mongo(Broker):
    """MongoDB broker backend.

    Tasks are stored as documents of the form ``{'payload': task, 'lock': datetime}``
    in one collection per ``list_key``. A task counts as *queued* while its
    ``lock`` timestamp is at or before the retry cutoff (see ``_timeout``), and
    as *locked* (in progress) while it is newer.
    """

    def __init__(self, list_key=Conf.PREFIX):
        super(Mongo, self).__init__(list_key)
        self.collection = self.get_collection()

    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        # Connection options come straight from the Q_CLUSTER 'mongo' dict.
        return MongoClient(**Conf.MONGO)

    def get_collection(self):
        """Return the queue collection, resolving the database name once.

        The resolved name is cached on ``Conf.MONGO_DB`` so every broker
        instance in this process uses the same database.
        """
        if not Conf.MONGO_DB:
            try:
                Conf.MONGO_DB = self.connection.get_default_database().name
            except ConfigurationError:
                # No default database in the connection settings; fall back.
                Conf.MONGO_DB = 'django-q'
        return self.connection[Conf.MONGO_DB][self.list_key]

    def queue_size(self):
        # Queued tasks: lock timestamp at or before the retry cutoff.
        return self.collection.count({'lock': {'$lte': _timeout()}})

    def lock_size(self):
        # In-flight tasks: lock timestamp newer than the retry cutoff.
        return self.collection.count({'lock': {'$gt': _timeout()}})

    def purge_queue(self):
        return self.delete_queue()

    def ping(self):
        # Bug fix: the original ``return self.info is not None`` compared the
        # *bound method* to None, which is always True, so ping() succeeded
        # without ever contacting the server. Calling info() forces a
        # server_info() round trip and raises on connection failure.
        return self.info() is not None

    def info(self):
        # Cache the version string; server_info() is a network round trip.
        if not self._info:
            self._info = 'MongoDB {}'.format(self.connection.server_info()['version'])
        return self._info

    def fail(self, task_id):
        self.delete(task_id)

    def enqueue(self, task):
        # New tasks get an already-expired lock so they are immediately
        # eligible for dequeue.
        inserted_id = self.collection.insert_one({'payload': task, 'lock': _timeout()}).inserted_id
        return str(inserted_id)

    def dequeue(self):
        # Atomically claim one expired task by bumping its lock to now;
        # find_one_and_update returns the pre-update document (or None).
        task = self.collection.find_one_and_update({'lock': {'$lte': _timeout()}}, {'$set': {'lock': timezone.now()}})
        if task:
            return [(str(task['_id']), task['payload'])]
        # empty queue, spare the cpu
        sleep(0.2)

    def delete_queue(self):
        return self.collection.drop()

    def delete(self, task_id):
        self.collection.delete_one({'_id': ObjectId(task_id)})

    def acknowledge(self, task_id):
        # At-least-once semantics: acknowledging simply removes the document.
        return self.delete(task_id)
6 changes: 4 additions & 2 deletions django_q/brokers/orm.py
Expand Up @@ -21,7 +21,7 @@ def queue_size(self):
return self.connection.filter(key=self.list_key, lock__lte=_timeout()).count()

def lock_size(self):
return self.connection.filter(key=self.list_key, lock__gte=_timeout()).count()
return self.connection.filter(key=self.list_key, lock__gt=_timeout()).count()

def purge_queue(self):
return self.connection.filter(key=self.list_key).delete()
Expand All @@ -30,7 +30,9 @@ def ping(self):
return True

def info(self):
return 'ORM {}'.format(Conf.ORM)
if not self._info:
self._info = 'ORM {}'.format(Conf.ORM)
return self._info

def fail(self, task_id):
self.delete(task_id)
Expand Down
8 changes: 5 additions & 3 deletions django_q/brokers/redis_broker.py
@@ -1,4 +1,5 @@
import redis

from django_q.brokers import Broker
from django_q.conf import Conf, logger

Expand All @@ -9,7 +10,6 @@


class Redis(Broker):

def __init__(self, list_key=Conf.PREFIX):
super(Redis, self).__init__(list_key='django_q:{}:q'.format(list_key))

Expand Down Expand Up @@ -38,8 +38,10 @@ def ping(self):
raise e

def info(self):
info = self.connection.info('server')
return 'Redis {}'.format(info['redis_version'])
if not self._info:
info = self.connection.info('server')
self._info = 'Redis {}'.format(info['redis_version'])
return self._info

def set_stat(self, key, value, timeout):
self.connection.set(key, value, timeout)
Expand Down
4 changes: 4 additions & 0 deletions django_q/conf.py
Expand Up @@ -50,6 +50,10 @@ class Conf(object):
# ORM broker
ORM = conf.get('orm', None)

# MongoDB broker
MONGO = conf.get('mongo', None)
MONGO_DB = conf.get('mongo_db', None)

# Name of the cluster or site. For when you run multiple sites on one redis server
PREFIX = conf.get('name', 'default')

Expand Down
61 changes: 61 additions & 0 deletions django_q/tests/test_brokers.py
Expand Up @@ -285,3 +285,64 @@ def test_orm():
assert broker.queue_size() == 0
# back to django-redis
Conf.ORM = None


@pytest.mark.django_db
def test_mongo():
    """Exercise the MongoDB broker end to end against a local mongod."""
    Conf.MONGO = {'host': '127.0.0.1', 'port': 27017}
    # check broker
    broker = get_broker(list_key='mongo_test')
    assert broker.ping() is True
    assert broker.info() is not None
    # clear before we start
    broker.delete_queue()
    # enqueue
    broker.enqueue('test')
    assert broker.queue_size() == 1
    # dequeue
    task = broker.dequeue()[0]
    assert task[1] == 'test'
    broker.acknowledge(task[0])
    assert broker.queue_size() == 0
    # Retry test: a dequeued but unacknowledged task reappears after RETRY seconds
    Conf.RETRY = 1
    broker.enqueue('test')
    assert broker.queue_size() == 1
    broker.dequeue()
    assert broker.queue_size() == 0
    sleep(1.5)
    assert broker.queue_size() == 1
    task = broker.dequeue()[0]
    assert broker.queue_size() == 0
    broker.acknowledge(task[0])
    sleep(1.5)
    # acknowledged tasks stay gone after the retry window
    assert broker.queue_size() == 0
    # delete job
    task_id = broker.enqueue('test')
    broker.delete(task_id)
    assert broker.dequeue() is None
    # fail
    task_id = broker.enqueue('test')
    broker.fail(task_id)
    # bulk test
    for i in range(5):
        broker.enqueue('test')
    tasks = []
    for i in range(5):
        tasks.append(broker.dequeue()[0])
    assert broker.lock_size() == 5
    for task in tasks:
        assert task is not None
        broker.acknowledge(task[0])
    # test lock size
    assert broker.lock_size() == 0
    # test duplicate acknowledge
    broker.acknowledge(task[0])
    # delete queue
    broker.enqueue('test')
    broker.enqueue('test')
    broker.purge_queue()
    broker.delete_queue()
    assert broker.queue_size() == 0
    # back to django-redis.
    # Bug fix: this previously reset Conf.ORM (copy-paste from test_orm),
    # leaving Conf.MONGO set and leaking the mongo broker into later tests.
    Conf.MONGO = None
1 change: 1 addition & 0 deletions django_q/tests/test_cluster.py
Expand Up @@ -33,6 +33,7 @@ def broker():
Conf.IRON_MQ = None
Conf.SQS = None
Conf.ORM = None
Conf.MONGO = None
Conf.DJANGO_REDIS = 'default'
return get_broker()

Expand Down
15 changes: 14 additions & 1 deletion docs/brokers.rst
Expand Up @@ -2,7 +2,7 @@ Brokers
=======

The broker sits between your Django instances and your Django Q cluster instances; accepting, saving and delivering task packages.
Currently we support a variety of brokers from the default Redis, bleeding edge Disque to the convenient Amazon SQS.
Currently we support a variety of brokers, from the default Redis and bleeding-edge Disque to the convenient ORM and fast MongoDB.

The default Redis broker does not support message receipts.
This means that in case of a catastrophic failure of the cluster server or worker timeouts, tasks that were being executed get lost.
Expand Down Expand Up @@ -73,6 +73,18 @@ Although `SQS <https://aws.amazon.com/sqs/>`__ is not the fastest, it is stable,
* Requires the `boto3 <https://github.com/boto/boto3>`__ client library: ``pip install boto3``
* See the :ref:`sqs_configuration` configuration section for options.


MongoDB
-------
This highly scalable NoSQL database makes for a very fast and reliable persistent at-least-once message broker.
Usually available on most PaaS providers, as `open-source <https://www.mongodb.org/>`__ or commercial `enterprise <https://www.mongodb.com/lp/download/mongodb-enterprise>`__ edition.

* Delivery receipts
* Needs Django's `Cache framework <https://docs.djangoproject.com/en/1.8/topics/cache/#setting-up-the-cache>`__ configured for monitoring
* Can be configured as the Django cache-backend through several open-source cache providers.
* Requires the `pymongo <https://github.com/mongodb/mongo-python-driver>`__ driver: ``pip install pymongo``
* See the :ref:`mongo_configuration` configuration section for options.

.. _orm_broker:

Django ORM
Expand All @@ -84,6 +96,7 @@ However for a medium message rate and scheduled tasks, this is the most convenie
* Delivery receipts
* Supports bulk dequeue
* Needs Django's `Cache framework <https://docs.djangoproject.com/en/1.8/topics/cache/#setting-up-the-cache>`__ configured for monitoring
* Can be `configured <https://docs.djangoproject.com/en/1.8/topics/cache/#database-caching>`__ as its own cache backend.
* Queue editable in Django Admin
* See the :ref:`orm_configuration` configuration on how to set it up.

Expand Down
30 changes: 29 additions & 1 deletion docs/configure.rst
Expand Up @@ -264,6 +264,34 @@ Using the Django ORM backend will also enable the Queued Tasks table in the Admi
If you need better performance , you should consider using a different database backend than the main project.
Set ``orm`` to the name of that database connection and make sure you run migrations on it using the ``--database`` option.

.. _mongo_configuration:

mongo
~~~~~
To use MongoDB as a message broker you simply provide the connection information in a dictionary::

# example MongoDB broker connection

Q_CLUSTER = {
'name': 'MongoDB',
'workers': 8,
'timeout': 60,
'retry': 70,
'queue_limit': 100,
'mongo': {
'host': '127.0.0.1',
'port': 27017
}
}

The ``mongo`` dictionary can contain any of the parameters exposed by pymongo's `MongoClient <https://api.mongodb.org/python/current/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient>`__
If you want to use a mongodb uri, you can supply it as the ``host`` parameter.

mongo_db
~~~~~~~~
When using the MongoDB broker you can optionally provide a database name to use for the queues.
Defaults to the connection's default database if available, otherwise ``django-q``.

.. _bulk:

bulk
Expand All @@ -273,7 +301,7 @@ Especially HTTP based or very high latency servers can benefit from bulk dequeue
Keep in mind however that settings this too high can degrade performance with multiple clusters or very large task packages.

Not supported by the default Redis broker.
Defaults to 1.
Defaults to ``1``.

cache
~~~~~
Expand Down
2 changes: 1 addition & 1 deletion docs/index.rst
Expand Up @@ -20,7 +20,7 @@ Features
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis, Disque, IronMQ, SQS or ORM
- Redis, Disque, IronMQ, SQS, MongoDB or ORM
- Python 2 and 3


Expand Down
5 changes: 5 additions & 0 deletions docs/install.rst
Expand Up @@ -68,10 +68,15 @@ Optional

$ pip install iron-mq

- `Pymongo <https://github.com/mongodb/mongo-python-driver>`__ is needed if you want to use MongoDB as a message broker::

$ pip install pymongo

- `Redis <http://redis.io/>`__ server is the default broker for Django Q. It provides the best performance and does not require Django's cache framework for monitoring.

- `Disque <https://github.com/antirez/disque>`__ server is based on Redis by the same author, but focuses on reliable queues. Currently in Alpha, but highly recommended. You can either build it from source or use it on Heroku through the `Tynd <https://disque.tynd.co/>`__ beta.

- `MongoDB <https://www.mongodb.org/>`__ is a highly scalable NoSQL database which makes for a very fast and reliably persistent at-least-once message broker. Usually available on most PaaS providers.

Compatibility
-------------
Expand Down
4 changes: 4 additions & 0 deletions docs/tasks.rst
Expand Up @@ -79,6 +79,10 @@ a single keyword dict named ``q_options``. This enables you to use these keyword

Please note that this will override any other option keywords.

.. note::
For tasks to be processed you will need to have a worker cluster running in the background using ``python manage.py qcluster``
or you need to configure Django Q to run in synchronous mode for testing using the :ref:`sync` option.

.. _groups:

Groups
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Expand Up @@ -8,3 +8,4 @@ psutil
django-redis
iron-mq
boto3
pymongo
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -19,6 +19,7 @@ iron-mq==0.7
jmespath==0.8.0 # via boto3, botocore
msgpack-python==0.4.6 # via django-redis
psutil==3.2.1
pymongo==3.0.3
python-dateutil==2.4.2 # via arrow, botocore, iron-core
redis==2.10.3
requests==2.7.0 # via iron-core
Expand Down

0 comments on commit 96543e4

Please sign in to comment.