Skip to content

Commit

Permalink
[BACKPORT 2024.1][#21950][#22133][#22143] YSQL: Backport Walsender de…
Browse files Browse the repository at this point in the history
…bug logs and unacked txn fix

Summary:
##### Backport Description
All merges were clean. No conflicts.

##### Original Description
Original commits:
1b902ba / D34327
48a0279 / D34483
5e24eff / D34530

###### YSQL: Insert transaction into unacked txn list only upon receipt of commit record
We maintain a list of unacked transactions for the calculation of the restart_lsn from the confirmed_flush lsn. Prior to this revision, this list also included a transaction at
the end for which we haven't yet received the COMMIT record from the CDC service. Such a transaction is stored with commit_lsn as InvalidXLogRecPtr (0) and was leading to issues in the calculation in the restart_lsn.

This revision updates the logic to put a transaction into the list only on the receipt of the commit record.
Jira: DB-10866

###### YSQL: Log the time taken in converting from QLValuePB to PG datum in walsender
The walsender spends a considerable amount of time in converting QLValuePB to PG datum in ybc_pggate. Add a VLOG(1) which logs the time taken in this
operation.
Jira: DB-11059

###### YSQL: Log the time taken in yb_decode and reorder buffer
This revision adds computation and logging of the time taken by the Walsender in yb_decode and reorderbuffer while processing a single batch from the CDC
service.
Jira: DB-11071

Test Plan: Jenkins: test regex: .*ReplicationSlot.*

Reviewers: asrinivasan, skumar

Reviewed By: skumar

Subscribers: yql, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D34535
  • Loading branch information
dr0pdb committed Apr 29, 2024
1 parent 3e5cb1e commit b35c588
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 40 deletions.
31 changes: 31 additions & 0 deletions src/postgres/src/backend/replication/logical/reorderbuffer.c
Expand Up @@ -82,6 +82,7 @@

/* YB includes. */
#include "pg_yb_utils.h"
#include "replication/walsender_private.h"

/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
Expand Down Expand Up @@ -437,6 +438,8 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
ReorderBufferTupleBuf *tuple;
Size alloc_len;

TimestampTz yb_start_time = GetCurrentTimestamp();

alloc_len = tuple_len + SizeofHeapTupleHeader;

tuple = (ReorderBufferTupleBuf *)
Expand All @@ -447,7 +450,11 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);

if (IsYugaByteEnabled())
{
tuple->yb_is_omitted = NULL;
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

return tuple;
}
Expand Down Expand Up @@ -590,6 +597,8 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
{
ReorderBufferTXN *txn;

TimestampTz yb_start_time = GetCurrentTimestamp();

txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

change->lsn = lsn;
Expand All @@ -599,6 +608,10 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn->nentries_mem++;

ReorderBufferCheckSerializeTXN(rb, txn);

if (IsYugaByteEnabled())
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
Expand Down Expand Up @@ -1446,6 +1459,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
bool using_subtxn;
ReorderBufferIterTXNState *volatile iterstate = NULL;

TimestampTz yb_start_time = GetCurrentTimestamp();

txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);

Expand Down Expand Up @@ -1878,6 +1893,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_RE_THROW();
}
PG_END_TRY();

if (IsYugaByteEnabled())
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
Expand Down Expand Up @@ -1981,6 +2000,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
ReorderBufferTXN *txn;

TimestampTz yb_start_time = GetCurrentTimestamp();

txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);

Expand All @@ -2004,6 +2025,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

/* remove potential on-disk data, and deallocate */
ReorderBufferCleanupTXN(rb, txn);

if (IsYugaByteEnabled())
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
Expand Down Expand Up @@ -2051,9 +2076,15 @@ ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
void
ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
TimestampTz yb_start_time = GetCurrentTimestamp();

/* many records won't have an xid assigned, centralize check here */
if (xid != InvalidTransactionId)
ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

