Skip to content

Commit

Permalink
[#22142] CDCSDK: Remove table level attributes for CDCSDK Metrics
Browse files Browse the repository at this point in the history
Summary:
Currently the CDCSDK metrics fail the DCHECK on attributes in metrics-writer class method `AddAggregatedEntry()`. The DCHECK was introduced in the diff https://phorge.dev.yugabyte.com/D33396 and causes tserver crash, as it compares and expects all the attributes to be the same for a given metric entity type, i.e. it expects that all the attributes of entity type cdcsdk should have the same attributes.

Currently the CDCSDK metrics store `table_name` and `table_id` in the attributes. However since the CDCSDK metrics are aggregated at stream level, they can have multiple tables associated with them. This causes the DCHECK to fail.

Since we do not need `table_name` and `table_id` attributes for aggregation of CDCSDK metrics, the fix for this issue is removing the table level attributes from the attribute map for CDCSDK metrics.

This DCHECK failure was not caught in any existing unit test, hence modifying an existing test. Also this was not caught in any QA stress tests as these are run in release mode.
Jira: DB-11070

Test Plan:
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCDCSDKMetricsTwoTablesSingleStream
PG connector test PostgresConnectorIT#shouldProduceEventsWithInitialSnapshot

Reviewers: skumar, asrinivasan, vkushwaha, stiwary, yyan

Reviewed By: yyan

Subscribers: mlillibridge, ybase, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D34528
  • Loading branch information
Sumukh-Phalgaonkar committed May 3, 2024
1 parent e346bcf commit f3b9e49
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
22 changes: 22 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/tostring.h"
#include "yb/util/metric_entity.h"

namespace yb {

Expand Down Expand Up @@ -4247,6 +4248,12 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsTwoTablesSingleS
int64_t total_traffic_sent = 0;
uint64_t total_change_event_count = 0;

std::stringstream output;
MetricPrometheusOptions opts;
PrometheusWriter writer(&output, opts);

std::unordered_map<std::string, std::string> attr;
auto aggregation_level = kStreamLevel;

for (uint32_t idx = 0; idx < num_tables; idx++) {
ASSERT_OK(
Expand All @@ -4272,9 +4279,24 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsTwoTablesSingleS
return current_expiry_time > metrics[idx]->cdcsdk_expiry_time_ms->value();
},
MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for stream expiry time update."));

attr["namespace_name"] = kNamespaceName;
attr["stream_id"] = stream_id.ToString();
attr["metric_type"] = "cdcsdk";

ASSERT_OK(metrics[idx]->cdcsdk_change_event_count->WriteForPrometheus(
&writer, attr, opts, aggregation_level));
ASSERT_OK(metrics[idx]->cdcsdk_traffic_sent->WriteForPrometheus(
&writer, attr, opts, aggregation_level));
}
auto aggregated_change_event_count =
writer.TEST_GetAggregatedValue("cdcsdk_change_event_count", stream_id.ToString());
auto aggregated_traffic_sent =
writer.TEST_GetAggregatedValue("cdcsdk_traffic_sent", stream_id.ToString());

ASSERT_GT(aggregated_traffic_sent, 100);
ASSERT_GT(total_record_size, 100);
ASSERT_GT(aggregated_change_event_count, 100);
ASSERT_GT(total_change_event_count, 100);
ASSERT_TRUE(current_traffic_sent_bytes < total_traffic_sent);
}
Expand Down
2 changes: 0 additions & 2 deletions src/yb/util/metric_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,6 @@ Status MetricEntity::WriteForPrometheus(PrometheusWriter* writer,
prometheus_attr["stream_id"] = attrs["stream_id"];
aggregation_levels = kStreamLevel;
} else if (strcmp(prototype_->name(), kCdcsdkMetricEntityName) == 0) {
prometheus_attr["table_id"] = attrs["table_id"];
prometheus_attr["table_name"] = attrs["table_name"];
prometheus_attr["namespace_name"] = attrs["namespace_name"];
prometheus_attr["stream_id"] = attrs["stream_id"];
aggregation_levels = kStreamLevel;
Expand Down
4 changes: 4 additions & 0 deletions src/yb/util/metrics_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ class PrometheusWriter {
return num_of_entries_cut_off_;
}

int64_t TEST_GetAggregatedValue(std::string metric_name, std::string entity_id) {
return aggregated_values_[metric_name][entity_id];
}

private:
friend class MetricsTest;
// FlushSingleEntry() was a function template with type of "value" as template
Expand Down

0 comments on commit f3b9e49

Please sign in to comment.