Skip to content

Commit

Permalink
Modify dimension slice catalog update API
Browse files Browse the repository at this point in the history
Use the same logic as PR 6773 while updating dimension slice catalog
tuples. PR 6773 addresses chunk catalog updates. We first lock the
tuple and then modify the values and update the locked tuple. Replace
ts_dimension_slice_update_by_id with field specific APIs and use
dimension_slice_update_catalog_tuple calls consistently.
  • Loading branch information
antekresic committed May 13, 2024
1 parent 543c2c2 commit 32723f2
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 39 deletions.
181 changes: 143 additions & 38 deletions src/dimension_slice.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,51 @@ dimension_slice_from_slot(TupleTableSlot *slot)
return slice;
}

static HeapTuple
dimension_slice_formdata_make_tuple(const FormData_dimension_slice *fd, TupleDesc desc)
{
Datum values[Natts_dimension_slice];
bool nulls[Natts_dimension_slice] = { false };

memset(values, 0, sizeof(Datum) * Natts_dimension_slice);

values[AttrNumberGetAttrOffset(Anum_dimension_slice_id)] = Int32GetDatum(fd->id);
values[AttrNumberGetAttrOffset(Anum_dimension_slice_dimension_id)] =
Int32GetDatum(fd->dimension_id);
values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)] =
Int32GetDatum(fd->range_start);
values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)] = Int32GetDatum(fd->range_end);

return heap_form_tuple(desc, values, nulls);
}

static inline void
dimension_slice_formdata_fill(FormData_dimension_slice *fd, const TupleInfo *ti)
{
bool nulls[Natts_dimension_slice];
Datum values[Natts_dimension_slice];
bool should_free;
HeapTuple tuple;

tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);

Assert(!nulls[AttrNumberGetAttrOffset(Anum_dimension_slice_id)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_dimension_slice_dimension_id)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)]);

fd->id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_dimension_slice_id)]);
fd->dimension_id =
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_dimension_slice_dimension_id)]);
fd->range_start =
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)]);
fd->range_end = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)]);

if (should_free)
heap_freetuple(tuple);

Check warning on line 109 in src/dimension_slice.c

View check run for this annotation

Codecov / codecov/patch

src/dimension_slice.c#L109

Added line #L109 was not covered by tests
}

DimensionSlice *
ts_dimension_slice_create(int dimension_id, int64 range_start, int64 range_end)
{
Expand Down Expand Up @@ -1179,54 +1224,114 @@ ts_osm_chunk_range_overlaps(int32 osm_dimension_slice_id, int32 dimension_id, in
return res;
}

static ScanTupleResult
dimension_slice_tuple_update(TupleInfo *ti, void *data)
static bool
lock_dimension_slice_tuple(int32 dimension_slice_id, ItemPointer tid,
FormData_dimension_slice *form)
{
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
FormData_dimension_slice *fd = (FormData_dimension_slice *) data;

Datum values[Natts_dimension_slice] = { 0 };
bool isnull[Natts_dimension_slice] = { 0 };
bool doReplace[Natts_dimension_slice] = { 0 };
bool success = false;
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};
ScanIterator iterator =
ts_scan_iterator_create(DIMENSION_SLICE, RowShareLock, CurrentMemoryContext);
iterator.ctx.index =
catalog_get_index(ts_catalog_get(), DIMENSION_SLICE, DIMENSION_SLICE_ID_IDX);
iterator.ctx.tuplock = &scantuplock;
/* Keeping the lock since we presumably want to update the tuple */
iterator.ctx.flags = SCANNER_F_KEEPLOCK;

/* see table_tuple_lock for details about flags that are set in TupleExclusive mode */
scantuplock.lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
if (!IsolationUsesXactSnapshot())
{
/* in read committed mode, we follow all updates to this tuple */
scantuplock.lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
}

values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)] =
Int64GetDatum(fd->range_start);
doReplace[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)] = true;
ts_scan_iterator_scan_key_init(&iterator,
Anum_dimension_slice_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(dimension_slice_id));

