//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Foundation
import HummingbirdJobs
@_spi(ConnectionPool) import HummingbirdPostgres
import Logging
import NIOConcurrencyHelpers
import NIOCore
@_spi(ConnectionPool) import PostgresNIO
@_spi(ConnectionPool)
public final class PostgresQueue: JobQueueDriver {
    public typealias JobID = UUID

    /// What to do with jobs left in a given state from the last time the queue was run
    public enum JobInitialization: Sendable {
        case doNothing
        case rerun
        case remove
    }

    /// Errors thrown by PostgresJobQueue
    public enum PostgresQueueError: Error, CustomStringConvertible {
        case failedToAdd

        public var description: String {
            switch self {
            case .failedToAdd:
                return "Failed to add job to queue"
            }
        }
    }

    /// Job status, stored in the `status` column of the `_hb_pg_jobs` table
    enum Status: Int16, PostgresCodable {
        case pending = 0
        case processing = 1
        case failed = 2
    }

    /// Queue configuration
    public struct Configuration: Sendable {
        // policy applied on init to jobs found in the pending state
        let pendingJobsInitialization: JobInitialization
        // policy applied on init to jobs found in the failed state
        let failedJobsInitialization: JobInitialization
        // policy applied on init to jobs found in the processing state
        let processingJobsInitialization: JobInitialization
        // how long the iterator sleeps between polls when the queue is empty
        let pollTime: Duration

        /// Initialize queue configuration
        /// - Parameters:
        ///   - pendingJobsInitialization: what to do with pending jobs from a previous run
        ///   - failedJobsInitialization: what to do with failed jobs from a previous run
        ///   - processingJobsInitialization: what to do with jobs that were mid-processing
        ///   - pollTime: interval between polls of an empty queue
        public init(
            pendingJobsInitialization: JobInitialization = .doNothing,
            failedJobsInitialization: JobInitialization = .rerun,
            processingJobsInitialization: JobInitialization = .rerun,
            pollTime: Duration = .milliseconds(100)
        ) {
            self.pendingJobsInitialization = pendingJobsInitialization
            self.failedJobsInitialization = failedJobsInitialization
            self.processingJobsInitialization = processingJobsInitialization
            self.pollTime = pollTime
        }
    }

    /// Postgres client used by Job queue
    public let client: PostgresClient
    /// Job queue configuration
    public let configuration: Configuration
    /// Logger used by queue
    public let logger: Logger
    // migration collection; onInit waits for these to complete before touching tables
    let migrations: PostgresMigrations
    // flag read by the AsyncSequence iterator to stop serving jobs
    let isStopped: NIOLockedValueBox<Bool>

    /// Initialize a PostgresJobQueue
    /// - Parameters:
    ///   - client: Postgres client
    ///   - migrations: migration collection to register the queue's table migrations with
    ///   - configuration: queue configuration
    ///   - logger: logger used by queue
    public init(client: PostgresClient, migrations: PostgresMigrations, configuration: Configuration = .init(), logger: Logger) async {
        self.client = client
        self.configuration = configuration
        self.logger = logger
        self.isStopped = .init(false)
        self.migrations = migrations
        await migrations.add(CreateJobs())
        await migrations.add(CreateJobQueue())
    }

