Feature: Plugin for an additional health check #1839

Open
wants to merge 5 commits into base: master

Conversation

ksarabu1
Contributor

@ksarabu1 ksarabu1 commented Feb 8, 2021

Feature: Plugin for an additional health check

Patroni keeps a persistent Postgres connection for health checks. If the postmaster hangs or stops responding for some reason (the issue described in #1371), that query will still appear to run normally even though the leader is in an unhealthy state.

Change:
Plugin for an additional health check on the leader.

....
postgresql:
  liveness:
    probe: /..../db_conn.py
    max_failures: 20
    timeout: 3
    interval: 300
  callbacks:
    on_role_change: /.../xyz.py
  connect_address: abc1:4335
....

probe: The plugin process should be very quick and should not block the thread for long. It could be as simple as making a new connection request to the master (see the example probe after this list). Since the plugin call can be an expensive operation, running it on every run cycle may add overhead.
interval: Patroni should run the plugin at the specified interval to reduce that overhead.
timeout: How long Patroni waits for the plugin process to complete. It should terminate the process if it exceeds the timeout.
max_failures: The maximum number of probe failures Patroni tolerates (Patroni should take no action when the value is <= 0; it can initiate a failover when the value is > 0 and the number of failures exceeds max_failures).
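For illustration only, the probe could be as small as the following sketch (a hypothetical db_conn.py, not part of this PR; it assumes psycopg2 is installed and that Patroni passes a libpq connection string as the first argument, as a later revision of this PR does):

#!/usr/bin/env python3
# Hypothetical db_conn.py liveness probe: exit 0 if a fresh connection to the leader succeeds.
import sys

import psycopg2


def main():
    # Assumption: Patroni passes a libpq connection string as the first argument.
    conninfo = sys.argv[1] if len(sys.argv) > 1 else ''
    try:
        conn = psycopg2.connect(conninfo, connect_timeout=2)
        try:
            with conn.cursor() as cur:
                cur.execute('SELECT 1')
                cur.fetchone()
        finally:
            conn.close()
    except Exception as exc:
        print('probe failed: {0}'.format(exc), file=sys.stderr)
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())

Any non-zero exit code or a timeout would then be counted as a probe failure by Patroni.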

Will add feature/test-case if the proposed changes look good. Thanks.

Collaborator

@CyberDem0n CyberDem0n left a comment


I briefly looked at the code and a lot of things feel wrong to me:

  1. AsyncExecutor and CallbackExecutor seem to be the wrong tools for solving this issue.
  2. The liveness health check is scheduled from the finally block of _run_cycle().
  3. With such an architecture it doesn't seem possible to run health checks more often than loop_wait.
  4. The demote/failover functionality seems to be duplicated.

Here is my vision of how it should be done:

  1. The health-check class should be inherited from Thread and should run the probe in an "infinite" loop, depending on the state.
  2. The health-check class should provide the following interface for HA:
  • is_healthy() -- returns True, False, and maybe None, depending on the exit code of the probe and the number of failures
  • is_running() -- whether the health-check loop is running
  3. In order to control the health-check loop, the following mechanisms are necessary:
  • activate()
  • disable()
  4. reload_config() -- to update the health-check config on the fly

These interfaces could be very similar to the interfaces provided by the Watchdog (a rough skeleton is sketched after this comment).
The HA class would control the health check quite similarly to how it controls the Watchdog, i.e. activate() on promote, disable() on demote or on pause.
The place that makes the decision to demote on health-check failure would be the enforce_master_role() method.

P.S. In order to run probes we could certainly reuse CancellableSubprocess().
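A rough skeleton of that interface might look like the sketch below (illustrative only; apart from is_healthy()/is_running()/activate()/disable()/reload_config(), the names are assumptions, and plain subprocess stands in for CancellableSubprocess()):

import shlex
import subprocess
from threading import Event, Thread


class LivenessCheck(Thread):
    """Sketch of a health-check thread with a Watchdog-like interface."""

    def __init__(self, config):
        super(LivenessCheck, self).__init__()
        self.daemon = True
        self._config = config      # expects probe, interval, timeout, max_failures
        self._active = Event()     # set by activate(), cleared by disable()
        self._stop = Event()
        self._failures = 0

    def run(self):
        # "infinite" loop: run the probe only while activated, wait `interval` between runs
        while not self._stop.is_set():
            if self._active.is_set():
                self._failures = 0 if self._run_probe() else self._failures + 1
            self._stop.wait(self._config.get('interval', 300))

    def _run_probe(self):
        # stand-in for CancellableSubprocess: exit code 0 means healthy
        try:
            return subprocess.call(shlex.split(self._config['probe']),
                                   timeout=self._config.get('timeout', 3)) == 0
        except Exception:
            return False

    # interface consumed by the HA class, similar to the Watchdog
    def is_healthy(self):
        max_failures = self._config.get('max_failures', 0)
        return True if max_failures <= 0 else self._failures < max_failures

    def is_running(self):
        return self.is_alive() and self._active.is_set()

    def activate(self):       # on promote
        self._active.set()

    def disable(self):        # on demote or on pause
        self._active.clear()

    def reload_config(self, config):
        self._config = config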

@ksarabu1
Contributor Author

Thanks for the feedback. Will update the code.

P.S. In order to run probes we could certainly reuse CancellableSubprocess().

patroni/ha.py Outdated
Comment on lines 597 to 612
if self.state_handler.is_leader() and self.state_handler.role == 'master':
    if self.is_synchronous_mode() and self.liveness.config and self.liveness.is_running and \
            not self.liveness.is_healthy:
        logger.info("Leader is in unhealthy state, Demoting")
        members = self.cluster.members
        if self.is_synchronous_mode():
            members = [m for m in members if self.cluster.sync.matches(m.name)]
        if self.is_failover_possible(members) and not self.cluster.failover:
            self.dcs.manual_failover(leader=self.cluster.leader.name, candidate='', scheduled_at=None,
                                     index=self.cluster.failover)
            return 'Demoting self because Liveness Checks returned Leader in unhealthy state'
        else:
            logger.info("Aborting Liveness Failure Demote operation, failover not possible OR failover "
                        "already scheduled")
    elif not self.liveness.is_healthy and not self.is_synchronous_mode():
        logger.info("Skipping Liveness Failure Demote operation due to asynchronous replication mode")
Collaborator

Why do we distinguish between sync and async replication?
If there are nodes that could take over - we should demote.
Also, we don't need to use the manual_failover() method, demote('graceful') would do the job just fine.
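For illustration, the branch above could then shrink to something like this sketch inside enforce_master_role() (assuming the liveness attribute introduced by this PR; not the final code):

# sketch: replace the manual_failover() call with a graceful demote
if self.liveness.is_running and not self.liveness.is_healthy:
    if self.is_failover_possible(self.cluster.members):
        self.demote('graceful')
        return 'Demoting self because the liveness check reported the leader as unhealthy'
    logger.warning('Liveness check failed, but failover is not possible; keeping the leader role')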

Contributor Author

Thanks. The intention was to enforce it only when Patroni is operating in synchronous mode. I agree that it should behave the same in both modes. Fixing the code.

"""Liveness plugin probe"""
self.resume()
while True:
with self.state:
Collaborator

The check will be executed one more time right after pause() is called.
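The revision shown further below sidesteps this by dropping pause()/resume() and checking a stop Event at the top of every iteration, roughly:

# sketch: the stop event is checked before each probe, so terminate() prevents one more run
while not self._stopevent.is_set():
    self.run_probe()                               # hypothetical helper wrapping the configured probe
    self._stopevent.wait(self.config['interval'])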

Contributor Author

Fixed


    def __init__(self, config):
        self.config = config['postgresql'].get('liveness', {})
        self.livenesscheck = LivenessThread(config)
Collaborator

In most cases people will have no liveness probe defined, but an always-active thread increases memory usage.
I would suggest creating the LivenessThread on demand, only when the activate() method is called.
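That is, roughly (a sketch only; the updated Liveness class later in this PR follows this pattern):

class Liveness(object):

    def __init__(self, config):
        self.config = config['postgresql'].get('liveness', {})
        self.livenesscheck = None      # no thread is created until activate() is called

    def activate(self, connect_args):
        if self.config and not (self.livenesscheck and self.livenesscheck.is_alive()):
            self.livenesscheck = LivenessThread(self.config, connect_args)
            self.livenesscheck.start()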

Comment on lines 93 to 95
self.livenesscheck.params = self.config and shlex.split(self.config['probe']) + [config['postgresql'].get(
    'connect_address', ':').split(':')[0], config['postgresql'].get('connect_address', ':').split(':')[1],
    config.get('database', 'postgres')] or []
Collaborator

I think it would be better to rely on postgresql.config.local_connect_kwargs instead of building a connection string on your own. There are many different ways to screw it up: pgpass, certificates, keys, channel_binding, and so on.
You may take a look at how it is done for post_init: https://github.com/zalando/patroni/blob/master/patroni/postgresql/bootstrap.py#L125-L134 and maybe even extract that block into a separate method.

Contributor Author

Fixing the code

Comment on lines 58 to 79
if self.cancellable._process is not None and self.cancellable._process.is_running():
    self.cancellable._kill_process()
try:
    _ = self.cancellable._start_process(self.params)
    ret = self.cancellable._process.wait(timeout=self.config['timeout'])
    if ret and ret != 0:
        if self.config['max_failures'] > 0:
            self.__lv_failures += 1
    else:
        self.__lv_failures = 0
except TimeoutExpired:
    self.cancellable._kill_process()
    if self.config['max_failures'] > 0:
        self.__lv_failures += 1
    logger.error("Liveness probe failed. failures count %s out of %s", self.__lv_failures,
                 self.config['max_failures'])
except Exception as e:
    logger.error("Exception during liveness probe")
    logger.exception(e)
    self.cancellable._kill_process()
    self.__lv_failures = 0
time.sleep(self.config['interval'])
Collaborator

There are a few problems with this code block:

  1. _start_process() should be called while holding CancellableExecutor._lock (and some other activities involving _process require locking as well).
  2. It is very unlikely, but _process.wait() might return None.
  3. IMO, any exit code different from 0, or any exception raised, should be considered a failure.

I will probably spend some time next week making the run() method safe (a possible shape is sketched below).
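A sketch of how the probe execution could address those three points (an illustration only; it reuses the CancellableSubprocess internals referenced above, and the _run_probe helper name is hypothetical):

def _run_probe(self):
    """Return True only if the probe exited with code 0; anything else counts as a failure."""
    try:
        with self.cancellable._lock:                    # 1. _start_process() requires the lock
            if not self.cancellable._start_process(self.params):
                return False
        ret = self.cancellable._process.wait(timeout=self.config['timeout'])
        return ret == 0                                 # 2. a None return is treated as a failure
    except Exception:                                   # 3. timeout or any other exception is a failure
        logger.exception('Liveness probe failed')
        self.cancellable._kill_process()
        return False

run() would then only have to reset or increment the failure counter based on the boolean result.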

Comment on lines +17 to +122
        self._lock = Lock()
        self.config = config
        self.connect_args, self.env = connect_args
        self.cmd = self.config and shlex.split(self.config['probe']) + [self.connect_args] or []
        self.__lv_failures = 0
        self.cancellable = CancellableSubprocess()

    @property
    def lv_failures(self):
        return self.__lv_failures

    def terminate(self):
        if not self.is_alive():
            logger.info("Liveness probe thread not active, terminate skipped")
            return
        with self._lock:
            if not self._stopevent.is_set():
                self._stopevent.set()
            if self.cancellable._process is not None and self.cancellable._process.is_running():
                self.cancellable._kill_process()
        logger.info("Liveness probe terminated")

    def run(self):
        """Liveness plugin probe"""
        while not self._stopevent.isSet():
            if self.cancellable._process and self.cancellable._process.is_running():
                self.cancellable._kill_process()
            try:
                with self.cancellable._lock:
                    self.cancellable._start_process(self.cmd, env=self.env)
                ret = self.cancellable._process.wait(timeout=self.config['timeout'])
                logger.info("Liveness Probe Completed with status %s", ret)
                if ret == 0:
                    self.__lv_failures = 0
                else:
                    if self.config['max_failures'] > 0:
                        self.__lv_failures += 1
                    logger.error("Liveness probe failed. failures %s out of %s", self.__lv_failures,
                                 self.config['max_failures'])
            except TimeoutExpired:
                self.cancellable._kill_process()
                if self.config['max_failures'] > 0:
                    self.__lv_failures += 1
                logger.error("Liveness probe timedout. failures %s out of %s", self.__lv_failures,
                             self.config['max_failures'])
            except Exception as e:
                logger.error("Exception during liveness probe")
                logger.exception(e)
            self._stopevent.wait(self.config['interval'])


class Liveness(object):

    def __init__(self, config):
        self.config = config
        self.livenesscheck = None
        self._lock = Lock()

    @synchronized
    def reload_config(self, config):
        self.config = config
        if self.livenesscheck:
            self.livenesscheck.config = self.config
        # next loop iteration will restart if liveness checks are set in config
        if self.livenesscheck and self.livenesscheck.is_alive():
            self._disable()

    def _disable(self):
        if self.livenesscheck and self.livenesscheck.is_alive():
            logger.info("liveness check alive, stopping")
            self.livenesscheck.terminate()

    def _activate(self, connect_args):
        if not self.config:
            logger.info("Liveness check activate skipped, No liveness checks in config")
            return
        self._disable()
        if self.livenesscheck and self.livenesscheck.is_alive():
            logger.info("Liveness check from prev run still active after terminate, skipping activate")
            return
        self.livenesscheck = LivenessThread(self.config, connect_args)
        self.livenesscheck.start()

    @synchronized
    def disable(self):
        self._disable()

    @synchronized
    def activate(self, connect_args):
        self._activate(connect_args)

    @property
    def is_running(self):
        return self.livenesscheck and self.livenesscheck.is_alive() or False

    @property
    def is_healthy(self):
        if self.livenesscheck and self.livenesscheck.is_alive() and self.config['max_failures'] > 0:
            return self.livenesscheck.lv_failures < self.config['max_failures']
        return True
Collaborator

Hi @ksarabu1, I am sorry that it took so long.

Please find the attached liveness_check.py draft.
The code is not tested, but it should give you some ideas:

  • LivenessThread() is pretty much modeled like CancellableSubprocess() (call() became call_probe(), and cancel() became terminate()). I did it because CancellableSubprocess() is already well tested.
  • The LivenessThread.run() became more compact
  • LivenessThread() has its own reload_config() method (we need to update self.cmd)
  • Liveness.is_running and Liveness.is_healthy properties are synchronized
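For context, the HA side would then drive this object much like the Watchdog, along these lines (a sketch of hypothetical wiring, not code from the attached draft):

# on promote (in this PR get_local_connect_kwargs() currently lives on the bootstrap object):
self.liveness.activate(self.state_handler.bootstrap.get_local_connect_kwargs())

# on demote or on pause:
self.liveness.disable()

# on reload_config(), so interval/timeout/max_failures can change on the fly:
self.liveness.reload_config(self.patroni.config['postgresql'].get('liveness', {}))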

Comment on lines +119 to +136
    def get_local_connect_kwargs(self):
        """
        get local connect kwargs and pgpass
        returns tuple (connectString, env)
        """
        r = self._postgresql.config.local_connect_kwargs
        connstring = self._postgresql.config.format_dsn(r, True)
        if 'host' not in r:
            # https://www.postgresql.org/docs/current/static/libpq-pgpass.html
            # A host name of localhost matches both TCP (host name localhost) and Unix domain socket
            # (pghost empty or the default socket directory) connections coming from the local machine.
            r['host'] = 'localhost'  # set it to localhost to write into pgpass

        env = self._postgresql.config.write_pgpass(r)
        env['PGOPTIONS'] = '-c synchronous_commit=local'

        return connstring, env

Collaborator

This method is called from HA in order to activate the liveness check, which has nothing to do with bootstrap, hence I think this method should be created in the Postgresql() class.

Collaborator

Yet another concern is that the health check will break if someone removes the pgpass file.
