Skip to content

Commit

Permalink
[#20756] YSQL: Store replication slot plugin name in yb-master
Browse files Browse the repository at this point in the history
Summary:
Up till now, the output plugin name was hardcoded to 'pgoutput'. This revision removes it and allows any plugin that has been loaded to work
with a replication slot. Note that this doesn't mean any output plugin that is loaded will work with YSQL. It has to be validated on a case-by-case
basis.

We now store the plugin name in yb-master as part of the slot/stream metadata. We use this stored metadata as part of streaming as well expose it in
the pg_replication_slots view.

**Upgrade/Rollback safety:**
Changes are disabled with the preview flag `yb_enable_replication_commands`.
Jira: DB-9753

Test Plan:
./yb_build.sh --java-test 'org.yb.pgsql.TestPgRegressReplicationSlot'
./yb_build.sh --java-test 'org.yb.pgsql.TestPgReplicationSlot'

Reviewers: asrinivasan, xCluster, hsunder

Reviewed By: asrinivasan

Subscribers: ybase, ycdcxcluster, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34062
  • Loading branch information
dr0pdb committed May 6, 2024
1 parent bff3fb7 commit fa75045
Show file tree
Hide file tree
Showing 26 changed files with 164 additions and 59 deletions.
2 changes: 2 additions & 0 deletions src/postgres/src/backend/commands/ybccmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,7 @@ YBCValidatePlacement(const char *placement_info)

void
YBCCreateReplicationSlot(const char *slot_name,
const char *plugin_name,
CRSSnapshotAction snapshot_action,
uint64_t *consistent_snapshot_time)
{
Expand All @@ -1909,6 +1910,7 @@ YBCCreateReplicationSlot(const char *slot_name,
}

HandleYBStatus(YBCPgNewCreateReplicationSlot(slot_name,
plugin_name,
MyDatabaseId,
repl_slot_snapshot_action,
&handle));
Expand Down
10 changes: 10 additions & 0 deletions src/postgres/src/backend/replication/logical/logical.c
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,16 @@ LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin)
elog(ERROR, "output plugins have to register a commit callback");
}

void
YBValidateOutputPlugin(char *plugin)
{
OutputPluginCallbacks *callbacks;

callbacks = palloc(sizeof(OutputPluginCallbacks));
LoadOutputPlugin(callbacks, plugin);
pfree(callbacks);
}

static void
output_plugin_error_callback(void *arg)
{
Expand Down
8 changes: 4 additions & 4 deletions src/postgres/src/backend/replication/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication
* slots */

const char *PG_OUTPUT_PLUGIN = "pgoutput";

static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

Expand Down Expand Up @@ -226,6 +224,7 @@ ReplicationSlotValidateName(const char *name, int elevel)
void
ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
char *yb_plugin_name,
CRSSnapshotAction yb_snapshot_action,
uint64_t *yb_consistent_snapshot_time)
{
Expand All @@ -243,7 +242,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
*/
if (IsYugaByteEnabled())
{
YBCCreateReplicationSlot(name, yb_snapshot_action, yb_consistent_snapshot_time);
YBCCreateReplicationSlot(name, yb_plugin_name, yb_snapshot_action,
yb_consistent_snapshot_time);
return;
}

Expand Down Expand Up @@ -371,7 +371,7 @@ ReplicationSlotAcquire(const char *name, bool nowait)

slot = palloc(sizeof(ReplicationSlot));
namestrcpy(&slot->data.name, yb_replication_slot->slot_name);
namestrcpy(&slot->data.plugin, PG_OUTPUT_PLUGIN);
namestrcpy(&slot->data.plugin, yb_replication_slot->output_plugin);
slot->data.database = yb_replication_slot->database_oid;
slot->data.persistency = RS_PERSISTENT;
strcpy(slot->data.yb_stream_id, yb_replication_slot->stream_id);
Expand Down
19 changes: 8 additions & 11 deletions src/postgres/src/backend/replication/slotfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(NameStr(*name), false,
temporary ? RS_TEMPORARY : RS_PERSISTENT,
CRS_NOEXPORT_SNAPSHOT, NULL);
NULL /* yb_plugin_name */, CRS_NOEXPORT_SNAPSHOT,
NULL);

