Skip to content

Commit

Permalink
Merge pull request #64 from Koed00/dev
Browse files — browse the repository at this point in the history
Adds configuration output and other enhancements
  • Loading branch information
Koed00 committed Sep 16, 2015
2 parents 9b7c76b + bd9f7da commit 57169a7
Show file tree
Hide file tree
Showing 19 changed files with 161 additions and 97 deletions.
3 changes: 3 additions & 0 deletions django_q/admin.py
Expand Up @@ -98,6 +98,9 @@ class QueueAdmin(admin.ModelAdmin):
list_display = (
'id',
'key',
'task_id',
'name',
'func',
'lock'
)

Expand Down
1 change: 0 additions & 1 deletion django_q/brokers/__init__.py
Expand Up @@ -7,7 +7,6 @@ def __init__(self, list_key=Conf.PREFIX):
self.connection = self.get_connection(list_key)
self.list_key = list_key
self.cache = self.get_cache()
self.task_cache = []

def enqueue(self, task):
"""
Expand Down
14 changes: 3 additions & 11 deletions django_q/brokers/aws_sqs.py
Expand Up @@ -17,17 +17,9 @@ def dequeue(self):
# sqs supports max 10 messages in bulk
if Conf.BULK > 10:
Conf.BULK = 10
t = None
if len(self.task_cache) > 0:
t = self.task_cache.pop()
else:
tasks = self.queue.receive_messages(MaxNumberOfMessages=Conf.BULK, VisibilityTimeout=Conf.RETRY)
if tasks:
t = tasks.pop()
if tasks:
self.task_cache = tasks
if t:
return t.receipt_handle, t.body
tasks = self.queue.receive_messages(MaxNumberOfMessages=Conf.BULK, VisibilityTimeout=Conf.RETRY)
if tasks:
return [(t.receipt_handle, t.body) for t in tasks]

def acknowledge(self, task_id):
return self.delete(task_id)
Expand Down
10 changes: 1 addition & 9 deletions django_q/brokers/disque.py
Expand Up @@ -11,18 +11,10 @@ def enqueue(self, task):
'ADDJOB {} {} 500 RETRY {}'.format(self.list_key, task, retry)).decode()

def dequeue(self):
t = None
if len(self.task_cache) > 0:
t = self.task_cache.pop()
else:
tasks = self.connection.execute_command(
'GETJOB COUNT {} TIMEOUT 1000 FROM {}'.format(Conf.BULK, self.list_key))
if tasks:
t = tasks.pop()
if tasks:
self.task_cache = tasks
if t:
return t[1].decode(), t[2].decode()
return [(t[1].decode(), t[2].decode()) for t in tasks]

def queue_size(self):
return self.connection.execute_command('QLEN {}'.format(self.list_key))
Expand Down
10 changes: 1 addition & 9 deletions django_q/brokers/ironmq.py
Expand Up @@ -9,18 +9,10 @@ def enqueue(self, task):
return self.connection.post(task)['ids'][0]

def dequeue(self):
t = None
if len(self.task_cache) > 0:
t = self.task_cache.pop()
else:
timeout = Conf.RETRY or None
tasks = self.connection.get(timeout=timeout, wait=1, max=Conf.BULK)['messages']
if tasks:
t = tasks.pop()
if tasks:
self.task_cache = tasks
if t:
return t['id'], t['body']
return [(t['id'], t['body']) for t in tasks]

def ping(self):
return self.connection.name == self.list_key
Expand Down
13 changes: 1 addition & 12 deletions django_q/brokers/orm.py
Expand Up @@ -31,24 +31,13 @@ def enqueue(self, task):
return package.pk

def dequeue(self):
if len(self.task_cache) > 0:
t = self.task_cache.pop()
return t.pk, t.payload
else:
# Get new and timed out tasks
tasks = OrmQ.objects.using(Conf.ORM).filter(
Q(key=self.list_key, lock__isnull=True) |
Q(key=self.list_key, lock__lte=timezone.now() - timedelta(seconds=Conf.RETRY)))[:Conf.BULK]
if tasks:
# lock them
OrmQ.objects.using(Conf.ORM).filter(pk__in=tasks).update(lock=timezone.now())
tasks = [t for t in tasks]
# pop one task
t = tasks.pop()
if tasks:
# add remainder to cache
self.task_cache = [t for t in tasks]
return t.pk, t.payload
return [(t.pk, t.payload) for t in tasks]
# empty queue, spare the cpu
sleep(0.2)

Expand Down
2 changes: 1 addition & 1 deletion django_q/brokers/redis_broker.py
Expand Up @@ -19,7 +19,7 @@ def enqueue(self, task):
def dequeue(self):
task = self.connection.blpop(self.list_key, 1)
if task:
return None, task[1]
return [(None, task[1])]

def queue_size(self):
return self.connection.llen(self.list_key)
Expand Down
29 changes: 15 additions & 14 deletions django_q/cluster.py
Expand Up @@ -171,7 +171,7 @@ def spawn_worker(self):
self.spawn_process(worker, self.task_queue, self.result_queue, Value('f', -1), self.timeout)

def spawn_monitor(self):
return self.spawn_process(monitor, self.result_queue)
return self.spawn_process(monitor, self.result_queue, self.broker)

def reincarnate(self, process):
"""
Expand Down Expand Up @@ -298,23 +298,24 @@ def pusher(task_queue, event, broker=None):
logger.info(_('{} pushing tasks at {}').format(current_process().name, current_process().pid))
while True:
try:
task = broker.dequeue()
task_set = broker.dequeue()
except Exception as e:
logger.error(e)
# broker probably crashed. Let the sentinel handle it.
sleep(10)
break
if task:
ack_id = task[0]
# unpack the task
try:
task = signing.SignedPackage.loads(task[1])
except (TypeError, signing.BadSignature) as e:
logger.error(e)
broker.fail(ack_id)
continue
task['ack_id'] = ack_id
task_queue.put(task)
if task_set:
for task in task_set:
ack_id = task[0]
# unpack the task
try:
task = signing.SignedPackage.loads(task[1])
except (TypeError, signing.BadSignature) as e:
logger.error(e)
broker.fail(ack_id)
continue
task['ack_id'] = ack_id
task_queue.put(task)
logger.debug(_('queueing from {}').format(broker.list_key))
if event.is_set():
break
Expand Down Expand Up @@ -493,7 +494,7 @@ def set_cpu_affinity(n, process_ids, actual=not Conf.TESTING):
"""
Sets the cpu affinity for the supplied processes.
Requires the optional psutil module.
:param int n:
:param int n: affinity
:param list process_ids: a list of pids
:param bool actual: Test workaround for Travis not supporting cpu affinity
"""
Expand Down
18 changes: 17 additions & 1 deletion django_q/management/commands/qinfo.py
@@ -1,12 +1,28 @@
from optparse import make_option
from django.core.management.base import BaseCommand
from django.utils.translation import ugettext as _

