Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

To add random exchange in NLJ and SPF when right child is DAS #1484

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sql/code_generator/ob_static_engine_cg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2840,6 +2840,7 @@ int ObStaticEngineCG::generate_basic_transmit_spec(
spec.is_wf_hybrid_ = op.is_wf_hybrid();
spec.sample_type_ = op.get_sample_type();
spec.repartition_table_id_ = op.get_repartition_table_id();
spec.is_related_child_ = op.is_related_child();
OZ(check_rollup_distributor(&spec));
LOG_TRACE("CG transmit", K(op.get_dfo_id()), K(op.get_op_id()),
K(op.get_dist_method()), K(op.get_unmatch_row_dist_method()));
Expand Down
6 changes: 4 additions & 2 deletions src/sql/engine/px/exchange/ob_transmit_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ ObTransmitSpec::ObTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType type
slave_mapping_type_(SlaveMappingType::SM_NONE),
has_lgi_(false),
is_rollup_hybrid_(false),
is_wf_hybrid_(false)
is_wf_hybrid_(false),
is_related_child_(false)
{
}

Expand All @@ -63,7 +64,8 @@ OB_SERIALIZE_MEMBER((ObTransmitSpec, ObOpSpec),
has_lgi_,
is_rollup_hybrid_,
null_row_dist_method_,
is_wf_hybrid_);
is_wf_hybrid_,
is_related_child_);

ObTransmitOp::ObTransmitOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
: ObOperator(exec_ctx, spec, input)
Expand Down
4 changes: 4 additions & 0 deletions src/sql/engine/px/exchange/ob_transmit_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ class ObTransmitSpec : public ObOpSpec

// for window function adaptive pushdown
bool is_wf_hybrid_;

// Sometimes, a dfo's construction relies on its child. This flag
// indicates that the current dfo is the child being depended upon
bool is_related_child_;
};

class ObTransmitOp : public ObOperator
Expand Down
7 changes: 5 additions & 2 deletions src/sql/engine/px/ob_dfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ class ObDfo
p2p_dh_loc_(nullptr),
need_p2p_info_(false),
p2p_dh_map_info_(),
coord_info_ptr_(nullptr)
coord_info_ptr_(nullptr),
is_related_pair_(false)
{
}

Expand Down Expand Up @@ -541,7 +542,8 @@ class ObDfo
// Record the slave mapping type used by this DFO.
inline void set_slave_mapping_type(SlaveMappingType v)
{
  slave_mapping_type_ = v;
}
// Return the slave mapping type recorded on this DFO.
inline SlaveMappingType get_slave_mapping_type()
{
  return slave_mapping_type_;
}
// True when the recorded slave mapping type is anything other than SM_NONE.
inline bool is_slave_mapping()
{
  return slave_mapping_type_ != SlaveMappingType::SM_NONE;
}

// Mark (or unmark) this DFO as a member of a related parent/child DFO pair.
inline void set_is_related_pair(bool is_related_pair)
{
  is_related_pair_ = is_related_pair;
}
// Return whether this DFO has been marked as a member of a related parent/child DFO pair.
inline bool is_related_pair()
{
  return is_related_pair_;
}
// Return a mutable reference to the part_ch_map_ array held by this DFO.
ObPxPartChMapArray &get_part_ch_map()
{
  return part_ch_map_;
}

// DFO 分布,DFO 在各个 server 上的任务状态
Expand Down Expand Up @@ -776,6 +778,7 @@ class ObDfo
// ---------------
ObPxCoordInfo *coord_info_ptr_;
bool force_bushy_;
bool is_related_pair_;
};


