-
Notifications
You must be signed in to change notification settings - Fork 30
/
health.py
61 lines (46 loc) · 1.34 KB
/
health.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from flask import current_app
from flask_classful import FlaskView, route
from flask_json import as_json
from redis.exceptions import ConnectionError
from flexmeasures.data import db
def _check_sql_database() -> bool:
    """Check status of the SQL database

    Runs a trivial ``SELECT 1`` through the database session and fetches the
    first row. Any exception is logged on the application logger and reported
    as a failed check instead of being propagated.

    :return: True if the query succeeded, False otherwise
    :rtype: bool
    """
    try:
        # Textual SQL has to be wrapped in text(): SQLAlchemy 2.0 rejects a
        # plain str passed to execute(), and that error would be swallowed by
        # the handler below, reporting a healthy database as down.
        # NOTE(review): assumes `db` is the Flask-SQLAlchemy extension object,
        # which exposes SQLAlchemy's `text` as an attribute -- confirm.
        db.session.execute(db.text("SELECT 1")).first()
        return True
    except Exception:  # noqa: B902
        # Deliberately broad: whatever goes wrong, the check reports "down".
        current_app.logger.exception("Database down or undetected")
        return False
def _check_redis() -> bool:
    """Check status of the redis instance

    Pings the redis connection held by the current Flask application. A
    connection error is logged on the application logger and reported as a
    failed check instead of being propagated.

    :return: True if the redis instance is active, False otherwise
    :rtype: bool
    """
    try:
        current_app.redis_connection.ping()
        return True
    except ConnectionError:
        # `ConnectionError` is the redis.exceptions class imported at the top
        # of this module, not the builtin of the same name.
        # Record the failure so an unhealthy readiness report can be diagnosed
        # from the application log rather than disappearing silently.
        current_app.logger.exception("Redis down or undetected")
        return False
class HealthAPI(FlaskView):
    """Health endpoints, registered under the ``/health`` route base."""

    route_base = "/health"
    trailing_slash = False

    @route("/ready", methods=["GET"])
    @as_json
    def is_ready(self):
        """
        Get readiness status

        .. :quickref: Health; Get readiness status

        Reports, per backing service, whether its check passed. The response
        status is 200 when every check passed and 503 otherwise.

        **Example response:**

        .. sourcecode:: json

            {
                "database_sql": true,
                "database_redis": false
            }
        """
        # Both checks always run, SQL first, so the body lists every service.
        checks = {
            "database_sql": _check_sql_database(),
            "database_redis": _check_redis(),
        }
        http_status = 200 if all(checks.values()) else 503
        return checks, http_status