Skip to content

Commit

Permalink
fix: discard records with deleted_at in getAddedKeys/getUpdatedKeys
Browse files Browse the repository at this point in the history
  • Loading branch information
TBonnin committed Mar 22, 2024
1 parent 46b0cfe commit ac3033f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
14 changes: 10 additions & 4 deletions packages/shared/lib/services/sync/data/data.service.ts
Expand Up @@ -188,8 +188,11 @@ export async function getAddedKeys(response: DataRecord[], nangoConnectionId: nu

const knownKeys: string[] = (await schema()
.from(RECORDS_TABLE)
.where('nango_connection_id', nangoConnectionId)
.where('model', model)
.where({
nango_connection_id: nangoConnectionId,
model,
external_deleted_at: null
})
.whereIn('external_id', keys)
.pluck('external_id')) as unknown as string[];

Expand All @@ -212,8 +215,11 @@ export async function getUpdatedKeys(response: DataRecord[], nangoConnectionId:
const rowsToUpdate = await schema()
.from(RECORDS_TABLE)
.pluck('external_id')
.where('nango_connection_id', nangoConnectionId)
.where('model', model)
.where({
nango_connection_id: nangoConnectionId,
model,
external_deleted_at: null
})
.whereIn('external_id', keys)
.whereNotIn(['external_id', 'data_hash'], keysWithHash);

Expand Down
20 changes: 11 additions & 9 deletions packages/shared/lib/services/sync/run.service.integration.test.ts
Expand Up @@ -146,7 +146,8 @@ describe('Running sync', () => {
expect(record._nango_metadata.last_action).toEqual('DELETED');

// records '2' should be back
await runJob(initialRecords, activityLogId, model, connection, sync, trackDeletes, false);
const result = await runJob(initialRecords, activityLogId, model, connection, sync, trackDeletes, false);
expect(result).toEqual({ added: 1, updated: 0, deleted: 0 });

const recordsAfter = await getRecords(connection, model);
const recordAfter = recordsAfter.find((record) => record.id == 2);
Expand Down Expand Up @@ -308,7 +309,7 @@ const runJob = async (
sync: Sync,
trackDeletes: boolean,
softDelete: boolean
) => {
): Promise<SyncResult> => {
// create new sync job
const syncJob = (await jobService.createSyncJob(sync.id, SyncType.INCREMENTAL, SyncStatus.RUNNING, 'test-job-id', connection)) as SyncJob;
if (!syncJob) {
Expand Down Expand Up @@ -345,6 +346,13 @@ const runJob = async (
await jobService.updateSyncJobResult(syncJob.id, updatedResults, model);
// finish the sync
await syncRun.finishSync([model], new Date(), `v1`, 10, trackDeletes);

const syncJobResult = await jobService.getLatestSyncJob(sync.id);
return {
added: syncJobResult?.result?.[model]?.added || 0,
updated: syncJobResult?.result?.[model]?.updated || 0,
deleted: syncJobResult?.result?.[model]?.deleted || 0
};
};

const verifySyncRun = async (
Expand All @@ -358,14 +366,8 @@ const verifySyncRun = async (
const { connection, model, sync, activityLogId } = await dataMocks.upsertRecords(initialRecords);

// Run job to save new records
await runJob(newRecords, activityLogId, model, connection, sync, trackDeletes, softDelete);
const result = await runJob(newRecords, activityLogId, model, connection, sync, trackDeletes, softDelete);

const syncJobResult = await jobService.getLatestSyncJob(sync.id);
const result = {
added: syncJobResult?.result?.[model]?.added || 0,
updated: syncJobResult?.result?.[model]?.updated || 0,
deleted: syncJobResult?.result?.[model]?.deleted || 0
};
expect(result).toEqual(expectedResult);

const records = await getRecords(connection, model);
Expand Down

0 comments on commit ac3033f

Please sign in to comment.