Skip to content

Commit

Permalink
add random exchange in NLJ and SPF when right child is DAS
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyuqin1998 committed Jul 26, 2023
1 parent 1511bfc commit 9f44290
Show file tree
Hide file tree
Showing 15 changed files with 283 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/sql/code_generator/ob_static_engine_cg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2792,6 +2792,7 @@ int ObStaticEngineCG::generate_basic_transmit_spec(
spec.is_wf_hybrid_ = op.is_wf_hybrid();
spec.sample_type_ = op.get_sample_type();
spec.repartition_table_id_ = op.get_repartition_table_id();
spec.is_related_pair_ = op.is_related_pair();
OZ(check_rollup_distributor(&spec));
LOG_TRACE("CG transmit", K(op.get_dfo_id()), K(op.get_op_id()),
K(op.get_dist_method()), K(op.get_unmatch_row_dist_method()));
Expand Down
6 changes: 4 additions & 2 deletions src/sql/engine/px/exchange/ob_px_transmit_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ int ObPxTransmitOpInput::get_data_ch(ObPxTaskChSet &task_ch_set, int64_t timeout
OB_SERIALIZE_MEMBER((ObPxTransmitSpec, ObTransmitSpec),
sample_type_, need_null_aware_shuffle_, tablet_id_expr_,
random_expr_, sampling_saving_row_, repartition_table_id_,
wf_hybrid_aggr_status_expr_, wf_hybrid_pby_exprs_cnt_array_);
wf_hybrid_aggr_status_expr_, wf_hybrid_pby_exprs_cnt_array_,
is_related_pair_);

ObPxTransmitSpec::ObPxTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType type)
: ObTransmitSpec(alloc, type),
Expand All @@ -115,7 +116,8 @@ ObPxTransmitSpec::ObPxTransmitSpec(ObIAllocator &alloc, const ObPhyOperatorType
sampling_saving_row_(alloc),
repartition_table_id_(0),
wf_hybrid_aggr_status_expr_(NULL),
wf_hybrid_pby_exprs_cnt_array_(alloc)
wf_hybrid_pby_exprs_cnt_array_(alloc),
is_related_pair_(false)
{
}

Expand Down
2 changes: 2 additions & 0 deletions src/sql/engine/px/exchange/ob_px_transmit_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class ObPxTransmitSpec : public ObTransmitSpec
int64_t repartition_table_id_; // for pkey, target table location id
ObExpr *wf_hybrid_aggr_status_expr_;
common::ObFixedArray<int64_t, common::ObIAllocator> wf_hybrid_pby_exprs_cnt_array_;
// for random shuffle in nlj and spf
bool is_related_pair_;
};

class ObPxTransmitOp : public ObTransmitOp
Expand Down
7 changes: 5 additions & 2 deletions src/sql/engine/px/ob_dfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ class ObDfo
p2p_dh_loc_(nullptr),
need_p2p_info_(false),
p2p_dh_map_info_(),
coord_info_ptr_(nullptr)
coord_info_ptr_(nullptr),
reference_dfo_id_(common::OB_INVALID_ID)
{
}

Expand Down Expand Up @@ -541,7 +542,8 @@ class ObDfo
inline void set_slave_mapping_type(SlaveMappingType v) { slave_mapping_type_ = v; }
inline SlaveMappingType get_slave_mapping_type() { return slave_mapping_type_; }
inline bool is_slave_mapping() { return SlaveMappingType::SM_NONE != slave_mapping_type_; }

inline void set_reference_dfo_id(uint64_t reference_dfo_id) { reference_dfo_id_ = reference_dfo_id; }
inline uint64_t get_reference_dfo_id() { return reference_dfo_id_; }
ObPxPartChMapArray &get_part_ch_map() { return part_ch_map_; }

// DFO 分布,DFO 在各个 server 上的任务状态
Expand Down Expand Up @@ -776,6 +778,7 @@ class ObDfo
// ---------------
ObPxCoordInfo *coord_info_ptr_;
bool force_bushy_;
uint64_t reference_dfo_id_;
};


