diff --git a/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift new file mode 100644 index 0000000..6353981 --- /dev/null +++ b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift @@ -0,0 +1,48 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HummingbirdPostgres +import Logging +@_spi(ConnectionPool) import PostgresNIO + +struct CreateJobQueue: HBPostgresMigration { + func apply(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS _hb_pg_job_queue ( + job_id uuid PRIMARY KEY, + createdAt timestamp with time zone + ) + """, + logger: logger + ) + try await connection.query( + """ + CREATE INDEX IF NOT EXISTS _hb_job_queueidx + ON _hb_pg_job_queue (createdAt ASC) + """, + logger: logger + ) + } + + func revert(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + "DROP TABLE _hb_pg_job_queue", + logger: logger + ) + } + + var name: String { "_Create_JobQueue_Table_" } + var group: HBMigrationGroup { .jobQueue } +} diff --git a/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift new file mode 100644 index 0000000..bad67e0 --- /dev/null +++ b/Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift @@ -0,0 +1,48 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HummingbirdPostgres +import Logging +@_spi(ConnectionPool) import PostgresNIO + +struct CreateJobs: HBPostgresMigration { + func apply(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS _hb_pg_jobs ( + id uuid PRIMARY KEY, + job bytea, + status smallint, + lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ) + """, + logger: logger + ) + } + + func revert(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + "DROP TABLE _hb_pg_jobs", + logger: logger + ) + } + + var name: String { "_Create_Jobs_Table_" } + var group: HBMigrationGroup { .jobQueue } +} + +extension HBMigrationGroup { + /// JobQueue migration group + public static var jobQueue: Self { .init("_hb_jobqueue") } +} diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift index a38414e..88ddb26 100644 --- a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -1,3 +1,17 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + import Foundation import HummingbirdJobs @_spi(ConnectionPool) import HummingbirdPostgres @@ -38,23 +52,17 @@ public final class HBPostgresQueue: HBJobQueueDriver { /// Queue configuration public struct Configuration: Sendable { - let jobTable: String - let jobQueueTable: String let pendingJobsInitialization: JobInitialization let failedJobsInitialization: JobInitialization let processingJobsInitialization: JobInitialization let pollTime: Duration public init( - jobTable: String = "_hb_jobs", - jobQueueTable: String = "_hb_job_queue", - pendingJobsInitialization: HBPostgresQueue.JobInitialization = .doNothing, - failedJobsInitialization: HBPostgresQueue.JobInitialization = .rerun, - processingJobsInitialization: HBPostgresQueue.JobInitialization = .rerun, + pendingJobsInitialization: JobInitialization = .doNothing, + failedJobsInitialization: JobInitialization = .rerun, + processingJobsInitialization: JobInitialization = .rerun, pollTime: Duration = .milliseconds(100) ) { - self.jobTable = jobTable - self.jobQueueTable = jobQueueTable self.pendingJobsInitialization = pendingJobsInitialization self.failedJobsInitialization = failedJobsInitialization self.processingJobsInitialization = processingJobsInitialization @@ -68,50 +76,28 @@ public final class HBPostgresQueue: HBJobQueueDriver { public let configuration: Configuration /// Logger used by queue public let logger: Logger + + let migrations: HBPostgresMigrations let isStopped: NIOLockedValueBox /// Initialize a HBPostgresJobQueue - /// - Parameters: - /// - client: Postgres client - /// - configuration: Queue configuration - /// - logger: Logger used by queue - public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) { + public init(client: PostgresClient, migrations: HBPostgresMigrations, configuration: Configuration = .init(), logger: Logger) async { self.client = client self.configuration = configuration self.logger = logger self.isStopped = .init(false) + self.migrations = migrations + await migrations.add(CreateJobs()) + await migrations.add(CreateJobQueue()) } /// Run on initialization of the job queue public func onInit() async throws { do { + self.logger.info("Waiting for JobQueue migrations") + try await self.migrations.waitUntilCompleted() _ = try await self.client.withConnection { connection in - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) ( - id uuid PRIMARY KEY, - job bytea, - status smallint - ) - """, - logger: self.logger - ) - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable) ( - job_id uuid PRIMARY KEY, - createdAt timestamp with time zone - ) - """, - logger: self.logger - ) - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable)idx - ON \(unescaped: self.configuration.jobQueueTable) (createdAt ASC) - """, - logger: self.logger - ) + self.logger.info("Update Jobs at initialization") try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection) try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection) try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection) @@ -163,10 +149,10 @@ public final class HBPostgresQueue: HBJobQueueDriver { let stream = try await connection.query( """ DELETE - FROM \(unescaped: self.configuration.jobQueueTable) pse + FROM _hb_pg_job_queue pse WHERE pse.job_id = (SELECT pse_inner.job_id - FROM \(unescaped: self.configuration.jobQueueTable) pse_inner + FROM _hb_pg_job_queue pse_inner ORDER BY pse_inner.createdAt ASC FOR UPDATE SKIP LOCKED LIMIT 1) @@ -180,7 +166,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { } // select job from job table let stream2 = try await connection.query( - "SELECT job FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)", + "SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)", logger: self.logger ) @@ -206,7 +192,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func add(_ job: HBQueuedJob, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status) + INSERT INTO _hb_pg_jobs (id, job, status) VALUES (\(job.id), \(job.jobBuffer), \(Status.pending)) """, logger: self.logger @@ -215,7 +201,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func delete(jobId: JobID, connection: PostgresConnection) async throws { try await connection.query( - "DELETE FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)", + "DELETE FROM _hb_pg_jobs WHERE id = \(jobId)", logger: self.logger ) } @@ -223,7 +209,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func addToQueue(jobId: JobID, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO \(unescaped: self.configuration.jobQueueTable) (job_id, createdAt) VALUES (\(jobId), \(Date.now)) + INSERT INTO _hb_pg_job_queue (job_id, createdAt) VALUES (\(jobId), \(Date.now)) """, logger: self.logger ) @@ -231,7 +217,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE \(unescaped: self.configuration.jobTable) SET status = \(status) WHERE id = \(jobId)", + "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", logger: self.logger ) } @@ -239,7 +225,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { func getJobs(withStatus status: Status) async throws -> [JobID] { return try await self.client.withConnection { connection in let stream = try await connection.query( - "SELECT id FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)", + "SELECT id FROM _hb_pg_jobs WHERE status = \(status)", logger: self.logger ) var jobs: [JobID] = [] @@ -254,12 +240,13 @@ public final class HBPostgresQueue: HBJobQueueDriver { switch onInit { case .remove: try await connection.query( - "DELETE FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)", + "DELETE FROM _hb_pg_jobs WHERE status = \(status)", logger: self.logger ) case .rerun: guard status != .pending else { return } let jobs = try await getJobs(withStatus: status) + self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") for jobId in jobs { try await self.addToQueue(jobId: jobId, connection: connection) } @@ -300,7 +287,7 @@ extension HBJobQueueDriver where Self == HBPostgresQueue { /// - client: Postgres client /// - configuration: Queue configuration /// - logger: Logger used by queue - public static func postgres(client: PostgresClient, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) -> Self { - .init(client: client, configuration: configuration, logger: logger) + public static func postgres(client: PostgresClient, migrations: HBPostgresMigrations, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) async -> Self { + await Self(client: client, migrations: migrations, configuration: configuration, logger: logger) } } diff --git a/Sources/HummingbirdPostgres/CreatePersistTable.swift b/Sources/HummingbirdPostgres/CreatePersistTable.swift new file mode 100644 index 0000000..576639c --- /dev/null +++ b/Sources/HummingbirdPostgres/CreatePersistTable.swift @@ -0,0 +1,46 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +@_spi(ConnectionPool) import PostgresNIO + +struct CreatePersistTable: HBPostgresMigration { + func apply(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS _hb_pg_persist ( + "id" text PRIMARY KEY, + "data" json NOT NULL, + "expires" timestamp with time zone NOT NULL + ) + """, + logger: logger + ) + } + + func revert(connection: PostgresConnection, logger: Logger) async throws { + try await connection.query( + "DROP TABLE _hb_pg_persist", + logger: logger + ) + } + + var name: String { "_Create_Persist_Table_" } + var group: HBMigrationGroup { .persist } +} + +extension HBMigrationGroup { + /// Persist driver migration group + public static var persist: Self { .init("_hb_pg_persist") } +} diff --git a/Sources/HummingbirdPostgres/Migrations.swift b/Sources/HummingbirdPostgres/Migrations.swift index a294785..e177a17 100644 --- a/Sources/HummingbirdPostgres/Migrations.swift +++ b/Sources/HummingbirdPostgres/Migrations.swift @@ -65,7 +65,7 @@ public actor HBPostgresMigrations { /// - dryRun: Should migrations actually be applied, or should we just report what would be applied and reverted @_spi(ConnectionPool) public func apply(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws { - try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, dryRun: dryRun) + try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, completeMigrations: true, dryRun: dryRun) } /// Revery database migrations @@ -75,7 +75,7 @@ public actor HBPostgresMigrations { /// - dryRun: Should migrations actually be reverted, or should we just report what would be reverted @_spi(ConnectionPool) public func revert(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws { - try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, dryRun: dryRun) + try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, completeMigrations: false, dryRun: dryRun) } private func migrate( @@ -83,6 +83,7 @@ public actor HBPostgresMigrations { migrations: [HBPostgresMigration], groups: [HBMigrationGroup], logger: Logger, + completeMigrations: Bool, dryRun: Bool ) async throws { switch self.state { @@ -151,7 +152,9 @@ public actor HBPostgresMigrations { self.setFailed(error) throw error } - self.setCompleted() + if completeMigrations { + self.setCompleted() + } } /// Report if the migration process has completed diff --git a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift index 4e0b8f7..f379719 100644 --- a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift +++ b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift @@ -50,16 +50,19 @@ public final class HBPostgresPersistDriver: HBPersistDriver { let client: PostgresClient let logger: Logger let tidyUpFrequency: Duration + let migrations: HBPostgresMigrations /// Initialize HBFluentPersistDriver /// - Parameters: /// - client: Postgres client /// - tidyUpFrequequency: How frequently cleanup expired database entries should occur @_spi(ConnectionPool) - public init(client: PostgresClient, tidyUpFrequency: Duration = .seconds(600), logger: Logger) { + public init(client: PostgresClient, migrations: HBPostgresMigrations, tidyUpFrequency: Duration = .seconds(600), logger: Logger) async { self.client = client self.logger = logger self.tidyUpFrequency = tidyUpFrequency + self.migrations = migrations + await migrations.add(CreatePersistTable()) } /// Create new key. This doesn't check for the existence of this key already so may fail if the key already exists @@ -68,7 +71,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { try await self.client.withConnection { connection in do { try await connection.query( - "INSERT INTO _hb_psql_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))", + "INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))", logger: self.logger ) } catch let error as PSQLError { @@ -87,7 +90,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { _ = try await self.client.withConnection { connection in try await connection.query( """ - INSERT INTO _hb_psql_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires)) + INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires)) ON CONFLICT (id) DO UPDATE SET data = \(WrapperObject(value)), expires = \(expires) """, @@ -100,7 +103,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { public func get(key: String, as object: Object.Type) async throws -> Object? { try await self.client.withConnection { connection in let stream = try await connection.query( - "SELECT data, expires FROM _hb_psql_persist WHERE id = \(key)", + "SELECT data, expires FROM _hb_pg_persist WHERE id = \(key)", logger: self.logger ) guard let result = try await stream.decode((WrapperObject, Date).self) @@ -117,7 +120,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { public func remove(key: String) async throws { _ = try await self.client.withConnection { connection in try await connection.query( - "DELETE FROM _hb_psql_persist WHERE id = \(key)", + "DELETE FROM _hb_pg_persist WHERE id = \(key)", logger: self.logger ) } @@ -127,7 +130,7 @@ public final class HBPostgresPersistDriver: HBPersistDriver { func tidy() async throws { _ = try await self.client.withConnection { connection in try await connection.query( - "DELETE FROM _hb_psql_persist WHERE expires < \(Date.now)", + "DELETE FROM _hb_pg_persist WHERE expires < \(Date.now)", logger: self.logger ) } @@ -137,20 +140,11 @@ public final class HBPostgresPersistDriver: HBPersistDriver { /// Service protocol requirements extension HBPostgresPersistDriver { public func run() async throws { - // create table to save persist models - _ = try await self.client.withConnection { connection in - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_psql_persist ( - "id" text PRIMARY KEY, - "data" json NOT NULL, - "expires" timestamp with time zone NOT NULL - ) - """, - logger: self.logger - ) - } + self.logger.info("Waiting for persist driver migrations to complete") + try await self.migrations.waitUntilCompleted() + // do an initial tidy to clear out expired values + self.logger.info("Tidy persist database") try await self.tidy() let timerSequence = AsyncTimerSequence( diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index 2a66e81..c72c288 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -15,6 +15,7 @@ import Atomics import Hummingbird import HummingbirdJobs +@testable @_spi(ConnectionPool) import HummingbirdPostgres @testable @_spi(ConnectionPool) import HummingbirdJobsPostgres import HummingbirdXCT import NIOConcurrencyHelpers @@ -40,9 +41,9 @@ final class JobsTests: XCTestCase { static let env = HBEnvironment() - func createJobQueue(numWorkers: Int, configuration: HBPostgresQueue.Configuration) async throws -> HBJobQueue { + func createJobQueue(numWorkers: Int, configuration: HBPostgresQueue.Configuration, function: String = #function) async throws -> HBJobQueue { let logger = { - var logger = Logger(label: "JobsTests") + var logger = Logger(label: function) logger.logLevel = .debug return logger }() @@ -50,9 +51,11 @@ final class JobsTests: XCTestCase { configuration: getPostgresConfiguration(), backgroundLogger: logger ) - return HBJobQueue( + let postgresMigrations = HBPostgresMigrations() + return await HBJobQueue( HBPostgresQueue( client: postgresClient, + migrations: postgresMigrations, configuration: configuration, logger: logger ), @@ -67,6 +70,7 @@ final class JobsTests: XCTestCase { /// shutdown correctly @discardableResult public func testJobQueue( jobQueue: HBJobQueue, + revertMigrations: Bool = false, test: (HBJobQueue) async throws -> T ) async throws -> T { do { @@ -81,8 +85,14 @@ final class JobsTests: XCTestCase { group.addTask { try await serviceGroup.run() } - try await Task.sleep(for: .seconds(1)) do { + let migrations = jobQueue.queue.migrations + let client = jobQueue.queue.client + let logger = jobQueue.queue.logger + if revertMigrations { + try await migrations.revert(client: client, groups: [.jobQueue], logger: logger, dryRun: false) + } + try await migrations.apply(client: client, groups: [.jobQueue], logger: logger, dryRun: false) let value = try await test(jobQueue) await serviceGroup.triggerGracefulShutdown() return value @@ -108,10 +118,12 @@ final class JobsTests: XCTestCase { @discardableResult public func testJobQueue( numWorkers: Int, configuration: HBPostgresQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), + revertMigrations: Bool = true, + function: String = #function, test: (HBJobQueue) async throws -> T ) async throws -> T { - let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration) - return try await self.testJobQueue(jobQueue: jobQueue, test: test) + let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration, function: function) + return try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: revertMigrations, test: test) } func testBasic() async throws { @@ -218,8 +230,7 @@ final class JobsTests: XCTestCase { func testShutdownJob() async throws { let jobIdentifer = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - var logger = Logger(label: "HummingbirdJobsTests") - logger.logLevel = .trace + try await self.testJobQueue(numWorkers: 4) { jobQueue in jobQueue.registerJob(jobIdentifer) { _, _ in expectation.fulfill() @@ -228,8 +239,8 @@ final class JobsTests: XCTestCase { try await jobQueue.push(id: jobIdentifer, parameters: 0) await self.wait(for: [expectation], timeout: 5) - let failedJobs = try await jobQueue.queue.getJobs(withStatus: .processing) - XCTAssertEqual(failedJobs.count, 1) + let processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing) + XCTAssertEqual(processingJobs.count, 1) let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) XCTAssertEqual(pendingJobs.count, 0) return jobQueue @@ -276,7 +287,7 @@ final class JobsTests: XCTestCase { } let jobQueue = try await createJobQueue(numWorkers: 1, configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .rerun)) jobQueue.registerJob(job) - try await self.testJobQueue(jobQueue: jobQueue) { jobQueue in + try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: true) { jobQueue in try await jobQueue.push(id: jobIdentifer, parameters: 0) @@ -301,7 +312,7 @@ final class JobsTests: XCTestCase { let jobIdentifer = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) let logger = { - var logger = Logger(label: "HummingbirdJobsTests") + var logger = Logger(label: "testMultipleJobQueueHandlers") logger.logLevel = .debug return logger }() @@ -314,14 +325,23 @@ final class JobsTests: XCTestCase { configuration: getPostgresConfiguration(), backgroundLogger: logger ) - let jobQueue = HBJobQueue( - .postgres(client: postgresClient, logger: logger), + let postgresMigrations = HBPostgresMigrations() + let jobQueue = await HBJobQueue( + .postgres( + client: postgresClient, + migrations: postgresMigrations, + configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), + logger: logger + ), numWorkers: 2, logger: logger ) - let jobQueue2 = HBJobQueue( + let postgresMigrations2 = HBPostgresMigrations() + let jobQueue2 = await HBJobQueue( HBPostgresQueue( client: postgresClient, + migrations: postgresMigrations2, + configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), logger: logger ), numWorkers: 2, @@ -341,6 +361,8 @@ final class JobsTests: XCTestCase { group.addTask { try await serviceGroup.run() } + try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) + try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) do { for i in 0..<200 { try await jobQueue.push(id: jobIdentifer, parameters: i) diff --git a/Tests/HummingbirdPostgresTests/PersistTests.swift b/Tests/HummingbirdPostgresTests/PersistTests.swift index dc85399..39207f9 100644 --- a/Tests/HummingbirdPostgresTests/PersistTests.swift +++ b/Tests/HummingbirdPostgresTests/PersistTests.swift @@ -34,13 +34,17 @@ final class PersistTests: XCTestCase { } } } - var logger = Logger(label: "PersistTests") - logger.logLevel = .debug + let logger = { + var logger = Logger(label: "PersistTests") + logger.logLevel = .debug + return logger + }() let postgresClient = try await PostgresClient( configuration: getPostgresConfiguration(), backgroundLogger: logger ) - let persist = HBPostgresPersistDriver(client: postgresClient, logger: logger) + let postgresMigrations = HBPostgresMigrations() + let persist = await HBPostgresPersistDriver(client: postgresClient, migrations: postgresMigrations, logger: logger) let router = HBRouter() router.middlewares.add(PostgresErrorMiddleware()) router.put("/persist/:tag") { request, context -> HTTPResponse.Status in @@ -69,8 +73,7 @@ final class PersistTests: XCTestCase { var app = HBApplication(responder: router.buildResponder()) app.addServices(PostgresClientService(client: postgresClient), persist) app.runBeforeServerStart { - // temporary fix to ensure persist table is created before we use it - try await Task.sleep(for: .milliseconds(400)) + try await postgresMigrations.apply(client: postgresClient, groups: [.persist], logger: logger, dryRun: false) } return app