values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
Expand Down Expand Up @@ -152,15 +153,11 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
*
* This is different from PG where the validation is done after creating
* the replication slot on disk which is cleaned up in case of errors.
*
* TODO(#20756): Support other plugins such as test_decoding once we
* store replication slot metadata in yb-master.
*/
if (plugin == NULL || strcmp(NameStr(*plugin), PG_OUTPUT_PLUGIN) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid output plugin"),
errdetail("Only 'pgoutput' plugin is supported")));
if (plugin == NULL)
elog(ERROR, "cannot initialize logical decoding without a specified plugin");

YBValidateOutputPlugin(NameStr(*plugin));
}

check_permissions();
Expand All @@ -182,7 +179,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
*/
ReplicationSlotCreate(NameStr(*name), true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL,
CRS_NOEXPORT_SNAPSHOT, NULL);
NameStr(*plugin), CRS_NOEXPORT_SNAPSHOT, NULL);

memset(nulls, 0, sizeof(nulls));

Expand Down Expand Up @@ -363,7 +360,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)

database = slot->database_oid;
namestrcpy(&slot_name, slot->slot_name);
namestrcpy(&plugin, PG_OUTPUT_PLUGIN);
namestrcpy(&plugin, slot->output_plugin);
yb_stream_id = slot->stream_id;
yb_stream_active = slot->active;

Expand Down
36 changes: 20 additions & 16 deletions src/postgres/src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)

ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
snapshot_action, NULL);
cmd->plugin, snapshot_action, NULL);
}
else
{
Expand All @@ -938,7 +938,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
snapshot_action, NULL);
cmd->plugin, snapshot_action, NULL);
}
}

Expand Down Expand Up @@ -997,17 +997,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)

if (IsYugaByteEnabled())
{
/*
* TODO(#20756): Support other plugins such as test_decoding once we
* store replication slot metadata in yb-master.
*/
if (cmd->plugin == NULL ||
strcmp(cmd->plugin, PG_OUTPUT_PLUGIN) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid output plugin"),
errdetail("Only 'pgoutput' plugin is supported")));

if (cmd->temporary)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
Expand All @@ -1017,18 +1006,33 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
"issues/19263. React with thumbs up to raise"
" its priority")));

/*
* Validate output plugin requirement early so that we can avoid the
* expensive call to yb-master.
*
* This is different from PG where the validation is done after
* creating the replication slot on disk which is cleaned up in case
* of errors.
*/
if (cmd->plugin == NULL)
elog(ERROR, "cannot initialize logical decoding without a specified plugin");

YBValidateOutputPlugin(cmd->plugin);

/*
* 23 digits is an upper bound for the decimal representation of a uint64
*/
char consistent_snapshot_time_string[24];
uint64_t consistent_snapshot_time;
ReplicationSlotCreate(cmd->slotname, true, RS_PERSISTENT,
snapshot_action, &consistent_snapshot_time);
cmd->plugin, snapshot_action,
&consistent_snapshot_time);

if (snapshot_action == CRS_USE_SNAPSHOT)
{
snprintf(consistent_snapshot_time_string, sizeof(consistent_snapshot_time_string),
"%llu", (unsigned long long)consistent_snapshot_time);
snprintf(consistent_snapshot_time_string,
sizeof(consistent_snapshot_time_string), "%llu",
(unsigned long long) consistent_snapshot_time);
snapshot_name = pstrdup(consistent_snapshot_time_string);
}

Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/include/commands/ybccmds.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ extern void YBCValidatePlacement(const char *placement_info);
/* Replication Slot Functions ------------------------------------------------------------------ */

extern void YBCCreateReplicationSlot(const char *slot_name,
const char *plugin_name,
CRSSnapshotAction snapshot_action,
uint64_t *consistent_snapshot_time);

Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/include/replication/logical.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,6 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);

extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);

extern void YBValidateOutputPlugin(char *plugin);

#endif
1 change: 1 addition & 0 deletions src/postgres/src/include/replication/slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency p,
char *yb_plugin_name,
CRSSnapshotAction yb_snapshot_action,
uint64_t *yb_consistent_snapshot_time);
extern void ReplicationSlotPersist(void);
Expand Down
29 changes: 21 additions & 8 deletions src/postgres/src/test/regress/expected/yb_replication_slot.out
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,23 @@ SELECT * FROM pg_create_logical_replication_slot('testslot2', 'pgoutput', false)
testslot2 | 0/2
(1 row)

