-
Notifications
You must be signed in to change notification settings - Fork 368
/
schedule.service.ts
131 lines (105 loc) · 4.61 KB
/
schedule.service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import db, { schema, dbNamespace } from '../../db/database.js';
import type { Schedule as SyncSchedule, SyncCommand } from '../../models/Sync.js';
import { ScheduleStatus, SyncCommandToScheduleStatus } from '../../models/Sync.js';
import type { ServiceResponse } from '../../models/Generic.js';
import { getInterval } from '../nango-config.service.js';
import SyncClient from '../../clients/sync.client.js';
import { createActivityLogDatabaseErrorMessageAndEnd } from '../activity/activity.service.js';
import { resultOk, type Result, resultErr } from '../../utils/result.js';
// Name of the sync schedules table: the shared `dbNamespace` prefix followed by 'sync_schedules'.
const TABLE = dbNamespace + 'sync_schedules';
/**
 * Inserts a new row into the sync schedules table.
 *
 * @param sync_id - id of the sync the schedule belongs to
 * @param frequency - frequency value stored on the row
 * @param offset - offset value stored on the row
 * @param status - initial status of the schedule
 * @param schedule_id - value stored in the `schedule_id` column
 */
export const createSchedule = async (sync_id: string, frequency: string, offset: number, status: ScheduleStatus, schedule_id: string): Promise<void> => {
    const row = { sync_id, status, schedule_id, frequency, offset };

    await db.knex.from<SyncSchedule>(TABLE).insert(row);
};
/**
 * Looks up a single non-deleted schedule by its `schedule_id`.
 *
 * @param schedule_id - value of the `schedule_id` column to match
 * @returns the first matching row, or `null` when none is found
 */
export const getScheduleById = async (schedule_id: string): Promise<SyncSchedule | null> => {
    const row = await schema()
        .select('*')
        .from<SyncSchedule>(TABLE)
        .where({ schedule_id, deleted: false })
        .first();

    if (!row) {
        return null;
    }

    return row;
};
/**
 * Looks up a single non-deleted schedule belonging to the given sync.
 *
 * @param sync_id - id of the sync whose schedule is requested
 * @returns the first matching row, or `null` when the sync has no schedule
 */
export const getSchedule = async (sync_id: string): Promise<SyncSchedule | null> => {
    const row = await schema()
        .select('*')
        .from<SyncSchedule>(TABLE)
        .where({ sync_id, deleted: false })
        .first();

    return row || null;
};
/**
 * Lists every non-deleted schedule belonging to the given sync.
 *
 * @param sync_id - id of the sync whose schedules are requested
 * @returns the matching rows, or an empty array when there are none
 */
export const getSyncSchedules = async (sync_id: string): Promise<SyncSchedule[]> => {
    const rows = await schema()
        .select('*')
        .from<SyncSchedule>(TABLE)
        .where({ sync_id, deleted: false });

    // Anything that is not a non-empty array is normalised to an empty list.
    if (!Array.isArray(rows) || rows.length === 0) {
        return [];
    }

    return rows;
};
/**
 * Asks the sync client to delete the schedule attached to a sync.
 *
 * Does nothing when the sync has no non-deleted schedule or when no sync
 * client instance is returned.
 *
 * @param sync_id - id of the sync whose schedule should be deleted
 * @param environmentId - environment id forwarded to the sync client
 */
export const deleteScheduleForSync = async (sync_id: string, environmentId: number): Promise<void> => {
    const syncClient = await SyncClient.getInstance();
    const schedule = await getSchedule(sync_id);

    if (!schedule || !syncClient) {
        return;
    }

    await syncClient.deleteSyncSchedule(schedule.schedule_id, environmentId);
};
/**
 * Sets the status of every row in the schedules table to `STOPPED`.
 *
 * The update has no `where` clause, so soft-deleted rows are updated as well.
 */
export const markAllAsStopped = async (): Promise<void> => {
    const stopped = { status: ScheduleStatus.STOPPED };

    await schema().update(stopped).from<SyncSchedule>(TABLE);
};
/**
 * Updates the status of a non-deleted schedule from a sync command.
 *
 * The command is translated to a schedule status through
 * `SyncCommandToScheduleStatus`. Failures are returned as an error result
 * instead of being thrown; when an activity log id is provided, the failure
 * is also reported to that activity log.
 *
 * @param schedule_id - value of the `schedule_id` column to match
 * @param status - sync command to translate into a schedule status
 * @param activityLogId - activity log to report failures to; skipped when falsy
 * @param environment_id - environment id forwarded to the activity log
 * @returns `resultOk(true)` on success, otherwise `resultErr` wrapping the caught error
 */
