Aggregate function pushdown to different decoders #1488
base: master
…nter_diff decoder
…nter_diff decoder
…nter_diff decoder
Please add some explanation and comparison results.
}
} else if (const_ref == dict_decoder_.get_dict_header()->count_) {
// Const value is null
LOG_INFO("No const", K(ret));
The INFO log can be removed; it is printed at the default log level.
LOG_INFO("No const", K(ret));
}
} else if (const_ref == dict_decoder_.get_dict_header()->count_) {
// Const value is null
}
Fixed.
ObDictDecoderIterator dict_iter = dict_decoder_.begin(&ctx, dict_meta_length);
ObObj& const_obj = *(dict_iter + meta_header_->const_ref_);
if (const_obj.is_fixed_len_char_type() && nullptr != ctx.col_param_) {
if (OB_FAIL(storage::pad_column(ctx.col_param_->get_accuracy(),
Padding should not be needed here.
Got it, fixed.
next_except_row_id = row_id_arr.at_(meta_header_->payload_ + count, except_table_pos);
if (except_table_pos == count || row_id != next_except_row_id) {
} else {
*curr_ref = reinterpret_cast<const uint8_t *>(meta_header_->payload_)[except_table_pos];
This duplicates the code above; it would be better to extract it into a shared helper.
Got it, fixed.
if (row_cap > 1) {
monotonic_inc = row_ids[1] > row_ids[0];
}
int64_t step = monotonic_inc ? 1 : -1;
This should be const.
Got it, fixed.
if (meta_header_->is_sorted_dict()) {
if (!agg_info.get_is_min()) {
traverse_it = end_it;
if((*traverse_it).is_null()){
Can this be null here?
Yes, that is possible.
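To illustrate why the null check matters on the sorted-dictionary path, here is a minimal sketch. It is not the PR's code: SortedDict and sorted_dict_min_or_max are hypothetical names, and the sketch assumes that a null entry, if present, sits at the tail of an ascending dictionary.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for a sorted dictionary: entries ascend, and a null
// entry (std::nullopt), if present, is assumed to sit at the tail.
using SortedDict = std::vector<std::optional<int64_t>>;

// Returns the min (is_min == true) or max of the non-null entries, or
// std::nullopt when the dictionary holds no non-null value.
std::optional<int64_t> sorted_dict_min_or_max(const SortedDict &dict, bool is_min)
{
  if (is_min) {
    // Ascending order: the first non-null entry is the minimum.
    for (auto it = dict.begin(); it != dict.end(); ++it) {
      if (it->has_value()) return *it;
    }
  } else {
    // Walk backwards and skip null entries before taking the maximum.
    for (auto it = dict.rbegin(); it != dict.rend(); ++it) {
      if (it->has_value()) return *it;
    }
  }
  return std::nullopt;
}
```

With a sorted dictionary the aggregate needs no value comparisons at all; only the null entry has to be skipped when reading from the tail.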
} else if (OB_FAIL(agg_info.update_min_or_max(datum_buf[i]))){
LOG_WARN("Failed to update_min_or_max", K(ret), K(datum_buf[i]), K(agg_info));
}
++i;
This can also go directly in the for statement.
Got it, fixed.
} else {
if ((ref < count) && !ref_map.test(ref)){
if (meta_header_->is_sorted_dict()){
if ((!agg_info.get_is_min() && res_ref < ref)
res_ref is never updated.
Got it, fixed.
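The pattern under review here, comparing references instead of decoded values when the dictionary is sorted, can be sketched as follows. This is an illustrative sketch under stated assumptions, not the PR's implementation: min_or_max_by_ref is a hypothetical function, plain vectors stand in for the decoder's structures, and a ref equal to the dictionary size is assumed to mean null.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical sketch: refs[i] is the dictionary reference of row i; a ref
// equal to dict.size() stands for null. dict is sorted ascending, so comparing
// refs is equivalent to comparing decoded values.
// Returns false when every row is null; otherwise stores the result in out.
bool min_or_max_by_ref(const std::vector<int64_t> &dict,
                       const std::vector<uint32_t> &refs,
                       bool is_min,
                       int64_t &out)
{
  const uint32_t count = static_cast<uint32_t>(dict.size());
  bool found = false;
  uint32_t res_ref = 0;
  for (uint32_t ref : refs) {
    if (ref >= count) {
      continue;  // null row
    }
    if (!found || (is_min ? ref < res_ref : ref > res_ref)) {
      res_ref = ref;  // keep the running best ref up to date
      found = true;
    }
  }
  if (found) {
    out = dict[res_ref];  // decode once, after the scan
  }
  return found;
}
```

Keeping res_ref current inside the loop is what lets the value be decoded a single time at the end instead of once per row.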
if (meta_header_->is_sorted_dict()){
if ((!agg_info.get_is_min() && res_ref < ref)
|| (agg_info.get_is_min() && res_ref > ref)){
decode(ctx.obj_meta_, cell, ref, ctx.col_header_->length_);
The return code needs to be checked.
Got it, fixed.
}
}
} else {
decode(ctx.obj_meta_, cell, ref, ctx.col_header_->length_);
The return code needs to be checked.
Got it, fixed.
ObDatum *datum) const
{
int ret = OB_SUCCESS;
switch (ctx.col_header_->type_){
This is not needed; just return OB_NOT_SUPPORTED, since the subclasses override the implementation.
Got it, fixed.
LOG_WARN("fail to update_min_or_max", K(ret), K(i), K(datum_buf[i]), K(agg_info));
if (OB_FAIL(decoders_[col_id].decoder_->get_aggregate_result(
*column_decoder->ctx_,row_ids,row_cap,agg_info,datum_buf))){
LOG_WARN("Unsupported encoding type to get aggregate result", K(ret), K(col_id), K(row_cap));
Fallback handling should only happen when the returned ret is OB_NOT_SUPPORTED; in all other cases, report the error and exit.
Got it, fixed.
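The control flow requested in this thread can be sketched as below. The error codes and the function aggregate_with_fallback are hypothetical placeholders chosen for illustration; the real OB_* values and the decoder interfaces are not reproduced here.

```cpp
#include <functional>

// Hypothetical error codes; the real OB_* values are not reproduced here.
enum ErrCode { kSuccess = 0, kNotSupported = 1, kOtherError = 2 };

// Try the pushed-down fast path first. Fall back to the generic path only
// when the decoder reports that it does not support the aggregate. Any other
// error is returned to the caller unchanged.
int aggregate_with_fallback(const std::function<int()> &fast_path,
                            const std::function<int()> &generic_path)
{
  int ret = fast_path();
  if (kNotSupported == ret) {
    ret = generic_path();
  }
  return ret;
}
```

Treating every failure as "unsupported" would hide real errors behind a silent retry, which is why the fallback is keyed to one specific return code.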
const ObColumnHeader *col_header = ctx.col_header_;
const ObObjTypeStoreClass store_class =
get_store_class_map()[ob_obj_type_class(ctx.col_header_->get_store_obj_type())];
if (col_header->is_fix_length()) {
Can non-fixed-length columns be supported?
Reading variable-length columns requires a MEMCPY, so the optimization would probably yield little benefit.
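For context on why the fixed-length case is the cheap one, here is a minimal sketch of a min/max scan over fixed-width integers. It assumes the column is already laid out as a contiguous array of int64_t, which is a simplification; raw_fixed_min_or_max and its parameters are hypothetical names, not the PR's RawFixIntGetMinMaxFunc_T.

```cpp
#include <cstdint>

// Hypothetical sketch: values holds one fixed-width int64_t per row, so a row
// is addressed by plain indexing and no per-row copy is needed.
// Precondition: row_cap >= 1 and every row id is a valid index into values.
int64_t raw_fixed_min_or_max(const int64_t *values,
                             const int32_t *row_ids,
                             int64_t row_cap,
                             bool is_min)
{
  int64_t res = values[row_ids[0]];
  for (int64_t i = 1; i < row_cap; ++i) {
    const int64_t v = values[row_ids[i]];
    if (is_min ? v < res : v > res) {
      res = v;
    }
  }
  return res;
}
```

A variable-length column would need an offset lookup and a copy per row before any comparison, which is the cost the reply above refers to.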
struct RawFixIntGetMinMaxFunc_T
{
static void raw_fix_int_get_min_or_max_func(
const int64_t col_len,
This is unused; does it need to be passed at all?
Got it, fixed.
@@ -120,6 +120,88 @@ int ObRLEDecoder::get_null_count(
return ret;
}

int ObRLEDecoder::get_aggregate_result(
The implementation below looks similar to the dictionary one; can it call the dictionary implementation directly?
When the whole micro block is read, the dictionary implementation can be called directly. When only part of the micro block is read, the case is similar to const: the next_ref_row_id information is stored, so this should be faster than calling the dictionary implementation directly.
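One way to picture the partial-read argument is a cursor that advances over runs as the requested row ids increase, so each row costs a bounded step rather than a per-row reference lookup. The sketch below is an assumption-laden illustration, not the PR's ObRLEDecoder code: Run and rle_min are hypothetical, runs carry decoded values directly, and row ids are assumed to be ascending.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical run record: every row from first_row_id up to the next run's
// first_row_id shares the same value.
struct Run {
  int64_t first_row_id;
  int64_t value;
};

// Precondition: runs is non-empty, sorted by first_row_id, and starts at row
// 0; row_ids is non-empty and ascending.
int64_t rle_min(const std::vector<Run> &runs, const std::vector<int64_t> &row_ids)
{
  std::size_t cur = 0;
  bool found = false;
  int64_t res = 0;
  for (int64_t row_id : row_ids) {
    // Advance the cursor while the next run starts at or before this row.
    while (cur + 1 < runs.size() && runs[cur + 1].first_row_id <= row_id) {
      ++cur;
    }
    if (!found || runs[cur].value < res) {
      res = runs[cur].value;
      found = true;
    }
  }
  return res;
}
```

Because the cursor only moves forward, the total work is linear in the number of requested rows plus the number of runs.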
{
filter_pushdown_comaprison_neg_test();
}
// TEST_F(TestIntBaseDiffDecoder, filter_pushdown_comaprison_neg_test)
The original test cases need to be kept.
Got it, fixed.
if (OB_FAIL(read_ref(row_id, ctx.is_bit_packing(), col_data, ref))) {
LOG_WARN("Failed to read reference for dictionary", K(ret), K(col_data), K(row_id));
} else {
if ((ref < count) && !ref_map.test(ref)){
Merge these into an else if.
Got it, fixed.
const int64_t dict_meta_length = ctx.col_header_->length_ - meta_header_->offset_;
if (dict_count > 0) {
if(row_cap == ctx.micro_block_header_->row_count_){
dict_decoder_.get_aggregate_result(ctx, row_ids, row_cap, agg_info, datum_buf);
Check the return code.
Got it, fixed.
agg_min_or_max_test(false, false);
}

// TEST_F(TestConstDecoder, batch_decode_to_datum_test_with_expection)
Please restore this commented-out test.
Got it, fixed.
for (int64_t i = 0; OB_SUCC(ret) && traverse_it != end_it; ++traverse_it, ++i ){
if (OB_FAIL(datum_buf[i].from_obj(*traverse_it))){
LOG_WARN("Failed to trans to datum",K(ret),K(*traverse_it));
} else if (OB_FAIL(agg_info.update_min_or_max(datum_buf[i]))){
This logic is repeated in many places; it might be better for update_min_or_max to support obj.
Got it, fixed.
res_ref = ref;
}
}
if(OB_FAIL(decode(ctx.obj_meta_, cell, res_ref, ctx.col_header_->length_))){
The decode and update logic should be able to share code.
Got it, fixed.
ctx, row_ids, row_cap, datum_len, data_offset, datum_buf))) {
LOG_WARN("Failed to batch unpack delta values", K(ret), K(ctx));
}
for (int64_t i = 0; i < row_cap; ++i) {
The error code is dropped here.
Got it, fixed.
ctx, row_ids, row_cap, datum_len, data_offset, datum_buf))) {
LOG_WARN("Failed to batch unpack delta values", K(ret), K(ctx));
}
for (int64_t i = 0; i < row_cap; ++i) {
The error code is dropped here.
Got it, fixed.
}
}
res_value += base_;
MEMCPY(const_cast<char *>(datum_buf[0].ptr_), &res_value, datum_len);
Doesn't datum have a set int interface?
Got it, fixed.
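The res_value += base_ step in the hunk above relies on a property of base-plus-delta encoding: the order of the stored deltas matches the order of the original values, so the base only has to be added once. A minimal sketch, assuming unsigned deltas relative to a signed base (base_diff_min is a hypothetical name, not the decoder's API):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical sketch of base-plus-delta encoding: each stored delta is
// value - base, so the smallest delta belongs to the smallest value.
// Precondition: deltas is non-empty.
int64_t base_diff_min(int64_t base, const std::vector<uint64_t> &deltas)
{
  const uint64_t min_delta = *std::min_element(deltas.begin(), deltas.end());
  return base + static_cast<int64_t>(min_delta);  // add the base once
}
```

Aggregating over the deltas and restoring the base at the end avoids reconstructing every value in the batch.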
if (OB_FAIL(get_col_datums(col_id, row_ids, cell_datas, row_cap, datum_buf))) {
LOG_WARN("Failed to get col datums", K(ret), K(col_id), K(row_cap));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < row_cap; ++i) {
Try not to reuse i here.
}
}
*reinterpret_cast<DatumType *>(const_cast<char *>(datum.ptr_)) = tmp;
datum.pack_ = sizeof(DatumType);
The length of an int-type datum should always be 8; the set int interface can be used directly here as well.
Got it, fixed.
Task Description
ref #1487.
Solution Description
Add aggregate functions for RAW/DICT/RLE/CONST/INTER_DIFF decoders.
Passed Regressions
Ran the unit test for ObMicroBlockDecoder::get_min_or_max() to check col_decoder.get_aggregate_result(). The following tables show the speedup ratios of the optimized code over the original in four different situations. The ordinate represents the number of rows in the microblock. To fully test the supported data types, each column of the microblock has a different data type.
RAW
Only data with storage types IntSC/UIntSC is optimized, so only these data types were tested.
INTER_DIFF
Only optimized for reading the entire microblock and taking the min.
Unit test
Value
The speedup ratio is calculated from the average time of 100 runs over all columns.
Whole/Part
Whole indicates that the whole microblock is read; part indicates that only the rows of the microblock within the row_cap range are read.
RAW
DICT
RLE
CONST
INTER_DIFF
Examples of the data values used for the different encodings.
Data value example for RAW/DICT/RLE/INTER_DIFF
Data value example for CONST
MySQL test
Experimental environment
Since the optimization targets encoded data, all experiments were run after manually triggering a MAJOR FREEZE.
Of hot and cold queries, only hot queries are reported: each result is the average time of multiple hot queries, so it reflects effective use of the cache.
Data value example for MySQL test
Data value for DICT
Data value for RLE
Data value for CONST
Data value for INTER_DIFF
Data value for RAW
Results of MySQL test
scan whole table
SQL: SELECT min(value) FROM table;
scan part of the table with different filter rates
SQL: SELECT min(v_num) FROM const WHERE id < 60000;