Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize unnecessary column copy for HashAgg #8985

Merged
merged 60 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
60bd8ea
tmp save
guo-shaoge Apr 22, 2024
a857c53
refine for spill(enable_skip_serialize_key arg)
guo-shaoge Apr 25, 2024
ee3d867
Merge branch 'master' of github.com:pingcap/tiflash into optimize_dup…
guo-shaoge Apr 25, 2024
fac85e5
fmt
guo-shaoge Apr 26, 2024
f78ddbf
map -> set
guo-shaoge Apr 26, 2024
b1c98fa
tidy
guo-shaoge Apr 26, 2024
2f6eb26
fix
guo-shaoge Apr 28, 2024
b4d72ea
fix
guo-shaoge Apr 28, 2024
5b7a84c
Merge branch 'master' of github.com:pingcap/tiflash into optimize_dup…
guo-shaoge Apr 28, 2024
bd66dec
use unordered_map as key_from_agg_func
guo-shaoge Apr 28, 2024
e78cbce
reorder collator && fix insert key helper crash
guo-shaoge Apr 28, 2024
6ef75b6
disable opt for spill process
guo-shaoge Apr 28, 2024
d009d19
fix prepareBlockAndFill for spill
guo-shaoge Apr 29, 2024
881141f
fmt
guo-shaoge Apr 29, 2024
9914d0b
fix case
guo-shaoge Apr 29, 2024
76e75fc
fix case
guo-shaoge Apr 29, 2024
f0c7302
tidy
guo-shaoge Apr 29, 2024
d9e8880
refine
guo-shaoge Apr 29, 2024
addd8d3
fix case
guo-shaoge Apr 29, 2024
6751837
fix case
guo-shaoge Apr 29, 2024
ffd5c4e
update comment
guo-shaoge Apr 30, 2024
cbf8de5
agg_func_ref_key optimization; gtest
guo-shaoge May 6, 2024
c0cb939
append copy column action after agg
guo-shaoge May 7, 2024
ede3a80
case && fmt
guo-shaoge May 7, 2024
b62ed01
Merge branch 'master' of github.com:pingcap/tiflash into optimize_dup…
guo-shaoge May 7, 2024
0d0d74b
fix case
guo-shaoge May 7, 2024
65e33d5
fix case by integrate enable_convert_key_optimization into fianl flag
guo-shaoge May 8, 2024
b0b1df7
refine
guo-shaoge May 8, 2024
7b1c0b8
Merge branch 'master' of github.com:pingcap/tiflash into optimize_dup…
guo-shaoge May 8, 2024
2240c9b
fmt
guo-shaoge May 8, 2024
df112a8
add check for shuffle key not compatible with key_ref_agg_func
guo-shaoge May 22, 2024
dcd81b8
refine gtest
guo-shaoge May 22, 2024
f1dd218
refine comments
guo-shaoge May 22, 2024
69e3509
Merge branch 'master' of github.com:pingcap/tiflash into optimize_dup…
guo-shaoge May 22, 2024
c312346
lint
guo-shaoge May 22, 2024
ea15bd7
dbg
guo-shaoge May 22, 2024
1d66cdb
fix case
guo-shaoge May 23, 2024
0a51cff
fmt
guo-shaoge May 23, 2024
8f4a577
more check
guo-shaoge May 23, 2024
aef88e2
use first_row instead of any
guo-shaoge May 23, 2024
f233a5c
del useless Params ctor
guo-shaoge May 23, 2024
e8ae06e
fix InterpreterSelectQuery
guo-shaoge May 23, 2024
3228bf6
refine aggregate_output_columns logic
guo-shaoge May 24, 2024
f6d29e8
Merge branch 'master' of github.com:pingcap/tiflash into optimize_dup…
guo-shaoge May 24, 2024
827533d
fix case
guo-shaoge May 24, 2024
4834e00
Revert "fix case"
guo-shaoge May 27, 2024
93cb868
Revert "refine aggregate_output_columns logic"
guo-shaoge May 27, 2024
a4cb243
key name
guo-shaoge May 27, 2024
08595ce
remove eliminate any
guo-shaoge May 27, 2024
29f993c
comment/ctor/operator
guo-shaoge May 27, 2024
1fce8cc
comment
guo-shaoge May 27, 2024
435f850
Merge branch 'master' of github.com:pingcap/tiflash into optimize_dup…
guo-shaoge May 27, 2024
478bc9f
case
guo-shaoge May 27, 2024
df059e0
unique_ptr for buildParams()
guo-shaoge May 28, 2024
be21ad4
check first_row_type
guo-shaoge May 28, 2024
3c37638
refine findFirstRow
guo-shaoge May 28, 2024
240f816
fix wrong if
guo-shaoge May 29, 2024
7d9e7e3
Merge branch 'master' into optimize_duplicated_agg_func
guo-shaoge May 29, 2024
53112b1
comment && collators using map instead of vector
guo-shaoge May 29, 2024
c517463
Merge branch 'master' into optimize_duplicated_agg_func
ti-chi-bot[bot] May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ if (ENABLE_TESTS)
)
target_include_directories(bench_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR} ${benchmark_SOURCE_DIR}/include)
target_compile_definitions(bench_dbms PUBLIC DBMS_PUBLIC_GTEST)
target_link_libraries(bench_dbms gtest dbms test_util_bench_main benchmark tiflash_functions server_for_test delta_merge kvstore)
target_link_libraries(bench_dbms gtest dbms test_util_bench_main benchmark tiflash_functions server_for_test delta_merge kvstore tiflash_aggregate_functions)

