Skip to content

Commit

Permalink
Support replication connections through PgBouncer
Browse files Browse the repository at this point in the history
In session pooling mode PgBouncer is pretty much a transparent proxy,
i.e. the client does normally not even need to know that PgBouncer is in
the middle. This allows things like load balancing and failovers without
the client needing to know about this at all. But as soon as replication
connections are needed, this was not possible anymore, because PgBouncer
would reject those instead of proxying them to the right server.

This PR fixes that by also proxying replication connections. They are
handled pretty differently from normal connections though. A client and
server replication connection will form a strong pair, as soon as one is
closed the other is closed too. So, there's no caching of the server
replication connections, like is done for regular connections. Reusing
replication connections comes with a ton of gotchas. Postgres will throw
errors in many cases when trying to do so. So simply not doing it seems
like a good tradeoff for ease of implementation. Especially because
replication connections are pretty much always very long lived. So
re-using them gains pretty much no performance benefits.

Fixes pgbouncer#382
  • Loading branch information
JelteF committed Sep 6, 2023
1 parent 8d11b1c commit d6b65d1
Show file tree
Hide file tree
Showing 33 changed files with 1,057 additions and 101 deletions.
6 changes: 3 additions & 3 deletions .cirrus.yml
Expand Up @@ -103,7 +103,7 @@ task:
# - image: rockylinux:8
# - image: centos:centos7
# setup_script:
# - yum -y install autoconf automake diffutils file libevent-devel libtool make openssl-devel pkg-config postgresql-server systemd-devel wget
# - yum -y install autoconf automake diffutils file libevent-devel libtool make openssl-devel pkg-config postgresql-server postgresql-contrib systemd-devel wget
# - if cat /etc/centos-release | grep -q ' 7'; then yum -y install python python-pip; else yum -y install python3 python3-pip sudo iptables; fi
# - wget -O /tmp/pandoc.tar.gz https://github.com/jgm/pandoc/releases/download/2.10.1/pandoc-2.10.1-linux-amd64.tar.gz
# - tar xvzf /tmp/pandoc.tar.gz --strip-components 1 -C /usr/local/
Expand Down Expand Up @@ -133,7 +133,7 @@ task:
# - image: alpine:latest
# setup_script:
# - apk update
# - apk add autoconf automake bash build-base libevent-dev libtool openssl openssl-dev pkgconf postgresql python3 py3-pip wget sudo iptables
# - apk add autoconf automake bash build-base libevent-dev libtool openssl openssl-dev pkgconf postgresql postgresql-contrib python3 py3-pip wget sudo iptables
# - wget -O /tmp/pandoc.tar.gz https://github.com/jgm/pandoc/releases/download/2.10.1/pandoc-2.10.1-linux-amd64.tar.gz
# - tar xvzf /tmp/pandoc.tar.gz --strip-components 1 -C /usr/local/
# - python3 -m pip install -r requirements.txt
Expand Down Expand Up @@ -161,7 +161,7 @@ task:
HAVE_IPV6_LOCALHOST: yes
USE_SUDO: true
setup_script:
- pkg install -y autoconf automake bash gmake hs-pandoc libevent libtool pkgconf postgresql12-server python devel/py-pip sudo
- pkg install -y autoconf automake bash gmake hs-pandoc libevent libtool pkgconf postgresql12-server postgresql12-contrib python devel/py-pip sudo
- pip install -r requirements.txt
- kldload pf
- echo 'anchor "pgbouncer_test/*"' >> /etc/pf.conf
Expand Down
10 changes: 10 additions & 0 deletions .editorconfig
Expand Up @@ -12,6 +12,16 @@ trim_trailing_whitespace = true
indent_style = tab
indent_size = 8

[hba_test.{eval,rules}]
indent_style = tab
indent_size = 8

[hba_test.rules]
# Disable trailing_whitespace check for hba_test.rules, because one of the
# tests in that file is that parsing doesn't break in case of trailing
# whitespace.
trim_trailing_whitespace = false

