Skip to content

Commit

Permalink
Merge pull request #48 from Koed00/dev
Browse files Browse the repository at this point in the history
adds `catch_up` configuration option
  • Loading branch information
Koed00 committed Aug 18, 2015
2 parents 5158d31 + b99e16f commit 97a3fc9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
34 changes: 18 additions & 16 deletions django_q/cluster.py
Expand Up @@ -436,20 +436,23 @@ def scheduler(list_key=Conf.Q_LIST):
# set up the next run time
if not s.schedule_type == s.ONCE:
next_run = arrow.get(s.next_run)
if s.schedule_type == s.MINUTES:
next_run = next_run.replace(minutes=+(s.minutes or 1))
elif s.schedule_type == s.HOURLY:
next_run = next_run.replace(hours=+1)
elif s.schedule_type == s.DAILY:
next_run = next_run.replace(days=+1)
elif s.schedule_type == s.WEEKLY:
next_run = next_run.replace(weeks=+1)
elif s.schedule_type == s.MONTHLY:
next_run = next_run.replace(months=+1)
elif s.schedule_type == s.QUARTERLY:
next_run = next_run.replace(months=+3)
elif s.schedule_type == s.YEARLY:
next_run = next_run.replace(years=+1)
while True:
if s.schedule_type == s.MINUTES:
next_run = next_run.replace(minutes=+(s.minutes or 1))
elif s.schedule_type == s.HOURLY:
next_run = next_run.replace(hours=+1)
elif s.schedule_type == s.DAILY:
next_run = next_run.replace(days=+1)
elif s.schedule_type == s.WEEKLY:
next_run = next_run.replace(weeks=+1)
elif s.schedule_type == s.MONTHLY:
next_run = next_run.replace(months=+1)
elif s.schedule_type == s.QUARTERLY:
next_run = next_run.replace(months=+3)
elif s.schedule_type == s.YEARLY:
next_run = next_run.replace(years=+1)
if Conf.CATCH_UP or next_run > arrow.utcnow():
break
s.next_run = next_run.datetime
s.repeats += -1
# send it to the cluster
Expand All @@ -467,8 +470,7 @@ def scheduler(list_key=Conf.Q_LIST):
# default behavior is to delete a ONCE schedule
if s.schedule_type == s.ONCE:
if s.repeats < 0:
s.delete()
return
return s.delete()
# but not if it has a positive repeats
s.repeats = 0
# save the schedule
Expand Down
4 changes: 4 additions & 0 deletions django_q/conf.py
Expand Up @@ -77,6 +77,10 @@ class Conf(object):
# Global sync option to for debugging
SYNC = conf.get('sync', False)

# If set to False the scheduler won't execute tasks in the past.
# Instead it will reschedule the next run in the future. Defaults to True.
CATCH_UP = conf.get('catch_up', True)

# Use the secret key for package signing
# Django itself should raise an error if it's not configured
SECRET_KEY = settings.SECRET_KEY
Expand Down
26 changes: 24 additions & 2 deletions django_q/tests/test_scheduler.py
@@ -1,10 +1,12 @@
from datetime import timedelta
from multiprocessing import Queue, Event, Value

import pytest
import arrow

from django.utils import timezone

from django_q.conf import redis_client
from django_q.conf import redis_client, Conf
from django_q.cluster import pusher, worker, monitor, scheduler
from django_q.tasks import Schedule, fetch, schedule as create_schedule, queue_size

Expand Down Expand Up @@ -34,7 +36,7 @@ def test_scheduler(r):
# push it
pusher(task_queue, stop_event, list_key=list_key, r=r)
assert task_queue.qsize() == 1
assert queue_size(list_key=list_key,r=r) == 0
assert queue_size(list_key=list_key, r=r) == 0
task_queue.put('STOP')
# let a worker handle them
result_queue = Queue()
Expand Down Expand Up @@ -96,7 +98,27 @@ def test_scheduler(r):
kwargs='word="django"',
schedule_type=Schedule.DAILY
)
# scheduler
scheduler(list_key=list_key)
# ONCE schedule should be deleted
assert Schedule.objects.filter(pk=once_schedule.pk).exists() is False
# Catch up On
Conf.CATCH_UP = True
now = timezone.now()
schedule = create_schedule('django_q.tests.tasks.word_multiply',
2,
word='catch_up',
schedule_type=Schedule.HOURLY,
next_run=timezone.now() - timedelta(hours=12),
repeats=-1
)
scheduler(list_key=list_key)
schedule = Schedule.objects.get(pk=schedule.pk)
assert schedule.next_run < now
# Catch up off
Conf.CATCH_UP = False
scheduler(list_key=list_key)
schedule = Schedule.objects.get(pk=schedule.pk)
assert schedule.next_run > now
# Done
r.delete(list_key)

0 comments on commit 97a3fc9

Please sign in to comment.