    /// Run on initialization of the job queue
    ///
    /// Waits for the table migrations to complete, then applies the configured
    /// ``JobInitialization`` policy to jobs left in each status by a previous run.
    /// - Throws: rethrows any `PSQLError` after logging its full detail
    public func onInit() async throws {
        do {
            self.logger.info("Waiting for JobQueue migrations")
            try await self.migrations.waitUntilCompleted()
            _ = try await self.client.withConnection { connection in
                self.logger.info("Update Jobs at initialization")
                try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection)
                try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection)
                try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection)
            }
        } catch let error as PSQLError {
            // log the reflected form: PSQLError's plain description hides most
            // of its diagnostic detail (was a bare print())
            self.logger.error("\(String(reflecting: error))")
            throw error
        }
    }

    /// Push Job onto queue
    /// - Parameter buffer: encoded job payload
    /// - Returns: Identifier of queued job
    @discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID {
        try await self.client.withConnection { connection in
            let queuedJob = QueuedJob<JobID>(id: .init(), jobBuffer: buffer)
            // insert the payload first so the queue entry never references a missing job
            try await add(queuedJob, connection: connection)
            try await addToQueue(jobId: queuedJob.id, connection: connection)
            return queuedJob.id
        }
    }

    /// This is called to say job has finished processing and it can be deleted
    /// - Parameter jobId: identifier of completed job
    public func finished(jobId: JobID) async throws {
        _ = try await self.client.withConnection { connection in
            try await self.delete(jobId: jobId, connection: connection)
        }
    }

    /// This is called to say job has failed to run and should be put aside
    /// - Parameters:
    ///   - jobId: identifier of failed job
    ///   - error: the failure; currently unused — the job is only marked as failed
    public func failed(jobId: JobID, error: Error) async throws {
        _ = try await self.client.withConnection { connection in
            try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
        }
    }

    /// stop serving jobs
    public func stop() async {
        self.isStopped.withLockedValue { $0 = true }
    }

    /// shutdown queue once all active jobs have been processed
    public func shutdownGracefully() async {}

    /// Pop the oldest entry off the queue and load its job.
    ///
    /// Atomically deletes the head row of `_hb_pg_job_queue` — the subquery uses
    /// `FOR UPDATE SKIP LOCKED` so concurrent workers never claim the same row —
    /// then fetches the payload from `_hb_pg_jobs` and marks it processing.
    /// - Returns: the claimed job, or nil if the queue is empty
    func popFirst() async throws -> QueuedJob<JobID>? {
        do {
            return try await self.client.withConnection { connection -> QueuedJob? in
                while true {
                    try Task.checkCancellation()
                    let stream = try await connection.query(
                        """
                        DELETE
                        FROM _hb_pg_job_queue pse
                        WHERE pse.job_id =
                        (SELECT pse_inner.job_id
                        FROM _hb_pg_job_queue pse_inner
                        ORDER BY pse_inner.createdAt ASC
                        FOR UPDATE SKIP LOCKED
                        LIMIT 1)
                        RETURNING pse.job_id
                        """,
                        logger: self.logger
                    )
                    // return nil if nothing in queue
                    guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
                        return nil
                    }
                    // select job from job table
                    let stream2 = try await connection.query(
                        "SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)",
                        logger: self.logger
                    )
                    do {
                        try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
                        // if failed to find a job in the job table try getting another index
                        guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
                            continue
                        }
                        return QueuedJob(id: jobId, jobBuffer: buffer)
                    } catch {
                        // NOTE(review): a failure in setStatus(.processing) above is also
                        // reported as decodeJobFailed — consider distinguishing the two
                        try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
                        throw JobQueueError.decodeJobFailed
                    }
                }
            }
        } catch let error as PSQLError {
            // log full error detail before rethrowing (was a bare print())
            self.logger.error("\(String(reflecting: error))")
            throw error
        }
    }

    /// Insert job payload into the jobs table with pending status
    func add(_ job: QueuedJob<JobID>, connection: PostgresConnection) async throws {
        try await connection.query(
            """
            INSERT INTO _hb_pg_jobs (id, job, status)
            VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
            """,
            logger: self.logger
        )
    }

    /// Delete job payload from the jobs table
    func delete(jobId: JobID, connection: PostgresConnection) async throws {
        try await connection.query(
            "DELETE FROM _hb_pg_jobs WHERE id = \(jobId)",
            logger: self.logger
        )
    }

    /// Add an entry for the job to the queue table; createdAt orders popFirst
    func addToQueue(jobId: JobID, connection: PostgresConnection) async throws {
        try await connection.query(
            """
            INSERT INTO _hb_pg_job_queue (job_id, createdAt) VALUES (\(jobId), \(Date.now))
            """,
            logger: self.logger
        )
    }

    /// Update a job's status and lastModified timestamp in the jobs table
    func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws {
        try await connection.query(
            "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
            logger: self.logger
        )
    }

    /// Return identifiers of all jobs with the given status
    func getJobs(withStatus status: Status) async throws -> [JobID] {
        return try await self.client.withConnection { connection in
            let stream = try await connection.query(
                "SELECT id FROM _hb_pg_jobs WHERE status = \(status)",
                logger: self.logger
            )
            var jobs: [JobID] = []
            for try await id in stream.decode(JobID.self, context: .default) {
                jobs.append(id)
            }
            return jobs
        }
    }

    /// Apply a ``JobInitialization`` policy to jobs found with `status` at startup
    /// - Parameters:
    ///   - status: job status the policy applies to
    ///   - onInit: .remove deletes the jobs, .rerun re-queues them, .doNothing leaves them
    ///   - connection: connection to run queries on
    func updateJobsOnInit(withStatus status: Status, onInit: JobInitialization, connection: PostgresConnection) async throws {
        switch onInit {
        case .remove:
            try await connection.query(
                "DELETE FROM _hb_pg_jobs WHERE status = \(status)",
                logger: self.logger
            )
        case .rerun:
            // pending jobs already have queue entries, so re-queueing would duplicate them
            guard status != .pending else { return }
            let jobs = try await getJobs(withStatus: status)
            self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
            for jobId in jobs {
                try await self.addToQueue(jobId: jobId, connection: connection)
            }
        case .doNothing:
            break
        }
    }
}
/// Extend PostgresJobQueue so it can be iterated as an AsyncSequence of jobs
extension PostgresQueue {
    public struct AsyncIterator: AsyncIteratorProtocol {
        let queue: PostgresQueue

        /// Return the next available job, polling until one arrives.
        /// - Returns: the next queued job, or nil once the queue has been stopped
        public func next() async throws -> Element? {
            // Poll the queue; sleep between attempts so an empty queue doesn't spin.
            repeat {
                guard !self.queue.isStopped.withLockedValue({ $0 }) else {
                    return nil
                }
                if let job = try await self.queue.popFirst() {
                    return job
                }
                // nothing available — back off for the configured poll interval
                try await Task.sleep(for: self.queue.configuration.pollTime)
            } while true
        }
    }

    public func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(queue: self)
    }
}
@_spi(ConnectionPool)
extension JobQueueDriver where Self == PostgresQueue {
    /// Return Postgres driver for Job Queue
    /// - Parameters:
    ///   - client: Postgres client
    ///   - migrations: migration collection the queue registers its table migrations with
    ///   - configuration: Queue configuration
    ///   - logger: Logger used by queue
    public static func postgres(
        client: PostgresClient,
        migrations: PostgresMigrations,
        configuration: PostgresQueue.Configuration = .init(),
        logger: Logger
    ) async -> Self {
        await PostgresQueue(client: client, migrations: migrations, configuration: configuration, logger: logger)
    }
}