Skip to content

Commit

Permalink
[#22232] YSQL: Convert row level debug logs to DEBUG2 in walsender
Browse files Browse the repository at this point in the history
Summary:
We have debug logs in place in walsender to help identify issues in case of errors. Some of these logs are printed for every row being streamed. As a result, they can have a substantial overhead.

This revision updates them with the following strategy:
1. Batch level: DEBUG1. These logs are printed once per batch of changes received from the VWAL. The current batch size is 500. So these are summary logs
2. Row level: DEBUG2. These logs are printed for every row being streamed.

Also did misc. improvements in the logs.
Jira: DB-11152

Test Plan:
Jenkins: test regex: .*ReplicationSlot.*

Existing tests

Reviewers: asrinivasan

Reviewed By: asrinivasan

Subscribers: ybase, ycdcxcluster, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34682
  • Loading branch information
dr0pdb committed May 3, 2024
1 parent 0a86d62 commit e346bcf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
15 changes: 11 additions & 4 deletions src/postgres/src/backend/replication/logical/yb_decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ YBDecodeUpdate(LogicalDecodingContext *ctx, XLogReaderState *record)
before_op_tuple_buf->tuple = *before_op_tuple;
before_op_tuple_buf->yb_is_omitted = before_op_is_omitted;

elog(DEBUG2, "The before_op heap tuple: %s and after_op heap tuple: %s",
YbHeapTupleToString(before_op_tuple, tupdesc),
YbHeapTupleToString(after_op_tuple, tupdesc));

change->data.tp.newtuple = after_op_tuple_buf;
change->data.tp.oldtuple = before_op_tuple_buf;
change->data.tp.yb_table_oid = yb_record->table_oid;
Expand Down Expand Up @@ -416,6 +420,9 @@ YBGetHeapTuplesForRecord(const YBCPgVirtualWalRecord *yb_record,
}

tuple = heap_form_tuple(tupdesc, datums, is_nulls);
elog(DEBUG2, "The heap tuple: %s for operation: %s",
YbHeapTupleToString(tuple, tupdesc),
(change_type == REORDER_BUFFER_CHANGE_INSERT) ? "INSERT" : "DELETE");

RelationClose(relation);
return tuple;
Expand Down Expand Up @@ -536,14 +543,14 @@ static void
YBLogTupleDescIfRequested(const YBCPgVirtualWalRecord *yb_record,
TupleDesc tupdesc)
{
/* Log tuple descriptor for DEBUG1 onwards. */
if (log_min_messages <= DEBUG1)
/* Log tuple descriptor for DEBUG2 onwards. */
if (log_min_messages <= DEBUG2)
{
elog(DEBUG1, "Printing tuple descriptor for relation %d\n",
elog(DEBUG2, "Printing tuple descriptor for relation %d\n",
yb_record->table_oid);
for (int attr_idx = 0; attr_idx < tupdesc->natts; attr_idx++)
{
elog(DEBUG1, "Col %d: name = %s, dropped = %d, type = %d\n",
elog(DEBUG2, "Col %d: name = %s, dropped = %d, type = %d\n",
attr_idx, tupdesc->attrs[attr_idx].attname.data,
tupdesc->attrs[attr_idx].attisdropped,
tupdesc->attrs[attr_idx].atttypid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,23 +296,6 @@ PreProcessBeforeFetchingNextBatch()
long secs;
int microsecs;

if (last_getconsistentchanges_response_empty)
{
elog(DEBUG4, "YBCReadRecord: Sleeping for %d ms due to empty response.",
yb_walsender_poll_sleep_duration_empty_ms);
pg_usleep(1000L * yb_walsender_poll_sleep_duration_empty_ms);
}
else
{
elog(DEBUG4,
"YBCReadRecord: Sleeping for %d ms as the last "
"response was non-empty.",
yb_walsender_poll_sleep_duration_nonempty_ms);
pg_usleep(1000L * yb_walsender_poll_sleep_duration_nonempty_ms);
}

elog(DEBUG5, "YBCReadRecord: Fetching a fresh batch of changes.");

/* Log the summary of time spent in processing the previous batch. */
if (log_min_messages <= DEBUG1 &&
last_getconsistentchanges_response_receipt_time != 0)
Expand All @@ -333,10 +316,12 @@ PreProcessBeforeFetchingNextBatch()
"Walsender processing time for the last batch is (%ld s, %d us)",
secs, microsecs);
elog(DEBUG1,
"Time Distribution. "
"More Information: "
"batch_size: %d, "
"yb_decode: %" PRIu64 " us, "
"reorder buffer: %" PRIu64 " us, "
"socket: %" PRIu64 " us.",
(cached_records) ? cached_records->row_count : 0,
YbWalSndTotalTimeInYBDecodeMicros,
YbWalSndTotalTimeInReorderBufferMicros,
YbWalSndTotalTimeInSendingMicros);
Expand All @@ -345,6 +330,23 @@ PreProcessBeforeFetchingNextBatch()
/* We no longer need the earlier record batch. */
if (cached_records)
MemoryContextReset(cached_records_context);

if (last_getconsistentchanges_response_empty)
{
elog(DEBUG4, "YBCReadRecord: Sleeping for %d ms due to empty response.",
yb_walsender_poll_sleep_duration_empty_ms);
pg_usleep(1000L * yb_walsender_poll_sleep_duration_empty_ms);
}
else
{
elog(DEBUG4,
"YBCReadRecord: Sleeping for %d ms as the last "
"response was non-empty.",
yb_walsender_poll_sleep_duration_nonempty_ms);
pg_usleep(1000L * yb_walsender_poll_sleep_duration_nonempty_ms);
}

elog(DEBUG5, "YBCReadRecord: Fetching a fresh batch of changes.");
}

static void
Expand Down

0 comments on commit e346bcf

Please sign in to comment.