if (IsYugaByteEnabled())
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
Expand Down
6 changes: 6 additions & 0 deletions src/postgres/src/backend/replication/logical/yb_decode.c
Expand Up @@ -29,6 +29,7 @@

#include "access/xact.h"
#include "pg_yb_utils.h"
#include "replication/walsender_private.h"
#include "replication/yb_decode.h"
#include "utils/rel.h"
#include "yb/yql/pggate/ybc_pg_typedefs.h"
Expand Down Expand Up @@ -71,6 +72,8 @@ void
YBLogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record)
{
TimestampTz start_time = GetCurrentTimestamp();

elog(DEBUG4,
"YBLogicalDecodingProcessRecord: Decoding record with action = %d.",
record->yb_virtual_wal_record->action);
Expand Down Expand Up @@ -129,6 +132,9 @@ YBLogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
case YB_PG_ROW_MESSAGE_ACTION_UNKNOWN:
pg_unreachable();
}

YbWalSndTotalTimeInYBDecodeMicros +=
YbCalculateTimeDifferenceInMicros(start_time);
}

/*
Expand Down
Expand Up @@ -42,6 +42,7 @@ static YBCPgChangeRecordBatch *cached_records = NULL;
static size_t cached_records_last_sent_row_idx = 0;
static bool last_getconsistentchanges_response_empty = false;
static TimestampTz last_getconsistentchanges_response_receipt_time;
static XLogRecPtr last_txn_begin_lsn = InvalidXLogRecPtr;

/*
* Marker to refresh publication's tables list. If set to true call
Expand Down Expand Up @@ -139,6 +140,7 @@ YBCInitVirtualWal(List *yb_publication_names)
unacked_transactions = NIL;
last_getconsistentchanges_response_empty = false;
last_getconsistentchanges_response_receipt_time = 0;
last_txn_begin_lsn = InvalidXLogRecPtr;

needs_publication_table_list_refresh = false;
}
Expand Down Expand Up @@ -183,7 +185,8 @@ InitVirtualWal(List *publication_names)
List *tables;
Oid *table_oids;

YBCUpdateYbReadTimeAndInvalidateRelcache(MyReplicationSlot->data.yb_last_pub_refresh_time);
YBCUpdateYbReadTimeAndInvalidateRelcache(
MyReplicationSlot->data.yb_last_pub_refresh_time);

tables = YBCGetTables(publication_names);
table_oids = YBCGetTableOids(tables);
Expand Down Expand Up @@ -245,6 +248,8 @@ YBCReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
&cached_records);

cached_records_last_sent_row_idx = 0;
YbWalSndTotalTimeInYBDecodeMicros = 0;
YbWalSndTotalTimeInReorderBufferMicros = 0;
YbWalSndTotalTimeInSendingMicros = 0;
last_getconsistentchanges_response_receipt_time = GetCurrentTimestamp();
}
Expand Down Expand Up @@ -314,10 +319,26 @@ PreProcessBeforeFetchingNextBatch()
{
TimestampDifference(last_getconsistentchanges_response_receipt_time,
GetCurrentTimestamp(), &secs, &microsecs);

/*
* Note that this processing time does not include the time taken for
* the conversion from QLValuePB (proto) to PG datum values. This is
* done in ybc_pggate and is logged separately.
*
* The time being logged here is the total time it took for processing
* and sending a whole batch AFTER converting all the values to the PG
* format.
*/
elog(DEBUG1,
"Walsender processing time for the last batch is (%ld s, %d us)",
secs, microsecs);
elog(DEBUG1, "Time spent in sending data (socket): %" PRIu64 " us",
elog(DEBUG1,
"Time Distribution. "
"yb_decode: %" PRIu64 " us, "
"reorder buffer: %" PRIu64 " us, "
"socket: %" PRIu64 " us.",
YbWalSndTotalTimeInYBDecodeMicros,
YbWalSndTotalTimeInReorderBufferMicros,
YbWalSndTotalTimeInSendingMicros);
}

