-
Notifications
You must be signed in to change notification settings - Fork 368
/
runner.ts
135 lines (120 loc) · 4.14 KB
/
runner.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import type { KVStore } from '@nangohq/shared/lib/utils/kvstore/KVStore.js';
import { LocalRunner } from './local.runner.js';
import { RenderRunner } from './render.runner.js';
import { RemoteRunner } from './remote.runner.js';
import { isEnterprise, env } from '@nangohq/utils/dist/environment/detection.js';
import { getRedisUrl, InMemoryKVStore, RedisKVStore } from '@nangohq/shared';
import type { ProxyAppRouter } from '@nangohq/nango-runner';
import Logger from '@nangohq/utils/dist/logger.js';
// Module-level logger, taken from a Logger instance constructed with 'Runner'.
const logger = new Logger('Runner').logger;
/**
 * Identifies which runner implementation a `Runner` belongs to.
 *
 * `RunnerCache.get` switches on this value (read from the cached JSON's
 * `runnerType` field) to pick the matching `fromJSON` factory.
 */
export enum RunnerType {
/** Rehydrated from the cache through `LocalRunner.fromJSON`. */
Local = 'local',
/** Rehydrated from the cache through `RenderRunner.fromJSON`. */
Render = 'render',
/** Rehydrated from the cache through `RemoteRunner.fromJSON`. */
Remote = 'remote'
}
/**
 * Common shape of the runner handles that this module starts, health-checks,
 * caches and suspends.
 */
export interface Runner {
/** Discriminator; the cache reads it back to decide which class rebuilds the runner. */
runnerType: RunnerType;
/** Runner identifier; `RunnerCache.set` derives the cache key from it. */
id: string;
/** Client whose `health.query()` is polled by `getOrStartRunner` to decide whether the runner is up. */
client: ProxyAppRouter;
/** NOTE(review): presumably the address the runner is reachable at; not read in this file, confirm against the implementations. */
url: string;
/** Suspends the runner; may complete synchronously or asynchronously. In this file only `suspendRunner` calls it. */
suspend(): Promise<void> | void;
/**
 * Serializable form of the runner. `RunnerCache.set` calls `JSON.stringify(runner)`,
 * which uses this method, so the result must include `runnerType` for
 * `RunnerCache.get` to be able to rehydrate the entry.
 */
toJSON(): Record<string, any>;
}
/**
 * Builds a runner id of the form `<env>-runner-account-<suffix>`.
 *
 * @param suffix - Value appended after the `-runner-account-` segment.
 * @returns The composed runner id.
 */
export function getRunnerId(suffix: string): string {
    const prefix = `${env}-runner-account-`;
    return prefix + suffix;
}
/**
 * Returns a healthy runner for `runnerId`, preferring the cached one.
 *
 * Flow:
 *  1. Look the runner up in `runnersCache`; if found and it answers its health
 *     check in time, return it as-is.
 *  2. Otherwise (cache miss, or the cached runner failed the health check, which
 *     is logged), obtain a runner from `RemoteRunner` when `isEnterprise` is set,
 *     from `RenderRunner` when the `IS_RENDER` env var equals `'true'`, and from
 *     `LocalRunner` in every other case.
 *  3. Wait for that runner to become healthy, store it in the cache, return it.
 *
 * @param runnerId - Id of the runner to fetch or start.
 * @returns A runner whose health check succeeded.
 * @throws Error when the newly obtained runner does not pass its health check
 *         before the timeout; errors from `getOrStart` and from the cache write
 *         also propagate.
 */
