Expose queue length as a Prometheus metric
Implements #1173

* Quick fix for synchronous Redis calls.
  Redis broker calls were synchronous and blocked the main thread; this
  moves them to a ThreadPoolExecutor (see the sketch after this list).
* Update the workers list regularly for up-to-date metrics.
  Previously self.workers was updated only on UI events. That is no
  longer enough, since the data now feeds up-to-date metrics.
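
A minimal sketch of the offloading pattern, assuming a plain synchronous
redis-py client (the client, queue name, and helper here are illustrative;
the commit itself wraps the existing queues() helper via
io_loop.run_in_executor):

    import asyncio
    import redis

    async def queue_length(client: redis.Redis, queue: str) -> int:
        # redis-py's llen() blocks; running it on the default
        # ThreadPoolExecutor keeps the event loop responsive.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, client.llen, queue)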
HTRafal committed Jun 12, 2023
1 parent 5f711a5 commit 46fa852
Showing 5 changed files with 99 additions and 30 deletions.
22 changes: 1 addition & 21 deletions flower/api/tasks.py
@@ -395,27 +395,7 @@ async def get(self):
        :statuscode 401: unauthorized request
        :statuscode 503: result backend is not configured
        """
        app = self.application
        broker_options = self.capp.conf.broker_transport_options

        http_api = None
        if app.transport == 'amqp' and app.options.broker_api:
            http_api = app.options.broker_api

        broker_use_ssl = None
        if self.capp.conf.broker_use_ssl:
            broker_use_ssl = self.capp.conf.broker_use_ssl

        broker = Broker(app.capp.connection().as_uri(include_password=True),
                        http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl)

        queue_names = self.get_active_queue_names()

        if not queue_names:
            queue_names = set([self.capp.conf.task_default_queue]) |\
                set([q.name for q in self.capp.conf.task_queues or [] if q.name])

        queues = await broker.queues(sorted(queue_names))
        queues = await self.get_active_queue_lengths()
        self.write({'active_queues': queues})
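
The handler's response shape is unchanged by the refactor; an illustrative
body with made-up values:

    {"active_queues": [{"name": "celery", "messages": 42}]}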


53 changes: 49 additions & 4 deletions flower/app.py
@@ -10,16 +10,20 @@
from tornado import ioloop
from tornado.concurrent import run_on_executor
from tornado.httpserver import HTTPServer
from tornado.ioloop import PeriodicCallback, IOLoop
from tornado.web import url

from .urls import handlers as default_handlers
from .events import Events
from .events import Events, get_prometheus_metrics
from .inspector import Inspector
from .options import default_options

from .utils.broker import get_active_queue_lengths

logger = logging.getLogger(__name__)

# TODO: does this need to be configurable via options?
BROKER_METRICS_UPDATE_INTERVAL_SECONDS = 10
# Main dashboard view is updated regardless of this, because it subscribes to live events from celery.
WORKER_DETAILS_UPDATE_INTERVAL = 120

if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
import asyncio
@@ -81,7 +85,9 @@ def start(self):
        server.add_socket(socket)

        self.started = True
        self.update_workers()
        self.io_loop.spawn_callback(self.update_broker_metrics)
        # otherwise self.workers is only updated on UI events and metrics go stale after some time
        self.io_loop.spawn_callback(self.update_worker_details)
        self.io_loop.start()

    def stop(self):
@@ -103,3 +109,42 @@ def workers(self):

    def update_workers(self, workername=None):
        return self.inspector.inspect(workername)

    async def update_broker_metrics(self):
        logger.debug("Updating broker metrics.")

        def is_worker_alive(worker_name):
            worker = self.events.state.workers.data.get(worker_name)
            if not worker:
                return None
            return worker.alive

        while True:
            next_call = tornado.gen.sleep(BROKER_METRICS_UPDATE_INTERVAL_SECONDS)
            try:
                active_queues = await get_active_queue_lengths(self)
                metrics = get_prometheus_metrics()
                # clear old data so metrics are not left behind for queues that are no longer active
                metrics.queue_online_workers.clear()
                metrics.queue_length.clear()
                for queue_entry in active_queues:
                    queue = queue_entry["name"]
                    metrics.queue_length.labels(queue).set(queue_entry["messages"])
                    nr_of_workers = sum(
                        1 for name, data in self.workers.items()
                        if is_worker_alive(name) and any(q["name"] == queue for q in data.get("active_queues", []))
                    )
                    metrics.queue_online_workers.labels(queue).set(nr_of_workers)
            except Exception as e:
                logger.warning("Updating broker metrics failed with %s", repr(e))
            else:
                logger.debug("Done updating metrics.")
            await next_call

    async def update_worker_details(self):
        while True:
            next_call = tornado.gen.sleep(WORKER_DETAILS_UPDATE_INTERVAL)
            try:
                self.update_workers()
            except Exception as e:
                logger.warning("Failed to update workers list from celery: %s", repr(e))
            await next_call
2 changes: 2 additions & 0 deletions flower/events.py
@@ -60,6 +60,8 @@ def __init__(self):
            "Number of tasks currently executing at a worker",
            ['worker']
        )
        self.queue_length = Gauge('flower_broker_queue_length', "Broker queue length", ['queue'])
        self.queue_online_workers = Gauge('flower_broker_queue_online_workers', "Workers online per queue", ['queue'])


class EventsState(State):
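
When the metrics endpoint is scraped, prometheus_client renders each of these
gauges as one sample per labelled queue. An illustrative exposition, with a
made-up queue name and values:

    # HELP flower_broker_queue_length Broker queue length
    # TYPE flower_broker_queue_length gauge
    flower_broker_queue_length{queue="celery"} 42.0
    # HELP flower_broker_queue_online_workers Workers online per queue
    # TYPE flower_broker_queue_online_workers gauge
    flower_broker_queue_online_workers{queue="celery"} 3.0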
42 changes: 41 additions & 1 deletion flower/utils/broker.py
@@ -97,8 +97,9 @@ class RedisBase(BrokerBase):
    DEFAULT_SEP = '\x06\x16'
    DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]

    def __init__(self, broker_url, *args, **kwargs):
    def __init__(self, broker_url, io_loop=None, *args, **kwargs):
        super(RedisBase, self).__init__(broker_url)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.redis = None

        if not redis:
@@ -116,6 +117,11 @@ def _q_for_pri(self, queue, pri):
        return '{0}{1}{2}'.format(*((queue, self.sep, pri) if pri else (queue, '', '')))

    async def queues(self, names):
        # TODO: use redis.asyncio instead of a synchronous client on a ThreadPoolExecutor
        queue_sizes = await self.io_loop.run_in_executor(None, self._queues_synchronous, names)
        return queue_sizes

    def _queues_synchronous(self, names):
        queue_stats = []
        for name in names:
            priority_names = [self.broker_prefix + self._q_for_pri(
@@ -260,6 +266,40 @@ def queues(self, names):
        raise NotImplementedError


def get_active_queue_names(application):
    queues = set([])
    for _, info in application.workers.items():
        for q in info.get('active_queues', []):
            queues.add(q['name'])
    return queues
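
For reference, application.workers holds the per-worker payload reported by
celery's inspect API; a trimmed, illustrative entry (worker and queue names
are made up):

    workers = {
        'worker1@example-host': {
            'active_queues': [{'name': 'celery'}, {'name': 'emails'}],
        },
    }
    # get_active_queue_names(application) -> {'celery', 'emails'}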


async def get_active_queue_lengths(application):
    app = application
    capp = application.capp
    broker_options = capp.conf.BROKER_TRANSPORT_OPTIONS

    http_api = None
    if app.transport == 'amqp' and app.options.broker_api:
        http_api = app.options.broker_api

    broker_use_ssl = None
    if capp.conf.BROKER_USE_SSL:
        broker_use_ssl = capp.conf.BROKER_USE_SSL

    broker = Broker(app.capp.connection().as_uri(include_password=True),
                    http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl)

    queue_names = get_active_queue_names(application)

    if not queue_names:
        queue_names = set([capp.conf.CELERY_DEFAULT_QUEUE]) | \
            set([q.name for q in capp.conf.CELERY_QUEUES or [] if q.name])

    queues = await broker.queues(sorted(queue_names))
    return queues
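
The result is what Broker.queues() returns: one dict per queue, which is the
shape update_broker_metrics iterates over. An illustrative value:

    queues = await get_active_queue_lengths(application)
    # e.g. [{'name': 'celery', 'messages': 42}]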


async def main():
    broker_url = sys.argv[1] if len(sys.argv) > 1 else 'amqp://'
    queue_name = sys.argv[2] if len(sys.argv) > 2 else 'celery'
10 changes: 6 additions & 4 deletions flower/views/__init__.py
@@ -10,6 +10,7 @@
import tornado

from ..utils import template, bugreport, strtobool
from ..utils.broker import get_active_queue_names, get_active_queue_lengths

logger = logging.getLogger(__name__)

@@ -127,8 +128,9 @@ def format_task(self, task):
        return task

    def get_active_queue_names(self):
        queues = set([])
        for _, info in self.application.workers.items():
            for q in info.get('active_queues', []):
                queues.add(q['name'])
        return get_active_queue_names(self.application)

    async def get_active_queue_lengths(self):
        queues = await get_active_queue_lengths(self.application)
        return queues
