From 4a3b100813803e731091bb15bef8b959a8400aee Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Thu, 7 Mar 2024 07:04:00 +0100 Subject: [PATCH 1/3] do not use jsonb_set in postgreSQL Appending _nango_metadata to the record json in postgres tends to be slow when json is big This commit remove the call to jsonb_set in the query and manipulate the json in js land to add _nango_metadata child attributes --- .../lib/db/seeders/connection.seeder.ts | 4 +- packages/shared/lib/models/Sync.ts | 26 ++--- .../shared/lib/services/sync/data/mocks.ts | 51 ++++++++- .../data/records.service.integration.test.ts | 104 +++++++----------- .../lib/services/sync/data/records.service.ts | 99 ++++++++--------- .../shared/lib/utils/encryption.manager.ts | 21 +--- 6 files changed, 156 insertions(+), 149 deletions(-) diff --git a/packages/shared/lib/db/seeders/connection.seeder.ts b/packages/shared/lib/db/seeders/connection.seeder.ts index ef1fd984f3..0e2d4916f3 100644 --- a/packages/shared/lib/db/seeders/connection.seeder.ts +++ b/packages/shared/lib/db/seeders/connection.seeder.ts @@ -25,8 +25,8 @@ export const createConnectionSeeds = async (environmentName = ''): Promise & { id: string | number }; -export interface EncryptedRecord { +export type EncryptedRawRecord = { iv: string; authTag: string; encryptedValue: string; -} - -export type EncryptedInternalDataRecord = { - _nango_metadata: RecordMetadata; -} & EncryptedRecord; +}; -export type GetRecordsResponse = { records: CustomerFacingDataRecord[] | DataRecordWithMetadata[]; next_cursor?: string | null } | null; +export type UnencryptedRawRecord = Record & { id: string | number }; -export interface RawDataRecordResult { +export type RawDataRecordResult = { id: string | number; - record: CustomerFacingDataRecord | EncryptedInternalDataRecord; -} + record: UnencryptedRawRecord | EncryptedRawRecord; +} & RecordMetadata; + +export type GetRecordsResponse = { records: CustomerFacingDataRecord[]; next_cursor?: string | null } | null; +// TO DEPRECATE export type RecordWrapCustomerFacingDataRecord = { record: CustomerFacingDataRecord }[]; export interface DataRecord extends Timestamps { @@ -256,13 +255,14 @@ export interface DataRecord extends Timestamps { export type LastAction = 'ADDED' | 'UPDATED' | 'DELETED' | 'added' | 'updated' | 'deleted'; interface RecordMetadata { - first_seen_at: Date; - last_modified_at: Date; + first_seen_at: string; + last_modified_at: string; last_action: LastAction; - deleted_at: Date | null; + deleted_at: string | null; cursor: string; } +// DEPRECATED export interface DataRecordWithMetadata extends RecordMetadata { record: object; } diff --git a/packages/shared/lib/services/sync/data/mocks.ts b/packages/shared/lib/services/sync/data/mocks.ts index 0a7cbfdbc8..e3eefdd360 100644 --- a/packages/shared/lib/services/sync/data/mocks.ts +++ b/packages/shared/lib/services/sync/data/mocks.ts @@ -3,15 +3,64 @@ import { createSyncSeeds } from '../../../db/seeders/sync.seeder.js'; import { createSyncJobSeeds } from '../../../db/seeders/sync-job.seeder.js'; import { formatDataRecords } from './records.service.js'; import type { DataResponse } from '../../../models/Data.js'; +import connectionService from '../../connection.service.js'; +import * as DataService from './data.service.js'; +import type { Connection } from '../../../models/Connection.js'; + +export async function upsertRecords(n: number): Promise<{ connection: Connection; model: string }> { + const activityLogId = 1; + const environmentId = 1; + const environmentName = 'mock-records'; + const toInsert = generateInsertableJson(n); + const { + response: { response: records }, + meta: { modelName, nangoConnectionId } + } = await createRecords(toInsert, environmentName); + if (!records) { + throw new Error('Failed to format records'); + } + const connection = await connectionService.getConnectionById(nangoConnectionId); + if (!connection) { + throw new Error(`Connection '${nangoConnectionId}' not found`); + } + const chunkSize = 1000; + for (let i = 0; i < records.length; i += chunkSize) { + const { error, success } = await DataService.upsert( + records.slice(i, i + chunkSize), + '_nango_sync_data_records', + 'external_id', + nangoConnectionId, + modelName, + activityLogId, + environmentId + ); + if (!success) { + throw new Error(`Failed to upsert records: ${error}`); + } + } + return { + connection: connection as Connection, + model: modelName + }; +} export async function createRecords(records: DataResponse[], environmentName = '') { const connections = await createConnectionSeeds(environmentName); const [nangoConnectionId]: number[] = connections; + if (!nangoConnectionId) { + throw new Error('Failed to create connection'); + } const sync = await createSyncSeeds(nangoConnectionId); + if (!sync.id) { + throw new Error('Failed to create sync'); + } const job = await createSyncJobSeeds(nangoConnectionId); + if (!job.id) { + throw new Error('Failed to create job'); + } const modelName = Math.random().toString(36).substring(7); - const response = formatDataRecords(records, nangoConnectionId as number, modelName, sync.id as string, job.id as number); + const response = formatDataRecords(records, nangoConnectionId, modelName, sync.id, job.id); return { meta: { diff --git a/packages/shared/lib/services/sync/data/records.service.integration.test.ts b/packages/shared/lib/services/sync/data/records.service.integration.test.ts index 5a888cf5f2..809c4addfa 100644 --- a/packages/shared/lib/services/sync/data/records.service.integration.test.ts +++ b/packages/shared/lib/services/sync/data/records.service.integration.test.ts @@ -1,56 +1,59 @@ import { expect, describe, it, beforeAll } from 'vitest'; import dayjs from 'dayjs'; import { multipleMigrations } from '../../../db/database.js'; -import * as DataService from './data.service.js'; import * as RecordsService from './records.service.js'; import { createConfigSeeds } from '../../../db/seeders/config.seeder.js'; -import type { GetRecordsResponse, DataRecord } from '../../../models/Sync.js'; -import type { ServiceResponse } from '../../../models/Generic.js'; -import connectionService from '../../connection.service.js'; -import { generateInsertableJson, createRecords } from './mocks.js'; +import { upsertRecords } from './mocks.js'; const environmentName = 'records-service'; -describe('Records service integration test', () => { +describe('Records service', () => { beforeAll(async () => { await multipleMigrations(); await createConfigSeeds(environmentName); }); + it('Should retrieve records', async () => { + const n = 10; + const { connection, model } = await upsertRecords(n); + const { success, response, error } = await RecordsService.getAllDataRecords( + connection.connection_id, + connection.provider_config_key, + connection.environment_id, + model + ); + expect(success).toBe(true); + expect(error).toBe(null); + expect(response?.records.length).toBe(n); + const timestampRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6}\+\d{2}:\d{2}$/; + expect(response?.records[0]?.['_nango_metadata']).toMatchObject({ + first_seen_at: expect.stringMatching(timestampRegex), + last_modified_at: expect.stringMatching(timestampRegex), + last_action: 'ADDED', + deleted_at: null, + cursor: expect.stringMatching(/^[A-Za-z0-9+/]+={0,2}$/) // base64 encoded string + }); + expect(response?.next_cursor).toBe(null); // no next page + }); + it('Should paginate the records to retrieve all records', async () => { const numOfRecords = 3000; const limit = 100; - const records = generateInsertableJson(numOfRecords); - const { response, meta } = await createRecords(records, environmentName); - const { response: formattedResults } = response; - const { modelName, nangoConnectionId } = meta; - const connection = await connectionService.getConnectionById(nangoConnectionId as number); - const { error, success } = await DataService.upsert( - formattedResults as unknown as DataRecord[], - '_nango_sync_data_records', - 'external_id', - nangoConnectionId as number, - modelName, - 1, - 1 - ); - expect(success).toBe(true); - expect(error).toBe(undefined); + const { connection, model } = await upsertRecords(numOfRecords); let cursor = null; - const allFetchedRecords = []; do { - const result = (await RecordsService.getAllDataRecords( - connection?.connection_id as string, // connectionId - connection?.provider_config_key as string, // providerConfigKey - connection?.environment_id as number, // environmentId - modelName, // model + const result = await RecordsService.getAllDataRecords( + connection.connection_id, + connection.provider_config_key, + connection.environment_id, + model, undefined, // delta limit, // limit undefined, // filter cursor // cursor - )) as unknown as ServiceResponse; + ); if (!result.response) { throw new Error('Response is undefined'); @@ -59,7 +62,7 @@ describe('Records service integration test', () => { const { response: recordsResponse, error } = result; expect(error).toBe(null); - expect(response).not.toBe(undefined); + expect(recordsResponse).not.toBe(undefined); const { records, next_cursor } = recordsResponse; @@ -72,10 +75,8 @@ describe('Records service integration test', () => { } while (cursor); for (let i = 1; i < allFetchedRecords.length; i++) { - // @ts-expect-error - const currentRecordDate = dayjs(allFetchedRecords[i]._nango_metadata.first_seen_at); - // @ts-expect-error - const previousRecordDate = dayjs(allFetchedRecords[i - 1]._nango_metadata.first_seen_at); + const currentRecordDate = dayjs(allFetchedRecords[i]?._nango_metadata.first_seen_at); + const previousRecordDate = dayjs(allFetchedRecords[i - 1]?._nango_metadata.first_seen_at); expect(currentRecordDate.isAfter(previousRecordDate) || currentRecordDate.isSame(previousRecordDate)).toBe(true); } @@ -85,44 +86,23 @@ describe('Records service integration test', () => { it('Should be able to retrieve 20K records in under 5s with a cursor', async () => { const numOfRecords = 20000; const limit = 1000; - const records = generateInsertableJson(numOfRecords); - const { response, meta } = await createRecords(records, environmentName); - const { response: formattedResults } = response; - const { modelName, nangoConnectionId } = meta; - - // insert in chunks of 1000 - // @ts-expect-error - for (let i = 0; i < formattedResults?.length; i += 1000) { - const { error, success } = await DataService.upsert( - formattedResults?.slice(i, i + 1000) as unknown as DataRecord[], - '_nango_sync_data_records', - 'external_id', - nangoConnectionId as number, - modelName, - 1, - 1 - ); - expect(success).toBe(true); - expect(error).toBe(undefined); - } - - const connection = await connectionService.getConnectionById(nangoConnectionId as number); + const { connection, model } = await upsertRecords(numOfRecords); let cursor: string | undefined | null = null; let allRecordsLength = 0; const startTime = Date.now(); do { - const { response, error } = (await RecordsService.getAllDataRecords( - connection?.connection_id as string, // connectionId - connection?.provider_config_key as string, // providerConfigKey - connection?.environment_id as number, // environmentId - modelName, // model + const { response, error } = await RecordsService.getAllDataRecords( + connection.connection_id, + connection.provider_config_key, + connection.environment_id, + model, // model undefined, // delta limit, // limit undefined, // filter cursor // cursor - )) as unknown as ServiceResponse; + ); if (!response) { throw new Error('Response is undefined'); diff --git a/packages/shared/lib/services/sync/data/records.service.ts b/packages/shared/lib/services/sync/data/records.service.ts index 9f4a0bbea6..3201787a0c 100644 --- a/packages/shared/lib/services/sync/data/records.service.ts +++ b/packages/shared/lib/services/sync/data/records.service.ts @@ -421,57 +421,66 @@ export async function getAllDataRecords( } } - let nextCursor = null; - - const result: RawDataRecordResult[] = await query.select( + const rawResults: RawDataRecordResult[] = await query.select( + // PostgreSQL stores timestamp with microseconds precision + // however, javascript date only supports milliseconds precision + // we therefore convert timestamp to string (using to_json()) in order to avoid precision loss db.knex.raw(` - jsonb_set( - json::jsonb, - '{_nango_metadata}', - jsonb_build_object( - 'first_seen_at', created_at, - 'last_modified_at', updated_at, - 'deleted_at', external_deleted_at, - 'last_action', - CASE - WHEN external_deleted_at IS NOT NULL THEN 'DELETED' - WHEN created_at = updated_at THEN 'ADDED' - ELSE 'UPDATED' - END - ) - ) as record, id + id, + json as record, + to_json(created_at) as first_seen_at, + to_json(updated_at) as last_modified_at, + to_json(external_deleted_at) as deleted_at, + CASE + WHEN external_deleted_at IS NOT NULL THEN 'DELETED' + WHEN created_at = updated_at THEN 'ADDED' + ELSE 'UPDATED' + END as last_action `) ); - if (result.length === 0) { - return { success: true, error: null, response: { records: [], next_cursor: nextCursor } }; + if (rawResults.length === 0) { + return { success: true, error: null, response: { records: [], next_cursor: null } }; } - const customerResult = result.map((item) => { + const results = rawResults.flatMap((item) => { const decryptedRecord = encryptionManager.decryptDataRecord(item); if (!decryptedRecord) { - return decryptedRecord; + return []; } - const nextCursor = item.record._nango_metadata.last_modified_at.toString(); + const lastModifiedAt = item.last_modified_at; const id = item.id; - const encodedCursorValue = Buffer.from(`${nextCursor}||${id}`).toString('base64'); - decryptedRecord['_nango_metadata']['cursor'] = encodedCursorValue; - return decryptedRecord; + const encodedCursor = Buffer.from(`${lastModifiedAt}||${id}`).toString('base64'); + return [ + { + ...decryptedRecord, + _nango_metadata: { + first_seen_at: item.first_seen_at, + last_modified_at: item.last_modified_at, + last_action: item.last_action, + deleted_at: item.deleted_at, + cursor: encodedCursor + } + } as CustomerFacingDataRecord + ]; }); - if (customerResult.length > Number(limit || 100)) { - customerResult.pop(); - result.pop(); + if (results.length > Number(limit || 100)) { + results.pop(); + rawResults.pop(); + + const cursorRawElement = rawResults[rawResults.length - 1]; - const cursorRawElement = result[result.length - 1] as SyncDataRecord; - const cursorElement = customerResult[customerResult.length - 1] as unknown as CustomerFacingDataRecord; + if (!cursorRawElement) { + return { success: true, error: null, response: { records: results, next_cursor: null } }; + } - nextCursor = cursorElement['_nango_metadata']['last_modified_at'] as unknown as string; - const encodedCursorValue = Buffer.from(`${nextCursor}||${cursorRawElement.id}`).toString('base64'); + const lastModifiedAt = cursorRawElement.last_modified_at; + const encodedCursorValue = Buffer.from(`${lastModifiedAt}||${cursorRawElement.id}`).toString('base64'); - return { success: true, error: null, response: { records: customerResult as CustomerFacingDataRecord[], next_cursor: encodedCursorValue } }; + return { success: true, error: null, response: { records: results, next_cursor: encodedCursorValue } }; } else { - return { success: true, error: null, response: { records: customerResult as CustomerFacingDataRecord[], next_cursor: nextCursor } }; + return { success: true, error: null, response: { records: results, next_cursor: null } }; } } catch (e: any) { const errorMessage = `List records error for model ${model}`; @@ -512,26 +521,6 @@ export function verifyUniqueKeysAreUnique(data: DataResponse[], optionalUniqueKe return { isUnique, nonUniqueKeys }; } -export async function getSingleRecord(external_id: string, nango_connection_id: number, model: string): Promise { - const encryptedRecord = await schema().from('_nango_sync_data_records').where({ - nango_connection_id, - model, - external_id - }); - - if (!encryptedRecord) { - return null; - } - - const result = encryptionManager.decryptDataRecords(encryptedRecord, 'json'); - - if (!result || result.length === 0) { - return null; - } - - return result[0] as unknown as SyncDataRecord; -} - export async function getRecordsByExternalIds(external_ids: string[], nango_connection_id: number, model: string): Promise { const encryptedRecords = await schema() .from('_nango_sync_data_records') diff --git a/packages/shared/lib/utils/encryption.manager.ts b/packages/shared/lib/utils/encryption.manager.ts index 4aba067ee5..9838aeb417 100644 --- a/packages/shared/lib/utils/encryption.manager.ts +++ b/packages/shared/lib/utils/encryption.manager.ts @@ -5,14 +5,7 @@ import type { DBConfig } from '../models/Generic.js'; import type { Environment } from '../models/Environment.js'; import type { EnvironmentVariable } from '../models/EnvironmentVariable.js'; import type { Connection, ApiConnection, StoredConnection } from '../models/Connection.js'; -import type { - EncryptedRecord, - CustomerFacingDataRecord, - RawDataRecordResult, - DataRecord, - DataRecordWithMetadata, - RecordWrapCustomerFacingDataRecord -} from '../models/Sync.js'; +import type { RawDataRecordResult, DataRecord, DataRecordWithMetadata, RecordWrapCustomerFacingDataRecord, UnencryptedRawRecord } from '../models/Sync.js'; import db from '../db/database.js'; import util from 'util'; @@ -279,7 +272,7 @@ class EncryptionManager { return decryptedDataRecords as unknown as DataRecordWithMetadata[] | RecordWrapCustomerFacingDataRecord; } - public decryptDataRecord(dataRecord: RawDataRecordResult): CustomerFacingDataRecord | null { + public decryptDataRecord(dataRecord: RawDataRecordResult): UnencryptedRawRecord | null { if (dataRecord === null) { return dataRecord; } @@ -287,20 +280,16 @@ class EncryptionManager { const record = dataRecord.record; if (!record['encryptedValue']) { - return record as CustomerFacingDataRecord; + return record as UnencryptedRawRecord; } - const { encryptedValue, iv, authTag } = record as EncryptedRecord; + const { encryptedValue, iv, authTag } = record; const decryptedString = this.decrypt(encryptedValue, iv, authTag); const updatedRecord = { ...JSON.parse(decryptedString) - } as CustomerFacingDataRecord; - - if (record._nango_metadata) { - updatedRecord['_nango_metadata'] = record['_nango_metadata']; - } + }; return updatedRecord; } From bca55c0241e238843bbc4595cfb18912fb7bdab7 Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Thu, 7 Mar 2024 09:02:47 +0100 Subject: [PATCH 2/3] fix: RecordMetadata date field to string --- packages/node-client/lib/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/node-client/lib/index.ts b/packages/node-client/lib/index.ts index 78de6178d8..0b7429b0f8 100644 --- a/packages/node-client/lib/index.ts +++ b/packages/node-client/lib/index.ts @@ -81,10 +81,10 @@ export interface NangoSyncWebhookBody { export type LastAction = 'ADDED' | 'UPDATED' | 'DELETED'; export interface RecordMetadata { - first_seen_at: Date; - last_seen_at: Date; + first_seen_at: string; + last_seen_at: string; last_action: LastAction; - deleted_at: Date | null; + deleted_at: string | null; cursor: string; } From 03dfb451f69583c4a2dda306069a3a3f102c01be Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Thu, 7 Mar 2024 09:09:48 +0100 Subject: [PATCH 3/3] record encryption cleanup --- .../lib/services/sync/data/records.service.ts | 45 +++++++------------ .../shared/lib/utils/encryption.manager.ts | 12 ++--- 2 files changed, 19 insertions(+), 38 deletions(-) diff --git a/packages/shared/lib/services/sync/data/records.service.ts b/packages/shared/lib/services/sync/data/records.service.ts index 3201787a0c..0b23e643a4 100644 --- a/packages/shared/lib/services/sync/data/records.service.ts +++ b/packages/shared/lib/services/sync/data/records.service.ts @@ -443,26 +443,19 @@ export async function getAllDataRecords( return { success: true, error: null, response: { records: [], next_cursor: null } }; } - const results = rawResults.flatMap((item) => { + const results = rawResults.map((item) => { const decryptedRecord = encryptionManager.decryptDataRecord(item); - if (!decryptedRecord) { - return []; - } - const lastModifiedAt = item.last_modified_at; - const id = item.id; - const encodedCursor = Buffer.from(`${lastModifiedAt}||${id}`).toString('base64'); - return [ - { - ...decryptedRecord, - _nango_metadata: { - first_seen_at: item.first_seen_at, - last_modified_at: item.last_modified_at, - last_action: item.last_action, - deleted_at: item.deleted_at, - cursor: encodedCursor - } - } as CustomerFacingDataRecord - ]; + const encodedCursor = Buffer.from(`${item.last_modified_at}||${item.id}`).toString('base64'); + return { + ...decryptedRecord, + _nango_metadata: { + first_seen_at: item.first_seen_at, + last_modified_at: item.last_modified_at, + last_action: item.last_action, + deleted_at: item.deleted_at, + cursor: encodedCursor + } + } as CustomerFacingDataRecord; }); if (results.length > Number(limit || 100)) { @@ -470,18 +463,12 @@ export async function getAllDataRecords( rawResults.pop(); const cursorRawElement = rawResults[rawResults.length - 1]; - - if (!cursorRawElement) { - return { success: true, error: null, response: { records: results, next_cursor: null } }; + if (cursorRawElement) { + const encodedCursorValue = Buffer.from(`${cursorRawElement.last_modified_at}||${cursorRawElement.id}`).toString('base64'); + return { success: true, error: null, response: { records: results, next_cursor: encodedCursorValue } }; } - - const lastModifiedAt = cursorRawElement.last_modified_at; - const encodedCursorValue = Buffer.from(`${lastModifiedAt}||${cursorRawElement.id}`).toString('base64'); - - return { success: true, error: null, response: { records: results, next_cursor: encodedCursorValue } }; - } else { - return { success: true, error: null, response: { records: results, next_cursor: null } }; } + return { success: true, error: null, response: { records: results, next_cursor: null } }; } catch (e: any) { const errorMessage = `List records error for model ${model}`; await telemetry.log(LogTypes.SYNC_GET_RECORDS_QUERY_TIMEOUT, errorMessage, LogActionEnum.SYNC, { diff --git a/packages/shared/lib/utils/encryption.manager.ts b/packages/shared/lib/utils/encryption.manager.ts index 9838aeb417..29d45cfe99 100644 --- a/packages/shared/lib/utils/encryption.manager.ts +++ b/packages/shared/lib/utils/encryption.manager.ts @@ -272,11 +272,7 @@ class EncryptionManager { return decryptedDataRecords as unknown as DataRecordWithMetadata[] | RecordWrapCustomerFacingDataRecord; } - public decryptDataRecord(dataRecord: RawDataRecordResult): UnencryptedRawRecord | null { - if (dataRecord === null) { - return dataRecord; - } - + public decryptDataRecord(dataRecord: RawDataRecordResult): UnencryptedRawRecord { const record = dataRecord.record; if (!record['encryptedValue']) { @@ -287,11 +283,9 @@ class EncryptionManager { const decryptedString = this.decrypt(encryptedValue, iv, authTag); - const updatedRecord = { + return { ...JSON.parse(decryptedString) - }; - - return updatedRecord; + } as UnencryptedRawRecord; } public async encryptAllDataRecords(): Promise {