diff --git a/src/postgres/src/backend/replication/logical/reorderbuffer.c b/src/postgres/src/backend/replication/logical/reorderbuffer.c index 52fb08e4461..691d4785877 100644 --- a/src/postgres/src/backend/replication/logical/reorderbuffer.c +++ b/src/postgres/src/backend/replication/logical/reorderbuffer.c @@ -82,6 +82,7 @@ /* YB includes. */ #include "pg_yb_utils.h" +#include "replication/walsender_private.h" /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt @@ -437,6 +438,8 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) ReorderBufferTupleBuf *tuple; Size alloc_len; + TimestampTz yb_start_time = GetCurrentTimestamp(); + alloc_len = tuple_len + SizeofHeapTupleHeader; tuple = (ReorderBufferTupleBuf *) @@ -447,7 +450,11 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); if (IsYugaByteEnabled()) + { tuple->yb_is_omitted = NULL; + YbWalSndTotalTimeInReorderBufferMicros += + YbCalculateTimeDifferenceInMicros(yb_start_time); + } return tuple; } @@ -590,6 +597,8 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, { ReorderBufferTXN *txn; + TimestampTz yb_start_time = GetCurrentTimestamp(); + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); change->lsn = lsn; @@ -599,6 +608,10 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn->nentries_mem++; ReorderBufferCheckSerializeTXN(rb, txn); + + if (IsYugaByteEnabled()) + YbWalSndTotalTimeInReorderBufferMicros += + YbCalculateTimeDifferenceInMicros(yb_start_time); } /* @@ -1446,6 +1459,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, bool using_subtxn; ReorderBufferIterTXNState *volatile iterstate = NULL; + TimestampTz yb_start_time = GetCurrentTimestamp(); + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -1878,6 +1893,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_RE_THROW(); } PG_END_TRY(); + + if (IsYugaByteEnabled()) + YbWalSndTotalTimeInReorderBufferMicros += + YbCalculateTimeDifferenceInMicros(yb_start_time); } /* @@ -1981,6 +2000,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { ReorderBufferTXN *txn; + TimestampTz yb_start_time = GetCurrentTimestamp(); + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -2004,6 +2025,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); + + if (IsYugaByteEnabled()) + YbWalSndTotalTimeInReorderBufferMicros += + YbCalculateTimeDifferenceInMicros(yb_start_time); } /* @@ -2051,9 +2076,15 @@ ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { + TimestampTz yb_start_time = GetCurrentTimestamp(); + /* many records won't have an xid assigned, centralize check here */ if (xid != InvalidTransactionId) ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + if (IsYugaByteEnabled()) + YbWalSndTotalTimeInReorderBufferMicros += + YbCalculateTimeDifferenceInMicros(yb_start_time); } /* diff --git a/src/postgres/src/backend/replication/logical/yb_decode.c b/src/postgres/src/backend/replication/logical/yb_decode.c index d81fa387755..7c0751013ff 100644 --- a/src/postgres/src/backend/replication/logical/yb_decode.c +++ b/src/postgres/src/backend/replication/logical/yb_decode.c @@ -29,6 +29,7 @@ #include "access/xact.h" #include "pg_yb_utils.h" +#include "replication/walsender_private.h" #include "replication/yb_decode.h" #include "utils/rel.h" #include "yb/yql/pggate/ybc_pg_typedefs.h" @@ -71,6 +72,8 @@ void YBLogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) { + TimestampTz start_time = GetCurrentTimestamp(); + elog(DEBUG4, "YBLogicalDecodingProcessRecord: Decoding record with action = %d.", record->yb_virtual_wal_record->action); @@ -129,6 +132,9 @@ YBLogicalDecodingProcessRecord(LogicalDecodingContext *ctx, case YB_PG_ROW_MESSAGE_ACTION_UNKNOWN: pg_unreachable(); } + + YbWalSndTotalTimeInYBDecodeMicros += + YbCalculateTimeDifferenceInMicros(start_time); } /* diff --git a/src/postgres/src/backend/replication/logical/yb_virtual_wal_client.c b/src/postgres/src/backend/replication/logical/yb_virtual_wal_client.c index 7f9bc72392d..d903ef91b7b 100644 --- a/src/postgres/src/backend/replication/logical/yb_virtual_wal_client.c +++ b/src/postgres/src/backend/replication/logical/yb_virtual_wal_client.c @@ -248,6 +248,8 @@ YBCReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, &cached_records); cached_records_last_sent_row_idx = 0; + YbWalSndTotalTimeInYBDecodeMicros = 0; + YbWalSndTotalTimeInReorderBufferMicros = 0; YbWalSndTotalTimeInSendingMicros = 0; last_getconsistentchanges_response_receipt_time = GetCurrentTimestamp(); } @@ -317,10 +319,26 @@ PreProcessBeforeFetchingNextBatch() { TimestampDifference(last_getconsistentchanges_response_receipt_time, GetCurrentTimestamp(), &secs, µsecs); + + /* + * Note that this processing time does not include the time taken for + * the conversion from QLValuePB (proto) to PG datum values. This is + * done in ybc_pggate and is logged separately. + * + * The time being logged here is the total time it took for processing + * and sending a whole batch AFTER converting all the values to the PG + * format. + */ elog(DEBUG1, "Walsender processing time for the last batch is (%ld s, %d us)", secs, microsecs); - elog(DEBUG1, "Time spent in sending data (socket): %" PRIu64 " us", + elog(DEBUG1, + "Time Distribution. " + "yb_decode: %" PRIu64 " us, " + "reorder buffer: %" PRIu64 " us, " + "socket: %" PRIu64 " us.", + YbWalSndTotalTimeInYBDecodeMicros, + YbWalSndTotalTimeInReorderBufferMicros, YbWalSndTotalTimeInSendingMicros); } diff --git a/src/postgres/src/backend/replication/walsender.c b/src/postgres/src/backend/replication/walsender.c index 0d12b70b33b..405d234981e 100644 --- a/src/postgres/src/backend/replication/walsender.c +++ b/src/postgres/src/backend/replication/walsender.c @@ -206,6 +206,27 @@ static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; static XLogRecPtr logical_startptr = InvalidXLogRecPtr; +/* + * Total time spent in the yb_decode steps in a batch of changes. This includes + * the time spent in: + * 1. Creating heap tuples + * 2. Storing heap tuples in the reorder buffer + * 3. Processing time of the reorder buffer + * 4. Processing time of the output plugin + * 5. Sending the data to the client including the socket time + */ +uint64_t YbWalSndTotalTimeInYBDecodeMicros = 0; +/* + * Total time spent in the reorderbuffer steps in a batch of changes. This + * includes the time spent in: + * 1. Storing heap tuples in the reorder buffer + * 2. Processing time of the reorder buffer + * 3. Processing time of the output plugin + * 4. Sending the data to the client including the socket time + * + * A subset of the yb_decode time. + */ +uint64_t YbWalSndTotalTimeInReorderBufferMicros = 0; /* Total time spent in the WalSndWriteData function in a batch of changes. */ uint64_t YbWalSndTotalTimeInSendingMicros = 0; @@ -265,8 +286,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void XLogRead(char *buf, XLogRecPtr startptr, Size count); -static void YbWalSndUpdateTotalTimeInSendingMicros(TimestampTz yb_start_time); - /* Initialize walsender process before entering the main command loop */ void InitWalSender(void) @@ -1346,7 +1365,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, !pq_is_send_pending()) { if (IsYugaByteEnabled()) - YbWalSndUpdateTotalTimeInSendingMicros(yb_start_time); + YbWalSndTotalTimeInSendingMicros += + YbCalculateTimeDifferenceInMicros(yb_start_time); return; } @@ -1405,23 +1425,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, } if (IsYugaByteEnabled()) - YbWalSndUpdateTotalTimeInSendingMicros(yb_start_time); + YbWalSndTotalTimeInSendingMicros += + YbCalculateTimeDifferenceInMicros(yb_start_time); /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); } -static void -YbWalSndUpdateTotalTimeInSendingMicros(TimestampTz yb_start_time) -{ - long secs; - int microsecs; - - TimestampDifference(yb_start_time, GetCurrentTimestamp(), &secs, - µsecs); - YbWalSndTotalTimeInSendingMicros += (secs * USECS_PER_SEC + microsecs); -} - /* * LogicalDecodingContext 'update_progress' callback. * diff --git a/src/postgres/src/backend/utils/misc/pg_yb_utils.c b/src/postgres/src/backend/utils/misc/pg_yb_utils.c index 4137788e1d4..27dab14d2af 100644 --- a/src/postgres/src/backend/utils/misc/pg_yb_utils.c +++ b/src/postgres/src/backend/utils/misc/pg_yb_utils.c @@ -4722,3 +4722,14 @@ YBCUpdateYbReadTimeAndInvalidateRelcache(uint64_t read_time_ht) assign_yb_read_time(read_time, NULL); YbRelationCacheInvalidate(); } + +uint64_t +YbCalculateTimeDifferenceInMicros(TimestampTz yb_start_time) +{ + long secs; + int microsecs; + + TimestampDifference(yb_start_time, GetCurrentTimestamp(), &secs, + µsecs); + return secs * USECS_PER_SEC + microsecs; +} diff --git a/src/postgres/src/include/pg_yb_utils.h b/src/postgres/src/include/pg_yb_utils.h index bbb7373896b..790ed216f83 100644 --- a/src/postgres/src/include/pg_yb_utils.h +++ b/src/postgres/src/include/pg_yb_utils.h @@ -1116,4 +1116,7 @@ extern Relation YbGetRelationWithOverwrittenReplicaIdentity(Oid relid, char replident); extern void YBCUpdateYbReadTimeAndInvalidateRelcache(uint64_t read_time); + +extern uint64_t YbCalculateTimeDifferenceInMicros(TimestampTz yb_start_time); + #endif /* PG_YB_UTILS_H */ diff --git a/src/postgres/src/include/replication/walsender_private.h b/src/postgres/src/include/replication/walsender_private.h index 9d67efec78e..d499592e6ce 100644 --- a/src/postgres/src/include/replication/walsender_private.h +++ b/src/postgres/src/include/replication/walsender_private.h @@ -106,6 +106,8 @@ typedef struct extern WalSndCtlData *WalSndCtl; +extern uint64_t YbWalSndTotalTimeInYBDecodeMicros; +extern uint64_t YbWalSndTotalTimeInReorderBufferMicros; extern uint64_t YbWalSndTotalTimeInSendingMicros; extern void WalSndSetState(WalSndState state);