from django_q.conf import Conf
from django_q.monitor import info


class Command(BaseCommand):
# Translators: help text for qinfo management command
help = _('General information over all clusters.')

option_list = BaseCommand.option_list + (
make_option('--config',
action='store_true',
dest='config',
default=False,
help='Print current configuration.'),
)

def handle(self, *args, **options):
info()
if options.get('config', False):
hide = ['conf', 'IDLE', 'STOPPING', 'STARTING', 'WORKING', 'SIGNAL_NAMES', 'STOPPED']
settings = [a for a in dir(Conf) if not a.startswith('__') and a not in hide]
for setting in settings:
self.stdout.write('{}: {}'.format(setting, getattr(Conf, setting)))
else:
info()
44 changes: 34 additions & 10 deletions django_q/models.py
@@ -1,6 +1,6 @@
import logging
from django import get_version

from django import get_version
import importlib
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext_lazy as _
Expand All @@ -11,6 +11,8 @@
from picklefield import PickledObjectField
from picklefield.fields import dbsafe_decode

from django_q.signing import SignedPackage


class Task(models.Model):
id = models.CharField(max_length=32, primary_key=True, editable=False)
Expand Down Expand Up @@ -40,19 +42,31 @@ def get_result_group(group_id, failures=False):
values = Task.objects.filter(group=group_id).exclude(success=False).values_list('result', flat=True)
return decode_results(values)

def group_result(self, failures=False):
if self.group:
return self.get_result_group(self.group, failures)

@staticmethod
def get_group_count(group_id, failures=False):
if failures:
return Failure.objects.filter(group=group_id).count()
return Task.objects.filter(group=group_id).count()

def group_count(self, failures=False):
if self.group:
return self.get_group_count(self.group, failures)

@staticmethod
def delete_group(group_id, objects=False):
group = Task.objects.filter(group=group_id)
if objects:
return group.delete()
return group.update(group=None)

def group_delete(self, tasks=False):
if self.group:
return self.delete_group(self.group, tasks)

@staticmethod
def get_task(task_id):
if len(task_id) == 32 and Task.objects.filter(id=task_id).exists():
Expand Down Expand Up @@ -189,14 +203,26 @@ class Meta:


class OrmQ(models.Model):
key = models.CharField(max_length=100)
payload = models.TextField()
lock = models.DateTimeField(null=True)
key = models.CharField(max_length=100)
payload = models.TextField()
lock = models.DateTimeField(null=True)

def task(self):
return SignedPackage.loads(self.payload)

class Meta:
app_label = 'django_q'
verbose_name = _('Queued task')
verbose_name_plural = _('Queued tasks')
def func(self):
return self.task()['func']

def task_id(self):
return self.task()['id']

def name(self):
return self.task()['name']

class Meta:
app_label = 'django_q'
verbose_name = _('Queued task')
verbose_name_plural = _('Queued tasks')


# Backwards compatibility for Django 1.7
Expand All @@ -205,5 +231,3 @@ def decode_results(values):
# decode values in 1.7
return [dbsafe_decode(v) for v in values]
return values


9 changes: 5 additions & 4 deletions django_q/tests/test_admin.py
Expand Up @@ -7,12 +7,13 @@
from django_q.models import Task, Failure, OrmQ
from django_q.humanhash import uuid
from django_q.conf import Conf
from django_q.signing import SignedPackage


@pytest.mark.django_db
def test_admin_views(admin_client):
Conf.ORM='default'
s = schedule('sched.test')
Conf.ORM = 'default'
s = schedule('schedule.test')
tag = uuid()
f = Task.objects.create(
id=tag[1],
Expand All @@ -25,13 +26,13 @@ def test_admin_views(admin_client):
t = Task.objects.create(
id=tag[1],
name=tag[0],
func='test.succes',
func='test.success',
started=timezone.now(),
stopped=timezone.now(),
success=True)
q = OrmQ.objects.create(
key='test',
payload='test')
payload=SignedPackage.dumps({'id': 1, 'func': 'test', 'name': 'test'}))
admin_urls = (
# schedule
reverse('admin:django_q_schedule_changelist'),
Expand Down

0 comments on commit 57169a7

Please sign in to comment.