Reduce transparent_decompress_chunk flakiness #6550

Draft: wants to merge 32 commits into base: main
Changes from all commits (32):
b479e08 Reduce transparent_decompress_chunk flakiness (akuzm, Jan 19, 2024)
2563983 Merge remote-tracking branch 'origin/main' into HEAD (akuzm, Jan 19, 2024)
b70ebd9 refs (akuzm, Jan 19, 2024)
d3228b9 Merge remote-tracking branch 'origin/main' into HEAD (akuzm, Jan 24, 2024)
260b2bc add order by to remove flakiness (akuzm, Jan 24, 2024)
f3b857c reference transparent_decompress_chunk-* ordered_append-* ordered_app… (akuzm, Jan 24, 2024)
0501724 reference transparent_decompress_chunk-* ordered_append-* ordered_app… (akuzm, Jan 24, 2024)
5c2fb57 fixes... (akuzm, Jan 24, 2024)
01422d4 fixes (akuzm, Jan 24, 2024)
7dd8697 better selectivity estimates (akuzm, Jan 24, 2024)
8529781 fixes (akuzm, Jan 24, 2024)
5e9e450 Account for filters in DecompressChunk row estimates (akuzm, Jan 24, 2024)
ee4147f refs (akuzm, Jan 24, 2024)
b710c3e stable mapc test (akuzm, Jan 24, 2024)
87101d2 reference transparent_decompression_ordered_index-* merge_append_part… (akuzm, Jan 25, 2024)
c744dfd reference transparent_decompression_ordered_index-* merge_append_part… (akuzm, Jan 25, 2024)
a8f83e7 reference transparent_decompression_ordered_index-* merge_append_part… (akuzm, Jan 25, 2024)
b822a4e reference transparent_decompress_chunk-* (akuzm, Jan 25, 2024)
713d4e3 reference transparent_decompress_chunk-* (akuzm, Jan 25, 2024)
a7ddc1f reference transparent_decompress_chunk-* (akuzm, Jan 25, 2024)
9826897 Merge remote-tracking branch 'akuzm/filter-size' into HEAD (akuzm, Jan 25, 2024)
72645e9 Merge remote-tracking branch 'origin/main' into HEAD (akuzm, Jan 25, 2024)
8dd8196 fix the row count estimates again (akuzm, Jan 25, 2024)
ce2b7cd Merge remote-tracking branch 'origin/main' into HEAD (akuzm, Jan 25, 2024)
6a2190a remove debug (akuzm, Jan 25, 2024)
afcbe74 size fixes (akuzm, Jan 26, 2024)
4be8f38 Merge remote-tracking branch 'akuzm/filter-size' into HEAD (akuzm, Jan 26, 2024)
b7c7dc4 uncontroversial test refs (akuzm, Jan 26, 2024)
97d5046 workaround for merge append cost problem (akuzm, Jan 29, 2024)
eb52026 Merge remote-tracking branch 'origin/main' into HEAD (akuzm, Jan 29, 2024)
2571bd8 comment (akuzm, Jan 29, 2024)
c27f73d more ref changes (akuzm, Jan 29, 2024)
132 changes: 77 additions & 55 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -46,7 +46,7 @@ static CustomPathMethods decompress_chunk_path_methods = {

typedef struct SortInfo
{
- List *compressed_pathkeys;
+ List *required_compressed_pathkeys;
bool needs_sequence_num;
bool can_pushdown_sort; /* sort can be pushed below DecompressChunk */
bool reverse;
@@ -153,7 +153,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
{
Var *var;
int varattno;
- List *compressed_pathkeys = NIL;
+ List *required_compressed_pathkeys = NIL;
PathKey *pk;

/*
@@ -198,7 +198,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
*/
Assert(compressed_em != NULL);

- compressed_pathkeys = lappend(compressed_pathkeys, pk);
+ required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk);

segmentby_columns =
bms_add_member(segmentby_columns, castNode(Var, compressed_em->em_expr)->varattno);
@@ -210,7 +210,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
* asserting here.
*/
Assert(bms_num_members(segmentby_columns) == info->num_segmentby_columns ||
- list_length(compressed_pathkeys) == list_length(chunk_pathkeys));
+ list_length(required_compressed_pathkeys) == list_length(chunk_pathkeys));
}

/*
@@ -251,9 +251,9 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu

pk = make_canonical_pathkey(root, ec, opfamily, strategy, nulls_first);

- compressed_pathkeys = lappend(compressed_pathkeys, pk);
+ required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk);
}
- sort_info->compressed_pathkeys = compressed_pathkeys;
+ sort_info->required_compressed_pathkeys = required_compressed_pathkeys;
}

static DecompressChunkPath *
@@ -321,15 +321,24 @@ build_compressioninfo(PlannerInfo *root, Hypertable *ht, RelOptInfo *chunk_rel)
* we put cost of 1 tuple of compressed_scan as startup cost
*/
static void
- cost_decompress_chunk(Path *path, Path *compressed_path)
+ cost_decompress_chunk(PlannerInfo *root, Path *path, Path *compressed_path)
{
+ /* Set the row number estimate. */
+ if (path->param_info != NULL)
+ {
+ path->rows = path->param_info->ppi_rows;
+ }
+ else
+ {
+ path->rows = path->parent->rows;
+ }
+
/* startup_cost is cost before fetching first tuple */
if (compressed_path->rows > 0)
path->startup_cost = compressed_path->total_cost / compressed_path->rows;

/* total_cost is cost for fetching all tuples */
path->total_cost = compressed_path->total_cost + path->rows * cpu_tuple_cost;
- path->rows = compressed_path->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
}
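
Note: a summary of the cost model above, as equations (a sketch of what the code computes, not new code). The row estimate is now taken from the planner (ppi_rows for parameterized paths, otherwise the relation-level estimate, which already includes filter selectivity) instead of being recomputed as compressed rows times the batch size. The cost arithmetic itself is unchanged:

    startup_cost = compressed_total_cost / compressed_rows        (the cost of 1 tuple of the compressed scan, per the comment above)
    total_cost   = compressed_total_cost + rows * cpu_tuple_cost

Here cpu_tuple_cost is the standard PostgreSQL planner GUC.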

/* Smoothstep function S1 (the h01 cubic Hermite spline). */
@@ -360,7 +369,7 @@ cost_batch_sorted_merge(PlannerInfo *root, CompressionInfo *compression_info,
enable_sort = true;
cost_sort(&sort_path,
root,
- dcpath->compressed_pathkeys,
+ dcpath->required_compressed_pathkeys,
compressed_path->total_cost,
compressed_path->rows,
compressed_path->pathtarget->width,
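
Note: the compressed_pathkeys to required_compressed_pathkeys rename is mechanical, but it sharpens the meaning: these are the pathkeys the compressed scan is required to produce (adding a sort below DecompressChunk if the scan does not already provide them), and the cost_sort call above charges for that sort when costing the batch sorted merge. For reference, the S1 smoothstep mentioned above is the cubic s1(x) = 3x^2 - 2x^3 on [0, 1].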
@@ -690,7 +699,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
{
RelOptInfo *compressed_rel;
ListCell *lc;
- double new_row_estimate;
Index ht_relid = 0;

CompressionInfo *compression_info = build_compressioninfo(root, ht, chunk_rel);
@@ -724,8 +732,12 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
chunk_rel,
compressed_rel,
ts_chunk_is_partial(chunk));

set_baserel_size_estimates(root, compressed_rel);
- new_row_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
+ const double new_tuples_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
+ const double new_rows_estimate =
+ new_tuples_estimate *
+ clauselist_selectivity(root, chunk_rel->baserestrictinfo, 0, JOIN_INNER, NULL);

if (!compression_info->single_chunk)
{
@@ -734,10 +746,23 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
Assert(chunk_info->parent_reloid == ht->main_table_relid);
ht_relid = chunk_info->parent_relid;
RelOptInfo *hypertable_rel = root->simple_rel_array[ht_relid];
- hypertable_rel->rows += (new_row_estimate - chunk_rel->rows);
+ hypertable_rel->rows =
+ clamp_row_est(hypertable_rel->rows + new_rows_estimate - chunk_rel->rows);
+ hypertable_rel->tuples =
+ clamp_row_est(hypertable_rel->tuples + new_tuples_estimate - chunk_rel->tuples);
}

- chunk_rel->rows = new_row_estimate;
+ /*
+ * Note that we can be overwriting the estimate for uncompressed chunk part of a
+ * partial chunk here, but the paths for the uncompressed part were already
+ * built, so it is OK.
+ */
+ chunk_rel->rows = new_rows_estimate;
+ /*
+ * This is a workaround for the bug in upstream merge append cost estimation. See
+ * https://www.postgresql.org/message-id/flat/CALzhyqyhoXQDR-Usd_0HeWk%3DuqNLzoVeT8KhRoo%3DpV_KzgO3QQ%40mail.gmail.com
+ */
+ chunk_rel->tuples = new_rows_estimate;

create_compressed_scan_paths(root, compressed_rel, compression_info, &sort_info);
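
Note: a worked example of the estimate math above, with hypothetical numbers and assuming DECOMPRESS_CHUNK_BATCH_SIZE is 1000 (a sketch, not part of the patch):

    /* 10 compressed batches, each decompressing to up to 1000 tuples */
    double new_tuples_estimate = 10 * 1000;                 /* 10000 tuples */
    /* restriction clauses estimated to keep 25% of the tuples */
    double new_rows_estimate = new_tuples_estimate * 0.25;  /* 2500 rows */

Both deltas are folded into the parent hypertable's rows and tuples, so the hypertable-level estimates stay consistent with the per-chunk ones; chunk_rel->tuples is deliberately set to the filtered row count as a workaround for the upstream MergeAppend costing issue linked in the comment.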

@@ -750,7 +775,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
foreach (lc, compressed_rel->pathlist)
{
Path *compressed_path = lfirst(lc);
- Path *path;

/*
* We skip any BitmapScan parameterized paths here as supporting
@@ -830,7 +854,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
continue;
}

- path = (Path *) decompress_chunk_path_create(root, compression_info, 0, compressed_path);
+ Path *chunk_path =
+ (Path *) decompress_chunk_path_create(root, compression_info, 0, compressed_path);

/*
* Create a path for the batch sorted merge optimization. This optimization performs a
@@ -846,7 +871,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
MergeBatchResult merge_result = can_batch_sorted_merge(root, compression_info, chunk);
if (merge_result != MERGE_NOT_POSSIBLE)
{
- batch_merge_path = copy_decompress_chunk_path((DecompressChunkPath *) path);
+ batch_merge_path = copy_decompress_chunk_path((DecompressChunkPath *) chunk_path);

batch_merge_path->reverse = (merge_result != SCAN_FORWARD);
batch_merge_path->batch_sorted_merge = true;
@@ -873,42 +898,38 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
* between the decompression node and the scan during plan creation */
if (sort_info.can_pushdown_sort)
{
- DecompressChunkPath *dcpath = copy_decompress_chunk_path((DecompressChunkPath *) path);
- dcpath->reverse = sort_info.reverse;
- dcpath->needs_sequence_num = sort_info.needs_sequence_num;
- dcpath->compressed_pathkeys = sort_info.compressed_pathkeys;
- dcpath->custom_path.path.pathkeys = root->query_pathkeys;
+ DecompressChunkPath *path_copy =
+ copy_decompress_chunk_path((DecompressChunkPath *) chunk_path);
+ path_copy->reverse = sort_info.reverse;
+ path_copy->needs_sequence_num = sort_info.needs_sequence_num;
+ path_copy->required_compressed_pathkeys = sort_info.required_compressed_pathkeys;
+ path_copy->custom_path.path.pathkeys = root->query_pathkeys;

/*
* Add costing for a sort. The standard Postgres pattern is to add the cost during
* path creation, but not add the sort path itself, that's done during plan
* creation. Examples of this in: create_merge_append_path &
* create_merge_append_plan
*/
- if (!pathkeys_contained_in(dcpath->compressed_pathkeys, compressed_path->pathkeys))
+ if (!pathkeys_contained_in(sort_info.required_compressed_pathkeys,
+ compressed_path->pathkeys))
{
Path sort_path; /* dummy for result of cost_sort */

cost_sort(&sort_path,
root,
- dcpath->compressed_pathkeys,
+ sort_info.required_compressed_pathkeys,
compressed_path->total_cost,
compressed_path->rows,
compressed_path->pathtarget->width,
0.0,
work_mem,
-1);

- cost_decompress_chunk(&dcpath->custom_path.path, &sort_path);
+ cost_decompress_chunk(root, &path_copy->custom_path.path, &sort_path);
}
- /*
- * if chunk is partially compressed don't add this now but add an append path later
- * combining the uncompressed and compressed parts of the chunk
- */
- if (!ts_chunk_is_partial(chunk))
- add_path(chunk_rel, &dcpath->custom_path.path);
- else
- path = &dcpath->custom_path.path;
+
+ chunk_path = &path_copy->custom_path.path;
}

/*
@@ -917,7 +938,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
*/
if (ts_chunk_is_partial(chunk))
{
- Bitmapset *req_outer = PATH_REQ_OUTER(path);
+ Bitmapset *req_outer = PATH_REQ_OUTER(chunk_path);
Path *uncompressed_path =
get_cheapest_path_for_pathkeys(initial_pathlist, NIL, req_outer, TOTAL_COST, false);

@@ -940,13 +961,13 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
*/
if (batch_merge_path != NULL)
{
- path = (Path *) create_merge_append_path_compat(root,
- chunk_rel,
- list_make2(batch_merge_path,
- uncompressed_path),
- root->query_pathkeys,
- req_outer,
- NIL);
+ chunk_path = (Path *) create_merge_append_path_compat(root,
+ chunk_rel,
+ list_make2(batch_merge_path,
+ uncompressed_path),
+ root->query_pathkeys,
+ req_outer,
+ NIL);
}
else
/*
@@ -955,24 +976,25 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
* and directly add its children, so we have to combine the children
* into a MergeAppend node later, at the chunk append level.
*/
- path = (Path *) create_append_path_compat(root,
- chunk_rel,
- list_make2(path, uncompressed_path),
- NIL /* partial paths */,
- root->query_pathkeys /* pathkeys */,
- req_outer,
- 0,
- false,
- false,
- path->rows + uncompressed_path->rows);
+ chunk_path =
+ (Path *) create_append_path_compat(root,
+ chunk_rel,
+ list_make2(chunk_path, uncompressed_path),
+ NIL /* partial paths */,
+ root->query_pathkeys /* pathkeys */,
+ req_outer,
+ 0,
+ false,
+ false,
+ chunk_path->rows + uncompressed_path->rows);
}

/* Add useful sorted versions of the decompress path */
- add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, path, compressed_path);
+ add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, chunk_path, compressed_path);

/* this has to go after the path is copied for the ordered path since path can get freed in
* add_path */
- add_path(chunk_rel, path);
+ add_path(chunk_rel, chunk_path);
}

/* the chunk_rel now owns the paths, remove them from the compressed_rel so they can't be freed
@@ -1748,8 +1770,8 @@ decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info, int paral

path->custom_path.custom_paths = list_make1(compressed_path);
path->reverse = false;
- path->compressed_pathkeys = NIL;
- cost_decompress_chunk(&path->custom_path.path, compressed_path);
+ path->required_compressed_pathkeys = NIL;
+ cost_decompress_chunk(root, &path->custom_path.path, compressed_path);

return path;
}
Expand Down Expand Up @@ -1818,7 +1840,7 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, Comp
*/
List *orig_pathkeys = root->query_pathkeys;
build_compressed_scan_pathkeys(sort_info, root, root->query_pathkeys, info);
- root->query_pathkeys = sort_info->compressed_pathkeys;
+ root->query_pathkeys = sort_info->required_compressed_pathkeys;
check_index_predicates(root, compressed_rel);
create_index_paths(root, compressed_rel);
root->query_pathkeys = orig_pathkeys;
2 changes: 1 addition & 1 deletion tsl/src/nodes/decompress_chunk/decompress_chunk.h
@@ -108,7 +108,7 @@ typedef struct DecompressChunkPath
*/
List *aggregated_column_type;

- List *compressed_pathkeys;
+ List *required_compressed_pathkeys;
bool needs_sequence_num;
bool reverse;
bool batch_sorted_merge;
9 changes: 6 additions & 3 deletions tsl/src/nodes/decompress_chunk/planner.c
@@ -939,7 +939,7 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
collations,
nullsFirst);

- ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ 0);
+ ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ -1.0);

decompress_plan->custom_plans = list_make1(sort);
}
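
Note: -1.0 is the conventional "no LIMIT" sentinel for limit_tuples; cost_sort only treats a value greater than zero as a bound for a top-N sort, so -1.0 states the intent more clearly than the previous 0.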
@@ -948,12 +948,15 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
/*
* Add a sort if the compressed scan is not ordered appropriately.
*/
- if (!pathkeys_contained_in(dcpath->compressed_pathkeys, compressed_path->pathkeys))
+ if (!pathkeys_contained_in(dcpath->required_compressed_pathkeys, compressed_path->pathkeys))
{
- List *compressed_pks = dcpath->compressed_pathkeys;
+ List *compressed_pks = dcpath->required_compressed_pathkeys;
Sort *sort = ts_make_sort_from_pathkeys((Plan *) compressed_scan,
compressed_pks,
bms_make_singleton(compressed_scan->scanrelid));

+ ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ -1.0);

decompress_plan->custom_plans = list_make1(sort);
}
else
12 changes: 12 additions & 0 deletions tsl/src/nodes/decompress_chunk/qual_pushdown.c
@@ -87,6 +87,18 @@ pushdown_quals(PlannerInfo *root, CompressionSettings *settings, RelOptInfo *chu
{
decompress_clauses = lappend(decompress_clauses, ri);
}

+ if (context.needs_recheck)
+ {
+ /*
+ * If we managed to push down the comparison of orderby column
+ * to the compressed scan, most matched batches are likely to
+ * match entirely, so the selectivity of the recheck will be
+ * close to 1.
+ */
+ ri->norm_selec = 1;
+ Assert(context.can_pushdown);
+ }
}
chunk_rel->baserestrictinfo = decompress_clauses;
}
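
Note: norm_selec is the RestrictInfo's cached selectivity for the clause when used as a plain (non-join) filter. Forcing it to 1 encodes the heuristic from the comment: once the orderby-column comparison has been pushed down to the compressed scan, where it is checked against the per-batch min/max metadata, a batch that passes the pushed-down filter tends to satisfy the original qual for nearly all of its rows, so the recheck above the decompression removes close to nothing.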
17 changes: 7 additions & 10 deletions tsl/test/expected/compression.out
@@ -1825,17 +1825,14 @@ SET min_parallel_table_scan_size = 0;
CREATE INDEX ON f_sensor_data (time, sensor_id);
:explain
SELECT * FROM f_sensor_data WHERE sensor_id > 100;
- QUERY PLAN
- ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Gather
+ QUERY PLAN
+ ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
+ Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
- Workers Planned: 2
- -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
- Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
- -> Parallel Index Scan using compress_hyper_38_72_chunk_sensor_id__ts_meta_sequence_num_idx on _timescaledb_internal.compress_hyper_38_72_chunk
- Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
- Index Cond: (compress_hyper_38_72_chunk.sensor_id > 100)
- (8 rows)
+ -> Index Scan using compress_hyper_38_72_chunk_sensor_id__ts_meta_sequence_num_idx on _timescaledb_internal.compress_hyper_38_72_chunk
+ Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
+ Index Cond: (compress_hyper_38_72_chunk.sensor_id > 100)
+ (5 rows)
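
Note: this reference change appears to be a direct consequence of the estimate fixes: with row estimates that account for the filters, the planner no longer chooses a Gather with a parallel index scan and settles on a plain index scan under DecompressChunk, which also makes the test output stable.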

RESET enable_parallel_append;
-- Test for partially compressed chunks