export const updateScheduleStatus = async (
    schedule_id: string,
    status: SyncCommand,
    activityLogId: number | null,
    environment_id: number
): Promise<Result<boolean>> => {
    try {
        const nextStatus = SyncCommandToScheduleStatus[status];

        await schema()
            .update({ status: nextStatus })
            .from<SyncSchedule>(TABLE)
            .where({ schedule_id, deleted: false });

        return resultOk(true);
    } catch (error) {
        if (activityLogId) {
            const message = `Failed to update schedule status to ${status} for schedule_id: ${schedule_id}.`;

            await createActivityLogDatabaseErrorMessageAndEnd(message, error, activityLogId, environment_id);
        }

        return resultErr(error as Error);
    }
};
/**
 * Changes the frequency of a sync's schedule when the requested interval
 * differs from the stored one.
 *
 * The stored frequency is updated first, then the sync client (if one is
 * returned) is asked to update the schedule with the new frequency and offset.
 *
 * @param sync_id - id of the sync whose schedule should be updated
 * @param interval - requested interval, parsed with `getInterval`
 * @param syncName - sync name forwarded to the sync client
 * @param environmentId - environment id forwarded to the sync client
 * @param activityLogId - optional activity log id forwarded to the sync client
 * @returns `response: true` when the frequency was changed; `response: false`
 *          when there is no schedule or the frequency is unchanged;
 *          `success: false` with the parsing error when the interval is invalid
 */
export const updateSyncScheduleFrequency = async (
    sync_id: string,
    interval: string,
    syncName: string,
    environmentId: number,
    activityLogId?: number
): Promise<ServiceResponse<boolean>> => {
    const existingSchedule = await getSchedule(sync_id);
    if (!existingSchedule) {
        return { success: true, error: null, response: false };
    }

    const { success: parsedOk, error: parseError, response: parsed } = getInterval(interval, new Date());
    if (!parsedOk || parsed === null) {
        return { success: false, error: parseError, response: null };
    }

    const { interval: frequency, offset } = parsed;

    // Nothing to do when the stored frequency already matches.
    if (existingSchedule.frequency === frequency) {
        return { success: true, error: null, response: false };
    }

    await schema()
        .update({ frequency })
        .from<SyncSchedule>(TABLE)
        .where({ sync_id, deleted: false });

    const syncClient = await SyncClient.getInstance();
    await syncClient?.updateSyncSchedule(existingSchedule.schedule_id, frequency, offset, environmentId, syncName, activityLogId);

    return { success: true, error: null, response: true };
};
/**
 * Stores a new offset on a non-deleted schedule.
 *
 * @param schedule_id - value of the `schedule_id` column to match
 * @param offset - offset value to store
 */
export const updateOffset = async (schedule_id: string, offset: number): Promise<void> => {
    const activeSchedule = { schedule_id, deleted: false };

    await schema().update({ offset }).from<SyncSchedule>(TABLE).where(activeSchedule);
};
/**
 * Soft-deletes up to `limit` non-deleted schedules of a sync by setting
 * `deleted` to true and `deleted_at` to the database's current time.
 *
 * The rows to update are picked by a limited sub-select on `id`, so callers
 * can invoke this repeatedly to delete in batches.
 *
 * @param syncId - id of the sync whose schedules should be soft-deleted
 * @param limit - maximum number of rows selected for one call
 * @returns the value the update query resolves to (presumably the number of
 *          updated rows — confirm against the Knex dialect in use)
 */
export async function softDeleteSchedules({ syncId, limit }: { syncId: string; limit: number }): Promise<number> {
    // Use the shared TABLE constant, like every other query in this module,
    // so the table name cannot drift from the configured namespace.
    return db
        .knex(TABLE)
        .update({
            deleted: true,
            deleted_at: db.knex.fn.now()
        })
        .whereIn('id', function (sub) {
            sub.select('id').from(TABLE).where({ deleted: false, sync_id: syncId }).limit(limit);
        });
}