Skip to content

Commit

Permalink
Added schedule command
Browse files Browse the repository at this point in the history
 * improved schedule tests
 * updated README
  * bumped up to version 0.1.3
  • Loading branch information
Koed00 committed Jun 30, 2015
1 parent e935665 commit 4096b35
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 39 deletions.
29 changes: 26 additions & 3 deletions README.rst
Expand Up @@ -88,13 +88,13 @@ All configuration settings are optional. e.g:
- **label** The label used for the Django Admin page *['Django Q']*

- **redis** Connection settings for Redis. Follows standard Redis-Py syntax. *[localhost]*
*[defaults]*


\*\ *Django Q uses your SECRET\_KEY to encrypt task packages and prevent
task crossover*

Management Commands
~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~

qcluster
^^^^^^^^
Expand Down Expand Up @@ -150,14 +150,37 @@ Admin page or directly from your code:

.. code:: python
from django_q import Schedule
from django_q import Schedule, schedule
# Use the schedule wrapper
schedule('math.copysign',
2, -2,
hook='hooks.print_result',
schedule_type=Schedule.DAILY)
# Or create the object directly
Schedule.objects.create(func='math.copysign',
hook='hooks.print_result',
args='2,-2',
schedule_type=Schedule.DAILY
)
Models
~~~~~~
- `Task` and `Schedule` are Django Models and can therefore be managed by your own code.
- `Task` objects are only created after an async package has been executed.
- A `Schedule` creates a new async package for every execution and thus an unique `Task`
- `Success` and `Failure` are convenient proxy models of `Task`


Testing
-------

To run the tests you will need `py.test <http://pytest.org/latest/>`__ and `pytest-django <https://github.com/pytest-dev/pytest-django>`__


Todo
----

Expand Down
4 changes: 2 additions & 2 deletions django_q/__init__.py
@@ -1,7 +1,7 @@
from django_q.models import Task, Schedule
from django_q.core import async
from django_q.core import async, schedule

VERSION = (0, 1, 2)
VERSION = (0, 1, 3)

default_app_config = 'django_q.apps.DjangoQConfig'

Expand Down
71 changes: 48 additions & 23 deletions django_q/core.py
Expand Up @@ -551,51 +551,76 @@ def __getstate__(self):
return state


def schedule(func, *args, hook=None, schedule_type=Schedule.ONCE, repeats=-1, next_run=timezone.now(), **kwargs):
"""
:param func: function to schedule
:param args: function arguments
:param hook: optional result hook function
:type schedule_type: Schedule.TYPE
:param repeats: how many times to repeat. 0=never, -1=always
:param next_run: Next scheduled run
:type next_run: datetime.datetime
:param kwargs: function keyword arguments
:return: the schedule object
:rtype: Schedule
"""

return Schedule.objects.create(func=func,
hook=hook,
args=args,
kwargs=kwargs,
schedule_type=schedule_type,
repeats=repeats,
next_run=next_run
)


def scheduler(list_key=Conf.Q_LIST):
"""
Creates a task from a schedule at the scheduled time and schedules next run
"""
for schedule in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()):
for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()):
args = ()
kwargs = {}
# get args, kwargs and hook
if schedule.kwargs:
if s.kwargs:
try:
# eval should be safe here cause dict()
kwargs = eval('dict({})'.format(schedule.kwargs))
kwargs = eval('dict({})'.format(s.kwargs))
except SyntaxError:
kwargs = {}
if schedule.args:
args = ast.literal_eval(schedule.args)
if s.args:
args = ast.literal_eval(s.args)
# single value won't eval to tuple, so:
if type(args) != tuple:
args = (args,)
if schedule.hook:
kwargs['hook'] = schedule.hook
if s.hook:
kwargs['hook'] = s.hook
# set up the next run time
if not schedule.schedule_type == schedule.ONCE:
next_run = arrow.get(schedule.next_run)
if schedule.schedule_type == schedule.HOURLY:
if not s.schedule_type == s.ONCE:
next_run = arrow.get(s.next_run)
if s.schedule_type == s.HOURLY:
next_run = next_run.replace(hours=+1)
elif schedule.schedule_type == schedule.DAILY:
elif s.schedule_type == s.DAILY:
next_run = next_run.replace(days=+1)
elif schedule.schedule_type == schedule.WEEKLY:
elif s.schedule_type == s.WEEKLY:
next_run = next_run.replace(weeks=+1)
elif schedule.schedule_type == schedule.MONTHLY:
elif s.schedule_type == s.MONTHLY:
next_run = next_run.replace(months=+1)
elif schedule.schedule_type == schedule.QUARTERLY:
elif s.schedule_type == s.QUARTERLY:
next_run = next_run.replace(months=+3)
elif schedule.schedule_type == schedule.YEARLY:
elif s.schedule_type == s.YEARLY:
next_run = next_run.replace(years=+1)
schedule.next_run = next_run.datetime
schedule.repeats += -1
s.next_run = next_run.datetime
s.repeats += -1
else:
schedule.repeats = 0
s.repeats = 0
# send it to the cluster
kwargs['list_key'] = list_key
schedule.task = async(schedule.func, *args, **kwargs)
if not schedule.task:
logger.error('{} failed to create task from schedule {}').format(current_process().name, schedule.id)
s.task = async(s.func, *args, **kwargs)
if not s.task:
logger.error('{} failed to create task from schedule {}').format(current_process().name, s.id)
else:
logger.info('{} created [{}] from schedule {}'.format(current_process().name, schedule.task, schedule.id))
schedule.save()
logger.info('{} created [{}] from schedule {}'.format(current_process().name, s.task, s.id))
s.save()

