Skip to content

Commit

Permalink
Add catalog entries for correlated constraint
Browse files Browse the repository at this point in the history
Create dimension_slice and chunk_constraint entries for chunks which
have correlated constraints on them. The dimension slice entry will
have -inf/+inf as start/end range initially for a given correlated
constraint and the chunk_constraint entry will refer back to this slice
entry.

This start/end range will be refreshed later. One of the entry points
is during compression for now.
  • Loading branch information
nikkhils committed Mar 8, 2024
1 parent 2d4c8be commit 97cc717
Show file tree
Hide file tree
Showing 32 changed files with 532 additions and 74 deletions.
89 changes: 87 additions & 2 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -182,5 +182,90 @@ CREATE OR REPLACE FUNCTION @extschema@.add_dimension(
) RETURNS TABLE(dimension_id INT, schema_name NAME, table_name NAME, column_name NAME, created BOOL)
AS '@MODULE_PATHNAME@', 'ts_dimension_add' LANGUAGE C VOLATILE;

ALTER TABLE _timescaledb_catalog.dimension ADD COLUMN correlated BOOLEAN;
UPDATE _timescaledb_catalog.dimension SET correlated = FALSE;

--
-- Rebuild the catalog table `_timescaledb_catalog.dimension with correlated column
--

CREATE TABLE _timescaledb_internal.dimension_tmp
AS SELECT * from _timescaledb_catalog.dimension;

CREATE TABLE _timescaledb_internal.tmp_dimension_seq_value AS
SELECT last_value, is_called FROM _timescaledb_catalog.dimension_id_seq;

--drop foreign keys on dimension table
ALTER TABLE _timescaledb_catalog.dimension_slice DROP CONSTRAINT
dimension_slice_dimension_id_fkey;

--drop dependent views
DROP VIEW IF EXISTS timescaledb_information.chunks;
DROP VIEW IF EXISTS timescaledb_information.dimensions;
DROP VIEW IF EXISTS timescaledb_information.hypertable_compression_settings;

ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.dimension;
ALTER EXTENSION timescaledb DROP SEQUENCE _timescaledb_catalog.dimension_id_seq;
DROP TABLE _timescaledb_catalog.dimension;

CREATE TABLE _timescaledb_catalog.dimension (
id serial NOT NULL ,
hypertable_id integer NOT NULL,
column_name name NOT NULL,
column_type REGTYPE NOT NULL,
aligned boolean NOT NULL,
-- closed dimensions
num_slices smallint NULL,
partitioning_func_schema name NULL,
partitioning_func name NULL,
-- open dimensions (e.g., time)
interval_length bigint NULL,
-- compress interval is used by rollup procedure during compression
-- in order to merge multiple chunks into a single one
compress_interval_length bigint NULL,
integer_now_func_schema name NULL,
integer_now_func name NULL,
correlated boolean NOT NULL,
-- table constraints
CONSTRAINT dimension_pkey PRIMARY KEY (id),
CONSTRAINT dimension_hypertable_id_column_name_key UNIQUE (hypertable_id, column_name),
CONSTRAINT dimension_check CHECK ((partitioning_func_schema IS NULL AND partitioning_func IS NULL) OR (partitioning_func_schema IS NOT NULL AND partitioning_func IS NOT NULL)),
CONSTRAINT dimension_check1 CHECK ((num_slices IS NULL AND interval_length IS NOT NULL) OR (num_slices IS NOT NULL AND interval_length IS NULL)),
CONSTRAINT dimension_check2 CHECK ((integer_now_func_schema IS NULL AND integer_now_func IS NULL) OR (integer_now_func_schema IS NOT NULL AND integer_now_func IS NOT NULL)),
CONSTRAINT dimension_interval_length_check CHECK (interval_length IS NULL OR interval_length > 0 OR correlated IS true),
CONSTRAINT dimension_compress_interval_length_check CHECK (compress_interval_length IS NULL OR compress_interval_length > 0),
CONSTRAINT dimension_hypertable_id_fkey FOREIGN KEY (hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE
);

INSERT INTO _timescaledb_catalog.dimension
( id, hypertable_id, column_name, column_type,
aligned, num_slices, partitioning_func_schema,
partitioning_func, interval_length,
compress_interval_length,
integer_now_func_schema, integer_now_func,
correlated)
SELECT id, hypertable_id, column_name, column_type,
aligned, num_slices, partitioning_func_schema,
partitioning_func, interval_length,
compress_interval_length,
integer_now_func_schema, integer_now_func,
false AS correlated
FROM _timescaledb_internal.dimension_tmp;

ALTER SEQUENCE _timescaledb_catalog.dimension_id_seq OWNED BY _timescaledb_catalog.dimension.id;
SELECT setval('_timescaledb_catalog.dimension_id_seq', last_value, is_called) FROM _timescaledb_internal.tmp_dimension_seq_value;

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.dimension', '');
SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_catalog.dimension', 'id'), '');

--add the foreign key constraints
ALTER TABLE _timescaledb_catalog.dimension_slice ADD CONSTRAINT
dimension_slice_dimension_id_fkey FOREIGN KEY (dimension_id)
REFERENCES _timescaledb_catalog.dimension(id) ON DELETE CASCADE;

--cleanup
DROP TABLE _timescaledb_internal.dimension_tmp;
DROP TABLE _timescaledb_internal.tmp_dimension_seq_value;

GRANT SELECT ON _timescaledb_catalog.dimension_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.dimension TO PUBLIC;

-- end recreate _timescaledb_catalog.dimension table --
81 changes: 79 additions & 2 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,82 @@ CREATE OR REPLACE FUNCTION @extschema@.add_dimension(
) RETURNS TABLE(dimension_id INT, schema_name NAME, table_name NAME, column_name NAME, created BOOL)
AS '@MODULE_PATHNAME@', 'ts_dimension_add' LANGUAGE C VOLATILE;

DELETE FROM _timescaledb_catalog.dimension WHERE correlated IS TRUE;
ALTER TABLE _timescaledb_catalog.dimension DROP COLUMN correlated;
-- Recreate _timescaledb_catalog.dimension table without the crrelated column --
CREATE TABLE _timescaledb_internal.dimension_tmp
AS SELECT * from _timescaledb_catalog.dimension;

CREATE TABLE _timescaledb_internal.tmp_dimension_seq_value AS
SELECT last_value, is_called FROM _timescaledb_catalog.dimension_id_seq;

--drop foreign keys on dimension table
ALTER TABLE _timescaledb_catalog.dimension_slice DROP CONSTRAINT
dimension_slice_dimension_id_fkey;

--drop dependent views
DROP VIEW IF EXISTS timescaledb_information.chunks;
DROP VIEW IF EXISTS timescaledb_information.dimensions;
DROP VIEW IF EXISTS timescaledb_information.hypertable_compression_settings;

ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.dimension;
ALTER EXTENSION timescaledb DROP SEQUENCE _timescaledb_catalog.dimension_id_seq;
DROP TABLE _timescaledb_catalog.dimension;

CREATE TABLE _timescaledb_catalog.dimension (
id serial NOT NULL ,
hypertable_id integer NOT NULL,
column_name name NOT NULL,
column_type REGTYPE NOT NULL,
aligned boolean NOT NULL,
-- closed dimensions
num_slices smallint NULL,
partitioning_func_schema name NULL,
partitioning_func name NULL,
-- open dimensions (e.g., time)
interval_length bigint NULL,
-- compress interval is used by rollup procedure during compression
-- in order to merge multiple chunks into a single one
compress_interval_length bigint NULL,
integer_now_func_schema name NULL,
integer_now_func name NULL,
-- table constraints
CONSTRAINT dimension_pkey PRIMARY KEY (id),
CONSTRAINT dimension_hypertable_id_column_name_key UNIQUE (hypertable_id, column_name),
CONSTRAINT dimension_check CHECK ((partitioning_func_schema IS NULL AND partitioning_func IS NULL) OR (partitioning_func_schema IS NOT NULL AND partitioning_func IS NOT NULL)),
CONSTRAINT dimension_check1 CHECK ((num_slices IS NULL AND interval_length IS NOT NULL) OR (num_slices IS NOT NULL AND interval_length IS NULL)),
CONSTRAINT dimension_check2 CHECK ((integer_now_func_schema IS NULL AND integer_now_func IS NULL) OR (integer_now_func_schema IS NOT NULL AND integer_now_func IS NOT NULL)),
CONSTRAINT dimension_interval_length_check CHECK (interval_length IS NULL OR interval_length > 0),
CONSTRAINT dimension_hypertable_id_fkey FOREIGN KEY (hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE
);

INSERT INTO _timescaledb_catalog.dimension
( id, hypertable_id, column_name, column_type,
aligned, num_slices, partitioning_func_schema,
partitioning_func, interval_length,
compress_interval_length,
integer_now_func_schema, integer_now_func)
SELECT id, hypertable_id, column_name, column_type,
aligned, num_slices, partitioning_func_schema,
partitioning_func, interval_length,
compress_interval_length,
integer_now_func_schema, integer_now_func
FROM _timescaledb_internal.dimension_tmp;

ALTER SEQUENCE _timescaledb_catalog.dimension_id_seq OWNED BY _timescaledb_catalog.dimension.id;
SELECT setval('_timescaledb_catalog.dimension_id_seq', last_value, is_called) FROM _timescaledb_internal.tmp_dimension_seq_value;

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.dimension', '');
SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_catalog.dimension', 'id'), '');

--add the foreign key constraints
ALTER TABLE _timescaledb_catalog.dimension_slice ADD CONSTRAINT
dimension_slice_dimension_id_fkey FOREIGN KEY (dimension_id)
REFERENCES _timescaledb_catalog.dimension(id) ON DELETE CASCADE;

--cleanup
DROP TABLE _timescaledb_internal.dimension_tmp;
DROP TABLE _timescaledb_internal.tmp_dimension_seq_value;

GRANT SELECT ON _timescaledb_catalog.dimension_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.dimension TO PUBLIC;

-- end recreate _timescaledb_catalog.dimension table --
7 changes: 6 additions & 1 deletion src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -841,14 +841,16 @@ chunk_create_object(const Hypertable *ht, Hypercube *cube, const char *schema_na
const char *table_name, const char *prefix, int32 chunk_id)
{
const Hyperspace *hs = ht->space;
const Hyperspace *ss = ht->correlated_space;
Chunk *chunk;
const char relkind = RELKIND_RELATION;

if (NULL == schema_name || schema_name[0] == '\0')
schema_name = NameStr(ht->fd.associated_schema_name);

/* Create a new chunk based on the hypercube */
chunk = ts_chunk_create_base(chunk_id, hs->num_dimensions, relkind);
chunk =
ts_chunk_create_base(chunk_id, hs->num_dimensions + (ss ? ss->num_dimensions : 0), relkind);

chunk->fd.hypertable_id = hs->hypertable_id;
chunk->cube = cube;
Expand Down Expand Up @@ -1091,6 +1093,9 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
prefix,
get_next_chunk_id());

/* Insert any new correlated constraint slices into metadata */
ts_correlated_constraints_dimension_slice_insert(ht, chunk);

chunk_add_constraints(chunk);
chunk_insert_into_metadata_after_lock(chunk);
chunk_create_table_constraints(ht, chunk);
Expand Down
17 changes: 11 additions & 6 deletions src/chunk_adaptive.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,9 @@ table_has_minmax_index(Oid relid, Oid atttype, Name attname, AttrNumber attnum)
*
* Returns true iff min and max is found, otherwise false.
*/
static bool
chunk_get_minmax(Oid relid, Oid atttype, AttrNumber attnum, Datum minmax[2])
bool
ts_chunk_get_minmax(Oid relid, Oid atttype, AttrNumber attnum, const char *call_context,
Datum minmax[2])
{
Relation rel = table_open(relid, AccessShareLock);
NameData attname;
Expand All @@ -279,11 +280,11 @@ chunk_get_minmax(Oid relid, Oid atttype, AttrNumber attnum, Datum minmax[2])
if (res == MINMAX_NO_INDEX)
{
ereport(WARNING,
(errmsg("no index on \"%s\" found for adaptive chunking on chunk \"%s\"",
(errmsg("no index on \"%s\" found for %s on chunk \"%s\"",
NameStr(attname),
call_context,
get_rel_name(relid)),
errdetail("Adaptive chunking works best with an index on the dimension being "
"adapted.")));
errdetail("%s works best with an index on the dimension.", call_context)));

res = minmax_heapscan(rel, atttype, attnum, minmax);
}
Expand Down Expand Up @@ -469,7 +470,11 @@ ts_calculate_chunk_interval(PG_FUNCTION_ARGS)

slice_interval = slice->fd.range_end - slice->fd.range_start;

if (chunk_get_minmax(chunk->table_id, dim->fd.column_type, attno, minmax))
if (ts_chunk_get_minmax(chunk->table_id,
dim->fd.column_type,
attno,
"adaptive chunking",
minmax))
{
int64 min = ts_time_value_to_internal(minmax[0], dim->fd.column_type);
int64 max = ts_time_value_to_internal(minmax[1], dim->fd.column_type);
Expand Down
2 changes: 2 additions & 0 deletions src/chunk_adaptive.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ typedef struct ChunkSizingInfo

extern void ts_chunk_adaptive_sizing_info_validate(ChunkSizingInfo *info);
extern void ts_chunk_sizing_func_validate(regproc func, ChunkSizingInfo *info);
extern bool ts_chunk_get_minmax(Oid relid, Oid atttype, AttrNumber attnum, const char *call_context,
Datum minmax[2]);
extern TSDLLEXPORT ChunkSizingInfo *ts_chunk_sizing_info_get_default_disabled(Oid table_relid);

extern TSDLLEXPORT int64 ts_chunk_calculate_initial_chunk_target_size(void);
48 changes: 36 additions & 12 deletions src/chunk_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ chunk_constraints_expand(ChunkConstraints *ccs, int16 new_capacity)
}

static void
chunk_constraint_dimension_choose_name(Name dst, int32 dimension_slice_id)
chunk_constraint_dimension_choose_name(Name dst, const char *prefix, int32 dimension_slice_id)
{
snprintf(NameStr(*dst), NAMEDATALEN, "constraint_%d", dimension_slice_id);
snprintf(NameStr(*dst), NAMEDATALEN, "%sconstraint_%d", prefix, dimension_slice_id);
}

static void
Expand All @@ -115,20 +115,27 @@ chunk_constraint_choose_name(Name dst, const char *hypertable_constraint_name, i

ChunkConstraint *
ts_chunk_constraints_add(ChunkConstraints *ccs, int32 chunk_id, int32 dimension_slice_id,
const char *constraint_name, const char *hypertable_constraint_name)
const char *constraint_name, const char *hypertable_constraint_name,
bool correlated)
{
ChunkConstraint *cc;

chunk_constraints_expand(ccs, ccs->num_constraints + 1);
cc = &ccs->constraints[ccs->num_constraints++];
cc->fd.chunk_id = chunk_id;
cc->fd.dimension_slice_id = dimension_slice_id;
cc->correlated = correlated;

if (NULL == constraint_name)
{
if (is_dimension_constraint(cc))
if (is_dimension_constraint(cc) || correlated)
{
/*
* for correlated constraints we choose a prefix of "_$CC_" to help
* us identify it on re-reading from the catalog.
*/
chunk_constraint_dimension_choose_name(&cc->fd.constraint_name,
correlated ? CC_DIM_PREFIX : "",
cc->fd.dimension_slice_id);
namestrcpy(&cc->fd.hypertable_constraint_name, "");
}
Expand Down Expand Up @@ -163,7 +170,7 @@ chunk_constraint_fill_tuple_values(const ChunkConstraint *cc, Datum values[Natts
values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] =
NameGetDatum(&cc->fd.hypertable_constraint_name);

if (is_dimension_constraint(cc))
if (is_dimension_constraint(cc) || cc->correlated)
nulls[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] = true;
else
nulls[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)] = true;
Expand Down Expand Up @@ -229,7 +236,7 @@ ts_chunk_constraints_add_from_tuple(ChunkConstraints *ccs, const TupleInfo *ti)
int32 dimension_slice_id;
Name constraint_name;
Name hypertable_constraint_name;
bool should_free;
bool should_free, correlated = false;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
MemoryContext oldcxt;

Expand All @@ -253,12 +260,17 @@ ts_chunk_constraints_add_from_tuple(ChunkConstraints *ccs, const TupleInfo *ti)
hypertable_constraint_name = DatumGetName(DirectFunctionCall1(namein, CStringGetDatum("")));
}

/* check if it's a correlated constraint */
if (strncmp(CC_DIM_PREFIX, NameStr(*constraint_name), strlen(CC_DIM_PREFIX)) == 0)
correlated = true;

constraints = ts_chunk_constraints_add(ccs,
DatumGetInt32(values[AttrNumberGetAttrOffset(
Anum_chunk_constraint_chunk_id)]),
dimension_slice_id,
NameStr(*constraint_name),
NameStr(*hypertable_constraint_name));
NameStr(*hypertable_constraint_name),
correlated);

MemoryContextSwitchTo(oldcxt);

Expand Down Expand Up @@ -492,7 +504,7 @@ ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk)
if (constr != NULL)
newconstrs = lappend(newconstrs, constr);
}
else
else if (!cc->correlated)
{
create_non_dimensional_constraint(cc,
chunk->table_id,
Expand Down Expand Up @@ -781,7 +793,7 @@ ts_chunk_constraints_add_dimension_constraints(ChunkConstraints *ccs, int32 chun
int i;

for (i = 0; i < cube->num_slices; i++)
ts_chunk_constraints_add(ccs, chunk_id, cube->slices[i]->fd.id, NULL, NULL);
ts_chunk_constraints_add(ccs, chunk_id, cube->slices[i]->fd.id, NULL, NULL, false);

return cube->num_slices;
}
Expand All @@ -802,7 +814,17 @@ chunk_constraint_add(HeapTuple constraint_tuple, void *arg)

if (chunk_constraint_need_on_chunk(cc->chunk_relkind, constraint))
{
ts_chunk_constraints_add(cc->ccs, cc->chunk_id, 0, NULL, NameStr(constraint->conname));
bool correlated = false;

/* check if it's a correlated constraint */
if (strncmp(CC_DIM_PREFIX, NameStr(constraint->conname), strlen(CC_DIM_PREFIX)) == 0)
correlated = true;
ts_chunk_constraints_add(cc->ccs,
cc->chunk_id,
0,
NULL,
NameStr(constraint->conname),
correlated);
return CONSTR_PROCESSED;
}

Expand Down Expand Up @@ -835,7 +857,8 @@ chunk_constraint_add_check(HeapTuple constraint_tuple, void *arg)
cc->chunk_id,
0,
NameStr(constraint->conname),
NameStr(constraint->conname));
NameStr(constraint->conname),
false);
return CONSTR_PROCESSED;
}

Expand Down Expand Up @@ -874,7 +897,8 @@ ts_chunk_constraint_create_on_chunk(const Hypertable *ht, const Chunk *chunk, Oi
chunk->fd.id,
0,
NULL,
NameStr(con->conname));
NameStr(con->conname),
false);

ts_chunk_constraint_insert(cc);
create_non_dimensional_constraint(cc,
Expand Down

0 comments on commit 97cc717

Please sign in to comment.