-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
tasks.py
162 lines (134 loc) · 4.43 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""Provides task functionalities."""
from multiprocessing import Queue, Value
# django
from django.utils import timezone
# local
import signing
import cluster
from django_q.conf import Conf, redis_client, logger
from django_q.models import Schedule, Task
from django_q.humanhash import uuid
def async(func, *args, **kwargs):
"""Send a task to the cluster."""
# get options from q_options dict or direct from kwargs
options = kwargs.pop('q_options', kwargs)
hook = options.pop('hook', None)
list_key = options.pop('list_key', Conf.Q_LIST)
redis = options.pop('redis', redis_client)
sync = options.pop('sync', False)
group = options.pop('group', None)
save = options.pop('save', None)
# get an id
tag = uuid()
# build the task package
task = {'id': tag[1], 'name': tag[0],
'func': func,
'args': args,
'kwargs': kwargs,
'started': timezone.now()}
# add optionals
if hook:
task['hook'] = hook
if group:
task['group'] = group
if save is not None:
task['save'] = save
# sign it
pack = signing.SignedPackage.dumps(task)
if sync:
return _sync(task['id'], pack)
# push it
redis.rpush(list_key, pack)
logger.debug('Pushed {}'.format(tag))
return task['id']
def schedule(func, *args, **kwargs):
"""
Create a schedule.
:param func: function to schedule.
:param args: function arguments.
:param name: optional name for the schedule.
:param hook: optional result hook function.
:type schedule_type: Schedule.TYPE
:param repeats: how many times to repeat. 0=never, -1=always.
:param next_run: Next scheduled run.
:type next_run: datetime.datetime
:param kwargs: function keyword arguments.
:return: the schedule object.
:rtype: Schedule
"""
name = kwargs.pop('name', None)
hook = kwargs.pop('hook', None)
schedule_type = kwargs.pop('schedule_type', Schedule.ONCE)
repeats = kwargs.pop('repeats', -1)
next_run = kwargs.pop('next_run', timezone.now())
return Schedule.objects.create(name=name,
func=func,
hook=hook,
args=args,
kwargs=kwargs,
schedule_type=schedule_type,
repeats=repeats,
next_run=next_run
)
def result(task_id):
"""
Return the result of the named task.
:type task_id: str or uuid
:param task_id: the task name or uuid
:return: the result object of this task
:rtype: object
"""
return Task.get_result(task_id)
def result_group(group_id, failures=False):
"""
Return a list of results for a task group.
:param str group_id: the group id
:param bool failures: set to True to include failures
:return: list or results
"""
return Task.get_result_group(group_id, failures)
def fetch(task_id):
"""
Return the processed task.
:param task_id: the task name or uuid
:type task_id: str or uuid
:return: the full task object
:rtype: Task
"""
return Task.get_task(task_id)
def fetch_group(group_id, failures=True):
"""
Return a list of Tasks for a task group.
:param str group_id: the group id
:param bool failures: set to False to exclude failures
:return: list of Tasks
"""
return Task.get_task_group(group_id, failures)
def count_group(group_id, failures=False):
"""
Count the results in a group.
:param str group_id: the group id
:param bool failures: Returns failure count if True
:return: the number of tasks/results in a group
:rtype: int
"""
return Task.get_group_count(group_id, failures)
def delete_group(group_id, tasks=False):
"""
Delete a group.
:param str group_id: the group id
:param bool tasks: If set to True this will also delete the group tasks.
Otherwise just the group label is removed.
:return:
"""
return Task.delete_group(group_id, tasks)
def _sync(task_id, pack):
"""Simulate a package travelling through the cluster."""
task_queue = Queue()
result_queue = Queue()
task_queue.put(pack)
task_queue.put('STOP')
cluster.worker(task_queue, result_queue, Value('b', -1))
result_queue.put('STOP')
cluster.monitor(result_queue)
return task_id