Expand All @@ -329,7 +350,6 @@ PreProcessBeforeFetchingNextBatch()
static void
TrackUnackedTransaction(YBCPgVirtualWalRecord *record)
{
YBUnackedTransactionInfo *transaction = NULL;
MemoryContext caller_context;

caller_context = GetCurrentMemoryContext();
Expand All @@ -339,32 +359,20 @@ TrackUnackedTransaction(YBCPgVirtualWalRecord *record)
{
case YB_PG_ROW_MESSAGE_ACTION_BEGIN:
{
transaction = palloc(sizeof(YBUnackedTransactionInfo));
transaction->xid = record->xid;
transaction->begin_lsn = record->lsn;
transaction->commit_lsn = InvalidXLogRecPtr;

unacked_transactions = lappend(unacked_transactions, transaction);
last_txn_begin_lsn = record->lsn;
break;
}

case YB_PG_ROW_MESSAGE_ACTION_COMMIT:
{
YBUnackedTransactionInfo *txninfo = NULL;

/*
* We should at least have one transaction which we appended while
* handling the corresponding BEGIN record.
*/
Assert(list_length(unacked_transactions) > 0);

txninfo = (YBUnackedTransactionInfo *) lfirst(
list_tail((List *) unacked_transactions));
Assert(txninfo->xid == record->xid);
Assert(txninfo->begin_lsn != InvalidXLogRecPtr);
Assert(txninfo->commit_lsn == InvalidXLogRecPtr);
YBUnackedTransactionInfo *transaction =
palloc(sizeof(YBUnackedTransactionInfo));
transaction->xid = record->xid;
Assert(last_txn_begin_lsn != InvalidXLogRecPtr);
transaction->begin_lsn = last_txn_begin_lsn;
transaction->commit_lsn = record->lsn;

txninfo->commit_lsn = record->lsn;
unacked_transactions = lappend(unacked_transactions, transaction);
break;
}

Expand Down Expand Up @@ -416,8 +424,8 @@ YBCCalculatePersistAndGetRestartLSN(XLogRecPtr confirmed_flush)
elog(DEBUG1, "Updating confirmed_flush to %lu and restart_lsn_hint to %lu",
confirmed_flush, restart_lsn_hint);

YBCUpdateAndPersistLSN(MyReplicationSlot->data.yb_stream_id, restart_lsn_hint,
confirmed_flush, &restart_lsn);
YBCUpdateAndPersistLSN(MyReplicationSlot->data.yb_stream_id,
restart_lsn_hint, confirmed_flush, &restart_lsn);

elog(DEBUG1, "The restart_lsn calculated by the virtual wal is %" PRIu64,
restart_lsn);
Expand Down
40 changes: 25 additions & 15 deletions src/postgres/src/backend/replication/walsender.c
Expand Up @@ -206,6 +206,27 @@ static volatile sig_atomic_t replication_active = false;
static LogicalDecodingContext *logical_decoding_ctx = NULL;
static XLogRecPtr logical_startptr = InvalidXLogRecPtr;

/*
* Total time spent in the yb_decode steps in a batch of changes. This includes
* the time spent in:
* 1. Creating heap tuples
* 2. Storing heap tuples in the reorder buffer
* 3. Processing time of the reorder buffer
* 4. Processing time of the output plugin
* 5. Sending the data to the client including the socket time
*/
uint64_t YbWalSndTotalTimeInYBDecodeMicros = 0;
/*
* Total time spent in the reorderbuffer steps in a batch of changes. This
* includes the time spent in:
* 1. Storing heap tuples in the reorder buffer
* 2. Processing time of the reorder buffer
* 3. Processing time of the output plugin
* 4. Sending the data to the client including the socket time
*
* A subset of the yb_decode time.
*/
uint64_t YbWalSndTotalTimeInReorderBufferMicros = 0;
/* Total time spent in the WalSndWriteData function in a batch of changes. */
uint64_t YbWalSndTotalTimeInSendingMicros = 0;

Expand Down Expand Up @@ -265,8 +286,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);

static void XLogRead(char *buf, XLogRecPtr startptr, Size count);

static void YbWalSndUpdateTotalTimeInSendingMicros(TimestampTz yb_start_time);

/* Initialize walsender process before entering the main command loop */
void
InitWalSender(void)
Expand Down Expand Up @@ -1346,7 +1365,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
!pq_is_send_pending())
{
if (IsYugaByteEnabled())
YbWalSndUpdateTotalTimeInSendingMicros(yb_start_time);
YbWalSndTotalTimeInSendingMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);