export async function getOrStartRunner(runnerId: string): Promise<Runner> {
    const timeoutMs = 5000;

    // Polls the health endpoint until it succeeds or the deadline passes,
    // pausing one second after each failed attempt.
    const ensureHealthy = async (candidate: Runner): Promise<void> => {
        const deadline = Date.now() + timeoutMs;
        while (Date.now() < deadline) {
            try {
                await candidate.client.health.query();
                return;
            } catch {
                await new Promise((resolve) => setTimeout(resolve, 1000));
            }
        }
        throw new Error(`Runner '${runnerId}' hasn't started after ${timeoutMs}ms,`);
    };

    // Picks the runner implementation that matches the current deployment.
    const launch = async (): Promise<Runner> => {
        if (isEnterprise) {
            return RemoteRunner.getOrStart(runnerId);
        }
        if (process.env['IS_RENDER'] === 'true') {
            return RenderRunner.getOrStart(runnerId);
        }
        return LocalRunner.getOrStart(runnerId);
    };

    const fromCache = await runnersCache.get(runnerId);
    if (fromCache) {
        try {
            await ensureHealthy(fromCache);
            return fromCache;
        } catch (err) {
            // A cached runner that is not responding is not fatal: fall through and (re)start it.
            logger.error(err);
        }
    }

    const started = await launch();
    await ensureHealthy(started);
    await runnersCache.set(started);
    return started;
}
/**
 * Suspends the runner identified by `runnerId` (Render deployments only) and
 * then asks the runner cache to drop its entry.
 *
 * The runner is looked up and suspended only when the `IS_RENDER` env var
 * equals `'true'`; the cache delete is issued in every case.
 *
 * @param runnerId - Id of the runner to suspend.
 */
export async function suspendRunner(runnerId: string): Promise<void> {
    // we only suspend render runners
    const onRender = process.env['IS_RENDER'] === 'true';
    const renderRunner = onRender ? await RenderRunner.get(runnerId) : undefined;
    if (renderRunner) {
        await renderRunner.suspend();
    }
    await runnersCache.delete(runnerId);
}
// Caching the runners to minimize calls made to Render api
// and to better handle Render rate limits and potential downtime
/**
 * Thin cache of serialized runners on top of a key/value store.
 *
 * Every entry lives under the namespaced key `jobs:runner:<runnerId>`; all
 * three operations (`get`, `set`, `delete`) must go through `cacheKey` so they
 * address the same entry.
 */
class RunnerCache {
    constructor(private readonly store: KVStore) {}

    /** Maps a runner id to the namespaced key used in the underlying store. */
    private cacheKey(s: string): string {
        return `jobs:runner:${s}`;
    }

    /**
     * Loads and rehydrates the cached runner for `runnerId`.
     *
     * @returns The runner rebuilt by the `fromJSON` factory matching the stored
     *          `runnerType`, or `undefined` when there is no entry, the entry has
     *          an unknown `runnerType`, or reading/parsing/rehydrating fails.
     */
    async get(runnerId: string): Promise<Runner | undefined> {
        try {
            const cached = await this.store.get(this.cacheKey(runnerId));
            if (!cached) {
                return undefined;
            }
            const obj = JSON.parse(cached);
            switch (obj.runnerType) {
                case RunnerType.Local:
                    return LocalRunner.fromJSON(obj);
                case RunnerType.Render:
                    return RenderRunner.fromJSON(obj);
                case RunnerType.Remote:
                    return RemoteRunner.fromJSON(obj);
                default:
                    return undefined;
            }
        } catch {
            // Deliberately best-effort: any cache failure is treated as a miss so
            // the caller falls back to starting/fetching the runner directly.
            return undefined;
        }
    }

    /**
     * Stores the runner's JSON form (as produced by `JSON.stringify`, i.e. its
     * `toJSON()`) under the runner's cache key.
     */
    async set(runner: Runner): Promise<void> {
        const ttl = 7 * 24 * 60 * 60 * 1000; // 7 days
        // NOTE(review): the third argument (`true`) and `ttl` are passed positionally;
        // their exact meaning is defined by the KVStore implementation — confirm there.
        await this.store.set(this.cacheKey(runner.id), JSON.stringify(runner), true, ttl);
    }

    /**
     * Removes the cached entry for `runnerId`.
     *
     * The key must be namespaced exactly like in `get`/`set`; deleting the raw
     * runner id would leave the real entry in place.
     */
    async delete(runnerId: string): Promise<void> {
        await this.store.delete(this.cacheKey(runnerId));
    }
}
// Module-wide runner cache, built once at import time: backed by Redis when
// `getRedisUrl()` returns a URL (the connection is awaited before the module
// finishes loading), otherwise by an in-memory store.
const runnersCache = await (async (): Promise<RunnerCache> => {
    const redisUrl = getRedisUrl();
    if (!redisUrl) {
        return new RunnerCache(new InMemoryKVStore());
    }
    const redisStore = new RedisKVStore(redisUrl);
    await redisStore.connect();
    return new RunnerCache(redisStore);
})();