Skip to content

Commit

Permalink
Add support for correlated constraints
Browse files Browse the repository at this point in the history
Allow users to designate a column to be used as a correlated
constraint using the add_dimension() API. We store such correlated
constraints in the dimensions related timescaledb catalog tables. The
"dimension" catalog table has been amended by adding a "type" column.
This column now explicitly stores the type: open, closed or correlated
as appropriate.

We create dimension_slice and chunk_constraint entries for chunks which
have correlated constraints on them. The dimension slice entry will
have -inf/+inf as start/end range initially for a given correlated
constraint and the chunk_constraint entry will refer back to this slice
entry.

This start/end range will be refreshed later. For now, compression is
one of the entry points that triggers this refresh.

We can thus store the min/max values for such correlated constraints
in these catalog tables at the per-chunk level. Note that correlated
constraints do not participate in partitioning of the data. Such a
correlated constraint will be used for chunk pruning if the WHERE
clause of a SQL query specifies ranges on such a column.

A "DROP COLUMN" on a column with a correlated constraint on it ends up
removing all relevant entries from the catalog tables.
  • Loading branch information
nikkhils committed May 1, 2024
1 parent acc73b9 commit 812bb75
Show file tree
Hide file tree
Showing 41 changed files with 1,474 additions and 179 deletions.
7 changes: 6 additions & 1 deletion sql/ddl_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ CREATE OR REPLACE FUNCTION @extschema@.add_dimension(
number_partitions INTEGER = NULL,
chunk_time_interval ANYELEMENT = NULL::BIGINT,
partitioning_func REGPROC = NULL,
if_not_exists BOOLEAN = FALSE
if_not_exists BOOLEAN = FALSE,
correlated BOOLEAN = FALSE
) RETURNS TABLE(dimension_id INT, schema_name NAME, table_name NAME, column_name NAME, created BOOL)
AS '@MODULE_PATHNAME@', 'ts_dimension_add' LANGUAGE C VOLATILE;