11 changes: 9 additions & 2 deletions django_q/tests/tasks.py
Expand Up @@ -3,22 +3,29 @@ def countdown(n):
while n > 0:
n -= 1


def multiply(x, y):
return x * y


def count_letters(tup):
total = 0
for word in tup:
total += len(word)
return total


def count_letters2(obj):
return count_letters(obj.get_words())


def word_multiply(x, word=''):
return len(word) * x


def get_task_name(task):
return task.name


def result(obj):
print('RESULT HOOK {} : {}'.format(obj.name, obj.result))


3 changes: 3 additions & 0 deletions django_q/tests/test_admin.py
@@ -1,3 +1,6 @@
import pytest

@pytest.mark.django_db
def test_admin_view(admin_client):
response = admin_client.get('/admin/django_q/')
assert response.status_code == 200
Expand Down
10 changes: 9 additions & 1 deletion django_q/tests/test_cluster.py
Expand Up @@ -106,6 +106,8 @@ def test_async(r):
f = async(multiply, 753, 2, hook=assert_result, list_key=list_key)
# model as argument
g = async('django_q.tests.tasks.get_task_name', Task(name='John'), list_key=list_key)
# args and kwargs
h = async('django_q.tests.tasks.word_multiply', 2, word='django', list_key=list_key, redis=r)
# check if everything has a task name
assert isinstance(a, str)
assert isinstance(b, str)
Expand All @@ -114,8 +116,9 @@ def test_async(r):
assert isinstance(e, str)
assert isinstance(f, str)
assert isinstance(g, str)
assert isinstance(h, str)
# run the cluster to execute the tasks
task_count = 7
task_count = 8
assert r.llen(list_key) == task_count
task_queue = Queue()
stop_event = Event()
Expand Down Expand Up @@ -168,6 +171,11 @@ def test_async(r):
assert result_g is not None
assert result_g.success is True
assert result(g) == 'John'
# task h
result_h = get_task(h)
assert result_h is not None
assert result_h.success is True
assert result(h) == 12
r.delete(list_key)


Expand Down
28 changes: 21 additions & 7 deletions django_q/tests/test_scheduler.py
@@ -1,22 +1,25 @@
from multiprocessing import Queue, Event

import pytest
from django_q.core import scheduler, pusher, worker, monitor, redis_client

from django_q.core import scheduler, pusher, worker, monitor, redis_client, schedule as create_schedule
from django_q import Schedule, get_task


@pytest.fixture
def r():
return redis_client


@pytest.mark.django_db
def test_scheduler(r):
list_key = 'scheduler_test:q'
r.delete(list_key)
schedule = Schedule.objects.create(func='math.copysign',
args='1, -1',
schedule_type=Schedule.ONCE,
repeats=1,
hook='django_q.tests.tasks.result'
)
schedule = create_schedule('math.copysign',
1, -1,
hook='django_q.tests.tasks.result',
schedule_type=Schedule.HOURLY,
repeats=1)
assert schedule.last_run() is None
# run scheduler
scheduler(list_key=list_key)
Expand All @@ -39,8 +42,19 @@ def test_scheduler(r):
assert result_queue.qsize() == 0
schedule.refresh_from_db()
assert schedule.repeats == 0
assert schedule.last_run() is not None
assert schedule.success() is True
task = get_task(schedule.task)
assert task is not None
assert task.success is True
assert task.result < 0
for t in Schedule.TYPE:
schedule = create_schedule('django_q.tests.tasks.word_multiply',
2,
word='django',
schedule_type=t[0],
repeats=1,
hook='django_q.tests.tasks.result'
)
assert schedule is not None
assert schedule.last_run() is None
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -26,7 +26,7 @@ def run(self):

setup(
name='django-q',
version='0.1.2',
version='0.1.3',
author='Ilan Steemers',
author_email='koed00@gmail.com',
packages=['django_q'],
Expand Down

0 comments on commit 4096b35

Please sign in to comment.