Skip to content

Commit

Permalink
fix alloc_by_reference_child_distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyuqin1998 committed Aug 15, 2023
1 parent 48b8129 commit 91f69a9
Show file tree
Hide file tree
Showing 17 changed files with 81 additions and 188 deletions.
2 changes: 1 addition & 1 deletion src/sql/code_generator/ob_static_engine_cg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2840,7 +2840,7 @@ int ObStaticEngineCG::generate_basic_transmit_spec(
spec.is_wf_hybrid_ = op.is_wf_hybrid();
spec.sample_type_ = op.get_sample_type();
spec.repartition_table_id_ = op.get_repartition_table_id();
spec.is_related_pair_ = op.is_related_pair();
spec.is_related_child_ = op.is_related_child();
OZ(check_rollup_distributor(&spec));
LOG_TRACE("CG transmit", K(op.get_dfo_id()), K(op.get_op_id()),
K(op.get_dist_method()), K(op.get_unmatch_row_dist_method()));
Expand Down
6 changes: 2 additions & 4 deletions src/sql/engine/px/exchange/ob_px_transmit_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ int ObPxTransmitOpInput::get_data_ch(ObPxTaskChSet &task_ch_set, int64_t timeout
OB_SERIALIZE_MEMBER((ObPxTransmitSpec, ObTransmitSpec),
sample_type_, need_null_aware_shuffle_, tablet_id_expr_,
random_expr_, sampling_saving_row_, repartition_table_id_,
wf_hybrid_aggr_status_expr_, wf_hybrid_pby_exprs_cnt_array_,
is_related_pair_);
wf_hybrid_aggr_status_expr_, wf_hybrid_pby_exprs_cnt_array_);

ObPxTransmitSpec::ObPxTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType type)
: ObTransmitSpec(alloc, type),
Expand All @@ -116,8 +115,7 @@ ObPxTransmitSpec::ObPxTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType
sampling_saving_row_(alloc),
repartition_table_id_(0),
wf_hybrid_aggr_status_expr_(NULL),
wf_hybrid_pby_exprs_cnt_array_(alloc),
is_related_pair_(false)
wf_hybrid_pby_exprs_cnt_array_(alloc)
{
}

Expand Down
2 changes: 0 additions & 2 deletions src/sql/engine/px/exchange/ob_px_transmit_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ class ObPxTransmitSpec : public ObTransmitSpec
int64_t repartition_table_id_; // for pkey, target table location id
ObExpr *wf_hybrid_aggr_status_expr_;
common::ObFixedArray<int64_t, common::ObIAllocator> wf_hybrid_pby_exprs_cnt_array_;
// for random shuffle in nlj and spf
bool is_related_pair_;
};

class ObPxTransmitOp : public ObTransmitOp
Expand Down
6 changes: 4 additions & 2 deletions src/sql/engine/px/exchange/ob_transmit_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ ObTransmitSpec::ObTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType type
slave_mapping_type_(SlaveMappingType::SM_NONE),
has_lgi_(false),
is_rollup_hybrid_(false),
is_wf_hybrid_(false)
is_wf_hybrid_(false),
is_related_child_(false)
{
}

Expand All @@ -63,7 +64,8 @@ OB_SERIALIZE_MEMBER((ObTransmitSpec, ObOpSpec),
has_lgi_,
is_rollup_hybrid_,
null_row_dist_method_,
is_wf_hybrid_);
is_wf_hybrid_,
is_related_child_);

ObTransmitOp::ObTransmitOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
: ObOperator(exec_ctx, spec, input)
Expand Down
4 changes: 4 additions & 0 deletions src/sql/engine/px/exchange/ob_transmit_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ class ObTransmitSpec : public ObOpSpec

// for window function adaptive pushdown
bool is_wf_hybrid_;

// Sometimes, a dfo's construction relies on its child. This flag
// indicates that the current dfo is the child being depended upon
bool is_related_child_;
};

