diff --git a/.travis.yml b/.travis.yml index 25efbf89..42bffc8d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,7 @@ language: python services: - redis-server + - mongodb python: - "2.7" diff --git a/README.rst b/README.rst index b6ebb921..eef65947 100644 --- a/README.rst +++ b/README.rst @@ -20,7 +20,7 @@ Features - Django Admin integration - PaaS compatible with multiple instances - Multi cluster monitor -- Redis, Disque, IronMQ, SQS or ORM +- Redis, Disque, IronMQ, SQS, MongoDB or ORM - Python 2 and 3 Requirements @@ -39,6 +39,7 @@ Brokers - `Disque `__ - `IronMQ `__ - `Amazon SQS `__ +- `MongoDB `__ - `Django ORM `__ Installation diff --git a/django_q/brokers/__init__.py b/django_q/brokers/__init__.py index 0d10cacc..959d3927 100644 --- a/django_q/brokers/__init__.py +++ b/django_q/brokers/__init__.py @@ -7,6 +7,7 @@ def __init__(self, list_key=Conf.PREFIX): self.connection = self.get_connection(list_key) self.list_key = list_key self.cache = self.get_cache() + self._info = None def enqueue(self, task): """ @@ -78,7 +79,7 @@ def info(self): """ Shows the broker type """ - pass + return self._info def set_stat(self, key, value, timeout): """ @@ -167,6 +168,9 @@ def get_broker(list_key=Conf.PREFIX): elif Conf.ORM: from brokers import orm return orm.ORM(list_key=list_key) + elif Conf.MONGO: + from brokers import mongo + return mongo.Mongo(list_key=list_key) # default to redis else: from brokers import redis_broker diff --git a/django_q/brokers/disque.py b/django_q/brokers/disque.py index 3027c1ae..1fc4bf33 100644 --- a/django_q/brokers/disque.py +++ b/django_q/brokers/disque.py @@ -40,8 +40,10 @@ def delete_queue(self): return len(jobs) def info(self): - info = self.connection.info('server') - return 'Disque {}'.format(info['disque_version']) + if not self._info: + info = self.connection.info('server') + self._info= 'Disque {}'.format(info['disque_version']) + return self._info @staticmethod def get_connection(list_key=Conf.PREFIX): diff --git 
a/django_q/brokers/mongo.py b/django_q/brokers/mongo.py new file mode 100644 index 00000000..dfce5bad --- /dev/null +++ b/django_q/brokers/mongo.py @@ -0,0 +1,74 @@ +from datetime import timedelta +from time import sleep + +from bson import ObjectId +from django.utils import timezone +from pymongo import MongoClient + +from pymongo.errors import ConfigurationError + +from django_q.brokers import Broker +from django_q.conf import Conf + + +def _timeout(): + return timezone.now() - timedelta(seconds=Conf.RETRY) + + +class Mongo(Broker): + def __init__(self, list_key=Conf.PREFIX): + super(Mongo, self).__init__(list_key) + self.collection = self.get_collection() + + @staticmethod + def get_connection(list_key=Conf.PREFIX): + return MongoClient(**Conf.MONGO) + + def get_collection(self): + if not Conf.MONGO_DB: + try: + Conf.MONGO_DB = self.connection.get_default_database().name + except ConfigurationError: + Conf.MONGO_DB = 'django-q' + return self.connection[Conf.MONGO_DB][self.list_key] + + def queue_size(self): + return self.collection.count({'lock': {'$lte': _timeout()}}) + + def lock_size(self): + return self.collection.count({'lock': {'$gt': _timeout()}}) + + def purge_queue(self): + return self.delete_queue() + + def ping(self): + # call info() so the server is actually queried; the bare bound method is never None + return self.info() is not None + + def info(self): + if not self._info: + self._info = 'MongoDB {}'.format(self.connection.server_info()['version']) + return self._info + + def fail(self, task_id): + self.delete(task_id) + + def enqueue(self, task): + inserted_id = self.collection.insert_one({'payload': task, 'lock': _timeout()}).inserted_id + return str(inserted_id) + + def dequeue(self): + task = self.collection.find_one_and_update({'lock': {'$lte': _timeout()}}, {'$set': {'lock': timezone.now()}}) + if task: + return [(str(task['_id']), task['payload'])] + # empty queue, spare the cpu + sleep(0.2) + + def delete_queue(self): + return self.collection.drop() + + def delete(self, task_id): + self.collection.delete_one({'_id': 
ObjectId(task_id)}) + + def acknowledge(self, task_id): + return self.delete(task_id) diff --git a/django_q/brokers/orm.py b/django_q/brokers/orm.py index f3a180b2..f3a39ee2 100644 --- a/django_q/brokers/orm.py +++ b/django_q/brokers/orm.py @@ -21,7 +21,7 @@ def queue_size(self): return self.connection.filter(key=self.list_key, lock__lte=_timeout()).count() def lock_size(self): - return self.connection.filter(key=self.list_key, lock__gte=_timeout()).count() + return self.connection.filter(key=self.list_key, lock__gt=_timeout()).count() def purge_queue(self): return self.connection.filter(key=self.list_key).delete() @@ -30,7 +30,9 @@ def ping(self): return True def info(self): - return 'ORM {}'.format(Conf.ORM) + if not self._info: + self._info = 'ORM {}'.format(Conf.ORM) + return self._info def fail(self, task_id): self.delete(task_id) diff --git a/django_q/brokers/redis_broker.py b/django_q/brokers/redis_broker.py index 6a8cf9a5..4f4f285f 100644 --- a/django_q/brokers/redis_broker.py +++ b/django_q/brokers/redis_broker.py @@ -1,4 +1,5 @@ import redis + from django_q.brokers import Broker from django_q.conf import Conf, logger @@ -9,7 +10,6 @@ class Redis(Broker): - def __init__(self, list_key=Conf.PREFIX): super(Redis, self).__init__(list_key='django_q:{}:q'.format(list_key)) @@ -38,8 +38,10 @@ def ping(self): raise e def info(self): - info = self.connection.info('server') - return 'Redis {}'.format(info['redis_version']) + if not self._info: + info = self.connection.info('server') + self._info = 'Redis {}'.format(info['redis_version']) + return self._info def set_stat(self, key, value, timeout): self.connection.set(key, value, timeout) diff --git a/django_q/conf.py b/django_q/conf.py index b312c4f0..9ca2146c 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -50,6 +50,10 @@ class Conf(object): # ORM broker ORM = conf.get('orm', None) + # MongoDB broker + MONGO = conf.get('mongo', None) + MONGO_DB = conf.get('mongo_db', None) + # Name of the cluster or site. 
For when you run multiple sites on one redis server PREFIX = conf.get('name', 'default') diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 59514ccf..4c8b79a8 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -285,3 +285,64 @@ def test_orm(): assert broker.queue_size() == 0 # back to django-redis Conf.ORM = None + + +@pytest.mark.django_db +def test_mongo(): + Conf.MONGO = {'host': '127.0.0.1', 'port': 27017} + # check broker + broker = get_broker(list_key='mongo_test') + assert broker.ping() is True + assert broker.info() is not None + # clear before we start + broker.delete_queue() + # enqueue + broker.enqueue('test') + assert broker.queue_size() == 1 + # dequeue + task = broker.dequeue()[0] + assert task[1] == 'test' + broker.acknowledge(task[0]) + assert broker.queue_size() == 0 + # Retry test + Conf.RETRY = 1 + broker.enqueue('test') + assert broker.queue_size() == 1 + broker.dequeue() + assert broker.queue_size() == 0 + sleep(1.5) + assert broker.queue_size() == 1 + task = broker.dequeue()[0] + assert broker.queue_size() == 0 + broker.acknowledge(task[0]) + sleep(1.5) + assert broker.queue_size() == 0 + # delete job + task_id = broker.enqueue('test') + broker.delete(task_id) + assert broker.dequeue() is None + # fail + task_id = broker.enqueue('test') + broker.fail(task_id) + # bulk test + for i in range(5): + broker.enqueue('test') + tasks = [] + for i in range(5): + tasks.append(broker.dequeue()[0]) + assert broker.lock_size() == 5 + for task in tasks: + assert task is not None + broker.acknowledge(task[0]) + # test lock size + assert broker.lock_size() == 0 + # test duplicate acknowledge + broker.acknowledge(task[0]) + # delete queue + broker.enqueue('test') + broker.enqueue('test') + broker.purge_queue() + broker.delete_queue() + assert broker.queue_size() == 0 + # back to django-redis + Conf.MONGO = None diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 
b80de4be..37dc79ab 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -33,6 +33,7 @@ def broker(): Conf.IRON_MQ = None Conf.SQS = None Conf.ORM = None + Conf.MONGO = None Conf.DJANGO_REDIS = 'default' return get_broker() diff --git a/docs/brokers.rst b/docs/brokers.rst index 429419ba..8bf4db60 100644 --- a/docs/brokers.rst +++ b/docs/brokers.rst @@ -2,7 +2,7 @@ Brokers ======= The broker sits between your Django instances and your Django Q cluster instances; accepting, saving and delivering task packages. -Currently we support a variety of brokers from the default Redis, bleeding edge Disque to the convenient Amazon SQS. +Currently we support a variety of brokers from the default Redis, bleeding edge Disque to the convenient ORM and fast MongoDB. The default Redis broker does not support message receipts. This means that in case of a catastrophic failure of the cluster server or worker timeouts, tasks that were being executed get lost. @@ -73,6 +73,18 @@ Although `SQS `__ is not the fastest, it is stable, * Requires the `boto3 `__ client library: ``pip install boto3`` * See the :ref:`sqs_configuration` configuration section for options. + +MongoDB +------- +This highly scalable NoSQL database makes for a very fast and reliably persistent at-least-once message broker. +Usually available on most PaaS providers, as `open-source `__ or commercial `enterprise `__ edition. + +* Delivery receipts +* Needs Django's `Cache framework `__ configured for monitoring +* Can be configured as the Django cache-backend through several open-source cache providers. +* Requires the `pymongo `__ driver: ``pip install pymongo`` +* See the :ref:`mongo_configuration` configuration section for options. + .. 
_orm_broker: Django ORM @@ -84,6 +96,7 @@ However for a medium message rate and scheduled tasks, this is the most convenie * Delivery receipts * Supports bulk dequeue * Needs Django's `Cache framework `__ configured for monitoring +* Can be `configured `__ as its own cache backend. * Queue editable in Django Admin * See the :ref:`orm_configuration` configuration on how to set it up. diff --git a/docs/configure.rst b/docs/configure.rst index 046e189f..8a3d17e1 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -264,6 +264,34 @@ Using the Django ORM backend will also enable the Queued Tasks table in the Admi If you need better performance , you should consider using a different database backend than the main project. Set ``orm`` to the name of that database connection and make sure you run migrations on it using the ``--database`` option. +.. _mongo_configuration: + +mongo +~~~~~ +To use MongoDB as a message broker you simply provide the connection information in a dictionary:: + + # example MongoDB broker connection + + Q_CLUSTER = { + 'name': 'MongoDB', + 'workers': 8, + 'timeout': 60, + 'retry': 70, + 'queue_limit': 100, + 'mongo': { + 'host': '127.0.0.1', + 'port': 27017 + } + } + +The ``mongo`` dictionary can contain any of the parameters exposed by pymongo's `MongoClient `__. +If you want to use a MongoDB URI, you can supply it as the ``host`` parameter. + +mongo_db +~~~~~~~~ +When using the MongoDB broker you can optionally provide a database name to use for the queues. +Defaults to the connection's default database if available, otherwise ``django-q``. + .. _bulk: bulk @@ -273,7 +301,7 @@ Especially HTTP based or very high latency servers can benefit from bulk dequeue Keep in mind however that settings this too high can degrade performance with multiple clusters or very large task packages. Not supported by the default Redis broker. -Defaults to 1. +Defaults to ``1``. 
cache ~~~~~ diff --git a/docs/index.rst b/docs/index.rst index ddb4aac5..b46ba00a 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -20,7 +20,7 @@ Features - Django Admin integration - PaaS compatible with multiple instances - Multi cluster monitor -- Redis, Disque, IronMQ, SQS or ORM +- Redis, Disque, IronMQ, SQS, MongoDB or ORM - Python 2 and 3 diff --git a/docs/install.rst b/docs/install.rst index 9fb07975..e3b4b386 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -68,10 +68,15 @@ Optional $ pip install iron-mq +- `Pymongo `__ is needed if you want to use MongoDB as a message broker:: + + $ pip install pymongo + - `Redis `__ server is the default broker for Django Q. It provides the best performance and does not require Django's cache framework for monitoring. - `Disque `__ server is based on Redis by the same author, but focuses on reliable queues. Currently in Alpha, but highly recommended. You can either build it from source or use it on Heroku through the `Tynd `__ beta. +- `MongoDB `__ is a highly scalable NoSQL database which makes for a very fast and reliably persistent at-least-once message broker. Usually available on most PaaS providers. Compatibility ------------- diff --git a/docs/tasks.rst b/docs/tasks.rst index 094913df..b1abdb34 100644 --- a/docs/tasks.rst +++ b/docs/tasks.rst @@ -79,6 +79,10 @@ a single keyword dict named ``q_options``. This enables you to use these keyword Please not that this will override any other option keywords. +.. note:: + For tasks to be processed you will need to have a worker cluster running in the background using ``python manage.py qcluster`` + or you need to configure Django Q to run in synchronous mode for testing using the :ref:`sync` option. + .. 
_groups: Groups diff --git a/requirements.in b/requirements.in index 476c4a9c..0cc619b9 100644 --- a/requirements.in +++ b/requirements.in @@ -8,3 +8,4 @@ psutil django-redis iron-mq boto3 +pymongo diff --git a/requirements.txt b/requirements.txt index 45bb5cb8..775a24b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,7 @@ iron-mq==0.7 jmespath==0.8.0 # via boto3, botocore msgpack-python==0.4.6 # via django-redis psutil==3.2.1 +pymongo==3.0.3 python-dateutil==2.4.2 # via arrow, botocore, iron-core redis==2.10.3 requests==2.7.0 # via iron-core