New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
do not use jsonb_set in postgreSQL #1814
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -216,23 +216,22 @@ export type CustomerFacingDataRecord = { | |||||||||||||||||||||
_nango_metadata: RecordMetadata; | ||||||||||||||||||||||
} & Record<string, any> & { id: string | number }; | ||||||||||||||||||||||
|
||||||||||||||||||||||
export interface EncryptedRecord { | ||||||||||||||||||||||
export type EncryptedRawRecord = { | ||||||||||||||||||||||
iv: string; | ||||||||||||||||||||||
authTag: string; | ||||||||||||||||||||||
encryptedValue: string; | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
export type EncryptedInternalDataRecord = { | ||||||||||||||||||||||
_nango_metadata: RecordMetadata; | ||||||||||||||||||||||
} & EncryptedRecord; | ||||||||||||||||||||||
}; | ||||||||||||||||||||||
|
||||||||||||||||||||||
export type GetRecordsResponse = { records: CustomerFacingDataRecord[] | DataRecordWithMetadata[]; next_cursor?: string | null } | null; | ||||||||||||||||||||||
export type UnencryptedRawRecord = Record<string, any> & { id: string | number }; | ||||||||||||||||||||||
|
||||||||||||||||||||||
export interface RawDataRecordResult { | ||||||||||||||||||||||
export type RawDataRecordResult = { | ||||||||||||||||||||||
id: string | number; | ||||||||||||||||||||||
record: CustomerFacingDataRecord | EncryptedInternalDataRecord; | ||||||||||||||||||||||
} | ||||||||||||||||||||||
record: UnencryptedRawRecord | EncryptedRawRecord; | ||||||||||||||||||||||
} & RecordMetadata; | ||||||||||||||||||||||
|
||||||||||||||||||||||
export type GetRecordsResponse = { records: CustomerFacingDataRecord[]; next_cursor?: string | null } | null; | ||||||||||||||||||||||
|
||||||||||||||||||||||
// TO DEPRECATE | ||||||||||||||||||||||
export type RecordWrapCustomerFacingDataRecord = { record: CustomerFacingDataRecord }[]; | ||||||||||||||||||||||
|
||||||||||||||||||||||
export interface DataRecord extends Timestamps { | ||||||||||||||||||||||
|
@@ -256,13 +255,14 @@ export interface DataRecord extends Timestamps { | |||||||||||||||||||||
export type LastAction = 'ADDED' | 'UPDATED' | 'DELETED' | 'added' | 'updated' | 'deleted'; | ||||||||||||||||||||||
|
||||||||||||||||||||||
interface RecordMetadata { | ||||||||||||||||||||||
first_seen_at: Date; | ||||||||||||||||||||||
last_modified_at: Date; | ||||||||||||||||||||||
first_seen_at: string; | ||||||||||||||||||||||
last_modified_at: string; | ||||||||||||||||||||||
last_action: LastAction; | ||||||||||||||||||||||
deleted_at: Date | null; | ||||||||||||||||||||||
deleted_at: string | null; | ||||||||||||||||||||||
cursor: string; | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
// DEPRECATED | ||||||||||||||||||||||
export interface DataRecordWithMetadata extends RecordMetadata { | ||||||||||||||||||||||
record: object; | ||||||||||||||||||||||
} | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. only use in the deprecated records function so it should be removed when we finally delete the deprecated record fetching method
Comment on lines
+265
to
268
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,15 +3,64 @@ import { createSyncSeeds } from '../../../db/seeders/sync.seeder.js'; | |
import { createSyncJobSeeds } from '../../../db/seeders/sync-job.seeder.js'; | ||
import { formatDataRecords } from './records.service.js'; | ||
import type { DataResponse } from '../../../models/Data.js'; | ||
import connectionService from '../../connection.service.js'; | ||
import * as DataService from './data.service.js'; | ||
import type { Connection } from '../../../models/Connection.js'; | ||
|
||
export async function upsertRecords(n: number): Promise<{ connection: Connection; model: string }> { | ||
const activityLogId = 1; | ||
const environmentId = 1; | ||
const environmentName = 'mock-records'; | ||
const toInsert = generateInsertableJson(n); | ||
const { | ||
response: { response: records }, | ||
meta: { modelName, nangoConnectionId } | ||
} = await createRecords(toInsert, environmentName); | ||
if (!records) { | ||
throw new Error('Failed to format records'); | ||
} | ||
const connection = await connectionService.getConnectionById(nangoConnectionId); | ||
if (!connection) { | ||
throw new Error(`Connection '${nangoConnectionId}' not found`); | ||
} | ||
const chunkSize = 1000; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. personal opinion: code without line break is a bit harder to read in the long run |
||
for (let i = 0; i < records.length; i += chunkSize) { | ||
const { error, success } = await DataService.upsert( | ||
records.slice(i, i + chunkSize), | ||
'_nango_sync_data_records', | ||
'external_id', | ||
nangoConnectionId, | ||
modelName, | ||
activityLogId, | ||
environmentId | ||
); | ||
if (!success) { | ||
throw new Error(`Failed to upsert records: ${error}`); | ||
} | ||
} | ||
return { | ||
connection: connection as Connection, | ||
model: modelName | ||
}; | ||
} | ||
|
||
export async function createRecords(records: DataResponse[], environmentName = '') { | ||
const connections = await createConnectionSeeds(environmentName); | ||
|
||
const [nangoConnectionId]: number[] = connections; | ||
if (!nangoConnectionId) { | ||
throw new Error('Failed to create connection'); | ||
} | ||
const sync = await createSyncSeeds(nangoConnectionId); | ||
if (!sync.id) { | ||
throw new Error('Failed to create sync'); | ||
} | ||
const job = await createSyncJobSeeds(nangoConnectionId); | ||
if (!job.id) { | ||
throw new Error('Failed to create job'); | ||
} | ||
const modelName = Math.random().toString(36).substring(7); | ||
const response = formatDataRecords(records, nangoConnectionId as number, modelName, sync.id as string, job.id as number); | ||
const response = formatDataRecords(records, nangoConnectionId, modelName, sync.id, job.id); | ||
|
||
return { | ||
meta: { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,56 +1,59 @@ | ||
import { expect, describe, it, beforeAll } from 'vitest'; | ||
import dayjs from 'dayjs'; | ||
import { multipleMigrations } from '../../../db/database.js'; | ||
import * as DataService from './data.service.js'; | ||
import * as RecordsService from './records.service.js'; | ||
import { createConfigSeeds } from '../../../db/seeders/config.seeder.js'; | ||
import type { GetRecordsResponse, DataRecord } from '../../../models/Sync.js'; | ||
import type { ServiceResponse } from '../../../models/Generic.js'; | ||
import connectionService from '../../connection.service.js'; | ||
import { generateInsertableJson, createRecords } from './mocks.js'; | ||
import { upsertRecords } from './mocks.js'; | ||
|
||
const environmentName = 'records-service'; | ||
|
||
describe('Records service integration test', () => { | ||
describe('Records service', () => { | ||
beforeAll(async () => { | ||
await multipleMigrations(); | ||
await createConfigSeeds(environmentName); | ||
}); | ||
|
||
it('Should retrieve records', async () => { | ||
const n = 10; | ||
const { connection, model } = await upsertRecords(n); | ||
const { success, response, error } = await RecordsService.getAllDataRecords( | ||
connection.connection_id, | ||
connection.provider_config_key, | ||
connection.environment_id, | ||
model | ||
); | ||
expect(success).toBe(true); | ||
expect(error).toBe(null); | ||
expect(response?.records.length).toBe(n); | ||
const timestampRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6}\+\d{2}:\d{2}$/; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had created an helper in vitest to achieve something similar without copy pasting const dateReg = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z$/;
expect.extend({
toBeIsoDate: (received: any) => {
if (!dateReg.test(received)) {
return {
message: () => `expected ${received} to be an ISO Date`,
pass: false,
};
}
return { pass: true, message: () => '' };
},
}); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I extended expect at first as well but since it was actually shorter to use stringMatching I went for that |
||
expect(response?.records[0]?.['_nango_metadata']).toMatchObject({ | ||
first_seen_at: expect.stringMatching(timestampRegex), | ||
last_modified_at: expect.stringMatching(timestampRegex), | ||
last_action: 'ADDED', | ||
deleted_at: null, | ||
cursor: expect.stringMatching(/^[A-Za-z0-9+/]+={0,2}$/) // base64 encoded string | ||
}); | ||
expect(response?.next_cursor).toBe(null); // no next page | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. adding a test to make sure the format of metadata isn't modified with those changes |
||
|
||
it('Should paginate the records to retrieve all records', async () => { | ||
const numOfRecords = 3000; | ||
const limit = 100; | ||
const records = generateInsertableJson(numOfRecords); | ||
const { response, meta } = await createRecords(records, environmentName); | ||
const { response: formattedResults } = response; | ||
const { modelName, nangoConnectionId } = meta; | ||
const connection = await connectionService.getConnectionById(nangoConnectionId as number); | ||
const { error, success } = await DataService.upsert( | ||
formattedResults as unknown as DataRecord[], | ||
'_nango_sync_data_records', | ||
'external_id', | ||
nangoConnectionId as number, | ||
modelName, | ||
1, | ||
1 | ||
); | ||
expect(success).toBe(true); | ||
expect(error).toBe(undefined); | ||
const { connection, model } = await upsertRecords(numOfRecords); | ||
|
||
let cursor = null; | ||
|
||
const allFetchedRecords = []; | ||
do { | ||
const result = (await RecordsService.getAllDataRecords( | ||
connection?.connection_id as string, // connectionId | ||
connection?.provider_config_key as string, // providerConfigKey | ||
connection?.environment_id as number, // environmentId | ||
modelName, // model | ||
const result = await RecordsService.getAllDataRecords( | ||
connection.connection_id, | ||
connection.provider_config_key, | ||
connection.environment_id, | ||
model, | ||
undefined, // delta | ||
limit, // limit | ||
undefined, // filter | ||
cursor // cursor | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. json params would make this better :D |
||
)) as unknown as ServiceResponse<GetRecordsResponse>; | ||
); | ||
|
||
if (!result.response) { | ||
throw new Error('Response is undefined'); | ||
|
@@ -59,7 +62,7 @@ describe('Records service integration test', () => { | |
const { response: recordsResponse, error } = result; | ||
|
||
expect(error).toBe(null); | ||
expect(response).not.toBe(undefined); | ||
expect(recordsResponse).not.toBe(undefined); | ||
|
||
const { records, next_cursor } = recordsResponse; | ||
|
||
|
@@ -72,10 +75,8 @@ describe('Records service integration test', () => { | |
} while (cursor); | ||
|
||
for (let i = 1; i < allFetchedRecords.length; i++) { | ||
// @ts-expect-error | ||
const currentRecordDate = dayjs(allFetchedRecords[i]._nango_metadata.first_seen_at); | ||
// @ts-expect-error | ||
const previousRecordDate = dayjs(allFetchedRecords[i - 1]._nango_metadata.first_seen_at); | ||
const currentRecordDate = dayjs(allFetchedRecords[i]?._nango_metadata.first_seen_at); | ||
const previousRecordDate = dayjs(allFetchedRecords[i - 1]?._nango_metadata.first_seen_at); | ||
|
||
expect(currentRecordDate.isAfter(previousRecordDate) || currentRecordDate.isSame(previousRecordDate)).toBe(true); | ||
} | ||
|
@@ -85,44 +86,23 @@ describe('Records service integration test', () => { | |
it('Should be able to retrieve 20K records in under 5s with a cursor', async () => { | ||
const numOfRecords = 20000; | ||
const limit = 1000; | ||
const records = generateInsertableJson(numOfRecords); | ||
const { response, meta } = await createRecords(records, environmentName); | ||
const { response: formattedResults } = response; | ||
const { modelName, nangoConnectionId } = meta; | ||
|
||
// insert in chunks of 1000 | ||
// @ts-expect-error | ||
for (let i = 0; i < formattedResults?.length; i += 1000) { | ||
const { error, success } = await DataService.upsert( | ||
formattedResults?.slice(i, i + 1000) as unknown as DataRecord[], | ||
'_nango_sync_data_records', | ||
'external_id', | ||
nangoConnectionId as number, | ||
modelName, | ||
1, | ||
1 | ||
); | ||
expect(success).toBe(true); | ||
expect(error).toBe(undefined); | ||
} | ||
|
||
const connection = await connectionService.getConnectionById(nangoConnectionId as number); | ||
const { connection, model } = await upsertRecords(numOfRecords); | ||
|
||
let cursor: string | undefined | null = null; | ||
let allRecordsLength = 0; | ||
|
||
const startTime = Date.now(); | ||
do { | ||
const { response, error } = (await RecordsService.getAllDataRecords( | ||
connection?.connection_id as string, // connectionId | ||
connection?.provider_config_key as string, // providerConfigKey | ||
connection?.environment_id as number, // environmentId | ||
modelName, // model | ||
const { response, error } = await RecordsService.getAllDataRecords( | ||
connection.connection_id, | ||
connection.provider_config_key, | ||
connection.environment_id, | ||
model, // model | ||
undefined, // delta | ||
limit, // limit | ||
undefined, // filter | ||
cursor // cursor | ||
)) as unknown as ServiceResponse<GetRecordsResponse>; | ||
); | ||
|
||
if (!response) { | ||
throw new Error('Response is undefined'); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
those were never date since they are coming from pg as json encoded date.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should update the types in the node-client too to match these: https://github.com/NangoHQ/nango/blob/master/packages/node-client/lib/index.ts#L83
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix in 7c90610
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
knex is doing some transformation so I guess it was kinda true :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not even. knex was fine with those fields defined as date but at runtime
typeof
would bestring
😡