Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: TJDB bulk upload support for primary key #9503

Merged
7 changes: 3 additions & 4 deletions frontend/src/TooljetDatabase/Table/Header.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,14 @@ const Header = ({
return;
}

const { processed_rows: processedRows, rows_inserted: rowsInserted, rows_updated: rowsUpdated } = data.result;
const toastMessage =
`${pluralize(rowsInserted, 'new row')} added, ` + `${pluralize(rowsUpdated, 'row')} updated.`;
const { processed_rows: processedRows } = data.result;
const toastMessage = `${pluralize(processedRows, 'row')} processed`;

toast.success(toastMessage, {
position: 'top-center',
});

setUploadResult({ processedRows, rowsInserted, rowsUpdated });
setUploadResult({ processedRows });
} catch (error) {
toast.error(error.errors, { position: 'top-center' });
setIsBulkUploading(false);
Expand Down
148 changes: 94 additions & 54 deletions server/src/services/tooljet_db_bulk_upload.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { InjectEntityManager } from '@nestjs/typeorm';
import { isEmpty } from 'lodash';
import { pipeline } from 'stream/promises';
import { PassThrough } from 'stream';
import { v4 as uuid } from 'uuid';

const MAX_ROW_COUNT = 1000;

Expand Down Expand Up @@ -40,37 +41,51 @@ export class TooljetDbBulkUploadService {
internalTableId: string,
internalTableColumnSchema: TableColumnSchema[],
fileBuffer: Buffer
): Promise<{ processedRows: number; rowsInserted: number; rowsUpdated: number }> {
): Promise<{ processedRows: number }> {
const rowsToUpsert = [];
const passThrough = new PassThrough();
const csvStream = csv.parseString(fileBuffer.toString(), {
headers: true,
strictColumnHandling: true,
discardUnmappedColumns: true,
});
const rowsToInsert = [];
const rowsToUpdate = [];
const idstoUpdate = new Set();
const primaryKeyColumnSchema = internalTableColumnSchema.filter(
(colDetails) => colDetails.keytype === 'PRIMARY KEY'
);
const primaryKeyValuesToUpsert = new Set();
let rowsProcessed = 0;

const passThrough = new PassThrough();

csvStream
.on('headers', (headers) => this.validateHeadersAsColumnSubset(internalTableColumnSchema, headers, csvStream))
.transform((row) => this.validateAndParseColumnDataType(internalTableColumnSchema, row, rowsProcessed, csvStream))
.transform((row) =>
this.validateAndParseColumnDataType(
internalTableColumnSchema,
primaryKeyColumnSchema,
row,
rowsProcessed,
csvStream
)
)
.on('data', (row) => {
rowsProcessed++;
if (row.id) {
if (idstoUpdate.has(row.id)) {
throw new BadRequestException(`Duplicate 'id' value found on row[${rowsProcessed + 1}]: ${row.id}`);
}

idstoUpdate.add(row.id);
rowsToUpdate.push(row);
} else {
// TODO: Revise logic for primary key instead of hardcoded id column
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { id, ...rowWithoutId } = row;
rowsToInsert.push(rowWithoutId);
const primaryKeyValuesIdentifier = Object.entries(row)
.map(([columnName, value]) => {
const primaryKey = this.findPrimaryKey(columnName, primaryKeyColumnSchema);

if (isEmpty(primaryKey)) return null;
if (isEmpty(value) && !isEmpty(primaryKey.column_default)) return uuid();
return value;
})
.filter((value) => value !== null)
.join('-');

if (primaryKeyValuesToUpsert.has(primaryKeyValuesIdentifier)) {
throw new BadRequestException(`Duplicate primary key found on row[${rowsProcessed + 1}]`);
}

primaryKeyValuesToUpsert.add(primaryKeyValuesIdentifier);
rowsToUpsert.push(row);
})
.on('error', (error) => {
csvStream.destroy();
Expand All @@ -83,50 +98,63 @@ export class TooljetDbBulkUploadService {
await pipeline(passThrough, csvStream);

await this.tooljetDbManager.transaction(async (tooljetDbManager) => {
await this.bulkInsertRows(tooljetDbManager, rowsToInsert, internalTableId);
await this.bulkUpdateRows(tooljetDbManager, rowsToUpdate, internalTableId);
await this.bulkUpsertRows(tooljetDbManager, rowsToUpsert, internalTableId, internalTableColumnSchema);
});

return { processedRows: rowsProcessed, rowsInserted: rowsToInsert.length, rowsUpdated: rowsToUpdate.length };
return { processedRows: rowsProcessed };
}

async bulkUpdateRows(tooljetDbManager: EntityManager, rowsToUpdate: unknown[], internalTableId: string) {
if (isEmpty(rowsToUpdate)) return;

const updateQueries = rowsToUpdate.map((row) => {
const columnNames = Object.keys(rowsToUpdate[0]);
const setClauses = columnNames
.map((column) => {
return `${column} = $${columnNames.indexOf(column) + 1}`;
})
.join(', ');

return {
text: `UPDATE "${internalTableId}" SET ${setClauses} WHERE id = $${columnNames.indexOf('id') + 1}`,
values: columnNames.map((column) => row[column]),
};
});
async bulkUpsertRows(
  tooljetDbManager: EntityManager,
  rowsToUpsert: unknown[],
  internalTableId: string,
  internalTableColumnSchema: TableColumnSchema[]
) {
  // Upserts the whole CSV batch with a single multi-row
  // INSERT ... ON CONFLICT statement, executed inside the caller's transaction.
  // No-op on an empty batch.
  if (isEmpty(rowsToUpsert)) return;

  // Columns forming the table's primary key: they are the conflict target and
  // are never overwritten by the ON CONFLICT update clause.
  const primaryKeyColumns = internalTableColumnSchema
    .filter((colDetails) => colDetails.keytype === 'PRIMARY KEY')
    .map((colDetails) => colDetails.column_name);

  // Serial (auto-increment) columns: integer columns whose default is a
  // nextval(...) sequence call. Their CSV values are ignored in favour of
  // DEFAULT so the sequence stays authoritative.
  const serialTypeColumns = internalTableColumnSchema
    .filter((colDetails) => colDetails.data_type === 'integer' && /^nextval\(/.test(colDetails.column_default))
    .map((colDetails) => colDetails.column_name);

  const allValueSets: string[] = [];
  const allPlaceholders: unknown[] = [];
  let parameterIndex = 1;

  for (const row of rowsToUpsert) {
    const valueSet: string[] = [];

    for (const col of Object.keys(row)) {
      // Serial columns and null primary-key values fall back to the column
      // DEFAULT (sequence / column_default) instead of binding a value.
      if (serialTypeColumns.includes(col) || (row[col] === null && primaryKeyColumns.includes(col))) {
        valueSet.push('DEFAULT');
      } else {
        valueSet.push(`$${parameterIndex++}`);
        allPlaceholders.push(row[col]);
      }
    }

    allValueSets.push(`(${valueSet.join(', ')})`);
  }

  // NOTE(review): assumes every row has the same keys (and key order) as the
  // first row — confirm the upstream CSV parsing guarantees this.
  const allColumns = Object.keys(rowsToUpsert[0]);

  const onConflictUpdate = allColumns
    .filter((col) => !primaryKeyColumns.includes(col))
    .map((col) => `"${col}" = EXCLUDED."${col}"`)
    .join(', ');

  // Quote the conflict-target columns (the INSERT column list is already
  // quoted) so mixed-case or reserved-word PK columns don't break the query.
  const conflictTarget = primaryKeyColumns.map((col) => `"${col}"`).join(', ');

  // When every CSV column is part of the primary key there is nothing to
  // update; `DO UPDATE SET` with an empty assignment list is invalid SQL,
  // so degrade to DO NOTHING.
  const conflictAction = isEmpty(onConflictUpdate) ? 'DO NOTHING' : `DO UPDATE SET ${onConflictUpdate}`;

  const queryText =
    `INSERT INTO "${internalTableId}" ("${allColumns.join('", "')}") ` +
    `VALUES ${allValueSets.join(', ')} ` +
    `ON CONFLICT (${conflictTarget}) ` +
    `${conflictAction};`;

  await tooljetDbManager.query(queryText, allPlaceholders);
}

async validateHeadersAsColumnSubset(
Expand All @@ -145,8 +173,15 @@ export class TooljetDbBulkUploadService {
}
}

findPrimaryKey(columnName: string, primaryKeyColumns: TableColumnSchema[]) {
  // Returns the schema entry for `columnName` when that column is part of the
  // table's primary key; returns undefined otherwise.
  for (const candidate of primaryKeyColumns) {
    if (candidate.keytype === 'PRIMARY KEY' && candidate.column_name === columnName) {
      return candidate;
    }
  }
  return undefined;
}

validateAndParseColumnDataType(
internalTableColumnSchema: TableColumnSchema[],
primaryKeyColumnSchema: TableColumnSchema[],
row: unknown,
rowsProcessed: number,
csvStream: csv.CsvParserStream<csv.ParserRow<any>, csv.ParserRow<any>>
Expand All @@ -157,13 +192,18 @@ export class TooljetDbBulkUploadService {
const columnsInCsv = Object.keys(row);
const transformedRow = columnsInCsv.reduce((result, columnInCsv) => {
const columnDetails = internalTableColumnSchema.find((colDetails) => colDetails.column_name === columnInCsv);
const primaryKey = this.findPrimaryKey(columnInCsv, primaryKeyColumnSchema);

if (!isEmpty(primaryKey) && isEmpty(primaryKey.column_default) && isEmpty(row[columnInCsv]))
throw `Primary key required for column ${columnDetails.column_name}`;

result[columnInCsv] = this.convertToDataType(row[columnInCsv], columnDetails.data_type);
return result;
}, {});

return transformedRow;
} catch (error) {
csvStream.emit('error', `Data type error at row[${rowsProcessed + 1}]: ${error}`);
csvStream.emit('error', `Error at row[${rowsProcessed + 1}]: ${error}`);
}
}

Expand Down