class ObTransmitOp : public ObOperator
Expand Down
8 changes: 4 additions & 4 deletions src/sql/engine/px/ob_dfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ class ObDfo
need_p2p_info_(false),
p2p_dh_map_info_(),
coord_info_ptr_(nullptr),
reference_dfo_id_(common::OB_INVALID_ID)
is_related_pair_(false)
{
}

Expand Down Expand Up @@ -542,8 +542,8 @@ class ObDfo
inline void set_slave_mapping_type(SlaveMappingType v) { slave_mapping_type_ = v; }
inline SlaveMappingType get_slave_mapping_type() { return slave_mapping_type_; }
inline bool is_slave_mapping() { return SlaveMappingType::SM_NONE != slave_mapping_type_; }
inline void set_reference_dfo_id(uint64_t reference_dfo_id) { reference_dfo_id_ = reference_dfo_id; }
inline uint64_t get_reference_dfo_id() { return reference_dfo_id_; }
inline void set_is_related_pair(bool is_related_pair) { is_related_pair_ = is_related_pair; }
inline bool is_related_pair() { return is_related_pair_; }
ObPxPartChMapArray &get_part_ch_map() { return part_ch_map_; }

// DFO 分布,DFO 在各个 server 上的任务状态
Expand Down Expand Up @@ -778,7 +778,7 @@ class ObDfo
// ---------------
ObPxCoordInfo *coord_info_ptr_;
bool force_bushy_;
uint64_t reference_dfo_id_;
bool is_related_pair_;
};