[*.py]
indent_style = space
indent_size = 4
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Expand Up @@ -25,6 +25,7 @@ pgbouncer_SOURCES = \
src/util.c \
src/varcache.c \
src/common/base64.c \
src/common/bool.c \
src/common/saslprep.c \
src/common/scram-common.c \
src/common/unicode_norm.c \
Expand All @@ -51,6 +52,7 @@ pgbouncer_SOURCES = \
include/util.h \
include/varcache.h \
include/common/base64.h \
include/common/builtins.h \
include/common/pg_wchar.h \
include/common/postgres_compat.h \
include/common/saslprep.h \
Expand Down
2 changes: 1 addition & 1 deletion doc/config.md
Expand Up @@ -1285,7 +1285,7 @@ The file follows the format of the PostgreSQL `pg_hba.conf` file
(see <https://www.postgresql.org/docs/current/auth-pg-hba-conf.html>).

* Supported record types: `local`, `host`, `hostssl`, `hostnossl`.
* Database field: Supports `all`, `sameuser`, `@file`, multiple names. Not supported: `replication`, `samerole`, `samegroup`.
* Database field: Supports `all`, `replication`, `sameuser`, `@file`, multiple names. Not supported: `samerole`, `samegroup`.
* User name field: Supports `all`, `@file`, multiple names. Not supported: `+groupname`.
* Address field: Supports IPv4, IPv6. Not supported: DNS names, domain prefixes.
* Auth-method field: Only methods supported by PgBouncer's `auth_type`
Expand Down
16 changes: 16 additions & 0 deletions include/bouncer.h
Expand Up @@ -104,6 +104,7 @@ typedef union PgAddr PgAddr;
typedef enum SocketState SocketState;
typedef struct PktHdr PktHdr;
typedef struct ScramState ScramState;
typedef enum ReplicationType ReplicationType;

extern int cf_sbuf_len;

Expand Down Expand Up @@ -305,6 +306,11 @@ struct PgPool {
* Clients that sent cancel request, to cancel another client its query.
* These requests are waiting for a new server connection to be opened,
* before the request can be forwarded.
*
* This is a separate list from waiting_client_list, because we want to
* give cancel requests priority over regular clients. The main reason
* for this is, because a cancel request might free up a connection,
* which can be used for one of the waiting clients.
*/
struct StatList waiting_cancel_req_list;

Expand Down Expand Up @@ -504,6 +510,13 @@ struct PgDatabase {
struct AATree user_tree; /* users that have been queried on this database */
};

enum ReplicationType {
REPLICATION_NONE = 0,
REPLICATION_LOGICAL,
REPLICATION_PHYSICAL,
};

extern const char *replication_type_parameters[3];

/*
* A client or server connection.
Expand Down Expand Up @@ -543,6 +556,9 @@ struct PgSocket {

bool wait_sslchar : 1; /* server: waiting for ssl response: S/N */

ReplicationType replication; /* If this is a replication connection */
char *startup_options; /* only tracked for replication connections */

int expect_rfq_count; /* client: count of ReadyForQuery packets client should see */

usec_t connect_time; /* when connection was made */
Expand Down
17 changes: 17 additions & 0 deletions include/common/builtins.h
@@ -0,0 +1,17 @@
/*-------------------------------------------------------------------------
*
* builtins.h
* Declarations for operations on built-in types.
*
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* include/common/builtins.h
*
*-------------------------------------------------------------------------
*/

/* bool.c */
extern bool parse_bool(const char *value, bool *result);
extern bool parse_bool_with_len(const char *value, size_t len, bool *result);
3 changes: 2 additions & 1 deletion include/common/postgres_compat.h
Expand Up @@ -7,6 +7,7 @@
/* from c.h */

#include <string.h>
#include <usual/ctype.h>

#define int8 int8_t
#define uint8 uint8_t
Expand All @@ -15,6 +16,7 @@

#define lengthof(array) (sizeof (array) / sizeof ((array)[0]))
#define pg_hton32(x) htobe32(x)
#define pg_strncasecmp strncasecmp

#define pg_attribute_noreturn() _NORETURN

Expand All @@ -30,6 +32,5 @@
#define pg_sha256_update(ctx, data, len) sha256_update(ctx, data, len)
#define pg_sha256_final(ctx, dst) sha256_final(ctx, dst)


/* define this to use non-server code paths */
#define FRONTEND
2 changes: 1 addition & 1 deletion include/hba.h
Expand Up @@ -20,4 +20,4 @@ struct HBA;

struct HBA *hba_load_rules(const char *fn);
void hba_free(struct HBA *hba);
int hba_eval(struct HBA *hba, PgAddr *addr, bool is_tls, const char *dbname, const char *username);
int hba_eval(struct HBA *hba, PgAddr *addr, bool is_tls, ReplicationType replication, const char *dbname, const char *username);
1 change: 1 addition & 0 deletions include/objects.h
Expand Up @@ -42,6 +42,7 @@ PgPool *get_pool(PgDatabase *, PgUser *);
PgPool *get_peer_pool(PgDatabase *);
PgSocket *compare_connections_by_time(PgSocket *lhs, PgSocket *rhs);
bool evict_connection(PgDatabase *db) _MUSTCHECK;
bool evict_pool_connection(PgPool *pool) _MUSTCHECK;
bool evict_user_connection(PgUser *user) _MUSTCHECK;
bool find_server(PgSocket *client) _MUSTCHECK;
bool life_over(PgSocket *server);
Expand Down
3 changes: 0 additions & 3 deletions include/pktbuf.h
Expand Up @@ -97,9 +97,6 @@ void pktbuf_write_ExtQuery(PktBuf *buf, const char *query, int nargs, ...);
#define pktbuf_write_CancelRequest(buf, key) \
pktbuf_write_generic(buf, PKT_CANCEL, "b", key, 8)

#define pktbuf_write_StartupMessage(buf, user, parms, parms_len) \
pktbuf_write_generic(buf, PKT_STARTUP, "bsss", parms, parms_len, "user", user, "")

#define pktbuf_write_PasswordMessage(buf, psw) \
pktbuf_write_generic(buf, 'p', "s", psw)

Expand Down
1 change: 1 addition & 0 deletions include/server.h
Expand Up @@ -18,6 +18,7 @@

bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *pkt) _MUSTCHECK;
void kill_pool_logins(PgPool *pool, const char *sqlstate, const char *msg);
int server_pool_mode(PgSocket *server) _MUSTCHECK;
int pool_pool_mode(PgPool *pool) _MUSTCHECK;
int pool_pool_size(PgPool *pool) _MUSTCHECK;
int pool_min_pool_size(PgPool *pool) _MUSTCHECK;
Expand Down
2 changes: 2 additions & 0 deletions include/util.h
Expand Up @@ -70,3 +70,5 @@ bool cf_set_authdb(struct CfValue *cv, const char *value);

/* reserved database name checking */
bool check_reserved_database(const char *value);

bool strings_equal(const char *str_left, const char *str_right) _MUSTCHECK;
1 change: 1 addition & 0 deletions include/varcache.h
Expand Up @@ -19,6 +19,7 @@ void init_var_lookup(const char *cf_track_extra_parameters);
int get_num_var_cached(void);
bool varcache_set(VarCache *cache, const char *key, const char *value) /* _MUSTCHECK */;
bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p) _MUSTCHECK;
void varcache_apply_startup(PktBuf *pkt, PgSocket *client);
void varcache_fill_unset(VarCache *src, PgSocket *dst);
void varcache_clean(VarCache *cache);
void varcache_add_params(PktBuf *pkt, VarCache *vars);
Expand Down
83 changes: 77 additions & 6 deletions src/client.c
Expand Up @@ -23,6 +23,7 @@
#include "bouncer.h"
#include "pam.h"
#include "scram.h"
#include "common/builtins.h"

