Skip to content

Commit

Permalink
Merge pull request #126 from kdmukai/close_connection
Browse files Browse the repository at this point in the history
Fix for stale ORM Broker connections
  • Loading branch information
Koed00 committed Jan 8, 2016
2 parents 2e76f9e + e4a87e1 commit b6dd209
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions django_q/brokers/orm.py
Expand Up @@ -2,10 +2,12 @@
from time import sleep

from django.utils import timezone
from django import db
from django.db import transaction

from django_q.brokers import Broker
from django_q.models import OrmQ
from django_q.conf import Conf
from django_q.conf import Conf, logger


def _timeout():
Expand All @@ -15,16 +17,23 @@ def _timeout():
class ORM(Broker):
@staticmethod
def get_connection(list_key=Conf.PREFIX):
if transaction.get_autocommit(): # Only True when not in an atomic block
# Make sure stale connections in the broker thread are explicitly
# closed before attempting DB access.
# logger.debug("Broker thread calling close_old_connections")
db.close_old_connections()
else:
logger.debug("Broker in an atomic transaction")
return OrmQ.objects.using(Conf.ORM)

def queue_size(self):
return self.connection.filter(key=self.list_key, lock__lte=_timeout()).count()
return self.get_connection().filter(key=self.list_key, lock__lte=_timeout()).count()

def lock_size(self):
return self.connection.filter(key=self.list_key, lock__gt=_timeout()).count()
return self.get_connection().filter(key=self.list_key, lock__gt=_timeout()).count()

def purge_queue(self):
return self.connection.filter(key=self.list_key).delete()
return self.get_connection().filter(key=self.list_key).delete()

def ping(self):
return True
Expand All @@ -38,11 +47,11 @@ def fail(self, task_id):
self.delete(task_id)

def enqueue(self, task):
package = self.connection.create(key=self.list_key, payload=task, lock=_timeout())
package = self.get_connection().create(key=self.list_key, payload=task, lock=_timeout())
return package.pk

def dequeue(self):
tasks = self.connection.filter(key=self.list_key, lock__lt=_timeout())[0:Conf.BULK]
tasks = self.get_connection().filter(key=self.list_key, lock__lt=_timeout())[0:Conf.BULK]
if tasks:
task_list = []
lock = timezone.now()
Expand All @@ -58,7 +67,8 @@ def delete_queue(self):
return self.purge_queue()

def delete(self, task_id):
self.connection.filter(pk=task_id).delete()
self.get_connection().filter(pk=task_id).delete()

def acknowledge(self, task_id):
return self.delete(task_id)

0 comments on commit b6dd209

Please sign in to comment.