Skip to content

Commit

Permalink
fix alloc_by_reference_child_distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyuqin1998 committed Aug 15, 2023
1 parent 48b8129 commit e933fae
Show file tree
Hide file tree
Showing 17 changed files with 78 additions and 181 deletions.
2 changes: 1 addition & 1 deletion src/sql/code_generator/ob_static_engine_cg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2840,7 +2840,7 @@ int ObStaticEngineCG::generate_basic_transmit_spec(
spec.is_wf_hybrid_ = op.is_wf_hybrid();
spec.sample_type_ = op.get_sample_type();
spec.repartition_table_id_ = op.get_repartition_table_id();
spec.is_related_pair_ = op.is_related_pair();
spec.is_related_child_ = op.is_related_child();
OZ(check_rollup_distributor(&spec));
LOG_TRACE("CG transmit", K(op.get_dfo_id()), K(op.get_op_id()),
K(op.get_dist_method()), K(op.get_unmatch_row_dist_method()));
Expand Down
6 changes: 2 additions & 4 deletions src/sql/engine/px/exchange/ob_px_transmit_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ int ObPxTransmitOpInput::get_data_ch(ObPxTaskChSet &task_ch_set, int64_t timeout
OB_SERIALIZE_MEMBER((ObPxTransmitSpec, ObTransmitSpec),
sample_type_, need_null_aware_shuffle_, tablet_id_expr_,
random_expr_, sampling_saving_row_, repartition_table_id_,
wf_hybrid_aggr_status_expr_, wf_hybrid_pby_exprs_cnt_array_,
is_related_pair_);
wf_hybrid_aggr_status_expr_, wf_hybrid_pby_exprs_cnt_array_);

ObPxTransmitSpec::ObPxTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType type)
: ObTransmitSpec(alloc, type),
Expand All @@ -116,8 +115,7 @@ ObPxTransmitSpec::ObPxTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType
sampling_saving_row_(alloc),
repartition_table_id_(0),
wf_hybrid_aggr_status_expr_(NULL),
wf_hybrid_pby_exprs_cnt_array_(alloc),
is_related_pair_(false)
wf_hybrid_pby_exprs_cnt_array_(alloc)
{
}

Expand Down
2 changes: 0 additions & 2 deletions src/sql/engine/px/exchange/ob_px_transmit_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ class ObPxTransmitSpec : public ObTransmitSpec
int64_t repartition_table_id_; // for pkey, target table location id
ObExpr *wf_hybrid_aggr_status_expr_;
common::ObFixedArray<int64_t, common::ObIAllocator> wf_hybrid_pby_exprs_cnt_array_;
// for random shuffle in nlj and spf
bool is_related_pair_;
};

class ObPxTransmitOp : public ObTransmitOp
Expand Down
6 changes: 4 additions & 2 deletions src/sql/engine/px/exchange/ob_transmit_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ ObTransmitSpec::ObTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType type
slave_mapping_type_(SlaveMappingType::SM_NONE),
has_lgi_(false),
is_rollup_hybrid_(false),
is_wf_hybrid_(false)
is_wf_hybrid_(false),
is_related_child_(false)
{
}

Expand All @@ -63,7 +64,8 @@ OB_SERIALIZE_MEMBER((ObTransmitSpec, ObOpSpec),
has_lgi_,
is_rollup_hybrid_,
null_row_dist_method_,
is_wf_hybrid_);
is_wf_hybrid_,
is_related_child_);

ObTransmitOp::ObTransmitOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
: ObOperator(exec_ctx, spec, input)
Expand Down
3 changes: 3 additions & 0 deletions src/sql/engine/px/exchange/ob_transmit_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class ObTransmitSpec : public ObOpSpec

// for window function adaptive pushdown
bool is_wf_hybrid_;

// for random shuffle in nlj & spf
bool is_related_child_;
};

class ObTransmitOp : public ObOperator
Expand Down
8 changes: 4 additions & 4 deletions src/sql/engine/px/ob_dfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ class ObDfo
need_p2p_info_(false),
p2p_dh_map_info_(),
coord_info_ptr_(nullptr),
reference_dfo_id_(common::OB_INVALID_ID)
is_related_pair_(false)
{
}

Expand Down Expand Up @@ -542,8 +542,8 @@ class ObDfo
inline void set_slave_mapping_type(SlaveMappingType v) { slave_mapping_type_ = v; }
inline SlaveMappingType get_slave_mapping_type() { return slave_mapping_type_; }
inline bool is_slave_mapping() { return SlaveMappingType::SM_NONE != slave_mapping_type_; }
inline void set_reference_dfo_id(uint64_t reference_dfo_id) { reference_dfo_id_ = reference_dfo_id; }
inline uint64_t get_reference_dfo_id() { return reference_dfo_id_; }
inline void set_is_related_pair(bool is_related_pair) { is_related_pair_ = is_related_pair; }
inline bool is_related_pair() { return is_related_pair_; }
ObPxPartChMapArray &get_part_ch_map() { return part_ch_map_; }

