Skip to content

Commit

Permalink
Use compressed chunk tupdesc to check which metadata columns we have (#…
Browse files Browse the repository at this point in the history
…6663)

The compressed qual pushdown will look at presence of metadata columns
in the compressed chunk, not at whether the uncompressed column is an
orderby. This way, the decision on which columns have metadata is made
only during the creation of compressed chunk.

Apply the same approach in row compressor as well.

This doesn't introduce any behavior changes, just changes the decision
about which columns have metadata to happen in a single place. This will
simplify experimenting with configurable metadata columns.
  • Loading branch information
akuzm committed Feb 23, 2024
1 parent b039465 commit 402c1c3
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 114 deletions.
2 changes: 1 addition & 1 deletion tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
RowCompressor row_compressor;
row_compressor_init(settings,
&row_compressor,
uncompressed_rel_tupdesc,
uncompressed_chunk_rel,
compressed_chunk_rel,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
Expand Down
56 changes: 33 additions & 23 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
bool reset_sequence = insert_options > 0;
row_compressor_init(settings,
&row_compressor,
in_desc,
in_rel,
out_rel,
out_desc->natts,
true /*need_bistate*/,
Expand Down Expand Up @@ -859,11 +859,12 @@ get_sequence_number_for_current_group(Relation table_rel, Oid index_oid,
}

static void
build_column_map(CompressionSettings *settings, TupleDesc in_desc, Relation compressed_table,
PerColumn **pcolumns, int16 **pmap)
build_column_map(CompressionSettings *settings, Relation uncompressed_table,
Relation compressed_table, PerColumn **pcolumns, int16 **pmap)
{
Oid compressed_data_type_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
TupleDesc out_desc = RelationGetDescr(compressed_table);
TupleDesc in_desc = RelationGetDescr(uncompressed_table);

PerColumn *columns = palloc0(sizeof(PerColumn) * in_desc->natts);
int16 *map = palloc0(sizeof(int16) * in_desc->natts);
Expand All @@ -886,32 +887,41 @@ build_column_map(CompressionSettings *settings, TupleDesc in_desc, Relation comp

if (!is_segmentby)
{
int16 segment_min_attr_offset = -1;
int16 segment_max_attr_offset = -1;
SegmentMetaMinMaxBuilder *segment_min_max_builder = NULL;
if (compressed_column_attr->atttypid != compressed_data_type_oid)
elog(ERROR,
"expected column '%s' to be a compressed data type",
NameStr(attr->attname));

if (is_orderby)
AttrNumber segment_min_attr_number =
compressed_column_metadata_attno(settings,
uncompressed_table->rd_id,
attr->attnum,
compressed_table->rd_id,
"min");
AttrNumber segment_max_attr_number =
compressed_column_metadata_attno(settings,
uncompressed_table->rd_id,
attr->attnum,
compressed_table->rd_id,
"max");
int16 segment_min_attr_offset = segment_min_attr_number - 1;
int16 segment_max_attr_offset = segment_max_attr_number - 1;

SegmentMetaMinMaxBuilder *segment_min_max_builder = NULL;
if (segment_min_attr_number != InvalidAttrNumber ||
segment_max_attr_number != InvalidAttrNumber)
{
int16 index = ts_array_position(settings->fd.orderby, NameStr(attr->attname));
char *segment_min_col_name = column_segment_min_name(index);
char *segment_max_col_name = column_segment_max_name(index);
AttrNumber segment_min_attr_number =
get_attnum(compressed_table->rd_id, segment_min_col_name);
AttrNumber segment_max_attr_number =
get_attnum(compressed_table->rd_id, segment_max_col_name);
if (segment_min_attr_number == InvalidAttrNumber)
elog(ERROR, "couldn't find metadata column \"%s\"", segment_min_col_name);
if (segment_max_attr_number == InvalidAttrNumber)
elog(ERROR, "couldn't find metadata column \"%s\"", segment_max_col_name);
segment_min_attr_offset = AttrNumberGetAttrOffset(segment_min_attr_number);
segment_max_attr_offset = AttrNumberGetAttrOffset(segment_max_attr_number);
Ensure(segment_min_attr_number != InvalidAttrNumber,
"could not find the min metadata column");
Ensure(segment_max_attr_number != InvalidAttrNumber,
"could not find the min metadata column");
segment_min_max_builder =
segment_meta_min_max_builder_create(attr->atttypid, attr->attcollation);
}

Ensure(!is_orderby || segment_min_max_builder != NULL,
"orderby columns must have minmax metadata");

*column = (PerColumn){
.compressor = compressor_for_type(attr->atttypid),
.min_metadata_attr_offset = segment_min_attr_offset,
Expand Down Expand Up @@ -944,7 +954,7 @@ build_column_map(CompressionSettings *settings, TupleDesc in_desc, Relation comp
********************/
void
row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor,
TupleDesc uncompressed_tuple_desc, Relation compressed_table,
Relation uncompressed_table, Relation compressed_table,
int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence,
int insert_options)
{
Expand Down Expand Up @@ -975,7 +985,7 @@ row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor
.compressed_table = compressed_table,
.bistate = need_bistate ? GetBulkInsertState() : NULL,
.resultRelInfo = ts_catalog_open_indexes(compressed_table),
.n_input_columns = uncompressed_tuple_desc->natts,
.n_input_columns = RelationGetDescr(uncompressed_table)->natts,
.count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num),
.sequence_num_metadata_column_offset = AttrNumberGetAttrOffset(sequence_num_column_num),
.compressed_values = palloc(sizeof(Datum) * num_columns_in_compressed_table),
Expand All @@ -992,7 +1002,7 @@ row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor
memset(row_compressor->compressed_is_null, 1, sizeof(bool) * num_columns_in_compressed_table);

build_column_map(settings,
uncompressed_tuple_desc,
uncompressed_table,
compressed_table,
&row_compressor->per_column,
&row_compressor->uncompressed_col_to_compressed_col);
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ extern void compress_chunk_populate_sort_info_for_column(CompressionSettings *se
Oid *sort_operator, Oid *collation,
bool *nulls_first);
extern void row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor,
TupleDesc uncompressed_tuple_desc, Relation compressed_table,
Relation uncompressed_table, Relation compressed_table,
int16 num_columns_in_compressed_table, bool need_bistate,
bool reset_sequence, int insert_options);
extern void row_compressor_reset(RowCompressor *row_compressor);
Expand Down
21 changes: 19 additions & 2 deletions tsl/src/compression/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ compression_column_segment_metadata_name(int16 column_index, const char *type)
int ret;

Assert(column_index > 0);
ret =
snprintf(buf, NAMEDATALEN, COMPRESSION_COLUMN_METADATA_PREFIX "%s_%d", type, column_index);
ret = snprintf(buf, NAMEDATALEN, COMPRESSION_COLUMN_METADATA_PATTERN_V1, type, column_index);
if (ret < 0 || ret > NAMEDATALEN)
{
ereport(ERROR,
Expand All @@ -84,6 +83,24 @@ column_segment_max_name(int16 column_index)
COMPRESSION_COLUMN_METADATA_MAX_COLUMN_NAME);
}

int
compressed_column_metadata_attno(CompressionSettings *settings, Oid chunk_reloid,
AttrNumber chunk_attno, Oid compressed_reloid, char *metadata_type)
{
Assert(strcmp(metadata_type, "min") == 0 || strcmp(metadata_type, "max") == 0);

char *attname = get_attname(chunk_reloid, chunk_attno, /* missing_ok = */ false);
int16 orderby_pos = ts_array_position(settings->fd.orderby, attname);

if (orderby_pos != 0)
{
char *metadata_name = compression_column_segment_metadata_name(orderby_pos, metadata_type);
return get_attnum(compressed_reloid, metadata_name);
}

return InvalidAttrNumber;
}

/*
* return the columndef list for compressed hypertable.
* we do this by getting the source hypertable's attrs,
Expand Down
6 changes: 6 additions & 0 deletions tsl/src/compression/create.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
COMPRESSION_COLUMN_METADATA_PREFIX "sequence_num"
#define COMPRESSION_COLUMN_METADATA_MIN_COLUMN_NAME "min"
#define COMPRESSION_COLUMN_METADATA_MAX_COLUMN_NAME "max"
#define COMPRESSION_COLUMN_METADATA_PATTERN_V1 "_ts_meta_%s_%d"

bool tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options);
Expand All @@ -27,3 +28,8 @@ Chunk *create_compress_chunk(Hypertable *compress_ht, Chunk *src_chunk, Oid tabl

char *column_segment_min_name(int16 column_index);
char *column_segment_max_name(int16 column_index);

typedef struct CompressionSettings CompressionSettings;
int compressed_column_metadata_attno(CompressionSettings *settings, Oid chunk_reloid,
AttrNumber chunk_attno, Oid compressed_reloid,
char *metadata_type);

0 comments on commit 402c1c3

Please sign in to comment.