From 826178c4637720e814da9d20fcd8934c5e8b6c33 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Fri, 25 Sep 2015 20:03:39 +0200 Subject: [PATCH 1/7] Adds a MongoDB broker * adds mongo broker tests * adds mongo brokers docs * broker info is now cached when it needs to connect to a server, to reduce traffic. --- django_q/brokers/__init__.py | 6 ++- django_q/brokers/disque.py | 6 ++- django_q/brokers/mongo.py | 64 ++++++++++++++++++++++++++++++++ django_q/brokers/orm.py | 6 ++- django_q/brokers/redis_broker.py | 8 ++-- django_q/conf.py | 4 ++ django_q/monitor.py | 2 +- django_q/tests/test_brokers.py | 61 ++++++++++++++++++++++++++++++ django_q/tests/test_cluster.py | 1 + docs/brokers.rst | 12 +++++- docs/configure.rst | 29 ++++++++++++++- 11 files changed, 188 insertions(+), 11 deletions(-) create mode 100644 django_q/brokers/mongo.py diff --git a/django_q/brokers/__init__.py b/django_q/brokers/__init__.py index 0d10cacc..959d3927 100644 --- a/django_q/brokers/__init__.py +++ b/django_q/brokers/__init__.py @@ -7,6 +7,7 @@ def __init__(self, list_key=Conf.PREFIX): self.connection = self.get_connection(list_key) self.list_key = list_key self.cache = self.get_cache() + self._info = None def enqueue(self, task): """ @@ -78,7 +79,7 @@ def info(self): """ Shows the broker type """ - pass + return self._info def set_stat(self, key, value, timeout): """ @@ -167,6 +168,9 @@ def get_broker(list_key=Conf.PREFIX): elif Conf.ORM: from brokers import orm return orm.ORM(list_key=list_key) + elif Conf.MONGO: + from brokers import mongo + return mongo.Mongo(list_key=list_key) # default to redis else: from brokers import redis_broker diff --git a/django_q/brokers/disque.py b/django_q/brokers/disque.py index 3027c1ae..1fc4bf33 100644 --- a/django_q/brokers/disque.py +++ b/django_q/brokers/disque.py @@ -40,8 +40,10 @@ def delete_queue(self): return len(jobs) def info(self): - info = self.connection.info('server') - return 'Disque {}'.format(info['disque_version']) + if not self._info: + 
info = self.connection.info('server') + self._info = 'Disque {}'.format(info['disque_version']) + return self._info @staticmethod def get_connection(list_key=Conf.PREFIX): diff --git a/django_q/brokers/mongo.py b/django_q/brokers/mongo.py new file mode 100644 index 00000000..be510e05 --- /dev/null +++ b/django_q/brokers/mongo.py @@ -0,0 +1,64 @@ +from datetime import timedelta +from time import sleep +from bson import ObjectId + +from django.utils import timezone + +from pymongo import MongoClient + +from django_q.brokers import Broker +from django_q.conf import Conf + + +def _timeout(): + return timezone.now() - timedelta(seconds=Conf.RETRY) + + +class Mongo(Broker): + def __init__(self, list_key=Conf.PREFIX): + super(Mongo, self).__init__(list_key) + self.collection = self.connection[Conf.MONGO_DB][list_key] + + @staticmethod + def get_connection(list_key=Conf.PREFIX): + return MongoClient(**Conf.MONGO) + + def queue_size(self): + return self.collection.count({'lock': {'$lte': _timeout()}}) + + def lock_size(self): + return self.collection.count({'lock': {'$gt': _timeout()}}) + + def purge_queue(self): + return self.delete_queue() + + def ping(self): + return self.info() is not None + + def info(self): + if not self._info: + self._info = 'MongoDB {}'.format(self.connection.server_info()['version']) + return self._info + + def fail(self, task_id): + self.delete(task_id) + + def enqueue(self, task): + inserted_id = self.collection.insert_one({'payload': task, 'lock': _timeout()}).inserted_id + return str(inserted_id) + + def dequeue(self): + task = self.collection.find_one_and_update({'lock': {'$lte': _timeout()}}, {'$set': {'lock': timezone.now()}}) + if task: + return [(str(task['_id']), task['payload'])] + # empty queue, spare the cpu + sleep(0.2) + + def delete_queue(self): + return self.collection.drop() + + def delete(self, task_id): + self.collection.delete_one({'_id': ObjectId(task_id)}) + + def acknowledge(self, task_id): + return self.delete(task_id) diff
--git a/django_q/brokers/orm.py b/django_q/brokers/orm.py index f3a180b2..f3a39ee2 100644 --- a/django_q/brokers/orm.py +++ b/django_q/brokers/orm.py @@ -21,7 +21,7 @@ def queue_size(self): return self.connection.filter(key=self.list_key, lock__lte=_timeout()).count() def lock_size(self): - return self.connection.filter(key=self.list_key, lock__gte=_timeout()).count() + return self.connection.filter(key=self.list_key, lock__gt=_timeout()).count() def purge_queue(self): return self.connection.filter(key=self.list_key).delete() @@ -30,7 +30,9 @@ def ping(self): return True def info(self): - return 'ORM {}'.format(Conf.ORM) + if not self._info: + self._info = 'ORM {}'.format(Conf.ORM) + return self._info def fail(self, task_id): self.delete(task_id) diff --git a/django_q/brokers/redis_broker.py b/django_q/brokers/redis_broker.py index 6a8cf9a5..4f4f285f 100644 --- a/django_q/brokers/redis_broker.py +++ b/django_q/brokers/redis_broker.py @@ -1,4 +1,5 @@ import redis + from django_q.brokers import Broker from django_q.conf import Conf, logger @@ -9,7 +10,6 @@ class Redis(Broker): - def __init__(self, list_key=Conf.PREFIX): super(Redis, self).__init__(list_key='django_q:{}:q'.format(list_key)) @@ -38,8 +38,10 @@ def ping(self): raise e def info(self): - info = self.connection.info('server') - return 'Redis {}'.format(info['redis_version']) + if not self._info: + info = self.connection.info('server') + self._info = 'Redis {}'.format(info['redis_version']) + return self._info def set_stat(self, key, value, timeout): self.connection.set(key, value, timeout) diff --git a/django_q/conf.py b/django_q/conf.py index b312c4f0..1ce949ca 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -50,6 +50,10 @@ class Conf(object): # ORM broker ORM = conf.get('orm', None) + # MongoDB broker + MONGO = conf.get('mongo', None) + MONGO_DB = conf.get('mongo_db', 'django-q') + # Name of the cluster or site. 
For when you run multiple sites on one redis server PREFIX = conf.get('name', 'default') diff --git a/django_q/monitor.py b/django_q/monitor.py index 6a3bcf4a..e87b4a83 100644 --- a/django_q/monitor.py +++ b/django_q/monitor.py @@ -86,7 +86,7 @@ def monitor(run_once=False, broker=None): lock_size = broker.lock_size() if lock_size: queue_size = '{}({})'.format(queue_size, lock_size) - print(term.move(i, 0) + term.white_on_cyan(term.center(broker.info(), width=col_width * 2))) + print(term.move(i, 0) + term.white_on_cyan(term.center(broker.info, width=col_width * 2))) print(term.move(i, 2 * col_width) + term.black_on_cyan(term.center(_('Queued'), width=col_width))) print(term.move(i, 3 * col_width) + term.white_on_cyan(term.center(queue_size, width=col_width))) print(term.move(i, 4 * col_width) + term.black_on_cyan(term.center(_('Success'), width=col_width))) diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 59514ccf..4c8b79a8 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -285,3 +285,64 @@ def test_orm(): assert broker.queue_size() == 0 # back to django-redis Conf.ORM = None + + +@pytest.mark.django_db +def test_mongo(): + Conf.MONGO = {'host': '127.0.0.1', 'port': 27017} + # check broker + broker = get_broker(list_key='mongo_test') + assert broker.ping() is True + assert broker.info() is not None + # clear before we start + broker.delete_queue() + # enqueue + broker.enqueue('test') + assert broker.queue_size() == 1 + # dequeue + task = broker.dequeue()[0] + assert task[1] == 'test' + broker.acknowledge(task[0]) + assert broker.queue_size() == 0 + # Retry test + Conf.RETRY = 1 + broker.enqueue('test') + assert broker.queue_size() == 1 + broker.dequeue() + assert broker.queue_size() == 0 + sleep(1.5) + assert broker.queue_size() == 1 + task = broker.dequeue()[0] + assert broker.queue_size() == 0 + broker.acknowledge(task[0]) + sleep(1.5) + assert broker.queue_size() == 0 + # delete job + task_id = 
broker.enqueue('test') + broker.delete(task_id) + assert broker.dequeue() is None + # fail + task_id = broker.enqueue('test') + broker.fail(task_id) + # bulk test + for i in range(5): + broker.enqueue('test') + tasks = [] + for i in range(5): + tasks.append(broker.dequeue()[0]) + assert broker.lock_size() == 5 + for task in tasks: + assert task is not None + broker.acknowledge(task[0]) + # test lock size + assert broker.lock_size() == 0 + # test duplicate acknowledge + broker.acknowledge(task[0]) + # delete queue + broker.enqueue('test') + broker.enqueue('test') + broker.purge_queue() + broker.delete_queue() + assert broker.queue_size() == 0 + # back to django-redis + Conf.MONGO = None diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index b80de4be..37dc79ab 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -33,6 +33,7 @@ def broker(): Conf.IRON_MQ = None Conf.SQS = None Conf.ORM = None + Conf.MONGO = None Conf.DJANGO_REDIS = 'default' return get_broker() diff --git a/docs/brokers.rst b/docs/brokers.rst index 429419ba..e6dffe5b 100644 --- a/docs/brokers.rst +++ b/docs/brokers.rst @@ -2,7 +2,7 @@ Brokers ======= The broker sits between your Django instances and your Django Q cluster instances; accepting, saving and delivering task packages. -Currently we support a variety of brokers from the default Redis, bleeding edge Disque to the convenient Amazon SQS. +Currently we support a variety of brokers from the default Redis, bleeding edge Disque to the convenient ORM and fast MongoDB. The default Redis broker does not support message receipts. This means that in case of a catastrophic failure of the cluster server or worker timeouts, tasks that were being executed get lost. @@ -73,6 +73,16 @@ Although `SQS `__ is not the fastest, it is stable, * Requires the `boto3 `__ client library: ``pip install boto3`` * See the :ref:`sqs_configuration` configuration section for options.
+ +MongoDB +------- +The + +* Delivery receipts +* Needs Django's `Cache framework `__ configured for monitoring +* Requires the `pymongo `__ driver: ``pip install pymongo`` +* See the :ref:`mongo_configuration` configuration section for options. + .. _orm_broker: Django ORM diff --git a/docs/configure.rst b/docs/configure.rst index 046e189f..e3c84c5b 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -264,6 +264,33 @@ Using the Django ORM backend will also enable the Queued Tasks table in the Admi If you need better performance , you should consider using a different database backend than the main project. Set ``orm`` to the name of that database connection and make sure you run migrations on it using the ``--database`` option. +.. _mongo_configuration: + +mongo +~~~~~ +To use MongoDB as a message broker you simply provide the connection information in a dictionary :: + + # example MongoDB broker connection + + Q_CLUSTER = { + 'name': 'MongoDB', + 'workers': 8, + 'timeout': 60, + 'retry': 70, + 'queue_limit': 100, + 'mongo': { + 'host': '127.0.0.1', + 'port': 27017 + } + } + +The ``mongo`` dictionary can contain any of the parameters exposed by pymongo's`MongoClient`__ + +mongo_db +~~~~~~~~ +When using the MongoDB broker you can optionally provide a database name to use for the queues. +Defaults to ``django-q`` + .. _bulk: bulk @@ -273,7 +300,7 @@ Especially HTTP based or very high latency servers can benefit from bulk dequeue Keep in mind however that settings this too high can degrade performance with multiple clusters or very large task packages. Not supported by the default Redis broker. -Defaults to 1. +Defaults to ``1``. 
cache ~~~~~ From 863f6f1aa9d54672680dd134e7ba98c936a6ac9a Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Fri, 25 Sep 2015 20:05:56 +0200 Subject: [PATCH 2/7] adds mongodb to travis --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 25efbf89..42bffc8d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,7 @@ language: python services: - redis-server + - mongodb python: - "2.7" From b5842827dd84ee1d21de91356aad41cafe23b130 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Fri, 25 Sep 2015 20:10:30 +0200 Subject: [PATCH 3/7] adds pymongo to requirements for testing --- requirements.in | 1 + requirements.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/requirements.in b/requirements.in index 476c4a9c..0cc619b9 100644 --- a/requirements.in +++ b/requirements.in @@ -8,3 +8,4 @@ psutil django-redis iron-mq boto3 +pymongo diff --git a/requirements.txt b/requirements.txt index 45bb5cb8..775a24b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,7 @@ iron-mq==0.7 jmespath==0.8.0 # via boto3, botocore msgpack-python==0.4.6 # via django-redis psutil==3.2.1 +pymongo==3.0.3 python-dateutil==2.4.2 # via arrow, botocore, iron-core redis==2.10.3 requests==2.7.0 # via iron-core From 1fd9ca2441bfc48db275dc33f1ed6db77a0a4d65 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Fri, 25 Sep 2015 20:57:47 +0200 Subject: [PATCH 4/7] docs: mongodb broker --- README.rst | 3 ++- docs/brokers.rst | 5 ++++- docs/configure.rst | 4 ++-- docs/index.rst | 2 +- docs/install.rst | 5 +++++ docs/tasks.rst | 4 ++++ 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/README.rst b/README.rst index b6ebb921..eef65947 100644 --- a/README.rst +++ b/README.rst @@ -20,7 +20,7 @@ Features - Django Admin integration - PaaS compatible with multiple instances - Multi cluster monitor -- Redis, Disque, IronMQ, SQS or ORM +- Redis, Disque, IronMQ, SQS, MongoDB or ORM - Python 2 and 3 Requirements @@ -39,6 +39,7 @@ Brokers - `Disque 
`__ - `IronMQ `__ - `Amazon SQS `__ +- `MongoDB `__ - `Django ORM `__ Installation diff --git a/docs/brokers.rst b/docs/brokers.rst index e6dffe5b..8bf4db60 100644 --- a/docs/brokers.rst +++ b/docs/brokers.rst @@ -76,10 +76,12 @@ Although `SQS `__ is not the fastest, it is stable, MongoDB ------- -The +This highly scalable NoSQL database makes for a very fast and reliably persistent at-least-once message broker. +Usually available on most PaaS providers, as `open-source `__ or commercial `enterprise `__ edition. * Delivery receipts * Needs Django's `Cache framework `__ configured for monitoring +* Can be configured as the Django cache-backend through several open-source cache providers. * Requires the `pymongo `__ driver: ``pip install pymongo`` * See the :ref:`mongo_configuration` configuration section for options. @@ -94,6 +96,7 @@ However for a medium message rate and scheduled tasks, this is the most convenie * Delivery receipts * Supports bulk dequeue * Needs Django's `Cache framework `__ configured for monitoring +* Can be `configured `__ as its own cache backend. * Queue editable in Django Admin * See the :ref:`orm_configuration` configuration on how to set it up. 
diff --git a/docs/configure.rst b/docs/configure.rst index e3c84c5b..50024ddd 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -268,7 +268,7 @@ Set ``orm`` to the name of that database connection and make sure you run migrat mongo ~~~~~ -To use MongoDB as a message broker you simply provide the connection information in a dictionary :: +To use MongoDB as a message broker you simply provide the connection information in a dictionary:: # example MongoDB broker connection @@ -284,7 +284,7 @@ To use MongoDB as a message broker you simply provide the connection information } } -The ``mongo`` dictionary can contain any of the parameters exposed by pymongo's`MongoClient`__ +The ``mongo`` dictionary can contain any of the parameters exposed by pymongo's `MongoClient `__ mongo_db ~~~~~~~~ diff --git a/docs/index.rst b/docs/index.rst index ddb4aac5..b46ba00a 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -20,7 +20,7 @@ Features - Django Admin integration - PaaS compatible with multiple instances - Multi cluster monitor -- Redis, Disque, IronMQ, SQS or ORM +- Redis, Disque, IronMQ, SQS, MongoDB or ORM - Python 2 and 3 diff --git a/docs/install.rst b/docs/install.rst index 9fb07975..e3b4b386 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -68,10 +68,15 @@ Optional $ pip install iron-mq +- `Pymongo `__ is needed if you want to use MongoDB as a message broker:: + + $ pip install pymongo + - `Redis `__ server is the default broker for Django Q. It provides the best performance and does not require Django's cache framework for monitoring. - `Disque `__ server is based on Redis by the same author, but focuses on reliable queues. Currently in Alpha, but highly recommended. You can either build it from source or use it on Heroku through the `Tynd `__ beta. +- `MongoDB `__ is a highly scalable NoSQL database which makes for a very fast and reliably persistent at-least-once message broker. Usually available on most PaaS providers. 
Compatibility ------------- diff --git a/docs/tasks.rst b/docs/tasks.rst index 094913df..b1abdb34 100644 --- a/docs/tasks.rst +++ b/docs/tasks.rst @@ -79,6 +79,10 @@ a single keyword dict named ``q_options``. This enables you to use these keyword Please not that this will override any other option keywords. +.. note:: + For tasks to be processed you will need to have a worker cluster running in the background using ``python manage.py qcluster`` + or you need to configure Django Q to run in synchronous mode for testing using the :ref:`sync` option. + .. _groups: Groups From 8bdbcdaf55f8575ce9a583d478901a209e6810ca Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Fri, 25 Sep 2015 21:27:33 +0200 Subject: [PATCH 5/7] revert to info method --- django_q/monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_q/monitor.py b/django_q/monitor.py index e87b4a83..6a3bcf4a 100644 --- a/django_q/monitor.py +++ b/django_q/monitor.py @@ -86,7 +86,7 @@ def monitor(run_once=False, broker=None): lock_size = broker.lock_size() if lock_size: queue_size = '{}({})'.format(queue_size, lock_size) - print(term.move(i, 0) + term.white_on_cyan(term.center(broker.info, width=col_width * 2))) + print(term.move(i, 0) + term.white_on_cyan(term.center(broker.info(), width=col_width * 2))) print(term.move(i, 2 * col_width) + term.black_on_cyan(term.center(_('Queued'), width=col_width))) print(term.move(i, 3 * col_width) + term.white_on_cyan(term.center(queue_size, width=col_width))) print(term.move(i, 4 * col_width) + term.black_on_cyan(term.center(_('Success'), width=col_width))) From b8bb186471eff6837d11a86c57908dd1077dc64d Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Sat, 26 Sep 2015 13:01:15 +0200 Subject: [PATCH 6/7] check for default database on mongo connection --- django_q/brokers/mongo.py | 15 ++++++++++++--- django_q/conf.py | 2 +- docs/configure.rst | 3 ++- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/django_q/brokers/mongo.py 
b/django_q/brokers/mongo.py index be510e05..73020fb3 100644 --- a/django_q/brokers/mongo.py +++ b/django_q/brokers/mongo.py @@ -1,11 +1,12 @@ from datetime import timedelta from time import sleep -from bson import ObjectId +from bson import ObjectId from django.utils import timezone - from pymongo import MongoClient +from pymongo.errors import ConfigurationError + from django_q.brokers import Broker from django_q.conf import Conf @@ -17,12 +18,20 @@ def _timeout(): class Mongo(Broker): def __init__(self, list_key=Conf.PREFIX): super(Mongo, self).__init__(list_key) - self.collection = self.connection[Conf.MONGO_DB][list_key] + self.collection = self.get_collection() @staticmethod def get_connection(list_key=Conf.PREFIX): return MongoClient(**Conf.MONGO) + def get_collection(self): + if not Conf.MONGO_DB: + try: + Conf.MONGO_DB = self.connection.get_default_database()[1] + except ConfigurationError: + Conf.MONGO_DB = 'django-q' + return self.connection[Conf.MONGO_DB][self.list_key] + def queue_size(self): return self.collection.count({'lock': {'$lte': _timeout()}}) diff --git a/django_q/conf.py b/django_q/conf.py index 1ce949ca..9ca2146c 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -52,7 +52,7 @@ class Conf(object): # MongoDB broker MONGO = conf.get('mongo', None) - MONGO_DB = conf.get('mongo_db', 'django-q') + MONGO_DB = conf.get('mongo_db', None) # Name of the cluster or site. For when you run multiple sites on one redis server PREFIX = conf.get('name', 'default') diff --git a/docs/configure.rst b/docs/configure.rst index 50024ddd..8a3d17e1 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -285,11 +285,12 @@ To use MongoDB as a message broker you simply provide the connection information } The ``mongo`` dictionary can contain any of the parameters exposed by pymongo's `MongoClient `__ +If you want to use a mongodb uri, you can supply it as the ``host`` parameter. 
mongo_db ~~~~~~~~ When using the MongoDB broker you can optionally provide a database name to use for the queues. -Defaults to ``django-q`` +Defaults to the default database of the connection (the one named in the MongoDB URI) if available, otherwise ``django-q`` .. _bulk: From 08104ef7774a6155990cc2ff30f8c6b5c9b5ddc7 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Sat, 26 Sep 2015 13:13:25 +0200 Subject: [PATCH 7/7] check for default database on mongo connection correction --- django_q/brokers/mongo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_q/brokers/mongo.py b/django_q/brokers/mongo.py index 73020fb3..dfce5bad 100644 --- a/django_q/brokers/mongo.py +++ b/django_q/brokers/mongo.py @@ -27,7 +27,7 @@ def get_connection(list_key=Conf.PREFIX): def get_collection(self): if not Conf.MONGO_DB: try: - Conf.MONGO_DB = self.connection.get_default_database()[1] + Conf.MONGO_DB = self.connection.get_default_database().name except ConfigurationError: Conf.MONGO_DB = 'django-q' return self.connection[Conf.MONGO_DB][self.list_key]