#include <usual/pgutil.h>

Expand Down Expand Up @@ -267,11 +268,15 @@ static bool finish_set_pool(PgSocket *client, bool takeover)
if (client->sbuf.tls) {
char infobuf[96] = "";
tls_get_connection_info(client->sbuf.tls, infobuf, sizeof infobuf);
slog_info(client, "login attempt: db=%s user=%s tls=%s",
client->db->name, client->login_user->name, infobuf);
slog_info(client, "login attempt: db=%s user=%s tls=%s replication=%s",
client->db->name,
client->login_user->name,
infobuf,
replication_type_parameters[client->replication]);
} else {
slog_info(client, "login attempt: db=%s user=%s tls=no",
client->db->name, client->login_user->name);
slog_info(client, "login attempt: db=%s user=%s tls=no replication=%s",
client->db->name, client->login_user->name,
replication_type_parameters[client->replication]);
}
}

Expand All @@ -288,8 +293,13 @@ static bool finish_set_pool(PgSocket *client, bool takeover)

auth = cf_auth_type;
if (auth == AUTH_HBA) {
auth = hba_eval(parsed_hba, &client->remote_addr, !!client->sbuf.tls,
client->db->name, client->login_user->name);
auth = hba_eval(
parsed_hba,
&client->remote_addr,
!!client->sbuf.tls,
client->replication,
client->db->name,
client->login_user->name);
}

