Aggregate function pushdown to different decoders #1488

Open
wants to merge 17 commits into
base: master

@jgszxlyh jgszxlyh commented Aug 1, 2023

Task Description

Ref #1487.

Solution Description

Add aggregate functions for RAW/DICT/RLE/CONST/INTER_DIFF decoders.

Passed Regressions

Ran the unit test for ObMicroBlockDecoder::get_min_or_max() to check col_decoder.get_aggregate_result(). The following tables show the speedup ratios before and after the optimization in the situations listed below. The ROWCNT column gives the number of rows in the microblock. To fully test the supported data types, each column of the microblock has a different data type.

RAW

Only data with storage types IntSC/UIntSC is optimized, so only these data types were tested.

INTER_DIFF

Only optimized for reading the entire microblock and taking the min.

Unit test

Value

The speedup ratio is calculated from the average time of 100 runs over all columns.
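As a reading aid for the tables below, here is a minimal sketch of how such a ratio could be computed. It assumes the ratio is the mean time before the optimization divided by the mean time after it; the description does not state the direction explicitly. `mean_time` and `speedup_ratio` are illustrative names, not functions from this PR.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Mean of a non-empty list of timings (any consistent unit).
double mean_time(const std::vector<double>& samples) {
  assert(!samples.empty());
  return std::accumulate(samples.begin(), samples.end(), 0.0) /
         static_cast<double>(samples.size());
}

// Assumed definition: mean time before the change / mean time after it.
double speedup_ratio(const std::vector<double>& before,
                     const std::vector<double>& after) {
  const double after_mean = mean_time(after);
  assert(after_mean > 0.0);
  return mean_time(before) / after_mean;
}
```

Under that assumption, a value of 7.35 in a table would mean the old path took about 7.35 times as long as the new one.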

Whole/Part

Whole means the whole microblock is read; part means only the rows of the microblock within the row_cap range are read.

RAW

ROWCNT whole&part
128 7.35
256 10.85
512 13.27

DICT

ROWCNT whole part
64 77.71 14.26
128 129.64 15.24
256 293.84 16.79

RLE

ROWCNT whole part
64 31.86 30.25
128 37.88 36.26
256 44.27 41.84

CONST

ROWCNT whole part
64 82.38 69.47
128 161.51 111.57
256 355.81 196.77

INTER_DIFF

ROWCNT whole&min
64 18.33
128 31.00
256 53.00

Examples of the data values used for the different encodings.

Data value example for RAW/DICT/RLE/INTER_DIFF

A A A ... A B B

Data value example for CONST

A A A ... A A A

MySQL test

Experimental environment

Since the optimization targets encoded data, all experiments are run after manually triggering a MAJOR FREEZE.

Queries can run hot or cold; the reported results are the average time of multiple hot queries, that is, queries that make effective use of the cache.

Data value examples for the MySQL test

Data value for DICT

A B C A B C ... A B C

Data value for RLE

A ... A B ... B C ... C ... A ... A B ... B C ... C

Data value for CONST

A A A ... A A A

Data value for INTER_DIFF

10000000 10000001 10000002 ... 19999997 19999998 19999999

Data value for RAW

A B G ... F L K

Results of the MySQL test

Scan whole table
SQL: SELECT min(value) FROM table;

data_type                   int         int         number      number      char        char        varchar     varchar
encoding_type    row_count  o_time(ms)  n_time(ms)  o_time(ms)  n_time(ms)  o_time(ms)  n_time(ms)  o_time(ms)  n_time(ms)
DICT             6000000    618         30          686         30          911         31          977         31
CONST            6000000    428         30          524         29          836         29          995         29
RLE              6000000    541         30          589         31          829         34          965         32
INTER_DIFF(min)  6000000    883         53          /           /           /           /           /           /
RAW              6000000    532         114         /           /           /           /           /           /

Scan part of the table with different filter rates
SQL: SELECT min(v_num) FROM const WHERE id < 60000;

Filter range                           1.0%        1.0%        10.0%       10.0%       100.0%      100.0%
encoding_type    row_count  data_type  o_time(ms)  n_time(ms)  o_time(ms)  n_time(ms)  o_time(ms)  n_time(ms)
DICT             6000000    number     10          4           72          6           680         26
CONST            6000000    number     8           2           56          5           524         25
RLE              6000000    number     9           3           64          5           586         26
INTER_DIFF(min)  6000000    int        9           4           57          6           521         29
RAW              6000000    int        9           4           57          16          521         113

@CLAassistant
CLAassistant commented Aug 1, 2023

CLA assistant check
All committers have signed the CLA.

@jgszxlyh jgszxlyh changed the title {issue #1487} Aggregate function pushdown to different decoders Aug 1, 2023
@haitaoy
Contributor

haitaoy commented Aug 1, 2023

Please add some explanation and comparison results.

}
} else if (const_ref == dict_decoder_.get_dict_header()->count_) {
// Const value is null
LOG_INFO("No const", K(ret));
Contributor

The INFO log can be removed; it gets printed at the default log level.

Author

Suggested change
LOG_INFO("No const", K(ret));
}
} else if (const_ref == dict_decoder_.get_dict_header()->count_) {
// Const value is null
}

Author

Fixed.

ObDictDecoderIterator dict_iter = dict_decoder_.begin(&ctx, dict_meta_length);
ObObj& const_obj = *(dict_iter + meta_header_->const_ref_);
if (const_obj.is_fixed_len_char_type() && nullptr != ctx.col_param_) {
if (OB_FAIL(storage::pad_column(ctx.col_param_->get_accuracy(),
Contributor

Padding should not be needed here.

Author

Got it, fixed.

next_except_row_id = row_id_arr.at_(meta_header_->payload_ + count, except_table_pos);
if (except_table_pos == count || row_id != next_except_row_id) {
} else {
*curr_ref = reinterpret_cast<const uint8_t *>(meta_header_->payload_)[except_table_pos];
Contributor

This duplicates the code above; it would be better to extract it into a shared helper.

Author

Got it, fixed.

if (row_cap > 1) {
monotonic_inc = row_ids[1] > row_ids[0];
}
int64_t step = monotonic_inc ? 1 : -1;
Contributor

const

Author

Got it, fixed.

if (meta_header_->is_sorted_dict()) {
if (!agg_info.get_is_min()) {
traverse_it = end_it;
if((*traverse_it).is_null()){
Contributor

Can this be null here?

Author

Yes, that is possible.
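To illustrate why the null check matters on the max path of a sorted dictionary, here is a minimal sketch. It assumes a simplified model in which entries ascend and a null entry, if present, sits at the tail; `DictEntry` and `sorted_dict_min_or_max` are hypothetical names, not the PR's types.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical model of a sorted dictionary entry; nullopt stands for NULL.
using DictEntry = std::optional<int64_t>;

// Assumes non-null entries ascend and any null entries are at the tail.
// Returns nullopt when the dictionary is empty or holds only nulls.
std::optional<int64_t> sorted_dict_min_or_max(const std::vector<DictEntry>& dict,
                                              bool is_min) {
  if (dict.empty()) {
    return std::nullopt;
  }
  if (is_min) {
    // The first entry is the smallest; it is null only if every entry is.
    return dict.front();
  }
  // For max, step back from the tail past any null entry.
  for (auto it = dict.rbegin(); it != dict.rend(); ++it) {
    if (it->has_value()) {
      return *it;
    }
  }
  return std::nullopt;
}
```

With a sorted dictionary, neither direction needs to compare values; only the tail needs the null guard.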

} else if (OB_FAIL(agg_info.update_min_or_max(datum_buf[i]))){
LOG_WARN("Failed to update_min_or_max", K(ret), K(datum_buf[i]), K(agg_info));
}
++i;
Contributor

This could also go directly in the for statement.

Author

Got it, fixed.

} else {
if ((ref < count) && !ref_map.test(ref)){
if (meta_header_->is_sorted_dict()){
if ((!agg_info.get_is_min() && res_ref < ref)
Contributor

res_ref is never updated.

Author

Got it, fixed.
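A minimal sketch of the pattern under review, assuming a simplified sorted-dictionary column without nulls: refs are compared instead of values, a bitmap (standing in for `ref_map`) skips refs already seen, and `res_ref` is updated whenever a better ref is found so that only one decode happens at the end. `SortedDictColumn` and `min_or_max_over_rows` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sorted-dictionary column: refs[row] indexes into dict, and
// dict ascends, so comparing refs is equivalent to comparing values.
struct SortedDictColumn {
  std::vector<int64_t> dict;
  std::vector<uint32_t> refs;
};

int64_t min_or_max_over_rows(const SortedDictColumn& col,
                             const std::vector<int64_t>& row_ids,
                             bool is_min) {
  assert(!row_ids.empty());
  std::vector<bool> seen(col.dict.size(), false);  // plays the role of ref_map
  uint32_t res_ref = col.refs[static_cast<std::size_t>(row_ids[0])];
  seen[res_ref] = true;
  for (std::size_t i = 1; i < row_ids.size(); ++i) {
    const uint32_t ref = col.refs[static_cast<std::size_t>(row_ids[i])];
    if (seen[ref]) {
      continue;  // this dictionary entry was already considered
    }
    seen[ref] = true;
    if ((is_min && ref < res_ref) || (!is_min && ref > res_ref)) {
      res_ref = ref;  // without this update the result never changes
    }
  }
  return col.dict[res_ref];  // decode exactly once
}
```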

if (meta_header_->is_sorted_dict()){
if ((!agg_info.get_is_min() && res_ref < ref)
|| (agg_info.get_is_min() && res_ref > ref)){
decode(ctx.obj_meta_, cell, ref, ctx.col_header_->length_);
Contributor

The return code needs to be checked.

Author

Got it, fixed.

}
}
} else {
decode(ctx.obj_meta_, cell, ref, ctx.col_header_->length_);
Contributor

The return code needs to be checked.

Author

Got it, fixed.

ObDatum *datum) const
{
int ret = OB_SUCCESS;
switch (ctx.col_header_->type_){
Contributor

This is not needed; just return OB_NOT_SUPPORTED, since the subclasses override the implementation.

Author

Got it, fixed.

LOG_WARN("fail to update_min_or_max", K(ret), K(i), K(datum_buf[i]), K(agg_info));
if (OB_FAIL(decoders_[col_id].decoder_->get_aggregate_result(
*column_decoder->ctx_,row_ids,row_cap,agg_info,datum_buf))){
LOG_WARN("Unsupported encoding type to get aggregate result", K(ret), K(col_id), K(row_cap));
Contributor

The fallback should only happen when the returned ret is OB_NOT_SUPPORTED; in all other cases, report the error and exit.

Author

Got it, fixed.
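The fallback rule requested here can be sketched as follows. The `Status` enum and `aggregate_with_fallback` are hypothetical stand-ins; the real code uses OceanBase return codes such as OB_SUCCESS and OB_NOT_SUPPORTED, whose numeric values are not assumed here.

```cpp
#include <cassert>
#include <functional>

// Hypothetical status codes standing in for the real return codes.
enum Status { kSuccess = 0, kNotSupported = 1, kCorrupted = 2 };

// Try the decoder-specific fast path first. Fall back to the generic path
// only when the fast path reports "not supported"; any other failure is
// returned to the caller unchanged.
Status aggregate_with_fallback(const std::function<Status()>& fast_path,
                               const std::function<Status()>& slow_path) {
  assert(fast_path && slow_path);
  Status ret = fast_path();
  if (ret == kNotSupported) {
    ret = slow_path();
  }
  return ret;
}
```

The point of the distinction is that a genuine decoding error must not be masked by silently retrying on the slow path.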

const ObColumnHeader *col_header = ctx.col_header_;
const ObObjTypeStoreClass store_class =
get_store_class_map()[ob_obj_type_class(ctx.col_header_->get_store_obj_type())];
if (col_header->is_fix_length()) {
Contributor

Can non-fixed-length columns be supported?

Author

Reading variable-length columns requires a MEMCPY, so the optimization is unlikely to help much.

struct RawFixIntGetMinMaxFunc_T
{
static void raw_fix_int_get_min_or_max_func(
const int64_t col_len,
Contributor

This is unused; does it need to be passed?

Author

Got it, fixed.
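For context, a fixed-width min/max kernel in the spirit of RawFixIntGetMinMaxFunc_T might look like the sketch below. It is not the PR's implementation: `raw_fixed_min_or_max` is a hypothetical name, the column is modeled as tightly packed native-endian integers of type T, and the element width comes from the template parameter rather than a separate length argument.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// T is the stored integer type (for example uint8_t ... uint64_t); data
// points at tightly packed values, so row r starts at byte r * sizeof(T).
template <typename T>
T raw_fixed_min_or_max(const unsigned char* data,
                       const std::vector<int64_t>& row_ids,
                       bool is_min) {
  assert(data != nullptr && !row_ids.empty());
  auto load = [data](int64_t row) {
    T value;
    // A fixed-size memcpy avoids unaligned access; optimizing compilers
    // typically turn it into a plain load.
    std::memcpy(&value, data + static_cast<std::size_t>(row) * sizeof(T),
                sizeof(T));
    return value;
  };
  T res = load(row_ids[0]);
  for (std::size_t i = 1; i < row_ids.size(); ++i) {
    const T value = load(row_ids[i]);
    if (is_min ? value < res : value > res) {
      res = value;
    }
  }
  return res;
}
```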

@@ -120,6 +120,88 @@ int ObRLEDecoder::get_null_count(
return ret;
}

int ObRLEDecoder::get_aggregate_result(
Contributor

The implementation below looks similar to the dictionary one; could it call the dictionary implementation directly?

Author

For a whole microblock, it can call the dictionary implementation directly. For a partial microblock, it is similar to CONST: next_ref_row_id information is stored, so it should be faster than calling the dictionary implementation.
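A minimal sketch of the partial-microblock idea for RLE, assuming a simplified layout in which each run stores its first row id and an already decoded value (the real decoder stores refs into a dictionary). Knowing where the next run starts lets consecutive rows of the same run be skipped without looking anything up. `RleColumn` and `rle_column_min` are hypothetical names, and row_ids is assumed to be ascending.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical RLE column: run i covers rows from run_start_rows[i] up to
// (but not including) run_start_rows[i + 1]; run_start_rows[0] == 0.
struct RleColumn {
  std::vector<int64_t> run_start_rows;
  std::vector<int64_t> run_values;
};

// Min over the requested rows; row_ids must be non-empty and ascending.
int64_t rle_column_min(const RleColumn& col,
                       const std::vector<int64_t>& row_ids) {
  assert(!row_ids.empty() && !col.run_start_rows.empty());
  const std::size_t run_count = col.run_start_rows.size();
  std::size_t run = 0;
  int64_t res = 0;
  bool have_res = false;
  for (int64_t row : row_ids) {
    const std::size_t prev_run = run;
    // Advance only when the row reaches the next run's first row id.
    while (run + 1 < run_count && col.run_start_rows[run + 1] <= row) {
      ++run;
    }
    if (have_res && run == prev_run) {
      continue;  // same run as the previous row, so same value
    }
    const int64_t value = col.run_values[run];
    if (!have_res || value < res) {
      res = value;
      have_res = true;
    }
  }
  return res;
}
```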

{
filter_pushdown_comaprison_neg_test();
}
// TEST_F(TestIntBaseDiffDecoder, filter_pushdown_comaprison_neg_test)
Contributor

The original test cases need to be kept.

Author

Got it, fixed.

if (OB_FAIL(read_ref(row_id, ctx.is_bit_packing(), col_data, ref))) {
LOG_WARN("Failed to read reference for dictionary", K(ret), K(col_data), K(row_id));
} else {
if ((ref < count) && !ref_map.test(ref)){
Contributor

Merge this into an else if.

Author

Got it, fixed.

const int64_t dict_meta_length = ctx.col_header_->length_ - meta_header_->offset_;
if (dict_count > 0) {
if(row_cap == ctx.micro_block_header_->row_count_){
dict_decoder_.get_aggregate_result(ctx, row_ids, row_cap, agg_info, datum_buf);
Contributor

Check the return code.

Author

Got it, fixed.
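The whole-versus-partial split in the CONST decoder can be sketched as below. This is a simplified model, not the PR's code: exceptions are stored as decoded values rather than dictionary refs, nulls are ignored, at least one row is assumed to hold the constant, and `ConstColumn` and `const_column_min` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical CONST column: every row holds const_value except the rows in
// except_row_ids (ascending), whose values are in the parallel except_values.
struct ConstColumn {
  int64_t const_value;
  std::vector<int64_t> except_row_ids;
  std::vector<int64_t> except_values;
  int64_t row_count;
};

// Min over the requested rows; row_ids must be non-empty and ascending.
int64_t const_column_min(const ConstColumn& col,
                         const std::vector<int64_t>& row_ids) {
  assert(!row_ids.empty());
  if (static_cast<int64_t>(row_ids.size()) == col.row_count) {
    // Whole block: only the distinct stored values matter, no per-row walk.
    int64_t res = col.const_value;
    for (int64_t v : col.except_values) {
      res = std::min(res, v);
    }
    return res;
  }
  // Partial block: walk the exception table alongside the row ids.
  const std::size_t except_count = col.except_row_ids.size();
  std::size_t pos = 0;
  int64_t res = 0;
  bool have_res = false;
  for (int64_t row : row_ids) {
    while (pos < except_count && col.except_row_ids[pos] < row) {
      ++pos;
    }
    const bool is_except = pos < except_count && col.except_row_ids[pos] == row;
    const int64_t value = is_except ? col.except_values[pos] : col.const_value;
    if (!have_res || value < res) {
      res = value;
      have_res = true;
    }
  }
  return res;
}
```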

agg_min_or_max_test(false, false);
}

// TEST_F(TestConstDecoder, batch_decode_to_datum_test_with_expection)
Contributor

Please restore this commented-out test.

Author

Got it, fixed.

for (int64_t i = 0; OB_SUCC(ret) && traverse_it != end_it; ++traverse_it, ++i ){
if (OB_FAIL(datum_buf[i].from_obj(*traverse_it))){
LOG_WARN("Failed to trans to datum",K(ret),K(*traverse_it));
} else if (OB_FAIL(agg_info.update_min_or_max(datum_buf[i]))){

This logic is repeated a lot; it might be better for update_min_or_max to accept an obj.

Author

Got it, fixed.

res_ref = ref;
}
}
if(OB_FAIL(decode(ctx.obj_meta_, cell, res_ref, ctx.col_header_->length_))){

The decode and update logic should be able to share code.

Author

Got it, fixed.

ctx, row_ids, row_cap, datum_len, data_offset, datum_buf))) {
LOG_WARN("Failed to batch unpack delta values", K(ret), K(ctx));
}
for (int64_t i = 0; i < row_cap; ++i) {

The error code is lost here.

Author

Got it, fixed.

ctx, row_ids, row_cap, datum_len, data_offset, datum_buf))) {
LOG_WARN("Failed to batch unpack delta values", K(ret), K(ctx));
}
for (int64_t i = 0; i < row_cap; ++i) {

The error code is lost here.

Author

Got it, fixed.

}
}
res_value += base_;
MEMCPY(const_cast<char *>(datum_buf[0].ptr_), &res_value, datum_len);

Doesn't datum have a set-int interface?

Author

Got it, fixed.
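The `res_value += base_` line suggests the following shape for the min over a base-plus-delta column: take the min of the stored deltas, then add the base once. The sketch below assumes each value is stored as a non-negative delta from a shared base; `base_diff_min` is a hypothetical name and the result is returned directly rather than written into a datum.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Assumed model: value[row] = base + deltas[row], with deltas >= 0.
// The base is added once at the end instead of once per row.
int64_t base_diff_min(int64_t base, const std::vector<uint64_t>& deltas) {
  assert(!deltas.empty());
  uint64_t min_delta = deltas[0];
  for (uint64_t d : deltas) {
    if (d < min_delta) {
      min_delta = d;
    }
  }
  return base + static_cast<int64_t>(min_delta);
}
```

For data shaped like the INTER_DIFF example in the description (10000000 through 19999999), the deltas would range from 0 to 9999999 against a base of 10000000.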

if (OB_FAIL(get_col_datums(col_id, row_ids, cell_datas, row_cap, datum_buf))) {
LOG_WARN("Failed to get col datums", K(ret), K(col_id), K(row_cap));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < row_cap; ++i) {

Try not to reuse i here.

}
}
*reinterpret_cast<DatumType *>(const_cast<char *>(datum.ptr_)) = tmp;
datum.pack_ = sizeof(DatumType);

The datum length for int types should always be 8, so the set-int interface can be used directly here as well.

Author

Got it, fixed.

4 participants