Skip to content

Commit

Permalink
Merge pull request #35 from Koed00/dev
Browse files Browse the repository at this point in the history
closes old db connections on monitor and worker spawn
  • Loading branch information
Koed00 committed Jul 27, 2015
2 parents 876be41 + a13b959 commit f327563
Showing 1 changed file with 64 additions and 58 deletions.
122 changes: 64 additions & 58 deletions django_q/cluster.py
Expand Up @@ -31,6 +31,7 @@
# Django
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from django import db

# Local
import signing
Expand Down Expand Up @@ -328,6 +329,7 @@ def monitor(result_queue):
"""
name = current_process().name
logger.info(_("{} monitoring at {}").format(name, current_process().pid))
db.close_old_connections()
for task in iter(result_queue.get, 'STOP'):
save_task(task)
if task['success']:
Expand All @@ -346,6 +348,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
"""
name = current_process().name
logger.info(_('{} ready for work at {}').format(name, current_process().pid))
db.close_old_connections()
task_count = 0
# Start reading the task queue
for pack in iter(task_queue.get, 'STOP'):
Expand Down Expand Up @@ -399,9 +402,9 @@ def save_task(task):
if not task.get('save', Conf.SAVE_LIMIT > 0) and task['success']:
return
# SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
if task['success'] and 0 < Conf.SAVE_LIMIT < Success.objects.count():
Success.objects.last().delete()
try:
if task['success'] and 0 < Conf.SAVE_LIMIT < Success.objects.count():
Success.objects.last().delete()
Task.objects.create(id=task['id'],
name=task['name'],
func=task['func'],
Expand All @@ -421,62 +424,65 @@ def scheduler(list_key=Conf.Q_LIST):
"""
Creates a task from a schedule at the scheduled time and schedules next run
"""
for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()):
args = ()
kwargs = {}
# get args, kwargs and hook
if s.kwargs:
try:
# eval should be safe here cause dict()
kwargs = eval('dict({})'.format(s.kwargs))
except SyntaxError:
kwargs = {}
if s.args:
args = ast.literal_eval(s.args)
# single value won't eval to tuple, so:
if type(args) != tuple:
args = (args,)
q_options = kwargs.get('q_options', {})
if s.hook:
q_options['hook'] = s.hook
# set up the next run time
if not s.schedule_type == s.ONCE:
next_run = arrow.get(s.next_run)
if s.schedule_type == s.HOURLY:
next_run = next_run.replace(hours=+1)
elif s.schedule_type == s.DAILY:
next_run = next_run.replace(days=+1)
elif s.schedule_type == s.WEEKLY:
next_run = next_run.replace(weeks=+1)
elif s.schedule_type == s.MONTHLY:
next_run = next_run.replace(months=+1)
elif s.schedule_type == s.QUARTERLY:
next_run = next_run.replace(months=+3)
elif s.schedule_type == s.YEARLY:
next_run = next_run.replace(years=+1)
s.next_run = next_run.datetime
s.repeats += -1
# send it to the cluster
q_options['list_key'] = list_key
q_options['group'] = s.name or s.id
kwargs['q_options'] = q_options
s.task = tasks.async(s.func, *args, **kwargs)
# log it
if not s.task:
logger.error(
_('{} failed to create a task from schedule [{}]').format(current_process().name, s.name or s.id))
else:
logger.info(
_('{} created a task from schedule [{}]').format(current_process().name, s.name or s.id))
# default behavior is to delete a ONCE schedule
if s.schedule_type == s.ONCE:
if s.repeats < 0:
s.delete()
return
# but not if it has a positive repeats
s.repeats = 0
# save the schedule
s.save()
try:
for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()):
args = ()
kwargs = {}
# get args, kwargs and hook
if s.kwargs:
try:
# eval should be safe here cause dict()
kwargs = eval('dict({})'.format(s.kwargs))
except SyntaxError:
kwargs = {}
if s.args:
args = ast.literal_eval(s.args)
# single value won't eval to tuple, so:
if type(args) != tuple:
args = (args,)
q_options = kwargs.get('q_options', {})
if s.hook:
q_options['hook'] = s.hook
# set up the next run time
if not s.schedule_type == s.ONCE:
next_run = arrow.get(s.next_run)
if s.schedule_type == s.HOURLY:
next_run = next_run.replace(hours=+1)
elif s.schedule_type == s.DAILY:
next_run = next_run.replace(days=+1)
elif s.schedule_type == s.WEEKLY:
next_run = next_run.replace(weeks=+1)
elif s.schedule_type == s.MONTHLY:
next_run = next_run.replace(months=+1)
elif s.schedule_type == s.QUARTERLY:
next_run = next_run.replace(months=+3)
elif s.schedule_type == s.YEARLY:
next_run = next_run.replace(years=+1)
s.next_run = next_run.datetime
s.repeats += -1
# send it to the cluster
q_options['list_key'] = list_key
q_options['group'] = s.name or s.id
kwargs['q_options'] = q_options
s.task = tasks.async(s.func, *args, **kwargs)
# log it
if not s.task:
logger.error(
_('{} failed to create a task from schedule [{}]').format(current_process().name, s.name or s.id))
else:
logger.info(
_('{} created a task from schedule [{}]').format(current_process().name, s.name or s.id))
# default behavior is to delete a ONCE schedule
if s.schedule_type == s.ONCE:
if s.repeats < 0:
s.delete()
return
# but not if it has a positive repeats
s.repeats = 0
# save the schedule
s.save()
except Exception as e:
logger.error(e)


def set_cpu_affinity(n, process_ids, actual=not Conf.TESTING):
Expand Down

0 comments on commit f327563

Please sign in to comment.