Feature: Plugin for an additional health check #1839
base: master
Conversation
I briefly looked at the code and a lot of things feel wrong to me:
- AsyncExecutor and CallbackExecutor seem to be the wrong tools for solving this issue.
- The liveness health check is scheduled from the finally block of _run_cycle().
- With such an architecture it seems impossible to run health checks more often than loop_wait.
- The demote/failover functionality seems to be duplicated.
Here is my vision of how it should be done:
- The health-check class should inherit from Thread and run the probe in an "infinite" loop, depending on the state.
- The health-check class should provide the interface for HA:
  - is_healthy() -- returns True, False, or maybe None, depending on the exit code of the probe and the number of failures
  - is_running() -- whether the health-check loop is running
- To control the health-check loop, the following mechanisms are necessary:
  - activate()
  - disable()
  - reload_config() -- to update the health-check config on the fly
These interfaces could be very similar to those provided by Watchdog.
The HA class will control the health check much like it controls Watchdog, i.e. activate() on promote, disable() on demote or on pause.
The decision to demote on health-check failure would be made in the enforce_master_role() method.
P.S. In order to run probes we could certainly reuse CancellableSubprocess().
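The outline above could be sketched roughly like this. This is a hypothetical illustration, not the final Patroni design: the class and attribute names (`HealthCheckThread`, `probe_cmd`, `_failures`) are assumptions, and a real implementation would use CancellableSubprocess() for the probe rather than plain subprocess:

```python
import subprocess
import threading


class HealthCheckThread(threading.Thread):
    """Sketch of the proposed health-check loop (names are assumptions)."""

    def __init__(self, probe_cmd, interval, max_failures):
        super().__init__(daemon=True)
        self._probe_cmd = probe_cmd      # e.g. ['/usr/local/bin/probe.sh'] (made-up path)
        self._interval = interval        # seconds between probe runs
        self._max_failures = max_failures
        self._failures = 0
        self._stop_event = threading.Event()

    def is_healthy(self):
        # healthy while consecutive failures stay below the threshold
        return self._failures < self._max_failures

    def is_running(self):
        return self.is_alive() and not self._stop_event.is_set()

    def disable(self):
        # stop the loop; Event.wait() in run() returns immediately once set
        self._stop_event.set()

    def reload_config(self, interval, max_failures):
        # update probe settings on the fly
        self._interval = interval
        self._max_failures = max_failures

    def run(self):
        # "infinite" probe loop; Event.wait doubles as an interruptible sleep
        while not self._stop_event.is_set():
            try:
                ret = subprocess.call(self._probe_cmd, timeout=self._interval)
                self._failures = 0 if ret == 0 else self._failures + 1
            except Exception:  # timeout or failure to spawn counts as a failure
                self._failures += 1
            self._stop_event.wait(self._interval)
```

The Event-based sleep is what lets disable() take effect immediately instead of waiting out a full interval.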
Thanks for the feedback. Will update the code.
patroni/ha.py
Outdated
if self.state_handler.is_leader() and self.state_handler.role == 'master':
    if self.is_synchronous_mode() and self.liveness.config and self.liveness.is_running and \
            not self.liveness.is_healthy:
        logger.info("Leader is in unhealthy state, Demoting")
        members = self.cluster.members
        if self.is_synchronous_mode():
            members = [m for m in members if self.cluster.sync.matches(m.name)]
        if self.is_failover_possible(members) and not self.cluster.failover:
            self.dcs.manual_failover(leader=self.cluster.leader.name, candidate='', scheduled_at=None,
                                     index=self.cluster.failover)
            return 'Demoting self because Liveness Checks returned Leader in unhealthy state'
        else:
            logger.info("Aborting Liveness Failure Demote operation, failover not possible OR failover "
                        "already scheduled")
    elif not self.liveness.is_healthy and not self.is_synchronous_mode():
        logger.info("Skipping Liveness Failure Demote operation due to asynchronous replication mode")
Why do we make a difference between sync and async replication?
If there are nodes that could take over - we should demote.
Also, we don't need to use the manual_failover() method; demote('graceful') would do the job just fine.
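Under that suggestion, the branch collapses considerably. A sketch, with `ha` standing in for Patroni's Ha object; the attribute names follow this PR, but the wiring here is hypothetical and untested:

```python
def enforce_master_role(ha):
    """Sketch: demote the unhealthy leader regardless of replication mode,
    using demote('graceful') instead of manual_failover()."""
    if ha.state_handler.is_leader() and ha.liveness.is_running and not ha.liveness.is_healthy:
        ha.demote('graceful')  # graceful demotion lets a standby take over
        return 'Demoting self because liveness checks report the leader as unhealthy'
    return None
```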
Thanks. The intention was to enforce it only when Patroni is operating in synchronous mode. I agree that it should behave the same in both modes. Fixing the code.
patroni/liveness_check.py
Outdated
"""Liveness plugin probe"""
self.resume()
while True:
    with self.state:
The check will be executed one more time right after pause() is called.
Fixed
patroni/liveness_check.py
Outdated
def __init__(self, config):
    self.config = config['postgresql'].get('liveness', {})
    self.livenesscheck = LivenessThread(config)
In most cases people will have no liveness probe defined, but the active thread increases memory usage.
I would suggest creating LivenessThread on-demand, only when the activate() method is called.
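The lazy-creation idea might look like the following. This is a hypothetical sketch, not the PR's code: the class name `LazyLiveness` and the `thread_factory` hook are invented here for illustration; real code would construct LivenessThread(self.config, ...) at the marked spot:

```python
import threading


class LazyLiveness:
    """Sketch of on-demand thread creation: no thread (and no extra memory)
    unless a probe is configured and activate() is actually called."""

    def __init__(self, config):
        self.config = config.get('postgresql', {}).get('liveness', {})
        self._thread = None  # created lazily in activate()

    def activate(self, thread_factory=None):
        if not self.config:
            return  # no probe configured: never start a thread
        if self._thread is None or not self._thread.is_alive():
            # thread_factory is an illustration hook; real code would
            # build LivenessThread(self.config, connect_args) here
            factory = thread_factory or (lambda: threading.Thread(target=lambda: None))
            self._thread = factory()
            self._thread.start()

    @property
    def is_running(self):
        return bool(self._thread and self._thread.is_alive())
```

An idle Patroni instance with no `liveness` section pays nothing: `_thread` stays None for the process lifetime.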
patroni/liveness_check.py
Outdated
self.livenesscheck.params = self.config and shlex.split(self.config['probe']) + [config['postgresql'].get(
    'connect_address', ':').split(':')[0], config['postgresql'].get('connect_address', ':').split(':')[1],
    config.get('database', 'postgres')] or []
I think it would be better to rely on postgresql.config.local_connect_kwargs instead of building a connection string on your own. There are many different ways to screw it up: pgpass, certificates, keys, channel_binding, and so on.
You may take a look at how it is done for post_init: https://github.com/zalando/patroni/blob/master/patroni/postgresql/bootstrap.py#L125-L134 and maybe even extract that block into a separate method.
Fixing the code
patroni/liveness_check.py
Outdated
if self.cancellable._process is not None and self.cancellable._process.is_running():
    self.cancellable._kill_process()
try:
    _ = self.cancellable._start_process(self.params)
    ret = self.cancellable._process.wait(timeout=self.config['timeout'])
    if ret and ret != 0:
        if self.config['max_failures'] > 0:
            self.__lv_failures += 1
    else:
        self.__lv_failures = 0
except TimeoutExpired:
    self.cancellable._kill_process()
    if self.config['max_failures'] > 0:
        self.__lv_failures += 1
    logger.error("Liveness probe failed. failures count %s out of %s", self.__lv_failures,
                 self.config['max_failures'])
except Exception as e:
    logger.error("Exception during liveness probe")
    logger.exception(e)
    self.cancellable._kill_process()
    self.__lv_failures = 0
time.sleep(self.config['interval'])
There are a few problems with this code-block:
- _start_process() should be called holding CancellableExecutor._lock (and some other activities involving _process require locking).
- It is very unlikely, but _process.wait() might return None.
- IMO, any exit code different from 0 or any raised exception should be considered a failure.
I will probably spend some time next week making the run() method safe.
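The second and third points can be handled by one uniform classification rule: only a clean exit code 0 is success; a None return from wait() and any raised exception both count as failures. A small sketch (the helper name `probe_succeeded` and the callable-based shape are illustration only, standing in for the real subprocess call):

```python
def probe_succeeded(run_probe):
    """Return True only for a clean exit code 0.

    run_probe is any zero-argument callable returning the probe's exit
    code. A None return (e.g. _process.wait() giving no status) and any
    exception are both treated as failures, per the review comment.
    """
    try:
        ret = run_probe()
    except Exception:
        return False  # crashes and timeouts count as failures
    return ret == 0  # None or any nonzero exit code is a failure
```

This avoids the `if ret and ret != 0:` pattern above, which silently treats a None return as success.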
        self._lock = Lock()
        self.config = config
        self.connect_args, self.env = connect_args
        self.cmd = self.config and shlex.split(self.config['probe']) + [self.connect_args] or []
        self.__lv_failures = 0
        self.cancellable = CancellableSubprocess()

    @property
    def lv_failures(self):
        return self.__lv_failures

    def terminate(self):
        if not self.is_alive():
            logger.info("Liveness probe thread not active, terminate skipped")
            return
        with self._lock:
            if not self._stopevent.is_set():
                self._stopevent.set()
            if self.cancellable._process is not None and self.cancellable._process.is_running():
                self.cancellable._kill_process()
        logger.info("Liveness probe terminated")

    def run(self):
        """Liveness plugin probe"""
        while not self._stopevent.is_set():
            if self.cancellable._process and self.cancellable._process.is_running():
                self.cancellable._kill_process()
            try:
                with self.cancellable._lock:
                    self.cancellable._start_process(self.cmd, env=self.env)
                ret = self.cancellable._process.wait(timeout=self.config['timeout'])
                logger.info("Liveness Probe Completed with status %s", ret)
                if ret == 0:
                    self.__lv_failures = 0
                elif self.config['max_failures'] > 0:
                    self.__lv_failures += 1
                    logger.error("Liveness probe failed. failures %s out of %s", self.__lv_failures,
                                 self.config['max_failures'])
            except TimeoutExpired:
                self.cancellable._kill_process()
                if self.config['max_failures'] > 0:
                    self.__lv_failures += 1
                logger.error("Liveness probe timed out. failures %s out of %s", self.__lv_failures,
                             self.config['max_failures'])
            except Exception as e:
                logger.error("Exception during liveness probe")
                logger.exception(e)
            self._stopevent.wait(self.config['interval'])


class Liveness(object):

    def __init__(self, config):
        self.config = config
        self.livenesscheck = None
        self._lock = Lock()

    @synchronized
    def reload_config(self, config):
        self.config = config
        if self.livenesscheck:
            self.livenesscheck.config = self.config
        # next loop iteration will restart if liveness checks are set in config
        if self.livenesscheck and self.livenesscheck.is_alive():
            self._disable()

    def _disable(self):
        if self.livenesscheck and self.livenesscheck.is_alive():
            logger.info("liveness check alive, stopping")
            self.livenesscheck.terminate()

    def _activate(self, connect_args):
        if not self.config:
            logger.info("Liveness check activate skipped, No liveness checks in config")
            return
        self._disable()
        if self.livenesscheck and self.livenesscheck.is_alive():
            logger.info("Liveness check from prev run still active after terminate, skipping activate")
            return
        self.livenesscheck = LivenessThread(self.config, connect_args)
        self.livenesscheck.start()

    @synchronized
    def disable(self):
        self._disable()

    @synchronized
    def activate(self, connect_args):
        self._activate(connect_args)

    @property
    def is_running(self):
        return self.livenesscheck and self.livenesscheck.is_alive() or False

    @property
    def is_healthy(self):
        if self.livenesscheck and self.livenesscheck.is_alive() and self.config['max_failures'] > 0:
            return self.livenesscheck.lv_failures < self.config['max_failures']
        return True
Hi @ksarabu1, I am sorry that it took so long.
Please find attached liveness_check.py draft.
The code is not tested, but it should give you some ideas:
- LivenessThread() is pretty much modeled like CancellableSubprocess() (call() became call_probe(), and cancel() became terminate()). I did it because CancellableSubprocess() is already well tested.
- The LivenessThread.run() became more compact
- LivenessThread() has its own reload_config() method (we need to update self.cmd)
- Liveness.is_running and Liveness.is_healthy properties are synchronized
def get_local_connect_kwargs(self):
    """
    get local connect kwargs and pgpass
    returns tuple (connectString, env)
    """
    r = self._postgresql.config.local_connect_kwargs
    connstring = self._postgresql.config.format_dsn(r, True)
    if 'host' not in r:
        # https://www.postgresql.org/docs/current/static/libpq-pgpass.html
        # A host name of localhost matches both TCP (host name localhost) and Unix domain socket
        # (pghost empty or the default socket directory) connections coming from the local machine.
        r['host'] = 'localhost'  # set it to localhost to write into pgpass

    env = self._postgresql.config.write_pgpass(r)
    env['PGOPTIONS'] = '-c synchronous_commit=local'

    return connstring, env
This method is called from HA in order to activate liveness check, which has nothing to do with a bootstrap, hence I think this method should be created in the Postgresql() class.
Yet another concern is that the health check will break if someone removes pgpass file.
Feature: Plugin for an additional health check
Patroni keeps a persistent Postgres connection for health checks. If the postmaster doesn't respond or hangs for some reason (issue described in 1371), the health-check query will appear to run normally even though the leader is in an unhealthy state.
Change:
Plugin for an additional health check on the leader.
probe: The plugin process should be very quick and should not engage the thread for a long time. It could be as simple as making a new connection request to the master. The plugin call could be an expensive operation and may add overhead if it runs on every run cycle.
interval: Patroni should run the plugin at a specified interval to reduce the overhead.
timeout: How long Patroni is allowed to wait for the plugin process to complete; the process should be terminated if it exceeds the timeout.
max_failures: The maximum number of failures Patroni can tolerate (Patroni should take no action when the value is <= 0; it can initiate failover when the value is > 0 and the number of failures exceeds max_failures).
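Taken together, the proposed settings might look like this in the Patroni configuration. This is a hypothetical sketch: the key placement follows the `config['postgresql'].get('liveness', {})` lookup in this PR's code, and the probe path is made up:

```yaml
postgresql:
  liveness:
    probe: /etc/patroni/liveness_probe.sh   # quick external check, e.g. a fresh connection attempt
    interval: 10      # seconds between probe runs
    timeout: 5        # kill the probe if it runs longer than this
    max_failures: 3   # demote only after this many failures; <= 0 disables action
```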
Will add feature/test-case if the proposed changes look good. Thanks.