Skip to content

Commit

Permalink
enhancing-copy-thoughput
Browse files Browse the repository at this point in the history
  • Loading branch information
naanagon committed May 11, 2024
1 parent 625a139 commit eb4d763
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 36 deletions.
8 changes: 6 additions & 2 deletions src/postgres/src/backend/bootstrap/bootstrap.c
Expand Up @@ -823,9 +823,13 @@ InsertOneTuple(Oid objectid)
tuple = heap_form_tuple(tupDesc, values, Nulls);
if (objectid != (Oid) 0)
HeapTupleSetOid(tuple, objectid);

if (IsYugaByteEnabled())
YBCExecuteInsert(boot_reldesc, tupDesc, tuple, ONCONFLICT_NONE);
{
TupleTableSlot *slot = MakeSingleTupleTableSlot(tupDesc);
ExecStoreHeapTuple(tuple, slot, false);
YBCExecuteInsert(boot_reldesc, slot, tuple, ONCONFLICT_NONE);
ExecDropSingleTupleTableSlot(slot);
}
else
simple_heap_insert(boot_reldesc, tuple);

Expand Down
16 changes: 10 additions & 6 deletions src/postgres/src/backend/catalog/indexing.c
Expand Up @@ -288,7 +288,8 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert)
{
/* Keep ybctid consistent across all databases. */
Datum ybctid = 0;

TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRel));
ExecStoreHeapTuple(tup, slot, false);
if (yb_shared_insert)
{
if (!IsYsqlUpgrade)
Expand All @@ -304,16 +305,17 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert)
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(
dboid, heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE, &ybctid,
dboid, heapRel, slot, tup, ONCONFLICT_NONE, &ybctid,
YB_TRANSACTIONAL);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(
YBCGetDatabaseOid(heapRel), heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE,
YBCGetDatabaseOid(heapRel), heapRel, slot, tup, ONCONFLICT_NONE,
&ybctid, YB_TRANSACTIONAL);
/* Update the local cache automatically */
YbSetSysCacheTuple(heapRel, tup);
ExecDropSingleTupleTableSlot(slot);
}
else
{
Expand Down Expand Up @@ -351,7 +353,8 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
{
/* Keep ybctid consistent across all databases. */
Datum ybctid = 0;

TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRel));
ExecStoreHeapTuple(tup, slot, false);
if (yb_shared_insert)
{
if (!IsYsqlUpgrade)
Expand All @@ -367,16 +370,17 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(
dboid, heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE, &ybctid,
dboid, heapRel, slot, tup, ONCONFLICT_NONE, &ybctid,
YB_TRANSACTIONAL);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(
YBCGetDatabaseOid(heapRel), heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE,
YBCGetDatabaseOid(heapRel), heapRel, slot, tup, ONCONFLICT_NONE,
&ybctid, YB_TRANSACTIONAL);
/* Update the local cache automatically */
YbSetSysCacheTuple(heapRel, tup);
ExecDropSingleTupleTableSlot(slot);
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/postgres/src/backend/commands/copy.c
Expand Up @@ -3082,14 +3082,14 @@ CopyFrom(CopyState cstate)
if (useNonTxnInsert)
{
YBCExecuteNonTxnInsert(resultRelInfo->ri_RelationDesc,
tupDesc,
slot,
tuple,
cstate->on_conflict_action);
}
else
{
YBCExecuteInsert(resultRelInfo->ri_RelationDesc,
tupDesc,
slot,
tuple,
cstate->on_conflict_action);
}
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/backend/commands/createas.c
Expand Up @@ -627,7 +627,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
if (IsYBRelation(myState->rel))
{
YBCExecuteInsert(myState->rel,
RelationGetDescr(myState->rel),
slot,
tuple,
ONCONFLICT_NONE);
}
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/backend/commands/matview.c
Expand Up @@ -486,7 +486,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
if (IsYBRelation(myState->transientrel))
{
YBCExecuteInsert(myState->transientrel,
RelationGetDescr(myState->transientrel),
slot,
tuple,
ONCONFLICT_NONE);
}
Expand Down
6 changes: 4 additions & 2 deletions src/postgres/src/backend/commands/tablecmds.c
Expand Up @@ -5418,10 +5418,12 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
if (newrel)
{
if (IsYBRelation(newrel))
{
YBCExecuteInsert(newrel,
RelationGetDescr(newrel),
newslot,
tuple,
ONCONFLICT_NONE);
}
else
heap_insert(newrel, tuple, mycid, hi_options, bistate);
}
Expand Down Expand Up @@ -18211,7 +18213,7 @@ YbATCopyTableRowsUnchecked(Relation old_rel, Relation new_rel,
ExecStoreHeapTuple(tuple, newslot, false);

/* Write the tuple out to the new relation */
YBCExecuteInsert(new_rel, newslot->tts_tupleDescriptor, tuple,
YBCExecuteInsert(new_rel, newslot, tuple,
ONCONFLICT_NONE);

MemoryContextReset(econtext->ecxt_per_tuple_memory);
Expand Down
3 changes: 2 additions & 1 deletion src/postgres/src/backend/executor/nodeModifyTable.c
Expand Up @@ -1573,7 +1573,8 @@ ExecUpdate(ModifyTableState *mtstate,
ModifyTable *plan = (ModifyTable *) mtstate->ps.plan;
if (is_pk_updated)
{
YBCExecuteUpdateReplace(resultRelationDesc, planSlot, tuple, estate);
slot->tts_tuple->t_ybctid = YBCGetYBTupleIdFromSlot(planSlot);
YBCExecuteUpdateReplace(resultRelationDesc, slot, tuple, estate);
row_found = true;
}
else
Expand Down
37 changes: 20 additions & 17 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Expand Up @@ -265,14 +265,18 @@ static void YBCExecWriteStmt(YBCPgStatement ybc_stmt,
}

static Oid YBCApplyInsertRow(
YBCPgStatement insert_stmt, Relation rel, TupleDesc tupleDesc,
YBCPgStatement insert_stmt, Relation rel, TupleTableSlot *slot,
HeapTuple tuple, OnConflictAction onConflictAction, Datum *ybctid,
YBCPgTransactionSetting transaction_setting) {
Oid relfileNodeId = YbGetRelfileNodeId(rel);
AttrNumber minattr = YBGetFirstLowInvalidAttributeNumber(rel);
int natts = RelationGetNumberOfAttributes(rel);
Bitmapset *pkey = YBGetTablePrimaryKeyBms(rel);

TupleDesc tupleDesc = slot->tts_tupleDescriptor;
if(tupleDesc == NULL || tupleDesc != RelationGetDescr(rel))
{
tupleDesc = RelationGetDescr(rel);
}
/* Generate a new oid for this row if needed */
if (rel->rd_rel->relhasoids)
{
Expand Down Expand Up @@ -323,13 +327,12 @@ static Oid YBCApplyInsertRow(
*/
Oid collation_id = YBEncodingCollation(insert_stmt, attnum,
ybc_get_attcollation(RelationGetDescr(rel), attnum));
column->datum = heap_getattr(tuple, attnum, tupleDesc, &column->is_null);
column->datum = slot_getattr(slot, attnum, &column->is_null);
YBGetCollationInfo(collation_id, column->type_entity, column->datum, column->is_null, &column->collation_info);

/* Add the column value to the insert request */
++column;
}

HandleYBStatus(YBCPgDmlBindRow(insert_stmt, tuple->t_ybctid, columns, column - columns));

/*
Expand Down Expand Up @@ -362,7 +365,7 @@ static Oid YBCApplyInsertRow(
*/
static Oid YBCExecuteInsertInternal(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction,
Datum *ybctid,
Expand All @@ -379,7 +382,7 @@ static Oid YBCExecuteInsertInternal(Oid dboid,
transaction_setting));

Oid result = YBCApplyInsertRow(
insert_stmt, rel, tupleDesc, tuple, onConflictAction, ybctid,
insert_stmt, rel, slot, tuple, onConflictAction, ybctid,
transaction_setting);

/* Execute the insert */
Expand All @@ -389,13 +392,13 @@ static Oid YBCExecuteInsertInternal(Oid dboid,
}

Oid YBCExecuteInsert(Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction)
{
return YBCExecuteInsertForDb(YBCGetDatabaseOid(rel),
rel,
tupleDesc,
slot,
tuple,
onConflictAction,
NULL /* ybctid */,
Expand All @@ -422,44 +425,44 @@ static YBCPgTransactionSetting YBCFixTransactionSetting(

Oid YBCExecuteInsertForDb(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction,
Datum *ybctid,
YBCPgTransactionSetting transaction_setting)
{
return YBCExecuteInsertInternal(dboid,
rel,
tupleDesc,
slot,
tuple,
onConflictAction,
ybctid,
YBCFixTransactionSetting(rel, transaction_setting));
}

Oid YBCExecuteNonTxnInsert(Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction)
{
return YBCExecuteNonTxnInsertForDb(YBCGetDatabaseOid(rel),
rel,
tupleDesc,
slot,
tuple,
onConflictAction,
NULL /* ybctid */);
}

Oid YBCExecuteNonTxnInsertForDb(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction,
Datum *ybctid)
{
return YBCExecuteInsertInternal(dboid,
rel,
tupleDesc,
slot,
tuple,
onConflictAction,
ybctid,
Expand All @@ -481,7 +484,7 @@ Oid YBCHeapInsert(TupleTableSlot *slot,

if (blockInsertStmt) {
return YBCApplyInsertRow(
blockInsertStmt, resultRelationDesc, slot->tts_tupleDescriptor,
blockInsertStmt, resultRelationDesc, slot,
tuple, ONCONFLICT_NONE, NULL /* ybctid */,
YBCFixTransactionSetting(resultRelationDesc, transaction_setting));
}
Expand All @@ -496,7 +499,7 @@ Oid YBCHeapInsert(TupleTableSlot *slot,
* there are no indices or triggers on the target table.
*/
return YBCExecuteInsertForDb(
dboid, resultRelationDesc, slot->tts_tupleDescriptor, tuple, ONCONFLICT_NONE, NULL /* ybctid */,
dboid, resultRelationDesc, slot, tuple, ONCONFLICT_NONE, NULL /* ybctid */,
transaction_setting);
}

Expand Down Expand Up @@ -1174,7 +1177,7 @@ Oid YBCExecuteUpdateReplace(Relation rel,
estate);

return YBCExecuteInsert(rel,
RelationGetDescr(rel),
slot,
tuple,
ONCONFLICT_NONE);
}
Expand Down
8 changes: 4 additions & 4 deletions src/postgres/src/include/executor/ybcModifyTable.h
Expand Up @@ -73,12 +73,12 @@ extern Oid YBCHeapInsert(TupleTableSlot *slot,
* to the generated value.
*/
extern Oid YBCExecuteInsert(Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction);
extern Oid YBCExecuteInsertForDb(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction,
Datum *ybctid,
Expand All @@ -95,12 +95,12 @@ extern void YBCApplyWriteStmt(YBCPgStatement handle, Relation relation);
* to the generated value.
*/
extern Oid YBCExecuteNonTxnInsert(Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction);
extern Oid YBCExecuteNonTxnInsertForDb(Oid dboid,
Relation rel,
TupleDesc tupleDesc,
TupleTableSlot *slot,
HeapTuple tuple,
OnConflictAction onConflictAction,
Datum *ybctid);
Expand Down

0 comments on commit eb4d763

Please sign in to comment.