-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
disque.py
70 lines (59 loc) · 2.42 KB
/
disque.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import random
import redis
from django_q.brokers import Broker
from django_q.conf import Conf
class Disque(Broker):
def enqueue(self, task):
retry = Conf.RETRY if Conf.RETRY > 0 else '{} REPLICATE 1'.format(Conf.RETRY)
return self.connection.execute_command(
'ADDJOB {} {} 500 RETRY {}'.format(self.list_key, task, retry)).decode()
def dequeue(self):
t = None
if len(self.task_cache) > 0:
t = self.task_cache.pop()
else:
tasks = self.connection.execute_command(
'GETJOB COUNT {} TIMEOUT 1000 FROM {}'.format(Conf.BULK, self.list_key))
if tasks:
t = tasks.pop()
if tasks:
self.task_cache = tasks
if t:
return t[1].decode(), t[2].decode()
def queue_size(self):
return self.connection.execute_command('QLEN {}'.format(self.list_key))
def acknowledge(self, task_id):
return self.connection.execute_command('ACKJOB {}'.format(task_id))
def ping(self):
return self.connection.execute_command('HELLO')[0] > 0
def delete(self, task_id):
return self.connection.execute_command('DELJOB {}'.format(task_id))
def fail(self, task_id):
return self.delete(task_id)
def delete_queue(self):
jobs = self.connection.execute_command('JSCAN QUEUE {}'.format(self.list_key))[1]
if jobs:
job_ids = ' '.join(jid.decode() for jid in jobs)
self.connection.execute_command('DELJOB {}'.format(job_ids))
return len(jobs)
def info(self):
info = self.connection.info('server')
return 'Disque {}'.format(info['disque_version'])
@staticmethod
def get_connection(list_key=Conf.PREFIX):
# randomize nodes
random.shuffle(Conf.DISQUE_NODES)
# find one that works
for node in Conf.DISQUE_NODES:
host, port = node.split(':')
kwargs = {'host': host, 'port': port}
if Conf.DISQUE_AUTH:
kwargs['password'] = Conf.DISQUE_AUTH
redis_client = redis.Redis(**kwargs)
redis_client.decode_responses = True
try:
redis_client.execute_command('HELLO')
return redis_client
except redis.exceptions.ConnectionError:
continue
raise redis.exceptions.ConnectionError('Could not connect to any Disque nodes')