(feat) [nan-591] insert into bigQuery (#1903)
## Describe your changes
Add BigQuery data ingestion from syncs, actions, and webhooks

## Issue ticket number and link
NAN-591

## Checklist before requesting a review (skip if just adding/editing APIs & templates)
- [ ] I added tests, otherwise the reason is: 
- [ ] I added observability, otherwise the reason is:
- [ ] I added analytics, otherwise the reason is:
khaliqgant committed Mar 29, 2024
1 parent 8a3e0f9 commit 98bb60c
Showing 20 changed files with 643 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .env.example
@@ -74,6 +74,9 @@ DEFAULT_GITHUB_CLIENT_SECRET=
WORKOS_API_KEY=
WORKOS_CLIENT_ID=

# Google Cloud Configuration
GOOGLE_APPLICATION_CREDENTIALS=

# Encryption key for secrets / records stored in database
NANGO_ENCRYPTION_KEY=

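Note: GOOGLE_APPLICATION_CREDENTIALS is the standard environment variable the Google Cloud client libraries read to locate a service-account key file, so the BigQuery client added in this PR authenticates without any Nango-specific wiring. An illustrative value (the path is an example, not from this PR):

GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json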
1 change: 1 addition & 0 deletions Dockerfile
@@ -16,6 +16,7 @@ COPY packages/runner/package.json ./packages/runner/package.json
COPY packages/server/package.json ./packages/server/package.json
COPY packages/shared/package.json ./packages/shared/package.json
COPY packages/webapp/package.json ./packages/webapp/package.json
COPY packages/data-ingestion/package.json ./packages/data-ingestion/package.json
COPY package*.json ./

# Install all dependencies
378 changes: 374 additions & 4 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
@@ -11,7 +11,8 @@
"packages/persist",
"packages/jobs",
"packages/webapp",
"packages/utils"
"packages/utils",
"packages/data-ingestion"
],
"scripts": {
"create:migration": "cd packages/shared/lib/db && knex migrate:make $1 --esm --knexfile ./knexfile.cjs",
3 changes: 3 additions & 0 deletions packages/data-ingestion/.gitignore
@@ -0,0 +1,3 @@
tsconfig.tsbuildinfo
dist/*
node_modules
101 changes: 101 additions & 0 deletions packages/data-ingestion/lib/index.ts
@@ -0,0 +1,101 @@
import { BigQuery } from '@google-cloud/bigquery';
import type { BigQuery as BigQueryType } from '@google-cloud/bigquery';
import { getLogger } from '@nangohq/utils/dist/logger.js';
import { isCloud } from '@nangohq/utils/dist/environment/detection.js';

const logger = getLogger('BigQueryClient');

interface RunScriptRow {
executionType: string;
internalConnectionId: number | undefined;
connectionId: string;
accountId: number | undefined;
scriptName: string;
scriptType: string;
environmentId: number;
providerConfigKey: string;
status: string;
syncId: string;
content: string;
runTimeInSeconds: number;
createdAt: number;
}

class BigQueryClient {
private client: BigQuery;
private datasetName: string;
private tableName: string;

constructor({ datasetName, tableName }: { datasetName: string; tableName: string }) {
this.client = new BigQuery();
this.tableName = tableName;
this.datasetName = datasetName;
}

static async createInstance({ datasetName, tableName }: { datasetName?: string; tableName: string }) {
const instance = new BigQueryClient({
datasetName: datasetName || 'raw',
tableName
});
await instance.initialize();
return instance;
}

private async initialize() {
try {
if (isCloud) {
await this.createDataSet();
await this.createTable();
}
} catch (e) {
logger.error('Error initializing', e);
}
}

private async createDataSet() {
const dataset = this.client.dataset(this.datasetName);
const [exists] = await dataset.exists();
if (!exists) {
await this.client.createDataset(this.datasetName);
}
}

private async createTable() {
const table = this.client.dataset(this.datasetName).table(this.tableName);
const [exists] = await table.exists();
if (!exists) {
await table.create({
schema: {
fields: [
{ name: 'executionType', type: 'STRING' },
{ name: 'internalConnectionId', type: 'INTEGER' },
{ name: 'connectionId', type: 'STRING' },
{ name: 'accountId', type: 'INTEGER' },
{ name: 'scriptName', type: 'STRING' },
{ name: 'scriptType', type: 'STRING' },
{ name: 'environmentId', type: 'INTEGER' },
{ name: 'providerConfigKey', type: 'STRING' },
{ name: 'status', type: 'STRING' },
{ name: 'syncId', type: 'STRING' },
{ name: 'content', type: 'STRING' },
{ name: 'runTimeInSeconds', type: 'FLOAT' },
{ name: 'createdAt', type: 'INTEGER' }
]
}
});
}
}

public async insert(data: RunScriptRow, tableName?: string) {
const table = tableName || this.tableName;
try {
if (isCloud) {
await this.client.dataset(this.datasetName).table(table).insert(data);
}
} catch (e) {
logger.error('Error inserting into BigQuery', e);
}
}
}

export { BigQueryClient, BigQueryType };
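For context, a minimal usage sketch of the client exported above; the table name and row values here are illustrative, not taken from this PR:

import { BigQueryClient } from '@nangohq/data-ingestion/dist/index.js';

// Create the client once at startup; on cloud this lazily creates the
// dataset and table if they do not exist yet.
const bigQueryClient = await BigQueryClient.createInstance({
    datasetName: 'raw',
    tableName: 'dev_script_runs'
});

// Record one script run; outside of cloud, insert() is a no-op, and any
// BigQuery error is logged rather than thrown.
await bigQueryClient.insert({
    executionType: 'sync',
    internalConnectionId: 1,
    connectionId: 'conn-abc',
    accountId: 1,
    scriptName: 'github-issues',
    scriptType: 'sync',
    environmentId: 1,
    providerConfigKey: 'github',
    status: 'success',
    syncId: 'a1b2c3',
    content: 'Sync completed',
    runTimeInSeconds: 1.5,
    createdAt: Date.now()
});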
23 changes: 23 additions & 0 deletions packages/data-ingestion/package.json
@@ -0,0 +1,23 @@
{
"name": "@nangohq/data-ingestion",
"version": "1.0.0",
"description": "Package to ingest Nango data for analytics",
"type": "module",
"main": "dist/index.js",
"typings": "dist/index.d.ts",
"scripts": {
"build": "rimraf ./dist && tsc"
},
"keywords": [],
"repository": {
"type": "git",
"url": "git+https://github.com/NangoHQ/nango.git",
"directory": "packages/utils"
},
"license": "SEE LICENSE IN LICENSE FILE IN GIT REPOSITORY",
"dependencies": {
"@google-cloud/bigquery": "7.5.1",
"@nangohq/utils": "file:../utils"
},
"devDependencies": {}
}
13 changes: 13 additions & 0 deletions packages/data-ingestion/tsconfig.json
@@ -0,0 +1,13 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "lib",
"outDir": "dist"
},
"include": ["lib/**/*"],
"references": [
{
"path": "../utils"
}
]
}
1 change: 1 addition & 0 deletions packages/jobs/Dockerfile
@@ -11,6 +11,7 @@ WORKDIR /nango
COPY packages/node-client/ packages/node-client/
COPY packages/shared/ packages/shared/
COPY packages/utils/ packages/utils/
COPY packages/data-ingestion/ packages/data-ingestion/
COPY packages/jobs/ packages/jobs/
COPY packages/runner/ packages/runner/
COPY package*.json ./
11 changes: 11 additions & 0 deletions packages/jobs/lib/activities.ts
@@ -23,11 +23,18 @@ import {
getLastSyncDate
} from '@nangohq/shared';
import { getLogger } from '@nangohq/utils/dist/logger.js';
import { BigQueryClient } from '@nangohq/data-ingestion/dist/index.js';
import { env } from '@nangohq/utils/dist/environment/detection.js';
import integrationService from './integration.service.js';
import type { ContinuousSyncArgs, InitialSyncArgs, ActionArgs, WebhookArgs } from './models/worker';

const logger = getLogger('Jobs');

const bigQueryClient = await BigQueryClient.createInstance({
datasetName: 'raw',
tableName: `${env}_script_runs`
});

export async function routeSync(args: InitialSyncArgs): Promise<boolean | object | null> {
const { syncId, syncJobId, syncName, nangoConnection, debug } = args;
let environmentId = nangoConnection?.environment_id;
@@ -53,6 +60,7 @@ export async function runAction(args: ActionArgs): Promise<ServiceResponse> {
const context: Context = Context.current();

const syncRun = new syncRunService({
bigQueryClient,
integrationService,
writeToDb: true,
nangoConnection,
@@ -225,6 +233,7 @@
}

const syncRun = new syncRunService({
bigQueryClient,
integrationService,
writeToDb: true,
syncId,
@@ -312,6 +321,7 @@ export async function runWebhook(args: WebhookArgs): Promise<boolean> {
);

const syncRun = new syncRunService({
bigQueryClient,
integrationService,
writeToDb: true,
nangoConnection,
@@ -404,6 +414,7 @@ export async function cancelActivity(workflowArguments: InitialSyncArgs | Contin
const syncType = lastSyncDate ? SyncType.INCREMENTAL : SyncType.INITIAL;

const syncRun = new syncRunService({
bigQueryClient,
integrationService,
writeToDb: true,
syncId,
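Note: the client above is created once via top-level await when the worker module loads, and the same instance is injected into every syncRunService call site in this file, so syncs, actions, webhooks, and cancellations all record their runs through one shared client into the environment-specific ${env}_script_runs table.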
4 changes: 2 additions & 2 deletions packages/jobs/nodemon.json
@@ -1,6 +1,6 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../../.env"],
"ext": "ts,json",
"watch": ["lib", "../shared/dist", "../utils/dist", "../data-ingestion/dist", "../../.env"],
"ext": "js,ts,json",
"ignore": ["lib/**/*.spec.ts"],
"exec": "tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env",
"signal": "SIGTERM"
1 change: 1 addition & 0 deletions packages/jobs/package.json
@@ -15,6 +15,7 @@
"directory": "packages/jobs"
},
"dependencies": {
"@nangohq/data-ingestion": "file:../data-ingestion",
"@nangohq/nango-runner": "^1.0.0",
"@nangohq/shared": "^0.39.13",
"@nangohq/utils": "file:../utils",
3 changes: 3 additions & 0 deletions packages/jobs/tsconfig.json
@@ -13,6 +13,9 @@
},
{
"path": "../utils"
},
{
"path": "../data-ingestion"
}
],
"include": ["lib/**/*"]
2 changes: 1 addition & 1 deletion packages/persist/nodemon.json
@@ -1,6 +1,6 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../../.env"],
"ext": "ts,json",
"ext": "js,ts,json",
"ignore": ["lib/**/*.test.ts"],
"exec": "tsc && tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env"
}
2 changes: 1 addition & 1 deletion packages/runner/nodemon.json
@@ -1,6 +1,6 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../../.env"],
"ext": "ts,json",
"ext": "js,ts,json",
"ignore": ["lib/**/*.test.ts"],
"exec": "tsc && tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env"
}
2 changes: 1 addition & 1 deletion packages/server/nodemon.json
@@ -1,6 +1,6 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../../.env", "../shared/providers.yaml"],
"ext": "ts,json",
"ext": "js,ts,json",
"ignore": ["src/**/*.spec.ts"],
"exec": "tsx -r dotenv/config lib/server.ts Dotenv_config_path=./../../.env"
}
1 change: 1 addition & 0 deletions packages/shared/lib/services/sync/config/config.service.ts
@@ -738,6 +738,7 @@ export async function getConfigWithEndpointsByProviderConfigKey(environment_id:
`${TABLE}.sync_type`,
`${TABLE}.track_deletes`,
`${TABLE}.auto_start`,
`${TABLE}.webhook_subscriptions`,
'_nango_configs.unique_key',
'_nango_configs.provider',
db.knex.raw(
@@ -345,7 +345,7 @@ const runJob = async (
};
await jobService.updateSyncJobResult(syncJob.id, updatedResults, model);
// finish the sync
-    await syncRun.finishSync([model], new Date(), `v1`, 10, trackDeletes);
+    await syncRun.finishFlow([model], new Date(), `v1`, 10, trackDeletes);

const syncJobResult = await jobService.getLatestSyncJob(sync.id);
return {