SELECT * FROM pg_create_logical_replication_slot('testslot_test_decoding', 'test_decoding', false);
slot_name | lsn
------------------------+-----
testslot_test_decoding | 0/2
(1 row)

-- Cannot do SELECT * since yb_stream_id, yb_restart_commit_ht changes across runs.
SELECT slot_name, plugin, slot_type, database, temporary, active,
active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
slot_name | plugin | slot_type | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+----------+-----------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
testslot2 | pgoutput | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2
testslot1 | pgoutput | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2
(2 rows)
FROM pg_replication_slots
ORDER BY slot_name;
slot_name | plugin | slot_type | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
------------------------+---------------+-----------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
testslot1 | pgoutput | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2
testslot2 | pgoutput | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2
testslot_test_decoding | test_decoding | logical | yugabyte | f | f | | 1 | 1 | 0/1 | 0/2
(3 rows)

-- drop the replication slot and create with same name again.
SELECT * FROM pg_drop_replication_slot('testslot1');
Expand All @@ -43,8 +51,7 @@ SELECT * FROM pg_create_logical_replication_slot('testslot1', 'pgoutput', false)

-- unsupported cases
SELECT * FROM pg_create_logical_replication_slot('testslot_unsupported_plugin', 'unsupported_plugin', false);
ERROR: invalid output plugin
DETAIL: Only 'pgoutput' plugin is supported
ERROR: could not access file "unsupported_plugin": No such file or directory
SELECT * FROM pg_create_logical_replication_slot('testslot_unsupported_temporary', 'pgoutput', true);
ERROR: Temporary replication slot is not yet supported
HINT: See https://github.com/yugabyte/yugabyte-db/issues/19263. React with thumbs up to raise its priority
Expand Down Expand Up @@ -86,6 +93,12 @@ SELECT * FROM pg_drop_replication_slot('testslot3');

(1 row)

SELECT * FROM pg_drop_replication_slot('testslot_test_decoding');
pg_drop_replication_slot
--------------------------

(1 row)

SELECT slot_name, plugin, slot_type, database, temporary, active,
active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
Expand Down
5 changes: 4 additions & 1 deletion src/postgres/src/test/regress/sql/yb_replication_slot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ SET SESSION AUTHORIZATION 'regress_replicationslot_user';

SELECT * FROM pg_create_logical_replication_slot('testslot1', 'pgoutput', false);
SELECT * FROM pg_create_logical_replication_slot('testslot2', 'pgoutput', false);
SELECT * FROM pg_create_logical_replication_slot('testslot_test_decoding', 'test_decoding', false);

-- Cannot do SELECT * since yb_stream_id, yb_restart_commit_ht changes across runs.
SELECT slot_name, plugin, slot_type, database, temporary, active,
active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
FROM pg_replication_slots
ORDER BY slot_name;

-- drop the replication slot and create with same name again.
SELECT * FROM pg_drop_replication_slot('testslot1');
Expand Down Expand Up @@ -42,6 +44,7 @@ RESET ROLE;
SELECT * FROM pg_drop_replication_slot('testslot1');
SELECT * FROM pg_drop_replication_slot('testslot2');
SELECT * FROM pg_drop_replication_slot('testslot3');
SELECT * FROM pg_drop_replication_slot('testslot_test_decoding');
SELECT slot_name, plugin, slot_type, database, temporary, active,
active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
Expand Down
5 changes: 3 additions & 2 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1062,8 +1062,9 @@ Status CDCServiceImpl::CreateCDCStreamForNamespace(
}