add_check(bench_dbms)
endif ()
Expand Down
19 changes: 16 additions & 3 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,33 @@ bool isGroupByCollationSensitive(const Context & context)
return context.getSettingsRef().group_by_collation_sensitive || context.getDAGContext()->isMPPTask();
}

Aggregator::Params buildParams(
std::shared_ptr<Aggregator::Params> buildParams(
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const std::unordered_map<String, String> & key_from_agg_func,
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg,
const SpillConfig & spill_config)
{
ColumnNumbers keys;
keys.resize(key_names.size());
size_t normal_key_idx = 0;
size_t agg_func_as_key_idx = key_from_agg_func.size();
for (const auto & name : key_names)
{
keys.push_back(before_agg_header.getPositionByName(name));
auto col_idx = before_agg_header.getPositionByName(name);
if (key_from_agg_func.find(name) == key_from_agg_func.end())
{
keys[normal_key_idx++] = col_idx;
}
else
{
keys[agg_func_as_key_idx++] = col_idx;
}
}

const Settings & settings = context.getSettingsRef();
Expand All @@ -101,9 +113,10 @@ Aggregator::Params buildParams(

bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });

return Aggregator::Params(
return std::make_shared<Aggregator::Params>(
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
before_agg_header,
keys,
normal_key_idx,
aggregate_descriptions,
/// do not use the average value for key count threshold, because for a random distributed data, the key count
/// in every threads should almost be the same
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ bool isFinalAgg(const tipb::Aggregation & aggregation);

bool isGroupByCollationSensitive(const Context & context);

Aggregator::Params buildParams(
std::shared_ptr<Aggregator::Params> buildParams(
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const std::unordered_map<String, String> & key_from_agg_func,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg,
Expand Down
59 changes: 46 additions & 13 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,26 @@ void DAGExpressionAnalyzer::buildCommonAggFunc(
context);
}

std::pair<String, DataTypePtr> findFirstRow(const AggregateDescriptions & aggregate_descriptions, const String & arg_name)
{
for (const auto & desc : aggregate_descriptions)
{
if (desc.function->getName() == "first_row" &&
desc.argument_names.size() == 1 &&
desc.argument_names[0] == arg_name)
return std::make_pair(desc.column_name, desc.function->getReturnType());
}
return std::make_pair("", nullptr);
}

void DAGExpressionAnalyzer::buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
std::unordered_map<String, String> & key_from_agg_func,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators)
{
Expand Down Expand Up @@ -591,19 +604,36 @@ void DAGExpressionAnalyzer::buildAggGroupBy(
collators.push_back(collator);
if (collator != nullptr)
{
/// if the column is a string with collation info, the `sort_key` of the column is used during
/// aggregation, but we can not reconstruct the origin column by `sort_key`, so add an extra
/// extra aggregation function any(group_by_column) here as the output of the group by column
TiDB::TiDBCollators arg_collators{collator};
appendAggDescription(
{name},
{type},
arg_collators,
"any",
aggregate_descriptions,
aggregated_columns,
false,
context);
// todo arg_collators
auto [first_row_name, first_row_type] = findFirstRow(aggregate_descriptions, name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is first_row_type necessary here? Are the type at L600 and first_row_type always the same? If so, can we just use type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they are same. And add a check to make sure.

String agg_func_name = first_row_name;
if (!first_row_name.empty())
{
// todo need or not?
aggregated_columns.emplace_back(first_row_name, first_row_type);
}
else
{
/// if the column is a string with collation info, the `sort_key` of the column is used during
/// aggregation, but we can not reconstruct the origin column by `sort_key`, so add an extra
/// extra aggregation function any(group_by_column) here as the output of the group by column
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
TiDB::TiDBCollators arg_collators{collator};
appendAggDescription(
{name},
{type},
arg_collators,
"any",
aggregate_descriptions,
aggregated_columns,
false,
context);
agg_func_name = aggregate_descriptions.back().column_name;
}
auto [iter, inserted] = key_from_agg_func.insert({name, agg_func_name});
if unlikely (!inserted)
{
throw Exception("unexpected already exists agg key: {}", name);
}
}
else
{
Expand Down Expand Up @@ -666,6 +696,8 @@ std::tuple<Names, TiDB::TiDBCollators, AggregateDescriptions, ExpressionActionsP
Names aggregation_keys;
TiDB::TiDBCollators collators;
std::unordered_set<String> agg_key_set;
// todo check analyzeExpressions DAGQueryBlockInterpreter.cpp
std::unordered_map<String, String> key_from_agg_func;
buildAggFuncs(agg, step.actions, aggregate_descriptions, aggregated_columns);
buildAggGroupBy(
agg.group_by(),
Expand All @@ -674,6 +706,7 @@ std::tuple<Names, TiDB::TiDBCollators, AggregateDescriptions, ExpressionActionsP
aggregated_columns,
aggregation_keys,
agg_key_set,
key_from_agg_func,
group_by_collation_sensitive,
collators);
// set required output for agg funcs's arguments and group by keys.
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
std::unordered_map<String, String> & key_from_agg_func,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,15 @@ void DAGQueryBlockInterpreter::executeAggregation(
context.getFileProvider(),
settings.max_threads,
settings.max_block_size);
auto params = AggregationInterpreterHelper::buildParams(
// todo: finish this
std::unordered_map<String, String> key_from_agg_func;
auto params = *AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
pipeline.streams.size(),
enable_fine_grained_shuffle ? pipeline.streams.size() : 1,
key_names,
key_from_agg_func,
collators,
aggregate_descriptions,
is_final_agg,
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build(
NamesAndTypes aggregated_columns;
AggregateDescriptions aggregate_descriptions;
Names aggregation_keys;
std::unordered_map<String, String> key_from_agg_func;
TiDB::TiDBCollators collators;
{
std::unordered_set<String> agg_key_set;
Expand All @@ -65,6 +66,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build(
aggregated_columns,
aggregation_keys,
agg_key_set,
key_from_agg_func,
AggregationInterpreterHelper::isGroupByCollationSensitive(context),
collators);
}
Expand All @@ -83,6 +85,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build(
child,
before_agg_actions,
aggregation_keys,
key_from_agg_func,
collators,
AggregationInterpreterHelper::isFinalAgg(aggregation),
aggregate_descriptions,
Expand All @@ -107,12 +110,13 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
context.getFileProvider(),
context.getSettingsRef().max_threads,
context.getSettingsRef().max_block_size);
auto params = AggregationInterpreterHelper::buildParams(
auto params = *AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
pipeline.streams.size(),
fine_grained_shuffle.enable() ? pipeline.streams.size() : 1,
aggregation_keys,
key_from_agg_func,
aggregation_collators,
aggregate_descriptions,
is_final_agg,
Expand Down Expand Up @@ -223,12 +227,13 @@ void PhysicalAggregation::buildPipelineExecGroupImpl(
context.getFileProvider(),
context.getSettingsRef().max_threads,
context.getSettingsRef().max_block_size);
auto params = AggregationInterpreterHelper::buildParams(
auto params = *AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
concurrency,
concurrency,
aggregation_keys,
key_from_agg_func,
aggregation_collators,
aggregate_descriptions,
is_final_agg,
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ class PhysicalAggregation : public PhysicalUnary
const PhysicalPlanNodePtr & child_,
const ExpressionActionsPtr & before_agg_actions_,
const Names & aggregation_keys_,
const std::unordered_map<String, String> & key_from_agg_func_,
const TiDB::TiDBCollators & aggregation_collators_,
bool is_final_agg_,
const AggregateDescriptions & aggregate_descriptions_,
const ExpressionActionsPtr & expr_after_agg_)
: PhysicalUnary(executor_id_, PlanType::Aggregation, schema_, fine_grained_shuffle_, req_id, child_)
, before_agg_actions(before_agg_actions_)
, aggregation_keys(aggregation_keys_)
, key_from_agg_func(key_from_agg_func_)
, aggregation_collators(aggregation_collators_)
, is_final_agg(is_final_agg_)
, aggregate_descriptions(aggregate_descriptions_)
Expand All @@ -71,6 +73,7 @@ class PhysicalAggregation : public PhysicalUnary
private:
ExpressionActionsPtr before_agg_actions;
Names aggregation_keys;
std::unordered_map<String, String> key_from_agg_func;
TiDB::TiDBCollators aggregation_collators;
bool is_final_agg;
AggregateDescriptions aggregate_descriptions;
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
context.getFileProvider(),
context.getSettingsRef().max_threads,
context.getSettingsRef().max_block_size);
auto params = AggregationInterpreterHelper::buildParams(
// todo finish this
std::unordered_map<String, String> key_from_agg_func;
// todo maybe keep use pointer?
auto params = *AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
concurrency,
1,
aggregation_keys,
key_from_agg_func,
aggregation_collators,
aggregate_descriptions,
is_final_agg,
Expand Down