Skip to content

Commit

Permalink
[#22143] YSQL: Log the time taken in yb_decode and reorder buffer
Browse files Browse the repository at this point in the history
Summary:
This revision adds computation and logging of the time taken by the Walsender in yb_decode and reorderbuffer while processing a single batch from the CDC
service.
Jira: DB-11071

Test Plan:
Jenkins: compile only

Existing tests. Looked at the log manually in a local test run.

Reviewers: asrinivasan

Reviewed By: asrinivasan

Subscribers: ycdcxcluster, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34530
  • Loading branch information
dr0pdb committed Apr 25, 2024
1 parent fb075bf commit 5e24eff
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 16 deletions.
31 changes: 31 additions & 0 deletions src/postgres/src/backend/replication/logical/reorderbuffer.c
Expand Up @@ -82,6 +82,7 @@

/* YB includes. */
#include "pg_yb_utils.h"
#include "replication/walsender_private.h"

/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
Expand Down Expand Up @@ -437,6 +438,8 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
ReorderBufferTupleBuf *tuple;
Size alloc_len;

TimestampTz yb_start_time = GetCurrentTimestamp();

alloc_len = tuple_len + SizeofHeapTupleHeader;

tuple = (ReorderBufferTupleBuf *)
Expand All @@ -447,7 +450,11 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);

if (IsYugaByteEnabled())
{
tuple->yb_is_omitted = NULL;
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

return tuple;
}
Expand Down Expand Up @@ -590,6 +597,8 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
{
ReorderBufferTXN *txn;

TimestampTz yb_start_time = GetCurrentTimestamp();

txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

change->lsn = lsn;
Expand All @@ -599,6 +608,10 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn->nentries_mem++;

ReorderBufferCheckSerializeTXN(rb, txn);

if (IsYugaByteEnabled())
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
Expand Down Expand Up @@ -1446,6 +1459,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
bool using_subtxn;
ReorderBufferIterTXNState *volatile iterstate = NULL;

TimestampTz yb_start_time = GetCurrentTimestamp();

txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);

Expand Down Expand Up @@ -1878,6 +1893,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_RE_THROW();
}
PG_END_TRY();

if (IsYugaByteEnabled())
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
Expand Down Expand Up @@ -1981,6 +2000,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
ReorderBufferTXN *txn;

TimestampTz yb_start_time = GetCurrentTimestamp();

txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);

Expand All @@ -2004,6 +2025,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

/* remove potential on-disk data, and deallocate */
ReorderBufferCleanupTXN(rb, txn);

if (IsYugaByteEnabled())
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
Expand Down Expand Up @@ -2051,9 +2076,15 @@ ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
void
ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
TimestampTz yb_start_time = GetCurrentTimestamp();

/* many records won't have an xid assigned, centralize check here */
if (xid != InvalidTransactionId)
ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

if (IsYugaByteEnabled())
YbWalSndTotalTimeInReorderBufferMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);
}

/*
Expand Down
6 changes: 6 additions & 0 deletions src/postgres/src/backend/replication/logical/yb_decode.c
Expand Up @@ -29,6 +29,7 @@

#include "access/xact.h"
#include "pg_yb_utils.h"
#include "replication/walsender_private.h"
#include "replication/yb_decode.h"
#include "utils/rel.h"
#include "yb/yql/pggate/ybc_pg_typedefs.h"
Expand Down Expand Up @@ -71,6 +72,8 @@ void
YBLogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record)
{
TimestampTz start_time = GetCurrentTimestamp();

elog(DEBUG4,
"YBLogicalDecodingProcessRecord: Decoding record with action = %d.",
record->yb_virtual_wal_record->action);
Expand Down Expand Up @@ -129,6 +132,9 @@ YBLogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
case YB_PG_ROW_MESSAGE_ACTION_UNKNOWN:
pg_unreachable();
}

YbWalSndTotalTimeInYBDecodeMicros +=
YbCalculateTimeDifferenceInMicros(start_time);
}

/*
Expand Down
Expand Up @@ -248,6 +248,8 @@ YBCReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
&cached_records);

cached_records_last_sent_row_idx = 0;
YbWalSndTotalTimeInYBDecodeMicros = 0;
YbWalSndTotalTimeInReorderBufferMicros = 0;
YbWalSndTotalTimeInSendingMicros = 0;
last_getconsistentchanges_response_receipt_time = GetCurrentTimestamp();
}
Expand Down Expand Up @@ -317,10 +319,26 @@ PreProcessBeforeFetchingNextBatch()
{
TimestampDifference(last_getconsistentchanges_response_receipt_time,
GetCurrentTimestamp(), &secs, &microsecs);

/*
* Note that this processing time does not include the time taken for
* the conversion from QLValuePB (proto) to PG datum values. This is
* done in ybc_pggate and is logged separately.
*
* The time being logged here is the total time it took for processing
* and sending a whole batch AFTER converting all the values to the PG
* format.
*/
elog(DEBUG1,
"Walsender processing time for the last batch is (%ld s, %d us)",
secs, microsecs);
elog(DEBUG1, "Time spent in sending data (socket): %" PRIu64 " us",
elog(DEBUG1,
"Time Distribution. "
"yb_decode: %" PRIu64 " us, "
"reorder buffer: %" PRIu64 " us, "
"socket: %" PRIu64 " us.",
YbWalSndTotalTimeInYBDecodeMicros,
YbWalSndTotalTimeInReorderBufferMicros,
YbWalSndTotalTimeInSendingMicros);
}

