From da147fd1b00cca30515fc8ce39f4380279c8c194 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 26 Feb 2024 12:46:36 +0000 Subject: [PATCH 01/13] Register jobs with an id and a closure --- Sources/HummingbirdJobs/Job.swift | 95 ++----- .../{Exports.swift => JobContext.swift} | 8 +- Sources/HummingbirdJobs/JobIdentifier.swift | 29 ++ Sources/HummingbirdJobs/JobQueue.swift | 2 +- Sources/HummingbirdJobs/JobQueueHandler.swift | 6 +- Sources/HummingbirdJobs/JobRegister.swift | 115 ++++++++ Sources/HummingbirdJobs/MemoryJobQueue.swift | 29 +- Sources/HummingbirdJobs/QueuedJob.swift | 30 +-- .../HummingbirdJobsTests.swift | 255 ++++++++++-------- 9 files changed, 350 insertions(+), 219 deletions(-) rename Sources/HummingbirdJobs/{Exports.swift => JobContext.swift} (77%) create mode 100644 Sources/HummingbirdJobs/JobIdentifier.swift create mode 100644 Sources/HummingbirdJobs/JobRegister.swift diff --git a/Sources/HummingbirdJobs/Job.swift b/Sources/HummingbirdJobs/Job.swift index d36d3ff2c..5a794140b 100644 --- a/Sources/HummingbirdJobs/Job.swift +++ b/Sources/HummingbirdJobs/Job.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2023 the Hummingbird authors +// Copyright (c) 2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,83 +12,38 @@ // //===----------------------------------------------------------------------===// -import Foundation -import Logging -import NIOConcurrencyHelpers -import NIOCore - -/// Protocol for job description -/// -/// For a job to be decodable, it has to be registered. Call `MyJob.register()` to register a job. -public protocol HBJob: Codable, Sendable { - /// Unique Job name - static var name: String { get } - - /// Maximum times this job should be retried if it fails - static var maxRetryCount: Int { get } - - /// Execute job - /// - Returns: EventLoopFuture that is fulfulled when job is done - func execute(logger: Logger) async throws +/// Protocol for a Job +public protocol HBJob: Sendable { + /// Parameters job requries + associatedtype Parameters: Codable & Sendable + /// Job Type identifier + var id: HBJobIdentifier { get } + /// Maximum number of times a job will be retried before being classed as failed + var maxRetryCount: Int { get } + /// Function to execute the job + func execute(context: HBJobContext) async throws } extension HBJob { - /// maximum times this job should be retried - public static var maxRetryCount: Int { return 0 } - - /// register job - public static func register() { - HBJobRegister.register(job: Self.self) - } -} - -/// Register Jobs, for decoding and encoding -enum HBJobRegister { - static func decode(from decoder: Decoder) throws -> HBJob { - let container = try decoder.container(keyedBy: _HBJobCodingKey.self) - let key = container.allKeys.first! - let childDecoder = try container.superDecoder(forKey: key) - let jobType = try HBJobRegister.nameTypeMap.withLockedValue { - guard let job = $0[key.stringValue] else { throw JobQueueError.decodeJobFailed } - return job - } - return try jobType.init(from: childDecoder) - } - - static func encode(job: HBJob, to encoder: Encoder) throws { - var container = encoder.container(keyedBy: _HBJobCodingKey.self) - let childEncoder = container.superEncoder(forKey: .init(stringValue: type(of: job).name, intValue: nil)) - try job.encode(to: childEncoder) - } - - static func register(job: HBJob.Type) { - self.nameTypeMap.withLockedValue { $0[job.name] = job } + /// name of job type + public var name: String { + id.name } - - static let nameTypeMap: NIOLockedValueBox<[String: HBJob.Type]> = .init([:]) } -internal struct _HBJobCodingKey: CodingKey { - public var stringValue: String - public var intValue: Int? +/// Type used internally by job queue implementations to encode a job request +public struct _HBJobRequest: Encodable, Sendable { + let id: HBJobIdentifier + let parameters: Parameters - public init?(stringValue: String) { - self.stringValue = stringValue - self.intValue = nil + public init(id: HBJobIdentifier, parameters: Parameters) { + self.id = id + self.parameters = parameters } - public init?(intValue: Int) { - self.stringValue = "\(intValue)" - self.intValue = intValue - } - - public init(stringValue: String, intValue: Int?) { - self.stringValue = stringValue - self.intValue = intValue - } - - internal init(index: Int) { - self.stringValue = "Index \(index)" - self.intValue = index + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: _HBJobCodingKey.self) + let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil)) + try self.parameters.encode(to: childEncoder) } } diff --git a/Sources/HummingbirdJobs/Exports.swift b/Sources/HummingbirdJobs/JobContext.swift similarity index 77% rename from Sources/HummingbirdJobs/Exports.swift rename to Sources/HummingbirdJobs/JobContext.swift index e0b216ce1..a49b807c3 100644 --- a/Sources/HummingbirdJobs/Exports.swift +++ b/Sources/HummingbirdJobs/JobContext.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,4 +12,8 @@ // //===----------------------------------------------------------------------===// -@_exported @_documentation(visibility: internal) import struct Logging.Logger +import Logging + +public struct HBJobContext { + public let logger: Logger +} diff --git a/Sources/HummingbirdJobs/JobIdentifier.swift b/Sources/HummingbirdJobs/JobIdentifier.swift new file mode 100644 index 000000000..1b9d5cdae --- /dev/null +++ b/Sources/HummingbirdJobs/JobIdentifier.swift @@ -0,0 +1,29 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Identifier for a Job type +/// +/// The identifier includes the type of the parameters required by the job to ensure +/// the wrong parameters are not passed to this job +/// +/// Extend this type to include your own job identifiers +/// ``` +/// extension HBJobIdentifier { +/// static var myJob: Self { .init("my-job") } +/// } +/// ``` +public struct HBJobIdentifier: Sendable, Hashable { + let name: String + public init(_ name: String, parameters: Parameters.Type = Parameters.self) { self.name = name } +} diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 9237e5ce5..46fd22ab9 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -26,7 +26,7 @@ public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob func onInit() async throws /// Push Job onto queue /// - Returns: Identifier of queued job - @discardableResult func push(_ job: HBJob) async throws -> JobID + @discardableResult func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID /// This is called to say job has finished processing and it can be deleted func finished(jobId: JobID) async throws /// This is called to say job has failed to run and should be put aside diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index 0af6e43ba..67cc6edfe 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -68,16 +68,16 @@ public final class HBJobQueueHandler: Service { func runJob(_ queuedJob: HBQueuedJob) async { var logger = logger logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id) - logger[metadataKey: "hb_job_type"] = .string(String(describing: type(of: queuedJob.job))) + logger[metadataKey: "hb_job_type"] = .string(queuedJob.job.name) let job = queuedJob.job - var count = type(of: job).maxRetryCount + var count = job.maxRetryCount logger.debug("Starting Job") do { while true { do { - try await job.execute(logger: self.logger) + try await job.execute(context: .init(logger: logger)) break } catch let error as CancellationError { logger.debug("Job cancelled") diff --git a/Sources/HummingbirdJobs/JobRegister.swift b/Sources/HummingbirdJobs/JobRegister.swift new file mode 100644 index 000000000..9d5ac3e37 --- /dev/null +++ b/Sources/HummingbirdJobs/JobRegister.swift @@ -0,0 +1,115 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOConcurrencyHelpers + +/// Registry for job types +public enum HBJobRegister { + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public static func registerJob( + _ id: HBJobIdentifier, + maxRetryCount: Int = 0, + execute: @escaping @Sendable ( + Parameters, + HBJobContext + ) async throws -> Void + ) { + let definition = HBJobInstance.Definition(id: id, maxRetryCount: maxRetryCount, execute: execute) + let builder = { (decoder: Decoder) in + let parameters = try Parameters(from: decoder) + return try HBJobInstance(job: definition, parameters: parameters) + } + self.idTypeMap.withLockedValue { $0[id.name] = builder } + } + + static func decode(from decoder: Decoder) throws -> any HBJob { + let container = try decoder.container(keyedBy: _HBJobCodingKey.self) + let key = container.allKeys.first! + let childDecoder = try container.superDecoder(forKey: key) + let jobDefinitionBuilder = try Self.idTypeMap.withLockedValue { + guard let job = $0[key.stringValue] else { throw JobQueueError.decodeJobFailed } + return job + } + return try jobDefinitionBuilder(childDecoder) + } + + static let idTypeMap: NIOLockedValueBox < [String: (Decoder) throws -> any HBJob]> = .init([:]) +} + +/// Internal job instance type +struct HBJobInstance: HBJob { + /// Job definition type + struct Definition { + let id: HBJobIdentifier + let maxRetryCount: Int + let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void + + init(id: HBJobIdentifier, maxRetryCount: Int, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) { + self.id = id + self.maxRetryCount = maxRetryCount + self._execute = execute + } + + public func execute(_ parameters: Parameters, context: HBJobContext) async throws { + try await self._execute(parameters, context) + } + } + + /// job definition + let job: Definition + /// job parameters + let parameters: Parameters + + /// get i + var id: HBJobIdentifier { self.job.id } + var maxRetryCount: Int { self.job.maxRetryCount } + + func execute(context: HBJobContext) async throws { + try await self.job.execute(self.parameters, context: context) + } + + init(job: Definition, parameters: Parameters) throws { + self.job = job + self.parameters = parameters + } +} + +internal struct _HBJobCodingKey: CodingKey { + public var stringValue: String + public var intValue: Int? + + public init?(stringValue: String) { + self.stringValue = stringValue + self.intValue = nil + } + + public init?(intValue: Int) { + self.stringValue = "\(intValue)" + self.intValue = intValue + } + + public init(stringValue: String, intValue: Int?) { + self.stringValue = stringValue + self.intValue = intValue + } + + internal init(index: Int) { + self.stringValue = "Index \(index)" + self.intValue = index + } +} diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index b565ce8b5..0a17ee482 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -45,8 +45,9 @@ public final class HBMemoryJobQueue: HBJobQueue { /// - job: Job /// - eventLoop: Eventloop to run process on (ignored in this case) /// - Returns: Queued job - @discardableResult public func push(_ job: HBJob) async throws -> JobID { - try await self.queue.push(job) + @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID { + let job = _HBJobRequest(id: id, parameters: parameters) + return try await self.queue.push(job) } public func finished(jobId: JobID) async throws { @@ -61,8 +62,8 @@ public final class HBMemoryJobQueue: HBJobQueue { /// Internal actor managing the job queue fileprivate actor Internal { - var queue: Deque - var pendingJobs: [JobID: HBJob] + var queue: Deque<(JobID, Data)> + var pendingJobs: [JobID: any HBJob] var isStopped: Bool init() { @@ -71,18 +72,18 @@ public final class HBMemoryJobQueue: HBJobQueue { self.pendingJobs = .init() } - func push(_ job: HBJob) throws -> JobID { - let queuedJob = HBQueuedJob(id: JobID(), job: job) - let jsonData = try JSONEncoder().encode(queuedJob) - self.queue.append(jsonData) - return queuedJob.id + func push(_ jobRequest: _HBJobRequest) throws -> JobID { + let id = JobID() + let request = try (id, JSONEncoder().encode(jobRequest)) + self.queue.append(request) + return id } func clearPendingJob(jobId: JobID) { self.pendingJobs[jobId] = nil } - func clearAndReturnPendingJob(jobId: JobID) -> HBJob? { + func clearAndReturnPendingJob(jobId: JobID) -> (any HBJob)? { let instance = self.pendingJobs[jobId] self.pendingJobs[jobId] = nil return instance @@ -93,11 +94,11 @@ public final class HBMemoryJobQueue: HBJobQueue { if self.isStopped { return nil } - if let data = queue.popFirst() { + if let request = queue.popFirst() { do { - let job = try JSONDecoder().decode(HBQueuedJob.self, from: data) - self.pendingJobs[job.id] = job.job - return job + let job = try JSONDecoder().decode(HBAnyCodableJob.self, from: request.1) + self.pendingJobs[request.0] = job.job + return HBQueuedJob(id: request.0, job: job.job) } catch { throw JobQueueError.decodeJobFailed } diff --git a/Sources/HummingbirdJobs/QueuedJob.swift b/Sources/HummingbirdJobs/QueuedJob.swift index 097670d0e..b3ef0f996 100644 --- a/Sources/HummingbirdJobs/QueuedJob.swift +++ b/Sources/HummingbirdJobs/QueuedJob.swift @@ -15,25 +15,17 @@ import Foundation /// Add codable support for decoding/encoding any HBJob -public struct HBAnyCodableJob: Codable, Sendable { +public struct HBAnyCodableJob: Decodable, Sendable { /// Job data - public let job: HBJob + public let job: any HBJob /// Initialize a queue job - public init(_ job: HBJob) { + public init(_ job: any HBJob) { self.job = job } public init(from decoder: Decoder) throws { - let container = try decoder.container(keyedBy: CodingKeys.self) - let jobDecoder = try container.superDecoder(forKey: .job) - self.job = try HBJobRegister.decode(from: jobDecoder) - } - - public func encode(to encoder: Encoder) throws { - var container = encoder.container(keyedBy: CodingKeys.self) - let jobEncoder = container.superEncoder(forKey: .job) - try HBJobRegister.encode(job: self.job, to: jobEncoder) + self.job = try HBJobRegister.decode(from: decoder) } private enum CodingKeys: String, CodingKey { @@ -43,20 +35,14 @@ public struct HBAnyCodableJob: Codable, Sendable { /// Queued job. Includes job, plus the id for the job public struct HBQueuedJob: Sendable { - /// Job id + /// Job instance id public let id: JobID /// Job data - private let _job: HBAnyCodableJob - /// Job data - public var job: HBJob { self._job.job } - /// Job data in a codable form - public var anyCodableJob: HBAnyCodableJob { self._job } + public let job: any HBJob /// Initialize a queue job - public init(id: JobID, job: HBJob) { - self._job = .init(job) + public init(id: JobID, job: any HBJob) { + self.job = job self.id = id } } - -extension HBQueuedJob: Codable where JobID: Codable {} diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index 2811c9819..2955c8e54 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -15,6 +15,8 @@ import Atomics import HummingbirdJobs import HummingbirdXCT +import Logging +import NIOConcurrencyHelpers import ServiceLifecycle import XCTest @@ -59,18 +61,13 @@ final class HummingbirdJobsTests: XCTestCase { } func testBasic() async throws { - struct TestJob: HBJob { - static let name = "testBasic" - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - - let value: Int - func execute(logger: Logger) async throws { - print(self.value) - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - Self.expectation.fulfill() - } + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + let jobIdentifer = HBJobIdentifier(#function) + HBJobRegister.registerJob(jobIdentifer) { parameters, context in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() } - TestJob.register() let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( queue: jobQueue, @@ -78,41 +75,36 @@ final class HummingbirdJobsTests: XCTestCase { logger: Logger(label: "HummingbirdJobsTests") ) try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob(value: 1)) - try await jobQueue.push(TestJob(value: 2)) - try await jobQueue.push(TestJob(value: 3)) - try await jobQueue.push(TestJob(value: 4)) - try await jobQueue.push(TestJob(value: 5)) - try await jobQueue.push(TestJob(value: 6)) - try await jobQueue.push(TestJob(value: 7)) - try await jobQueue.push(TestJob(value: 8)) - try await jobQueue.push(TestJob(value: 9)) - try await jobQueue.push(TestJob(value: 10)) - - await self.wait(for: [TestJob.expectation], timeout: 5) + try await jobQueue.push(id: jobIdentifer, parameters: 1) + try await jobQueue.push(id: jobIdentifer, parameters: 2) + try await jobQueue.push(id: jobIdentifer, parameters: 3) + try await jobQueue.push(id: jobIdentifer, parameters: 4) + try await jobQueue.push(id: jobIdentifer, parameters: 5) + try await jobQueue.push(id: jobIdentifer, parameters: 6) + try await jobQueue.push(id: jobIdentifer, parameters: 7) + try await jobQueue.push(id: jobIdentifer, parameters: 8) + try await jobQueue.push(id: jobIdentifer, parameters: 9) + try await jobQueue.push(id: jobIdentifer, parameters: 10) + + await self.wait(for: [expectation], timeout: 5) } } func testMultipleWorkers() async throws { - struct TestJob: HBJob { - static let name = "testBasic" - static let runningJobCounter = ManagedAtomic(0) - static let maxRunningJobCounter = ManagedAtomic(0) - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - - let value: Int - func execute(logger: Logger) async throws { - let runningJobs = Self.runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) - if runningJobs > Self.maxRunningJobCounter.load(ordering: .relaxed) { - Self.maxRunningJobCounter.store(runningJobs, ordering: .relaxed) - } - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - print(self.value) - Self.expectation.fulfill() - Self.runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) + let jobIdentifer = HBJobIdentifier(#function) + let runningJobCounter = ManagedAtomic(0) + let maxRunningJobCounter = ManagedAtomic(0) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + HBJobRegister.registerJob(jobIdentifer) { parameters, context in + let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) + if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { + maxRunningJobCounter.store(runningJobs, ordering: .relaxed) } + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + context.logger.info("Parameters=\(parameters)") + expectation.fulfill() + runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) } - TestJob.register() let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( @@ -121,38 +113,33 @@ final class HummingbirdJobsTests: XCTestCase { logger: Logger(label: "HummingbirdJobsTests") ) try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob(value: 1)) - try await jobQueue.push(TestJob(value: 2)) - try await jobQueue.push(TestJob(value: 3)) - try await jobQueue.push(TestJob(value: 4)) - try await jobQueue.push(TestJob(value: 5)) - try await jobQueue.push(TestJob(value: 6)) - try await jobQueue.push(TestJob(value: 7)) - try await jobQueue.push(TestJob(value: 8)) - try await jobQueue.push(TestJob(value: 9)) - try await jobQueue.push(TestJob(value: 10)) - - await self.wait(for: [TestJob.expectation], timeout: 5) - - XCTAssertGreaterThan(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 1) - XCTAssertLessThanOrEqual(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 4) + try await jobQueue.push(id: jobIdentifer, parameters: 1) + try await jobQueue.push(id: jobIdentifer, parameters: 2) + try await jobQueue.push(id: jobIdentifer, parameters: 3) + try await jobQueue.push(id: jobIdentifer, parameters: 4) + try await jobQueue.push(id: jobIdentifer, parameters: 5) + try await jobQueue.push(id: jobIdentifer, parameters: 6) + try await jobQueue.push(id: jobIdentifer, parameters: 7) + try await jobQueue.push(id: jobIdentifer, parameters: 8) + try await jobQueue.push(id: jobIdentifer, parameters: 9) + try await jobQueue.push(id: jobIdentifer, parameters: 10) + + await self.wait(for: [expectation], timeout: 5) + + XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1) + XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4) } } func testErrorRetryCount() async throws { + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) let failedJobCount = ManagedAtomic(0) struct FailedError: Error {} - - struct TestJob: HBJob { - static let name = "testErrorRetryCount" - static let maxRetryCount = 3 - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) - func execute(logger: Logger) async throws { - Self.expectation.fulfill() - throw FailedError() - } + HBJobRegister.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in + expectation.fulfill() + throw FailedError() } - TestJob.register() var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace let jobQueue = HBMemoryJobQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) } @@ -162,38 +149,46 @@ final class HummingbirdJobsTests: XCTestCase { logger: logger ) try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob()) + try await jobQueue.push(id: jobIdentifer, parameters: 0) - await self.wait(for: [TestJob.expectation], timeout: 5) + await self.wait(for: [expectation], timeout: 5) } XCTAssertEqual(failedJobCount.load(ordering: .relaxed), 1) } - func testJobSerialization() throws { - struct TestJob: HBJob, Equatable { - static let name = "testJobSerialization" - let value: Int - func execute(logger: Logger) async throws {} - } - TestJob.register() - let job = TestJob(value: 2) - let codableJob = HBAnyCodableJob(job) - let data = try JSONEncoder().encode(codableJob) - let codableJob2 = try JSONDecoder().decode(HBAnyCodableJob.self, from: data) - XCTAssertEqual(codableJob2.job as? TestJob, job) + func testJobSerialization() async throws { + struct TestJobParameters: Codable { + let id: Int + let message: String + } + let expectation = XCTestExpectation(description: "TestJob.execute was called") + let jobIdentifer = HBJobIdentifier(#function) + HBJobRegister.registerJob(jobIdentifer) { parameters, _ in + XCTAssertEqual(parameters.id, 23) + XCTAssertEqual(parameters.message, "Hello!") + expectation.fulfill() + } + let jobQueue = HBMemoryJobQueue() + let jobQueueHandler = HBJobQueueHandler( + queue: jobQueue, + numWorkers: 1, + logger: Logger(label: "HummingbirdJobsTests") + ) + try await testJobQueue(jobQueueHandler) { + try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) + + await self.wait(for: [expectation], timeout: 5) + } } /// Test job is cancelled on shutdown func testShutdownJob() async throws { - struct TestJob: HBJob { - static let name = "testShutdownJob" - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - func execute(logger: Logger) async throws { - Self.expectation.fulfill() - try await Task.sleep(for: .milliseconds(1000)) - } + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) + HBJobRegister.registerJob(jobIdentifer) { _, _ in + expectation.fulfill() + try await Task.sleep(for: .milliseconds(1000)) } - TestJob.register() let cancelledJobCount = ManagedAtomic(0) var logger = Logger(label: "HummingbirdJobsTests") @@ -209,8 +204,8 @@ final class HummingbirdJobsTests: XCTestCase { logger: logger ) try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob()) - await self.wait(for: [TestJob.expectation], timeout: 5) + try await jobQueue.push(id: jobIdentifer, parameters: 0) + await self.wait(for: [expectation], timeout: 5) } XCTAssertEqual(cancelledJobCount.load(ordering: .relaxed), 1) @@ -218,19 +213,14 @@ final class HummingbirdJobsTests: XCTestCase { /// test job fails to decode but queue continues to process func testFailToDecode() async throws { - struct TestJob1: HBJob { - static let name = "testFailToDecode" - func execute(logger: Logger) async throws {} - } - struct TestJob2: HBJob { - static let name = "testFailToDecode" - static var value: String? - let value: String - func execute(logger: Logger) async throws { - Self.value = self.value - } + let string: NIOLockedValueBox = .init("") + let jobIdentifer1 = HBJobIdentifier(#function) + let jobIdentifer2 = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) + HBJobRegister.registerJob(jobIdentifer2) { parameters, _ in + string.withLockedValue { $0 = parameters } + expectation.fulfill() } - TestJob2.register() let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( @@ -239,12 +229,63 @@ final class HummingbirdJobsTests: XCTestCase { logger: Logger(label: "HummingbirdJobsTests") ) try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob1()) - try await jobQueue.push(TestJob2(value: "test")) - // stall to give job chance to start running - try await Task.sleep(for: .milliseconds(500)) + try await jobQueue.push(id: jobIdentifer1, parameters: 2) + try await jobQueue.push(id: jobIdentifer2, parameters: "test") + await self.wait(for: [expectation], timeout: 5) } + string.withLockedValue { + XCTAssertEqual($0, "test") + } + } - XCTAssertEqual(TestJob2.value, "test") + func testMultipleJobQueueHandlers() async throws { + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) + HBJobRegister.registerJob(jobIdentifer) { parameters, context in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() + } + let logger = { + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .debug + return logger + }() + let jobQueue = HBMemoryJobQueue() + let jobQueueHandler = HBJobQueueHandler( + queue: jobQueue, + numWorkers: 2, + logger: logger + ) + let jobQueue2 = HBMemoryJobQueue() + let jobQueueHandler2 = HBJobQueueHandler( + queue: jobQueue2, + numWorkers: 2, + logger: logger + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [jobQueueHandler, jobQueueHandler2], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + ) + group.addTask { + try await serviceGroup.run() + } + do { + for i in 0..<200 { + try await jobQueue.push(id: jobIdentifer, parameters: i) + } + await self.wait(for: [expectation], timeout: 5) + await serviceGroup.triggerGracefulShutdown() + } catch { + XCTFail("\(String(reflecting: error))") + await serviceGroup.triggerGracefulShutdown() + throw error + } + } } } From 63c24d98e04fb3178eac31319096c89ed643d511 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 26 Feb 2024 14:30:06 +0000 Subject: [PATCH 02/13] Add precondition to check job isn't registered twice --- Sources/HummingbirdJobs/JobRegister.swift | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Sources/HummingbirdJobs/JobRegister.swift b/Sources/HummingbirdJobs/JobRegister.swift index 9d5ac3e37..d32d110e1 100644 --- a/Sources/HummingbirdJobs/JobRegister.swift +++ b/Sources/HummingbirdJobs/JobRegister.swift @@ -34,7 +34,10 @@ public enum HBJobRegister { let parameters = try Parameters(from: decoder) return try HBJobInstance(job: definition, parameters: parameters) } - self.idTypeMap.withLockedValue { $0[id.name] = builder } + self.idTypeMap.withLockedValue { + precondition($0[id.name] == nil, "There is a job already registered under id \"\(id.name)\"") + $0[id.name] = builder + } } static func decode(from decoder: Decoder) throws -> any HBJob { From 16a77c28cb848788ae99d92319d5ff631e6a3e2e Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 26 Feb 2024 17:18:20 +0000 Subject: [PATCH 03/13] Move job registry inside queue handler --- Sources/HummingbirdJobs/Job.swift | 17 ---- Sources/HummingbirdJobs/JobDefinition.swift | 30 +++++++ Sources/HummingbirdJobs/JobQueue.swift | 26 +++++- Sources/HummingbirdJobs/JobQueueError.swift | 3 + Sources/HummingbirdJobs/JobQueueHandler.swift | 64 ++++++++++----- Sources/HummingbirdJobs/JobRegister.swift | 77 +++++++++--------- Sources/HummingbirdJobs/MemoryJobQueue.swift | 23 +++--- Sources/HummingbirdJobs/QueuedJob.swift | 27 +------ .../HummingbirdJobsTests.swift | 80 ++++++++++--------- 9 files changed, 200 insertions(+), 147 deletions(-) create mode 100644 Sources/HummingbirdJobs/JobDefinition.swift diff --git a/Sources/HummingbirdJobs/Job.swift b/Sources/HummingbirdJobs/Job.swift index 5a794140b..eb1197cad 100644 --- a/Sources/HummingbirdJobs/Job.swift +++ b/Sources/HummingbirdJobs/Job.swift @@ -30,20 +30,3 @@ extension HBJob { id.name } } - -/// Type used internally by job queue implementations to encode a job request -public struct _HBJobRequest: Encodable, Sendable { - let id: HBJobIdentifier - let parameters: Parameters - - public init(id: HBJobIdentifier, parameters: Parameters) { - self.id = id - self.parameters = parameters - } - - public func encode(to encoder: Encoder) throws { - var container = encoder.container(keyedBy: _HBJobCodingKey.self) - let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil)) - try self.parameters.encode(to: childEncoder) - } -} diff --git a/Sources/HummingbirdJobs/JobDefinition.swift b/Sources/HummingbirdJobs/JobDefinition.swift new file mode 100644 index 000000000..bb9f4705d --- /dev/null +++ b/Sources/HummingbirdJobs/JobDefinition.swift @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Job definition type +public struct HBJobDefinition { + let id: HBJobIdentifier + let maxRetryCount: Int + let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void + + public init(id: HBJobIdentifier, maxRetryCount: Int = 0, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) { + self.id = id + self.maxRetryCount = maxRetryCount + self._execute = execute + } + + func execute(_ parameters: Parameters, context: HBJobContext) async throws { + try await self._execute(parameters, context) + } +} diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 46fd22ab9..1506a8791 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -26,7 +26,7 @@ public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob func onInit() async throws /// Push Job onto queue /// - Returns: Identifier of queued job - @discardableResult func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID + @discardableResult func _push(data: Data) async throws -> JobID /// This is called to say job has finished processing and it can be deleted func finished(jobId: JobID) async throws /// This is called to say job has failed to run and should be put aside @@ -40,4 +40,28 @@ public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob extension HBJobQueue { // default version of onInit doing nothing public func onInit() async throws {} + /// Push Job onto queue + /// - Returns: Identifier of queued job + @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID { + let jobRequest = HBJobRequest(id: id, parameters: parameters) + let data = try JSONEncoder().encode(jobRequest) + return try await _push(data: data) + } +} + +/// Type used internally to encode a request +struct HBJobRequest: Encodable, Sendable { + let id: HBJobIdentifier + let parameters: Parameters + + public init(id: HBJobIdentifier, parameters: Parameters) { + self.id = id + self.parameters = parameters + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: _HBJobCodingKey.self) + let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil)) + try self.parameters.encode(to: childEncoder) + } } diff --git a/Sources/HummingbirdJobs/JobQueueError.swift b/Sources/HummingbirdJobs/JobQueueError.swift index 4d24665cd..3ec07b125 100644 --- a/Sources/HummingbirdJobs/JobQueueError.swift +++ b/Sources/HummingbirdJobs/JobQueueError.swift @@ -17,11 +17,14 @@ public struct JobQueueError: Error, Equatable { /// failed to decode job. Possibly because it hasn't been registered or data that was expected /// is not available public static var decodeJobFailed: Self { .init(.decodeJobFailed) } + /// failed to decode job as the job id is not recognised + public static var unrecognisedJobId: Self { .init(.unrecognisedJobId) } /// failed to get job from queue public static var dequeueError: Self { .init(.dequeueError) } private enum QueueError { case decodeJobFailed + case unrecognisedJobId case dequeueError } diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index 67cc6edfe..991d9003f 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -23,6 +23,33 @@ public final class HBJobQueueHandler: Service { self.queue = queue self.numWorkers = numWorkers self.logger = logger + self.jobRegister = .init() + } + + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob( + _ id: HBJobIdentifier, + maxRetryCount: Int = 0, + execute: @escaping @Sendable ( + Parameters, + HBJobContext + ) async throws -> Void + ) { + let definition = HBJobDefinition(id: id, maxRetryCount: maxRetryCount, execute: execute) + self.jobRegister.registerJob(job: definition) + } + + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob(_ job: HBJobDefinition) { + self.jobRegister.registerJob(job: job) } public func run() async throws { @@ -32,16 +59,16 @@ public final class HBJobQueueHandler: Service { try await withThrowingTaskGroup(of: Void.self) { group in var iterator = self.queue.makeAsyncIterator() for _ in 0..: Service { } } - func getNextJob(_ queueIterator: inout Queue.AsyncIterator) async throws -> HBQueuedJob? { - while true { - do { - let job = try await queueIterator.next() - return job - } catch let error as JobQueueError where error == JobQueueError.decodeJobFailed { - self.logger.error("Job failed to decode.") - } - } - } - - func runJob(_ queuedJob: HBQueuedJob) async { + func runJob(_ queuedJob: HBQueuedJob) async throws { var logger = logger logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id) - logger[metadataKey: "hb_job_type"] = .string(queuedJob.job.name) + let job: any HBJob + do { + job = try self.jobRegister.decode(data: queuedJob.jobData) + } catch let error as JobQueueError where error == .unrecognisedJobId { + logger.debug("Failed to find Job with ID while decoding") + try await self.queue.failed(jobId: queuedJob.id, error: error) + return + } catch { + logger.debug("Job failed to decode") + try await self.queue.failed(jobId: queuedJob.id, error: JobQueueError.decodeJobFailed) + return + } + logger[metadataKey: "hb_job_type"] = .string(job.name) - let job = queuedJob.job var count = job.maxRetryCount logger.debug("Starting Job") @@ -103,6 +130,7 @@ public final class HBJobQueueHandler: Service { } } + private let jobRegister: HBJobRegister private let queue: Queue private let numWorkers: Int let logger: Logger diff --git a/Sources/HummingbirdJobs/JobRegister.swift b/Sources/HummingbirdJobs/JobRegister.swift index d32d110e1..de2416cdb 100644 --- a/Sources/HummingbirdJobs/JobRegister.swift +++ b/Sources/HummingbirdJobs/JobRegister.swift @@ -12,69 +12,51 @@ // //===----------------------------------------------------------------------===// +import Foundation import NIOConcurrencyHelpers /// Registry for job types -public enum HBJobRegister { +struct HBJobRegister: Sendable { /// Register job /// - Parameters: /// - id: Job Identifier /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed /// - execute: Job code - public static func registerJob( - _ id: HBJobIdentifier, - maxRetryCount: Int = 0, - execute: @escaping @Sendable ( - Parameters, - HBJobContext - ) async throws -> Void + public func registerJob( + job: HBJobDefinition ) { - let definition = HBJobInstance.Definition(id: id, maxRetryCount: maxRetryCount, execute: execute) let builder = { (decoder: Decoder) in let parameters = try Parameters(from: decoder) - return try HBJobInstance(job: definition, parameters: parameters) + return try HBJobInstance(job: job, parameters: parameters) } - self.idTypeMap.withLockedValue { - precondition($0[id.name] == nil, "There is a job already registered under id \"\(id.name)\"") - $0[id.name] = builder + self.builderTypeMap.withLockedValue { + precondition($0[job.id.name] == nil, "There is a job already registered under id \"\(job.id.name)\"") + $0[job.id.name] = builder } } - static func decode(from decoder: Decoder) throws -> any HBJob { + func decode(data: Data) throws -> any HBJob { + return try JSONDecoder().decode(HBAnyCodableJob.self, from: data, configuration: self).job + } + + func decode(from decoder: Decoder) throws -> any HBJob { let container = try decoder.container(keyedBy: _HBJobCodingKey.self) let key = container.allKeys.first! let childDecoder = try container.superDecoder(forKey: key) - let jobDefinitionBuilder = try Self.idTypeMap.withLockedValue { - guard let job = $0[key.stringValue] else { throw JobQueueError.decodeJobFailed } + let jobDefinitionBuilder = try self.builderTypeMap.withLockedValue { + guard let job = $0[key.stringValue] else { throw JobQueueError.unrecognisedJobId } return job } return try jobDefinitionBuilder(childDecoder) } - static let idTypeMap: NIOLockedValueBox < [String: (Decoder) throws -> any HBJob]> = .init([:]) + let builderTypeMap: NIOLockedValueBox < [String: @Sendable (Decoder) throws -> any HBJob]> = .init([:]) } /// Internal job instance type -struct HBJobInstance: HBJob { - /// Job definition type - struct Definition { - let id: HBJobIdentifier - let maxRetryCount: Int - let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void - - init(id: HBJobIdentifier, maxRetryCount: Int, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) { - self.id = id - self.maxRetryCount = maxRetryCount - self._execute = execute - } - - public func execute(_ parameters: Parameters, context: HBJobContext) async throws { - try await self._execute(parameters, context) - } - } - +internal struct HBJobInstance: HBJob { /// job definition - let job: Definition + let job: HBJobDefinition /// job parameters let parameters: Parameters @@ -86,12 +68,33 @@ struct HBJobInstance: HBJob { try await self.job.execute(self.parameters, context: context) } - init(job: Definition, parameters: Parameters) throws { + init(job: HBJobDefinition, parameters: Parameters) throws { self.job = job self.parameters = parameters } } +/// Add codable support for decoding/encoding any HBJob +internal struct HBAnyCodableJob: DecodableWithConfiguration, Sendable { + typealias DecodingConfiguration = HBJobRegister + + init(from decoder: Decoder, configuration register: DecodingConfiguration) throws { + self.job = try register.decode(from: decoder) + } + + /// Job data + let job: any HBJob + + /// Initialize a queue job + init(_ job: any HBJob) { + self.job = job + } + + private enum CodingKeys: String, CodingKey { + case job + } +} + internal struct _HBJobCodingKey: CodingKey { public var stringValue: String public var intValue: Int? diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index 0a17ee482..e333c2de5 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -45,9 +45,8 @@ public final class HBMemoryJobQueue: HBJobQueue { /// - job: Job /// - eventLoop: Eventloop to run process on (ignored in this case) /// - Returns: Queued job - @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID { - let job = _HBJobRequest(id: id, parameters: parameters) - return try await self.queue.push(job) + @discardableResult public func _push(data: Data) async throws -> JobID { + return try await self.queue.push(data) } public func finished(jobId: JobID) async throws { @@ -56,14 +55,14 @@ public final class HBMemoryJobQueue: HBJobQueue { public func failed(jobId: JobID, error: any Error) async throws { if let job = await self.queue.clearAndReturnPendingJob(jobId: jobId) { - self.onFailedJob(.init(id: jobId, job: job), error) + self.onFailedJob(.init(id: jobId, jobData: job), error) } } /// Internal actor managing the job queue fileprivate actor Internal { - var queue: Deque<(JobID, Data)> - var pendingJobs: [JobID: any HBJob] + var queue: Deque> + var pendingJobs: [JobID: Data] var isStopped: Bool init() { @@ -72,10 +71,9 @@ public final class HBMemoryJobQueue: HBJobQueue { self.pendingJobs = .init() } - func push(_ jobRequest: _HBJobRequest) throws -> JobID { + func push(_ jobData: Data) throws -> JobID { let id = JobID() - let request = try (id, JSONEncoder().encode(jobRequest)) - self.queue.append(request) + self.queue.append(HBQueuedJob(id: id, jobData: jobData)) return id } @@ -83,7 +81,7 @@ public final class HBMemoryJobQueue: HBJobQueue { self.pendingJobs[jobId] = nil } - func clearAndReturnPendingJob(jobId: JobID) -> (any HBJob)? { + func clearAndReturnPendingJob(jobId: JobID) -> Data? { let instance = self.pendingJobs[jobId] self.pendingJobs[jobId] = nil return instance @@ -96,9 +94,8 @@ public final class HBMemoryJobQueue: HBJobQueue { } if let request = queue.popFirst() { do { - let job = try JSONDecoder().decode(HBAnyCodableJob.self, from: request.1) - self.pendingJobs[request.0] = job.job - return HBQueuedJob(id: request.0, job: job.job) + self.pendingJobs[request.id] = request.jobData + return request } catch { throw JobQueueError.decodeJobFailed } diff --git a/Sources/HummingbirdJobs/QueuedJob.swift b/Sources/HummingbirdJobs/QueuedJob.swift index b3ef0f996..1e0e1737d 100644 --- a/Sources/HummingbirdJobs/QueuedJob.swift +++ b/Sources/HummingbirdJobs/QueuedJob.swift @@ -14,35 +14,16 @@ import Foundation -/// Add codable support for decoding/encoding any HBJob -public struct HBAnyCodableJob: Decodable, Sendable { - /// Job data - public let job: any HBJob - - /// Initialize a queue job - public init(_ job: any HBJob) { - self.job = job - } - - public init(from decoder: Decoder) throws { - self.job = try HBJobRegister.decode(from: decoder) - } - - private enum CodingKeys: String, CodingKey { - case job - } -} - -/// Queued job. Includes job, plus the id for the job +/// Queued job. Includes job data, plus the id for the job public struct HBQueuedJob: Sendable { /// Job instance id public let id: JobID /// Job data - public let job: any HBJob + public let jobData: Data /// Initialize a queue job - public init(id: JobID, job: any HBJob) { - self.job = job + public init(id: JobID, jobData: Data) { + self.jobData = jobData self.id = id } } diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index 2955c8e54..cc7209fff 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -63,18 +63,18 @@ final class HummingbirdJobsTests: XCTestCase { func testBasic() async throws { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) let jobIdentifer = HBJobIdentifier(#function) - HBJobRegister.registerJob(jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - expectation.fulfill() - } let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( queue: jobQueue, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests") ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer) { parameters, context in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: 1) try await jobQueue.push(id: jobIdentifer, parameters: 2) try await jobQueue.push(id: jobIdentifer, parameters: 3) @@ -95,7 +95,14 @@ final class HummingbirdJobsTests: XCTestCase { let runningJobCounter = ManagedAtomic(0) let maxRunningJobCounter = ManagedAtomic(0) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - HBJobRegister.registerJob(jobIdentifer) { parameters, context in + + let jobQueue = HBMemoryJobQueue() + let jobQueueHandler = HBJobQueueHandler( + queue: jobQueue, + numWorkers: 4, + logger: Logger(label: "HummingbirdJobsTests") + ) + jobQueueHandler.registerJob(jobIdentifer) { parameters, context in let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { maxRunningJobCounter.store(runningJobs, ordering: .relaxed) @@ -105,14 +112,7 @@ final class HummingbirdJobsTests: XCTestCase { expectation.fulfill() runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) } - - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 4, - logger: Logger(label: "HummingbirdJobsTests") - ) - try await testJobQueue(jobQueueHandler) { + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: 1) try await jobQueue.push(id: jobIdentifer, parameters: 2) try await jobQueue.push(id: jobIdentifer, parameters: 3) @@ -136,10 +136,6 @@ final class HummingbirdJobsTests: XCTestCase { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) let failedJobCount = ManagedAtomic(0) struct FailedError: Error {} - HBJobRegister.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in - expectation.fulfill() - throw FailedError() - } var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace let jobQueue = HBMemoryJobQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) } @@ -148,7 +144,11 @@ final class HummingbirdJobsTests: XCTestCase { numWorkers: 4, logger: logger ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in + expectation.fulfill() + throw FailedError() + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: 0) await self.wait(for: [expectation], timeout: 5) @@ -163,18 +163,18 @@ final class HummingbirdJobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called") let jobIdentifer = HBJobIdentifier(#function) - HBJobRegister.registerJob(jobIdentifer) { parameters, _ in - XCTAssertEqual(parameters.id, 23) - XCTAssertEqual(parameters.message, "Hello!") - expectation.fulfill() - } let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( queue: jobQueue, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests") ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer) { parameters, _ in + XCTAssertEqual(parameters.id, 23) + XCTAssertEqual(parameters.message, "Hello!") + expectation.fulfill() + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) await self.wait(for: [expectation], timeout: 5) @@ -185,10 +185,6 @@ final class HummingbirdJobsTests: XCTestCase { func testShutdownJob() async throws { let jobIdentifer = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - HBJobRegister.registerJob(jobIdentifer) { _, _ in - expectation.fulfill() - try await Task.sleep(for: .milliseconds(1000)) - } let cancelledJobCount = ManagedAtomic(0) var logger = Logger(label: "HummingbirdJobsTests") @@ -203,7 +199,11 @@ final class HummingbirdJobsTests: XCTestCase { numWorkers: 4, logger: logger ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer) { _, _ in + expectation.fulfill() + try await Task.sleep(for: .milliseconds(1000)) + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer, parameters: 0) await self.wait(for: [expectation], timeout: 5) } @@ -217,18 +217,20 @@ final class HummingbirdJobsTests: XCTestCase { let jobIdentifer1 = HBJobIdentifier(#function) let jobIdentifer2 = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) - HBJobRegister.registerJob(jobIdentifer2) { parameters, _ in - string.withLockedValue { $0 = parameters } - expectation.fulfill() - } + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .debug let jobQueue = HBMemoryJobQueue() let jobQueueHandler = HBJobQueueHandler( queue: jobQueue, numWorkers: 1, - logger: Logger(label: "HummingbirdJobsTests") + logger: logger ) - try await testJobQueue(jobQueueHandler) { + jobQueueHandler.registerJob(jobIdentifer2) { parameters, _ in + string.withLockedValue { $0 = parameters } + expectation.fulfill() + } + try await self.testJobQueue(jobQueueHandler) { try await jobQueue.push(id: jobIdentifer1, parameters: 2) try await jobQueue.push(id: jobIdentifer2, parameters: "test") await self.wait(for: [expectation], timeout: 5) @@ -241,7 +243,7 @@ final class HummingbirdJobsTests: XCTestCase { func testMultipleJobQueueHandlers() async throws { let jobIdentifer = HBJobIdentifier(#function) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) - HBJobRegister.registerJob(jobIdentifer) { parameters, context in + let job = HBJobDefinition(id: jobIdentifer) { parameters, context in context.logger.info("Parameters=\(parameters)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() @@ -257,12 +259,14 @@ final class HummingbirdJobsTests: XCTestCase { numWorkers: 2, logger: logger ) + jobQueueHandler.registerJob(job) let jobQueue2 = HBMemoryJobQueue() let jobQueueHandler2 = HBJobQueueHandler( queue: jobQueue2, numWorkers: 2, logger: logger ) + jobQueueHandler2.registerJob(job) try await withThrowingTaskGroup(of: Void.self) { group in let serviceGroup = ServiceGroup( From d1600a9c4e4686759ab30880f326c9d898a0e832 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 26 Feb 2024 17:48:09 +0000 Subject: [PATCH 04/13] HBJobQueue is now a struct containing a driver and handler --- Sources/HummingbirdJobs/JobDefinition.swift | 2 +- Sources/HummingbirdJobs/JobQueue.swift | 69 +++++++++----- Sources/HummingbirdJobs/JobQueueDriver.swift | 43 +++++++++ Sources/HummingbirdJobs/JobQueueHandler.swift | 25 +---- Sources/HummingbirdJobs/JobRegister.swift | 2 +- Sources/HummingbirdJobs/MemoryJobQueue.swift | 12 +-- .../HummingbirdJobsTests.swift | 95 ++++++------------- 7 files changed, 130 insertions(+), 118 deletions(-) create mode 100644 Sources/HummingbirdJobs/JobQueueDriver.swift diff --git a/Sources/HummingbirdJobs/JobDefinition.swift b/Sources/HummingbirdJobs/JobDefinition.swift index bb9f4705d..1ef3b8d3c 100644 --- a/Sources/HummingbirdJobs/JobDefinition.swift +++ b/Sources/HummingbirdJobs/JobDefinition.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// /// Job definition type -public struct HBJobDefinition { +public struct HBJobDefinition: Sendable { let id: HBJobIdentifier let maxRetryCount: Int let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 1506a8791..4f65068fc 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -15,37 +15,60 @@ import Foundation import Hummingbird import Logging +import ServiceLifecycle -/// Job queue protocol. +/// Job queue /// -/// Defines how to push and pop jobs off a queue -public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob { - associatedtype JobID: CustomStringConvertible & Sendable +/// Wrapper type to bring together a job queue implementation and a job queue +/// handler. Before you can push jobs onto a queue you should register it +/// with the queue via either ``HBJobQueue.registerJob(id:maxRetryCount:execute:)`` or +/// ``HBJobQueue.registerJob(_:)``. +public struct HBJobQueue: Service { + let queue: Queue + let handler: HBJobQueueHandler - /// Called when JobQueueHandler is initialised with this queue - func onInit() async throws - /// Push Job onto queue - /// - Returns: Identifier of queued job - @discardableResult func _push(data: Data) async throws -> JobID - /// This is called to say job has finished processing and it can be deleted - func finished(jobId: JobID) async throws - /// This is called to say job has failed to run and should be put aside - func failed(jobId: JobID, error: any Error) async throws - /// stop serving jobs - func stop() async - /// shutdown queue - func shutdownGracefully() async -} + public init(_ queue: Queue, numWorkers: Int = 1, logger: Logger) { + self.queue = queue + self.handler = .init(queue: queue, numWorkers: numWorkers, logger: logger) + } -extension HBJobQueue { - // default version of onInit doing nothing - public func onInit() async throws {} /// Push Job onto queue /// - Returns: Identifier of queued job - @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> JobID { + @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> Queue.JobID { let jobRequest = HBJobRequest(id: id, parameters: parameters) let data = try JSONEncoder().encode(jobRequest) - return try await _push(data: data) + return try await self.queue.push(data: data) + } + + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob( + _ id: HBJobIdentifier, + maxRetryCount: Int = 0, + execute: @escaping @Sendable ( + Parameters, + HBJobContext + ) async throws -> Void + ) { + let job = HBJobDefinition(id: id, maxRetryCount: maxRetryCount, execute: execute) + self.handler.registerJob(job) + } + + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob(_ job: HBJobDefinition) { + self.handler.registerJob(job) + } + + /// Run queue handler + public func run() async throws { + try await self.handler.run() } } diff --git a/Sources/HummingbirdJobs/JobQueueDriver.swift b/Sources/HummingbirdJobs/JobQueueDriver.swift new file mode 100644 index 000000000..79820a6f0 --- /dev/null +++ b/Sources/HummingbirdJobs/JobQueueDriver.swift @@ -0,0 +1,43 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2023 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Hummingbird +import Logging + +/// Job queue protocol. +/// +/// Defines how to push and pop job data off a queue +public protocol HBJobQueueDriver: AsyncSequence, Sendable where Element == HBQueuedJob { + associatedtype JobID: CustomStringConvertible & Sendable + + /// Called when JobQueueHandler is initialised with this queue + func onInit() async throws + /// Push Job onto queue + /// - Returns: Identifier of queued job + func push(data: Data) async throws -> JobID + /// This is called to say job has finished processing and it can be deleted + func finished(jobId: JobID) async throws + /// This is called to say job has failed to run and should be put aside + func failed(jobId: JobID, error: any Error) async throws + /// stop serving jobs + func stop() async + /// shutdown queue + func shutdownGracefully() async +} + +extension HBJobQueueDriver { + // default version of onInit doing nothing + public func onInit() async throws {} +} diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index 991d9003f..49ab0da44 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -18,8 +18,8 @@ import Logging import ServiceLifecycle /// Object handling a single job queue -public final class HBJobQueueHandler: Service { - public init(queue: Queue, numWorkers: Int, logger: Logger) { +final class HBJobQueueHandler: Service { + init(queue: Queue, numWorkers: Int, logger: Logger) { self.queue = queue self.numWorkers = numWorkers self.logger = logger @@ -31,28 +31,11 @@ public final class HBJobQueueHandler: Service { /// - id: Job Identifier /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed /// - execute: Job code - public func registerJob( - _ id: HBJobIdentifier, - maxRetryCount: Int = 0, - execute: @escaping @Sendable ( - Parameters, - HBJobContext - ) async throws -> Void - ) { - let definition = HBJobDefinition(id: id, maxRetryCount: maxRetryCount, execute: execute) - self.jobRegister.registerJob(job: definition) - } - - /// Register job - /// - Parameters: - /// - id: Job Identifier - /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed - /// - execute: Job code - public func registerJob(_ job: HBJobDefinition) { + func registerJob(_ job: HBJobDefinition) { self.jobRegister.registerJob(job: job) } - public func run() async throws { + func run() async throws { try await self.queue.onInit() try await withGracefulShutdownHandler { diff --git a/Sources/HummingbirdJobs/JobRegister.swift b/Sources/HummingbirdJobs/JobRegister.swift index de2416cdb..b0c05f18b 100644 --- a/Sources/HummingbirdJobs/JobRegister.swift +++ b/Sources/HummingbirdJobs/JobRegister.swift @@ -25,7 +25,7 @@ struct HBJobRegister: Sendable { public func registerJob( job: HBJobDefinition ) { - let builder = { (decoder: Decoder) in + let builder: @Sendable (Decoder) throws -> any HBJob = { decoder in let parameters = try Parameters(from: decoder) return try HBJobInstance(job: job, parameters: parameters) } diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index e333c2de5..c638f8a1d 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -16,7 +16,7 @@ import Collections import Foundation /// In memory implementation of job queue driver. Stores jobs in a circular buffer -public final class HBMemoryJobQueue: HBJobQueue { +public final class HBMemoryJobQueue: HBJobQueueDriver { public typealias Element = HBQueuedJob public typealias JobID = UUID @@ -45,7 +45,7 @@ public final class HBMemoryJobQueue: HBJobQueue { /// - job: Job /// - eventLoop: Eventloop to run process on (ignored in this case) /// - Returns: Queued job - @discardableResult public func _push(data: Data) async throws -> JobID { + @discardableResult public func push(data: Data) async throws -> JobID { return try await self.queue.push(data) } @@ -93,12 +93,8 @@ public final class HBMemoryJobQueue: HBJobQueue { return nil } if let request = queue.popFirst() { - do { - self.pendingJobs[request.id] = request.jobData - return request - } catch { - throw JobQueueError.decodeJobFailed - } + self.pendingJobs[request.id] = request.jobData + return request } try await Task.sleep(for: .milliseconds(100)) } diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index cc7209fff..506f7555a 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -41,13 +41,13 @@ final class HummingbirdJobsTests: XCTestCase { /// Creates test client, runs test function abd ensures everything is /// shutdown correctly public func testJobQueue( - _ jobQueueHandler: HBJobQueueHandler, + _ jobQueue: Service, _ test: () async throws -> Void ) async throws { try await withThrowingTaskGroup(of: Void.self) { group in let serviceGroup = ServiceGroup( configuration: .init( - services: [jobQueueHandler], + services: [jobQueue], gracefulShutdownSignals: [.sigterm, .sigint], logger: Logger(label: "JobQueueService") ) @@ -63,18 +63,13 @@ final class HummingbirdJobsTests: XCTestCase { func testBasic() async throws { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) let jobIdentifer = HBJobIdentifier(#function) - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 1, - logger: Logger(label: "HummingbirdJobsTests") - ) - jobQueueHandler.registerJob(jobIdentifer) { parameters, context in + let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(jobIdentifer) { parameters, context in context.logger.info("Parameters=\(parameters)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } - try await self.testJobQueue(jobQueueHandler) { + try await self.testJobQueue(jobQueue) { try await jobQueue.push(id: jobIdentifer, parameters: 1) try await jobQueue.push(id: jobIdentifer, parameters: 2) try await jobQueue.push(id: jobIdentifer, parameters: 3) @@ -96,13 +91,8 @@ final class HummingbirdJobsTests: XCTestCase { let maxRunningJobCounter = ManagedAtomic(0) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 4, - logger: Logger(label: "HummingbirdJobsTests") - ) - jobQueueHandler.registerJob(jobIdentifer) { parameters, context in + let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 4, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(jobIdentifer) { parameters, context in let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { maxRunningJobCounter.store(runningJobs, ordering: .relaxed) @@ -112,7 +102,7 @@ final class HummingbirdJobsTests: XCTestCase { expectation.fulfill() runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) } - try await self.testJobQueue(jobQueueHandler) { + try await self.testJobQueue(jobQueue) { try await jobQueue.push(id: jobIdentifer, parameters: 1) try await jobQueue.push(id: jobIdentifer, parameters: 2) try await jobQueue.push(id: jobIdentifer, parameters: 3) @@ -138,17 +128,15 @@ final class HummingbirdJobsTests: XCTestCase { struct FailedError: Error {} var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace - let jobQueue = HBMemoryJobQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) } - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 4, + let jobQueue = HBJobQueue( + HBMemoryJobQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) }, logger: logger ) - jobQueueHandler.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in + jobQueue.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in expectation.fulfill() throw FailedError() } - try await self.testJobQueue(jobQueueHandler) { + try await self.testJobQueue(jobQueue) { try await jobQueue.push(id: jobIdentifer, parameters: 0) await self.wait(for: [expectation], timeout: 5) @@ -163,18 +151,13 @@ final class HummingbirdJobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called") let jobIdentifer = HBJobIdentifier(#function) - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 1, - logger: Logger(label: "HummingbirdJobsTests") - ) - jobQueueHandler.registerJob(jobIdentifer) { parameters, _ in + let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(jobIdentifer) { parameters, _ in XCTAssertEqual(parameters.id, 23) XCTAssertEqual(parameters.message, "Hello!") expectation.fulfill() } - try await self.testJobQueue(jobQueueHandler) { + try await self.testJobQueue(jobQueue) { try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) await self.wait(for: [expectation], timeout: 5) @@ -189,21 +172,20 @@ final class HummingbirdJobsTests: XCTestCase { let cancelledJobCount = ManagedAtomic(0) var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace - let jobQueue = HBMemoryJobQueue { _, error in - if error is CancellationError { - cancelledJobCount.wrappingIncrement(by: 1, ordering: .relaxed) - } - } - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, + let jobQueue = HBJobQueue( + HBMemoryJobQueue { _, error in + if error is CancellationError { + cancelledJobCount.wrappingIncrement(by: 1, ordering: .relaxed) + } + }, numWorkers: 4, logger: logger ) - jobQueueHandler.registerJob(jobIdentifer) { _, _ in + jobQueue.registerJob(jobIdentifer) { _, _ in expectation.fulfill() try await Task.sleep(for: .milliseconds(1000)) } - try await self.testJobQueue(jobQueueHandler) { + try await self.testJobQueue(jobQueue) { try await jobQueue.push(id: jobIdentifer, parameters: 0) await self.wait(for: [expectation], timeout: 5) } @@ -220,17 +202,12 @@ final class HummingbirdJobsTests: XCTestCase { var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .debug - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 1, - logger: logger - ) - jobQueueHandler.registerJob(jobIdentifer2) { parameters, _ in + let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(jobIdentifer2) { parameters, _ in string.withLockedValue { $0 = parameters } expectation.fulfill() } - try await self.testJobQueue(jobQueueHandler) { + try await self.testJobQueue(jobQueue) { try await jobQueue.push(id: jobIdentifer1, parameters: 2) try await jobQueue.push(id: jobIdentifer2, parameters: "test") await self.wait(for: [expectation], timeout: 5) @@ -253,25 +230,15 @@ final class HummingbirdJobsTests: XCTestCase { logger.logLevel = .debug return logger }() - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 2, - logger: logger - ) - jobQueueHandler.registerJob(job) - let jobQueue2 = HBMemoryJobQueue() - let jobQueueHandler2 = HBJobQueueHandler( - queue: jobQueue2, - numWorkers: 2, - logger: logger - ) - jobQueueHandler2.registerJob(job) + let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 2, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(job) + let jobQueue2 = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue2.registerJob(job) try await withThrowingTaskGroup(of: Void.self) { group in let serviceGroup = ServiceGroup( configuration: .init( - services: [jobQueueHandler, jobQueueHandler2], + services: [jobQueue, jobQueue2], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger ) From 6a33bd44e23f8850092b14d03982593802880366 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 26 Feb 2024 17:49:12 +0000 Subject: [PATCH 05/13] comments --- Sources/HummingbirdJobs/JobQueue.swift | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 4f65068fc..dcc75667c 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -32,7 +32,10 @@ public struct HBJobQueue: Service { self.handler = .init(queue: queue, numWorkers: numWorkers, logger: logger) } - /// Push Job onto queue + /// Push Job onto queue + /// - Parameters: + /// - id: Job identifier + /// - parameters: parameters for the job /// - Returns: Identifier of queued job @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> Queue.JobID { let jobRequest = HBJobRequest(id: id, parameters: parameters) From da0f9e69774a66f6ef0e57d230e8a2519413eaa9 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 26 Feb 2024 22:33:04 +0000 Subject: [PATCH 06/13] Add DecodableWithUserInfoConfiguration --- Sources/HummingbirdJobs/JobQueue.swift | 2 +- Sources/HummingbirdJobs/JobQueueDriver.swift | 2 +- Sources/HummingbirdJobs/JobQueueError.swift | 2 +- Sources/HummingbirdJobs/JobQueueHandler.swift | 2 +- Sources/HummingbirdJobs/JobRegister.swift | 4 +- Sources/HummingbirdJobs/MemoryJobQueue.swift | 2 +- Sources/HummingbirdJobs/QueuedJob.swift | 2 +- .../DecodableWithUserInfoConfiguration.swift | 49 +++++++++++++++++++ 8 files changed, 57 insertions(+), 8 deletions(-) create mode 100644 Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index dcc75667c..f3077870f 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2023 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information diff --git a/Sources/HummingbirdJobs/JobQueueDriver.swift b/Sources/HummingbirdJobs/JobQueueDriver.swift index 79820a6f0..98c5a0f7d 100644 --- a/Sources/HummingbirdJobs/JobQueueDriver.swift +++ b/Sources/HummingbirdJobs/JobQueueDriver.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2023 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information diff --git a/Sources/HummingbirdJobs/JobQueueError.swift b/Sources/HummingbirdJobs/JobQueueError.swift index 3ec07b125..3105e9672 100644 --- a/Sources/HummingbirdJobs/JobQueueError.swift +++ b/Sources/HummingbirdJobs/JobQueueError.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index 49ab0da44..7c84ddb4d 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information diff --git a/Sources/HummingbirdJobs/JobRegister.swift b/Sources/HummingbirdJobs/JobRegister.swift index b0c05f18b..854208ddd 100644 --- a/Sources/HummingbirdJobs/JobRegister.swift +++ b/Sources/HummingbirdJobs/JobRegister.swift @@ -36,7 +36,7 @@ struct HBJobRegister: Sendable { } func decode(data: Data) throws -> any HBJob { - return try JSONDecoder().decode(HBAnyCodableJob.self, from: data, configuration: self).job + return try JSONDecoder().decode(HBAnyCodableJob.self, from: data, userInfoConfiguration: self).job } func decode(from decoder: Decoder) throws -> any HBJob { @@ -75,7 +75,7 @@ internal struct HBJobInstance: HBJob { } /// Add codable support for decoding/encoding any HBJob -internal struct HBAnyCodableJob: DecodableWithConfiguration, Sendable { +internal struct HBAnyCodableJob: DecodableWithUserInfoConfiguration, Sendable { typealias DecodingConfiguration = HBJobRegister init(from decoder: Decoder, configuration register: DecodingConfiguration) throws { diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index c638f8a1d..6bda20b2e 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information diff --git a/Sources/HummingbirdJobs/QueuedJob.swift b/Sources/HummingbirdJobs/QueuedJob.swift index 1e0e1737d..017679b90 100644 --- a/Sources/HummingbirdJobs/QueuedJob.swift +++ b/Sources/HummingbirdJobs/QueuedJob.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2023 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information diff --git a/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift b/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift new file mode 100644 index 000000000..f5c38734e --- /dev/null +++ b/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift @@ -0,0 +1,49 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation + +/// Implementation of DecodableWithConfiguration which extracts the configuration from the userInfo array +/// +/// This is necessary as Linux Foundation does not have support for setting DecodableWithConfiguration +/// configuration from the JSONDecoder +protocol DecodableWithUserInfoConfiguration: Decodable, DecodableWithConfiguration {} + +/// Implement `init(from: Decoder)`` by extracting configuration from the userInfo dictionary. +extension DecodableWithUserInfoConfiguration { + init(from decoder: Decoder) throws { + guard let configuration = decoder.userInfo[.configuration] as? DecodingConfiguration else { + throw DecodingError.valueNotFound(DecodingConfiguration.self, .init(codingPath: decoder.codingPath, debugDescription: "Failed to find Decoding configuration")) + } + try self.init(from: decoder, configuration: configuration) + } +} + +extension CodingUserInfoKey { + /// Coding UserInfo key used to store DecodableWithUserInfoConfiguration configuration + static var configuration: Self { return .init(rawValue: "_configuration_")! } +} + +extension JSONDecoder { + /// Version of JSONDecoder that sets up configuration userInfo for the DecodableWithUserInfoConfiguration + /// protocol + func decode( + _ type: T.Type, + from data: Data, + userInfoConfiguration: T.DecodingConfiguration + ) throws -> T where T: DecodableWithUserInfoConfiguration { + self.userInfo[.configuration] = userInfoConfiguration + return try self.decode(type, from: data) + } +} From 284223d4d3f9eb7256b8e90a2bf60bc65013d452 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 27 Feb 2024 07:29:31 +0000 Subject: [PATCH 07/13] Init HBJobIdentifier from string literal --- Sources/HummingbirdJobs/JobIdentifier.swift | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/Sources/HummingbirdJobs/JobIdentifier.swift b/Sources/HummingbirdJobs/JobIdentifier.swift index 1b9d5cdae..ec7dc0f45 100644 --- a/Sources/HummingbirdJobs/JobIdentifier.swift +++ b/Sources/HummingbirdJobs/JobIdentifier.swift @@ -23,7 +23,20 @@ /// static var myJob: Self { .init("my-job") } /// } /// ``` -public struct HBJobIdentifier: Sendable, Hashable { +public struct HBJobIdentifier: Sendable, Hashable, ExpressibleByStringLiteral { let name: String + /// Initialize a HBJobIdentifier + /// + /// - Parameters: + /// - name: Unique name for identifier + /// - parameters: Parameter type associated with Job public init(_ name: String, parameters: Parameters.Type = Parameters.self) { self.name = name } + + /// Initialize a HBJobIdentifier from a string literal + /// + /// This can only be used in a situation where the Parameter type is defined elsewhere + /// - Parameter string: + public init(stringLiteral string: String) { + self.name = string + } } From 3afb86bfaeef57de259524f47a5ebddaad24f7bc Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 27 Feb 2024 07:30:16 +0000 Subject: [PATCH 08/13] Make HBJobDefinition.id public, Rename HBJobRegister -> HBJobRegistry --- Sources/HummingbirdJobs/JobDefinition.swift | 2 +- Sources/HummingbirdJobs/JobQueueHandler.swift | 8 +++---- .../{JobRegister.swift => JobRegistry.swift} | 4 ++-- .../HummingbirdJobsTests.swift | 24 +++++++++---------- 4 files changed, 19 insertions(+), 19 deletions(-) rename Sources/HummingbirdJobs/{JobRegister.swift => JobRegistry.swift} (97%) diff --git a/Sources/HummingbirdJobs/JobDefinition.swift b/Sources/HummingbirdJobs/JobDefinition.swift index 1ef3b8d3c..8ce468e6a 100644 --- a/Sources/HummingbirdJobs/JobDefinition.swift +++ b/Sources/HummingbirdJobs/JobDefinition.swift @@ -14,7 +14,7 @@ /// Job definition type public struct HBJobDefinition: Sendable { - let id: HBJobIdentifier + public let id: HBJobIdentifier let maxRetryCount: Int let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index 7c84ddb4d..fc5bf4928 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -23,7 +23,7 @@ final class HBJobQueueHandler: Service { self.queue = queue self.numWorkers = numWorkers self.logger = logger - self.jobRegister = .init() + self.jobRegistry = .init() } /// Register job @@ -32,7 +32,7 @@ final class HBJobQueueHandler: Service { /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed /// - execute: Job code func registerJob(_ job: HBJobDefinition) { - self.jobRegister.registerJob(job: job) + self.jobRegistry.registerJob(job: job) } func run() async throws { @@ -69,7 +69,7 @@ final class HBJobQueueHandler: Service { logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id) let job: any HBJob do { - job = try self.jobRegister.decode(data: queuedJob.jobData) + job = try self.jobRegistry.decode(data: queuedJob.jobData) } catch let error as JobQueueError where error == .unrecognisedJobId { logger.debug("Failed to find Job with ID while decoding") try await self.queue.failed(jobId: queuedJob.id, error: error) @@ -113,7 +113,7 @@ final class HBJobQueueHandler: Service { } } - private let jobRegister: HBJobRegister + private let jobRegistry: HBJobRegistry private let queue: Queue private let numWorkers: Int let logger: Logger diff --git a/Sources/HummingbirdJobs/JobRegister.swift b/Sources/HummingbirdJobs/JobRegistry.swift similarity index 97% rename from Sources/HummingbirdJobs/JobRegister.swift rename to Sources/HummingbirdJobs/JobRegistry.swift index 854208ddd..2f39432b7 100644 --- a/Sources/HummingbirdJobs/JobRegister.swift +++ b/Sources/HummingbirdJobs/JobRegistry.swift @@ -16,7 +16,7 @@ import Foundation import NIOConcurrencyHelpers /// Registry for job types -struct HBJobRegister: Sendable { +struct HBJobRegistry: Sendable { /// Register job /// - Parameters: /// - id: Job Identifier @@ -76,7 +76,7 @@ internal struct HBJobInstance: HBJob { /// Add codable support for decoding/encoding any HBJob internal struct HBAnyCodableJob: DecodableWithUserInfoConfiguration, Sendable { - typealias DecodingConfiguration = HBJobRegister + typealias DecodingConfiguration = HBJobRegistry init(from decoder: Decoder, configuration register: DecodingConfiguration) throws { self.job = try register.decode(from: decoder) diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index 506f7555a..2ba023eec 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -62,24 +62,24 @@ final class HummingbirdJobsTests: XCTestCase { func testBasic() async throws { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - let jobIdentifer = HBJobIdentifier(#function) let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) - jobQueue.registerJob(jobIdentifer) { parameters, context in + let job = HBJobDefinition(id: "testBasic") { (parameters: Int, context) in context.logger.info("Parameters=\(parameters)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } + jobQueue.registerJob(job) try await self.testJobQueue(jobQueue) { - try await jobQueue.push(id: jobIdentifer, parameters: 1) - try await jobQueue.push(id: jobIdentifer, parameters: 2) - try await jobQueue.push(id: jobIdentifer, parameters: 3) - try await jobQueue.push(id: jobIdentifer, parameters: 4) - try await jobQueue.push(id: jobIdentifer, parameters: 5) - try await jobQueue.push(id: jobIdentifer, parameters: 6) - try await jobQueue.push(id: jobIdentifer, parameters: 7) - try await jobQueue.push(id: jobIdentifer, parameters: 8) - try await jobQueue.push(id: jobIdentifer, parameters: 9) - try await jobQueue.push(id: jobIdentifer, parameters: 10) + try await jobQueue.push(id: job.id, parameters: 1) + try await jobQueue.push(id: job.id, parameters: 2) + try await jobQueue.push(id: job.id, parameters: 3) + try await jobQueue.push(id: job.id, parameters: 4) + try await jobQueue.push(id: job.id, parameters: 5) + try await jobQueue.push(id: job.id, parameters: 6) + try await jobQueue.push(id: job.id, parameters: 7) + try await jobQueue.push(id: job.id, parameters: 8) + try await jobQueue.push(id: job.id, parameters: 9) + try await jobQueue.push(id: job.id, parameters: 10) await self.wait(for: [expectation], timeout: 5) } From 8a898a6262e96e013cb8bfc6c4801ff9cce31b0a Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 27 Feb 2024 09:41:37 +0000 Subject: [PATCH 09/13] Remove Hummingbird dependency from HummingbirdJobs --- Package.swift | 6 +++++- Sources/HummingbirdJobs/JobQueue.swift | 13 +++++-------- Sources/HummingbirdJobs/JobQueueDriver.swift | 1 - Sources/HummingbirdJobs/JobQueueHandler.swift | 2 -- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/Package.swift b/Package.swift index 073a4ddaf..20c297323 100644 --- a/Package.swift +++ b/Package.swift @@ -21,6 +21,7 @@ let package = Package( dependencies: [ .package(url: "https://github.com/apple/swift-async-algorithms.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"), .package(url: "https://github.com/apple/swift-http-types.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-metrics.git", "1.0.0"..<"3.0.0"), @@ -70,8 +71,10 @@ let package = Package( .target( name: "HummingbirdJobs", dependencies: [ - .byName(name: "Hummingbird"), + .product(name: "Collections", package: "swift-collections"), .product(name: "Logging", package: "swift-log"), + .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), + .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), ], swiftSettings: swiftSettings ), @@ -138,6 +141,7 @@ let package = Package( .testTarget(name: "HummingbirdJobsTests", dependencies: [ .byName(name: "HummingbirdJobs"), .byName(name: "HummingbirdXCT"), + .product(name: "Atomics", package: "swift-atomics"), ]), .testTarget(name: "HummingbirdRouterTests", dependencies: [ .byName(name: "HummingbirdRouter"), diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index f3077870f..703c64f88 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -13,7 +13,6 @@ //===----------------------------------------------------------------------===// import Foundation -import Hummingbird import Logging import ServiceLifecycle @@ -43,7 +42,7 @@ public struct HBJobQueue: Service { return try await self.queue.push(data: data) } - /// Register job + /// Register job type /// - Parameters: /// - id: Job Identifier /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed @@ -57,15 +56,13 @@ public struct HBJobQueue: Service { ) async throws -> Void ) { let job = HBJobDefinition(id: id, maxRetryCount: maxRetryCount, execute: execute) - self.handler.registerJob(job) + self.registerJob(job) } - /// Register job + /// Register job type /// - Parameters: - /// - id: Job Identifier - /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed - /// - execute: Job code - public func registerJob(_ job: HBJobDefinition) { + /// - job: Job definition + public func registerJob(_ job: HBJobDefinition) { self.handler.registerJob(job) } diff --git a/Sources/HummingbirdJobs/JobQueueDriver.swift b/Sources/HummingbirdJobs/JobQueueDriver.swift index 98c5a0f7d..70d05bac9 100644 --- a/Sources/HummingbirdJobs/JobQueueDriver.swift +++ b/Sources/HummingbirdJobs/JobQueueDriver.swift @@ -13,7 +13,6 @@ //===----------------------------------------------------------------------===// import Foundation -import Hummingbird import Logging /// Job queue protocol. diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index fc5bf4928..50516b37e 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -12,8 +12,6 @@ // //===----------------------------------------------------------------------===// -import AsyncAlgorithms -import Hummingbird import Logging import ServiceLifecycle From 5026e63a46409d892a807dac0855a8abdcb10516 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 27 Feb 2024 13:23:41 +0000 Subject: [PATCH 10/13] Add HBJobQueueDriver.memory --- Sources/HummingbirdJobs/JobQueue.swift | 5 +++-- Sources/HummingbirdJobs/MemoryJobQueue.swift | 15 ++++++++++++--- .../HummingbirdJobsTests.swift | 16 ++++++++-------- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 703c64f88..0cd09d16d 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -23,7 +23,8 @@ import ServiceLifecycle /// with the queue via either ``HBJobQueue.registerJob(id:maxRetryCount:execute:)`` or /// ``HBJobQueue.registerJob(_:)``. public struct HBJobQueue: Service { - let queue: Queue + /// underlying driver for queue + public let queue: Queue let handler: HBJobQueueHandler public init(_ queue: Queue, numWorkers: Int = 1, logger: Logger) { @@ -62,7 +63,7 @@ public struct HBJobQueue: Service { /// Register job type /// - Parameters: /// - job: Job definition - public func registerJob(_ job: HBJobDefinition) { + public func registerJob(_ job: HBJobDefinition) { self.handler.registerJob(job) } diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index 6bda20b2e..be8c8a8d5 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -15,8 +15,8 @@ import Collections import Foundation -/// In memory implementation of job queue driver. Stores jobs in a circular buffer -public final class HBMemoryJobQueue: HBJobQueueDriver { +/// In memory implementation of job queue driver. Stores job data in a circular buffer +public final class HBMemoryQueue: HBJobQueueDriver { public typealias Element = HBQueuedJob public typealias JobID = UUID @@ -111,7 +111,7 @@ public final class HBMemoryJobQueue: HBJobQueueDriver { } } -extension HBMemoryJobQueue { +extension HBMemoryQueue { public struct AsyncIterator: AsyncIteratorProtocol { fileprivate let queue: Internal @@ -124,3 +124,12 @@ extension HBMemoryJobQueue { .init(queue: self.queue) } } + +extension HBJobQueueDriver where Self == HBMemoryQueue { + /// Return In memory driver for Job Queue + /// - Parameters: + /// - onFailedJob: Closure called when a job fails + public static var memory: HBMemoryQueue { + .init() + } +} diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index 2ba023eec..c1b070900 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -62,7 +62,7 @@ final class HummingbirdJobsTests: XCTestCase { func testBasic() async throws { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + let jobQueue = HBJobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) let job = HBJobDefinition(id: "testBasic") { (parameters: Int, context) in context.logger.info("Parameters=\(parameters)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) @@ -91,7 +91,7 @@ final class HummingbirdJobsTests: XCTestCase { let maxRunningJobCounter = ManagedAtomic(0) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 4, logger: Logger(label: "HummingbirdJobsTests")) + let jobQueue = HBJobQueue(.memory, numWorkers: 4, logger: Logger(label: "HummingbirdJobsTests")) jobQueue.registerJob(jobIdentifer) { parameters, context in let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { @@ -129,7 +129,7 @@ final class HummingbirdJobsTests: XCTestCase { var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace let jobQueue = HBJobQueue( - HBMemoryJobQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) }, + HBMemoryQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) }, logger: logger ) jobQueue.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in @@ -151,7 +151,7 @@ final class HummingbirdJobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called") let jobIdentifer = HBJobIdentifier(#function) - let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + let jobQueue = HBJobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) jobQueue.registerJob(jobIdentifer) { parameters, _ in XCTAssertEqual(parameters.id, 23) XCTAssertEqual(parameters.message, "Hello!") @@ -173,7 +173,7 @@ final class HummingbirdJobsTests: XCTestCase { var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace let jobQueue = HBJobQueue( - HBMemoryJobQueue { _, error in + HBMemoryQueue { _, error in if error is CancellationError { cancelledJobCount.wrappingIncrement(by: 1, ordering: .relaxed) } @@ -202,7 +202,7 @@ final class HummingbirdJobsTests: XCTestCase { var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .debug - let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + let jobQueue = HBJobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) jobQueue.registerJob(jobIdentifer2) { parameters, _ in string.withLockedValue { $0 = parameters } expectation.fulfill() @@ -230,9 +230,9 @@ final class HummingbirdJobsTests: XCTestCase { logger.logLevel = .debug return logger }() - let jobQueue = HBJobQueue(HBMemoryJobQueue(), numWorkers: 2, logger: Logger(label: "HummingbirdJobsTests")) + let jobQueue = HBJobQueue(.memory, numWorkers: 2, logger: Logger(label: "HummingbirdJobsTests")) jobQueue.registerJob(job) - let jobQueue2 = HBJobQueue(HBMemoryJobQueue(), numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + let jobQueue2 = HBJobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) jobQueue2.registerJob(job) try await withThrowingTaskGroup(of: Void.self) { group in From 217ab3d03e831838b5796487e4882d7fce35f80c Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 27 Feb 2024 13:31:47 +0000 Subject: [PATCH 11/13] Update preprocessor check in HummingbirdJobsTests.wait --- Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index c1b070900..ef56cedf1 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -29,7 +29,7 @@ extension XCTestExpectation { final class HummingbirdJobsTests: XCTestCase { func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async { - #if (os(Linux) && swift(<5.10)) || swift(<5.8) + #if (os(Linux) && swift(<5.9)) || swift(<5.8) super.wait(for: expectations, timeout: timeout) #else await fulfillment(of: expectations, timeout: timeout) From f3eff76b9c87b5fa2df0d40fb6b2485862e52f01 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 4 Mar 2024 16:37:16 +0000 Subject: [PATCH 12/13] Use ByteBuffer instead of Data --- Sources/HummingbirdJobs/JobQueue.swift | 8 ++++++-- Sources/HummingbirdJobs/JobQueueDriver.swift | 3 ++- Sources/HummingbirdJobs/JobQueueHandler.swift | 2 +- Sources/HummingbirdJobs/JobRegistry.swift | 5 +++-- Sources/HummingbirdJobs/MemoryJobQueue.swift | 17 +++++++++-------- Sources/HummingbirdJobs/QueuedJob.swift | 7 ++++--- .../DecodableWithUserInfoConfiguration.swift | 6 ++++-- 7 files changed, 29 insertions(+), 19 deletions(-) diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 0cd09d16d..3dd65834a 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -14,6 +14,8 @@ import Foundation import Logging +import NIOCore +import NIOFoundationCompat import ServiceLifecycle /// Job queue @@ -26,10 +28,12 @@ public struct HBJobQueue: Service { /// underlying driver for queue public let queue: Queue let handler: HBJobQueueHandler + let allocator: ByteBufferAllocator public init(_ queue: Queue, numWorkers: Int = 1, logger: Logger) { self.queue = queue self.handler = .init(queue: queue, numWorkers: numWorkers, logger: logger) + self.allocator = .init() } /// Push Job onto queue @@ -39,8 +43,8 @@ public struct HBJobQueue: Service { /// - Returns: Identifier of queued job @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> Queue.JobID { let jobRequest = HBJobRequest(id: id, parameters: parameters) - let data = try JSONEncoder().encode(jobRequest) - return try await self.queue.push(data: data) + let buffer = try JSONEncoder().encodeAsByteBuffer(jobRequest, allocator: self.allocator) + return try await self.queue.push(buffer) } /// Register job type diff --git a/Sources/HummingbirdJobs/JobQueueDriver.swift b/Sources/HummingbirdJobs/JobQueueDriver.swift index 70d05bac9..cb7a13e19 100644 --- a/Sources/HummingbirdJobs/JobQueueDriver.swift +++ b/Sources/HummingbirdJobs/JobQueueDriver.swift @@ -14,6 +14,7 @@ import Foundation import Logging +import NIOCore /// Job queue protocol. /// @@ -25,7 +26,7 @@ public protocol HBJobQueueDriver: AsyncSequence, Sendable where Element == HBQue func onInit() async throws /// Push Job onto queue /// - Returns: Identifier of queued job - func push(data: Data) async throws -> JobID + func push(_ buffer: ByteBuffer) async throws -> JobID /// This is called to say job has finished processing and it can be deleted func finished(jobId: JobID) async throws /// This is called to say job has failed to run and should be put aside diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index 50516b37e..4986d83a8 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -67,7 +67,7 @@ final class HBJobQueueHandler: Service { logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id) let job: any HBJob do { - job = try self.jobRegistry.decode(data: queuedJob.jobData) + job = try self.jobRegistry.decode(queuedJob.jobBuffer) } catch let error as JobQueueError where error == .unrecognisedJobId { logger.debug("Failed to find Job with ID while decoding") try await self.queue.failed(jobId: queuedJob.id, error: error) diff --git a/Sources/HummingbirdJobs/JobRegistry.swift b/Sources/HummingbirdJobs/JobRegistry.swift index 2f39432b7..455749b2f 100644 --- a/Sources/HummingbirdJobs/JobRegistry.swift +++ b/Sources/HummingbirdJobs/JobRegistry.swift @@ -14,6 +14,7 @@ import Foundation import NIOConcurrencyHelpers +import NIOCore /// Registry for job types struct HBJobRegistry: Sendable { @@ -35,8 +36,8 @@ struct HBJobRegistry: Sendable { } } - func decode(data: Data) throws -> any HBJob { - return try JSONDecoder().decode(HBAnyCodableJob.self, from: data, userInfoConfiguration: self).job + func decode(_ buffer: ByteBuffer) throws -> any HBJob { + return try JSONDecoder().decode(HBAnyCodableJob.self, from: buffer, userInfoConfiguration: self).job } func decode(from decoder: Decoder) throws -> any HBJob { diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index be8c8a8d5..e17d86eb4 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -14,6 +14,7 @@ import Collections import Foundation +import NIOCore /// In memory implementation of job queue driver. Stores job data in a circular buffer public final class HBMemoryQueue: HBJobQueueDriver { @@ -45,8 +46,8 @@ public final class HBMemoryQueue: HBJobQueueDriver { /// - job: Job /// - eventLoop: Eventloop to run process on (ignored in this case) /// - Returns: Queued job - @discardableResult public func push(data: Data) async throws -> JobID { - return try await self.queue.push(data) + @discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID { + return try await self.queue.push(buffer) } public func finished(jobId: JobID) async throws { @@ -55,14 +56,14 @@ public final class HBMemoryQueue: HBJobQueueDriver { public func failed(jobId: JobID, error: any Error) async throws { if let job = await self.queue.clearAndReturnPendingJob(jobId: jobId) { - self.onFailedJob(.init(id: jobId, jobData: job), error) + self.onFailedJob(.init(id: jobId, jobBuffer: job), error) } } /// Internal actor managing the job queue fileprivate actor Internal { var queue: Deque> - var pendingJobs: [JobID: Data] + var pendingJobs: [JobID: ByteBuffer] var isStopped: Bool init() { @@ -71,9 +72,9 @@ public final class HBMemoryQueue: HBJobQueueDriver { self.pendingJobs = .init() } - func push(_ jobData: Data) throws -> JobID { + func push(_ jobBuffer: ByteBuffer) throws -> JobID { let id = JobID() - self.queue.append(HBQueuedJob(id: id, jobData: jobData)) + self.queue.append(HBQueuedJob(id: id, jobBuffer: jobBuffer)) return id } @@ -81,7 +82,7 @@ public final class HBMemoryQueue: HBJobQueueDriver { self.pendingJobs[jobId] = nil } - func clearAndReturnPendingJob(jobId: JobID) -> Data? { + func clearAndReturnPendingJob(jobId: JobID) -> ByteBuffer? { let instance = self.pendingJobs[jobId] self.pendingJobs[jobId] = nil return instance @@ -93,7 +94,7 @@ public final class HBMemoryQueue: HBJobQueueDriver { return nil } if let request = queue.popFirst() { - self.pendingJobs[request.id] = request.jobData + self.pendingJobs[request.id] = request.jobBuffer return request } try await Task.sleep(for: .milliseconds(100)) diff --git a/Sources/HummingbirdJobs/QueuedJob.swift b/Sources/HummingbirdJobs/QueuedJob.swift index 017679b90..62e59d86c 100644 --- a/Sources/HummingbirdJobs/QueuedJob.swift +++ b/Sources/HummingbirdJobs/QueuedJob.swift @@ -13,17 +13,18 @@ //===----------------------------------------------------------------------===// import Foundation +import NIOCore /// Queued job. Includes job data, plus the id for the job public struct HBQueuedJob: Sendable { /// Job instance id public let id: JobID /// Job data - public let jobData: Data + public let jobBuffer: ByteBuffer /// Initialize a queue job - public init(id: JobID, jobData: Data) { - self.jobData = jobData + public init(id: JobID, jobBuffer: ByteBuffer) { + self.jobBuffer = jobBuffer self.id = id } } diff --git a/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift b/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift index f5c38734e..9747de690 100644 --- a/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift +++ b/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift @@ -13,6 +13,8 @@ //===----------------------------------------------------------------------===// import Foundation +import NIOCore +import NIOFoundationCompat /// Implementation of DecodableWithConfiguration which extracts the configuration from the userInfo array /// @@ -40,10 +42,10 @@ extension JSONDecoder { /// protocol func decode( _ type: T.Type, - from data: Data, + from buffer: ByteBuffer, userInfoConfiguration: T.DecodingConfiguration ) throws -> T where T: DecodableWithUserInfoConfiguration { self.userInfo[.configuration] = userInfoConfiguration - return try self.decode(type, from: data) + return try self.decode(type, from: buffer) } } From f130b5723119373e7c91320202f9e7667e7a05b1 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 4 Mar 2024 17:04:51 +0000 Subject: [PATCH 13/13] Add dependent modules to HummingbirdJobs --- Package.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Package.swift b/Package.swift index 20c297323..3acca322d 100644 --- a/Package.swift +++ b/Package.swift @@ -74,6 +74,8 @@ let package = Package( .product(name: "Collections", package: "swift-collections"), .product(name: "Logging", package: "swift-log"), .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOFoundationCompat", package: "swift-nio"), .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), ], swiftSettings: swiftSettings