Skip to content

Commit

Permalink
Merge pull request #452 from Koed00/cron
Browse files Browse the repository at this point in the history
Support for Cron expressions
  • Loading branch information
Koed00 committed Jul 1, 2020
2 parents 29b7e27 + f223cfc commit bcd119c
Show file tree
Hide file tree
Showing 24 changed files with 355 additions and 185 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -66,4 +66,5 @@ db.sqlite3
.venv
.idea
djq
node_modules
node_modules
/c.cache/
6 changes: 3 additions & 3 deletions .travis.yml
@@ -1,16 +1,16 @@
language: python

services:
- redis-server
- redis
- mongodb

python:
- "3.7"
- "3.8"

env:
- DJANGO=3.0.7
- DJANGO=2.2.13
- DJANGO=3.0.8
- DJANGO=2.2.14

sudo: true
dist: xenial
Expand Down
8 changes: 7 additions & 1 deletion README.rst
Expand Up @@ -13,7 +13,7 @@ Features

- Multiprocessing worker pool
- Asynchronous tasks
- Scheduled and repeated tasks
- Scheduled, cron and repeated tasks
- Signed and compressed packages
- Failure and success database or cache
- Result hooks, groups and chains
Expand Down Expand Up @@ -180,6 +180,12 @@ Admin page or directly from your code:
repeats=24,
next_run=arrow.utcnow().replace(hour=18, minute=0))
# Use a cron expression
schedule('math.hypot',
3, 4,
schedule_type=Schedule.CRON,
cron = '0 22 * * 1-5')
For more info check the `Schedules <https://django-q.readthedocs.org/en/latest/schedules.html>`__ documentation.


Expand Down
10 changes: 8 additions & 2 deletions django_q/__init__.py
@@ -1,6 +1,12 @@
VERSION = (1, 2, 4)

default_app_config = 'django_q.apps.DjangoQConfig'
default_app_config = "django_q.apps.DjangoQConfig"


__all__ = ['conf', 'cluster', 'models', 'tasks']
__all__ = ["conf", "cluster", "models", "tasks", "croniter"]

# Optional Imports
try:
from croniter import croniter
except ImportError:
croniter = None
62 changes: 24 additions & 38 deletions django_q/admin.py
Expand Up @@ -5,19 +5,13 @@
from django_q.conf import Conf
from django_q.models import Success, Failure, Schedule, OrmQ
from django_q.tasks import async_task
from django_q import croniter


class TaskAdmin(admin.ModelAdmin):
"""model admin for success tasks."""

list_display = (
'name',
'func',
'started',
'stopped',
'time_taken',
'group'
)
list_display = ("name", "func", "started", "stopped", "time_taken", "group")

def has_add_permission(self, request):
"""Don't allow adds."""
Expand All @@ -28,9 +22,9 @@ def get_queryset(self, request):
qs = super(TaskAdmin, self).get_queryset(request)
return qs.filter(success=True)

search_fields = ('name', 'func', 'group')
search_fields = ("name", "func", "group")
readonly_fields = []
list_filter = ('group',)
list_filter = ("group",)

def get_readonly_fields(self, request, obj=None):
"""Set all fields readonly."""
Expand All @@ -50,21 +44,15 @@ def retry_failed(FailAdmin, request, queryset):
class FailAdmin(admin.ModelAdmin):
"""model admin for failed tasks."""

list_display = (
'name',
'func',
'started',
'stopped',
'short_result'
)
list_display = ("name", "func", "started", "stopped", "short_result")

def has_add_permission(self, request):
"""Don't allow adds."""
return False

actions = [retry_failed]
search_fields = ('name', 'func')
list_filter = ('group',)
search_fields = ("name", "func")
list_filter = ("group",)
readonly_fields = []

def get_readonly_fields(self, request, obj=None):
Expand All @@ -76,31 +64,29 @@ class ScheduleAdmin(admin.ModelAdmin):
""" model admin for schedules """

list_display = (
'id',
'name',
'func',
'schedule_type',
'repeats',
'next_run',
'last_run',
'success'
"id",
"name",
"func",
"schedule_type",
"repeats",
"next_run",
"last_run",
"success",
)

list_filter = ('next_run', 'schedule_type')
search_fields = ('func',)
list_display_links = ('id', 'name')
# optional cron strings
if not croniter:
readonly_fields = ("cron",)

list_filter = ("next_run", "schedule_type")
search_fields = ("func",)
list_display_links = ("id", "name")


class QueueAdmin(admin.ModelAdmin):
""" queue admin for ORM broker """
list_display = (
'id',
'key',
'task_id',
'name',
'func',
'lock'
)

list_display = ("id", "key", "task_id", "name", "func", "lock")

def save_model(self, request, obj, form, change):
obj.save(using=Conf.ORM)
Expand Down
2 changes: 1 addition & 1 deletion django_q/apps.py
Expand Up @@ -4,7 +4,7 @@


class DjangoQConfig(AppConfig):
name = 'django_q'
name = "django_q"
verbose_name = Conf.LABEL

def ready(self):
Expand Down
9 changes: 8 additions & 1 deletion django_q/brokers/disque.py
@@ -1,8 +1,11 @@
import random

# External
import redis
from redis import Redis

# Django
from django.utils.translation import gettext_lazy as _
from django_q.brokers import Broker
from django_q.conf import Conf

Expand Down Expand Up @@ -52,6 +55,8 @@ def info(self) -> str:

