Skip to content

Commit

Permalink
Build improvements (#569)
Browse files Browse the repository at this point in the history
* Black linting

* Adding Black to dev dependencies and upping minimal python to 3.6.2 for compatibility

* Updating packages

* Removing pip-tools input and exporting requirements with poetry

* Deleting old setup files and test runner

* Trying 1.3.7

* Looser extras requirements to prevent conflicts

* Sorted imports with isort

* Added iSort to dev dependencies

* Fixes localtime for naive setups
  • Loading branch information
Koed00 committed May 30, 2021
1 parent dc95da3 commit 15155c7
Show file tree
Hide file tree
Showing 45 changed files with 1,497 additions and 4,194 deletions.
9 changes: 0 additions & 9 deletions MANIFEST.in

This file was deleted.

2 changes: 1 addition & 1 deletion django_q/__init__.py
@@ -1,4 +1,4 @@
VERSION = (1, 3, 6)
VERSION = (1, 3, 7)

default_app_config = "django_q.apps.DjangoQConfig"

Expand Down
8 changes: 4 additions & 4 deletions django_q/admin.py
Expand Up @@ -3,7 +3,7 @@
from django.utils.translation import gettext_lazy as _

from django_q.conf import Conf, croniter
from django_q.models import Success, Failure, Schedule, OrmQ
from django_q.models import Failure, OrmQ, Schedule, Success
from django_q.tasks import async_task


Expand Down Expand Up @@ -60,7 +60,7 @@ def get_readonly_fields(self, request, obj=None):


class ScheduleAdmin(admin.ModelAdmin):
""" model admin for schedules """
"""model admin for schedules"""

list_display = (
"id",
Expand All @@ -84,7 +84,7 @@ class ScheduleAdmin(admin.ModelAdmin):


class QueueAdmin(admin.ModelAdmin):
""" queue admin for ORM broker """
"""queue admin for ORM broker"""

list_display = ("id", "key", "task_id", "name", "func", "lock")

Expand All @@ -100,7 +100,7 @@ def get_queryset(self, request):
def has_add_permission(self, request):
"""Don't allow adds."""
return False

list_filter = ("key",)


Expand Down
2 changes: 1 addition & 1 deletion django_q/brokers/__init__.py
@@ -1,7 +1,7 @@
import importlib
from typing import Optional

from django.core.cache import caches, InvalidCacheBackendError
from django.core.cache import InvalidCacheBackendError, caches

from django_q.conf import Conf

Expand Down
6 changes: 4 additions & 2 deletions django_q/brokers/aws_sqs.py
Expand Up @@ -38,7 +38,9 @@ def dequeue(self):
if not isinstance(wait_time_second, int):
raise ValueError("receive_message_wait_time_seconds should be int")
if wait_time_second > 20:
raise ValueError("receive_message_wait_time_seconds is invalid. Reason: Must be >= 0 and <= 20")
raise ValueError(
"receive_message_wait_time_seconds is invalid. Reason: Must be >= 0 and <= 20"
)
params.update({"WaitTimeSeconds": wait_time_second})

tasks = self.queue.receive_messages(**params)
Expand Down Expand Up @@ -80,7 +82,7 @@ def get_connection(list_key: str = Conf.PREFIX) -> Session:
config["region_name"] = config["aws_region"]
del config["aws_region"]

if 'receive_message_wait_time_seconds' in config:
if "receive_message_wait_time_seconds" in config:
del config["receive_message_wait_time_seconds"]

return Session(**config)
Expand Down
3 changes: 2 additions & 1 deletion django_q/brokers/disque.py
Expand Up @@ -2,10 +2,11 @@

# External
import redis
from redis import Redis

# Django
from django.utils.translation import gettext_lazy as _
from redis import Redis

from django_q.brokers import Broker
from django_q.conf import Conf

Expand Down
46 changes: 30 additions & 16 deletions django_q/cluster.py
Expand Up @@ -6,20 +6,22 @@
import socket
import traceback
import uuid
from datetime import datetime
from multiprocessing import Event, Process, Value, current_process
from time import sleep

# External
import arrow

# Django
from django import db, core
from django import core, db
from django.apps.registry import apps

try:
apps.check_apps_ready()
except core.exceptions.AppRegistryNotReady:
import django

django.setup()

from django.conf import settings
Expand All @@ -28,21 +30,21 @@

# Local
import django_q.tasks
from django_q.brokers import get_broker, Broker
from django_q.brokers import Broker, get_broker
from django_q.conf import (
Conf,
croniter,
error_reporter,
get_ppid,
logger,
psutil,
get_ppid,
error_reporter,
croniter,
resource,
)
from django_q.humanhash import humanize
from django_q.models import Task, Success, Schedule
from django_q.models import Schedule, Success, Task
from django_q.queues import Queue
from django_q.signals import pre_execute
from django_q.signing import SignedPackage, BadSignature
from django_q.signing import BadSignature, SignedPackage
from django_q.status import Stat, Status


Expand Down Expand Up @@ -485,8 +487,11 @@ def save_task(task, broker: Broker):
existing_task.attempt_count = existing_task.attempt_count + 1
existing_task.save()

if Conf.MAX_ATTEMPTS > 0 and existing_task.attempt_count >= Conf.MAX_ATTEMPTS:
broker.acknowledge(task['ack_id'])
if (
Conf.MAX_ATTEMPTS > 0
and existing_task.attempt_count >= Conf.MAX_ATTEMPTS
):
broker.acknowledge(task["ack_id"])

else:
func = task["func"]
Expand All @@ -495,8 +500,8 @@ def save_task(task, broker: Broker):
func = f"{func.__module__}.{func.__name__}"
elif inspect.ismethod(func):
func = (
f'{func.__self__.__module__}.'
f'{func.__self__.__name__}.{func.__name__}'
f"{func.__self__.__module__}."
f"{func.__self__.__name__}.{func.__name__}"
)
Task.objects.create(
id=task["id"],
Expand All @@ -510,7 +515,7 @@ def save_task(task, broker: Broker):
result=task["result"],
group=task.get("group"),
success=task["success"],
attempt_count=1
attempt_count=1,
)
except Exception as e:
logger.error(e)
Expand Down Expand Up @@ -582,7 +587,9 @@ def scheduler(broker: Broker = None):
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
.filter(db.models.Q(cluster__isnull=True) | db.models.Q(cluster=Conf.PREFIX))
.filter(
db.models.Q(cluster__isnull=True) | db.models.Q(cluster=Conf.PREFIX)
)
):
args = ()
kwargs = {}
Expand Down Expand Up @@ -627,7 +634,7 @@ def scheduler(broker: Broker = None):
)
)
next_run = arrow.get(
croniter(s.cron, timezone.localtime()).get_next()
croniter(s.cron, localtime()).get_next()
)
if Conf.CATCH_UP or next_run > arrow.utcnow():
break
Expand All @@ -643,7 +650,7 @@ def scheduler(broker: Broker = None):
scheduled_broker = broker
try:
scheduled_broker = get_broker(q_options["broker_name"])
except: # invalid broker_name or non existing broker with broker_name
except: # invalid broker_name or non existing broker with broker_name
pass
q_options["broker"] = scheduled_broker
q_options["group"] = q_options.get("group", s.name or s.id)
Expand Down Expand Up @@ -734,4 +741,11 @@ def rss_check():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >= Conf.MAX_RSS
elif psutil:
return psutil.Process().memory_info().rss >= Conf.MAX_RSS * 1024
return False
return False


