Account for filters in DecompressChunk row estimates #6563

Open
wants to merge 11 commits into base: main
128 changes: 73 additions & 55 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -46,7 +46,7 @@ static CustomPathMethods decompress_chunk_path_methods = {

typedef struct SortInfo
{
List *compressed_pathkeys;
List *required_compressed_pathkeys;
bool needs_sequence_num;
bool can_pushdown_sort; /* sort can be pushed below DecompressChunk */
bool reverse;
@@ -153,7 +153,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
{
Var *var;
int varattno;
List *compressed_pathkeys = NIL;
List *required_compressed_pathkeys = NIL;
PathKey *pk;

/*
@@ -198,7 +198,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
*/
Assert(compressed_em != NULL);

compressed_pathkeys = lappend(compressed_pathkeys, pk);
required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk);

segmentby_columns =
bms_add_member(segmentby_columns, castNode(Var, compressed_em->em_expr)->varattno);
* asserting here.
*/
Assert(bms_num_members(segmentby_columns) == info->num_segmentby_columns ||
list_length(compressed_pathkeys) == list_length(chunk_pathkeys));
list_length(required_compressed_pathkeys) == list_length(chunk_pathkeys));
}

/*
@@ -251,9 +251,9 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu

pk = make_canonical_pathkey(root, ec, opfamily, strategy, nulls_first);

compressed_pathkeys = lappend(compressed_pathkeys, pk);
required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk);
}
sort_info->compressed_pathkeys = compressed_pathkeys;
sort_info->required_compressed_pathkeys = required_compressed_pathkeys;
}

static DecompressChunkPath *
@@ -321,15 +321,24 @@ build_compressioninfo(PlannerInfo *root, Hypertable *ht, RelOptInfo *chunk_rel)
* we use the cost of 1 tuple of the compressed scan as the startup cost
*/
static void
cost_decompress_chunk(Path *path, Path *compressed_path)
cost_decompress_chunk(PlannerInfo *root, Path *path, Path *compressed_path)
{
/* Set the row number estimate. */
if (path->param_info != NULL)
{
path->rows = path->param_info->ppi_rows;
}
else
{
path->rows = path->parent->rows;
}

/* startup_cost is cost before fetching first tuple */
if (compressed_path->rows > 0)
path->startup_cost = compressed_path->total_cost / compressed_path->rows;

/* total_cost is cost for fetching all tuples */
path->total_cost = compressed_path->total_cost + path->rows * cpu_tuple_cost;
path->rows = compressed_path->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
}
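For intuition, here is a minimal standalone sketch of the cost model that the rewritten cost_decompress_chunk implements (illustrative only, with made-up numbers; not TimescaleDB code): the row estimate is now taken from the parameterized-path info or the parent relation, which already reflects the chunk's filters, the startup cost is roughly the cost of producing one compressed tuple, and the total cost adds one cpu_tuple_cost per output row.

#include <stdio.h>

int
main(void)
{
	/* Illustrative planner inputs; all values are made up. */
	const double cpu_tuple_cost = 0.01;
	double compressed_total_cost = 120.0; /* total cost of the compressed scan */
	double compressed_rows = 500.0;       /* rows produced by the compressed scan */
	double parent_rows = 37.0;            /* filtered row estimate of the decompressed rel */

	/* Startup cost: roughly the cost of fetching the first compressed tuple. */
	double startup_cost = 0.0;
	if (compressed_rows > 0)
		startup_cost = compressed_total_cost / compressed_rows;

	/* Rows come from the (already filtered) relation estimate, not rows * batch size. */
	double rows = parent_rows;

	/* Total cost: compressed scan plus one cpu_tuple_cost per decompressed row. */
	double total_cost = compressed_total_cost + rows * cpu_tuple_cost;

	printf("rows=%.0f startup=%.3f total=%.2f\n", rows, startup_cost, total_cost);
	return 0;
}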

/* Smoothstep function S1 (the h01 cubic Hermite spline). */
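For reference, the S1 smoothstep named in the comment is the cubic Hermite basis function h01, which rises smoothly from 0 to 1 with zero slope at both endpoints:

S_1(x) = h_{01}(x) = 3x^2 - 2x^3, \qquad x \in [0, 1], \qquad S_1(0) = 0,\; S_1(1) = 1,\; S_1'(0) = S_1'(1) = 0.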
@@ -360,7 +369,7 @@ cost_batch_sorted_merge(PlannerInfo *root, CompressionInfo *compression_info,
enable_sort = true;
cost_sort(&sort_path,
root,
dcpath->compressed_pathkeys,
dcpath->required_compressed_pathkeys,
compressed_path->total_cost,
compressed_path->rows,
compressed_path->pathtarget->width,
@@ -690,7 +699,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
{
RelOptInfo *compressed_rel;
ListCell *lc;
double new_row_estimate;
Index ht_relid = 0;

CompressionInfo *compression_info = build_compressioninfo(root, ht, chunk_rel);
@@ -724,8 +732,12 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
chunk_rel,
compressed_rel,
ts_chunk_is_partial(chunk));

set_baserel_size_estimates(root, compressed_rel);
new_row_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
const double new_tuples_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
const double new_rows_estimate =
new_tuples_estimate *
clauselist_selectivity(root, chunk_rel->baserestrictinfo, 0, JOIN_INNER, NULL);

if (!compression_info->single_chunk)
{
Assert(chunk_info->parent_reloid == ht->main_table_relid);
ht_relid = chunk_info->parent_relid;
RelOptInfo *hypertable_rel = root->simple_rel_array[ht_relid];
hypertable_rel->rows += (new_row_estimate - chunk_rel->rows);
hypertable_rel->rows =
clamp_row_est(hypertable_rel->rows + new_rows_estimate - chunk_rel->rows);
hypertable_rel->tuples =
clamp_row_est(hypertable_rel->tuples + new_tuples_estimate - chunk_rel->tuples);
}

chunk_rel->rows = new_row_estimate;
/*
* Note that we may be overwriting the estimate for the uncompressed part of a
* partial chunk here, but the paths for the uncompressed part were already
* built, so that is OK.
*/
chunk_rel->tuples = new_tuples_estimate;
chunk_rel->rows = new_rows_estimate;
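As a rough worked example with made-up numbers: if the compressed scan is estimated at 500 rows and DECOMPRESS_CHUNK_BATCH_SIZE is 1000, then new_tuples_estimate = 500 * 1000 = 500000. If clauselist_selectivity() of the chunk's baserestrictinfo comes out to 0.001, then new_rows_estimate = 500000 * 0.001 = 500, where the previous code would have reported 500000 rows regardless of the filters. The hypertable-level rows and tuples are then shifted by the same deltas and clamped with clamp_row_est().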

create_compressed_scan_paths(root, compressed_rel, compression_info, &sort_info);

foreach (lc, compressed_rel->pathlist)
{
Path *compressed_path = lfirst(lc);
Path *path;

/*
* We skip any BitmapScan parameterized paths here as supporting
@@ -830,7 +850,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
continue;
}

path = (Path *) decompress_chunk_path_create(root, compression_info, 0, compressed_path);
Path *chunk_path =
(Path *) decompress_chunk_path_create(root, compression_info, 0, compressed_path);

/*
* Create a path for the batch sorted merge optimization. This optimization performs a
MergeBatchResult merge_result = can_batch_sorted_merge(root, compression_info, chunk);
if (merge_result != MERGE_NOT_POSSIBLE)
{
batch_merge_path = copy_decompress_chunk_path((DecompressChunkPath *) path);
batch_merge_path = copy_decompress_chunk_path((DecompressChunkPath *) chunk_path);

batch_merge_path->reverse = (merge_result != SCAN_FORWARD);
batch_merge_path->batch_sorted_merge = true;
* between the decompression node and the scan during plan creation */
if (sort_info.can_pushdown_sort)
{
DecompressChunkPath *dcpath = copy_decompress_chunk_path((DecompressChunkPath *) path);
dcpath->reverse = sort_info.reverse;
dcpath->needs_sequence_num = sort_info.needs_sequence_num;
dcpath->compressed_pathkeys = sort_info.compressed_pathkeys;
dcpath->custom_path.path.pathkeys = root->query_pathkeys;
DecompressChunkPath *path_copy =
copy_decompress_chunk_path((DecompressChunkPath *) chunk_path);
path_copy->reverse = sort_info.reverse;
path_copy->needs_sequence_num = sort_info.needs_sequence_num;
path_copy->required_compressed_pathkeys = sort_info.required_compressed_pathkeys;
path_copy->custom_path.path.pathkeys = root->query_pathkeys;

/*
* Add costing for a sort. The standard Postgres pattern is to add the cost during
* path creation, but not add the sort path itself, that's done during plan
* creation. Examples of this in: create_merge_append_path &
* create_merge_append_plan
*/
if (!pathkeys_contained_in(dcpath->compressed_pathkeys, compressed_path->pathkeys))
if (!pathkeys_contained_in(sort_info.required_compressed_pathkeys,
compressed_path->pathkeys))
{
Path sort_path; /* dummy for result of cost_sort */

cost_sort(&sort_path,
root,
dcpath->compressed_pathkeys,
sort_info.required_compressed_pathkeys,
compressed_path->total_cost,
compressed_path->rows,
compressed_path->pathtarget->width,
0.0,
work_mem,
-1);

cost_decompress_chunk(&dcpath->custom_path.path, &sort_path);
cost_decompress_chunk(root, &path_copy->custom_path.path, &sort_path);
}
/*
* if chunk is partially compressed don't add this now but add an append path later
* combining the uncompressed and compressed parts of the chunk
*/
if (!ts_chunk_is_partial(chunk))
add_path(chunk_rel, &dcpath->custom_path.path);
else
path = &dcpath->custom_path.path;

chunk_path = &path_copy->custom_path.path;
}

/*
*/
if (ts_chunk_is_partial(chunk))
{
Bitmapset *req_outer = PATH_REQ_OUTER(path);
Bitmapset *req_outer = PATH_REQ_OUTER(chunk_path);
Path *uncompressed_path =
get_cheapest_path_for_pathkeys(initial_pathlist, NIL, req_outer, TOTAL_COST, false);

*/
if (batch_merge_path != NULL)
{
path = (Path *) create_merge_append_path_compat(root,
chunk_rel,
list_make2(batch_merge_path,
uncompressed_path),
root->query_pathkeys,
req_outer,
NIL);
chunk_path = (Path *) create_merge_append_path_compat(root,
chunk_rel,
list_make2(batch_merge_path,
uncompressed_path),
root->query_pathkeys,
req_outer,
NIL);
}
else
/*
Expand All @@ -955,24 +972,25 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
* and directly add its children, so we have to combine the children
* into a MergeAppend node later, at the chunk append level.
*/
path = (Path *) create_append_path_compat(root,
chunk_rel,
list_make2(path, uncompressed_path),
NIL /* partial paths */,
root->query_pathkeys /* pathkeys */,
req_outer,
0,
false,
false,
path->rows + uncompressed_path->rows);
chunk_path =
(Path *) create_append_path_compat(root,
chunk_rel,
list_make2(chunk_path, uncompressed_path),
NIL /* partial paths */,
root->query_pathkeys /* pathkeys */,
req_outer,
0,
false,
false,
chunk_path->rows + uncompressed_path->rows);
}

/* Add useful sorted versions of the decompress path */
add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, path, compressed_path);
add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, chunk_path, compressed_path);

/* this has to go after the path is copied for the ordered path since path can get freed in
* add_path */
add_path(chunk_rel, path);
add_path(chunk_rel, chunk_path);
}

/* the chunk_rel now owns the paths, remove them from the compressed_rel so they can't be freed
@@ -1748,8 +1766,8 @@ decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info, int paral

path->custom_path.custom_paths = list_make1(compressed_path);
path->reverse = false;
path->compressed_pathkeys = NIL;
cost_decompress_chunk(&path->custom_path.path, compressed_path);
path->required_compressed_pathkeys = NIL;
cost_decompress_chunk(root, &path->custom_path.path, compressed_path);

return path;
}
@@ -1818,7 +1836,7 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, Comp
*/
List *orig_pathkeys = root->query_pathkeys;
build_compressed_scan_pathkeys(sort_info, root, root->query_pathkeys, info);
root->query_pathkeys = sort_info->compressed_pathkeys;
root->query_pathkeys = sort_info->required_compressed_pathkeys;
check_index_predicates(root, compressed_rel);
create_index_paths(root, compressed_rel);
root->query_pathkeys = orig_pathkeys;
2 changes: 1 addition & 1 deletion tsl/src/nodes/decompress_chunk/decompress_chunk.h
@@ -108,7 +108,7 @@ typedef struct DecompressChunkPath
*/
List *aggregated_column_type;

List *compressed_pathkeys;
List *required_compressed_pathkeys;
bool needs_sequence_num;
bool reverse;
bool batch_sorted_merge;
9 changes: 6 additions & 3 deletions tsl/src/nodes/decompress_chunk/planner.c
@@ -939,7 +939,7 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
collations,
nullsFirst);

ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ 0);
ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ -1.0);

decompress_plan->custom_plans = list_make1(sort);
}
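A hedged note on the -1.0: in PostgreSQL's sort costing, a non-positive limit_tuples means "no LIMIT", so -1.0 is the conventional sentinel the planner uses when labeling sorts; passing 0 behaves the same in practice, but -1.0 matches that convention. The sketch below is illustrative only (it is not the PostgreSQL source) and shows the shape of the bounded-sort check.

#include <stdio.h>

/* Illustrative sketch: a positive limit switches costing to a bounded (top-N) sort. */
static double
sort_output_tuples(double input_tuples, double limit_tuples)
{
	if (limit_tuples > 0 && limit_tuples < input_tuples)
		return limit_tuples; /* bounded sort: only limit_tuples reach the output */
	return input_tuples;     /* no usable limit: cost a full sort */
}

int
main(void)
{
	printf("no limit: %.0f, top-10: %.0f\n",
	       sort_output_tuples(1000.0, -1.0),
	       sort_output_tuples(1000.0, 10.0));
	return 0;
}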
/*
* Add a sort if the compressed scan is not ordered appropriately.
*/
if (!pathkeys_contained_in(dcpath->compressed_pathkeys, compressed_path->pathkeys))
if (!pathkeys_contained_in(dcpath->required_compressed_pathkeys, compressed_path->pathkeys))
{
List *compressed_pks = dcpath->compressed_pathkeys;
List *compressed_pks = dcpath->required_compressed_pathkeys;
Sort *sort = ts_make_sort_from_pathkeys((Plan *) compressed_scan,
compressed_pks,
bms_make_singleton(compressed_scan->scanrelid));

ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ -1.0);

decompress_plan->custom_plans = list_make1(sort);
}
else
12 changes: 12 additions & 0 deletions tsl/src/nodes/decompress_chunk/qual_pushdown.c
@@ -87,6 +87,18 @@ pushdown_quals(PlannerInfo *root, CompressionSettings *settings, RelOptInfo *chu
{
decompress_clauses = lappend(decompress_clauses, ri);
}

if (context.needs_recheck)
{
/*
* If we managed to push down a comparison on an orderby column
* to the compressed scan, most matching batches are likely to
* match entirely, so the selectivity of the recheck will be
* close to 1.
*/
ri->norm_selec = 1;
Assert(context.can_pushdown);
}
}
chunk_rel->baserestrictinfo = decompress_clauses;
}
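A small standalone illustration of why the recheck selectivity is set to 1 (hypothetical example, not TimescaleDB code; the batch bounds and the predicate are made up): when a qual on an orderby column is pushed down as a comparison against per-batch min/max metadata, only batches whose range can match are decompressed, so almost every decompressed row passes the original qual on recheck.

#include <stdio.h>

typedef struct Batch
{
	int min_ts; /* per-batch minimum of the orderby column */
	int max_ts; /* per-batch maximum of the orderby column */
} Batch;

int
main(void)
{
	Batch batches[] = { { 0, 99 }, { 100, 199 }, { 200, 299 } };
	int bound = 105; /* original qual: ts > 105 */
	int decompressed = 0;
	int matched = 0;

	for (int i = 0; i < 3; i++)
	{
		/* Pushed-down batch-level filter: skip batches that cannot match. */
		if (batches[i].max_ts <= bound)
			continue;

		/* Decompress the surviving batch and recheck the qual per row. */
		for (int ts = batches[i].min_ts; ts <= batches[i].max_ts; ts++)
		{
			decompressed++;
			if (ts > bound)
				matched++;
		}
	}

	/* Most decompressed rows pass the recheck, so its selectivity is near 1. */
	printf("recheck selectivity ~= %.2f\n", (double) matched / decompressed);
	return 0;
}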