Skip to content

Commit

Permalink
tmp save
Browse files Browse the repository at this point in the history
Signed-off-by: guo-shaoge <shaoge1994@163.com>
  • Loading branch information
guo-shaoge committed Apr 23, 2024
1 parent 14a1278 commit 60bd8ea
Show file tree
Hide file tree
Showing 14 changed files with 586 additions and 79 deletions.
2 changes: 1 addition & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ if (ENABLE_TESTS)
)
target_include_directories(bench_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR} ${benchmark_SOURCE_DIR}/include)
target_compile_definitions(bench_dbms PUBLIC DBMS_PUBLIC_GTEST)
target_link_libraries(bench_dbms gtest dbms test_util_bench_main benchmark tiflash_functions server_for_test delta_merge kvstore)
target_link_libraries(bench_dbms gtest dbms test_util_bench_main benchmark tiflash_functions server_for_test delta_merge kvstore tiflash_aggregate_functions)

add_check(bench_dbms)
endif ()
Expand Down
19 changes: 16 additions & 3 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,33 @@ bool isGroupByCollationSensitive(const Context & context)
return context.getSettingsRef().group_by_collation_sensitive || context.getDAGContext()->isMPPTask();
}

Aggregator::Params buildParams(
std::shared_ptr<Aggregator::Params> buildParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const std::unordered_map<String, String> & key_from_agg_func,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg,
const SpillConfig & spill_config)
{
ColumnNumbers keys;
keys.resize(key_names.size());
size_t normal_key_idx = 0;
size_t agg_func_as_key_idx = key_from_agg_func.size();
for (const auto & name : key_names)
{
keys.push_back(before_agg_header.getPositionByName(name));
auto col_idx = before_agg_header.getPositionByName(name);
if (key_from_agg_func.find(name) == key_from_agg_func.end())
{
keys[normal_key_idx++] = col_idx;
}
else
{
keys[agg_func_as_key_idx++] = col_idx;
}
}

const Settings & settings = context.getSettingsRef();
Expand All @@ -101,9 +113,10 @@ Aggregator::Params buildParams(

bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });

