Skip to content

Commit

Permalink
[CP] [OBCDC] Fix memory_usage not controlled while sql response slow
Browse files Browse the repository at this point in the history
  • Loading branch information
SanmuWangZJU authored and ob-robot committed Apr 18, 2024
1 parent 28b241c commit ec03c25
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/logservice/libobcdc/src/ob_log_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ class ObLogConfig : public common::ObBaseConfig
DEF_CAP(extra_redo_dispatch_memory_size, OB_CLUSTER_PARAMETER, "0KB", "[0, 512M]", "extra redo dispatcher memory for data skew participant");
T_DEF_INT(pause_redo_dispatch_task_count_threshold, OB_CLUSTER_PARAMETER, 80, 0, 100, "task cound percent threshold for pause redo dispatch");
T_DEF_INT(memory_usage_warn_threshold, OB_CLUSTER_PARAMETER, 85, 10, 100, "memory usage wan threshold, may pause fetch while reach the threshold");
T_DEF_INT_INFT(queue_backlog_lowest_tolerance, OB_CLUSTER_PARAMETER, 100, 0, "lowest threshold of queue_backlog that will touch redo_dispatch flow controll");
T_DEF_INT_INFT(queue_backlog_lowest_tolerance, OB_CLUSTER_PARAMETER, 100, 0, "lowest threshold of queue_backlog that will touch flow controll");
// sorter thread num
T_DEF_INT(msg_sorter_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "trans msg sorter thread num");
// sorter thread
Expand Down
32 changes: 21 additions & 11 deletions src/logservice/libobcdc/src/ob_log_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2544,6 +2544,7 @@ void ObLogInstance::global_flow_control_()
int64_t active_part_trans_task_count = trans_task_pool_.get_alloc_count();
int64_t active_log_entry_task_count = log_entry_task_pool_->get_alloc_count();
int64_t reusable_part_trans_task_count = 0;
int64_t ddl_part_trans_count = 0;
int64_t ready_to_seq_task_count = 0;
int64_t seq_queue_trans_count = 0;

Expand All @@ -2568,15 +2569,17 @@ void ObLogInstance::global_flow_control_()
const bool need_pause_dispatch = need_pause_redo_dispatch();
const bool touch_memory_warn_limit = (memory_hold > memory_warn_usage);
const bool is_storage_work_mode = is_storage_working_mode(working_mode_);
const int64_t queue_backlog_lowest_tolerance = TCONF.queue_backlog_lowest_tolerance;
const char *reason = "";