@staticmethod
def get_connection(list_key: str = Conf.PREFIX) -> Redis:
if not Conf.DISQUE_NODES:
raise redis.exceptions.ConnectionError(_("No Disque nodes configured"))
# randomize nodes
random.shuffle(Conf.DISQUE_NODES)
# find one that works
Expand All @@ -67,4 +72,6 @@ def get_connection(list_key: str = Conf.PREFIX) -> Redis:
return redis_client
except redis.exceptions.ConnectionError:
continue
raise redis.exceptions.ConnectionError("Could not connect to any Disque nodes")
raise redis.exceptions.ConnectionError(
_("Could not connect to any Disque nodes")
)
49 changes: 29 additions & 20 deletions django_q/cluster.py
@@ -1,6 +1,5 @@
import ast

# Standard
import ast
import importlib
import signal
import socket
Expand All @@ -9,9 +8,8 @@
from multiprocessing import Event, Process, Value, current_process
from time import sleep

# external
# External
import arrow

# Django
from django import db
from django.conf import settings
Expand All @@ -28,6 +26,7 @@
from django_q.signals import pre_execute
from django_q.signing import SignedPackage, BadSignature
from django_q.status import Stat, Status
from django_q import croniter


class Cluster:
Expand Down Expand Up @@ -103,10 +102,10 @@ def is_running(self) -> bool:
@property
def is_stopping(self) -> bool:
return (
self.stop_event
and self.start_event
and self.start_event.is_set()
and self.stop_event.is_set()
self.stop_event
and self.start_event
and self.start_event.is_set()
and self.stop_event.is_set()
)

@property
Expand All @@ -116,13 +115,13 @@ def has_stopped(self) -> bool:

class Sentinel:
def __init__(
self,
stop_event,
start_event,
cluster_id,
broker=None,
timeout=Conf.TIMEOUT,
start=True,
self,
stop_event,
start_event,
cluster_id,
broker=None,
timeout=Conf.TIMEOUT,
start=True,
):
# Make sure we catch signals for the pool
signal.signal(signal.SIGINT, signal.SIG_IGN)
Expand Down Expand Up @@ -208,7 +207,7 @@ def reincarnate(self, process):
if process.timer.value == 0:
# only need to terminate on timeout, otherwise we risk destabilizing the queues
process.terminate()
logger.warn(_(f"reincarnated worker {process.name} after timeout"))
logger.warning(_(f"reincarnated worker {process.name} after timeout"))
elif int(process.timer.value) == -2:
logger.info(_(f"recycled worker {process.name}"))
else:
Expand Down Expand Up @@ -377,7 +376,7 @@ def monitor(result_queue: Queue, broker: Broker = None):


def worker(
task_queue: Queue, result_queue: Queue, timer: Value, timeout: int = Conf.TIMEOUT
task_queue: Queue, result_queue: Queue, timer: Value, timeout: int = Conf.TIMEOUT
):
"""
Takes a task from the task queue, tries to execute it and puts the result back in the result queue
Expand Down Expand Up @@ -552,9 +551,9 @@ def scheduler(broker: Broker = None):
try:
with db.transaction.atomic(using=Schedule.objects.db):
for s in (
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
):
args = ()
kwargs = {}
Expand Down Expand Up @@ -591,6 +590,16 @@ def scheduler(broker: Broker = None):
next_run = next_run.shift(months=+3)
elif s.schedule_type == s.YEARLY:
next_run = next_run.shift(years=+1)
elif s.schedule_type == s.CRON:
if not croniter:
raise ImportError(
_(
"Please install croniter to enable cron expressions"
)
)
next_run = arrow.get(
croniter(s.cron, timezone.now()).get_next()
)
if Conf.CATCH_UP or next_run > arrow.utcnow():
break
# arrow always returns a tz aware datetime, and we don't want
Expand Down
10 changes: 5 additions & 5 deletions django_q/core_signing.py
Expand Up @@ -24,11 +24,11 @@


def loads(
s,
key=None,
salt: str = "django.core.signing",
serializer=JSONSerializer,
max_age=None,
s,
key=None,
salt: str = "django.core.signing",
serializer=JSONSerializer,
max_age=None,
):
"""
Reverse of dumps(), raise BadSignature if signature fails.
Expand Down
10 changes: 5 additions & 5 deletions django_q/management/commands/qmonitor.py
Expand Up @@ -10,12 +10,12 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument(
'--run-once',
action='store_true',
dest='run_once',
"--run-once",
action="store_true",
dest="run_once",
default=False,
help='Run once and then stop.',
help="Run once and then stop.",
)

def handle(self, *args, **options):
monitor(run_once=options.get('run_once', False))
monitor(run_once=options.get("run_once", False))
23 changes: 23 additions & 0 deletions django_q/migrations/0011_auto_20200628_1055.py
@@ -0,0 +1,23 @@
# Generated by Django 3.0.7 on 2020-06-28 10:55

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_q', '0010_auto_20200610_0856'),
]

operations = [
migrations.AddField(
model_name='schedule',
name='cron',
field=models.CharField(blank=True, help_text='Cron expression', max_length=100, null=True),
),
migrations.AlterField(
model_name='schedule',
name='schedule_type',
field=models.CharField(choices=[('O', 'Once'), ('I', 'Minutes'), ('H', 'Hourly'), ('D', 'Daily'), ('W', 'Weekly'), ('M', 'Monthly'), ('Q', 'Quarterly'), ('Y', 'Yearly'), ('C', 'Cron')], default='O', max_length=1, verbose_name='Schedule Type'),
),
]

0 comments on commit bcd119c

Please sign in to comment.