Skip to content

Commit

Permalink
Merge branch 'orm'
Browse files Browse the repository at this point in the history
  • Loading branch information
Koed00 committed Sep 14, 2015
2 parents dfe1b10 + ff5531a commit a9f73bf
Show file tree
Hide file tree
Showing 17 changed files with 257 additions and 18 deletions.
12 changes: 6 additions & 6 deletions README.rst
Expand Up @@ -20,7 +20,7 @@ Features
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis, Disque, IronMQ or SQS
- Redis, Disque, IronMQ, SQS or ORM
- Python 2 and 3

Requirements
Expand All @@ -35,11 +35,11 @@ Tested with: Python 2.7 & 3.4. Django 1.7.10 & 1.8.4

Brokers
~~~~~~~
- `Redis <http://redis.io/>`__
- `Disque <https://github.com/antirez/disque>`__
- `IronMQ <http://www.iron.io/mq/>`__
- `Amazon SQS <https://aws.amazon.com/sqs/>`__

- `Redis <https://django-q.readthedocs.org/en/latest/brokers.html#redis>`__
- `Disque <https://django-q.readthedocs.org/en/latest/brokers.html#disque>`__
- `IronMQ <https://django-q.readthedocs.org/en/latest/brokers.html#ironmq>`__
- `Amazon SQS <https://django-q.readthedocs.org/en/latest/brokers.html#amazon-sqs>`__
- `Django ORM <https://django-q.readthedocs.org/en/latest/brokers.html#django-orm>`__

Installation
~~~~~~~~~~~~
Expand Down
27 changes: 20 additions & 7 deletions django_q/admin.py
Expand Up @@ -3,11 +3,11 @@
from django.utils.translation import ugettext_lazy as _

from .tasks import async
from .models import Success, Failure, Schedule
from .models import Success, Failure, Schedule, OrmQ
from .conf import Conf


class TaskAdmin(admin.ModelAdmin):

"""model admin for success tasks."""

list_display = (
Expand All @@ -34,8 +34,8 @@ def get_queryset(self, request):

def get_readonly_fields(self, request, obj=None):
"""Set all fields readonly."""
return list(self.readonly_fields) +\
[field.name for field in obj._meta.fields]
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields]


def retry_failed(FailAdmin, request, queryset):
Expand All @@ -49,7 +49,6 @@ def retry_failed(FailAdmin, request, queryset):


class FailAdmin(admin.ModelAdmin):

"""model admin for failed tasks."""

list_display = (
Expand All @@ -72,11 +71,10 @@ def has_add_permission(self, request, obj=None):
def get_readonly_fields(self, request, obj=None):
"""Set all fields readonly."""
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields]
[field.name for field in obj._meta.fields]


class ScheduleAdmin(admin.ModelAdmin):

""" model admin for schedules """