if (OB_FAIL(get_task_count_(ready_to_seq_task_count, seq_queue_trans_count, reusable_part_trans_task_count))) {
if (OB_FAIL(get_task_count_(ready_to_seq_task_count, seq_queue_trans_count, reusable_part_trans_task_count, ddl_part_trans_count))) {
LOG_ERROR("get_task_count fail", KR(ret), K(ready_to_seq_task_count), K(seq_queue_trans_count),
K(reusable_part_trans_task_count));
K(reusable_part_trans_task_count), K(ddl_part_trans_count));
} else if (OB_FAIL(dml_parser_->get_log_entry_task_count(dml_parser_part_trans_task_count))) {
LOG_ERROR("DML parser get_log_entry_task_count fail", KR(ret), K(dml_parser_part_trans_task_count));
} else {
const bool is_seq_queue_not_empty = (seq_queue_trans_count > 0);
const bool exist_trans_sequenced_not_handled = (seq_queue_trans_count > queue_backlog_lowest_tolerance);
const bool exist_ddl_processing_or_in_queue = (ddl_part_trans_count > queue_backlog_lowest_tolerance);
int64_t storager_task_count = 0;
int64_t storager_log_count = 0;
storager_->get_task_count(storager_task_count, storager_log_count);
Expand All @@ -2590,23 +2593,27 @@ void ObLogInstance::global_flow_control_()
// OR
// (3) memory is limited and exist trans sequenced but not output
// OR
// (4) memory_limit touch warn threshold and need_pause_dispatch
// (4) memory is limited and exist ddl_trans in to handle or handling
// OR
// (5) memory_limit touch warn threshold and need_pause_dispatch
bool condition1 = (active_part_trans_task_count >= part_trans_task_active_count_upper_bound)
|| touch_memory_warn_limit
|| (system_memory_avail < system_memory_avail_lower_bound);
bool condition2 = (reusable_part_trans_task_count >= part_trans_task_reusable_count_upper_bound)
|| (ready_to_seq_task_count > ready_to_seq_task_upper_bound);
bool condition3 = (storager_task_count > storager_task_count_upper_bound) && (memory_hold >= storager_mem_percentage * memory_limit);

need_slow_down_fetcher = (condition1 && (condition2 || need_pause_dispatch || is_seq_queue_not_empty)) || condition3;
need_slow_down_fetcher = (condition1 && (condition2 || need_pause_dispatch || exist_trans_sequenced_not_handled || exist_ddl_processing_or_in_queue)) || condition3;

if (need_slow_down_fetcher) {
if (condition2) {
reason = "MEMORY_LIMIT_AND_REUSABLE_PART_TOO_MUCH";
} else if (need_pause_dispatch) {
reason = "MEMORY_LIMIT_AND_DISPATCH_PAUSED";
} else if (is_seq_queue_not_empty) {
} else if (exist_trans_sequenced_not_handled) {
reason = "MEMORY_LIMIT_AND_EXIST_TRANS_TO_OUTPUT";
} else if (exist_ddl_processing_or_in_queue) {
reason = "MEMORY_LIMIT_AND_EXIST_DDL_TRANS_TO_HANDLE";
} else if (condition3) {
reason = "STORAGER_TASK_OVER_THRESHOLD";
} else {
Expand All @@ -2633,7 +2640,7 @@ void ObLogInstance::global_flow_control_()
"PART_TRANS(TOTAL=%ld, ACTIVE=%ld/%ld, REUSABLE=%ld/%ld) "
"LOG_TASK(ACTIVE=%ld) "
"STORE(%ld/%ld) "
"[FETCHER=%ld DML_PARSER=%ld "
"[FETCHER=%ld DML_PARSER=%ld DDL=%ld "
"COMMITER=%ld USER_QUEUE=%ld OUT=%ld RC=%ld] "
"DIST_TRANS(SEQ_QUEUE=%ld, SEQ=%ld, COMMITTED=%ld) "
"NEED_PAUSE_DISPATCH=%d REASON=%s",
Expand All @@ -2646,7 +2653,7 @@ void ObLogInstance::global_flow_control_()
reusable_part_trans_task_count, part_trans_task_reusable_count_upper_bound,
active_log_entry_task_count,
storager_task_count, storager_task_count_upper_bound,
fetcher_part_trans_task_count, dml_parser_part_trans_task_count,
fetcher_part_trans_task_count, dml_parser_part_trans_task_count, ddl_part_trans_count,
committer_ddl_part_trans_task_count + committer_dml_part_trans_task_count,
br_queue_part_trans_task_count, out_part_trans_task_count,
resource_collector_part_trans_task_count,
Expand Down Expand Up @@ -2808,7 +2815,8 @@ int64_t ObLogInstance::get_memory_limit_() const
int ObLogInstance::get_task_count_(
int64_t &ready_to_seq_task_count,
int64_t &seq_trans_count,
int64_t &part_trans_task_resuable_count)
int64_t &part_trans_task_resuable_count,
int64_t &ddl_part_trans_count)
{
int ret = OB_SUCCESS;
ready_to_seq_task_count = 0;
Expand Down Expand Up @@ -2865,7 +2873,8 @@ int ObLogInstance::get_task_count_(
int64_t fetcher_part_trans_task_count = fetcher_->get_part_trans_task_count();
committer_->get_part_trans_task_count(committer_ddl_part_trans_task_count,
committer_dml_part_trans_task_count);
int64_t sys_ls_handle_part_trans_task_count = sys_ls_handler_->get_part_trans_task_count();
int64_t sys_ls_handle_part_trans_task_count = 0;
sys_ls_handler_->get_task_count(sys_ls_handle_part_trans_task_count, ddl_part_trans_count);
int64_t br_queue_part_trans_task_count = br_queue_.get_part_trans_task_count();
int64_t out_part_trans_task_count = get_out_part_trans_task_count_();
int64_t resource_collector_part_trans_task_count = 0;
Expand All @@ -2888,7 +2897,8 @@ int ObLogInstance::get_task_count_(
if (REACH_TIME_INTERVAL(PRINT_GLOBAL_FLOW_CONTROL_INTERVAL)) {
_LOG_INFO("------------------------------------------------------------");
_LOG_INFO("[TASK_COUNT_STAT] [FETCHER] [PART_TRANS_TASK=%ld]", fetcher_part_trans_task_count);
_LOG_INFO("[TASK_COUNT_STAT] [SYS_LS_HANDLE] [PART_TRANS_TASK=%ld]", sys_ls_handle_part_trans_task_count);
_LOG_INFO("[TASK_COUNT_STAT] [SYS_LS_HANDLE] [PART_TRANS_TASK=%ld][DDL_QUEUED=%ld]",
sys_ls_handle_part_trans_task_count, ddl_part_trans_count);
_LOG_INFO("[TASK_COUNT_STAT] [STORAGER] [LOG_TASK=%ld/%ld]", storager_task_count, storager_log_count);
_LOG_INFO("[TASK_COUNT_STAT] [SEQUENCER] [PART_TRANS_TASK(QUEUE=%ld TOTAL=[%ld][DDL=%ld DML=%ld HB=%ld])] [TRANS(READY=%ld SEQ=%ld)]",
seq_stat_info.queue_part_trans_task_count_, seq_stat_info.total_part_trans_task_count_,
Expand Down
3 changes: 2 additions & 1 deletion src/logservice/libobcdc/src/ob_log_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ class ObLogInstance : public IObCDCInstance, public IObLogErrHandler, public lib
int get_task_count_(
int64_t &ready_to_seq_task_count,
int64_t &seq_trans_count,
int64_t &part_trans_task_resuable_count);
int64_t &part_trans_task_resuable_count,
int64_t &ddl_part_trans_count);

// next record
void do_drc_consume_tps_stat_();
Expand Down
22 changes: 16 additions & 6 deletions src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ ObLogSysLsTaskHandler::ObLogSysLsTaskHandler() :
handle_pid_(0),
stop_flag_(true),
sys_ls_fetch_queue_(),
ddl_part_trans_count_(0),
wait_formatted_cond_()
{}

Expand Down Expand Up @@ -181,6 +182,7 @@ void ObLogSysLsTaskHandler::destroy()
err_handler_ = NULL;
schema_getter_ = NULL;
handle_pid_ = 0;
ddl_part_trans_count_ = 0;
stop_flag_ = true;
}

Expand Down Expand Up @@ -233,10 +235,14 @@ void ObLogSysLsTaskHandler::stop()
int ObLogSysLsTaskHandler::push(PartTransTask *task, const int64_t timeout)
{
int ret = OB_SUCCESS;
bool is_ddl_trans = false;

if (OB_ISNULL(ddl_parser_)) {
ret = OB_NOT_INIT;
LOG_ERROR("invalid DDL parser", KR(ret), K(ddl_parser_));
} else if (OB_ISNULL(task)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", KR(ret));
} else if (OB_UNLIKELY(! task->is_ddl_trans()
&& ! task->is_ls_op_trans()
&& ! task->is_sys_ls_heartbeat()
Expand All @@ -246,15 +252,16 @@ int ObLogSysLsTaskHandler::push(PartTransTask *task, const int64_t timeout)
}
// DDL task have to push to the DDL parser first, because the task will retry after the task push DDL parser times out.
// that is, the same task may be pushed multiple times.To avoid the same task being added to the queue more than once, the DDL parser is pushed first
else if (task->is_ddl_trans() && OB_FAIL(ddl_parser_->push(*task, timeout))) {
else if ((is_ddl_trans = task->is_ddl_trans()) && OB_FAIL(ddl_parser_->push(*task, timeout))) {
if (OB_IN_STOP_STATE != ret && OB_TIMEOUT != ret) {
LOG_ERROR("push task into DDL parser fail", KR(ret), K(task));
}
}
// Add the task to the Fetch queue without timeout failure, ensuring that it will only be pushed once in the Parser
else if (OB_FAIL(sys_ls_fetch_queue_.push(task))) {
LOG_ERROR("push task into fetch queue fail", KR(ret), KPC(task));
} else {
} else if (is_ddl_trans) {
inc_ddl_part_trans_count_();
// success
}

Expand Down Expand Up @@ -292,9 +299,10 @@ int ObLogSysLsTaskHandler::get_progress(
return ret;
}

int64_t ObLogSysLsTaskHandler::get_part_trans_task_count() const
void ObLogSysLsTaskHandler::get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const
{
return sys_ls_fetch_queue_.size();
total_part_trans_count = sys_ls_fetch_queue_.size();
ddl_part_trans_count = ATOMIC_LOAD(&ddl_part_trans_count_);
}

void ObLogSysLsTaskHandler::configure(const ObLogConfig &config)
Expand Down Expand Up @@ -369,11 +377,11 @@ int ObLogSysLsTaskHandler::handle_task_(PartTransTask &task,
K(ddl_tenant_id), K(is_tenant_served));
} else {
const bool is_using_online_schema = is_online_refresh_mode(TCTX.refresh_mode_);
const bool is_ddl_trans = task.is_ddl_trans();
// The following handles DDL transaction tasks and DDL heartbeat tasks
// NOTICE: handle_ddl_trans before sequencer when using online_schmea, otherwise(using data_dict) handle_ddl_trans in sequencer.
// First output ddl trans and then update tic
if (task.is_ddl_trans() && is_using_online_schema && OB_FAIL(ddl_processor_->handle_ddl_trans(task, *tenant,
false /* need_update_tic */, stop_flag_))) {
if (is_ddl_trans && is_using_online_schema && OB_FAIL(ddl_processor_->handle_ddl_trans(task, *tenant, false /* need_update_tic */, stop_flag_))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("ddl_processor_ handle_ddl_trans fail", KR(ret), K(task), K(ddl_tenant_id), K(tenant),
K(is_tenant_served));
Expand All @@ -390,6 +398,8 @@ int ObLogSysLsTaskHandler::handle_task_(PartTransTask &task,
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("update_sys_ls_info_ fail", KR(ret), K(task), KPC(tenant));
}
} else if (is_ddl_trans) {
dec_ddl_part_trans_count_();
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class IObLogSysLsTaskHandler
int64_t &sys_ls_min_progress,
palf::LSN &sys_ls_min_handle_log_lsn) = 0;

virtual int64_t get_part_trans_task_count() const = 0;
virtual void get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const = 0;

virtual void configure(const ObLogConfig &config) = 0;
};
Expand Down Expand Up @@ -99,7 +99,7 @@ class ObLogSysLsTaskHandler : public IObLogSysLsTaskHandler
uint64_t &sys_min_progress_tenant_id,
int64_t &sys_ls_min_progress,
palf::LSN &sys_ls_min_handle_log_lsn);
virtual int64_t get_part_trans_task_count() const;
virtual void get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const;
virtual void configure(const ObLogConfig &config);

public:
Expand Down Expand Up @@ -140,6 +140,8 @@ class ObLogSysLsTaskHandler : public IObLogSysLsTaskHandler
PartTransTask *task,
ObLogTenant *tenant,
const bool is_tenant_served);
void inc_ddl_part_trans_count_() { ATOMIC_INC(&ddl_part_trans_count_); }
void dec_ddl_part_trans_count_() { ATOMIC_DEC(&ddl_part_trans_count_); }

public:
// Task queue
Expand Down Expand Up @@ -180,6 +182,7 @@ class ObLogSysLsTaskHandler : public IObLogSysLsTaskHandler

// Queue of pending tasks exported from Fetcher
TaskQueue sys_ls_fetch_queue_;
int64_t ddl_part_trans_count_;
common::ObCond wait_formatted_cond_;

private:
Expand Down

0 comments on commit ec03c25

Please sign in to comment.