values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)] = Int64GetDatum(fd->range_end);
doReplace[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)] = true;
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
if (ti->lockresult != TM_Ok)
{
if (IsolationUsesXactSnapshot())
{
/* For Repeatable Read and Serializable isolation level report error
* if we cannot lock the tuple
*/
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unable to lock hypertable catalog tuple, lock result is %d for "
"hypertable "
"ID (%d)",
ti->lockresult,
dimension_slice_id)));
}
}
dimension_slice_formdata_fill(form, ti);
ItemPointer result_tid = ts_scanner_get_tuple_tid(ti);
tid->ip_blkid = result_tid->ip_blkid;
tid->ip_posid = result_tid->ip_posid;
success = true;
break;
}
ts_scan_iterator_close(&iterator);
return success;
}

HeapTuple new_tuple =
heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, doReplace);
/* update the tuple at this tid. The assumption is that we already hold a
* tuple exclusive lock and no other transaction can modify this tuple
* The sequence of operations for any update is:
* lock the tuple using lock_hypertable_tuple.
* then update the required fields
* call dimension_slice_update_catalog_tuple to complete the update.
* This ensures correct tuple locking and tuple updates in the presence of
* concurrent transactions. Failure to follow this results in catalog corruption
*/
static void
dimension_slice_update_catalog_tuple(ItemPointer tid, FormData_dimension_slice *update)
{
HeapTuple new_tuple;
CatalogSecurityContext sec_ctx;
Catalog *catalog = ts_catalog_get();
Oid table = catalog_get_table_id(catalog, DIMENSION_SLICE);
Relation dimension_slice_rel = relation_open(table, RowExclusiveLock);

ts_catalog_update(ti->scanrel, new_tuple);
new_tuple = dimension_slice_formdata_make_tuple(update, dimension_slice_rel->rd_att);

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_update_tid(dimension_slice_rel, tid, new_tuple);
ts_catalog_restore_user(&sec_ctx);
heap_freetuple(new_tuple);
if (should_free)
heap_freetuple(tuple);

return SCAN_DONE;
relation_close(dimension_slice_rel, NoLock);
}

int
ts_dimension_slice_update_by_id(int32 dimension_slice_id, FormData_dimension_slice *fd_slice)
ts_dimension_slice_range_update(DimensionSlice *slice)
{
ScanKeyData scankey[1];
FormData_dimension_slice form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_dimension_slice_tuple(slice->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", slice->fd.id);

ScanKeyInit(&scankey[0],
Anum_dimension_slice_id_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(dimension_slice_id));

return dimension_slice_scan_limit_internal(DIMENSION_SLICE_ID_IDX,
scankey,
1,
dimension_slice_tuple_update,
fd_slice,
1,
RowExclusiveLock,
NULL,
CurrentMemoryContext);
if (form.range_start != slice->fd.range_start || form.range_end != slice->fd.range_end)
{
form.range_start = slice->fd.range_start;
form.range_end = slice->fd.range_end;
dimension_slice_update_catalog_tuple(&tid, &form);
}
return true;
}
1 change: 1 addition & 0 deletions src/dimension_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ extern bool ts_osm_chunk_range_overlaps(int32 osm_dimension_slice_id, int32 dime

extern int ts_dimension_slice_update_by_id(int32 dimension_slice_id,
FormData_dimension_slice *fd_slice);
extern int ts_dimension_slice_range_update(DimensionSlice *slice);

#define dimension_slice_insert(slice) ts_dimension_slice_insert_multi(&(slice), 1)

Expand Down
2 changes: 1 addition & 1 deletion src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -2658,7 +2658,7 @@ ts_hypertable_osm_range_update(PG_FUNCTION_ARGS)

slice->fd.range_start = range_start_internal;
slice->fd.range_end = range_end_internal;
ts_dimension_slice_update_by_id(dimension_slice_id, &slice->fd);
ts_dimension_slice_range_update(slice);

PG_RETURN_BOOL(overlap);
}

0 comments on commit 32723f2

Please sign in to comment.