Expand Down
4 changes: 4 additions & 0 deletions src/sql/engine/px/ob_dfo_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,10 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
parent_dfo->set_slave_mapping_type(transmit->get_slave_mapping_type());
dfo->set_pkey_table_loc_id(
(reinterpret_cast<const ObPxTransmitSpec *>(transmit))->repartition_table_id_);
if (transmit->is_related_child_) {
parent_dfo->set_is_related_pair(true);
dfo->set_is_related_pair(true);
}
if (OB_ISNULL(parent_dfo)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parent dfo should not be null", K(ret));
Expand Down
21 changes: 10 additions & 11 deletions src/sql/engine/px/ob_dfo_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,11 @@ int ObSerialDfoScheduler::init_all_dfo_channel(ObExecContext &ctx) const
} else if (parent->is_root_dfo() && !parent->is_thread_inited() &&
OB_FAIL(ObPXServerAddrUtil::alloc_by_local_distribution(ctx, *parent))) {
LOG_WARN("fail to alloc local distribution", K(ret));
} else if (!parent->is_root_dfo() &&
ObPQDistributeMethod::PARTITION_HASH == child->get_dist_method()) {
} else if (!parent->is_root_dfo() && parent->is_related_pair()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
ctx,
*child, *parent))) {
*parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
} else if (!parent->is_root_dfo() && !parent->is_thread_inited() &&
Expand Down Expand Up @@ -1409,7 +1408,14 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
// sqc。
//
// 下面只实现了第一种、第二种情况,第三种需求不明确,列为 TODO
if (parent.has_scan_op() || parent.has_dml_op()) { // 参考 Partial Partition Wise Join
if (parent.is_related_pair() && !parent.has_dml_op()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
parent))) {
LOG_WARN("fail alloc addr by reference child distribution", K(parent), K(child), K(ret));
}
} else if (parent.has_scan_op() || parent.has_dml_op()) { // 参考 Partial Partition Wise Join
// 当DFO中存在TSC或者pdml中的global index maintain op:
// 1. 当存在TSC情况下,sqcs的location信息使用tsc表的location信息
// 2. 当是pdml、dml+px情况下,sqcs的locations信息使用DML对应的表的locations
Expand All @@ -1426,13 +1432,6 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
LOG_WARN("fail alloc addr by data distribution", K(parent), K(ret));
}
LOG_TRACE("alloc_by_local_distribution", K(parent));
} else if (ObPQDistributeMethod::PARTITION_HASH == child.get_dist_method()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
} else if (OB_FAIL(ObPXServerAddrUtil::alloc_by_random_distribution(exec_ctx, child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
Expand Down
48 changes: 33 additions & 15 deletions src/sql/engine/px/ob_px_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ int ObPXServerAddrUtil::alloc_by_child_distribution(const ObDfo &child, ObDfo &p
ObArray<const ObPxSqcMeta *> sqcs;
if (OB_FAIL(child.get_sqcs(sqcs))) {
LOG_WARN("fail get sqcs", K(ret));
} else if (OB_FAIL(generate_dh_map_info(parent))) {
LOG_WARN("fail to generate dh map info", K(ret));
} else {
for (int64_t i = 0; i < sqcs.count() && OB_SUCC(ret); ++i) {
const ObPxSqcMeta &child_sqc = *sqcs.at(i);
Expand All @@ -703,6 +705,13 @@ int ObPXServerAddrUtil::alloc_by_child_distribution(const ObDfo &child, ObDfo &p
sqc.set_qc_server_id(parent.get_qc_server_id());
sqc.set_parent_dfo_id(parent.get_parent_dfo_id());
sqc.get_monitoring_info().assign(child_sqc.get_monitoring_info());
if (OB_SUCC(ret)) {
if (!parent.get_p2p_dh_map_info().is_empty()) {
if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(parent.get_p2p_dh_map_info()))) {
LOG_WARN("fail to assign p2p dh map info", K(ret));
}
}
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里之前是个bug。父dfo依赖子dfo进行构造时,没有生成p2p id。这会导致slave mapping和join filter一起使用时报4016。在这里生成p2p id之后可以解决这个bug

if (OB_FAIL(parent.add_sqc(sqc))) {
LOG_WARN("fail add sqc", K(sqc), K(ret));
}
Expand Down Expand Up @@ -869,23 +878,32 @@ int ObPXServerAddrUtil::alloc_by_local_distribution(ObExecContext &exec_ctx,
int ObPXServerAddrUtil::alloc_by_reference_child_distribution(
const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &child,
ObDfo &parent)
{
int ret = OB_SUCCESS;
ObDfo *reference_child = nullptr;
if (2 != parent.get_child_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parent should has two child", K(ret));
} else if (OB_FAIL(parent.get_child_dfo(0, reference_child))) {
LOG_WARN("failed to get reference_child", K(ret));
} else if (reference_child->get_dfo_id() == child.get_dfo_id()
&& OB_FAIL(parent.get_child_dfo(1, reference_child))) {
LOG_WARN("failed to get reference_child", K(ret));
} else if (OB_FAIL(alloc_by_data_distribution(table_locations, exec_ctx, *reference_child))) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
if (0 != parent.get_sqcs_count()) {
/**
* this dfo has been build. do nothing.
*/
} else {
ObDfo *reference_child = nullptr;
bool found = false;
for (int64_t i = 0; OB_SUCC(ret) && i < parent.get_child_count() && !found; i++) {
OZ (parent.get_child_dfo(i, reference_child));
if (reference_child->is_related_pair()) {
found = true;
}
}
if (OB_SUCC(ret)) {
if (!found) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get reference_child", K(ret));
} else if (alloc_by_data_distribution(table_locations, exec_ctx, *reference_child)) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
}
}
}
return ret;
}
Expand Down Expand Up @@ -2995,7 +3013,7 @@ int ObSlaveMapUtil::build_pwj_slave_map_mn_group(ObDfo &parent, ObDfo &child, ui
*/
if (parent.get_sqcs_count() != child.get_sqcs_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pwj must have some sqc count", K(ret));
LOG_WARN("pwj must have same sqc count", K(ret));
} else if (OB_FAIL(ObDfo::check_dfo_pair(parent, child, child_dfo_idx))) {
LOG_WARN("failed to check dfo pair", K(ret));
} else if (OB_FAIL(build_mn_channel_per_sqcs(
Expand Down
1 change: 0 additions & 1 deletion src/sql/engine/px/ob_px_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ class ObPXServerAddrUtil
ObDfo &root);
static int alloc_by_reference_child_distribution(const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &child,
ObDfo &parent);
static int split_parallel_into_task(const int64_t parallelism,
const common::ObIArray<int64_t> &sqc_partition_count,
Expand Down
1 change: 1 addition & 0 deletions src/sql/optimizer/ob_log_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ int ObLogExchange::set_exchange_info(const ObExchangeInfo &exch_info)
null_row_dist_method_ = exch_info.null_row_dist_method_;
slave_mapping_type_ = exch_info.slave_mapping_type_;
if (is_producer()) {
is_related_child_ = exch_info.is_related_child_;
in_server_cnt_ = exch_info.server_cnt_;
slice_count_ = exch_info.slice_count_;
repartition_type_ = exch_info.repartition_type_;
Expand Down
5 changes: 4 additions & 1 deletion src/sql/optimizer/ob_log_exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class ObLogExchange : public ObLogicalOperator
need_null_aware_shuffle_(false),
is_old_unblock_mode_(true),
sample_type_(NOT_INIT_SAMPLE_TYPE),
in_server_cnt_(0)
in_server_cnt_(0),
is_related_child_(false)
{
repartition_table_id_ = 0;
}
Expand Down Expand Up @@ -187,6 +188,7 @@ class ObLogExchange : public ObLogicalOperator
const ObIArray<ObRawExpr *> &keys);
// Store the exchange in-server count kept in in_server_cnt_.
inline void set_in_server_cnt(int64_t in_server_cnt)
{
  in_server_cnt_ = in_server_cnt;
}
// Return the exchange in-server count kept in in_server_cnt_.
inline int64_t get_in_server_cnt()
{
  return in_server_cnt_;
}
// Return the is_related_child_ flag of this exchange operator.
inline bool is_related_child() const
{
  return is_related_child_;
}
private:
int prepare_px_pruning_param(ObLogicalOperator *op, int64_t &count,
common::ObIArray<const ObDMLStmt *> &stmts, common::ObIArray<int64_t> &drop_expr_idxs);
Expand Down Expand Up @@ -263,6 +265,7 @@ class ObLogExchange : public ObLogicalOperator
ObPxSampleType sample_type_;
// -end pkey range/range
int64_t in_server_cnt_; // for producer, need use exchange in server cnt to compute cost
bool is_related_child_; // Sometimes, a dfo's construction relies on its child. This flag indicates that the current dfo is the child being depended upon
DISALLOW_COPY_AND_ASSIGN(ObLogExchange);
};
} // end of namespace sql
Expand Down
39 changes: 38 additions & 1 deletion src/sql/optimizer/ob_log_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4504,6 +4504,8 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
} else {
left_exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_HASH;
right_exch_info.dist_method_ = ObPQDistributeMethod::HASH;
// The hash join dfo should rely on the child of the local shuffle side.
right_exch_info.is_related_child_ = true;
left_exch_info.slave_mapping_type_ = sm_type;
right_exch_info.slave_mapping_type_ = sm_type;
}
Expand Down Expand Up @@ -4535,6 +4537,8 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
} else {
left_exch_info.dist_method_ = ObPQDistributeMethod::HASH;
right_exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_HASH;
// The hash join dfo should rely on the child of the local shuffle side.
left_exch_info.is_related_child_ = true;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slave mapping 中也可以用 is_related_child_ 这个标记,表示父 dfo 依赖子 dfo 进行构造

left_exch_info.slave_mapping_type_ = sm_type;
right_exch_info.slave_mapping_type_ = sm_type;
}
Expand Down Expand Up @@ -4626,6 +4630,17 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
if (join_path.right_path_->is_sharding() && !join_path.right_path_->contain_fake_cte()) {
right_exch_info.dist_method_ = ObPQDistributeMethod::LOCAL;
}
} else if (DistAlgo::DIST_NONE_ALL == join_path.join_dist_algo_
&& JoinAlgo::NESTED_LOOP_JOIN == join_path.join_algo_
&& join_path.right_path_->is_access_path()
&& get_optimizer_context().get_parallel() > 1) {
/*
When the data volume is small, the GI operator can only produce a small number of granules, which results in
task skew. We insert a random shuffle exchange on top of the GI to redistribute the data and achieve load balancing.
The NLJ DFO is constructed based on its left child, to minimize the network transfer caused by random shuffling.
*/
left_exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
left_exch_info.is_related_child_ = true;
} else if (DistAlgo::DIST_NONE_ALL == join_path.join_dist_algo_ ||
DistAlgo::DIST_ALL_NONE == join_path.join_dist_algo_) {
// do nothing
Expand Down Expand Up @@ -9116,7 +9131,29 @@ int ObLogPlan::create_subplan_filter_plan(ObLogicalOperator *&top,
}
if (OB_SUCC(ret)) {
ObExchangeInfo exch_info;
if (DistAlgo::DIST_BASIC_METHOD == dist_algo ||
if (DistAlgo::DIST_NONE_ALL == dist_algo && get_optimizer_context().get_parallel() > 1) {
/*
When the data volume is small, the GI operator can only produce a small number of granules, which results in
task skew. We insert a random shuffle exchange on top of the GI to redistribute the data and achieve load balancing.
The SPF DFO is constructed based on its left child, to minimize the network transfer caused by random shuffling.
*/
exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
exch_info.is_related_child_ = true;
if (OB_FAIL(allocate_exchange_as_top(top, exch_info))) {
LOG_WARN("failed to allocate exchange as top");
} else if (OB_FAIL(allocate_subplan_filter_as_top(top,
subquery_ops,
query_ref_exprs,
params,
onetime_exprs,
initplan_idxs,
onetime_idxs,
filters,
dist_algo,
is_update_set))) {
LOG_WARN("failed to allocate subplan filter as top", K(ret));
}
} else if (DistAlgo::DIST_BASIC_METHOD == dist_algo ||
DistAlgo::DIST_PARTITION_WISE == dist_algo ||
DistAlgo::DIST_NONE_ALL == dist_algo) {
// is basic or is_partition_wise
Expand Down
9 changes: 7 additions & 2 deletions src/sql/optimizer/ob_logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ struct ObExchangeInfo
sample_type_(NOT_INIT_SAMPLE_TYPE),
parallel_(ObGlobalHint::UNSET_PARALLEL),
server_cnt_(0),
server_list_()
server_list_(),
is_related_child_(false)
{
repartition_table_id_ = 0;
}
Expand Down Expand Up @@ -521,6 +522,9 @@ struct ObExchangeInfo
int64_t parallel_;
int64_t server_cnt_;
common::ObSEArray<common::ObAddr, 4> server_list_;
// Sometimes, a dfo's construction relies on its child. This flag
// indicates that the current dfo is the child being depended upon
bool is_related_child_;

TO_STRING_KV(K_(is_remote),
K_(is_task_order),
Expand All @@ -547,7 +551,8 @@ struct ObExchangeInfo
K_(sample_type),
K_(parallel),
K_(server_cnt),
K_(server_list));
K_(server_list),
K_(is_related_child));
private:
DISALLOW_COPY_AND_ASSIGN(ObExchangeInfo);
};
Expand Down