diff --git a/packages/shared/lib/services/sync/job.service.ts b/packages/shared/lib/services/sync/job.service.ts index 78baff9692..e5721c3ff2 100644 --- a/packages/shared/lib/services/sync/job.service.ts +++ b/packages/shared/lib/services/sync/job.service.ts @@ -89,47 +89,60 @@ export const updateLatestJobSyncStatus = async (sync_id: string, status: SyncSta * @desc grab any existing results and add them to the current */ export const updateSyncJobResult = async (id: number, result: SyncResultByModel, model: string): Promise => { - const { result: existingResult } = await schema().from(SYNC_JOB_TABLE).select('result').where({ id }).first(); - - if (!existingResult || Object.keys(existingResult).length === 0) { - const [updatedRow] = await schema() + return db.knex.transaction(async (trx) => { + const { result: existingResult } = await schema() .from(SYNC_JOB_TABLE) - .where({ id, deleted: false }) - .update({ - result - }) - .returning('*'); - - return updatedRow as SyncJob; - } else { - const { added, updated, deleted } = existingResult[model] || { added: 0, updated: 0, deleted: 0 }; - - const incomingResult = result[model]; - const finalResult = { - ...existingResult, - [model]: { - added: Number(added) + Number(incomingResult?.added), - updated: Number(updated) + Number(incomingResult?.updated) - } - }; - - const deletedValue = Number(deleted) || 0; - const incomingDeletedValue = Number(incomingResult?.deleted) || 0; + .transacting(trx) + .forUpdate() + .select('result') + .forUpdate() + .where({ id }) + .first(); + + if (!existingResult || Object.keys(existingResult).length === 0) { + const [updatedRow] = await schema() + .from(SYNC_JOB_TABLE) + .transacting(trx) + .forUpdate() + .where({ id, deleted: false }) + .update({ + result + }) + .returning('*'); + + return updatedRow as SyncJob; + } else { + const { added, updated, deleted } = existingResult[model] || { added: 0, updated: 0, deleted: 0 }; + + const incomingResult = result[model]; + const finalResult = { + ...existingResult, + [model]: { + added: Number(added) + Number(incomingResult?.added), + updated: Number(updated) + Number(incomingResult?.updated) + } + }; - if (deletedValue !== 0 || incomingDeletedValue !== 0) { - finalResult[model].deleted = deletedValue + incomingDeletedValue; - } + const deletedValue = Number(deleted) || 0; + const incomingDeletedValue = Number(incomingResult?.deleted) || 0; - const [updatedRow] = await schema() - .from(SYNC_JOB_TABLE) - .where({ id, deleted: false }) - .update({ - result: finalResult - }) - .returning('*'); + if (deletedValue !== 0 || incomingDeletedValue !== 0) { + finalResult[model].deleted = deletedValue + incomingDeletedValue; + } - return updatedRow as SyncJob; - } + const [updatedRow] = await schema() + .from(SYNC_JOB_TABLE) + .transacting(trx) + .forUpdate() + .where({ id, deleted: false }) + .update({ + result: finalResult + }) + .returning('*'); + + return updatedRow as SyncJob; + } + }); }; export const addSyncConfigToJob = async (id: number, sync_config_id: number): Promise => {