-
Notifications
You must be signed in to change notification settings - Fork 368
/
temporal.ts
83 lines (70 loc) · 2.81 KB
/
temporal.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import { Worker, NativeConnection } from '@temporalio/worker';
import fs from 'fs-extra';
import * as dotenv from 'dotenv';
import { createRequire } from 'module';
import * as activities from './activities.js';
import { SYNC_TASK_QUEUE, WEBHOOK_TASK_QUEUE } from '@nangohq/shared';
import { isProd, isEnterprise } from '@nangohq/utils/dist/environment/detection.js';
import { getLogger } from '@nangohq/utils/dist/logger.js';
// Module-level logger created with the 'Jobs.Temporal' label; used for the
// start-up message and for reporting errors raised while running workers.
const logger = getLogger('Jobs.Temporal');
/**
 * Parses a concurrency limit from its textual (environment variable) form.
 *
 * The value is read as a base-10 integer. Anything that is missing, not a
 * number, zero or negative yields `fallback`, so a misconfigured variable can
 * never produce a non-positive concurrency limit.
 *
 * @param raw - Raw value, typically taken from `process.env`.
 * @param fallback - Limit used when `raw` is not a positive integer.
 * @returns A positive concurrency limit.
 */
function parseMaxConcurrency(raw: string | undefined, fallback = 500): number {
    // Explicit radix: without it a value such as '0x10' would be read as hexadecimal.
    const parsed = Number.parseInt(raw ?? '', 10);
    return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

// Evaluated once at module load from the TEMPORAL_WORKER_MAX_CONCURRENCY environment variable (defaults to 500).
const TEMPORAL_WORKER_MAX_CONCURRENCY = parseMaxConcurrency(process.env['TEMPORAL_WORKER_MAX_CONCURRENCY']);
/**
 * Runs the Temporal workers for one namespace: a worker polling
 * `SYNC_TASK_QUEUE` and a worker polling `WEBHOOK_TASK_QUEUE`, both sharing a
 * single native connection.
 */
export class Temporal {
    namespace: string;
    /** Workers created by `start()`; `null` until both have been created. */
    workers: Worker[] | null;

    /**
     * @param namespace - Temporal namespace the workers register with. It is
     * also used to locate the TLS client certificate and key under `/etc/secrets`.
     */
    constructor(namespace: string) {
        this.namespace = namespace;
        this.workers = null;
    }

    /**
     * Connects to Temporal, creates both workers and runs them.
     *
     * The returned promise settles only once the workers have stopped running
     * (or start-up failed). Errors raised while connecting, creating or running
     * workers are logged rather than rethrown.
     *
     * @throws If the TLS certificate or key cannot be read (prod/enterprise only).
     */
    async start(): Promise<void> {
        logger.info('Starting Temporal worker');

        if (process.env['SERVER_RUN_MODE'] !== 'DOCKERIZED') {
            dotenv.config({ path: '../../.env' });
        }

        // mTLS is only used in prod/enterprise; the certificate pair is read
        // before connecting so a missing secret fails fast and reaches the caller.
        const tls =
            isProd || isEnterprise
                ? {
                      clientCertPair: {
                          crt: await fs.readFile(`/etc/secrets/${this.namespace}.crt`),
                          key: await fs.readFile(`/etc/secrets/${this.namespace}.key`)
                      }
                  }
                : false;

        let connection: NativeConnection | null = null;
        try {
            connection = await NativeConnection.connect({
                address: process.env['TEMPORAL_ADDRESS'] || 'localhost:7233',
                tls
            });

            // Options common to both workers.
            const sharedOptions = {
                connection,
                namespace: this.namespace,
                workflowsPath: createRequire(import.meta.url).resolve('./workflows'),
                activities
            };
            const syncWorker = {
                ...sharedOptions,
                maxConcurrentWorkflowTaskExecutions: TEMPORAL_WORKER_MAX_CONCURRENCY,
                taskQueue: SYNC_TASK_QUEUE
            };
            const webhookWorker = {
                ...sharedOptions,
                maxConcurrentWorkflowTaskExecutions: 50,
                maxActivitiesPerSecond: 50,
                taskQueue: WEBHOOK_TASK_QUEUE
            };

            this.workers = await Promise.all([Worker.create(syncWorker), Worker.create(webhookWorker)]);
            await Promise.all(this.workers.map((worker) => worker.run()));
        } catch (err: unknown) {
            logger.error(err);
        } finally {
            if (connection) {
                await this.closeConnection(connection);
            }
        }
    }

    /**
     * Asks every worker to shut down. Does nothing when no workers were created.
     *
     * Each worker is handled independently so that a failure to shut one down
     * (for example because it is no longer running) does not prevent the
     * remaining workers from being asked to stop.
     */
    stop(): void {
        for (const worker of this.workers ?? []) {
            try {
                worker.shutdown();
            } catch (err: unknown) {
                logger.error(err);
            }
        }
    }

    /**
     * Releases the native connection once no worker is using it any more.
     *
     * The connection is deliberately left open while any worker is still
     * active (e.g. one worker failed while the other keeps running), so that
     * the surviving worker is not cut off.
     */
    private async closeConnection(connection: NativeConnection): Promise<void> {
        const stillActive = (this.workers ?? []).some((worker) => {
            const state = worker.getState();
            return state !== 'STOPPED' && state !== 'FAILED';
        });
        if (stillActive) {
            logger.error('Temporal connection left open: at least one worker is still active');
            return;
        }

        try {
            await connection.close();
        } catch (err: unknown) {
            // Closing is best-effort clean-up; never let it mask the outcome of start().
            logger.error(err);
        }
    }
}