Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancing copy throughput by retrieving datums with checkpointing available in TupleTableSlot #21929

Merged
merged 1 commit into from May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/postgres/src/backend/bootstrap/bootstrap.c
Expand Up @@ -823,9 +823,13 @@ InsertOneTuple(Oid objectid)
tuple = heap_form_tuple(tupDesc, values, Nulls);
if (objectid != (Oid) 0)
HeapTupleSetOid(tuple, objectid);

if (IsYugaByteEnabled())
YBCExecuteInsert(boot_reldesc, tupDesc, tuple, ONCONFLICT_NONE);
{
TupleTableSlot *slot = MakeSingleTupleTableSlot(tupDesc);
ExecStoreHeapTuple(tuple, slot, false);
YBCExecuteInsert(boot_reldesc, slot, ONCONFLICT_NONE);
ExecDropSingleTupleTableSlot(slot);
}
else
simple_heap_insert(boot_reldesc, tuple);

Expand Down
16 changes: 10 additions & 6 deletions src/postgres/src/backend/catalog/indexing.c
Expand Up @@ -288,7 +288,8 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert)
{
/* Keep ybctid consistent across all databases. */
Datum ybctid = 0;

TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRel));
ExecStoreHeapTuple(tup, slot, false);
if (yb_shared_insert)
{
if (!IsYsqlUpgrade)
Expand All @@ -304,16 +305,17 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert)
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(
dboid, heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE, &ybctid,
dboid, heapRel, slot, ONCONFLICT_NONE, &ybctid,
YB_TRANSACTIONAL);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(
YBCGetDatabaseOid(heapRel), heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE,
YBCGetDatabaseOid(heapRel), heapRel, slot, ONCONFLICT_NONE,
&ybctid, YB_TRANSACTIONAL);
/* Update the local cache automatically */
YbSetSysCacheTuple(heapRel, tup);
ExecDropSingleTupleTableSlot(slot);
}
else
{
Expand Down Expand Up @@ -351,7 +353,8 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
{
/* Keep ybctid consistent across all databases. */
Datum ybctid = 0;

TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRel));
ExecStoreHeapTuple(tup, slot, false);
if (yb_shared_insert)
{
if (!IsYsqlUpgrade)
Expand All @@ -367,16 +370,17 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(
dboid, heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE, &ybctid,
dboid, heapRel, slot, ONCONFLICT_NONE, &ybctid,
YB_TRANSACTIONAL);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(
YBCGetDatabaseOid(heapRel), heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE,
YBCGetDatabaseOid(heapRel), heapRel, slot, ONCONFLICT_NONE,
&ybctid, YB_TRANSACTIONAL);
/* Update the local cache automatically */
YbSetSysCacheTuple(heapRel, tup);
ExecDropSingleTupleTableSlot(slot);
}
else
{
Expand Down
6 changes: 2 additions & 4 deletions src/postgres/src/backend/commands/copy.c
Expand Up @@ -3082,15 +3082,13 @@ CopyFrom(CopyState cstate)
if (useNonTxnInsert)
{
YBCExecuteNonTxnInsert(resultRelInfo->ri_RelationDesc,
tupDesc,
tuple,
slot,
cstate->on_conflict_action);
}
else
{
YBCExecuteInsert(resultRelInfo->ri_RelationDesc,
tupDesc,
tuple,
slot,
cstate->on_conflict_action);
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/postgres/src/backend/commands/createas.c
Expand Up @@ -627,8 +627,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
if (IsYBRelation(myState->rel))
{
YBCExecuteInsert(myState->rel,
RelationGetDescr(myState->rel),
tuple,
slot,
ONCONFLICT_NONE);
}
else
Expand Down
3 changes: 1 addition & 2 deletions src/postgres/src/backend/commands/matview.c
Expand Up @@ -486,8 +486,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
if (IsYBRelation(myState->transientrel))
{
YBCExecuteInsert(myState->transientrel,
RelationGetDescr(myState->transientrel),
tuple,
slot,
ONCONFLICT_NONE);
}
else
Expand Down
6 changes: 2 additions & 4 deletions src/postgres/src/backend/commands/tablecmds.c
Expand Up @@ -5419,8 +5419,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
{
if (IsYBRelation(newrel))
YBCExecuteInsert(newrel,
RelationGetDescr(newrel),
tuple,
newslot,
ONCONFLICT_NONE);
else
heap_insert(newrel, tuple, mycid, hi_options, bistate);
Expand Down Expand Up @@ -18211,8 +18210,7 @@ YbATCopyTableRowsUnchecked(Relation old_rel, Relation new_rel,
ExecStoreHeapTuple(tuple, newslot, false);

/* Write the tuple out to the new relation */
YBCExecuteInsert(new_rel, newslot->tts_tupleDescriptor, tuple,
ONCONFLICT_NONE);
YBCExecuteInsert(new_rel, newslot, ONCONFLICT_NONE);

MemoryContextReset(econtext->ecxt_per_tuple_memory);

Expand Down
7 changes: 4 additions & 3 deletions src/postgres/src/backend/executor/nodeModifyTable.c
Expand Up @@ -560,7 +560,7 @@ ExecInsert(ModifyTableState *mtstate,
* locked and released in this call.
* TODO(Mikhail) Verify the YugaByte transaction support works properly for on-conflict.
*/
newId = YBCHeapInsert(slot, tuple, blockInsertStmt, estate);
newId = YBCHeapInsert(slot, blockInsertStmt, estate);

/* insert index entries for tuple */
recheckIndexes = ExecInsertIndexTuples(slot, tuple,
Expand Down Expand Up @@ -629,7 +629,7 @@ ExecInsert(ModifyTableState *mtstate,
if (IsYBRelation(resultRelationDesc))
{
MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
newId = YBCHeapInsert(slot, tuple, blockInsertStmt, estate);
newId = YBCHeapInsert(slot, blockInsertStmt, estate);

/* insert index entries for tuple */
if (YBCRelInfoHasSecondaryIndices(resultRelInfo))
Expand Down Expand Up @@ -1573,7 +1573,8 @@ ExecUpdate(ModifyTableState *mtstate,
ModifyTable *plan = (ModifyTable *) mtstate->ps.plan;
if (is_pk_updated)
{
YBCExecuteUpdateReplace(resultRelationDesc, planSlot, tuple, estate);
slot->tts_tuple->t_ybctid = YBCGetYBTupleIdFromSlot(planSlot);
YBCExecuteUpdateReplace(resultRelationDesc, slot, estate);
row_found = true;
}
else
Expand Down
49 changes: 19 additions & 30 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Expand Up @@ -265,13 +265,15 @@ static void YBCExecWriteStmt(YBCPgStatement ybc_stmt,
}

static Oid YBCApplyInsertRow(
YBCPgStatement insert_stmt, Relation rel, TupleDesc tupleDesc,
HeapTuple tuple, OnConflictAction onConflictAction, Datum *ybctid,
YBCPgStatement insert_stmt, Relation rel, TupleTableSlot *slot,
OnConflictAction onConflictAction, Datum *ybctid,
YBCPgTransactionSetting transaction_setting) {
Oid relfileNodeId = YbGetRelfileNodeId(rel);
AttrNumber minattr = YBGetFirstLowInvalidAttributeNumber(rel);
int natts = RelationGetNumberOfAttributes(rel);
Bitmapset *pkey = YBGetTablePrimaryKeyBms(rel);
TupleDesc tupleDesc = RelationGetDescr(rel);
HeapTuple tuple = slot->tts_tuple;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the definition of ExecMaterializeSlot, I haven't used it. So, ybctid and oid are updated in tuple itself. Please lmk if i'm missing anything.


/* Generate a new oid for this row if needed */
if (rel->rd_rel->relhasoids)
Expand Down Expand Up @@ -323,13 +325,12 @@ static Oid YBCApplyInsertRow(
*/
Oid collation_id = YBEncodingCollation(insert_stmt, attnum,
ybc_get_attcollation(RelationGetDescr(rel), attnum));
column->datum = heap_getattr(tuple, attnum, tupleDesc, &column->is_null);
column->datum = slot_getattr(slot, attnum, &column->is_null);
YBGetCollationInfo(collation_id, column->type_entity, column->datum, column->is_null, &column->collation_info);

/* Add the column value to the insert request */
++column;
}

HandleYBStatus(YBCPgDmlBindRow(insert_stmt, tuple->t_ybctid, columns, column - columns));

/*
Expand Down Expand Up @@ -362,8 +363,7 @@ static Oid YBCApplyInsertRow(
*/
static Oid YBCExecuteInsertInternal(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction,
Datum *ybctid,
YBCPgTransactionSetting transaction_setting)
Expand All @@ -379,7 +379,7 @@ static Oid YBCExecuteInsertInternal(Oid dboid,
transaction_setting));

Oid result = YBCApplyInsertRow(
insert_stmt, rel, tupleDesc, tuple, onConflictAction, ybctid,
insert_stmt, rel, slot, onConflictAction, ybctid,
transaction_setting);

/* Execute the insert */
Expand All @@ -389,14 +389,12 @@ static Oid YBCExecuteInsertInternal(Oid dboid,
}

Oid YBCExecuteInsert(Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction)
{
return YBCExecuteInsertForDb(YBCGetDatabaseOid(rel),
rel,
tupleDesc,
tuple,
slot,
onConflictAction,
NULL /* ybctid */,
YB_TRANSACTIONAL);
Expand All @@ -422,52 +420,45 @@ static YBCPgTransactionSetting YBCFixTransactionSetting(

Oid YBCExecuteInsertForDb(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction,
Datum *ybctid,
YBCPgTransactionSetting transaction_setting)
{
return YBCExecuteInsertInternal(dboid,
rel,
tupleDesc,
tuple,
slot,
onConflictAction,
ybctid,
YBCFixTransactionSetting(rel, transaction_setting));
}

Oid YBCExecuteNonTxnInsert(Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction)
{
return YBCExecuteNonTxnInsertForDb(YBCGetDatabaseOid(rel),
rel,
tupleDesc,
tuple,
slot,
onConflictAction,
NULL /* ybctid */);
}

Oid YBCExecuteNonTxnInsertForDb(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction,
Datum *ybctid)
{
return YBCExecuteInsertInternal(dboid,
rel,
tupleDesc,
tuple,
slot,
onConflictAction,
ybctid,
YB_NON_TRANSACTIONAL);
}

Oid YBCHeapInsert(TupleTableSlot *slot,
HeapTuple tuple,
YBCPgStatement blockInsertStmt,
EState *estate)
{
Expand All @@ -481,8 +472,8 @@ Oid YBCHeapInsert(TupleTableSlot *slot,

if (blockInsertStmt) {
return YBCApplyInsertRow(
blockInsertStmt, resultRelationDesc, slot->tts_tupleDescriptor,
tuple, ONCONFLICT_NONE, NULL /* ybctid */,
blockInsertStmt, resultRelationDesc, slot,
ONCONFLICT_NONE, NULL /* ybctid */,
YBCFixTransactionSetting(resultRelationDesc, transaction_setting));
}

Expand All @@ -496,7 +487,7 @@ Oid YBCHeapInsert(TupleTableSlot *slot,
* there are no indices or triggers on the target table.
*/
return YBCExecuteInsertForDb(
dboid, resultRelationDesc, slot->tts_tupleDescriptor, tuple, ONCONFLICT_NONE, NULL /* ybctid */,
dboid, resultRelationDesc, slot, ONCONFLICT_NONE, NULL /* ybctid */,
transaction_setting);
}

Expand Down Expand Up @@ -1162,7 +1153,6 @@ YBCExecuteUpdateLoginAttempts(Oid roleid,

Oid YBCExecuteUpdateReplace(Relation rel,
TupleTableSlot *slot,
HeapTuple tuple,
EState *estate)
{
YBCExecuteDelete(rel,
Expand All @@ -1174,8 +1164,7 @@ Oid YBCExecuteUpdateReplace(Relation rel,
estate);

return YBCExecuteInsert(rel,
RelationGetDescr(rel),
tuple,
slot,
ONCONFLICT_NONE);
}

Expand Down
14 changes: 4 additions & 10 deletions src/postgres/src/include/executor/ybcModifyTable.h
Expand Up @@ -60,7 +60,6 @@ typedef void (*yb_bind_for_write_function) (YBCPgStatement stmt,
* to the generated value.
*/
extern Oid YBCHeapInsert(TupleTableSlot *slot,
HeapTuple tuple,
YBCPgStatement blockInsertStmt,
EState *estate);

Expand All @@ -73,13 +72,11 @@ extern Oid YBCHeapInsert(TupleTableSlot *slot,
* to the generated value.
*/
extern Oid YBCExecuteInsert(Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction);
extern Oid YBCExecuteInsertForDb(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction,
Datum *ybctid,
YBCPgTransactionSetting transaction_setting);
Expand All @@ -95,13 +92,11 @@ extern void YBCApplyWriteStmt(YBCPgStatement handle, Relation relation);
* to the generated value.
*/
extern Oid YBCExecuteNonTxnInsert(Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction);
extern Oid YBCExecuteNonTxnInsertForDb(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
HeapTuple tuple,
TupleTableSlot *slot,
OnConflictAction onConflictAction,
Datum *ybctid);

Expand Down Expand Up @@ -188,7 +183,6 @@ extern bool YBCExecuteUpdateLoginAttempts(Oid roleid,
*/
extern Oid YBCExecuteUpdateReplace(Relation rel,
TupleTableSlot *slot,
HeapTuple tuple,
EState *estate);

//------------------------------------------------------------------------------
Expand Down