Skip to content

Commit

Permalink
Feature: TJDB bulk upload support for primary key (#9503)
Browse files Browse the repository at this point in the history
* Make primary key data mandatory except for serial data type

* Remove unnecessary console.log and comments

* Detect serial data type against int data type

* add ability to upsert rows on conflicting pk

* fix generated SQL for placeholder mismatch

* add type info

* discard serial values in csv

* add pk explicit check

---------

Co-authored-by: parthy007 <parthadhikari1812@gmail.com>
  • Loading branch information
akshaysasidrn and parthy007 committed May 6, 2024
1 parent 4e0facb commit 897e410
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 58 deletions.
7 changes: 3 additions & 4 deletions frontend/src/TooljetDatabase/Table/Header.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,14 @@ const Header = ({
return;
}

const { processed_rows: processedRows, rows_inserted: rowsInserted, rows_updated: rowsUpdated } = data.result;
const toastMessage =
`${pluralize(rowsInserted, 'new row')} added, ` + `${pluralize(rowsUpdated, 'row')} updated.`;
const { processed_rows: processedRows } = data.result;
const toastMessage = `${pluralize(processedRows, 'row')} processed`;

toast.success(toastMessage, {
position: 'top-center',
});

setUploadResult({ processedRows, rowsInserted, rowsUpdated });
setUploadResult({ processedRows });
} catch (error) {
toast.error(error.errors, { position: 'top-center' });
setIsBulkUploading(false);
Expand Down
148 changes: 94 additions & 54 deletions server/src/services/tooljet_db_bulk_upload.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { InjectEntityManager } from '@nestjs/typeorm';
import { isEmpty } from 'lodash';
import { pipeline } from 'stream/promises';
import { PassThrough } from 'stream';
import { v4 as uuid } from 'uuid';

const MAX_ROW_COUNT = 1000;

Expand Down Expand Up @@ -40,37 +41,51 @@ export class TooljetDbBulkUploadService {
internalTableId: string,
internalTableColumnSchema: TableColumnSchema[],
fileBuffer: Buffer
): Promise<{ processedRows: number; rowsInserted: number; rowsUpdated: number }> {
): Promise<{ processedRows: number }> {
const rowsToUpsert = [];
const passThrough = new PassThrough();
const csvStream = csv.parseString(fileBuffer.toString(), {
headers: true,
strictColumnHandling: true,
discardUnmappedColumns: true,
});
const rowsToInsert = [];
const rowsToUpdate = [];
const idstoUpdate = new Set();
const primaryKeyColumnSchema = internalTableColumnSchema.filter(
(colDetails) => colDetails.keytype === 'PRIMARY KEY'
);
const primaryKeyValuesToUpsert = new Set();
let rowsProcessed = 0;

const passThrough = new PassThrough();

csvStream
.on('headers', (headers) => this.validateHeadersAsColumnSubset(internalTableColumnSchema, headers, csvStream))
.transform((row) => this.validateAndParseColumnDataType(internalTableColumnSchema, row, rowsProcessed, csvStream))
.transform((row) =>
this.validateAndParseColumnDataType(
internalTableColumnSchema,
primaryKeyColumnSchema,
row,
rowsProcessed,
csvStream
)
)
.on('data', (row) => {
rowsProcessed++;
if (row.id) {
if (idstoUpdate.has(row.id)) {
throw new BadRequestException(`Duplicate 'id' value found on row[${rowsProcessed + 1}]: ${row.id}`);
}

idstoUpdate.add(row.id);
rowsToUpdate.push(row);
} else {
// TODO: Revise logic for primary key instead of hardcoded id column
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { id, ...rowWithoutId } = row;
rowsToInsert.push(rowWithoutId);
const primaryKeyValuesIdentifier = Object.entries(row)
.map(([columnName, value]) => {
const primaryKey = this.findPrimaryKey(columnName, primaryKeyColumnSchema);

if (isEmpty(primaryKey)) return null;
if (isEmpty(value) && !isEmpty(primaryKey.column_default)) return uuid();
return value;
})
.filter((value) => value !== null)
.join('-');

if (primaryKeyValuesToUpsert.has(primaryKeyValuesIdentifier)) {
throw new BadRequestException(`Duplicate primary key found on row[${rowsProcessed + 1}]`);
}

primaryKeyValuesToUpsert.add(primaryKeyValuesIdentifier);
rowsToUpsert.push(row);
})
.on('error', (error) => {
csvStream.destroy();
Expand All @@ -83,50 +98,63 @@ export class TooljetDbBulkUploadService {
await pipeline(passThrough, csvStream);

await this.tooljetDbManager.transaction(async (tooljetDbManager) => {
await this.bulkInsertRows(tooljetDbManager, rowsToInsert, internalTableId);
await this.bulkUpdateRows(tooljetDbManager, rowsToUpdate, internalTableId);
await this.bulkUpsertRows(tooljetDbManager, rowsToUpsert, internalTableId, internalTableColumnSchema);
});

return { processedRows: rowsProcessed, rowsInserted: rowsToInsert.length, rowsUpdated: rowsToUpdate.length };
return { processedRows: rowsProcessed };
}

async bulkUpdateRows(tooljetDbManager: EntityManager, rowsToUpdate: unknown[], internalTableId: string) {
if (isEmpty(rowsToUpdate)) return;

const updateQueries = rowsToUpdate.map((row) => {
const columnNames = Object.keys(rowsToUpdate[0]);
const setClauses = columnNames
.map((column) => {
return `${column} = $${columnNames.indexOf(column) + 1}`;
})
.join(', ');

return {
text: `UPDATE "${internalTableId}" SET ${setClauses} WHERE id = $${columnNames.indexOf('id') + 1}`,
values: columnNames.map((column) => row[column]),
};
});
async bulkUpsertRows(
tooljetDbManager: EntityManager,
rowsToUpsert: unknown[],
internalTableId: string,
internalTableColumnSchema: TableColumnSchema[]
) {
if (isEmpty(rowsToUpsert)) return;

for (const updateQuery of updateQueries) {
await tooljetDbManager.query(updateQuery.text, updateQuery.values);
}
}
const primaryKeyColumns = internalTableColumnSchema
.filter((colDetails) => colDetails.keytype === 'PRIMARY KEY')
.map((colDetails) => colDetails.column_name);

async bulkInsertRows(tooljetDbManager: EntityManager, rowsToInsert: unknown[], internalTableId: string) {
if (isEmpty(rowsToInsert)) return;
const serialTypeColumns = internalTableColumnSchema
.filter((colDetails) => colDetails.data_type === 'integer' && /^nextval\(/.test(colDetails.column_default))
.map((colDetails) => colDetails.column_name);

const insertQueries = rowsToInsert.map((row, index) => {
return {
text: `INSERT INTO "${internalTableId}" (${Object.keys(row).join(', ')}) VALUES (${Object.values(row).map(
(_, index) => `$${index + 1}`
)})`,
values: Object.values(row),
};
});
const allValueSets = [];
let allPlaceholders = [];
let parameterIndex = 1;

for (const insertQuery of insertQueries) {
await tooljetDbManager.query(insertQuery.text, insertQuery.values);
for (const row of rowsToUpsert) {
const valueSet = [];
const currentPlaceholders = [];

for (const col of Object.keys(row)) {
if (serialTypeColumns.includes(col) || (row[col] === null && primaryKeyColumns.includes(col))) {
valueSet.push('DEFAULT');
} else {
valueSet.push(`$${parameterIndex++}`);
currentPlaceholders.push(row[col]);
}
}

allValueSets.push(`(${valueSet.join(', ')})`);
allPlaceholders = allPlaceholders.concat(currentPlaceholders);
}

const allColumns = Object.keys(rowsToUpsert[0]);

const onConflictUpdate = allColumns
.filter((col) => !primaryKeyColumns.includes(col))
.map((col) => `"${col}" = EXCLUDED."${col}"`)
.join(', ');

const queryText =
`INSERT INTO "${internalTableId}" ("${allColumns.join('", "')}") ` +
`VALUES ${allValueSets.join(', ')} ` +
`ON CONFLICT (${primaryKeyColumns.join(', ')}) ` +
`DO UPDATE SET ${onConflictUpdate};`;

await tooljetDbManager.query(queryText, allPlaceholders);
}

async validateHeadersAsColumnSubset(
Expand All @@ -145,8 +173,15 @@ export class TooljetDbBulkUploadService {
}
}

findPrimaryKey(columnName: string, primaryKeyColumns: TableColumnSchema[]) {
  // Locate the schema entry for `columnName` when it is a PRIMARY KEY column;
  // returns undefined when the column is absent or not part of the primary key.
  const matchesPrimaryKey = (colDetails: TableColumnSchema) =>
    colDetails.keytype === 'PRIMARY KEY' && colDetails.column_name === columnName;

  return primaryKeyColumns.find(matchesPrimaryKey);
}

validateAndParseColumnDataType(
internalTableColumnSchema: TableColumnSchema[],
primaryKeyColumnSchema: TableColumnSchema[],
row: unknown,
rowsProcessed: number,
csvStream: csv.CsvParserStream<csv.ParserRow<any>, csv.ParserRow<any>>
Expand All @@ -157,13 +192,18 @@ export class TooljetDbBulkUploadService {
const columnsInCsv = Object.keys(row);
const transformedRow = columnsInCsv.reduce((result, columnInCsv) => {
const columnDetails = internalTableColumnSchema.find((colDetails) => colDetails.column_name === columnInCsv);
const primaryKey = this.findPrimaryKey(columnInCsv, primaryKeyColumnSchema);

if (!isEmpty(primaryKey) && isEmpty(primaryKey.column_default) && isEmpty(row[columnInCsv]))
throw `Primary key required for column ${columnDetails.column_name}`;

result[columnInCsv] = this.convertToDataType(row[columnInCsv], columnDetails.data_type);
return result;
}, {});

return transformedRow;
} catch (error) {
csvStream.emit('error', `Data type error at row[${rowsProcessed + 1}]: ${error}`);
csvStream.emit('error', `Error at row[${rowsProcessed + 1}]: ${error}`);
}
}

Expand Down

0 comments on commit 897e410

Please sign in to comment.