Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cubesql): Flatten list expression rewrites to improve performance #8248

Merged
1 commit merged on May 24, 2024
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cubejs-backend-native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/cubesql/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/cubesql/cubesql/Cargo.toml
Original file line number | Diff line number | Diff line change
Expand Up @@ -48,7 +48,7 @@ nanoid = "0.3.0"
tokio-util = { version = "0.6.2", features=["compat"] }
comfy-table = "7.1.0"
bitflags = "1.3.2"
egg = {rev = "7e60716cc757448bd672f2dc28ef9a0d074dce71", git = "https://github.com/egraphs-good/egg.git"}
egg = { rev = "bdf05cee0a145a524fe8c6c33aa577ac50ace7c9", git = "https://github.com/cube-js/egg.git" }
paste = "1.0.6"
csv = "1.1.6"
tracing = { version = "0.1.40", features = ["async-await"] }
Expand Down
91 changes: 66 additions & 25 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -113,6 +113,7 @@ impl QueryPlanner {
stmt: &ast::Statement,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> CompilationResult<QueryPlan> {
let planning_start = SystemTime::now();
if let Some(span_id) = span_id.as_ref() {
Expand All @@ -134,7 +135,7 @@ impl QueryPlanner {
}
}
let result = self
.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone(), flat_list)
.await?;

if let Some(span_id) = span_id.as_ref() {
Expand Down Expand Up @@ -165,16 +166,20 @@ impl QueryPlanner {
stmt: &ast::Statement,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> CompilationResult<QueryPlan> {
let plan = match (stmt, &self.state.protocol) {
(ast::Statement::Query(q), _) => {
if let ast::SetExpr::Select(select) = &q.body {
if let Some(into) = &select.into {
return self.select_into_to_plan(into, q, qtrace, span_id).await;
return self
.select_into_to_plan(into, q, qtrace, span_id, flat_list)
.await;
}
}

self.select_to_plan(stmt, qtrace, span_id.clone()).await
self.select_to_plan(stmt, qtrace, span_id.clone(), flat_list)
.await
}
(ast::Statement::SetTransaction { .. }, _) => Ok(QueryPlan::MetaTabular(
StatusFlags::empty(),
Expand Down Expand Up @@ -204,10 +209,12 @@ impl QueryPlanner {
self.set_variable_to_plan(&key_values).await
}
(ast::Statement::ShowVariable { variable }, _) => {
self.show_variable_to_plan(variable, span_id.clone()).await
self.show_variable_to_plan(variable, span_id.clone(), flat_list)
.await
}
(ast::Statement::ShowVariables { filter }, DatabaseProtocol::MySQL) => {
self.show_variables_to_plan(&filter, span_id.clone()).await
self.show_variables_to_plan(&filter, span_id.clone(), flat_list)
.await
}
(ast::Statement::ShowCreate { obj_name, obj_type }, DatabaseProtocol::MySQL) => {
self.show_create_to_plan(&obj_name, &obj_type)
Expand All @@ -221,8 +228,15 @@ impl QueryPlanner {
},
DatabaseProtocol::MySQL,
) => {
self.show_columns_to_plan(*extended, *full, &filter, &table_name, span_id.clone())
.await
self.show_columns_to_plan(
*extended,
*full,
&filter,
&table_name,
span_id.clone(),
flat_list,
)
.await
}
(
ast::Statement::ShowTables {
Expand All @@ -233,14 +247,22 @@ impl QueryPlanner {
},
DatabaseProtocol::MySQL,
) => {
self.show_tables_to_plan(*extended, *full, &filter, &db_name, span_id.clone())
.await
self.show_tables_to_plan(
*extended,
*full,
&filter,
&db_name,
span_id.clone(),
flat_list,
)
.await
}
(ast::Statement::ShowCollation { filter }, DatabaseProtocol::MySQL) => {
self.show_collation_to_plan(&filter, span_id.clone()).await
self.show_collation_to_plan(&filter, span_id.clone(), flat_list)
.await
}
(ast::Statement::ExplainTable { table_name, .. }, DatabaseProtocol::MySQL) => {
self.explain_table_to_plan(&table_name, span_id.clone())
self.explain_table_to_plan(&table_name, span_id.clone(), flat_list)
.await
}
(
Expand All @@ -251,7 +273,10 @@ impl QueryPlanner {
..
},
_,
) => self.explain_to_plan(&statement, *verbose, *analyze).await,
) => {
self.explain_to_plan(&statement, *verbose, *analyze, flat_list)
.await
}
(ast::Statement::Use { db_name }, DatabaseProtocol::MySQL) => {
self.use_to_plan(&db_name)
}
Expand Down Expand Up @@ -304,7 +329,7 @@ impl QueryPlanner {
&& *temporary =>
{
let stmt = ast::Statement::Query(query.clone());
self.create_table_to_plan(name, &stmt, qtrace, span_id.clone())
self.create_table_to_plan(name, &stmt, qtrace, span_id.clone(), flat_list)
.await
}
(
Expand Down Expand Up @@ -339,6 +364,7 @@ impl QueryPlanner {
&self,
variable: &Vec<ast::Ident>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> CompilationResult<QueryPlan> {
let name = variable.to_vec()[0].value.clone();
if self.state.protocol == DatabaseProtocol::PostgreSQL {
Expand Down Expand Up @@ -366,7 +392,7 @@ impl QueryPlanner {
)?
};

self.create_df_logical_plan(stmt, &mut None, span_id.clone())
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
.await
} else if name.eq_ignore_ascii_case("databases") || name.eq_ignore_ascii_case("schemas") {
Ok(QueryPlan::MetaTabular(
Expand Down Expand Up @@ -399,7 +425,7 @@ impl QueryPlanner {
&mut None,
)?;

self.create_df_logical_plan(stmt, &mut None, span_id.clone())
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
.await
} else if name.eq_ignore_ascii_case("warnings") {
Ok(QueryPlan::MetaTabular(
Expand Down Expand Up @@ -432,6 +458,7 @@ impl QueryPlanner {
},
&mut None,
span_id.clone(),
flat_list,
)
.await
}
Expand All @@ -441,6 +468,7 @@ impl QueryPlanner {
&self,
filter: &Option<ast::ShowStatementFilter>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> Result<QueryPlan, CompilationError> {
let filter = match filter {
Some(stmt @ ast::ShowStatementFilter::Like(_)) => {
Expand All @@ -467,7 +495,7 @@ impl QueryPlanner {
&mut None,
)?;

self.create_df_logical_plan(stmt, &mut None, span_id.clone())
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
.await
}

Expand Down Expand Up @@ -539,6 +567,7 @@ impl QueryPlanner {
filter: &Option<ast::ShowStatementFilter>,
table_name: &ast::ObjectName,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> Result<QueryPlan, CompilationError> {
let extended = match extended {
false => "".to_string(),
Expand Down Expand Up @@ -600,7 +629,7 @@ impl QueryPlanner {
&mut None,
)?;

self.create_df_logical_plan(stmt, &mut None, span_id.clone())
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
.await
}

Expand All @@ -612,6 +641,7 @@ impl QueryPlanner {
filter: &Option<ast::ShowStatementFilter>,
db_name: &Option<ast::Ident>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> Result<QueryPlan, CompilationError> {
let db_name = match db_name {
Some(db_name) => db_name.clone(),
Expand Down Expand Up @@ -660,14 +690,15 @@ WHERE `TABLE_SCHEMA` = '{}'",
&mut None,
)?;

self.create_df_logical_plan(stmt, &mut None, span_id.clone())
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
.await
}

async fn show_collation_to_plan(
&self,
filter: &Option<ast::ShowStatementFilter>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> Result<QueryPlan, CompilationError> {
let filter = match filter {
Some(stmt @ ast::ShowStatementFilter::Like(_)) => {
Expand Down Expand Up @@ -695,17 +726,18 @@ WHERE `TABLE_SCHEMA` = '{}'",
&mut None,
)?;

self.create_df_logical_plan(stmt, &mut None, span_id.clone())
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
.await
}

async fn explain_table_to_plan(
&self,
table_name: &ast::ObjectName,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> Result<QueryPlan, CompilationError> {
// EXPLAIN <table> matches the SHOW COLUMNS output exactly, reuse the plan
self.show_columns_to_plan(false, false, &None, table_name, span_id)
self.show_columns_to_plan(false, false, &None, table_name, span_id, flat_list)
.await
}

Expand All @@ -714,14 +746,17 @@ WHERE `TABLE_SCHEMA` = '{}'",
statement: &Box<ast::Statement>,
verbose: bool,
analyze: bool,
flat_list: bool,
) -> Pin<Box<dyn Future<Output = Result<QueryPlan, CompilationError>> + Send>> {
let self_cloned = self.clone();

let statement = statement.clone();
// This Boxing construct here because of recursive call to self.plan()
Box::pin(async move {
// TODO span_id ?
let plan = self_cloned.plan(&statement, &mut None, None).await?;
let plan = self_cloned
.plan(&statement, &mut None, None, flat_list)
.await?;

match plan {
QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => Ok(QueryPlan::MetaTabular(
Expand Down Expand Up @@ -995,8 +1030,11 @@ WHERE `TABLE_SCHEMA` = '{}'",
stmt: &ast::Statement,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> Result<QueryPlan, CompilationError> {
let plan = self.select_to_plan(stmt, qtrace, span_id).await?;
let plan = self
.select_to_plan(stmt, qtrace, span_id, flat_list)
.await?;
let QueryPlan::DataFusionSelect(flags, plan, ctx) = plan else {
return Err(CompilationError::internal(
"unable to build DataFusion plan from Query".to_string(),
Expand Down Expand Up @@ -1024,6 +1062,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
query: &Box<ast::Query>,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> Result<QueryPlan, CompilationError> {
if !into.temporary || !into.table {
return Err(CompilationError::unsupported(
Expand All @@ -1040,7 +1079,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
));
}
let new_stmt = ast::Statement::Query(new_query);
self.create_table_to_plan(&into.name, &new_stmt, qtrace, span_id)
self.create_table_to_plan(&into.name, &new_stmt, qtrace, span_id, flat_list)
.await
}

Expand Down Expand Up @@ -1240,6 +1279,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
stmt: ast::Statement,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
flat_list: bool,
) -> CompilationResult<QueryPlan> {
self.reauthenticate_if_needed().await?;

Expand Down Expand Up @@ -1310,7 +1350,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
log::debug!("Initial Plan: {:#?}", optimized_plan);

let cube_ctx = Arc::new(cube_ctx);
let mut converter = LogicalPlanToLanguageConverter::new(cube_ctx.clone());
let mut converter = LogicalPlanToLanguageConverter::new(cube_ctx.clone(), flat_list);
let mut query_params = Some(HashMap::new());
let root = converter
.add_logical_plan_replace_params(
Expand Down Expand Up @@ -1532,7 +1572,8 @@ pub async fn convert_statement_to_cube_query(
}

let planner = QueryPlanner::new(session.state.clone(), meta, session.session_manager.clone());
planner.plan(&stmt, qtrace, span_id).await
let flat_list = session.server.config_obj.push_down_pull_up_split();
planner.plan(&stmt, qtrace, span_id, flat_list).await
}

#[derive(Debug, PartialEq, Serialize)]
Expand Down