(feat) [nan-591] insert into bigQuery #1903

Merged
merged 16 commits on Mar 29, 2024
Changes from 13 commits
3 changes: 3 additions & 0 deletions .env.example
@@ -74,6 +74,9 @@ DEFAULT_GITHUB_CLIENT_SECRET=
WORKOS_API_KEY=
WORKOS_CLIENT_ID=

# Google Cloud Configuration
GOOGLE_APPLICATION_CREDENTIALS=

# Encryption key for secrets / records stored in database
NANGO_ENCRYPTION_KEY=

1 change: 1 addition & 0 deletions Dockerfile
@@ -16,6 +16,7 @@ COPY packages/runner/package.json ./packages/runner/package.json
COPY packages/server/package.json ./packages/server/package.json
COPY packages/shared/package.json ./packages/shared/package.json
COPY packages/webapp/package.json ./packages/webapp/package.json
COPY packages/data-ingestion/package.json ./packages/data-ingestion/package.json
COPY package*.json ./

# Install every dependencies
378 changes: 374 additions & 4 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
@@ -11,7 +11,8 @@
"packages/persist",
"packages/jobs",
"packages/webapp",
"packages/utils"
"packages/utils",
"packages/data-ingestion"
],
"scripts": {
"create:migration": "cd packages/shared/lib/db && knex migrate:make $1 --esm --knexfile ./knexfile.cjs",
1 change: 1 addition & 0 deletions packages/cli/lib/services/dryrun.service.ts
@@ -149,6 +149,7 @@ class DryRunService {
};

const syncRun = new syncRunService({
bigQueryClient: { insert: async () => {} },
integrationService,
writeToDb: false,
nangoConnection,
3 changes: 3 additions & 0 deletions packages/data-ingestion/.gitignore
@@ -0,0 +1,3 @@
tsconfig.tsbuildinfo
dist/*
node_modules
104 changes: 104 additions & 0 deletions packages/data-ingestion/lib/index.ts
@@ -0,0 +1,104 @@
import { BigQuery } from '@google-cloud/bigquery';
import type { BigQuery as BigQueryType } from '@google-cloud/bigquery';
import { getLogger } from '@nangohq/utils/dist/logger.js';
import { isCloud, isLocal } from '@nangohq/utils/dist/environment/detection.js';

const logger = getLogger('BigQueryClient');

interface RunScriptRow {
executionType: string;
internalConnectionId: number | undefined;
connectionId: string;
accountId: number | undefined;
scriptName: string;
scriptType: string;
environmentId: number;
providerConfigKey: string;
status: string;
syncId: string;
content: string;
runTimeInSeconds: number;
createdAt: number;
}
Contributor:
not a blocker: the schema is not linked to the type, which could lead to desync

const fields: { name: keyof RunScriptRow; type: 'STRING' | 'INTEGER' }[] = [
    { name: 'accountId', type: 'STRING' },
    { name: 'connectionId', type: 'STRING' }
];
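The contributor's point above can be sketched as follows. This is illustrative only and not part of the PR: both the row type and the BigQuery schema are derived from a single field list, so the two cannot drift apart. All names here (`runScriptFields`, `RunScriptRowDerived`, the reduced field set) are hypothetical.

```typescript
// Illustrative sketch: one field list drives both the BigQuery schema
// and the TypeScript row type, so they cannot desync.
type FieldType = 'STRING' | 'INTEGER' | 'FLOAT';

const runScriptFields = [
    { name: 'connectionId', type: 'STRING' },
    { name: 'accountId', type: 'INTEGER' },
    { name: 'runTimeInSeconds', type: 'FLOAT' }
] as const;

// The schema passed to table.create() is just the list itself.
const schema = { fields: runScriptFields.map((f) => ({ name: f.name, type: f.type })) };

// The row type is derived from the same list via key remapping.
type TsOf<T extends FieldType> = T extends 'STRING' ? string : number;
type RunScriptRowDerived = {
    [F in (typeof runScriptFields)[number] as F['name']]: TsOf<F['type']>;
};

// Compiles only if the row matches the declared field list.
const row: RunScriptRowDerived = { connectionId: 'c-1', accountId: 1, runTimeInSeconds: 2.5 };
```

With this pattern, adding a field to `runScriptFields` automatically updates both the table schema and the compile-time row type.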


class BigQueryClient {
private client: BigQuery;
private datasetName: string;
private tableName: string;

constructor({ datasetName, tableName }: { datasetName: string; tableName: string }) {
this.client = new BigQuery();
this.tableName = tableName;
this.datasetName = datasetName;
}

static async createInstance({ datasetName, tableName }: { datasetName?: string; tableName: string }) {
const instance = new BigQueryClient({
datasetName: datasetName || 'raw',
tableName
});
await instance.initialize();
return instance;
}

private async initialize() {
try {
if (isCloud) {
await this.createDataSet();
await this.createTable();
}
} catch (e) {
logger.error('Error initializing', e);
}
}

private async createDataSet() {
const dataset = this.client.dataset(this.datasetName);
const [exists] = await dataset.exists();
if (!exists) {
await this.client.createDataset(this.datasetName);
}
}

private async createTable() {
const table = this.client.dataset(this.datasetName).table(this.tableName);
const [exists] = await table.exists();
if (!exists) {
await table.create({
schema: {
fields: [
{ name: 'executionType', type: 'STRING' },
{ name: 'internalConnectionId', type: 'INTEGER' },
{ name: 'connectionId', type: 'STRING' },
{ name: 'accountId', type: 'INTEGER' },
{ name: 'scriptName', type: 'STRING' },
{ name: 'scriptType', type: 'STRING' },
{ name: 'environmentId', type: 'INTEGER' },
{ name: 'providerConfigKey', type: 'STRING' },
{ name: 'status', type: 'STRING' },
{ name: 'syncId', type: 'STRING' },
{ name: 'content', type: 'STRING' },
{ name: 'runTimeInSeconds', type: 'FLOAT' },
{ name: 'createdAt', type: 'INTEGER' }
]
}
});
}
}

public async insert(data: RunScriptRow, tableName?: string) {
Collaborator:

Since we are not enforcing type and schema matching, should insert simply accept any Record<string, string | number>?

Member (author):

Won't work as then there would be a type mismatch between the two.

const table = tableName || this.tableName;
try {
if (isCloud) {
await this.client.dataset(this.datasetName).table(table).insert(data);
}
if (isLocal) {
logger.info(`Data would be inserted into BigQuery: ${JSON.stringify(data, null, 2)}`);
}
} catch (e) {
logger.error('Error inserting into BigQuery', e);
}
}
}

export { BigQueryClient, BigQueryType };
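Callers of this client only depend on an object with an async `insert(data, tableName?)` method, which is why the CLI dry-run in this PR can pass the stub `{ insert: async () => {} }`. A self-contained, hypothetical in-memory stand-in with that shape (the class and table names below are illustrative, not from the PR):

```typescript
// Hypothetical in-memory stand-in for BigQueryClient: same insert()
// surface, but records rows locally instead of writing to BigQuery.
type RowLike = Record<string, string | number | undefined>;

class InMemoryBigQueryClient {
    public rows: { table: string; data: RowLike }[] = [];

    constructor(private defaultTable: string) {}

    // Mirrors BigQueryClient.insert(data, tableName?): never throws,
    // captures the row so tests can assert on what would be ingested.
    async insert(data: RowLike, tableName?: string): Promise<void> {
        this.rows.push({ table: tableName ?? this.defaultTable, data });
    }
}

const client = new InMemoryBigQueryClient('dev_script_runs');
void client.insert({ connectionId: 'c-1', scriptName: 'github-issues', status: 'success' });
```

An object with this shape can stand in wherever `bigQueryClient` is injected, matching the no-op stub used in `dryrun.service.ts` and the mock in the run-service integration test below.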
23 changes: 23 additions & 0 deletions packages/data-ingestion/package.json
@@ -0,0 +1,23 @@
{
"name": "@nangohq/data-ingestion",
"version": "1.0.0",
"description": "Package to ingest Nango data for analytics",
"type": "module",
"main": "dist/index.js",
"typings": "dist/index.d.ts",
"scripts": {
"build": "rimraf ./dist && tsc"
},
"keywords": [],
"repository": {
"type": "git",
"url": "git+https://github.com/NangoHQ/nango.git",
"directory": "packages/data-ingestion"
},
"license": "SEE LICENSE IN LICENSE FILE IN GIT REPOSITORY",
"dependencies": {
"@google-cloud/bigquery": "7.5.1",
"@nangohq/utils": "file:../utils"
},
"devDependencies": {}
}
13 changes: 13 additions & 0 deletions packages/data-ingestion/tsconfig.json
@@ -0,0 +1,13 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "lib",
"outDir": "dist"
},
"include": ["lib/**/*"],
"references": [
{
"path": "../utils"
}
]
}
1 change: 1 addition & 0 deletions packages/jobs/Dockerfile
@@ -11,6 +11,7 @@ WORKDIR /nango
COPY packages/node-client/ packages/node-client/
COPY packages/shared/ packages/shared/
COPY packages/utils/ packages/utils/
COPY packages/data-ingestion/ packages/data-ingestion/
COPY packages/jobs/ packages/jobs/
COPY packages/runner/ packages/runner/
COPY package*.json ./
11 changes: 11 additions & 0 deletions packages/jobs/lib/activities.ts
@@ -23,11 +23,18 @@ import {
getLastSyncDate
} from '@nangohq/shared';
import { getLogger } from '@nangohq/utils/dist/logger.js';
import { BigQueryClient } from '@nangohq/data-ingestion/dist/index.js';
import { env } from '@nangohq/utils/dist/environment/detection.js';
import integrationService from './integration.service.js';
import type { ContinuousSyncArgs, InitialSyncArgs, ActionArgs, WebhookArgs } from './models/worker';

const logger = getLogger('Jobs');

const bigQueryClient = await BigQueryClient.createInstance({
datasetName: 'raw',
tableName: `${env}_script_runs`
});

export async function routeSync(args: InitialSyncArgs): Promise<boolean | object | null> {
const { syncId, syncJobId, syncName, nangoConnection, debug } = args;
let environmentId = nangoConnection?.environment_id;
@@ -53,6 +60,7 @@ export async function runAction(args: ActionArgs): Promise<ServiceResponse> {
const context: Context = Context.current();

const syncRun = new syncRunService({
bigQueryClient,
integrationService,
writeToDb: true,
nangoConnection,
@@ -225,6 +233,7 @@ export async function syncProvider(
}

const syncRun = new syncRunService({
bigQueryClient,
integrationService,
writeToDb: true,
syncId,
@@ -312,6 +321,7 @@ export async function runWebhook(args: WebhookArgs): Promise<boolean> {
);

const syncRun = new syncRunService({
bigQueryClient,
integrationService,
writeToDb: true,
nangoConnection,
@@ -404,6 +414,7 @@ export async function cancelActivity(workflowArguments: InitialSyncArgs | Contin
const syncType = lastSyncDate ? SyncType.INCREMENTAL : SyncType.INITIAL;

const syncRun = new syncRunService({
bigQueryClient,
integrationService,
writeToDb: true,
syncId,
4 changes: 2 additions & 2 deletions packages/jobs/nodemon.json
@@ -1,6 +1,6 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../../.env"],
"ext": "ts,json",
"watch": ["lib", "../shared/dist", "../utils/dist", "../data-ingestion/dist", "../../.env"],
"ext": "js,ts,json",
"ignore": ["lib/**/*.spec.ts"],
"exec": "tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env",
"signal": "SIGTERM"
1 change: 1 addition & 0 deletions packages/jobs/package.json
@@ -15,6 +15,7 @@
"directory": "packages/jobs"
},
"dependencies": {
"@nangohq/data-ingestion": "file:../data-ingestion",
"@nangohq/nango-runner": "^1.0.0",
"@nangohq/shared": "^0.39.13",
"@nangohq/utils": "file:../utils",
3 changes: 3 additions & 0 deletions packages/jobs/tsconfig.json
@@ -13,6 +13,9 @@
},
{
"path": "../utils"
},
{
"path": "../data-ingestion"
}
],
"include": ["lib/**/*"]
2 changes: 1 addition & 1 deletion packages/persist/nodemon.json
@@ -1,6 +1,6 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../../.env"],
"ext": "ts,json",
"ext": "js,ts,json",
"ignore": ["lib/**/*.test.ts"],
"exec": "tsc && tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env"
}
2 changes: 1 addition & 1 deletion packages/runner/nodemon.json
@@ -1,6 +1,6 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../../.env"],
"ext": "ts,json",
"ext": "js,ts,json",
"ignore": ["lib/**/*.test.ts"],
"exec": "tsc && tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env"
}
2 changes: 1 addition & 1 deletion packages/server/nodemon.json
@@ -1,6 +1,6 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../../.env", "../shared/providers.yaml"],
"ext": "ts,json",
"ext": "js,ts,json",
"ignore": ["src/**/*.spec.ts"],
"exec": "tsx -r dotenv/config lib/server.ts Dotenv_config_path=./../../.env"
}
10 changes: 10 additions & 0 deletions packages/shared/lib/services/sync/run.service.integration.test.ts
@@ -26,7 +26,14 @@ class integrationServiceMock implements IntegrationServiceInterface {
}
}

class bigQueryClientMock {
async insert() {
return;
}
}

const integrationService = new integrationServiceMock();
const bigQueryClient = new bigQueryClientMock();

describe('Running sync', () => {
beforeAll(async () => {
@@ -178,6 +185,7 @@

describe('SyncRun', () => {
const dryRunConfig = {
bigQueryClient,
integrationService: integrationService as unknown as IntegrationServiceInterface,
writeToDb: false,
nangoConnection: {
@@ -196,6 +204,7 @@ describe('SyncRun', () => {
it('should initialize correctly', () => {
const config = {
integrationService: integrationService as unknown as IntegrationServiceInterface,
bigQueryClient,
writeToDb: true,
nangoConnection: {
id: 1,
@@ -317,6 +326,7 @@ const runJob = async (
}
const config = {
integrationService: integrationService,
bigQueryClient,
writeToDb: true,
nangoConnection: connection,
syncName: sync.name,