return;
}
Expand Down Expand Up @@ -1405,23 +1425,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}

if (IsYugaByteEnabled())
YbWalSndUpdateTotalTimeInSendingMicros(yb_start_time);
YbWalSndTotalTimeInSendingMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);

/* reactivate latch so WalSndLoop knows to continue */
SetLatch(MyLatch);
}

static void
YbWalSndUpdateTotalTimeInSendingMicros(TimestampTz yb_start_time)
{
long secs;
int microsecs;

TimestampDifference(yb_start_time, GetCurrentTimestamp(), &secs,
&microsecs);
YbWalSndTotalTimeInSendingMicros += (secs * USECS_PER_SEC + microsecs);
}

/*
* LogicalDecodingContext 'update_progress' callback.
*
Expand Down
11 changes: 11 additions & 0 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Expand Up @@ -4716,3 +4716,14 @@ YBCUpdateYbReadTimeAndInvalidateRelcache(uint64_t read_time_ht)
assign_yb_read_time(read_time, NULL);
YbRelationCacheInvalidate();
}

uint64_t
YbCalculateTimeDifferenceInMicros(TimestampTz yb_start_time)
{
long secs;
int microsecs;

TimestampDifference(yb_start_time, GetCurrentTimestamp(), &secs,
&microsecs);
return secs * USECS_PER_SEC + microsecs;
}
3 changes: 3 additions & 0 deletions src/postgres/src/include/pg_yb_utils.h
Expand Up @@ -1116,4 +1116,7 @@ extern Relation YbGetRelationWithOverwrittenReplicaIdentity(Oid relid,
char replident);

extern void YBCUpdateYbReadTimeAndInvalidateRelcache(uint64_t read_time);

extern uint64_t YbCalculateTimeDifferenceInMicros(TimestampTz yb_start_time);

#endif /* PG_YB_UTILS_H */
2 changes: 2 additions & 0 deletions src/postgres/src/include/replication/walsender_private.h
Expand Up @@ -106,6 +106,8 @@ typedef struct

extern WalSndCtlData *WalSndCtl;

extern uint64_t YbWalSndTotalTimeInYBDecodeMicros;
extern uint64_t YbWalSndTotalTimeInReorderBufferMicros;
extern uint64_t YbWalSndTotalTimeInSendingMicros;

extern void WalSndSetState(WalSndState state);
Expand Down
5 changes: 5 additions & 0 deletions src/yb/yql/pggate/ybc_pggate.cc
Expand Up @@ -2357,6 +2357,7 @@ YBCStatus YBCPgGetCDCConsistentChanges(
*DCHECK_NOTNULL(record_batch) = NULL;
const auto resp = result.get();
VLOG(4) << "The GetConsistentChangesForCDC response: " << resp.DebugString();
auto response_to_pg_conversion_start = GetCurrentTimeMicros();
auto row_count = resp.cdc_sdk_proto_records_size();

// Used for logging a summary of the response received from the CDC service.
Expand Down Expand Up @@ -2502,6 +2503,10 @@ YBCStatus YBCPgGetCDCConsistentChanges(
VLOG(1) << "Received 0 rows in GetConsistentChangesResponsePB response\n";
}

auto time_in_conversion = GetCurrentTimeMicros() - response_to_pg_conversion_start;
VLOG(1) << "Time spent in converting from QLValuePB to PG datum in PgGate: "
<< time_in_conversion << " us";

return YBCStatusOK();
}

Expand Down

0 comments on commit b35c588

Please sign in to comment.