From 49b26a9920a77da9c2acbcb2bee3097b7e62c0b8 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 22 Feb 2024 18:16:54 +0000 Subject: [PATCH 01/11] Add migrations for HBPostgresJobQueue --- .../CreateJobQueue.swift | 59 +++++++++++++ .../PostgresJobsQueue.swift | 87 ++++++++----------- .../HummingbirdPostgresTests/JobsTests.swift | 45 ++++++---- 3 files changed, 124 insertions(+), 67 deletions(-) create mode 100644 Sources/HummingbirdJobsPostgres/CreateJobQueue.swift diff --git a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift new file mode 100644 index 0000000..6b1aa93 --- /dev/null +++ b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift @@ -0,0 +1,59 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HummingbirdPostgres +import Logging +@_spi(ConnectionPool) import PostgresNIO + +struct CreateJobQueue: HBPostgresMigration { + func apply(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS _hb_jobs ( + id uuid PRIMARY KEY, + job json, + status smallint + ) + """, + logger: logger + ) + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS _hb_job_queue ( + job_id uuid PRIMARY KEY, + createdAt timestamp with time zone + ) + """, + logger: logger + ) + try await connection.query( + """ + CREATE INDEX IF NOT EXISTS _hb_job_queueidx + ON _hb_job_queue (createdAt ASC) + """, + logger: logger + ) + } + + func revert(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + "DROP TABLE _hb_jobs", + logger: logger + ) + try await connection.query( + "DROP TABLE _hb_job_queue", + logger: logger + ) + } +} diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift index a38414e..685de72 100644 --- a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -1,3 +1,17 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + import Foundation import HummingbirdJobs @_spi(ConnectionPool) import HummingbirdPostgres @@ -38,23 +52,17 @@ public final class HBPostgresQueue: HBJobQueueDriver { /// Queue configuration public struct Configuration: Sendable { - let jobTable: String - let jobQueueTable: String let pendingJobsInitialization: JobInitialization let failedJobsInitialization: JobInitialization let processingJobsInitialization: JobInitialization let pollTime: Duration public init( - jobTable: String = "_hb_jobs", - jobQueueTable: String = "_hb_job_queue", - pendingJobsInitialization: HBPostgresQueue.JobInitialization = .doNothing, - failedJobsInitialization: HBPostgresQueue.JobInitialization = .rerun, - processingJobsInitialization: HBPostgresQueue.JobInitialization = .rerun, + pendingJobsInitialization: JobInitialization = .doNothing, + failedJobsInitialization: JobInitialization = .rerun, + processingJobsInitialization: JobInitialization = .rerun, pollTime: Duration = .milliseconds(100) ) { - self.jobTable = jobTable - self.jobQueueTable = jobQueueTable self.pendingJobsInitialization = pendingJobsInitialization self.failedJobsInitialization = failedJobsInitialization self.processingJobsInitialization = processingJobsInitialization @@ -68,50 +76,27 @@ public final class HBPostgresQueue: HBJobQueueDriver { public let configuration: Configuration /// Logger used by queue public let logger: Logger + + let migrations: HBPostgresMigrations let isStopped: NIOLockedValueBox /// Initialize a HBPostgresJobQueue - /// - Parameters: - /// - client: Postgres client - /// - configuration: Queue configuration - /// - logger: Logger used by queue - public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) { + public init(client: PostgresClient, migrations: HBPostgresMigrations, configuration: Configuration = .init(), logger: Logger) async { self.client = client self.configuration = configuration self.logger = logger self.isStopped = .init(false) + self.migrations = migrations + await migrations.add(CreateJobQueue()) } /// Run on initialization of the job queue public func onInit() async throws { do { + self.logger.debug("Waiting for JobQueue migrations") + try await self.migrations.waitUntilCompleted() _ = try await self.client.withConnection { connection in - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) ( - id uuid PRIMARY KEY, - job bytea, - status smallint - ) - """, - logger: self.logger - ) - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable) ( - job_id uuid PRIMARY KEY, - createdAt timestamp with time zone - ) - """, - logger: self.logger - ) - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable)idx - ON \(unescaped: self.configuration.jobQueueTable) (createdAt ASC) - """, - logger: self.logger - ) + self.logger.debug("Update Jobs at initialization") try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection) try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection) try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection) @@ -163,10 +148,10 @@ public final class HBPostgresQueue: HBJobQueueDriver { let stream = try await connection.query( """ DELETE - FROM \(unescaped: self.configuration.jobQueueTable) pse + FROM _hb_job_queue pse WHERE pse.job_id = (SELECT pse_inner.job_id - FROM \(unescaped: self.configuration.jobQueueTable) pse_inner + FROM _hb_job_queue pse_inner ORDER BY pse_inner.createdAt ASC FOR UPDATE SKIP LOCKED LIMIT 1) @@ -180,7 +165,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { } // select job from job table let stream2 = try await connection.query( - "SELECT job FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)", + "SELECT job FROM _hb_jobs WHERE id = \(jobId)", logger: self.logger ) @@ -206,7 +191,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func add(_ job: HBQueuedJob, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status) + INSERT INTO _hb_jobs (id, job, status) VALUES (\(job.id), \(job.jobBuffer), \(Status.pending)) """, logger: self.logger @@ -215,7 +200,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func delete(jobId: JobID, connection: PostgresConnection) async throws { try await connection.query( - "DELETE FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)", + "DELETE FROM _hb_jobs WHERE id = \(jobId)", logger: self.logger ) } @@ -223,7 +208,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func addToQueue(jobId: JobID, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO \(unescaped: self.configuration.jobQueueTable) (job_id, createdAt) VALUES (\(jobId), \(Date.now)) + INSERT INTO _hb_job_queue (job_id, createdAt) VALUES (\(jobId), \(Date.now)) """, logger: self.logger ) @@ -231,7 +216,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE \(unescaped: self.configuration.jobTable) SET status = \(status) WHERE id = \(jobId)", + "UPDATE _hb_jobs SET status = \(status) WHERE id = \(jobId)", logger: self.logger ) } @@ -239,7 +224,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func getJobs(withStatus status: Status) async throws -> [JobID] { return try await self.client.withConnection { connection in let stream = try await connection.query( - "SELECT id FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)", + "SELECT id FROM _hb_jobs WHERE status = \(status)", logger: self.logger ) var jobs: [JobID] = [] @@ -254,7 +239,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { switch onInit { case .remove: try await connection.query( - "DELETE FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)", + "DELETE FROM _hb_jobs WHERE status = \(status)", logger: self.logger ) case .rerun: @@ -300,7 +285,7 @@ extension HBJobQueueDriver where Self == HBPostgresQueue { /// - client: Postgres client /// - configuration: Queue configuration /// - logger: Logger used by queue - public static func postgres(client: PostgresClient, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) -> Self { - .init(client: client, configuration: configuration, logger: logger) + public static func postgres(client: PostgresClient, migrations: HBPostgresMigrations, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) async -> Self { + await Self(client: client, migrations: migrations, configuration: configuration, logger: logger) } } diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index 2a66e81..c06c156 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -15,6 +15,7 @@ import Atomics import Hummingbird import HummingbirdJobs +@testable @_spi(ConnectionPool) import HummingbirdPostgres @testable @_spi(ConnectionPool) import HummingbirdJobsPostgres import HummingbirdXCT import NIOConcurrencyHelpers @@ -50,9 +51,11 @@ final class JobsTests: XCTestCase { configuration: getPostgresConfiguration(), backgroundLogger: logger ) + let postgresMigrations = HBPostgresMigrations() return HBJobQueue( HBPostgresQueue( client: postgresClient, + migrations: postgresMigrations, configuration: configuration, logger: logger ), @@ -81,9 +84,9 @@ final class JobsTests: XCTestCase { group.addTask { try await serviceGroup.run() } - try await Task.sleep(for: .seconds(1)) do { - let value = try await test(jobQueue) + try await postgresMigrations.apply(client: postgresClient, logger: logger, dryRun: false) + let value = try await test(postgresJobQueue) await serviceGroup.triggerGracefulShutdown() return value } catch let error as PSQLError { @@ -314,14 +317,22 @@ final class JobsTests: XCTestCase { configuration: getPostgresConfiguration(), backgroundLogger: logger ) + let postgresMigrations = HBPostgresMigrations() let jobQueue = HBJobQueue( - .postgres(client: postgresClient, logger: logger), + .postgres( + client: postgresClient, + migrations: postgresMigrations, + configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), + logger: logger + ), numWorkers: 2, logger: logger ) let jobQueue2 = HBJobQueue( HBPostgresQueue( client: postgresClient, + migrations: postgresMigrations, + configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), logger: logger ), numWorkers: 2, @@ -337,20 +348,22 @@ final class JobsTests: XCTestCase { gracefulShutdownSignals: [.sigterm, .sigint], logger: logger ) - ) - group.addTask { - try await serviceGroup.run() - } - do { - for i in 0..<200 { - try await jobQueue.push(id: jobIdentifer, parameters: i) + group.addTask { + try await serviceGroup.run() + } + try await postgresMigrations.apply(client: postgresClient, logger: logger, dryRun: false) + try await postgresMigrations2.apply(client: postgresClient, logger: logger, dryRun: false) + do { + for i in 0..<200 { + try await postgresJobQueue.push(id: jobIdentifer, parameters: i) + } + await self.wait(for: [expectation], timeout: 5) + await serviceGroup.triggerGracefulShutdown() + } catch { + XCTFail("\(String(reflecting: error))") + await serviceGroup.triggerGracefulShutdown() + throw error } - await self.wait(for: [expectation], timeout: 5) - await serviceGroup.triggerGracefulShutdown() - } catch { - XCTFail("\(String(reflecting: error))") - await serviceGroup.triggerGracefulShutdown() - throw error } } } From cace687f0be9c96c5976434ce715374e328ab97a Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 23 Feb 2024 07:52:33 +0000 Subject: [PATCH 02/11] Add persist migration --- .../CreateJobQueue.swift | 2 + .../CreatePersistTable.swift | 40 +++++++++++++++++++ .../PostgresPersistDriver.swift | 15 +++---- .../PersistTests.swift | 13 +++--- 4 files changed, 58 insertions(+), 12 deletions(-) create mode 100644 Sources/HummingbirdPostgres/CreatePersistTable.swift diff --git a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift index 6b1aa93..64a3403 100644 --- a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift +++ b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift @@ -56,4 +56,6 @@ struct CreateJobQueue: HBPostgresMigration { logger: logger ) } + + var name: String { "_Create_JobQueue_Table_" } } diff --git a/Sources/HummingbirdPostgres/CreatePersistTable.swift b/Sources/HummingbirdPostgres/CreatePersistTable.swift new file mode 100644 index 0000000..aa8b688 --- /dev/null +++ b/Sources/HummingbirdPostgres/CreatePersistTable.swift @@ -0,0 +1,40 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +@_spi(ConnectionPool) import PostgresNIO + +struct CreatePersistTable: HBPostgresMigration { + func apply(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS _hb_persist ( + "id" text PRIMARY KEY, + "data" json NOT NULL, + "expires" timestamp with time zone NOT NULL + ) + """, + logger: logger + ) + } + + func revert(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + "DROP TABLE _hb_persist", + logger: logger + ) + } + + var name: String { "_Create_Persist_Table_"} +} diff --git a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift index 4e0b8f7..36a7fa8 100644 --- a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift +++ b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift @@ -56,10 +56,11 @@ public final class HBPostgresPersistDriver: HBPersistDriver { /// - client: Postgres client /// - tidyUpFrequequency: How frequently cleanup expired database entries should occur @_spi(ConnectionPool) - public init(client: PostgresClient, tidyUpFrequency: Duration = .seconds(600), logger: Logger) { + public init(client: PostgresClient, migrations: HBPostgresMigrations, tidyUpFrequency: Duration = .seconds(600), logger: Logger) async { self.client = client self.logger = logger self.tidyUpFrequency = tidyUpFrequency + await migrations.add(CreatePersistTable()) } /// Create new key. This doesn't check for the existence of this key already so may fail if the key already exists @@ -68,7 +69,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { try await self.client.withConnection { connection in do { try await connection.query( - "INSERT INTO _hb_psql_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))", + "INSERT INTO _hb_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))", logger: self.logger ) } catch let error as PSQLError { @@ -87,7 +88,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { _ = try await self.client.withConnection { connection in try await connection.query( """ - INSERT INTO _hb_psql_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires)) + INSERT INTO _hb_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires)) ON CONFLICT (id) DO UPDATE SET data = \(WrapperObject(value)), expires = \(expires) """, @@ -100,7 +101,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { public func get(key: String, as object: Object.Type) async throws -> Object? { try await self.client.withConnection { connection in let stream = try await connection.query( - "SELECT data, expires FROM _hb_psql_persist WHERE id = \(key)", + "SELECT data, expires FROM _hb_persist WHERE id = \(key)", logger: self.logger ) guard let result = try await stream.decode((WrapperObject, Date).self) @@ -117,7 +118,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { public func remove(key: String) async throws { _ = try await self.client.withConnection { connection in try await connection.query( - "DELETE FROM _hb_psql_persist WHERE id = \(key)", + "DELETE FROM _hb_persist WHERE id = \(key)", logger: self.logger ) } @@ -127,7 +128,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { func tidy() async throws { _ = try await self.client.withConnection { connection in try await connection.query( - "DELETE FROM _hb_psql_persist WHERE expires < \(Date.now)", + "DELETE FROM _hb_persist WHERE expires < \(Date.now)", logger: self.logger ) } @@ -141,7 +142,7 @@ extension HBPostgresPersistDriver { _ = try await self.client.withConnection { connection in try await connection.query( """ - CREATE TABLE IF NOT EXISTS _hb_psql_persist ( + CREATE TABLE IF NOT EXISTS _hb_persist ( "id" text PRIMARY KEY, "data" json NOT NULL, "expires" timestamp with time zone NOT NULL diff --git a/Tests/HummingbirdPostgresTests/PersistTests.swift b/Tests/HummingbirdPostgresTests/PersistTests.swift index dc85399..a9e318c 100644 --- a/Tests/HummingbirdPostgresTests/PersistTests.swift +++ b/Tests/HummingbirdPostgresTests/PersistTests.swift @@ -34,13 +34,17 @@ final class PersistTests: XCTestCase { } } } - var logger = Logger(label: "PersistTests") - logger.logLevel = .debug + let logger = { + var logger = Logger(label: "PersistTests") + logger.logLevel = .debug + return logger + }() let postgresClient = try await PostgresClient( configuration: getPostgresConfiguration(), backgroundLogger: logger ) - let persist = HBPostgresPersistDriver(client: postgresClient, logger: logger) + let postgresMigrations = HBPostgresMigrations() + let persist = await HBPostgresPersistDriver(client: postgresClient, migrations: postgresMigrations, logger: logger) let router = HBRouter() router.middlewares.add(PostgresErrorMiddleware()) router.put("/persist/:tag") { request, context -> HTTPResponse.Status in @@ -69,8 +73,7 @@ final class PersistTests: XCTestCase { var app = HBApplication(responder: router.buildResponder()) app.addServices(PostgresClientService(client: postgresClient), persist) app.runBeforeServerStart { - // temporary fix to ensure persist table is created before we use it - try await Task.sleep(for: .milliseconds(400)) + try await postgresMigrations.apply(client: postgresClient, logger: logger, dryRun: false) } return app From 28a831ef4aea6b813a1d7ced0ae7d70d6c04eeb4 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 23 Feb 2024 12:14:00 +0000 Subject: [PATCH 03/11] Set job queue group --- Sources/HummingbirdJobsPostgres/CreateJobQueue.swift | 5 +++++ Tests/HummingbirdPostgresTests/JobsTests.swift | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift index 64a3403..1994424 100644 --- a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift +++ b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift @@ -58,4 +58,9 @@ struct CreateJobQueue: HBPostgresMigration { } var name: String { "_Create_JobQueue_Table_" } + var group: HBMigrationGroup { .jobQueue } +} + +extension HBMigrationGroup { + static var jobQueue: Self { .init("_hb_jobqueue") } } diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index c06c156..e6f6b6c 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -85,7 +85,7 @@ final class JobsTests: XCTestCase { try await serviceGroup.run() } do { - try await postgresMigrations.apply(client: postgresClient, logger: logger, dryRun: false) + try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) let value = try await test(postgresJobQueue) await serviceGroup.triggerGracefulShutdown() return value From 77ae661f272ab66b06b6b4a54fa793e8b5bec050 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 23 Feb 2024 12:39:42 +0000 Subject: [PATCH 04/11] Persist driver uses migrations --- .../PostgresPersistDriver.swift | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift index 36a7fa8..874440e 100644 --- a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift +++ b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift @@ -50,6 +50,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { let client: PostgresClient let logger: Logger let tidyUpFrequency: Duration + let migrations: HBPostgresMigrations /// Initialize HBFluentPersistDriver /// - Parameters: @@ -60,6 +61,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { self.client = client self.logger = logger self.tidyUpFrequency = tidyUpFrequency + self.migrations = migrations await migrations.add(CreatePersistTable()) } @@ -138,20 +140,11 @@ public final class HBPostgresPersistDriver: HBPersistDriver { /// Service protocol requirements extension HBPostgresPersistDriver { public func run() async throws { - // create table to save persist models - _ = try await self.client.withConnection { connection in - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_persist ( - "id" text PRIMARY KEY, - "data" json NOT NULL, - "expires" timestamp with time zone NOT NULL - ) - """, - logger: self.logger - ) - } + self.logger.info("Waiting for persist driver migrations to complete") + try await self.migrations.waitUntilCompleted() + // do an initial tidy to clear out expired values + self.logger.info("Tidy persist database") try await self.tidy() let timerSequence = AsyncTimerSequence( From 7c0470f28214b46c37346a92de6d29d88e1ed0e0 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 23 Feb 2024 12:40:08 +0000 Subject: [PATCH 05/11] Set migration group for jobs queue and persist --- Sources/HummingbirdJobsPostgres/CreateJobQueue.swift | 3 ++- Sources/HummingbirdPostgres/CreatePersistTable.swift | 8 +++++++- Tests/HummingbirdPostgresTests/JobsTests.swift | 4 ++-- Tests/HummingbirdPostgresTests/PersistTests.swift | 2 +- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift index 1994424..1402b63 100644 --- a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift +++ b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift @@ -62,5 +62,6 @@ struct CreateJobQueue: HBPostgresMigration { } extension HBMigrationGroup { - static var jobQueue: Self { .init("_hb_jobqueue") } + /// JobQueue migration group + public static var jobQueue: Self { .init("_hb_jobqueue") } } diff --git a/Sources/HummingbirdPostgres/CreatePersistTable.swift b/Sources/HummingbirdPostgres/CreatePersistTable.swift index aa8b688..b7656b3 100644 --- a/Sources/HummingbirdPostgres/CreatePersistTable.swift +++ b/Sources/HummingbirdPostgres/CreatePersistTable.swift @@ -36,5 +36,11 @@ struct CreatePersistTable: HBPostgresMigration { ) } - var name: String { "_Create_Persist_Table_"} + var name: String { "_Create_Persist_Table_" } + var group: HBMigrationGroup { .persist } +} + +extension HBMigrationGroup { + /// Persist driver migration group + public static var persist: Self { .init("_hb_persist") } } diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index e6f6b6c..bcfd744 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -351,8 +351,8 @@ final class JobsTests: XCTestCase { group.addTask { try await serviceGroup.run() } - try await postgresMigrations.apply(client: postgresClient, logger: logger, dryRun: false) - try await postgresMigrations2.apply(client: postgresClient, logger: logger, dryRun: false) + try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) + try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) do { for i in 0..<200 { try await postgresJobQueue.push(id: jobIdentifer, parameters: i) diff --git a/Tests/HummingbirdPostgresTests/PersistTests.swift b/Tests/HummingbirdPostgresTests/PersistTests.swift index a9e318c..39207f9 100644 --- a/Tests/HummingbirdPostgresTests/PersistTests.swift +++ b/Tests/HummingbirdPostgresTests/PersistTests.swift @@ -73,7 +73,7 @@ final class PersistTests: XCTestCase { var app = HBApplication(responder: router.buildResponder()) app.addServices(PostgresClientService(client: postgresClient), persist) app.runBeforeServerStart { - try await postgresMigrations.apply(client: postgresClient, logger: logger, dryRun: false) + try await postgresMigrations.apply(client: postgresClient, groups: [.persist], logger: logger, dryRun: false) } return app From 3c94c17ba85b947f10dc74a37dfae693806b888d Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 4 Mar 2024 15:26:39 +0000 Subject: [PATCH 06/11] Add Job last modified --- Sources/HummingbirdJobsPostgres/CreateJobQueue.swift | 3 ++- Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift | 2 +- Tests/HummingbirdPostgresTests/JobsTests.swift | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift index 1402b63..f18ba66 100644 --- a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift +++ b/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift @@ -23,7 +23,8 @@ struct CreateJobQueue: HBPostgresMigration { CREATE TABLE IF NOT EXISTS _hb_jobs ( id uuid PRIMARY KEY, job json, - status smallint + status smallint, + lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP ) """, logger: logger diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift index 685de72..a44adf6 100644 --- a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -216,7 +216,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE _hb_jobs SET status = \(status) WHERE id = \(jobId)", + "UPDATE _hb_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", logger: self.logger ) } diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index bcfd744..5105c23 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -231,8 +231,8 @@ final class JobsTests: XCTestCase { try await jobQueue.push(id: jobIdentifer, parameters: 0) await self.wait(for: [expectation], timeout: 5) - let failedJobs = try await jobQueue.queue.getJobs(withStatus: .processing) - XCTAssertEqual(failedJobs.count, 1) + let processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing) + XCTAssertEqual(processingJobs.count, 1) let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) XCTAssertEqual(pendingJobs.count, 0) return jobQueue From 345f0b6d2020d9bdbbdcb01e1907e5d2d13b5d63 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 6 Mar 2024 12:07:06 +0000 Subject: [PATCH 07/11] Revert migrations in tests, don't flag migrations as complete in a revert --- .../{ => Migrations}/CreateJobQueue.swift | 20 ------- .../Migrations/CreateJobs.swift | 48 ++++++++++++++++ .../PostgresJobsQueue.swift | 1 + Sources/HummingbirdPostgres/Migrations.swift | 9 ++- .../HummingbirdPostgresTests/JobsTests.swift | 56 +++++++++++-------- 5 files changed, 87 insertions(+), 47 deletions(-) rename Sources/HummingbirdJobsPostgres/{ => Migrations}/CreateJobQueue.swift (71%) create mode 100644 Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift diff --git a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift similarity index 71% rename from Sources/HummingbirdJobsPostgres/CreateJobQueue.swift rename to Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift index f18ba66..369581a 100644 --- a/Sources/HummingbirdJobsPostgres/CreateJobQueue.swift +++ b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift @@ -18,17 +18,6 @@ import Logging struct CreateJobQueue: HBPostgresMigration { func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_jobs ( - id uuid PRIMARY KEY, - job json, - status smallint, - lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP - ) - """, - logger: logger - ) try await connection.query( """ CREATE TABLE IF NOT EXISTS _hb_job_queue ( @@ -48,10 +37,6 @@ struct CreateJobQueue: HBPostgresMigration { } func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_jobs", - logger: logger - ) try await connection.query( "DROP TABLE _hb_job_queue", logger: logger @@ -61,8 +46,3 @@ struct CreateJobQueue: HBPostgresMigration { var name: String { "_Create_JobQueue_Table_" } var group: HBMigrationGroup { .jobQueue } } - -extension HBMigrationGroup { - /// JobQueue migration group - public static var jobQueue: Self { .init("_hb_jobqueue") } -} diff --git a/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift new file mode 100644 index 0000000..06681db --- /dev/null +++ b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift @@ -0,0 +1,48 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HummingbirdPostgres +import Logging +@_spi(ConnectionPool) import PostgresNIO + +struct CreateJobs: HBPostgresMigration { + func apply(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS _hb_jobs ( + id uuid PRIMARY KEY, + job bytea, + status smallint, + lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ) + """, + logger: logger + ) + } + + func revert(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + "DROP TABLE _hb_jobs", + logger: logger + ) + } + + var name: String { "_Create_Jobs_Table_" } + var group: HBMigrationGroup { .jobQueue } +} + +extension HBMigrationGroup { + /// JobQueue migration group + public static var jobQueue: Self { .init("_hb_jobqueue") } +} diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift index a44adf6..e095e3b 100644 --- a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -87,6 +87,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { self.logger = logger self.isStopped = .init(false) self.migrations = migrations + await migrations.add(CreateJobs()) await migrations.add(CreateJobQueue()) } diff --git a/Sources/HummingbirdPostgres/Migrations.swift b/Sources/HummingbirdPostgres/Migrations.swift index a294785..e177a17 100644 --- a/Sources/HummingbirdPostgres/Migrations.swift +++ b/Sources/HummingbirdPostgres/Migrations.swift @@ -65,7 +65,7 @@ public actor HBPostgresMigrations { /// - dryRun: Should migrations actually be applied, or should we just report what would be applied and reverted @_spi(ConnectionPool) public func apply(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws { - try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, dryRun: dryRun) + try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, completeMigrations: true, dryRun: dryRun) } /// Revery database migrations @@ -75,7 +75,7 @@ public actor HBPostgresMigrations { /// - dryRun: Should migrations actually be reverted, or should we just report what would be reverted @_spi(ConnectionPool) public func revert(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws { - try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, dryRun: dryRun) + try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, completeMigrations: false, dryRun: dryRun) } private func migrate( @@ -83,6 +83,7 @@ public actor HBPostgresMigrations { migrations: [HBPostgresMigration], groups: [HBMigrationGroup], logger: Logger, + completeMigrations: Bool, dryRun: Bool ) async throws { switch self.state { @@ -151,7 +152,9 @@ public actor HBPostgresMigrations { self.setFailed(error) throw error } - self.setCompleted() + if completeMigrations { + self.setCompleted() + } } /// Report if the migration process has completed diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index 5105c23..eca32b9 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -52,7 +52,7 @@ final class JobsTests: XCTestCase { backgroundLogger: logger ) let postgresMigrations = HBPostgresMigrations() - return HBJobQueue( + return await HBJobQueue( HBPostgresQueue( client: postgresClient, migrations: postgresMigrations, @@ -70,6 +70,7 @@ final class JobsTests: XCTestCase { /// shutdown correctly @discardableResult public func testJobQueue( jobQueue: HBJobQueue, + revertMigrations: Bool = false, test: (HBJobQueue) async throws -> T ) async throws -> T { do { @@ -85,8 +86,14 @@ final class JobsTests: XCTestCase { try await serviceGroup.run() } do { - try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) - let value = try await test(postgresJobQueue) + let migrations = jobQueue.queue.migrations + let client = jobQueue.queue.client + let logger = jobQueue.queue.logger + if revertMigrations { + try await migrations.revert(client: client, groups: [.jobQueue], logger: logger, dryRun: false) + } + try await migrations.apply(client: client, groups: [.jobQueue], logger: logger, dryRun: false) + let value = try await test(jobQueue) await serviceGroup.triggerGracefulShutdown() return value } catch let error as PSQLError { @@ -114,7 +121,7 @@ final class JobsTests: XCTestCase { test: (HBJobQueue) async throws -> T ) async throws -> T { let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration) - return try await self.testJobQueue(jobQueue: jobQueue, test: test) + return try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: true, test: test) } func testBasic() async throws { @@ -318,20 +325,21 @@ final class JobsTests: XCTestCase { backgroundLogger: logger ) let postgresMigrations = HBPostgresMigrations() - let jobQueue = HBJobQueue( + let jobQueue = await HBJobQueue( .postgres( - client: postgresClient, - migrations: postgresMigrations, + client: postgresClient, + migrations: postgresMigrations, configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), logger: logger ), numWorkers: 2, logger: logger ) - let jobQueue2 = HBJobQueue( + let postgresMigrations2 = HBPostgresMigrations() + let jobQueue2 = await HBJobQueue( HBPostgresQueue( client: postgresClient, - migrations: postgresMigrations, + migrations: postgresMigrations2, configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), logger: logger ), @@ -348,22 +356,22 @@ final class JobsTests: XCTestCase { gracefulShutdownSignals: [.sigterm, .sigint], logger: logger ) - group.addTask { - try await serviceGroup.run() - } - try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) - try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) - do { - for i in 0..<200 { - try await postgresJobQueue.push(id: jobIdentifer, parameters: i) - } - await self.wait(for: [expectation], timeout: 5) - await serviceGroup.triggerGracefulShutdown() - } catch { - XCTFail("\(String(reflecting: error))") - await serviceGroup.triggerGracefulShutdown() - throw error + ) + group.addTask { + try await serviceGroup.run() + } + try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) + try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) + do { + for i in 0..<200 { + try await jobQueue.push(id: jobIdentifer, parameters: i) } + await self.wait(for: [expectation], timeout: 5) + await serviceGroup.triggerGracefulShutdown() + } catch { + XCTFail("\(String(reflecting: error))") + await serviceGroup.triggerGracefulShutdown() + throw error } } } From 1de57aa89328999273cce78d825976a4e02b72c2 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 6 Mar 2024 13:39:09 +0000 Subject: [PATCH 08/11] Set job queue label as test name --- Tests/HummingbirdPostgresTests/JobsTests.swift | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index eca32b9..f6fa1a8 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -41,9 +41,9 @@ final class JobsTests: XCTestCase { static let env = HBEnvironment() - func createJobQueue(numWorkers: Int, configuration: HBPostgresQueue.Configuration) async throws -> HBJobQueue { + func createJobQueue(numWorkers: Int, configuration: HBPostgresQueue.Configuration, function: String = #function) async throws -> HBJobQueue { let logger = { - var logger = Logger(label: "JobsTests") + var logger = Logger(label: function) logger.logLevel = .debug return logger }() @@ -118,10 +118,12 @@ final class JobsTests: XCTestCase { @discardableResult public func testJobQueue( numWorkers: Int, configuration: HBPostgresQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), + revertMigrations: Bool = true, + function: String = #function, test: (HBJobQueue) async throws -> T ) async throws -> T { - let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration) - return try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: true, test: test) + let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration, function: function) + return try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: revertMigrations, test: test) } func testBasic() async throws { @@ -228,8 +230,7 @@ final class JobsTests: XCTestCase { func testShutdownJob() async throws { let jobIdentifer = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - var logger = Logger(label: "HummingbirdJobsTests") - logger.logLevel = .trace + try await self.testJobQueue(numWorkers: 4) { jobQueue in jobQueue.registerJob(jobIdentifer) { _, _ in expectation.fulfill() @@ -311,7 +312,7 @@ final class JobsTests: XCTestCase { let jobIdentifer = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) let logger = { - var logger = Logger(label: "HummingbirdJobsTests") + var logger = Logger(label: "testMultipleJobQueueHandlers") logger.logLevel = .debug return logger }() From 29366887589a21e2ca7549aa3b3641c78319b59a Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 6 Mar 2024 13:57:41 +0000 Subject: [PATCH 09/11] Hopefully fix testRerunAtStartup --- Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift | 1 + Tests/HummingbirdPostgresTests/JobsTests.swift | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift index e095e3b..19404a2 100644 --- a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -246,6 +246,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { case .rerun: guard status != .pending else { return } let jobs = try await getJobs(withStatus: status) + self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") for jobId in jobs { try await self.addToQueue(jobId: jobId, connection: connection) } diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index f6fa1a8..c72c288 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -287,7 +287,7 @@ final class JobsTests: XCTestCase { } let jobQueue = try await createJobQueue(numWorkers: 1, configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .rerun)) jobQueue.registerJob(job) - try await self.testJobQueue(jobQueue: jobQueue) { jobQueue in + try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: true) { jobQueue in try await jobQueue.push(id: jobIdentifer, parameters: 0) From f027180467da5017d604745fef2bbd7bf53b358d Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 6 Mar 2024 18:34:17 +0000 Subject: [PATCH 10/11] Update log level for onInit logging --- Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift index 19404a2..6e00e63 100644 --- a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -94,10 +94,10 @@ public final class HBPostgresQueue: HBJobQueueDriver { /// Run on initialization of the job queue public func onInit() async throws { do { - self.logger.debug("Waiting for JobQueue migrations") + self.logger.info("Waiting for JobQueue migrations") try await self.migrations.waitUntilCompleted() _ = try await self.client.withConnection { connection in - self.logger.debug("Update Jobs at initialization") + self.logger.info("Update Jobs at initialization") try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection) try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection) try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection) From 7b45b6043d0af330fff95cb9f3bb24d752989c16 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 7 Mar 2024 17:52:59 +0000 Subject: [PATCH 11/11] Rename tables to include pg --- .../Migrations/CreateJobQueue.swift | 6 +++--- .../Migrations/CreateJobs.swift | 4 ++-- .../PostgresJobsQueue.swift | 18 +++++++++--------- .../CreatePersistTable.swift | 6 +++--- .../PostgresPersistDriver.swift | 10 +++++----- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift index 369581a..6353981 100644 --- a/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift +++ b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift @@ -20,7 +20,7 @@ struct CreateJobQueue: HBPostgresMigration { func apply(connection: PostgresConnection, logger: Logger) async throws { try await connection.query( """ - CREATE TABLE IF NOT EXISTS _hb_job_queue ( + CREATE TABLE IF NOT EXISTS _hb_pg_job_queue ( job_id uuid PRIMARY KEY, createdAt timestamp with time zone ) @@ -30,7 +30,7 @@ struct CreateJobQueue: HBPostgresMigration { try await connection.query( """ CREATE INDEX IF NOT EXISTS _hb_job_queueidx - ON _hb_job_queue (createdAt ASC) + ON _hb_pg_job_queue (createdAt ASC) """, logger: logger ) @@ -38,7 +38,7 @@ struct CreateJobQueue: HBPostgresMigration { func revert(connection: PostgresConnection, logger: Logger) async throws { try await connection.query( - "DROP TABLE _hb_job_queue", + "DROP TABLE _hb_pg_job_queue", logger: logger ) } diff --git a/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift index 06681db..bad67e0 100644 --- a/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift +++ b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift @@ -20,7 +20,7 @@ struct CreateJobs: HBPostgresMigration { func apply(connection: PostgresConnection, logger: Logger) async throws { try await connection.query( """ - CREATE TABLE IF NOT EXISTS _hb_jobs ( + CREATE TABLE IF NOT EXISTS _hb_pg_jobs ( id uuid PRIMARY KEY, job bytea, status smallint, @@ -33,7 +33,7 @@ struct CreateJobs: HBPostgresMigration { func revert(connection: PostgresConnection, logger: Logger) async throws { try await connection.query( - "DROP TABLE _hb_jobs", + "DROP TABLE _hb_pg_jobs", logger: logger ) } diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift index 6e00e63..88ddb26 100644 --- a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -149,10 +149,10 @@ public final class HBPostgresQueue: HBJobQueueDriver { let stream = try await connection.query( """ DELETE - FROM _hb_job_queue pse + FROM _hb_pg_job_queue pse WHERE pse.job_id = (SELECT pse_inner.job_id - FROM _hb_job_queue pse_inner + FROM _hb_pg_job_queue pse_inner ORDER BY pse_inner.createdAt ASC FOR UPDATE SKIP LOCKED LIMIT 1) @@ -166,7 +166,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { } // select job from job table let stream2 = try await connection.query( - "SELECT job FROM _hb_jobs WHERE id = \(jobId)", + "SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)", logger: self.logger ) @@ -192,7 +192,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func add(_ job: HBQueuedJob, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO _hb_jobs (id, job, status) + INSERT INTO _hb_pg_jobs (id, job, status) VALUES (\(job.id), \(job.jobBuffer), \(Status.pending)) """, logger: self.logger @@ -201,7 +201,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func delete(jobId: JobID, connection: PostgresConnection) async throws { try await connection.query( - "DELETE FROM _hb_jobs WHERE id = \(jobId)", + "DELETE FROM _hb_pg_jobs WHERE id = \(jobId)", logger: self.logger ) } @@ -209,7 +209,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func addToQueue(jobId: JobID, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO _hb_job_queue (job_id, createdAt) VALUES (\(jobId), \(Date.now)) + INSERT INTO _hb_pg_job_queue (job_id, createdAt) VALUES (\(jobId), \(Date.now)) """, logger: self.logger ) @@ -217,7 +217,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE _hb_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", + "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", logger: self.logger ) } @@ -225,7 +225,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func getJobs(withStatus status: Status) async throws -> [JobID] { return try await self.client.withConnection { connection in let stream = try await connection.query( - "SELECT id FROM _hb_jobs WHERE status = \(status)", + "SELECT id FROM _hb_pg_jobs WHERE status = \(status)", logger: self.logger ) var jobs: [JobID] = [] @@ -240,7 +240,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { switch onInit { case .remove: try await connection.query( - "DELETE FROM _hb_jobs WHERE status = \(status)", + "DELETE FROM _hb_pg_jobs WHERE status = \(status)", logger: self.logger ) case .rerun: diff --git a/Sources/HummingbirdPostgres/CreatePersistTable.swift b/Sources/HummingbirdPostgres/CreatePersistTable.swift index b7656b3..576639c 100644 --- a/Sources/HummingbirdPostgres/CreatePersistTable.swift +++ b/Sources/HummingbirdPostgres/CreatePersistTable.swift @@ -19,7 +19,7 @@ struct CreatePersistTable: HBPostgresMigration { func apply(connection: PostgresConnection, logger: Logger) async throws { try await connection.query( """ - CREATE TABLE IF NOT EXISTS _hb_persist ( + CREATE TABLE IF NOT EXISTS _hb_pg_persist ( "id" text PRIMARY KEY, "data" json NOT NULL, "expires" timestamp with time zone NOT NULL @@ -31,7 +31,7 @@ struct CreatePersistTable: HBPostgresMigration { func revert(connection: PostgresConnection, logger: Logger) async throws { try await connection.query( - "DROP TABLE _hb_persist", + "DROP TABLE _hb_pg_persist", logger: logger ) } @@ -42,5 +42,5 @@ struct CreatePersistTable: HBPostgresMigration { extension HBMigrationGroup { /// Persist driver migration group - public static var persist: Self { .init("_hb_persist") } + public static var persist: Self { .init("_hb_pg_persist") } } diff --git a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift index 874440e..f379719 100644 --- a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift +++ b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift @@ -71,7 +71,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { try await self.client.withConnection { connection in do { try await connection.query( - "INSERT INTO _hb_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))", + "INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))", logger: self.logger ) } catch let error as PSQLError { @@ -90,7 +90,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { _ = try await self.client.withConnection { connection in try await connection.query( """ - INSERT INTO _hb_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires)) + INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires)) ON CONFLICT (id) DO UPDATE SET data = \(WrapperObject(value)), expires = \(expires) """, @@ -103,7 +103,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { public func get(key: String, as object: Object.Type) async throws -> Object? { try await self.client.withConnection { connection in let stream = try await connection.query( - "SELECT data, expires FROM _hb_persist WHERE id = \(key)", + "SELECT data, expires FROM _hb_pg_persist WHERE id = \(key)", logger: self.logger ) guard let result = try await stream.decode((WrapperObject, Date).self) @@ -120,7 +120,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { public func remove(key: String) async throws { _ = try await self.client.withConnection { connection in try await connection.query( - "DELETE FROM _hb_persist WHERE id = \(key)", + "DELETE FROM _hb_pg_persist WHERE id = \(key)", logger: self.logger ) } @@ -130,7 +130,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { func tidy() async throws { _ = try await self.client.withConnection { connection in try await connection.query( - "DELETE FROM _hb_persist WHERE expires < \(Date.now)", + "DELETE FROM _hb_pg_persist WHERE expires < \(Date.now)", logger: self.logger ) }