Expand Down
2 changes: 2 additions & 0 deletions src/sql/engine/px/ob_dfo_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
parent_dfo->set_slave_mapping_type(transmit->get_slave_mapping_type());
dfo->set_pkey_table_loc_id(
(reinterpret_cast<const ObPxTransmitSpec *>(transmit))->repartition_table_id_);
parent_dfo->set_reference_dfo_id(
reinterpret_cast<const ObPxTransmitSpec *>(transmit)->is_related_pair_ ? dfo->get_dfo_id() : common::OB_INVALID_ID);
if (OB_ISNULL(parent_dfo)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parent dfo should not be null", K(ret));
Expand Down
8 changes: 7 additions & 1 deletion src/sql/engine/px/ob_dfo_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,13 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
// sqc。
//
// 下面只实现了第一种、第二种情况,第三种需求不明确,列为 TODO
if (parent.has_scan_op() || parent.has_dml_op()) { // 参考 Partial Partition Wise Join
if (common::OB_INVALID_ID != parent.get_reference_dfo_id() &&
OB_FAIL(ObPXServerAddrUtil::alloc_by_reference_child_distribution(
coord_info_.pruning_table_location_,
exec_ctx,
parent))) {
LOG_WARN("fail alloc addr by reference child distribution", K(parent), K(child), K(ret));
} else if (parent.has_scan_op() || parent.has_dml_op()) { // 参考 Partial Partition Wise Join
// 当DFO中存在TSC或者pdml中的global index maintain op:
// 1. 当存在TSC情况下,sqcs的location信息使用tsc表的location信息
// 2. 当是pdml、dml+px情况下,sqcs的locations信息使用DML对应的表的locations
Expand Down
27 changes: 27 additions & 0 deletions src/sql/engine/px/ob_px_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,33 @@ int ObPXServerAddrUtil::alloc_by_reference_child_distribution(
return ret;
}

int ObPXServerAddrUtil::alloc_by_reference_child_distribution(
const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &parent)
{
int ret = OB_SUCCESS;
ObDfo *reference_child = nullptr;
bool found = false;
for (int64_t i = 0; OB_SUCC(ret) && i < parent.get_child_count() && !found; i++) {
OZ (parent.get_child_dfo(i, reference_child));
if (reference_child->get_dfo_id() == parent.get_reference_dfo_id()) {
found = true;
}
}
if (OB_SUCC(ret)) {
if (!found) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get reference_child", K(ret), K(parent.get_reference_dfo_id()));
} else if (alloc_by_data_distribution(table_locations, exec_ctx, *reference_child)) {
LOG_WARN("failed to alloc by data", K(ret));
} else if (OB_FAIL(alloc_by_child_distribution(*reference_child, parent))) {
LOG_WARN("failed to alloc by child distribution", K(ret));
}
}
return ret;
}

int ObPXServerAddrUtil::check_partition_wise_location_valid(DASTabletLocIArray &tsc_locations)
{
int ret = OB_SUCCESS;
Expand Down
3 changes: 3 additions & 0 deletions src/sql/engine/px/ob_px_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class ObPXServerAddrUtil
ObExecContext &exec_ctx,
ObDfo &child,
ObDfo &parent);
static int alloc_by_reference_child_distribution(const ObIArray<ObTableLocation> *table_locations,
ObExecContext &exec_ctx,
ObDfo &parent);
static int split_parallel_into_task(const int64_t parallelism,
const common::ObIArray<int64_t> &sqc_partition_count,
common::ObIArray<int64_t> &results);
Expand Down
6 changes: 5 additions & 1 deletion src/sql/optimizer/ob_log_exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class ObLogExchange : public ObLogicalOperator
need_null_aware_shuffle_(false),
is_old_unblock_mode_(true),
sample_type_(NOT_INIT_SAMPLE_TYPE),
in_server_cnt_(0)
in_server_cnt_(0),
is_related_pair_(false)
{
repartition_table_id_ = 0;
}
Expand Down Expand Up @@ -186,6 +187,8 @@ class ObLogExchange : public ObLogicalOperator
const ObIArray<ObRawExpr *> &keys);
inline void set_in_server_cnt(int64_t in_server_cnt) { in_server_cnt_ = in_server_cnt; }
inline int64_t get_in_server_cnt() { return in_server_cnt_; }
inline void set_is_related_pair(bool is_related_pair) { is_related_pair_ = is_related_pair; }
inline bool is_related_pair() const { return is_related_pair_; }
private:
int prepare_px_pruning_param(ObLogicalOperator *op, int64_t &count,
common::ObIArray<const ObDMLStmt *> &stmts, common::ObIArray<int64_t> &drop_expr_idxs);
Expand Down Expand Up @@ -262,6 +265,7 @@ class ObLogExchange : public ObLogicalOperator
ObPxSampleType sample_type_;
// -end pkey range/range
int64_t in_server_cnt_; // for producer, need use exchange in server cnt to compute cost
bool is_related_pair_; // for random shuffle in nlj and spf
DISALLOW_COPY_AND_ASSIGN(ObLogExchange);
};
} // end of namespace sql
Expand Down
34 changes: 31 additions & 3 deletions src/sql/optimizer/ob_log_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4627,8 +4627,17 @@ int ObLogPlan::compute_join_exchange_info(JoinPath &join_path,
if (join_path.right_path_->is_sharding() && !join_path.right_path_->contain_fake_cte()) {
right_exch_info.dist_method_ = ObPQDistributeMethod::LOCAL;
}
} else if (DistAlgo::DIST_NONE_ALL == join_path.join_dist_algo_ ||
DistAlgo::DIST_ALL_NONE == join_path.join_dist_algo_) {
} else if (DistAlgo::DIST_NONE_ALL == join_path.join_dist_algo_
&& JoinAlgo::NESTED_LOOP_JOIN == join_path.join_algo_
&& join_path.right_path_->is_access_path()
&& get_optimizer_context().get_parallel() > 1) {
/*
When the data volume is small, the GI operator can only divide a small number of granules, resulting in
task skew. We insert a random shuffle operator on GI to shuffle the data and achieve load balancing.
*/
left_exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
left_exch_info.is_related_pair_ = true;
} else if (DistAlgo::DIST_ALL_NONE == join_path.join_dist_algo_) {
// do nothing
} else { /*do nothing*/ }