if (auth == AUTH_MD5) {
Expand Down Expand Up @@ -595,6 +605,25 @@ static bool set_startup_options(PgSocket *client, const char *options)
char arg_buf[400];
struct MBuf arg;
const char *position = options;

if (client->replication) {
/*
* Since replication clients will be bound 1-to-1 to a server
* connection, we can support any configuration flags and
* fields in the options startup parameter. Because we can
* simply send the exact same value for the options parameter
* when opening the replication connection to the server. This
* allows us to also support GUCs that don't have the
* GUC_REPORT flag, specifically extra_float_digits which is a
* configuration that is set by CREATE SUBSCRIPTION in the
* options parameter.
*/
client->startup_options = strdup(options);
if (!client->startup_options)
disconnect_client(client, true, "out of memory");
return true;
}

mbuf_init_fixed_writer(&arg, arg_buf, sizeof(arg_buf));
slog_debug(client, "received options: %s", options);

Expand Down Expand Up @@ -658,12 +687,52 @@ static void set_appname(PgSocket *client, const char *app_name)
}
}

/*
* set_replication sets the replication field on the client according the given
* replicationString.
*/
static bool set_replication(PgSocket *client, const char *replicationString)
{
bool replicationBool = false;
if (strcmp(replicationString, "database") == 0) {
client->replication = REPLICATION_LOGICAL;
return true;
}
if (!parse_bool(replicationString, &replicationBool)) {
return false;
}
client->replication = replicationBool ? REPLICATION_PHYSICAL : REPLICATION_NONE;
return true;
}

static bool decide_startup_pool(PgSocket *client, PktHdr *pkt)
{
const char *username = NULL, *dbname = NULL;
const char *key, *val;
bool ok;
bool appname_found = false;
unsigned original_read_pos = pkt->data.read_pos;

/*
* First check if we're dealing with a replication connection. Because for
* those we support some additional things when parsing the startup
* parameters, specifically we support any arguments in the options startup
* packet.
*/
while (1) {
ok = mbuf_get_string(&pkt->data, &key);
if (!ok || *key == 0)
break;
ok = mbuf_get_string(&pkt->data, &val);
if (!ok)
break;
if (strcmp(key, "replication") == 0) {
slog_debug(client, "got var: %s=%s", key, val);
set_replication(client, val);
}
}

pkt->data.read_pos = original_read_pos;

while (1) {
ok = mbuf_get_string(&pkt->data, &key);
Expand All @@ -685,6 +754,8 @@ static bool decide_startup_pool(PgSocket *client, PktHdr *pkt)
} else if (strcmp(key, "application_name") == 0) {
set_appname(client, val);
appname_found = true;
} else if (strcmp(key, "replication") == 0) {
/* do nothing, already checked in the previous loop */
} else if (varcache_set(&client->vars, key, val)) {
slog_debug(client, "got var: %s=%s", key, val);
} else if (strlist_contains(cf_ignore_startup_params, key)) {
Expand Down

0 comments on commit d6b65d1

Please sign in to comment.