Simplification and cleanup of decompression planning (#6731)

This PR renames some things in preparation for the cost changes from
#6550.

Also simplifies path creation for partial chunks, improving some plans
(index scans are now used instead of sorts).

akuzm committed Mar 6, 2024
1 parent 119d541 commit 7d6173b
Showing 7 changed files with 215 additions and 384 deletions.
tsl/src/nodes/decompress_chunk/decompress_chunk.c (100 changes: 48 additions & 52 deletions)
@@ -46,7 +46,7 @@ static CustomPathMethods decompress_chunk_path_methods = {
 
 typedef struct SortInfo
 {
-    List *compressed_pathkeys;
+    List *required_compressed_pathkeys;
     bool needs_sequence_num;
     bool can_pushdown_sort; /* sort can be pushed below DecompressChunk */
     bool reverse;
@@ -153,7 +153,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
 {
     Var *var;
     int varattno;
-    List *compressed_pathkeys = NIL;
+    List *required_compressed_pathkeys = NIL;
     PathKey *pk;
 
     /*
@@ -198,7 +198,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
          */
         Assert(compressed_em != NULL);
 
-        compressed_pathkeys = lappend(compressed_pathkeys, pk);
+        required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk);
 
         segmentby_columns =
             bms_add_member(segmentby_columns, castNode(Var, compressed_em->em_expr)->varattno);
@@ -210,7 +210,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
      * asserting here.
      */
     Assert(bms_num_members(segmentby_columns) == info->num_segmentby_columns ||
-           list_length(compressed_pathkeys) == list_length(chunk_pathkeys));
+           list_length(required_compressed_pathkeys) == list_length(chunk_pathkeys));
 }
 
 /*
@@ -251,9 +251,9 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
 
         pk = make_canonical_pathkey(root, ec, opfamily, strategy, nulls_first);
 
-        compressed_pathkeys = lappend(compressed_pathkeys, pk);
+        required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk);
     }
-    sort_info->compressed_pathkeys = compressed_pathkeys;
+    sort_info->required_compressed_pathkeys = required_compressed_pathkeys;
 }
 
 static DecompressChunkPath *
@@ -322,7 +322,7 @@ build_compressioninfo(PlannerInfo *root, Hypertable *ht, Chunk *chunk, RelOptInf
  * we put cost of 1 tuple of compressed_scan as startup cost
  */
 static void
-cost_decompress_chunk(Path *path, Path *compressed_path)
+cost_decompress_chunk(PlannerInfo *root, Path *path, Path *compressed_path)
 {
     /* startup_cost is cost before fetching first tuple */
     if (compressed_path->rows > 0)
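Background for this hunk: the comment above sketches the decompression cost model. The startup cost is one compressed tuple's share of the child scan's total cost, and each compressed tuple fans out into a whole batch of rows. Below is a standalone sketch of that shape; the batch-size constant and the total-cost formula are illustrative assumptions, not the verbatim TimescaleDB code.

/* Standalone sketch of the decompression cost model described above.
 * Assumed, not verbatim: the batch size and the per-row CPU term. */
#include <stdio.h>

#define DECOMPRESS_CHUNK_BATCH_SIZE 1000 /* max rows per compressed batch (assumed) */

typedef struct CostSketch
{
    double startup_cost; /* cost before the first tuple is produced */
    double total_cost;   /* cost to produce all tuples */
    double rows;
} CostSketch;

static void
cost_decompress_chunk_sketch(CostSketch *path, const CostSketch *compressed)
{
    /* Startup cost: one compressed tuple must be fetched before the first
     * decompressed row can be returned, so charge one tuple's share. */
    if (compressed->rows > 0)
        path->startup_cost = compressed->total_cost / compressed->rows;

    /* Each compressed tuple decompresses into up to one batch of rows. */
    path->rows = compressed->rows * DECOMPRESS_CHUNK_BATCH_SIZE;

    /* Total cost: the child scan plus an assumed per-row decompression cost. */
    path->total_cost = compressed->total_cost + path->rows * 0.01;
}

int
main(void)
{
    CostSketch compressed = { 0.0, 100.0, 50.0 };
    CostSketch decompress = { 0.0, 0.0, 0.0 };

    cost_decompress_chunk_sketch(&decompress, &compressed);
    printf("startup=%.2f total=%.2f rows=%.0f\n",
           decompress.startup_cost, decompress.total_cost, decompress.rows);
    return 0;
}

The new PlannerInfo *root parameter in the hunk is presumably what the cost changes from #6550 need: a cost model that consults planner-global state has to receive the root pointer.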
@@ -361,7 +361,7 @@ cost_batch_sorted_merge(PlannerInfo *root, CompressionInfo *compression_info,
     enable_sort = true;
     cost_sort(&sort_path,
               root,
-              dcpath->compressed_pathkeys,
+              dcpath->required_compressed_pathkeys,
               compressed_path->total_cost,
               compressed_path->rows,
               compressed_path->pathtarget->width,
@@ -699,7 +699,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
 {
     RelOptInfo *compressed_rel;
     ListCell *lc;
-    double new_row_estimate;
     Index ht_relid = 0;
 
     CompressionInfo *compression_info = build_compressioninfo(root, ht, chunk, chunk_rel);
@@ -734,7 +733,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
                               compressed_rel,
                               ts_chunk_is_partial(chunk));
     set_baserel_size_estimates(root, compressed_rel);
-    new_row_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
+    double new_row_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
 
     if (!compression_info->single_chunk)
     {
Expand All @@ -759,7 +758,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
foreach (lc, compressed_rel->pathlist)
{
Path *compressed_path = lfirst(lc);
Path *path;

/*
* We skip any BitmapScan parameterized paths here as supporting
@@ -839,7 +837,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
             continue;
         }
 
-        path = (Path *) decompress_chunk_path_create(root, compression_info, 0, compressed_path);
+        Path *chunk_path =
+            (Path *) decompress_chunk_path_create(root, compression_info, 0, compressed_path);
 
         /*
          * Create a path for the batch sorted merge optimization. This optimization performs a
@@ -855,7 +854,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
         MergeBatchResult merge_result = can_batch_sorted_merge(root, compression_info, chunk);
         if (merge_result != MERGE_NOT_POSSIBLE)
         {
-            batch_merge_path = copy_decompress_chunk_path((DecompressChunkPath *) path);
+            batch_merge_path = copy_decompress_chunk_path((DecompressChunkPath *) chunk_path);
 
             batch_merge_path->reverse = (merge_result != SCAN_FORWARD);
             batch_merge_path->batch_sorted_merge = true;
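Background for the batch sorted merge hunks: each compressed batch stores its rows pre-sorted on the compression order, so rather than decompressing everything and sorting, the executor can keep one cursor per open batch and repeatedly emit the smallest head, i.e. a k-way merge (the real executor maintains a binary heap over batch states). A self-contained toy illustration of the idea, not the TimescaleDB code; it re-sorts a small cursor array where the real code would sift a heap:

/* Toy k-way merge over pre-sorted "batches"; prints 1..9 in order. */
#include <stdio.h>
#include <stdlib.h>

typedef struct BatchCursor
{
    const int *rows; /* decompressed, pre-sorted rows of one batch */
    int len;
    int pos;
} BatchCursor;

static int
cursor_cmp(const void *a, const void *b)
{
    const BatchCursor *ca = a;
    const BatchCursor *cb = b;
    return ca->rows[ca->pos] - cb->rows[cb->pos];
}

int
main(void)
{
    int b1[] = { 1, 4, 9 }, b2[] = { 2, 3, 8 }, b3[] = { 5, 6, 7 };
    BatchCursor open_batches[] = { { b1, 3, 0 }, { b2, 3, 0 }, { b3, 3, 0 } };
    int nopen = 3;

    while (nopen > 0)
    {
        /* Find the cursor with the smallest head and emit it. */
        qsort(open_batches, nopen, sizeof(BatchCursor), cursor_cmp);
        printf("%d ", open_batches[0].rows[open_batches[0].pos++]);
        if (open_batches[0].pos == open_batches[0].len)
            open_batches[0] = open_batches[--nopen]; /* batch exhausted */
    }
    putchar('\n');
    return 0;
}

This is why the optimization pays off mainly for queries ordered on the compression order (especially with a LIMIT): only the open batches need to be decompressed and held, not the whole chunk.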
@@ -882,42 +881,38 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
          * between the decompression node and the scan during plan creation */
         if (sort_info.can_pushdown_sort)
         {
-            DecompressChunkPath *dcpath = copy_decompress_chunk_path((DecompressChunkPath *) path);
-            dcpath->reverse = sort_info.reverse;
-            dcpath->needs_sequence_num = sort_info.needs_sequence_num;
-            dcpath->compressed_pathkeys = sort_info.compressed_pathkeys;
-            dcpath->custom_path.path.pathkeys = root->query_pathkeys;
+            DecompressChunkPath *path_copy =
+                copy_decompress_chunk_path((DecompressChunkPath *) chunk_path);
+            path_copy->reverse = sort_info.reverse;
+            path_copy->needs_sequence_num = sort_info.needs_sequence_num;
+            path_copy->required_compressed_pathkeys = sort_info.required_compressed_pathkeys;
+            path_copy->custom_path.path.pathkeys = root->query_pathkeys;
 
             /*
              * Add costing for a sort. The standard Postgres pattern is to add the cost during
              * path creation, but not add the sort path itself, that's done during plan
              * creation. Examples of this in: create_merge_append_path &
              * create_merge_append_plan
              */
-            if (!pathkeys_contained_in(dcpath->compressed_pathkeys, compressed_path->pathkeys))
+            if (!pathkeys_contained_in(sort_info.required_compressed_pathkeys,
+                                       compressed_path->pathkeys))
             {
                 Path sort_path; /* dummy for result of cost_sort */
 
                 cost_sort(&sort_path,
                           root,
-                          dcpath->compressed_pathkeys,
+                          sort_info.required_compressed_pathkeys,
                           compressed_path->total_cost,
                           compressed_path->rows,
                           compressed_path->pathtarget->width,
                           0.0,
                           work_mem,
                           -1);
 
-                cost_decompress_chunk(&dcpath->custom_path.path, &sort_path);
+                cost_decompress_chunk(root, &path_copy->custom_path.path, &sort_path);
             }
-            /*
-             * if chunk is partially compressed don't add this now but add an append path later
-             * combining the uncompressed and compressed parts of the chunk
-             */
-            if (!ts_chunk_is_partial(chunk))
-                add_path(chunk_rel, &dcpath->custom_path.path);
-            else
-                path = &dcpath->custom_path.path;
+
+            chunk_path = &path_copy->custom_path.path;
         }
 
         /*
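A note on the pattern described in the comment above: add_path() keeps or discards candidate paths purely by comparing their costs, so the cost of a sort that will later be pushed below DecompressChunk has to be charged at path-creation time for those comparisons to be meaningful. The Sort plan node itself is only materialized once the winning path is turned into a plan (for this node, in decompress_chunk_plan_create in planner.c below).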
@@ -926,7 +921,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
          */
         if (ts_chunk_is_partial(chunk))
         {
-            Bitmapset *req_outer = PATH_REQ_OUTER(path);
+            Bitmapset *req_outer = PATH_REQ_OUTER(chunk_path);
             Path *uncompressed_path =
                 get_cheapest_path_for_pathkeys(initial_pathlist, NIL, req_outer, TOTAL_COST, false);

@@ -949,13 +944,13 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
              */
             if (batch_merge_path != NULL)
             {
-                path = (Path *) create_merge_append_path_compat(root,
-                                                                chunk_rel,
-                                                                list_make2(batch_merge_path,
-                                                                           uncompressed_path),
-                                                                root->query_pathkeys,
-                                                                req_outer,
-                                                                NIL);
+                chunk_path = (Path *) create_merge_append_path_compat(root,
+                                                                      chunk_rel,
+                                                                      list_make2(batch_merge_path,
+                                                                                 uncompressed_path),
+                                                                      root->query_pathkeys,
+                                                                      req_outer,
+                                                                      NIL);
             }
             else
                 /*
@@ -964,24 +959,25 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
                  * and directly add its children, so we have to combine the children
                  * into a MergeAppend node later, at the chunk append level.
                  */
-                path = (Path *) create_append_path_compat(root,
-                                                          chunk_rel,
-                                                          list_make2(path, uncompressed_path),
-                                                          NIL /* partial paths */,
-                                                          root->query_pathkeys /* pathkeys */,
-                                                          req_outer,
-                                                          0,
-                                                          false,
-                                                          false,
-                                                          path->rows + uncompressed_path->rows);
+                chunk_path =
+                    (Path *) create_append_path_compat(root,
+                                                       chunk_rel,
+                                                       list_make2(chunk_path, uncompressed_path),
+                                                       NIL /* partial paths */,
+                                                       root->query_pathkeys /* pathkeys */,
+                                                       req_outer,
+                                                       0,
+                                                       false,
+                                                       false,
+                                                       chunk_path->rows + uncompressed_path->rows);
         }
 
         /* Add useful sorted versions of the decompress path */
-        add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, path, compressed_path);
+        add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, chunk_path, compressed_path);
 
         /* this has to go after the path is copied for the ordered path since path can get freed in
         * add_path */
-        add_path(chunk_rel, path);
+        add_path(chunk_rel, chunk_path);
     }
 
     /* the chunk_rel now owns the paths, remove them from the compressed_rel so they can't be freed
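For context: a partial chunk contains both compressed and not-yet-compressed rows, so its final path combines the DecompressChunk path with a plain scan of the uncompressed part, via MergeAppend when a batch sorted merge path supplies ordered input and via a plain Append otherwise (with the MergeAppend possibly recreated later at the chunk append level, as the comment above the Append branch explains). This is the path-creation code for partial chunks that the commit message says was simplified.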
Expand Down Expand Up @@ -1757,8 +1753,8 @@ decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info, int paral

path->custom_path.custom_paths = list_make1(compressed_path);
path->reverse = false;
path->compressed_pathkeys = NIL;
cost_decompress_chunk(&path->custom_path.path, compressed_path);
path->required_compressed_pathkeys = NIL;
cost_decompress_chunk(root, &path->custom_path.path, compressed_path);

return path;
}
Expand Down Expand Up @@ -1827,7 +1823,7 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, Comp
*/
List *orig_pathkeys = root->query_pathkeys;
build_compressed_scan_pathkeys(sort_info, root, root->query_pathkeys, info);
root->query_pathkeys = sort_info->compressed_pathkeys;
root->query_pathkeys = sort_info->required_compressed_pathkeys;
check_index_predicates(root, compressed_rel);
create_index_paths(root, compressed_rel);
root->query_pathkeys = orig_pathkeys;
Expand Down
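This temporary swap of root->query_pathkeys is what enables the "index scans instead of sort" improvement mentioned in the commit message: create_index_paths() considers an index path useful when its ordering matches the current query_pathkeys, so pointing query_pathkeys at the required compressed pathkeys lets the planner generate index scans on the compressed chunk that already deliver the required order, making a pushed-down Sort unnecessary.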
tsl/src/nodes/decompress_chunk/decompress_chunk.h (2 changes: 1 addition & 1 deletion)
@@ -108,7 +108,7 @@ typedef struct DecompressChunkPath
      */
     List *aggregated_column_type;
 
-    List *compressed_pathkeys;
+    List *required_compressed_pathkeys;
     bool needs_sequence_num;
     bool reverse;
     bool batch_sorted_merge;
tsl/src/nodes/decompress_chunk/planner.c (9 changes: 6 additions & 3 deletions)
@@ -996,7 +996,7 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
                                          collations,
                                          nullsFirst);
 
-        ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ 0);
+        ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ -1.0);
 
         decompress_plan->custom_plans = list_make1(sort);
     }
@@ -1005,12 +1005,15 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
         /*
          * Add a sort if the compressed scan is not ordered appropriately.
          */
-        if (!pathkeys_contained_in(dcpath->compressed_pathkeys, compressed_path->pathkeys))
+        if (!pathkeys_contained_in(dcpath->required_compressed_pathkeys, compressed_path->pathkeys))
         {
-            List *compressed_pks = dcpath->compressed_pathkeys;
+            List *compressed_pks = dcpath->required_compressed_pathkeys;
             Sort *sort = ts_make_sort_from_pathkeys((Plan *) compressed_scan,
                                                     compressed_pks,
                                                     bms_make_singleton(compressed_scan->scanrelid));
+
+            ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ -1.0);
+
             decompress_plan->custom_plans = list_make1(sort);
         }
         else
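On the limit_tuples value used in both calls: in PostgreSQL's sort costing, limit_tuples is the expected bound on the number of output tuples, and by convention -1 means "no bound" (the bound is only applied when the value is positive). Passing -1.0 instead of 0 follows that convention, and the second hunk also adds a previously missing ts_label_sort_with_costsize() call, so the Sort node built for the compressed scan now carries cost estimates at all.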