Expand Down Expand Up @@ -7847,6 +7856,8 @@ int ObLogPlan::allocate_exchange_as_top(ObLogicalOperator *&top,
producer->set_to_producer();
consumer->set_to_consumer();
producer->set_sample_type(exch_info.sample_type_);
consumer->set_is_related_pair(exch_info.is_related_pair_);
producer->set_is_related_pair(exch_info.is_related_pair_);
if (OB_FAIL(producer->set_exchange_info(exch_info))) {
LOG_WARN("failed to set exchange info", K(ret));
} else if (OB_FAIL(producer->compute_property())) {
Expand Down Expand Up @@ -9094,7 +9105,24 @@ int ObLogPlan::create_subplan_filter_plan(ObLogicalOperator *&top,
}
if (OB_SUCC(ret)) {
ObExchangeInfo exch_info;
if (DistAlgo::DIST_BASIC_METHOD == dist_algo ||
if (DistAlgo::DIST_NONE_ALL == dist_algo && get_optimizer_context().get_parallel() > 1) {
exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
exch_info.is_related_pair_ = true;
if (OB_FAIL(allocate_exchange_as_top(top, exch_info))) {
LOG_WARN("failed to allocate exchange as top");
} else if (OB_FAIL(allocate_subplan_filter_as_top(top,
subquery_ops,
query_ref_exprs,
params,
onetime_exprs,
initplan_idxs,
onetime_idxs,
filters,
dist_algo,
is_update_set))) {
LOG_WARN("failed to allocate subplan filter as top", K(ret));
}
} else if (DistAlgo::DIST_BASIC_METHOD == dist_algo ||
DistAlgo::DIST_PARTITION_WISE == dist_algo ||
DistAlgo::DIST_NONE_ALL == dist_algo) {
// is basic or is_partition_wise
Expand Down
14 changes: 12 additions & 2 deletions src/sql/optimizer/ob_logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ struct ObExchangeInfo
sample_type_(NOT_INIT_SAMPLE_TYPE),
parallel_(ObGlobalHint::UNSET_PARALLEL),
server_cnt_(0),
server_list_()
server_list_(),
is_related_pair_(false)
{
repartition_table_id_ = 0;
}
Expand Down Expand Up @@ -521,6 +522,14 @@ struct ObExchangeInfo
int64_t parallel_;
int64_t server_cnt_;
common::ObSEArray<common::ObAddr, 4> server_list_;
/**
When data volume of left table in NLJ is small, there is a task skew due to small granule count.
Therefore, we insert random shuffle above left table to achieve load balancing. Exchange operator
splits DFO into two, and DFO with NLJ is purely computing and scheduled to QC machine. So, data
in left table will be sent to NLJ operator via network. To minimize network transmission, we
schedule DFO with NLJ on machines where left table data is located.
*/
bool is_related_pair_;

TO_STRING_KV(K_(is_remote),
K_(is_task_order),
Expand All @@ -547,7 +556,8 @@ struct ObExchangeInfo
K_(sample_type),
K_(parallel),
K_(server_cnt),
K_(server_list));
K_(server_list),
K_(is_related_pair));
private:
DISALLOW_COPY_AND_ASSIGN(ObExchangeInfo);
};
Expand Down
69 changes: 69 additions & 0 deletions tools/deploy/mysql_test/test_suite/px/r/mysql/join_nlj.result
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,72 @@ insert into orders values(314268, 314268, 678, "192.168.1.8");
select t1.pname,t2.oid,t2.amount from product t1,orders t2 where t2.oid = 314265 and t1.pid=t2.pid;
pname oid amount
abcd 314265 678
drop table if exists rf_t1, rf_t2;
CREATE TABLE rf_t1 (c1 INT, c2 INT);
CREATE TABLE rf_t2 (c1 INT PRIMARY KEY, c2 INT);
insert into rf_t1 values(1, 1),(2, 2),(3, 3),(4, 4),(5, 5);
insert into rf_t2 values(1, 1),(2, 2),(3, 3),(4, 4),(5, 5);
explain select /*+parallel(1), leading(rf_t1), use_nl(rf_t1, rf_t2) */ * from rf_t1 join rf_t2 on rf_t1.c1 = rf_t2.c1;
Query Plan
========================================================
|ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
--------------------------------------------------------
|0 |NESTED-LOOP JOIN | |5 |83 |
|1 |├─TABLE FULL SCAN |rf_t1|5 |2 |
|2 |└─DISTRIBUTED TABLE GET|rf_t2|1 |16 |
========================================================
Outputs & filters:
-------------------------------------
0 - output([rf_t1.c1], [rf_t1.c2], [rf_t2.c1], [rf_t2.c2]), filter(nil), rowset=256
conds(nil), nl_params_([rf_t1.c1(:0)]), use_batch=true
1 - output([rf_t1.c1], [rf_t1.c2]), filter(nil), rowset=256
access([rf_t1.c1], [rf_t1.c2]), partitions(p0)
is_index_back=false, is_global_index=false,
range_key([rf_t1.__pk_increment]), range(MIN ; MAX)always true
2 - output([rf_t2.c1], [rf_t2.c2]), filter(nil), rowset=256
access([GROUP_ID], [rf_t2.c1], [rf_t2.c2]), partitions(p0)
is_index_back=false, is_global_index=false,
range_key([rf_t2.c1]), range(MIN ; MAX),
range_cond([:0 = rf_t2.c1])
explain select /*+parallel(2), leading(rf_t1), use_nl(rf_t1, rf_t2) */ * from rf_t1 join rf_t2 on rf_t1.c1 = rf_t2.c1;
Query Plan
=======================================================================
|ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
-----------------------------------------------------------------------
|0 |PX COORDINATOR | |5 |51 |
|1 |└─EXCHANGE OUT DISTR |:EX10001|5 |48 |
|2 | └─NESTED-LOOP JOIN | |5 |44 |
|3 | ├─EXCHANGE IN DISTR | |5 |4 |
|4 | │ └─EXCHANGE OUT DISTR (RANDOM)|:EX10000|5 |3 |
|5 | │ └─PX BLOCK ITERATOR | |5 |1 |
|6 | │ └─TABLE FULL SCAN |rf_t1 |5 |1 |
|7 | └─DISTRIBUTED TABLE GET |rf_t2 |1 |16 |
=======================================================================
Outputs & filters:
-------------------------------------
0 - output([INTERNAL_FUNCTION(rf_t1.c1, rf_t1.c2, rf_t2.c1, rf_t2.c2)]), filter(nil), rowset=256
1 - output([INTERNAL_FUNCTION(rf_t1.c1, rf_t1.c2, rf_t2.c1, rf_t2.c2)]), filter(nil), rowset=256
dop=2
2 - output([rf_t1.c1], [rf_t1.c2], [rf_t2.c1], [rf_t2.c2]), filter(nil), rowset=256
conds(nil), nl_params_([rf_t1.c1(:0)]), use_batch=true
3 - output([rf_t1.c1], [rf_t1.c2]), filter(nil), rowset=256
4 - output([rf_t1.c1], [rf_t1.c2]), filter(nil), rowset=256
dop=2
5 - output([rf_t1.c1], [rf_t1.c2]), filter(nil), rowset=256
6 - output([rf_t1.c1], [rf_t1.c2]), filter(nil), rowset=256
access([rf_t1.c1], [rf_t1.c2]), partitions(p0)
is_index_back=false, is_global_index=false,
range_key([rf_t1.__pk_increment]), range(MIN ; MAX)always true
7 - output([rf_t2.c1], [rf_t2.c2]), filter(nil), rowset=256
access([GROUP_ID], [rf_t2.c1], [rf_t2.c2]), partitions(p0)
is_index_back=false, is_global_index=false,
range_key([rf_t2.c1]), range(MIN ; MAX),
range_cond([:0 = rf_t2.c1])
select /*+parallel(2), leading(rf_t1), use_nl(rf_t1, rf_t2) */ * from rf_t1 join rf_t2 on rf_t1.c1 = rf_t2.c1;
c1 c2 c1 c2
1 1 1 1
2 2 2 2
3 3 3 3
4 4 4 4
5 5 5 5
drop table if exists rf_t1, rf_t2;
16 changes: 16 additions & 0 deletions tools/deploy/mysql_test/test_suite/px/t/join_nlj.test
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,20 @@ insert into orders values(314268, 314268, 678, "192.168.1.8");
#explain select t1.pname,t2.oid,t2.amount from product t1,orders t2 where t2.oid = 314265 and t1.pid=t2.pid;
select t1.pname,t2.oid,t2.amount from product t1,orders t2 where t2.oid = 314265 and t1.pid=t2.pid;

# 右表为das时, 左侧使用random shuffle来负载均衡, 但dop为1时不使用此规则
--disable_warnings
drop table if exists rf_t1, rf_t2;
CREATE TABLE rf_t1 (c1 INT, c2 INT);
CREATE TABLE rf_t2 (c1 INT PRIMARY KEY, c2 INT);
--enable_warnings

insert into rf_t1 values(1, 1),(2, 2),(3, 3),(4, 4),(5, 5);
insert into rf_t2 values(1, 1),(2, 2),(3, 3),(4, 4),(5, 5);

explain select /*+parallel(1), leading(rf_t1), use_nl(rf_t1, rf_t2) */ * from rf_t1 join rf_t2 on rf_t1.c1 = rf_t2.c1;
explain select /*+parallel(2), leading(rf_t1), use_nl(rf_t1, rf_t2) */ * from rf_t1 join rf_t2 on rf_t1.c1 = rf_t2.c1;

--sorted_result
select /*+parallel(2), leading(rf_t1), use_nl(rf_t1, rf_t2) */ * from rf_t1 join rf_t2 on rf_t1.c1 = rf_t2.c1;

drop table if exists rf_t1, rf_t2;

0 comments on commit 9f44290

Please sign in to comment.