Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not use jsonb_set in postgreSQL #1814

Merged
merged 3 commits into from Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/node-client/lib/index.ts
Expand Up @@ -81,10 +81,10 @@ export interface NangoSyncWebhookBody {
export type LastAction = 'ADDED' | 'UPDATED' | 'DELETED';

export interface RecordMetadata {
first_seen_at: Date;
last_seen_at: Date;
first_seen_at: string;
last_seen_at: string;
last_action: LastAction;
deleted_at: Date | null;
deleted_at: string | null;
cursor: string;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/shared/lib/db/seeders/connection.seeder.ts
Expand Up @@ -25,8 +25,8 @@ export const createConnectionSeeds = async (environmentName = ''): Promise<numbe

for (const [name] of connectionParams) {
const [result] = (await connectionService.upsertConnection(
name as string,
name as string,
`conn-${name}`,
`provider-${name}`,
'google',
{} as AuthCredentials,
{},
Expand Down
26 changes: 13 additions & 13 deletions packages/shared/lib/models/Sync.ts
Expand Up @@ -216,23 +216,22 @@ export type CustomerFacingDataRecord = {
_nango_metadata: RecordMetadata;
} & Record<string, any> & { id: string | number };

export interface EncryptedRecord {
export type EncryptedRawRecord = {
iv: string;
authTag: string;
encryptedValue: string;
}

export type EncryptedInternalDataRecord = {
_nango_metadata: RecordMetadata;
} & EncryptedRecord;
};

export type GetRecordsResponse = { records: CustomerFacingDataRecord[] | DataRecordWithMetadata[]; next_cursor?: string | null } | null;
export type UnencryptedRawRecord = Record<string, any> & { id: string | number };

export interface RawDataRecordResult {
export type RawDataRecordResult = {
id: string | number;
record: CustomerFacingDataRecord | EncryptedInternalDataRecord;
}
record: UnencryptedRawRecord | EncryptedRawRecord;
} & RecordMetadata;

export type GetRecordsResponse = { records: CustomerFacingDataRecord[]; next_cursor?: string | null } | null;

// TO DEPRECATE
export type RecordWrapCustomerFacingDataRecord = { record: CustomerFacingDataRecord }[];

export interface DataRecord extends Timestamps {
Expand All @@ -256,13 +255,14 @@ export interface DataRecord extends Timestamps {
export type LastAction = 'ADDED' | 'UPDATED' | 'DELETED' | 'added' | 'updated' | 'deleted';

interface RecordMetadata {
first_seen_at: Date;
last_modified_at: Date;
first_seen_at: string;
last_modified_at: string;
last_action: LastAction;
deleted_at: Date | null;
deleted_at: string | null;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those were never date since they are coming from pg as json encoded date.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should update the types in the node-client too to match these: https://github.com/NangoHQ/nango/blob/master/packages/node-client/lib/index.ts#L83

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix in 7c90610

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

knex is doing some transformation so I guess it was kinda true :D

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not even. knex was fine with those fields defined as date but at runtime typeof would be string 😡

cursor: string;
}

// DEPRECATED
export interface DataRecordWithMetadata extends RecordMetadata {
record: object;
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only use in the deprecated records function so it should be removed when we finally delete the deprecated record fetching method

Comment on lines +265 to 268
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// DEPRECATED
export interface DataRecordWithMetadata extends RecordMetadata {
record: object;
}
/**
* @deprecated
*/
export interface DataRecordWithMetadata extends RecordMetadata {
record: object;
}

Expand Down
51 changes: 50 additions & 1 deletion packages/shared/lib/services/sync/data/mocks.ts
Expand Up @@ -3,15 +3,64 @@ import { createSyncSeeds } from '../../../db/seeders/sync.seeder.js';
import { createSyncJobSeeds } from '../../../db/seeders/sync-job.seeder.js';
import { formatDataRecords } from './records.service.js';
import type { DataResponse } from '../../../models/Data.js';
import connectionService from '../../connection.service.js';
import * as DataService from './data.service.js';
import type { Connection } from '../../../models/Connection.js';

export async function upsertRecords(n: number): Promise<{ connection: Connection; model: string }> {
const activityLogId = 1;
const environmentId = 1;
const environmentName = 'mock-records';
const toInsert = generateInsertableJson(n);
const {
response: { response: records },
meta: { modelName, nangoConnectionId }
} = await createRecords(toInsert, environmentName);
if (!records) {
throw new Error('Failed to format records');
}
const connection = await connectionService.getConnectionById(nangoConnectionId);
if (!connection) {
throw new Error(`Connection '${nangoConnectionId}' not found`);
}
const chunkSize = 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personal opinion: code without line break is a bit harder to read in the long run

for (let i = 0; i < records.length; i += chunkSize) {
const { error, success } = await DataService.upsert(
records.slice(i, i + chunkSize),
'_nango_sync_data_records',
'external_id',
nangoConnectionId,
modelName,
activityLogId,
environmentId
);
if (!success) {
throw new Error(`Failed to upsert records: ${error}`);
}
}
return {
connection: connection as Connection,
model: modelName
};
}

export async function createRecords(records: DataResponse[], environmentName = '') {
const connections = await createConnectionSeeds(environmentName);

const [nangoConnectionId]: number[] = connections;
if (!nangoConnectionId) {
throw new Error('Failed to create connection');
}
const sync = await createSyncSeeds(nangoConnectionId);
if (!sync.id) {
throw new Error('Failed to create sync');
}
const job = await createSyncJobSeeds(nangoConnectionId);
if (!job.id) {
throw new Error('Failed to create job');
}
const modelName = Math.random().toString(36).substring(7);
const response = formatDataRecords(records, nangoConnectionId as number, modelName, sync.id as string, job.id as number);
const response = formatDataRecords(records, nangoConnectionId, modelName, sync.id, job.id);

return {
meta: {
Expand Down
@@ -1,56 +1,59 @@
import { expect, describe, it, beforeAll } from 'vitest';
import dayjs from 'dayjs';
import { multipleMigrations } from '../../../db/database.js';
import * as DataService from './data.service.js';
import * as RecordsService from './records.service.js';
import { createConfigSeeds } from '../../../db/seeders/config.seeder.js';
import type { GetRecordsResponse, DataRecord } from '../../../models/Sync.js';
import type { ServiceResponse } from '../../../models/Generic.js';
import connectionService from '../../connection.service.js';
import { generateInsertableJson, createRecords } from './mocks.js';
import { upsertRecords } from './mocks.js';

const environmentName = 'records-service';

describe('Records service integration test', () => {
describe('Records service', () => {
beforeAll(async () => {
await multipleMigrations();
await createConfigSeeds(environmentName);
});

it('Should retrieve records', async () => {
const n = 10;
const { connection, model } = await upsertRecords(n);
const { success, response, error } = await RecordsService.getAllDataRecords(
connection.connection_id,
connection.provider_config_key,
connection.environment_id,
model
);
expect(success).toBe(true);
expect(error).toBe(null);
expect(response?.records.length).toBe(n);
const timestampRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6}\+\d{2}:\d{2}$/;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had created an helper in vitest to achieve something similar without copy pasting

const dateReg = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z$/;

expect.extend({
  toBeIsoDate: (received: any) => {
    if (!dateReg.test(received)) {
      return {
        message: () => `expected ${received} to be an ISO Date`,
        pass: false,
      };
    }
    return { pass: true, message: () => '' };
  },
});

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I extended expect at first as well but since it was actually shorter to use stringMatching I went for that

expect(response?.records[0]?.['_nango_metadata']).toMatchObject({
first_seen_at: expect.stringMatching(timestampRegex),
last_modified_at: expect.stringMatching(timestampRegex),
last_action: 'ADDED',
deleted_at: null,
cursor: expect.stringMatching(/^[A-Za-z0-9+/]+={0,2}$/) // base64 encoded string
});
expect(response?.next_cursor).toBe(null); // no next page
});
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding a test to make sure the format of metadata isn't modified with those changes


it('Should paginate the records to retrieve all records', async () => {
const numOfRecords = 3000;
const limit = 100;
const records = generateInsertableJson(numOfRecords);
const { response, meta } = await createRecords(records, environmentName);
const { response: formattedResults } = response;
const { modelName, nangoConnectionId } = meta;
const connection = await connectionService.getConnectionById(nangoConnectionId as number);
const { error, success } = await DataService.upsert(
formattedResults as unknown as DataRecord[],
'_nango_sync_data_records',
'external_id',
nangoConnectionId as number,
modelName,
1,
1
);
expect(success).toBe(true);
expect(error).toBe(undefined);
const { connection, model } = await upsertRecords(numOfRecords);

let cursor = null;

const allFetchedRecords = [];
do {
const result = (await RecordsService.getAllDataRecords(
connection?.connection_id as string, // connectionId
connection?.provider_config_key as string, // providerConfigKey
connection?.environment_id as number, // environmentId
modelName, // model
const result = await RecordsService.getAllDataRecords(
connection.connection_id,
connection.provider_config_key,
connection.environment_id,
model,
undefined, // delta
limit, // limit
undefined, // filter
cursor // cursor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json params would make this better :D

)) as unknown as ServiceResponse<GetRecordsResponse>;
);

if (!result.response) {
throw new Error('Response is undefined');
Expand All @@ -59,7 +62,7 @@ describe('Records service integration test', () => {
const { response: recordsResponse, error } = result;

expect(error).toBe(null);
expect(response).not.toBe(undefined);
expect(recordsResponse).not.toBe(undefined);

const { records, next_cursor } = recordsResponse;

Expand All @@ -72,10 +75,8 @@ describe('Records service integration test', () => {
} while (cursor);

for (let i = 1; i < allFetchedRecords.length; i++) {
// @ts-expect-error
const currentRecordDate = dayjs(allFetchedRecords[i]._nango_metadata.first_seen_at);
// @ts-expect-error
const previousRecordDate = dayjs(allFetchedRecords[i - 1]._nango_metadata.first_seen_at);
const currentRecordDate = dayjs(allFetchedRecords[i]?._nango_metadata.first_seen_at);
const previousRecordDate = dayjs(allFetchedRecords[i - 1]?._nango_metadata.first_seen_at);

expect(currentRecordDate.isAfter(previousRecordDate) || currentRecordDate.isSame(previousRecordDate)).toBe(true);
}
Expand All @@ -85,44 +86,23 @@ describe('Records service integration test', () => {
it('Should be able to retrieve 20K records in under 5s with a cursor', async () => {
const numOfRecords = 20000;
const limit = 1000;
const records = generateInsertableJson(numOfRecords);
const { response, meta } = await createRecords(records, environmentName);
const { response: formattedResults } = response;
const { modelName, nangoConnectionId } = meta;

// insert in chunks of 1000
// @ts-expect-error
for (let i = 0; i < formattedResults?.length; i += 1000) {
const { error, success } = await DataService.upsert(
formattedResults?.slice(i, i + 1000) as unknown as DataRecord[],
'_nango_sync_data_records',
'external_id',
nangoConnectionId as number,
modelName,
1,
1
);
expect(success).toBe(true);
expect(error).toBe(undefined);
}

const connection = await connectionService.getConnectionById(nangoConnectionId as number);
const { connection, model } = await upsertRecords(numOfRecords);

let cursor: string | undefined | null = null;
let allRecordsLength = 0;

const startTime = Date.now();
do {
const { response, error } = (await RecordsService.getAllDataRecords(
connection?.connection_id as string, // connectionId
connection?.provider_config_key as string, // providerConfigKey
connection?.environment_id as number, // environmentId
modelName, // model
const { response, error } = await RecordsService.getAllDataRecords(
connection.connection_id,
connection.provider_config_key,
connection.environment_id,
model, // model
undefined, // delta
limit, // limit
undefined, // filter
cursor // cursor
)) as unknown as ServiceResponse<GetRecordsResponse>;
);

if (!response) {
throw new Error('Response is undefined');
Expand Down