return Aggregator::Params(
return std::make_shared<Aggregator::Params>(
before_agg_header,
keys,
normal_key_idx,
aggregate_descriptions,
/// do not use the average value for key count threshold, because for a random distributed data, the key count
/// in every threads should almost be the same
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ bool isFinalAgg(const tipb::Aggregation & aggregation);

bool isGroupByCollationSensitive(const Context & context);

Aggregator::Params buildParams(
std::shared_ptr<Aggregator::Params> buildParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const std::unordered_map<String, String> & key_from_agg_func,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg,
Expand Down
59 changes: 46 additions & 13 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,26 @@ void DAGExpressionAnalyzer::buildCommonAggFunc(
context);
}

std::pair<String, DataTypePtr> findFirstRow(const AggregateDescriptions & aggregate_descriptions, const String & arg_name)
{
for (const auto & desc : aggregate_descriptions)
{
if (desc.function->getName() == "first_row" &&
desc.argument_names.size() == 1 &&
desc.argument_names[0] == arg_name)
return std::make_pair(desc.column_name, desc.function->getReturnType());
}
return std::make_pair("", nullptr);
}

void DAGExpressionAnalyzer::buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
std::unordered_map<String, String> & key_from_agg_func,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators)
{
Expand Down Expand Up @@ -591,19 +604,36 @@ void DAGExpressionAnalyzer::buildAggGroupBy(
collators.push_back(collator);
if (collator != nullptr)
{
/// if the column is a string with collation info, the `sort_key` of the column is used during
/// aggregation, but we can not reconstruct the origin column by `sort_key`, so add an extra
/// extra aggregation function any(group_by_column) here as the output of the group by column
TiDB::TiDBCollators arg_collators{collator};
appendAggDescription(
{name},
{type},
arg_collators,
"any",
aggregate_descriptions,
aggregated_columns,
false,
context);
// todo arg_collators
auto [first_row_name, first_row_type] = findFirstRow(aggregate_descriptions, name);
String agg_func_name = first_row_name;
if (!first_row_name.empty())
{
// todo need or not?
aggregated_columns.emplace_back(first_row_name, first_row_type);
}
else
{
/// if the column is a string with collation info, the `sort_key` of the column is used during
/// aggregation, but we can not reconstruct the origin column by `sort_key`, so add an extra
/// extra aggregation function any(group_by_column) here as the output of the group by column
TiDB::TiDBCollators arg_collators{collator};
appendAggDescription(
{name},
{type},
arg_collators,
"any",
aggregate_descriptions,
aggregated_columns,
false,
context);
agg_func_name = aggregate_descriptions.back().column_name;
}
auto [iter, inserted] = key_from_agg_func.insert({name, agg_func_name});
if unlikely (!inserted)
{
throw Exception("unexpected already exists agg key: {}", name);
}
}
else
{
Expand Down Expand Up @@ -666,6 +696,8 @@ std::tuple<Names, TiDB::TiDBCollators, AggregateDescriptions, ExpressionActionsP
Names aggregation_keys;
TiDB::TiDBCollators collators;
std::unordered_set<String> agg_key_set;
// todo check analyzeExpressions DAGQueryBlockInterpreter.cpp
std::unordered_map<String, String> key_from_agg_func;
buildAggFuncs(agg, step.actions, aggregate_descriptions, aggregated_columns);
buildAggGroupBy(
agg.group_by(),
Expand All @@ -674,6 +706,7 @@ std::tuple<Names, TiDB::TiDBCollators, AggregateDescriptions, ExpressionActionsP
aggregated_columns,
aggregation_keys,
agg_key_set,
key_from_agg_func,
group_by_collation_sensitive,
collators);
// set required output for agg funcs's arguments and group by keys.
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
std::unordered_map<String, String> & key_from_agg_func,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,15 @@ void DAGQueryBlockInterpreter::executeAggregation(
context.getFileProvider(),
settings.max_threads,
settings.max_block_size);
auto params = AggregationInterpreterHelper::buildParams(
// todo: finish this
std::unordered_map<String, String> key_from_agg_func;
auto params = *AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
pipeline.streams.size(),
enable_fine_grained_shuffle ? pipeline.streams.size() : 1,
key_names,
key_from_agg_func,
collators,
aggregate_descriptions,
is_final_agg,
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build(
NamesAndTypes aggregated_columns;
AggregateDescriptions aggregate_descriptions;
Names aggregation_keys;
std::unordered_map<String, String> key_from_agg_func;
TiDB::TiDBCollators collators;
{
std::unordered_set<String> agg_key_set;
Expand All @@ -65,6 +66,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build(
aggregated_columns,
aggregation_keys,
agg_key_set,
key_from_agg_func,
AggregationInterpreterHelper::isGroupByCollationSensitive(context),
collators);
}
Expand All @@ -83,6 +85,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build(
child,
before_agg_actions,
aggregation_keys,
key_from_agg_func,
collators,
AggregationInterpreterHelper::isFinalAgg(aggregation),
aggregate_descriptions,
Expand All @@ -107,12 +110,13 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
context.getFileProvider(),
context.getSettingsRef().max_threads,
context.getSettingsRef().max_block_size);
auto params = AggregationInterpreterHelper::buildParams(
auto params = *AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
pipeline.streams.size(),
fine_grained_shuffle.enable() ? pipeline.streams.size() : 1,
aggregation_keys,
key_from_agg_func,
aggregation_collators,
aggregate_descriptions,
is_final_agg,
Expand Down Expand Up @@ -223,12 +227,13 @@ void PhysicalAggregation::buildPipelineExecGroupImpl(
context.getFileProvider(),
context.getSettingsRef().max_threads,
context.getSettingsRef().max_block_size);
auto params = AggregationInterpreterHelper::buildParams(
auto params = *AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
concurrency,
concurrency,
aggregation_keys,
key_from_agg_func,
aggregation_collators,
aggregate_descriptions,
is_final_agg,
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ class PhysicalAggregation : public PhysicalUnary
const PhysicalPlanNodePtr & child_,
const ExpressionActionsPtr & before_agg_actions_,
const Names & aggregation_keys_,
const std::unordered_map<String, String> & key_from_agg_func_,
const TiDB::TiDBCollators & aggregation_collators_,
bool is_final_agg_,
const AggregateDescriptions & aggregate_descriptions_,
const ExpressionActionsPtr & expr_after_agg_)
: PhysicalUnary(executor_id_, PlanType::Aggregation, schema_, fine_grained_shuffle_, req_id, child_)
, before_agg_actions(before_agg_actions_)
, aggregation_keys(aggregation_keys_)
, key_from_agg_func(key_from_agg_func_)
, aggregation_collators(aggregation_collators_)
, is_final_agg(is_final_agg_)
, aggregate_descriptions(aggregate_descriptions_)
Expand All @@ -71,6 +73,7 @@ class PhysicalAggregation : public PhysicalUnary
private:
ExpressionActionsPtr before_agg_actions;
Names aggregation_keys;
std::unordered_map<String, String> key_from_agg_func;
TiDB::TiDBCollators aggregation_collators;
bool is_final_agg;
AggregateDescriptions aggregate_descriptions;
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
context.getFileProvider(),
context.getSettingsRef().max_threads,
context.getSettingsRef().max_block_size);
auto params = AggregationInterpreterHelper::buildParams(
// todo finish this
std::unordered_map<String, String> key_from_agg_func;
// todo maybe keep use pointer?
auto params = *AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
concurrency,
1,
aggregation_keys,
key_from_agg_func,
aggregation_collators,
aggregate_descriptions,
is_final_agg,
Expand Down

0 comments on commit 60bd8ea

Please sign in to comment.