Expand Down
40 changes: 25 additions & 15 deletions src/postgres/src/backend/replication/walsender.c
Expand Up @@ -206,6 +206,27 @@ static volatile sig_atomic_t replication_active = false;
static LogicalDecodingContext *logical_decoding_ctx = NULL;
static XLogRecPtr logical_startptr = InvalidXLogRecPtr;

/*
* Total time spent in the yb_decode steps in a batch of changes. This includes
* the time spent in:
* 1. Creating heap tuples
* 2. Storing heap tuples in the reorder buffer
* 3. Processing time of the reorder buffer
* 4. Processing time of the output plugin
* 5. Sending the data to the client including the socket time
*/
uint64_t YbWalSndTotalTimeInYBDecodeMicros = 0;
/*
* Total time spent in the reorderbuffer steps in a batch of changes. This
* includes the time spent in:
* 1. Storing heap tuples in the reorder buffer
* 2. Processing time of the reorder buffer
* 3. Processing time of the output plugin
* 4. Sending the data to the client including the socket time
*
* A subset of the yb_decode time.
*/
uint64_t YbWalSndTotalTimeInReorderBufferMicros = 0;
/* Total time spent in the WalSndWriteData function in a batch of changes. */
uint64_t YbWalSndTotalTimeInSendingMicros = 0;

Expand Down Expand Up @@ -265,8 +286,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);

static void XLogRead(char *buf, XLogRecPtr startptr, Size count);

static void YbWalSndUpdateTotalTimeInSendingMicros(TimestampTz yb_start_time);

/* Initialize walsender process before entering the main command loop */
void
InitWalSender(void)
Expand Down Expand Up @@ -1346,7 +1365,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
!pq_is_send_pending())
{
if (IsYugaByteEnabled())
YbWalSndUpdateTotalTimeInSendingMicros(yb_start_time);
YbWalSndTotalTimeInSendingMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);

return;
}
Expand Down Expand Up @@ -1405,23 +1425,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}

if (IsYugaByteEnabled())
YbWalSndUpdateTotalTimeInSendingMicros(yb_start_time);
YbWalSndTotalTimeInSendingMicros +=
YbCalculateTimeDifferenceInMicros(yb_start_time);

/* reactivate latch so WalSndLoop knows to continue */
SetLatch(MyLatch);
}

static void
YbWalSndUpdateTotalTimeInSendingMicros(TimestampTz yb_start_time)
{
long secs;
int microsecs;

TimestampDifference(yb_start_time, GetCurrentTimestamp(), &secs,
&microsecs);
YbWalSndTotalTimeInSendingMicros += (secs * USECS_PER_SEC + microsecs);
}

/*
* LogicalDecodingContext 'update_progress' callback.
*
Expand Down
11 changes: 11 additions & 0 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Expand Up @@ -4722,3 +4722,14 @@ YBCUpdateYbReadTimeAndInvalidateRelcache(uint64_t read_time_ht)
assign_yb_read_time(read_time, NULL);
YbRelationCacheInvalidate();
}

uint64_t
YbCalculateTimeDifferenceInMicros(TimestampTz yb_start_time)
{
long secs;
int microsecs;

TimestampDifference(yb_start_time, GetCurrentTimestamp(), &secs,
&microsecs);
return secs * USECS_PER_SEC + microsecs;
}
3 changes: 3 additions & 0 deletions src/postgres/src/include/pg_yb_utils.h
Expand Up @@ -1116,4 +1116,7 @@ extern Relation YbGetRelationWithOverwrittenReplicaIdentity(Oid relid,
char replident);

extern void YBCUpdateYbReadTimeAndInvalidateRelcache(uint64_t read_time);

extern uint64_t YbCalculateTimeDifferenceInMicros(TimestampTz yb_start_time);

#endif /* PG_YB_UTILS_H */
2 changes: 2 additions & 0 deletions src/postgres/src/include/replication/walsender_private.h
Expand Up @@ -106,6 +106,8 @@ typedef struct

extern WalSndCtlData *WalSndCtl;

extern uint64_t YbWalSndTotalTimeInYBDecodeMicros;
extern uint64_t YbWalSndTotalTimeInReorderBufferMicros;
extern uint64_t YbWalSndTotalTimeInSendingMicros;

extern void WalSndSetState(WalSndState state);
Expand Down

0 comments on commit 5e24eff

Please sign in to comment.