list_display = (
Expand All @@ -95,6 +93,21 @@ class ScheduleAdmin(admin.ModelAdmin):
list_display_links = ('id', 'name')


class QueueAdmin(admin.ModelAdmin):
""" queue admin for ORM broker """
list_display = (
'id',
'key',
'lock'
)

def has_add_permission(self, request, obj=None):
"""Don't allow adds."""
return False

admin.site.register(Schedule, ScheduleAdmin)
admin.site.register(Success, TaskAdmin)
admin.site.register(Failure, FailAdmin)

if Conf.ORM or Conf.TESTING:
admin.site.register(OrmQ, QueueAdmin)
3 changes: 3 additions & 0 deletions django_q/brokers/__init__.py
Expand Up @@ -160,6 +160,9 @@ def get_broker(list_key=Conf.PREFIX):
elif Conf.SQS:
from brokers import aws_sqs
return aws_sqs.Sqs(list_key=list_key)
elif Conf.ORM:
from brokers import orm
return orm.ORM(list_key=list_key)
# default to redis
else:
from brokers import redis_broker
Expand Down
62 changes: 62 additions & 0 deletions django_q/brokers/orm.py
@@ -0,0 +1,62 @@
from datetime import timedelta
from time import sleep
from django.utils import timezone
from django.db.models import Q
from django_q.brokers import Broker
from django_q.models import OrmQ
from django_q.conf import Conf


class ORM(Broker):
def queue_size(self):
return OrmQ.objects.using(Conf.ORM) \
.filter(Q(key=self.list_key, lock__isnull=True) |
Q(key=self.list_key, lock__lte=timezone.now() - timedelta(seconds=Conf.RETRY))) \
.count()

def purge_queue(self):
return OrmQ.objects.using(Conf.ORM).filter(key=self.list_key).delete()

def ping(self):
return True

def info(self):
return 'ORM {}'.format(Conf.ORM)

def fail(self, task_id):
self.delete(task_id)

def enqueue(self, task):
package = OrmQ.objects.using(Conf.ORM).create(key=self.list_key, payload=task)
return package.pk

def dequeue(self):
if len(self.task_cache) > 0:
t = self.task_cache.pop()
return t.pk, t.payload
else:
# Get new and timed out tasks
tasks = OrmQ.objects.using(Conf.ORM).filter(
Q(key=self.list_key, lock__isnull=True) |
Q(key=self.list_key, lock__lte=timezone.now() - timedelta(seconds=Conf.RETRY)))[:Conf.BULK]
if tasks:
# lock them
OrmQ.objects.using(Conf.ORM).filter(pk__in=tasks).update(lock=timezone.now())
tasks = [t for t in tasks]
# pop one task
t = tasks.pop()
if tasks:
# add remainder to cache
self.task_cache = [t for t in tasks]
return t.pk, t.payload
# empty queue, spare the cpu
sleep(0.2)

def delete_queue(self):
return self.purge_queue()

def delete(self, task_id):
return OrmQ.objects.using(Conf.ORM).filter(pk=task_id).delete()

def acknowledge(self, task_id):
return self.delete(task_id)
2 changes: 1 addition & 1 deletion django_q/cluster.py
Expand Up @@ -238,7 +238,7 @@ def guard(self):
self.reincarnate(self.pusher)
# Call scheduler once a minute (or so)
counter += cycle
if counter == 30:
if counter == 30 and Conf.SCHEDULER:
counter = 0
scheduler(broker=self.broker)
# Save current status
Expand Down
6 changes: 6 additions & 0 deletions django_q/conf.py
Expand Up @@ -47,6 +47,9 @@ class Conf(object):
# SQS broker
SQS = conf.get('sqs', None)

# ORM broker
ORM = conf.get('orm', None)

# Name of the cluster or site. For when you run multiple sites on one redis server
PREFIX = conf.get('name', 'default')

Expand All @@ -57,6 +60,9 @@ class Conf(object):
# Failures are always saved
SAVE_LIMIT = conf.get('save_limit', 250)

# Disable the scheduler
SCHEDULER = conf.get('scheduler', True)

# Number of workers in the pool. Default is cpu count if implemented, otherwise 4.
WORKERS = conf.get('workers', False)
if not WORKERS:
Expand Down
27 changes: 27 additions & 0 deletions django_q/migrations/0007_ormq.py
@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import models, migrations


class Migration(migrations.Migration):

dependencies = [
('django_q', '0006_auto_20150805_1817'),
]

operations = [
migrations.CreateModel(
name='OrmQ',
fields=[
('id', models.AutoField(primary_key=True, auto_created=True, verbose_name='ID', serialize=False)),
('key', models.CharField(max_length=100)),
('payload', models.TextField()),
('lock', models.DateTimeField(null=True)),
],
options={
'verbose_name_plural': 'Queued tasks',
'verbose_name': 'Queued task',
},
),
]
11 changes: 11 additions & 0 deletions django_q/models.py
Expand Up @@ -188,6 +188,17 @@ class Meta:
ordering = ['next_run']


class OrmQ(models.Model):
key = models.CharField(max_length=100)
payload = models.TextField()
lock = models.DateTimeField(null=True)

class Meta:
app_label = 'django_q'
verbose_name = _('Queued task')
verbose_name_plural = _('Queued tasks')


# Backwards compatibility for Django 1.7
def decode_results(values):
if get_version().split('.')[1] == '7':
Expand Down
15 changes: 14 additions & 1 deletion django_q/tests/test_admin.py
Expand Up @@ -4,12 +4,14 @@
import pytest

from django_q.tasks import schedule
from django_q.models import Task, Failure
from django_q.models import Task, Failure, OrmQ
from django_q.humanhash import uuid
from django_q.conf import Conf


@pytest.mark.django_db
def test_admin_views(admin_client):
Conf.ORM='default'
s = schedule('sched.test')
tag = uuid()
f = Task.objects.create(
Expand All @@ -27,6 +29,9 @@ def test_admin_views(admin_client):
started=timezone.now(),
stopped=timezone.now(),
success=True)
q = OrmQ.objects.create(
key='test',
payload='test')
admin_urls = (
# schedule
reverse('admin:django_q_schedule_changelist'),
Expand All @@ -44,6 +49,11 @@ def test_admin_views(admin_client):
reverse('admin:django_q_failure_change', args=(f.id,)),
reverse('admin:django_q_failure_history', args=(f.id,)),
reverse('admin:django_q_failure_delete', args=(f.id,)),
# orm queue
reverse('admin:django_q_ormq_changelist'),
reverse('admin:django_q_ormq_change', args=(q.id,)),
reverse('admin:django_q_ormq_history', args=(q.id,)),
reverse('admin:django_q_ormq_delete', args=(q.id,)),

)
for url in admin_urls:
Expand All @@ -57,3 +67,6 @@ def test_admin_views(admin_client):
response = admin_client.post(url, data)
assert response.status_code == 302
assert Failure.objects.filter(name=f.id).exists() is False
# cleanup
q.delete()
Conf.ORM = None
55 changes: 55 additions & 0 deletions django_q/tests/test_brokers.py
Expand Up @@ -215,3 +215,58 @@ def test_sqs():
Conf.SQS = None
Conf.BULK = 1
Conf.DJANGO_REDIS = 'default'

@pytest.mark.django_db
def test_orm():
Conf.ORM = 'default'
# check broker
broker = get_broker(list_key='orm_test')
assert broker.ping() is True
assert broker.info() is not None
# clear before we start
broker.delete_queue()
# enqueue
broker.enqueue('test')
assert broker.queue_size() == 1
# dequeue
task = broker.dequeue()
assert task[1] == 'test'
broker.acknowledge(task[0])
assert broker.queue_size() == 0
# Retry test
Conf.RETRY = 1
broker.enqueue('test')
assert broker.queue_size() == 1
broker.dequeue()
assert broker.queue_size() == 0
sleep(1.5)
assert broker.queue_size() == 1
task = broker.dequeue()
assert broker.queue_size() == 0
broker.acknowledge(task[0])
sleep(1.5)
assert broker.queue_size() == 0
# delete job
task_id = broker.enqueue('test')
broker.delete(task_id)
assert broker.dequeue() is None
# fail
task_id = broker.enqueue('test')
broker.fail(task_id)
# bulk test
for i in range(5):
broker.enqueue('test')
Conf.BULK = 5
for i in range(5):
task = broker.dequeue()
assert task is not None
broker.acknowledge(task[0])
# test duplicate acknowledge
broker.acknowledge(task[0])
# delete queue
broker.enqueue('test')
broker.enqueue('test')
broker.delete_queue()
assert broker.queue_size() == 0
# back to django-redis
Conf.ORM = None
1 change: 1 addition & 0 deletions django_q/tests/test_cluster.py
Expand Up @@ -32,6 +32,7 @@ def broker():
Conf.DISQUE_NODES = None
Conf.IRON_MQ = None
Conf.SQS = None
Conf.ORM = None
Conf.DJANGO_REDIS = 'default'
return get_broker()

Expand Down
1 change: 1 addition & 0 deletions django_q/tests/test_scheduler.py
Expand Up @@ -17,6 +17,7 @@ def broker():
Conf.DISQUE_NODES = None
Conf.IRON_MQ = None
Conf.SQS = None
Conf.ORM = None
Conf.DJANGO_REDIS = 'default'
return get_broker()

Expand Down
6 changes: 6 additions & 0 deletions docs/admin.rst
Expand Up @@ -81,3 +81,9 @@ Indicates the success status of the last scheduled task, if any.


Uses the :class:`Schedule` model

Queued tasks
------------
This admin view is only enabled when you use the :ref:`orm_broker` broker.
It shows all tasks packages currently in the broker queue. The ``lock`` column shows the moment at which this package was picked up by the cluster and is used to determine whether it has expired or not.
For development purposes you can edit and delete queued tasks from here.
13 changes: 13 additions & 0 deletions docs/brokers.rst
Expand Up @@ -73,6 +73,19 @@ Although `SQS <https://aws.amazon.com/sqs/>`__ is not the fastest, it is stable,
* Requires the `boto3 <https://github.com/boto/boto3>`__ client library: ``pip install boto3``
* See the :ref:`sqs_configuration` configuration section for options.

.. _orm_broker:

Django ORM
----------
Select this to use Django's database backend as a message broker.
Unless you have configured a dedicated database backend for it, this should probably not be your first choice for a high traffic setup.
However for a medium message rate and scheduled tasks, this is the most convenient guaranteed delivery broker.

* Delivery receipts
* Supports bulk dequeue
* Needs Django's `Cache framework <https://docs.djangoproject.com/en/1.8/topics/cache/#setting-up-the-cache>`__ configured for monitoring
* Queue editable in Django Admin
* See the :ref:`orm_configuration` configuration on how to set it up.

Reference
---------
Expand Down

0 comments on commit a9f73bf

Please sign in to comment.