diff --git a/django_q/monitor.py b/django_q/monitor.py index 02d6a59d..d575e11c 100644 --- a/django_q/monitor.py +++ b/django_q/monitor.py @@ -86,9 +86,8 @@ def monitor(run_once=False): class Status(object): - """ - Cluster status base class - """ + + """Cluster status base class.""" def __init__(self, pid): self.workers = [] @@ -106,9 +105,8 @@ def __init__(self, pid): class Stat(Status): - """ - Status object for Cluster monitoring - """ + + """Status object for Cluster monitoring.""" def __init__(self, sentinel): super(Stat, self).__init__(sentinel.parent_pid or sentinel.pid) @@ -171,7 +169,8 @@ def get(cluster_id, r=redis_client): @staticmethod def get_all(r=redis_client): """ - Gets status for all currently running clusters with the same prefix and secret key + Get the status for all currently running clusters with the same prefix + and secret key. :return: list of type Stat """ stats = [] diff --git a/django_q/signing.py b/django_q/signing.py index 69a0ae6b..11afc124 100644 --- a/django_q/signing.py +++ b/django_q/signing.py @@ -1,3 +1,4 @@ +"""Package signing.""" try: import cPickle as pickle except ImportError: @@ -11,9 +12,8 @@ class SignedPackage(object): - """ - Wraps Django's signing module with custom Pickle serializer - """ + + """Wraps Django's signing module with custom Pickle serializer.""" @staticmethod def dumps(obj, compressed=Conf.COMPRESSED): @@ -32,10 +32,8 @@ def loads(obj): class PickleSerializer(object): - """ - Simple wrapper around Pickle for signing.dumps and - signing.loads. - """ + + """Simple wrapper around Pickle for signing.dumps and signing.loads.""" @staticmethod def dumps(obj): diff --git a/django_q/tasks.py b/django_q/tasks.py index 86976d19..07870d41 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -1,3 +1,4 @@ +"""Provides task functionalities.""" from multiprocessing import Queue, Value # django @@ -12,9 +13,7 @@ def async(func, *args, **kwargs): - """ - Sends a task to the cluster - """ + """Send a task to the cluster.""" # get options from q_options dict or direct from kwargs options = kwargs.pop('q_options', kwargs) hook = options.pop('hook', None) @@ -26,7 +25,10 @@ def async(func, *args, **kwargs): # get an id tag = uuid() # build the task package - task = {'id': tag[1], 'name': tag[0], 'func': func, 'args': args, 'kwargs': kwargs, + task = {'id': tag[1], 'name': tag[0], + 'func': func, + 'args': args, + 'kwargs': kwargs, 'started': timezone.now()} # add optionals if hook: @@ -47,19 +49,20 @@ def async(func, *args, **kwargs): def schedule(func, *args, **kwargs): """ - :param func: function to schedule - :param args: function arguments - :param name: optional name for the schedule - :param hook: optional result hook function + Create a schedule. + + :param func: function to schedule. + :param args: function arguments. + :param name: optional name for the schedule. + :param hook: optional result hook function. :type schedule_type: Schedule.TYPE - :param repeats: how many times to repeat. 0=never, -1=always - :param next_run: Next scheduled run + :param repeats: how many times to repeat. 0=never, -1=always. + :param next_run: Next scheduled run. :type next_run: datetime.datetime - :param kwargs: function keyword arguments - :return: the schedule object + :param kwargs: function keyword arguments. + :return: the schedule object. :rtype: Schedule """ - name = kwargs.pop('name', None) hook = kwargs.pop('hook', None) schedule_type = kwargs.pop('schedule_type', Schedule.ONCE) @@ -79,7 +82,8 @@ def schedule(func, *args, **kwargs): def result(task_id): """ - Returns the result of the named task + Return the result of the named task. + :type task_id: str or uuid :param task_id: the task name or uuid :return: the result object of this task @@ -90,7 +94,8 @@ def result(task_id): def result_group(group_id, failures=False): """ - returns a list of results for a task group + Return a list of results for a task group. + :param str group_id: the group id :param bool failures: set to True to include failures :return: list or results @@ -100,7 +105,8 @@ def result_group(group_id, failures=False): def fetch(task_id): """ - Returns the processed task + Return the processed task. + :param task_id: the task name or uuid :type task_id: str or uuid :return: the full task object @@ -111,17 +117,19 @@ def fetch(task_id): def fetch_group(group_id, failures=True): """ - Returns a list of Tasks for a task group + Return a list of Tasks for a task group. + :param str group_id: the group id :param bool failures: set to False to exclude failures :return: list of Tasks """ - return Task.get_task_group(group_id, failures) def count_group(group_id, failures=False): """ + Count the results in a group. + :param str group_id: the group id :param bool failures: Returns failure count if True :return: the number of tasks/results in a group @@ -132,6 +140,8 @@ def count_group(group_id, failures=False): def delete_group(group_id, tasks=False): """ + Delete a group. + :param str group_id: the group id :param bool tasks: If set to True this will also delete the group tasks. Otherwise just the group label is removed. @@ -141,10 +151,7 @@ def delete_group(group_id, tasks=False): def _sync(task_id, pack): - """ - Simulates a package travelling through the cluster. - - """ + """Simulate a package travelling through the cluster.""" task_queue = Queue() result_queue = Queue() task_queue.put(pack)