// DFO 分布,DFO 在各个 server 上的任务状态
Expand Down Expand Up @@ -778,7 +778,7 @@ class ObDfo
// ---------------
ObPxCoordInfo *coord_info_ptr_;
bool force_bushy_;
uint64_t reference_dfo_id_;
bool is_related_pair_;
};


Expand Down
4 changes: 2 additions & 2 deletions src/sql/engine/px/ob_dfo_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
parent_dfo->set_slave_mapping_type(transmit->get_slave_mapping_type());
dfo->set_pkey_table_loc_id(
(reinterpret_cast<const ObPxTransmitSpec *>(transmit))->repartition_table_id_);
parent_dfo->set_reference_dfo_id(
reinterpret_cast<const ObPxTransmitSpec *>(transmit)->is_related_pair_ ? dfo->get_dfo_id() : common::OB_INVALID_ID);
parent_dfo->set_is_related_pair(transmit->is_related_child_ || parent_dfo->is_related_pair());
dfo->set_is_related_pair(transmit->is_related_child_ || dfo->is_related_pair());
if (OB_ISNULL(parent_dfo)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parent dfo should not be null", K(ret));
Expand Down
25 changes: 9 additions & 16 deletions src/sql/engine/px/ob_dfo_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,11 @@ int ObSerialDfoScheduler::init_all_dfo_channel(ObExecContext &ctx) const
} else if (parent->is_root_dfo() && !parent->is_thread_inited() &&
OB_FAIL(ObPXServerAddrUtil::alloc_by_local_distribution(ctx, *parent))) {
LOG_WARN("fail to alloc local distribution", K(ret));
} else if (!parent->is_root_dfo() &&
ObPQDistributeMethod::PARTITION_HASH == child->get_dist_method()) {
} else if (!parent->is_root_dfo() && parent->is_related_pair()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
ctx,
*child, *parent))) {
*parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
} else if (!parent->is_root_dfo() && !parent->is_thread_inited() &&
Expand Down Expand Up @@ -1409,12 +1408,13 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
// sqc。
//
// 下面只实现了第一种、第二种情况,第三种需求不明确,列为 TODO
if (common::OB_INVALID_ID != parent.get_reference_dfo_id() &&
OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
parent))) {
LOG_WARN("fail alloc addr by reference child distribution", K(parent), K(child), K(ret));
if (parent.is_related_pair()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
parent))) {
LOG_WARN("fail alloc addr by reference child distribution", K(parent), K(child), K(ret));
}
} else if (parent.has_scan_op() || parent.has_dml_op()) { // 参考 Partial Partition Wise Join
// 当DFO中存在TSC或者pdml中的global index maintain op:
// 1. 当存在TSC情况下,sqcs的location信息使用tsc表的location信息
Expand All @@ -1432,13 +1432,6 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
LOG_WARN("fail alloc addr by data distribution", K(parent), K(ret));
}
LOG_TRACE("alloc_by_local_distribution", K(parent));
} else if (ObPQDistributeMethod::PARTITION_HASH == child.get_dist_method()) {
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
} else if (OB_FAIL(ObPXServerAddrUtil::alloc_by_random_distribution(exec_ctx, child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
Expand Down
71 changes: 31 additions & 40 deletions src/sql/engine/px/ob_px_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ int ObPXServerAddrUtil::alloc_by_child_distribution(const ObDfo &child, ObDfo &p
ObArray<const ObPxSqcMeta *> sqcs;
if (OB_FAIL(child.get_sqcs(sqcs))) {
LOG_WARN("fail get sqcs", K(ret));
} else if (OB_FAIL(generate_dh_map_info(parent))) {
LOG_WARN("fail to generate dh map info", K(ret));
} else {
for (int64_t i = 0; i < sqcs.count() && OB_SUCC(ret); ++i) {
const ObPxSqcMeta &child_sqc = *sqcs.at(i);
Expand All @@ -703,6 +705,13 @@ int ObPXServerAddrUtil::alloc_by_child_distribution(const ObDfo &child, ObDfo &p
sqc.set_qc_server_id(parent.get_qc_server_id());
sqc.set_parent_dfo_id(parent.get_parent_dfo_id());
sqc.get_monitoring_info().assign(child_sqc.get_monitoring_info());
if (OB_SUCC(ret)) {
if (!parent.get_p2p_dh_map_info().is_empty()) {
if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(parent.get_p2p_dh_map_info()))) {
LOG_WARN("fail to assign p2p dh map info", K(ret));
}
}
}
if (OB_FAIL(parent.add_sqc(sqc))) {
LOG_WARN("fail add sqc", K(sqc), K(ret));
}
Expand Down Expand Up @@ -866,52 +875,34 @@ int ObPXServerAddrUtil::alloc_by_local_distribution(ObExecContext &exec_ctx,
* dfo2的sqc来进行构建。
*
*/
int ObPXServerAddrUtil::alloc_by_reference_child_distribution(
const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &child,
ObDfo &parent)
{
int ret = OB_SUCCESS;
ObDfo *reference_child = nullptr;
if (2 != parent.get_child_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parent should has two child", K(ret));
} else if (OB_FAIL(parent.get_child_dfo(0, reference_child))) {
LOG_WARN("failed to get reference_child", K(ret));
} else if (reference_child->get_dfo_id() == child.get_dfo_id()
&& OB_FAIL(parent.get_child_dfo(1, reference_child))) {
LOG_WARN("failed to get reference_child", K(ret));
} else if (OB_FAIL(alloc_by_data_distribution(table_locations, exec_ctx, *reference_child))) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
}
return ret;
}

int ObPXServerAddrUtil::alloc_by_reference_child_distribution(
const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &parent)
{
int ret = OB_SUCCESS;
ObDfo *reference_child = nullptr;
bool found = false;
for (int64_t i = 0; OB_SUCC(ret) && i < parent.get_child_count() && !found; i++) {
OZ (parent.get_child_dfo(i, reference_child));
if (reference_child->get_dfo_id() == parent.get_reference_dfo_id()) {
found = true;
if (0 != parent.get_sqcs_count()) {
/**
* this dfo has been build. do nothing.
*/
} else {
ObDfo *reference_child = nullptr;
bool found = false;
for (int64_t i = 0; OB_SUCC(ret) && i < parent.get_child_count() && !found; i++) {
OZ (parent.get_child_dfo(i, reference_child));
if (reference_child->is_related_pair()) {
found = true;
}
}
}
if (OB_SUCC(ret)) {
if (!found) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get reference_child", K(ret), K(parent.get_reference_dfo_id()));
} else if (alloc_by_data_distribution(table_locations, exec_ctx, *reference_child)) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
if (OB_SUCC(ret)) {
if (!found) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get reference_child", K(ret));
} else if (alloc_by_data_distribution(table_locations, exec_ctx, *reference_child)) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
}
}
}
return ret;
Expand Down Expand Up @@ -3022,7 +3013,7 @@ int ObSlaveMapUtil::build_pwj_slave_map_mn_group(ObDfo &parent, ObDfo &child, ui
*/
if (parent.get_sqcs_count() != child.get_sqcs_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pwj must have some sqc count", K(ret));
LOG_WARN("pwj must have same sqc count", K(ret));
} else if (OB_FAIL(ObDfo::check_dfo_pair(parent, child, child_dfo_idx))) {
LOG_WARN("failed to check dfo pair", K(ret));
} else if (OB_FAIL(build_mn_channel_per_sqcs(
Expand Down
4 changes: 0 additions & 4 deletions src/sql/engine/px/ob_px_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,6 @@ class ObPXServerAddrUtil
ObDfo &child);
static int alloc_by_local_distribution(ObExecContext &exec_ctx,
ObDfo &root);
static int alloc_by_reference_child_distribution(const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &child,
ObDfo &parent);
static int alloc_by_reference_child_distribution(const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &parent);
Expand Down
1 change: 1 addition & 0 deletions src/sql/optimizer/ob_log_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ int ObLogExchange::set_exchange_info(const ObExchangeInfo &exch_info)
null_row_dist_method_ = exch_info.null_row_dist_method_;
slave_mapping_type_ = exch_info.slave_mapping_type_;
if (is_producer()) {
is_related_child_ = exch_info.is_related_child_;
in_server_cnt_ = exch_info.server_cnt_;
slice_count_ = exch_info.slice_count_;
repartition_type_ = exch_info.repartition_type_;
Expand Down
7 changes: 3 additions & 4 deletions src/sql/optimizer/ob_log_exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ObLogExchange : public ObLogicalOperator
is_old_unblock_mode_(true),
sample_type_(NOT_INIT_SAMPLE_TYPE),
in_server_cnt_(0),
is_related_pair_(false)
is_related_child_(false)
{
repartition_table_id_ = 0;
}
Expand Down Expand Up @@ -188,8 +188,7 @@ class ObLogExchange : public ObLogicalOperator
const ObIArray<ObRawExpr *> &keys);
inline void set_in_server_cnt(int64_t in_server_cnt) { in_server_cnt_ = in_server_cnt; }
inline int64_t get_in_server_cnt() { return in_server_cnt_; }
inline void set_is_related_pair(bool is_related_pair) { is_related_pair_ = is_related_pair; }
inline bool is_related_pair() const { return is_related_pair_; }
inline bool is_related_child() const { return is_related_child_; }
private:
int prepare_px_pruning_param(ObLogicalOperator *op, int64_t &count,
common::ObIArray<const ObDMLStmt *> &stmts, common::ObIArray<int64_t> &drop_expr_idxs);
Expand Down Expand Up @@ -266,7 +265,7 @@ class ObLogExchange : public ObLogicalOperator
ObPxSampleType sample_type_;
// -end pkey range/range
int64_t in_server_cnt_; // for producer, need use exchange in server cnt to compute cost
bool is_related_pair_; // for random shuffle in nlj and spf
bool is_related_child_; // for random shuffle in nlj and spf
DISALLOW_COPY_AND_ASSIGN(ObLogExchange);
};
} // end of namespace sql
Expand Down
11 changes: 6 additions & 5 deletions src/sql/optimizer/ob_log_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4504,6 +4504,7 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
} else {
left_exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_HASH;
right_exch_info.dist_method_ = ObPQDistributeMethod::HASH;
right_exch_info.is_related_child_ = true;
left_exch_info.slave_mapping_type_ = sm_type;
right_exch_info.slave_mapping_type_ = sm_type;
}
Expand Down Expand Up @@ -4535,6 +4536,7 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
} else {
left_exch_info.dist_method_ = ObPQDistributeMethod::HASH;
right_exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_HASH;
left_exch_info.is_related_child_ = true;
left_exch_info.slave_mapping_type_ = sm_type;
right_exch_info.slave_mapping_type_ = sm_type;
}
Expand Down Expand Up @@ -4635,8 +4637,9 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
task skew. We insert a random shuffle operator on GI to shuffle the data and achieve load balancing.
*/
left_exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
left_exch_info.is_related_pair_ = true;
} else if (DistAlgo::DIST_ALL_NONE == join_path.join_dist_algo_) {
left_exch_info.is_related_child_ = true;
} else if (DistAlgo::DIST_NONE_ALL == join_path.join_dist_algo_ ||
DistAlgo::DIST_ALL_NONE == join_path.join_dist_algo_) {
// do nothing
} else { /*do nothing*/ }

Expand Down Expand Up @@ -7865,8 +7868,6 @@ int ObLogPlan::allocate_exchange_as_top(ObLogicalOperator *&top,
producer->set_to_producer();
consumer->set_to_consumer();
producer->set_sample_type(exch_info.sample_type_);
consumer->set_is_related_pair(exch_info.is_related_pair_);
producer->set_is_related_pair(exch_info.is_related_pair_);
if (OB_FAIL(producer->set_exchange_info(exch_info))) {
LOG_WARN("failed to set exchange info", K(ret));
} else if (OB_FAIL(producer->compute_property())) {
Expand Down Expand Up @@ -9129,7 +9130,7 @@ int ObLogPlan::create_subplan_filter_plan(ObLogicalOperator *&top,
ObExchangeInfo exch_info;
if (DistAlgo::DIST_NONE_ALL == dist_algo && get_optimizer_context().get_parallel() > 1) {
exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
exch_info.is_related_pair_ = true;
exch_info.is_related_child_ = true;
if (OB_FAIL(allocate_exchange_as_top(top, exch_info))) {
LOG_WARN("failed to allocate exchange as top");
} else if (OB_FAIL(allocate_subplan_filter_as_top(top,
Expand Down
6 changes: 3 additions & 3 deletions src/sql/optimizer/ob_logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ struct ObExchangeInfo
parallel_(ObGlobalHint::UNSET_PARALLEL),
server_cnt_(0),
server_list_(),
is_related_pair_(false)
is_related_child_(false)
{
repartition_table_id_ = 0;
}
Expand Down Expand Up @@ -529,7 +529,7 @@ struct ObExchangeInfo
in left table will be sent to NLJ operator via network. To minimize network transmission, we
schedule DFO with NLJ on machines where left table data is located.
*/
bool is_related_pair_;
bool is_related_child_;

TO_STRING_KV(K_(is_remote),
K_(is_task_order),
Expand Down Expand Up @@ -557,7 +557,7 @@ struct ObExchangeInfo
K_(parallel),
K_(server_cnt),
K_(server_list),
K_(is_related_pair));
K_(is_related_child));
private:
DISALLOW_COPY_AND_ASSIGN(ObExchangeInfo);
};
Expand Down

0 comments on commit e933fae

Please sign in to comment.