Skip to content

Commit

Permalink
fix: prevent race condition when updating job results
Browse files Browse the repository at this point in the history
locking the job row to prevent concurrent call to UpdateSyncJobResult to
overwrite each other
  • Loading branch information
TBonnin committed Mar 18, 2024
1 parent 7797512 commit 5fac4c9
Showing 1 changed file with 50 additions and 37 deletions.
87 changes: 50 additions & 37 deletions packages/shared/lib/services/sync/job.service.ts
Expand Up @@ -89,47 +89,60 @@ export const updateLatestJobSyncStatus = async (sync_id: string, status: SyncSta
* @desc grab any existing results and add them to the current
*/
export const updateSyncJobResult = async (id: number, result: SyncResultByModel, model: string): Promise<SyncJob> => {
const { result: existingResult } = await schema().from<SyncJob>(SYNC_JOB_TABLE).select('result').where({ id }).first();

if (!existingResult || Object.keys(existingResult).length === 0) {
const [updatedRow] = await schema()
return db.knex.transaction(async (trx) => {
const { result: existingResult } = await schema()
.from<SyncJob>(SYNC_JOB_TABLE)
.where({ id, deleted: false })
.update({
result
})
.returning('*');

return updatedRow as SyncJob;
} else {
const { added, updated, deleted } = existingResult[model] || { added: 0, updated: 0, deleted: 0 };

const incomingResult = result[model];
const finalResult = {
...existingResult,
[model]: {
added: Number(added) + Number(incomingResult?.added),
updated: Number(updated) + Number(incomingResult?.updated)
}
};

const deletedValue = Number(deleted) || 0;
const incomingDeletedValue = Number(incomingResult?.deleted) || 0;
.transacting(trx)
.forUpdate()
.select('result')
.forUpdate()
.where({ id })
.first();

if (!existingResult || Object.keys(existingResult).length === 0) {
const [updatedRow] = await schema()
.from<SyncJob>(SYNC_JOB_TABLE)
.transacting(trx)
.forUpdate()
.where({ id, deleted: false })
.update({
result
})
.returning('*');

return updatedRow as SyncJob;
} else {
const { added, updated, deleted } = existingResult[model] || { added: 0, updated: 0, deleted: 0 };

const incomingResult = result[model];
const finalResult = {
...existingResult,
[model]: {
added: Number(added) + Number(incomingResult?.added),
updated: Number(updated) + Number(incomingResult?.updated)
}
};

if (deletedValue !== 0 || incomingDeletedValue !== 0) {
finalResult[model].deleted = deletedValue + incomingDeletedValue;
}
const deletedValue = Number(deleted) || 0;
const incomingDeletedValue = Number(incomingResult?.deleted) || 0;

const [updatedRow] = await schema()
.from<SyncJob>(SYNC_JOB_TABLE)
.where({ id, deleted: false })
.update({
result: finalResult
})
.returning('*');
if (deletedValue !== 0 || incomingDeletedValue !== 0) {
finalResult[model].deleted = deletedValue + incomingDeletedValue;

Check warning

Code scanning / CodeQL

Prototype-polluting assignment Medium

This assignment may alter Object.prototype if a malicious '__proto__' string is injected from
user controlled input
.
This assignment may alter Object.prototype if a malicious '__proto__' string is injected from
user controlled input
.
This assignment may alter Object.prototype if a malicious '__proto__' string is injected from
user controlled input
.
}

return updatedRow as SyncJob;
}
const [updatedRow] = await schema()
.from<SyncJob>(SYNC_JOB_TABLE)
.transacting(trx)
.forUpdate()
.where({ id, deleted: false })
.update({
result: finalResult
})
.returning('*');

return updatedRow as SyncJob;
}
});
};

export const addSyncConfigToJob = async (id: number, sync_config_id: number): Promise<void> => {
Expand Down

0 comments on commit 5fac4c9

Please sign in to comment.