xrepl::StreamId db_stream_id = VERIFY_RESULT_OR_SET_CODE(
client()->CreateCDCSDKStreamForNamespace(ns_id, options, populate_namespace_id_as_table_id,
ReplicationSlotName(""), snapshot_option, deadline),
client()->CreateCDCSDKStreamForNamespace(
ns_id, options, populate_namespace_id_as_table_id, ReplicationSlotName(""), std::nullopt,
snapshot_option, deadline),
CDCError(CDCErrorPB::INTERNAL_ERROR));
resp->set_db_stream_id(db_stream_id.ToString());
return Status::OK();
Expand Down
4 changes: 4 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,7 @@ Result<xrepl::StreamId> YBClient::CreateCDCSDKStreamForNamespace(
const std::unordered_map<std::string, std::string>& options,
bool populate_namespace_id_as_table_id,
const ReplicationSlotName& replication_slot_name,
const std::optional<std::string>& replication_slot_plugin_name,
const std::optional<CDCSDKSnapshotOption>& consistent_snapshot_option,
CoarseTimePoint deadline,
uint64_t *consistent_snapshot_time) {
Expand All @@ -1539,6 +1540,9 @@ Result<xrepl::StreamId> YBClient::CreateCDCSDKStreamForNamespace(
if (consistent_snapshot_option.has_value()) {
req.set_cdcsdk_consistent_snapshot_option(*consistent_snapshot_option);
}
if (replication_slot_plugin_name.has_value()) {
req.set_cdcsdk_ysql_replication_slot_plugin_name(*replication_slot_plugin_name);
}

CreateCDCStreamResponsePB resp;
deadline = PatchAdminDeadline(deadline);
Expand Down
6 changes: 6 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ struct CDCSDKStreamInfo {
std::string stream_id;
uint32_t database_oid;
ReplicationSlotName cdcsdk_ysql_replication_slot_name;
std::string cdcsdk_ysql_replication_slot_plugin_name;
std::unordered_map<std::string, std::string> options;

template <class PB>
Expand All @@ -130,6 +131,9 @@ struct CDCSDKStreamInfo {
if (!cdcsdk_ysql_replication_slot_name.empty()) {
pb->set_slot_name(cdcsdk_ysql_replication_slot_name.ToString());
}
if (!cdcsdk_ysql_replication_slot_plugin_name.empty()) {
pb->set_output_plugin_name(cdcsdk_ysql_replication_slot_plugin_name);
}
}

template <class PB>
Expand All @@ -146,6 +150,7 @@ struct CDCSDKStreamInfo {
.database_oid = database_oid,
.cdcsdk_ysql_replication_slot_name =
ReplicationSlotName(pb.cdcsdk_ysql_replication_slot_name()),
.cdcsdk_ysql_replication_slot_plugin_name = pb.cdcsdk_ysql_replication_slot_plugin_name(),
.options = std::move(options)};

return stream_info;
Expand Down Expand Up @@ -609,6 +614,7 @@ class YBClient {
const NamespaceId& namespace_id, const std::unordered_map<std::string, std::string>& options,
bool populate_namespace_id_as_table_id = false,
const ReplicationSlotName& replication_slot_name = ReplicationSlotName(""),
const std::optional<std::string>& replication_slot_plugin_name = std::nullopt,
const std::optional<CDCSDKSnapshotOption>& consistent_snapshot_option = std::nullopt,
CoarseTimePoint deadline = CoarseTimePoint(),
uint64_t *consistent_snapshot_time = nullptr);
Expand Down
2 changes: 2 additions & 0 deletions src/yb/master/catalog_entity_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ message SysCDCStreamEntryPB {
// This map stores the replica identity for each table in the stream at the time of stream
// creation. The key for this map is table_id and the value is replica identity for that table.
map<string, PgReplicaIdentity> replica_identity_map = 9;

optional string cdcsdk_ysql_replication_slot_plugin_name = 10;
}


Expand Down
3 changes: 3 additions & 0 deletions src/yb/master/master_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ message CDCStreamInfoPB {
// This map stores the replica identity for each table in the stream at the time of stream
// creation. The key for this map is table_id and the value is replica identity for that table.
map<string, PgReplicaIdentity> replica_identity_map = 10;

optional string cdcsdk_ysql_replication_slot_plugin_name = 11;
}

message ValidateReplicationInfoRequestPB {
Expand All @@ -63,6 +65,7 @@ message CreateCDCStreamRequestPB {
optional string namespace_id = 7;
optional string cdcsdk_ysql_replication_slot_name = 8;
optional CDCSDKSnapshotOption cdcsdk_consistent_snapshot_option = 9;
optional string cdcsdk_ysql_replication_slot_plugin_name = 10;
}

message CreateCDCStreamResponsePB {
Expand Down

0 comments on commit fa75045

Please sign in to comment.