def localtime() -> datetime:
"""" Override for timezone.localtime to deal with naive times and local times"""
if settings.USE_TZ:
return timezone.localtime()
return datetime.now()
16 changes: 9 additions & 7 deletions django_q/conf.py
Expand Up @@ -135,12 +135,13 @@ class Conf:
RETRY = conf.get("retry", 60)

# Verify if retry and timeout settings are correct
if not TIMEOUT or (TIMEOUT > RETRY):
warn("""Retry and timeout are misconfigured. Set retry larger than timeout,
if not TIMEOUT or (TIMEOUT > RETRY):
warn(
"""Retry and timeout are misconfigured. Set retry larger than timeout,
failure to do so will cause the tasks to be retriggered before completion.
See https://django-q.readthedocs.io/en/latest/configure.html#retry for details.""")

See https://django-q.readthedocs.io/en/latest/configure.html#retry for details."""
)

# Sets the amount of tasks the cluster will try to pop off the broker.
# If it supports bulk gets.
BULK = conf.get("bulk", 1)
Expand Down Expand Up @@ -176,7 +177,7 @@ class Conf:
ERROR_REPORTER = conf.get("error_reporter", {})

# Optional attempt count. set to 0 for infinite attempts
MAX_ATTEMPTS = conf.get('max_attempts', 0)
MAX_ATTEMPTS = conf.get("max_attempts", 0)

# OSX doesn't implement qsize because of missing sem_getvalue()
try:
Expand All @@ -200,7 +201,8 @@ class Conf:

# to manage workarounds during testing
TESTING = conf.get("testing", False)



# logger
logger = logging.getLogger("django-q")

Expand Down
13 changes: 4 additions & 9 deletions django_q/core_signing.py
Expand Up @@ -2,15 +2,10 @@
import time
import zlib

from django.core.signing import (
BadSignature,
SignatureExpired,
b64_decode,
JSONSerializer,
Signer as Sgnr,
TimestampSigner as TsS,
dumps,
)
from django.core.signing import BadSignature, JSONSerializer, SignatureExpired
from django.core.signing import Signer as Sgnr
from django.core.signing import TimestampSigner as TsS
from django.core.signing import b64_decode, dumps
from django.utils import baseconv
from django.utils.crypto import constant_time_compare
from django.utils.encoding import force_bytes, force_str
Expand Down

0 comments on commit 15155c7

Please sign in to comment.