Disable bulk decompression for small limits #6666

Draft · wants to merge 6 commits into base: main
2 changes: 2 additions & 0 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -859,6 +859,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp

batch_merge_path->reverse = (merge_result != SCAN_FORWARD);
batch_merge_path->batch_sorted_merge = true;
batch_merge_path->enable_bulk_decompression = false;

/* The segment by optimization is only enabled if it can deliver the tuples in the
* same order as the query requested it. So, we can just copy the pathkeys of the
@@ -1746,6 +1747,7 @@ decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info, int paral
path->custom_path.flags = 0;
path->custom_path.methods = &decompress_chunk_path_methods;
path->batch_sorted_merge = false;
path->enable_bulk_decompression = ts_guc_enable_bulk_decompression;

/* To prevent a non-parallel path with this node appearing
* in a parallel plan we only set parallel_safe to true
4 changes: 2 additions & 2 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.h
@@ -80,9 +80,9 @@ typedef struct DecompressChunkPath
List *bulk_decompression_column;

/*
* If we produce at least some columns that support bulk decompression.
* Whether bulk decompression is enabled for this path.
*/
bool have_bulk_decompression_columns;
bool enable_bulk_decompression;

/*
* Maps the uncompressed chunk attno to the respective column compression
23 changes: 15 additions & 8 deletions tsl/src/nodes/decompress_chunk/planner.c
@@ -123,7 +123,7 @@ build_decompression_map(PlannerInfo *root, DecompressChunkPath *path, List *scan
* Go over the scan targetlist and determine to which output column each
* scan column goes, saving other additional info as we do that.
*/
path->have_bulk_decompression_columns = false;
bool bulk_decompression_possible_for_some_columns = false;
path->decompression_map = NIL;
foreach (lc, scan_tlist)
{
@@ -222,9 +222,9 @@
!is_segment && destination_attno_in_uncompressed_chunk > 0 &&
tsl_get_decompress_all_function(compression_get_default_algorithm(typoid), typoid) !=
NULL;
path->have_bulk_decompression_columns |= bulk_decompression_possible;
path->bulk_decompression_column =
lappend_int(path->bulk_decompression_column, bulk_decompression_possible);
bulk_decompression_possible_for_some_columns |= bulk_decompression_possible;

/*
* Save information about decompressed columns in uncompressed chunk
@@ -251,6 +251,16 @@
}
}

if (!bulk_decompression_possible_for_some_columns)
{
/*
* This is mostly a cosmetic thing for EXPLAIN -- don't say that we're
* using bulk decompression if it is enabled but we have no columns
* that support it.
*/
path->enable_bulk_decompression = false;
}

/*
* Check that we have found all the needed columns in the compressed targetlist.
* We can't conveniently check that we have all columns for all-row vars, so
@@ -1021,18 +1031,15 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat

Assert(list_length(custom_plans) == 1);

const bool enable_bulk_decompression = !dcpath->batch_sorted_merge &&
ts_guc_enable_bulk_decompression &&
dcpath->have_bulk_decompression_columns;

/*
* For some predicates, we have more efficient implementation that work on
* the entire compressed batch in one go. They go to this list, and the rest
* goes into the usual scan.plan.qual.
*/
List *vectorized_quals = NIL;
if (enable_bulk_decompression)
if (dcpath->enable_bulk_decompression)
{
Assert(!dcpath->batch_sorted_merge);
List *nonvectorized_quals = NIL;
find_vectorized_quals(dcpath,
decompress_plan->scan.plan.qual,
@@ -1064,7 +1071,7 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
dcpath->info->chunk_rte->relid,
dcpath->reverse,
dcpath->batch_sorted_merge,
enable_bulk_decompression,
dcpath->enable_bulk_decompression,
dcpath->perform_vectorized_aggregation);

/*
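Note on the decompress_chunk/planner.c hunks above: the vectorized-qual handling now keys off the path-level enable_bulk_decompression flag instead of a value recomputed at plan-creation time. The actual split is performed by find_vectorized_quals(); the following is only a rough sketch of that list-partitioning pattern, where is_vectorizable_qual() is a hypothetical stand-in for the real capability check and not a TimescaleDB function.

#include "postgres.h"

#include "nodes/pg_list.h"
#include "nodes/primnodes.h"

/* Hypothetical capability check, for illustration only. */
static bool
is_vectorizable_qual(Node *qual)
{
	return IsA(qual, OpExpr);
}

/*
 * Sketch of the qual-splitting pattern: each scan qual goes either to the
 * vectorized list or back to the ordinary qual list, depending on whether it
 * can be evaluated on a whole decompressed batch at once.
 */
static void
split_quals(List *quals, List **vectorized, List **nonvectorized)
{
	ListCell *lc;

	*vectorized = NIL;
	*nonvectorized = NIL;

	foreach (lc, quals)
	{
		Node *qual = (Node *) lfirst(lc);

		if (is_vectorizable_qual(qual))
			*vectorized = lappend(*vectorized, qual);
		else
			*nonvectorized = lappend(*nonvectorized, qual);
	}
}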
138 changes: 138 additions & 0 deletions tsl/src/planner.c
@@ -22,6 +22,7 @@
#include "nodes/frozen_chunk_dml/frozen_chunk_dml.h"
#include "nodes/decompress_chunk/decompress_chunk.h"
#include "nodes/gapfill/gapfill.h"
#include "nodes/chunk_append/chunk_append.h"
#include "planner.h"

#include <math.h>
@@ -43,6 +44,140 @@ is_osm_present()
}
#endif

/*
 * Try to disable bulk decompression on a DecompressChunkPath, looking through
 * any ProjectionPath above it and recursing into Lists and Append-like paths.
 */
static void
try_disable_bulk_decompression(PlannerInfo *root, Node *node)
{
if (IsA(node, List))
{
ListCell *lc;
foreach (lc, (List *) node)
{
try_disable_bulk_decompression(root, (Node *) lfirst(lc));
}
return;
}

if (IsA(node, ProjectionPath))
{
try_disable_bulk_decompression(root, (Node *) castNode(ProjectionPath, node)->subpath);
return;
}

if (IsA(node, AppendPath))
{
try_disable_bulk_decompression(root, (Node *) castNode(AppendPath, node)->subpaths);
return;
}

if (IsA(node, MergeAppendPath))
{
MergeAppendPath *mergeappend = castNode(MergeAppendPath, node);
ListCell *lc;
foreach (lc, mergeappend->subpaths)
{
Path *child = (Path *) lfirst(lc);
if (pathkeys_contained_in(mergeappend->path.pathkeys, child->pathkeys))
{
try_disable_bulk_decompression(root, (Node *) child);
}
}
return;
}

if (!IsA(node, CustomPath))
{
return;
}

CustomPath *custom_child = castNode(CustomPath, node);
if (strcmp(custom_child->methods->CustomName, "ChunkAppend") == 0)
{
try_disable_bulk_decompression(root, (Node *) custom_child->custom_paths);
return;
}

if (strcmp(custom_child->methods->CustomName, "DecompressChunk") != 0)
{
return;
}

DecompressChunkPath *dcpath = (DecompressChunkPath *) custom_child;
dcpath->enable_bulk_decompression = false;
}

/*
 * When we have a small limit above chunk decompression, it is more efficient to
 * use the row-by-row decompression iterators, which decompress only the rows
 * actually needed, than bulk decompression, which always decompresses an entire
 * batch. Since bulk decompression is about 10x faster per row, this advantage
 * goes away at limits of roughly 100 rows and above, so this hook disables bulk
 * decompression under small limits.
 */
static void
check_limit_bulk_decompression(PlannerInfo *root, Node *node)
{
ListCell *lc;
switch (node->type)
{
case T_List:
foreach (lc, (List *) node)
{
check_limit_bulk_decompression(root, lfirst(lc));
}
break;
case T_LimitPath:
{
double limit = -1;
LimitPath *path = castNode(LimitPath, node);

if (path->limitCount != NULL)
{
Const *count = castNode(Const, path->limitCount);
Assert(count->consttype == INT8OID);
Assert(DatumGetInt64(count->constvalue) >= 0);
limit = DatumGetInt64(count->constvalue);
}

if (path->limitOffset != NULL)
{
Const *offset = castNode(Const, path->limitOffset);
Assert(offset->consttype == INT8OID);
Assert(DatumGetInt64(offset->constvalue) >= 0);
limit += DatumGetInt64(offset->constvalue);
}

if (limit > 0 && limit < 100)
{
try_disable_bulk_decompression(root, (Node *) path->subpath);
}

break;
}
#if PG14_GE
case T_MemoizePath:
check_limit_bulk_decompression(root, (Node *) castNode(MemoizePath, node)->subpath);
break;
#endif
case T_ProjectionPath:
check_limit_bulk_decompression(root, (Node *) castNode(ProjectionPath, node)->subpath);
break;
case T_SubqueryScanPath:
check_limit_bulk_decompression(root,
(Node *) castNode(SubqueryScanPath, node)->subpath);
break;
case T_NestPath:
case T_MergePath:
case T_HashPath:
check_limit_bulk_decompression(root, (Node *) ((JoinPath *) node)->outerjoinpath);
check_limit_bulk_decompression(root, (Node *) ((JoinPath *) node)->innerjoinpath);
break;
default:
break;
}
}

void
tsl_create_upper_paths_hook(PlannerInfo *root, UpperRelationKind stage, RelOptInfo *input_rel,
RelOptInfo *output_rel, TsRelType input_reltype, Hypertable *ht,
@@ -61,6 +196,9 @@ tsl_create_upper_paths_hook(PlannerInfo *root, UpperRelationKind stage, RelOptIn
case UPPERREL_DISTINCT:
tsl_skip_scan_paths_add(root, input_rel, output_rel);
break;
case UPPERREL_FINAL:
check_limit_bulk_decompression(root, (Node *) output_rel->pathlist);
break;
default:
break;
}
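For intuition on the "limit < 100" threshold in check_limit_bulk_decompression() above: bulk decompression always decompresses a whole compressed batch, while the row-by-row iterators stop as soon as the limit is satisfied. Below is a standalone, illustrative cost sketch only; the 1000-row batch size and the per-row cost units are assumptions for this example, with the ~10x per-row speedup taken from the comment in the diff.

#include <stdio.h>

int
main(void)
{
	/* Assumed values for illustration: one compressed batch holds up to
	 * 1000 rows, and bulk decompression is ~10x faster per row than the
	 * row-by-row iterators. */
	const double batch_rows = 1000.0;
	const double bulk_speedup = 10.0;

	for (int limit = 1; limit <= 1000; limit *= 10)
	{
		/* Row-by-row pays one unit per returned row; bulk always pays for
		 * the whole batch, but at a tenth of the per-row cost. */
		double iterator_cost = (double) limit;
		double bulk_cost = batch_rows / bulk_speedup;

		printf("limit=%4d  iterators=%6.1f  bulk=%6.1f  -> %s\n",
			   limit,
			   iterator_cost,
			   bulk_cost,
			   iterator_cost < bulk_cost ? "iterators win" : "bulk wins");
	}

	return 0;
}

Under these assumptions the two strategies roughly break even at 100 rows, which is consistent with the hook disabling bulk decompression only for effective limits (LIMIT plus OFFSET) strictly below 100.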
8 changes: 4 additions & 4 deletions tsl/test/expected/transparent_decompression-15.out
@@ -774,7 +774,7 @@ LIMIT 10;
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (actual rows=5 loops=1)
-> Custom Scan (DecompressChunk) on _hyper_1_1_chunk (actual rows=5 loops=1)
Vectorized Filter: ("time" = 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone)
Filter: ("time" = 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone)
Rows Removed by Filter: 1795
-> Index Scan using compress_hyper_5_15_chunk_device_id_device_id_peer__ts_meta_idx on compress_hyper_5_15_chunk (actual rows=5 loops=1)
Filter: ((_ts_meta_min_3 <= 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone) AND (_ts_meta_max_3 >= 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone))
@@ -4076,23 +4076,23 @@ LIMIT 10;
-> Merge Append (actual rows=5 loops=1)
Sort Key: _hyper_2_4_chunk.device_id
-> Custom Scan (DecompressChunk) on _hyper_2_4_chunk (actual rows=1 loops=1)
Vectorized Filter: ("time" = 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone)
Filter: ("time" = 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone)
Rows Removed by Filter: 359
-> Sort (actual rows=1 loops=1)
Sort Key: compress_hyper_6_17_chunk.device_id
Sort Method: quicksort
-> Seq Scan on compress_hyper_6_17_chunk (actual rows=1 loops=1)
Filter: ((_ts_meta_min_3 <= 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone) AND (_ts_meta_max_3 >= 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone))
-> Custom Scan (DecompressChunk) on _hyper_2_5_chunk (actual rows=3 loops=1)
Vectorized Filter: ("time" = 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone)
Filter: ("time" = 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone)
Rows Removed by Filter: 1077
-> Sort (actual rows=3 loops=1)
Sort Key: compress_hyper_6_18_chunk.device_id
Sort Method: quicksort
-> Seq Scan on compress_hyper_6_18_chunk (actual rows=3 loops=1)
Filter: ((_ts_meta_min_3 <= 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone) AND (_ts_meta_max_3 >= 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone))
-> Custom Scan (DecompressChunk) on _hyper_2_6_chunk (actual rows=1 loops=1)
Vectorized Filter: ("time" = 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone)
Filter: ("time" = 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone)
Rows Removed by Filter: 359
-> Sort (actual rows=1 loops=1)
Sort Key: compress_hyper_6_19_chunk.device_id