Expand Down
4 changes: 2 additions & 2 deletions src/sql/engine/px/ob_dfo_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
parent_dfo->set_slave_mapping_type(transmit->get_slave_mapping_type());
dfo->set_pkey_table_loc_id(
(reinterpret_cast<const ObPxTransmitSpec *>(transmit))->repartition_table_id_);
parent_dfo->set_reference_dfo_id(
reinterpret_cast<const ObPxTransmitSpec *>(transmit)->is_related_pair_ ? dfo->get_dfo_id() : common::OB_INVALID_ID);
parent_dfo->set_is_related_pair(transmit->is_related_child_ || parent_dfo->is_related_pair());
dfo->set_is_related_pair(transmit->is_related_child_ || dfo->is_related_pair());
if (OB_ISNULL(parent_dfo)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parent dfo should not be null", K(ret));
Expand Down
25 changes: 9 additions & 16 deletions src/sql/engine/px/ob_dfo_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,11 @@ int ObSerialDfoScheduler::init_all_dfo_channel(ObExecContext &ctx) const
} else if (parent->is_root_dfo() && !parent->is_thread_inited() &&
OB_FAIL(ObPXServerAddrUtil::alloc_by_local_distribution(ctx, *parent))) {
LOG_WARN("fail to alloc local distribution", K(ret));
} else if (!parent->is_root_dfo() &&
ObPQDistributeMethod::PARTITION_HASH == child->get_dist_method()) {
} else if (!parent->is_root_dfo() && parent->is_related_pair()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
ctx,
*child, *parent))) {
*parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
} else if (!parent->is_root_dfo() && !parent->is_thread_inited() &&
Expand Down Expand Up @@ -1409,12 +1408,13 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
// sqc。
//
// 下面只实现了第一种、第二种情况,第三种需求不明确,列为 TODO
if (common::OB_INVALID_ID != parent.get_reference_dfo_id() &&
OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
parent))) {
LOG_WARN("fail alloc addr by reference child distribution", K(parent), K(child), K(ret));
if (parent.is_related_pair()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
parent))) {
LOG_WARN("fail alloc addr by reference child distribution", K(parent), K(child), K(ret));
}
} else if (parent.has_scan_op() || parent.has_dml_op()) { // 参考 Partial Partition Wise Join
// 当DFO中存在TSC或者pdml中的global index maintain op:
// 1. 当存在TSC情况下,sqcs的location信息使用tsc表的location信息
Expand All @@ -1432,13 +1432,6 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
LOG_WARN("fail alloc addr by data distribution", K(parent), K(ret));
}
LOG_TRACE("alloc_by_local_distribution", K(parent));
} else if (ObPQDistributeMethod::PARTITION_HASH == child.get_dist_method()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
} else if (OB_FAIL(ObPXServerAddrUtil::alloc_by_random_distribution(exec_ctx, child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
Expand Down
71 changes: 31 additions & 40 deletions src/sql/engine/px/ob_px_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ int ObPXServerAddrUtil::alloc_by_child_distribution(const ObDfo &child, ObDfo &p
ObArray<const ObPxSqcMeta *> sqcs;
if (OB_FAIL(child.get_sqcs(sqcs))) {
LOG_WARN("fail get sqcs", K(ret));
} else if (OB_FAIL(generate_dh_map_info(parent))) {
LOG_WARN("fail to generate dh map info", K(ret));
} else {
for (int64_t i = 0; i < sqcs.count() && OB_SUCC(ret); ++i) {
const ObPxSqcMeta &child_sqc = *sqcs.at(i);
Expand All @@ -703,6 +705,13 @@ int ObPXServerAddrUtil::alloc_by_child_distribution(const ObDfo &child, ObDfo &p
sqc.set_qc_server_id(parent.get_qc_server_id());
sqc.set_parent_dfo_id(parent.get_parent_dfo_id());
sqc.get_monitoring_info().assign(child_sqc.get_monitoring_info());
if (OB_SUCC(ret)) {
if (!parent.get_p2p_dh_map_info().is_empty()) {
if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(parent.get_p2p_dh_map_info()))) {
LOG_WARN("fail to assign p2p dh map info", K(ret));
}
}
}
if (OB_FAIL(parent.add_sqc(sqc))) {
LOG_WARN("fail add sqc", K(sqc), K(ret));
}
Expand Down Expand Up @@ -866,52 +875,34 @@ int ObPXServerAddrUtil::alloc_by_local_distribution(ObExecContext &exec_ctx,
* dfo2的sqc来进行构建。
*
*/
int ObPXServerAddrUtil::alloc_by_reference_child_distribution(
const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &child,
ObDfo &parent)
{
int ret = OB_SUCCESS;
ObDfo *reference_child = nullptr;
if (2 != parent.get_child_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parent should has two child", K(ret));
} else if (OB_FAIL(parent.get_child_dfo(0, reference_child))) {
LOG_WARN("failed to get reference_child", K(ret));
} else if (reference_child->get_dfo_id() == child.get_dfo_id()
&& OB_FAIL(parent.get_child_dfo(1, reference_child))) {
LOG_WARN("failed to get reference_child", K(ret));
} else if (OB_FAIL(alloc_by_data_distribution(table_locations, exec_ctx, *reference_child))) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
}
return ret;
}

int ObPXServerAddrUtil::alloc_by_reference_child_distribution(
const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &parent)
{
int ret = OB_SUCCESS;
ObDfo *reference_child = nullptr;
bool found = false;
for (int64_t i = 0; OB_SUCC(ret) && i < parent.get_child_count() && !found; i++) {
OZ (parent.get_child_dfo(i, reference_child));
if (reference_child->get_dfo_id() == parent.get_reference_dfo_id()) {
found = true;
if (0 != parent.get_sqcs_count()) {
/**
* this dfo has been build. do nothing.
*/
} else {
ObDfo *reference_child = nullptr;
bool found = false;
for (int64_t i = 0; OB_SUCC(ret) && i < parent.get_child_count() && !found; i++) {
OZ (parent.get_child_dfo(i, reference_child));
if (reference_child->is_related_pair()) {
found = true;
}
}
}
if (OB_SUCC(ret)) {
if (!found) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get reference_child", K(ret), K(parent.get_reference_dfo_id()));
} else if (alloc_by_data_distribution(table_locations, exec_ctx, *reference_child)) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
if (OB_SUCC(ret)) {
if (!found) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get reference_child", K(ret));
} else if (alloc_by_data_distribution(table_locations, exec_ctx, *reference_child)) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
}
}
}
return ret;
Expand Down Expand Up @@ -3022,7 +3013,7 @@ int ObSlaveMapUtil::build_pwj_slave_map_mn_group(ObDfo &parent, ObDfo &child, ui
*/
if (parent.get_sqcs_count() != child.get_sqcs_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pwj must have some sqc count", K(ret));
LOG_WARN("pwj must have same sqc count", K(ret));
} else if (OB_FAIL(ObDfo::check_dfo_pair(parent, child, child_dfo_idx))) {
LOG_WARN("failed to check dfo pair", K(ret));
} else if (OB_FAIL(build_mn_channel_per_sqcs(
Expand Down
4 changes: 0 additions & 4 deletions src/sql/engine/px/ob_px_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,6 @@ class ObPXServerAddrUtil
ObDfo &child);
static int alloc_by_local_distribution(ObExecContext &exec_ctx,
ObDfo &root);
static int alloc_by_reference_child_distribution(const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &child,
ObDfo &parent);
static int alloc_by_reference_child_distribution(const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &parent);
Expand Down
1 change: 1 addition & 0 deletions src/sql/optimizer/ob_log_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ int ObLogExchange::set_exchange_info(const ObExchangeInfo &exch_info)
null_row_dist_method_ = exch_info.null_row_dist_method_;
slave_mapping_type_ = exch_info.slave_mapping_type_;
if (is_producer()) {
is_related_child_ = exch_info.is_related_child_;
in_server_cnt_ = exch_info.server_cnt_;
slice_count_ = exch_info.slice_count_;
repartition_type_ = exch_info.repartition_type_;
Expand Down
7 changes: 3 additions & 4 deletions src/sql/optimizer/ob_log_exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ObLogExchange : public ObLogicalOperator
is_old_unblock_mode_(true),
sample_type_(NOT_INIT_SAMPLE_TYPE),
in_server_cnt_(0),
is_related_pair_(false)
is_related_child_(false)
{
repartition_table_id_ = 0;
}
Expand Down Expand Up @@ -188,8 +188,7 @@ class ObLogExchange : public ObLogicalOperator
const ObIArray<ObRawExpr *> &keys);
inline void set_in_server_cnt(int64_t in_server_cnt) { in_server_cnt_ = in_server_cnt; }
inline int64_t get_in_server_cnt() { return in_server_cnt_; }
inline void set_is_related_pair(bool is_related_pair) { is_related_pair_ = is_related_pair; }
inline bool is_related_pair() const { return is_related_pair_; }
inline bool is_related_child() const { return is_related_child_; }
private:
int prepare_px_pruning_param(ObLogicalOperator *op, int64_t &count,
common::ObIArray<const ObDMLStmt *> &stmts, common::ObIArray<int64_t> &drop_expr_idxs);
Expand Down Expand Up @@ -266,7 +265,7 @@ class ObLogExchange : public ObLogicalOperator
ObPxSampleType sample_type_;
// -end pkey range/range
int64_t in_server_cnt_; // for producer, need use exchange in server cnt to compute cost
bool is_related_pair_; // for random shuffle in nlj and spf
bool is_related_child_; // Sometimes, a dfo's construction relies on its child. This flag indicates that the current dfo is the child being depended upon
DISALLOW_COPY_AND_ASSIGN(ObLogExchange);
};
} // end of namespace sql
Expand Down
11 changes: 6 additions & 5 deletions src/sql/optimizer/ob_log_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4504,6 +4504,7 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
} else {
left_exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_HASH;
right_exch_info.dist_method_ = ObPQDistributeMethod::HASH;
right_exch_info.is_related_child_ = true;
left_exch_info.slave_mapping_type_ = sm_type;
right_exch_info.slave_mapping_type_ = sm_type;
}
Expand Down Expand Up @@ -4535,6 +4536,7 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
} else {
left_exch_info.dist_method_ = ObPQDistributeMethod::HASH;
right_exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_HASH;
left_exch_info.is_related_child_ = true;
left_exch_info.slave_mapping_type_ = sm_type;
right_exch_info.slave_mapping_type_ = sm_type;
}
Expand Down Expand Up @@ -4635,8 +4637,9 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
task skew. We insert a random shuffle operator on GI to shuffle the data and achieve load balancing.
*/
left_exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
left_exch_info.is_related_pair_ = true;
} else if (DistAlgo::DIST_ALL_NONE == join_path.join_dist_algo_) {
left_exch_info.is_related_child_ = true;
} else if (DistAlgo::DIST_NONE_ALL == join_path.join_dist_algo_ ||
DistAlgo::DIST_ALL_NONE == join_path.join_dist_algo_) {
// do nothing
} else { /*do nothing*/ }

Expand Down Expand Up @@ -7865,8 +7868,6 @@ int ObLogPlan::allocate_exchange_as_top(ObLogicalOperator *&top,
producer->set_to_producer();
consumer->set_to_consumer();
producer->set_sample_type(exch_info.sample_type_);
consumer->set_is_related_pair(exch_info.is_related_pair_);
producer->set_is_related_pair(exch_info.is_related_pair_);
if (OB_FAIL(producer->set_exchange_info(exch_info))) {
LOG_WARN("failed to set exchange info", K(ret));
} else if (OB_FAIL(producer->compute_property())) {
Expand Down Expand Up @@ -9129,7 +9130,7 @@ int ObLogPlan::create_subplan_filter_plan(ObLogicalOperator *&top,
ObExchangeInfo exch_info;
if (DistAlgo::DIST_NONE_ALL == dist_algo && get_optimizer_context().get_parallel() > 1) {
exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
exch_info.is_related_pair_ = true;
exch_info.is_related_child_ = true;
if (OB_FAIL(allocate_exchange_as_top(top, exch_info))) {
LOG_WARN("failed to allocate exchange as top");
} else if (OB_FAIL(allocate_subplan_filter_as_top(top,
Expand Down
15 changes: 5 additions & 10 deletions src/sql/optimizer/ob_logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ struct ObExchangeInfo
parallel_(ObGlobalHint::UNSET_PARALLEL),
server_cnt_(0),
server_list_(),
is_related_pair_(false)
is_related_child_(false)
{
repartition_table_id_ = 0;
}
Expand Down Expand Up @@ -522,14 +522,9 @@ struct ObExchangeInfo
int64_t parallel_;
int64_t server_cnt_;
common::ObSEArray<common::ObAddr, 4> server_list_;
/**
When data volume of left table in NLJ is small, there is a task skew due to small granule count.
Therefore, we insert random shuffle above left table to achieve load balancing. Exchange operator
splits DFO into two, and DFO with NLJ is purely computing and scheduled to QC machine. So, data
in left table will be sent to NLJ operator via network. To minimize network transmission, we
schedule DFO with NLJ on machines where left table data is located.
*/
bool is_related_pair_;
// Sometimes, a dfo's construction relies on its child. This flag
// indicates that the current dfo is the child being depended upon
bool is_related_child_;

TO_STRING_KV(K_(is_remote),
K_(is_task_order),
Expand Down Expand Up @@ -557,7 +552,7 @@ struct ObExchangeInfo
K_(parallel),
K_(server_cnt),
K_(server_list),
K_(is_related_pair));
K_(is_related_child));
private:
DISALLOW_COPY_AND_ASSIGN(ObExchangeInfo);
};
Expand Down

0 comments on commit 91f69a9

Please sign in to comment.