Skip to content

Commit

Permalink
enhancing-copy-thoughput
Browse files Browse the repository at this point in the history
  • Loading branch information
naanagon committed Apr 20, 2024
1 parent 625a139 commit e0f846e
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 15 deletions.
6 changes: 5 additions & 1 deletion src/postgres/src/backend/bootstrap/bootstrap.c
Expand Up @@ -824,14 +824,18 @@ InsertOneTuple(Oid objectid)
if (objectid != (Oid) 0)
HeapTupleSetOid(tuple, objectid);

TupleTableSlot *slot = MakeSingleTupleTableSlot(tupDesc);
ExecStoreHeapTuple(tuple, slot, false);

if (IsYugaByteEnabled())
YBCExecuteInsert(boot_reldesc, tupDesc, tuple, ONCONFLICT_NONE);
YBCExecuteInsert(boot_reldesc, slot, tupDesc, tuple, ONCONFLICT_NONE);
else
simple_heap_insert(boot_reldesc, tuple);

pfree(tupDesc); /* just free's tupDesc, not the attrtypes */

heap_freetuple(tuple);
ExecDropSingleTupleTableSlot(slot);
elog(DEBUG4, "row inserted");

/*
Expand Down
16 changes: 10 additions & 6 deletions src/postgres/src/backend/catalog/indexing.c
Expand Up @@ -288,7 +288,8 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert)
{
/* Keep ybctid consistent across all databases. */
Datum ybctid = 0;

TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRel));
ExecStoreHeapTuple(tup, slot, false);
if (yb_shared_insert)
{
if (!IsYsqlUpgrade)
Expand All @@ -304,16 +305,17 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert)
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(
dboid, heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE, &ybctid,
dboid, heapRel, slot, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE, &ybctid,
YB_TRANSACTIONAL);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(
YBCGetDatabaseOid(heapRel), heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE,
YBCGetDatabaseOid(heapRel), heapRel, slot, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE,
&ybctid, YB_TRANSACTIONAL);
/* Update the local cache automatically */
YbSetSysCacheTuple(heapRel, tup);
ExecDropSingleTupleTableSlot(slot);
}
else
{
Expand Down Expand Up @@ -351,7 +353,8 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
{
/* Keep ybctid consistent across all databases. */
Datum ybctid = 0;

TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRel));
ExecStoreHeapTuple(tup, slot, false);
if (yb_shared_insert)
{
if (!IsYsqlUpgrade)
Expand All @@ -367,16 +370,17 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(
dboid, heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE, &ybctid,
dboid, heapRel, slot, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE, &ybctid,
YB_TRANSACTIONAL);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(
YBCGetDatabaseOid(heapRel), heapRel, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE,
YBCGetDatabaseOid(heapRel), heapRel, slot, RelationGetDescr(heapRel), tup, ONCONFLICT_NONE,
&ybctid, YB_TRANSACTIONAL);
/* Update the local cache automatically */
YbSetSysCacheTuple(heapRel, tup);
ExecDropSingleTupleTableSlot(slot);
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/backend/commands/copy.c
Expand Up @@ -3082,13 +3082,15 @@ CopyFrom(CopyState cstate)
if (useNonTxnInsert)
{
YBCExecuteNonTxnInsert(resultRelInfo->ri_RelationDesc,
slot,
tupDesc,
tuple,
cstate->on_conflict_action);
}
else
{
YBCExecuteInsert(resultRelInfo->ri_RelationDesc,
slot,
tupDesc,
tuple,
cstate->on_conflict_action);
Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/backend/commands/createas.c
Expand Up @@ -627,6 +627,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
if (IsYBRelation(myState->rel))
{
YBCExecuteInsert(myState->rel,
slot,
RelationGetDescr(myState->rel),
tuple,
ONCONFLICT_NONE);
Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/backend/commands/matview.c
Expand Up @@ -486,6 +486,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
if (IsYBRelation(myState->transientrel))
{
YBCExecuteInsert(myState->transientrel,
slot,
RelationGetDescr(myState->transientrel),
tuple,
ONCONFLICT_NONE);
Expand Down
3 changes: 2 additions & 1 deletion src/postgres/src/backend/commands/tablecmds.c
Expand Up @@ -5419,6 +5419,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
{
if (IsYBRelation(newrel))
YBCExecuteInsert(newrel,
newslot,
RelationGetDescr(newrel),
tuple,
ONCONFLICT_NONE);
Expand Down Expand Up @@ -18211,7 +18212,7 @@ YbATCopyTableRowsUnchecked(Relation old_rel, Relation new_rel,
ExecStoreHeapTuple(tuple, newslot, false);

/* Write the tuple out to the new relation */
YBCExecuteInsert(new_rel, newslot->tts_tupleDescriptor, tuple,
YBCExecuteInsert(new_rel, newslot, newslot->tts_tupleDescriptor, tuple,
ONCONFLICT_NONE);

MemoryContextReset(econtext->ecxt_per_tuple_memory);
Expand Down
21 changes: 15 additions & 6 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Expand Up @@ -265,7 +265,7 @@ static void YBCExecWriteStmt(YBCPgStatement ybc_stmt,
}

static Oid YBCApplyInsertRow(
YBCPgStatement insert_stmt, Relation rel, TupleDesc tupleDesc,
YBCPgStatement insert_stmt, Relation rel, TupleTableSlot *slot, TupleDesc tupleDesc,
HeapTuple tuple, OnConflictAction onConflictAction, Datum *ybctid,
YBCPgTransactionSetting transaction_setting) {
Oid relfileNodeId = YbGetRelfileNodeId(rel);
Expand Down Expand Up @@ -323,13 +323,12 @@ static Oid YBCApplyInsertRow(
*/
Oid collation_id = YBEncodingCollation(insert_stmt, attnum,
ybc_get_attcollation(RelationGetDescr(rel), attnum));
column->datum = heap_getattr(tuple, attnum, tupleDesc, &column->is_null);
column->datum = slot_getattr(slot, attnum, &column->is_null);
YBGetCollationInfo(collation_id, column->type_entity, column->datum, column->is_null, &column->collation_info);

/* Add the column value to the insert request */
++column;
}

HandleYBStatus(YBCPgDmlBindRow(insert_stmt, tuple->t_ybctid, columns, column - columns));

/*
Expand Down Expand Up @@ -362,6 +361,7 @@ static Oid YBCApplyInsertRow(
*/
static Oid YBCExecuteInsertInternal(Oid dboid,
Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction,
Expand All @@ -379,7 +379,7 @@ static Oid YBCExecuteInsertInternal(Oid dboid,
transaction_setting));

Oid result = YBCApplyInsertRow(
insert_stmt, rel, tupleDesc, tuple, onConflictAction, ybctid,
insert_stmt, rel, slot, tupleDesc, tuple, onConflictAction, ybctid,
transaction_setting);

/* Execute the insert */
Expand All @@ -389,12 +389,14 @@ static Oid YBCExecuteInsertInternal(Oid dboid,
}

Oid YBCExecuteInsert(Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction)
{
return YBCExecuteInsertForDb(YBCGetDatabaseOid(rel),
rel,
slot,
tupleDesc,
tuple,
onConflictAction,
Expand Down Expand Up @@ -422,6 +424,7 @@ static YBCPgTransactionSetting YBCFixTransactionSetting(

Oid YBCExecuteInsertForDb(Oid dboid,
Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction,
Expand All @@ -430,6 +433,7 @@ Oid YBCExecuteInsertForDb(Oid dboid,
{
return YBCExecuteInsertInternal(dboid,
rel,
slot,
tupleDesc,
tuple,
onConflictAction,
Expand All @@ -438,12 +442,14 @@ Oid YBCExecuteInsertForDb(Oid dboid,
}

Oid YBCExecuteNonTxnInsert(Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction)
{
return YBCExecuteNonTxnInsertForDb(YBCGetDatabaseOid(rel),
rel,
slot,
tupleDesc,
tuple,
onConflictAction,
Expand All @@ -452,13 +458,15 @@ Oid YBCExecuteNonTxnInsert(Relation rel,

Oid YBCExecuteNonTxnInsertForDb(Oid dboid,
Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction,
Datum *ybctid)
{
return YBCExecuteInsertInternal(dboid,
rel,
slot,
tupleDesc,
tuple,
onConflictAction,
Expand All @@ -481,7 +489,7 @@ Oid YBCHeapInsert(TupleTableSlot *slot,

if (blockInsertStmt) {
return YBCApplyInsertRow(
blockInsertStmt, resultRelationDesc, slot->tts_tupleDescriptor,
blockInsertStmt, resultRelationDesc, slot, slot->tts_tupleDescriptor,
tuple, ONCONFLICT_NONE, NULL /* ybctid */,
YBCFixTransactionSetting(resultRelationDesc, transaction_setting));
}
Expand All @@ -496,7 +504,7 @@ Oid YBCHeapInsert(TupleTableSlot *slot,
* there are no indices or triggers on the target table.
*/
return YBCExecuteInsertForDb(
dboid, resultRelationDesc, slot->tts_tupleDescriptor, tuple, ONCONFLICT_NONE, NULL /* ybctid */,
dboid, resultRelationDesc, slot, slot->tts_tupleDescriptor, tuple, ONCONFLICT_NONE, NULL /* ybctid */,
transaction_setting);
}

Expand Down Expand Up @@ -1174,6 +1182,7 @@ Oid YBCExecuteUpdateReplace(Relation rel,
estate);

return YBCExecuteInsert(rel,
slot,
RelationGetDescr(rel),
tuple,
ONCONFLICT_NONE);
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/include/access/htup_details.h
Expand Up @@ -838,4 +838,4 @@ extern size_t varsize_any(void *p);
extern HeapTuple heap_expand_tuple(HeapTuple sourceTuple, TupleDesc tupleDesc);
extern MinimalTuple minimal_expand_tuple(HeapTuple sourceTuple, TupleDesc tupleDesc);

#endif /* HTUP_DETAILS_H */
#endif /* HTUP_DETAILS_H */
4 changes: 4 additions & 0 deletions src/postgres/src/include/executor/ybcModifyTable.h
Expand Up @@ -73,11 +73,13 @@ extern Oid YBCHeapInsert(TupleTableSlot *slot,
* to the generated value.
*/
extern Oid YBCExecuteInsert(Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction);
extern Oid YBCExecuteInsertForDb(Oid dboid,
Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction,
Expand All @@ -95,11 +97,13 @@ extern void YBCApplyWriteStmt(YBCPgStatement handle, Relation relation);
* to the generated value.
*/
extern Oid YBCExecuteNonTxnInsert(Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction);
extern Oid YBCExecuteNonTxnInsertForDb(Oid dboid,
Relation rel,
TupleTableSlot *slot,
TupleDesc tupleDesc,
HeapTuple tuple,
OnConflictAction onConflictAction,
Expand Down

0 comments on commit e0f846e

Please sign in to comment.