Expand All @@ -160,6 +161,10 @@ CREATE OR REPLACE FUNCTION @extschema@.by_range(column_name NAME,
RETURNS _timescaledb_internal.dimension_info LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_range_dimension';

-- Dimension-info builder for a correlated constraint on "column_name".
-- Returns the same _timescaledb_internal.dimension_info type as by_range();
-- the work is done by the C entry point ts_correlated_dimension.
CREATE OR REPLACE FUNCTION @extschema@.by_correlation(
    column_name NAME
)
    RETURNS _timescaledb_internal.dimension_info
    LANGUAGE C
    AS '@MODULE_PATHNAME@', 'ts_correlated_dimension';

CREATE OR REPLACE FUNCTION @extschema@.attach_tablespace(
tablespace NAME,
hypertable REGCLASS,
Expand Down
3 changes: 2 additions & 1 deletion sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,14 @@ CREATE TABLE _timescaledb_catalog.dimension (
compress_interval_length bigint NULL,
integer_now_func_schema name NULL,
integer_now_func name NULL,
type "char",
-- table constraints
CONSTRAINT dimension_pkey PRIMARY KEY (id),
CONSTRAINT dimension_hypertable_id_column_name_key UNIQUE (hypertable_id, column_name),
CONSTRAINT dimension_check CHECK ((partitioning_func_schema IS NULL AND partitioning_func IS NULL) OR (partitioning_func_schema IS NOT NULL AND partitioning_func IS NOT NULL)),
CONSTRAINT dimension_check1 CHECK ((num_slices IS NULL AND interval_length IS NOT NULL) OR (num_slices IS NOT NULL AND interval_length IS NULL)),
CONSTRAINT dimension_check2 CHECK ((integer_now_func_schema IS NULL AND integer_now_func IS NULL) OR (integer_now_func_schema IS NOT NULL AND integer_now_func IS NOT NULL)),
CONSTRAINT dimension_interval_length_check CHECK (interval_length IS NULL OR interval_length > 0),
CONSTRAINT dimension_interval_length_check CHECK (interval_length IS NULL OR interval_length > 0 OR type = 'C'),
CONSTRAINT dimension_compress_interval_length_check CHECK (compress_interval_length IS NULL OR compress_interval_length > 0),
CONSTRAINT dimension_hypertable_id_fkey FOREIGN KEY (hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE
);
Expand Down
130 changes: 130 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -412,3 +412,133 @@ BEGIN
END LOOP;
END
$$;

-- Upgrade: add_dimension() gains a trailing "correlated" argument. An argument
-- list cannot be extended in place, so the previous six-argument version is
-- dropped first; leaving it behind would create a second overload.
DROP FUNCTION IF EXISTS @extschema@.add_dimension(REGCLASS, NAME, INTEGER, ANYELEMENT, REGPROC, BOOLEAN);

-- Seven-argument replacement. "correlated" defaults to FALSE, so calls written
-- against the old signature keep resolving to this function.
CREATE FUNCTION @extschema@.add_dimension(
    hypertable          REGCLASS,
    column_name         NAME,
    number_partitions   INTEGER    = NULL,
    chunk_time_interval ANYELEMENT = NULL::BIGINT,
    partitioning_func   REGPROC    = NULL,
    if_not_exists       BOOLEAN    = FALSE,
    correlated          BOOLEAN    = FALSE
)
    RETURNS TABLE(dimension_id INT, schema_name NAME, table_name NAME, column_name NAME, created BOOL)
    LANGUAGE C VOLATILE
    AS '@MODULE_PATHNAME@', 'ts_dimension_add';

-- Upgrade: install by_correlation(), which builds a
-- _timescaledb_internal.dimension_info value for the named column via the
-- C entry point ts_correlated_dimension.
CREATE FUNCTION @extschema@.by_correlation(
    column_name NAME
)
    RETURNS _timescaledb_internal.dimension_info
    LANGUAGE C
    AS '@MODULE_PATHNAME@', 'ts_correlated_dimension';

--
-- Rebuild the catalog table `_timescaledb_catalog.dimension` with the new
-- "type" column. The table is dropped and recreated (rather than altered),
-- so its rows and the position of its id sequence are stashed first and
-- restored afterwards.
--

-- Back up the existing rows; columns are listed explicitly so the copy does
-- not depend on the physical column order of the old table.
CREATE TABLE _timescaledb_internal._tmp_dimension AS
SELECT
    id,
    hypertable_id,
    column_name,
    column_type,
    aligned,
    num_slices,
    partitioning_func_schema,
    partitioning_func,
    interval_length,
    compress_interval_length,
    integer_now_func_schema,
    integer_now_func
FROM _timescaledb_catalog.dimension;

-- Back up the sequence state so newly created dimensions continue from the
-- same id after the rebuild.
CREATE TABLE _timescaledb_internal.tmp_dimension_seq_value AS
SELECT last_value, is_called
FROM _timescaledb_catalog.dimension_id_seq;

-- drop foreign keys on dimension table
ALTER TABLE _timescaledb_catalog.dimension_slice
    DROP CONSTRAINT dimension_slice_dimension_id_fkey;

-- drop dependent views
DROP VIEW IF EXISTS timescaledb_information.chunks;
DROP VIEW IF EXISTS timescaledb_information.dimensions;
DROP VIEW IF EXISTS timescaledb_information.hypertable_compression_settings;

-- Detach the table and its sequence from the extension so they can be dropped.
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.dimension;
ALTER EXTENSION timescaledb DROP SEQUENCE _timescaledb_catalog.dimension_id_seq;
DROP TABLE _timescaledb_catalog.dimension;

CREATE TABLE _timescaledb_catalog.dimension (
    id serial NOT NULL,
    hypertable_id integer NOT NULL,
    column_name name NOT NULL,
    column_type REGTYPE NOT NULL,
    aligned boolean NOT NULL,
    -- closed dimensions
    num_slices smallint NULL,
    partitioning_func_schema name NULL,
    partitioning_func name NULL,
    -- open dimensions (e.g., time)
    interval_length bigint NULL,
    -- compress interval is used by rollup procedure during compression
    -- in order to merge multiple chunks into a single one
    compress_interval_length bigint NULL,
    integer_now_func_schema name NULL,
    integer_now_func name NULL,
    -- dimension type; this script writes 'c' (closed) and 'o' (open), and the
    -- interval-length check below exempts rows of type 'C'
    type "char",
    -- table constraints
    CONSTRAINT dimension_pkey PRIMARY KEY (id),
    CONSTRAINT dimension_hypertable_id_column_name_key UNIQUE (hypertable_id, column_name),
    CONSTRAINT dimension_check CHECK ((partitioning_func_schema IS NULL AND partitioning_func IS NULL) OR (partitioning_func_schema IS NOT NULL AND partitioning_func IS NOT NULL)),
    CONSTRAINT dimension_check1 CHECK ((num_slices IS NULL AND interval_length IS NOT NULL) OR (num_slices IS NOT NULL AND interval_length IS NULL)),
    CONSTRAINT dimension_check2 CHECK ((integer_now_func_schema IS NULL AND integer_now_func IS NULL) OR (integer_now_func_schema IS NOT NULL AND integer_now_func IS NOT NULL)),
    CONSTRAINT dimension_interval_length_check CHECK (interval_length IS NULL OR interval_length > 0 OR type = 'C'),
    CONSTRAINT dimension_compress_interval_length_check CHECK (compress_interval_length IS NULL OR compress_interval_length > 0),
    CONSTRAINT dimension_hypertable_id_fkey FOREIGN KEY (hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE
);

-- Restore the rows, deriving "type" from which of num_slices/interval_length
-- is populated.
INSERT INTO _timescaledb_catalog.dimension (
    id,
    hypertable_id,
    column_name,
    column_type,
    aligned,
    num_slices,
    partitioning_func_schema,
    partitioning_func,
    interval_length,
    compress_interval_length,
    integer_now_func_schema,
    integer_now_func,
    type
)
SELECT
    id,
    hypertable_id,
    column_name,
    column_type,
    aligned,
    num_slices,
    partitioning_func_schema,
    partitioning_func,
    interval_length,
    compress_interval_length,
    integer_now_func_schema,
    integer_now_func,
    CASE
        WHEN interval_length IS NULL AND num_slices IS NOT NULL THEN 'c' -- closed dimension
        WHEN interval_length IS NOT NULL AND num_slices IS NULL THEN 'o' -- open dimension
        ELSE 'a' -- any.. This should never happen
    END AS type
FROM _timescaledb_internal._tmp_dimension;

-- Check that there's no entry with type == 'a'. EXISTS stops at the first
-- offending row instead of counting all of them.
DO $$
BEGIN
    IF EXISTS (
        SELECT 1
        FROM _timescaledb_catalog.dimension
        WHERE type = 'a'
    ) THEN
        RAISE EXCEPTION 'invalid dimension entry found!';
    END IF;
END
$$;

-- Re-attach the sequence to the id column and restore its saved position.
ALTER SEQUENCE _timescaledb_catalog.dimension_id_seq OWNED BY _timescaledb_catalog.dimension.id;
SELECT setval('_timescaledb_catalog.dimension_id_seq', last_value, is_called)
FROM _timescaledb_internal.tmp_dimension_seq_value;

-- Register the table and its sequence as extension configuration so that
-- pg_dump includes their contents.
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.dimension', '');
SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_catalog.dimension', 'id'), '');

-- add the foreign key constraints
ALTER TABLE _timescaledb_catalog.dimension_slice
    ADD CONSTRAINT dimension_slice_dimension_id_fkey
    FOREIGN KEY (dimension_id)
    REFERENCES _timescaledb_catalog.dimension (id) ON DELETE CASCADE;

-- cleanup
DROP TABLE _timescaledb_internal._tmp_dimension;
DROP TABLE _timescaledb_internal.tmp_dimension_seq_value;

GRANT SELECT ON _timescaledb_catalog.dimension_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.dimension TO PUBLIC;

-- end recreate _timescaledb_catalog.dimension table --
106 changes: 105 additions & 1 deletion sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -295,5 +295,109 @@ SET
WHERE
id = 2;


-- Downgrade step: remove the cagg_migrate_to_time_bucket(REGCLASS) procedure
-- if it is installed.
-- NOTE(review): presumably the previous version does not ship it - confirm.
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_to_time_bucket(cagg REGCLASS);

-- Downgrade: remove the seven-argument add_dimension() (the one carrying the
-- trailing "correlated" flag) and put back the six-argument signature.
DROP FUNCTION IF EXISTS @extschema@.add_dimension(REGCLASS, NAME, INTEGER, ANYELEMENT, REGPROC, BOOLEAN, BOOLEAN);

CREATE FUNCTION @extschema@.add_dimension(
    hypertable          REGCLASS,
    column_name         NAME,
    number_partitions   INTEGER    = NULL,
    chunk_time_interval ANYELEMENT = NULL::BIGINT,
    partitioning_func   REGPROC    = NULL,
    if_not_exists       BOOLEAN    = FALSE
)
    RETURNS TABLE(dimension_id INT, schema_name NAME, table_name NAME, column_name NAME, created BOOL)
    LANGUAGE C VOLATILE
    AS '@MODULE_PATHNAME@', 'ts_dimension_add';

-- Downgrade: by_correlation(NAME) is created by the upgrade script, so it is
-- removed again here.
DROP FUNCTION IF EXISTS @extschema@.by_correlation(NAME);

-- Recreate _timescaledb_catalog.dimension table without the type column --

-- The old table layout has no "type" column and its interval_length check has
-- no exemption for type 'C' (correlated) rows. Copying such rows back would
-- either fail that check with an opaque error or leave them looking like
-- ordinary partitioning dimensions. Refuse the downgrade up front instead.
-- NOTE(review): assumes 'C' is the only marker used for correlated
-- dimensions, as the upgraded check constraint suggests - confirm.
DO $$
BEGIN
    IF EXISTS (
        SELECT 1
        FROM _timescaledb_catalog.dimension
        WHERE type = 'C'
    ) THEN
        RAISE EXCEPTION 'cannot downgrade while correlated constraints exist'
            USING HINT = 'Remove all correlated constraints from hypertables before downgrading.';
    END IF;
END
$$;

-- Back up the rows (minus the "type" column, which the old layout lacks) and
-- the sequence state, since the table is dropped and recreated below.
CREATE TABLE _timescaledb_internal._tmp_dimension AS
SELECT
    id,
    hypertable_id,
    column_name,
    column_type,
    aligned,
    num_slices,
    partitioning_func_schema,
    partitioning_func,
    interval_length,
    compress_interval_length,
    integer_now_func_schema,
    integer_now_func
FROM _timescaledb_catalog.dimension;

CREATE TABLE _timescaledb_internal.tmp_dimension_seq_value AS
SELECT last_value, is_called
FROM _timescaledb_catalog.dimension_id_seq;

-- drop foreign keys on dimension table
ALTER TABLE _timescaledb_catalog.dimension_slice
    DROP CONSTRAINT dimension_slice_dimension_id_fkey;

-- drop dependent views
DROP VIEW IF EXISTS timescaledb_information.chunks;
DROP VIEW IF EXISTS timescaledb_information.dimensions;
DROP VIEW IF EXISTS timescaledb_information.hypertable_compression_settings;

-- Detach the table and its sequence from the extension so they can be dropped.
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.dimension;
ALTER EXTENSION timescaledb DROP SEQUENCE _timescaledb_catalog.dimension_id_seq;
DROP TABLE _timescaledb_catalog.dimension;

CREATE TABLE _timescaledb_catalog.dimension (
    id serial NOT NULL,
    hypertable_id integer NOT NULL,
    column_name name NOT NULL,
    column_type REGTYPE NOT NULL,
    aligned boolean NOT NULL,
    -- closed dimensions
    num_slices smallint NULL,
    partitioning_func_schema name NULL,
    partitioning_func name NULL,
    -- open dimensions (e.g., time)
    interval_length bigint NULL,
    -- compress interval is used by rollup procedure during compression
    -- in order to merge multiple chunks into a single one
    compress_interval_length bigint NULL,
    integer_now_func_schema name NULL,
    integer_now_func name NULL,
    -- table constraints
    CONSTRAINT dimension_pkey PRIMARY KEY (id),
    CONSTRAINT dimension_hypertable_id_column_name_key UNIQUE (hypertable_id, column_name),
    CONSTRAINT dimension_check CHECK ((partitioning_func_schema IS NULL AND partitioning_func IS NULL) OR (partitioning_func_schema IS NOT NULL AND partitioning_func IS NOT NULL)),
    CONSTRAINT dimension_check1 CHECK ((num_slices IS NULL AND interval_length IS NOT NULL) OR (num_slices IS NOT NULL AND interval_length IS NULL)),
    CONSTRAINT dimension_check2 CHECK ((integer_now_func_schema IS NULL AND integer_now_func IS NULL) OR (integer_now_func_schema IS NOT NULL AND integer_now_func IS NOT NULL)),
    CONSTRAINT dimension_interval_length_check CHECK (interval_length IS NULL OR interval_length > 0),
    CONSTRAINT dimension_compress_interval_length_check CHECK (compress_interval_length IS NULL OR compress_interval_length > 0),
    CONSTRAINT dimension_hypertable_id_fkey FOREIGN KEY (hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE
);

-- Restore the saved rows into the old layout.
INSERT INTO _timescaledb_catalog.dimension (
    id,
    hypertable_id,
    column_name,
    column_type,
    aligned,
    num_slices,
    partitioning_func_schema,
    partitioning_func,
    interval_length,
    compress_interval_length,
    integer_now_func_schema,
    integer_now_func
)
SELECT
    id,
    hypertable_id,
    column_name,
    column_type,
    aligned,
    num_slices,
    partitioning_func_schema,
    partitioning_func,
    interval_length,
    compress_interval_length,
    integer_now_func_schema,
    integer_now_func
FROM _timescaledb_internal._tmp_dimension;

-- Re-attach the sequence to the id column and restore its saved position.
ALTER SEQUENCE _timescaledb_catalog.dimension_id_seq OWNED BY _timescaledb_catalog.dimension.id;
SELECT setval('_timescaledb_catalog.dimension_id_seq', last_value, is_called)
FROM _timescaledb_internal.tmp_dimension_seq_value;

-- Register the table and its sequence as extension configuration so that
-- pg_dump includes their contents.
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.dimension', '');
SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_catalog.dimension', 'id'), '');

-- add the foreign key constraints
ALTER TABLE _timescaledb_catalog.dimension_slice
    ADD CONSTRAINT dimension_slice_dimension_id_fkey
    FOREIGN KEY (dimension_id)
    REFERENCES _timescaledb_catalog.dimension (id) ON DELETE CASCADE;

-- cleanup
DROP TABLE _timescaledb_internal._tmp_dimension;
DROP TABLE _timescaledb_internal.tmp_dimension_seq_value;

GRANT SELECT ON _timescaledb_catalog.dimension_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.dimension TO PUBLIC;

-- end recreate _timescaledb_catalog.dimension table --
20 changes: 17 additions & 3 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -841,14 +841,16 @@ chunk_create_object(const Hypertable *ht, Hypercube *cube, const char *schema_na
const char *table_name, const char *prefix, int32 chunk_id)
{
const Hyperspace *hs = ht->space;
const Hyperspace *ss = ht->correlated_space;
Chunk *chunk;
const char relkind = RELKIND_RELATION;

if (NULL == schema_name || schema_name[0] == '\0')
schema_name = NameStr(ht->fd.associated_schema_name);

/* Create a new chunk based on the hypercube */
chunk = ts_chunk_create_base(chunk_id, hs->num_dimensions, relkind);
chunk =
ts_chunk_create_base(chunk_id, hs->num_dimensions + (ss ? ss->num_dimensions : 0), relkind);

chunk->fd.hypertable_id = hs->hypertable_id;
chunk->cube = cube;
Expand Down Expand Up @@ -1091,6 +1093,9 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
prefix,
get_next_chunk_id());

/* Insert any new correlated constraint slices into metadata */
ts_correlated_constraints_dimension_slice_insert(ht, chunk);

chunk_add_constraints(chunk);
chunk_insert_into_metadata_after_lock(chunk);
chunk_create_table_constraints(ht, chunk);
Expand Down Expand Up @@ -3374,8 +3379,17 @@ ts_chunk_set_unordered(Chunk *chunk)
/*
 * Flag a compressed chunk as partially compressed by adding
 * CHUNK_STATUS_COMPRESSED_PARTIAL to its status, and refresh the chunk's
 * correlated-constraint ranges when the hypertable has a correlated space.
 *
 * Returns the result of ts_chunk_add_status().
 *
 * The status result is captured in a local rather than returned directly so
 * that the correlated-constraint refresh runs and the hypertable cache pin
 * taken here is always released before returning.
 */
bool
ts_chunk_set_partial(Chunk *chunk)
{
	Cache *hcache;
	bool set_status;
	Hypertable *htable =
		ts_hypertable_cache_get_cache_and_entry(chunk->hypertable_relid, CACHE_FLAG_NONE, &hcache);

	Assert(ts_chunk_is_compressed(chunk));
	set_status = ts_chunk_add_status(chunk, CHUNK_STATUS_COMPRESSED_PARTIAL);

	/* reset any correlated constraints after the partial uncompressed data */
	if (htable->correlated_space)
		ts_correlated_constraints_dimension_slice_calculate_update(htable, chunk, true);

	ts_cache_release(hcache);
	return set_status;
}

/* No inserts, updates, and deletes are permitted on a frozen chunk.
Expand Down Expand Up @@ -4733,7 +4747,7 @@ ts_chunk_merge_on_dimension(const Hypertable *ht, Chunk *chunk, const Chunk *mer
ts_dimension_slice_insert(new_slice);
}

ts_chunk_constraint_update_slice_id(chunk->fd.id, slice->fd.id, new_slice->fd.id);
ts_chunk_constraint_update_slice_id_name(chunk->fd.id, slice->fd.id, new_slice->fd.id, NULL);
ChunkConstraints *ccs = ts_chunk_constraints_alloc(1, CurrentMemoryContext);
ScanIterator iterator =
ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, CurrentMemoryContext);
Expand Down
29 changes: 22 additions & 7 deletions src/chunk_adaptive.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,20 @@ minmax_indexscan(Relation rel, Relation idxrel, AttrNumber attnum, Datum minmax[
TupleTableSlot *slot = table_slot_create(rel, NULL);
bool nulls[2] = { true, true };
int i;
ScanDirection directions[2] = { ForwardScanDirection /* min */,
BackwardScanDirection /* max */ };
int16 option = idxrel->rd_indoption[0];
bool index_orderby_asc = ((option & INDOPTION_DESC) == 0);

/* default index ordering is ASC, check if that's not the case */
if (!index_orderby_asc)
{
directions[0] = BackwardScanDirection;
directions[1] = ForwardScanDirection;
}

for (i = 0; i < 2; i++)
{
static ScanDirection directions[2] = { BackwardScanDirection, ForwardScanDirection };
bool found_tuple;
bool isnull;

Expand Down Expand Up @@ -266,8 +276,9 @@ table_has_minmax_index(Oid relid, Oid atttype, Name attname, AttrNumber attnum)
*
* Returns true iff min and max is found, otherwise false.
*/
static bool
chunk_get_minmax(Oid relid, Oid atttype, AttrNumber attnum, Datum minmax[2])
bool
ts_chunk_get_minmax(Oid relid, Oid atttype, AttrNumber attnum, const char *call_context,
Datum minmax[2])
{
Relation rel = table_open(relid, AccessShareLock);
NameData attname;
Expand All @@ -279,11 +290,11 @@ chunk_get_minmax(Oid relid, Oid atttype, AttrNumber attnum, Datum minmax[2])
if (res == MINMAX_NO_INDEX)
{
ereport(WARNING,
(errmsg("no index on \"%s\" found for adaptive chunking on chunk \"%s\"",
(errmsg("no index on \"%s\" found for %s on chunk \"%s\"",
NameStr(attname),
call_context,
get_rel_name(relid)),
errdetail("Adaptive chunking works best with an index on the dimension being "
"adapted.")));
errdetail("%s works best with an index on the dimension.", call_context)));

res = minmax_heapscan(rel, atttype, attnum, minmax);
}
Expand Down Expand Up @@ -469,7 +480,11 @@ ts_calculate_chunk_interval(PG_FUNCTION_ARGS)

slice_interval = slice->fd.range_end - slice->fd.range_start;

if (chunk_get_minmax(chunk->table_id, dim->fd.column_type, attno, minmax))
if (ts_chunk_get_minmax(chunk->table_id,
dim->fd.column_type,
attno,
"adaptive chunking",
minmax))
{
int64 min = ts_time_value_to_internal(minmax[0], dim->fd.column_type);
int64 max = ts_time_value_to_internal(minmax[1], dim->fd.column_type);
Expand Down

0 comments on commit 812bb75

Please sign in to comment.