diff --git a/Package.swift b/Package.swift index 073a4ddaf..3acca322d 100644 --- a/Package.swift +++ b/Package.swift @@ -21,6 +21,7 @@ let package = Package( dependencies: [ .package(url: "https://github.com/apple/swift-async-algorithms.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"), .package(url: "https://github.com/apple/swift-http-types.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-metrics.git", "1.0.0"..<"3.0.0"), @@ -70,8 +71,12 @@ let package = Package( .target( name: "HummingbirdJobs", dependencies: [ - .byName(name: "Hummingbird"), + .product(name: "Collections", package: "swift-collections"), .product(name: "Logging", package: "swift-log"), + .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOFoundationCompat", package: "swift-nio"), + .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), ], swiftSettings: swiftSettings ), @@ -138,6 +143,7 @@ let package = Package( .testTarget(name: "HummingbirdJobsTests", dependencies: [ .byName(name: "HummingbirdJobs"), .byName(name: "HummingbirdXCT"), + .product(name: "Atomics", package: "swift-atomics"), ]), .testTarget(name: "HummingbirdRouterTests", dependencies: [ .byName(name: "HummingbirdRouter"), diff --git a/Sources/HummingbirdJobs/Job.swift b/Sources/HummingbirdJobs/Job.swift index d36d3ff2c..eb1197cad 100644 --- a/Sources/HummingbirdJobs/Job.swift +++ b/Sources/HummingbirdJobs/Job.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2023 the Hummingbird authors +// Copyright (c) 2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,83 +12,21 @@ // //===----------------------------------------------------------------------===// -import Foundation -import Logging -import NIOConcurrencyHelpers -import NIOCore - -/// Protocol for job description -/// -/// For a job to be decodable, it has to be registered. Call `MyJob.register()` to register a job. -public protocol HBJob: Codable, Sendable { - /// Unique Job name - static var name: String { get } - - /// Maximum times this job should be retried if it fails - static var maxRetryCount: Int { get } - - /// Execute job - /// - Returns: EventLoopFuture that is fulfulled when job is done - func execute(logger: Logger) async throws +/// Protocol for a Job +public protocol HBJob: Sendable { + /// Parameters job requries + associatedtype Parameters: Codable & Sendable + /// Job Type identifier + var id: HBJobIdentifier { get } + /// Maximum number of times a job will be retried before being classed as failed + var maxRetryCount: Int { get } + /// Function to execute the job + func execute(context: HBJobContext) async throws } extension HBJob { - /// maximum times this job should be retried - public static var maxRetryCount: Int { return 0 } - - /// register job - public static func register() { - HBJobRegister.register(job: Self.self) - } -} - -/// Register Jobs, for decoding and encoding -enum HBJobRegister { - static func decode(from decoder: Decoder) throws -> HBJob { - let container = try decoder.container(keyedBy: _HBJobCodingKey.self) - let key = container.allKeys.first! - let childDecoder = try container.superDecoder(forKey: key) - let jobType = try HBJobRegister.nameTypeMap.withLockedValue { - guard let job = $0[key.stringValue] else { throw JobQueueError.decodeJobFailed } - return job - } - return try jobType.init(from: childDecoder) - } - - static func encode(job: HBJob, to encoder: Encoder) throws { - var container = encoder.container(keyedBy: _HBJobCodingKey.self) - let childEncoder = container.superEncoder(forKey: .init(stringValue: type(of: job).name, intValue: nil)) - try job.encode(to: childEncoder) - } - - static func register(job: HBJob.Type) { - self.nameTypeMap.withLockedValue { $0[job.name] = job } - } - - static let nameTypeMap: NIOLockedValueBox<[String: HBJob.Type]> = .init([:]) -} - -internal struct _HBJobCodingKey: CodingKey { - public var stringValue: String - public var intValue: Int? - - public init?(stringValue: String) { - self.stringValue = stringValue - self.intValue = nil - } - - public init?(intValue: Int) { - self.stringValue = "\(intValue)" - self.intValue = intValue - } - - public init(stringValue: String, intValue: Int?) { - self.stringValue = stringValue - self.intValue = intValue - } - - internal init(index: Int) { - self.stringValue = "Index \(index)" - self.intValue = index + /// name of job type + public var name: String { + id.name } } diff --git a/Sources/HummingbirdJobs/Exports.swift b/Sources/HummingbirdJobs/JobContext.swift similarity index 77% rename from Sources/HummingbirdJobs/Exports.swift rename to Sources/HummingbirdJobs/JobContext.swift index e0b216ce1..a49b807c3 100644 --- a/Sources/HummingbirdJobs/Exports.swift +++ b/Sources/HummingbirdJobs/JobContext.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,4 +12,8 @@ // //===----------------------------------------------------------------------===// -@_exported @_documentation(visibility: internal) import struct Logging.Logger +import Logging + +public struct HBJobContext { + public let logger: Logger +} diff --git a/Sources/HummingbirdJobs/JobDefinition.swift b/Sources/HummingbirdJobs/JobDefinition.swift new file mode 100644 index 000000000..8ce468e6a --- /dev/null +++ b/Sources/HummingbirdJobs/JobDefinition.swift @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Job definition type +public struct HBJobDefinition: Sendable { + public let id: HBJobIdentifier + let maxRetryCount: Int + let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void + + public init(id: HBJobIdentifier, maxRetryCount: Int = 0, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) { + self.id = id + self.maxRetryCount = maxRetryCount + self._execute = execute + } + + func execute(_ parameters: Parameters, context: HBJobContext) async throws { + try await self._execute(parameters, context) + } +} diff --git a/Sources/HummingbirdJobs/JobIdentifier.swift b/Sources/HummingbirdJobs/JobIdentifier.swift new file mode 100644 index 000000000..ec7dc0f45 --- /dev/null +++ b/Sources/HummingbirdJobs/JobIdentifier.swift @@ -0,0 +1,42 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Identifier for a Job type +/// +/// The identifier includes the type of the parameters required by the job to ensure +/// the wrong parameters are not passed to this job +/// +/// Extend this type to include your own job identifiers +/// ``` +/// extension HBJobIdentifier { +/// static var myJob: Self { .init("my-job") } +/// } +/// ``` +public struct HBJobIdentifier: Sendable, Hashable, ExpressibleByStringLiteral { + let name: String + /// Initialize a HBJobIdentifier + /// + /// - Parameters: + /// - name: Unique name for identifier + /// - parameters: Parameter type associated with Job + public init(_ name: String, parameters: Parameters.Type = Parameters.self) { self.name = name } + + /// Initialize a HBJobIdentifier from a string literal + /// + /// This can only be used in a situation where the Parameter type is defined elsewhere + /// - Parameter string: + public init(stringLiteral string: String) { + self.name = string + } +} diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift index 9237e5ce5..3dd65834a 100644 --- a/Sources/HummingbirdJobs/JobQueue.swift +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2023 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -13,31 +13,83 @@ //===----------------------------------------------------------------------===// import Foundation -import Hummingbird import Logging +import NIOCore +import NIOFoundationCompat +import ServiceLifecycle -/// Job queue protocol. +/// Job queue /// -/// Defines how to push and pop jobs off a queue -public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob { - associatedtype JobID: CustomStringConvertible & Sendable +/// Wrapper type to bring together a job queue implementation and a job queue +/// handler. Before you can push jobs onto a queue you should register it +/// with the queue via either ``HBJobQueue.registerJob(id:maxRetryCount:execute:)`` or +/// ``HBJobQueue.registerJob(_:)``. +public struct HBJobQueue: Service { + /// underlying driver for queue + public let queue: Queue + let handler: HBJobQueueHandler + let allocator: ByteBufferAllocator - /// Called when JobQueueHandler is initialised with this queue - func onInit() async throws - /// Push Job onto queue + public init(_ queue: Queue, numWorkers: Int = 1, logger: Logger) { + self.queue = queue + self.handler = .init(queue: queue, numWorkers: numWorkers, logger: logger) + self.allocator = .init() + } + + /// Push Job onto queue + /// - Parameters: + /// - id: Job identifier + /// - parameters: parameters for the job /// - Returns: Identifier of queued job - @discardableResult func push(_ job: HBJob) async throws -> JobID - /// This is called to say job has finished processing and it can be deleted - func finished(jobId: JobID) async throws - /// This is called to say job has failed to run and should be put aside - func failed(jobId: JobID, error: any Error) async throws - /// stop serving jobs - func stop() async - /// shutdown queue - func shutdownGracefully() async + @discardableResult public func push(id: HBJobIdentifier, parameters: Parameters) async throws -> Queue.JobID { + let jobRequest = HBJobRequest(id: id, parameters: parameters) + let buffer = try JSONEncoder().encodeAsByteBuffer(jobRequest, allocator: self.allocator) + return try await self.queue.push(buffer) + } + + /// Register job type + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob( + _ id: HBJobIdentifier, + maxRetryCount: Int = 0, + execute: @escaping @Sendable ( + Parameters, + HBJobContext + ) async throws -> Void + ) { + let job = HBJobDefinition(id: id, maxRetryCount: maxRetryCount, execute: execute) + self.registerJob(job) + } + + /// Register job type + /// - Parameters: + /// - job: Job definition + public func registerJob(_ job: HBJobDefinition) { + self.handler.registerJob(job) + } + + /// Run queue handler + public func run() async throws { + try await self.handler.run() + } } -extension HBJobQueue { - // default version of onInit doing nothing - public func onInit() async throws {} +/// Type used internally to encode a request +struct HBJobRequest: Encodable, Sendable { + let id: HBJobIdentifier + let parameters: Parameters + + public init(id: HBJobIdentifier, parameters: Parameters) { + self.id = id + self.parameters = parameters + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: _HBJobCodingKey.self) + let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil)) + try self.parameters.encode(to: childEncoder) + } } diff --git a/Sources/HummingbirdJobs/JobQueueDriver.swift b/Sources/HummingbirdJobs/JobQueueDriver.swift new file mode 100644 index 000000000..cb7a13e19 --- /dev/null +++ b/Sources/HummingbirdJobs/JobQueueDriver.swift @@ -0,0 +1,43 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Logging +import NIOCore + +/// Job queue protocol. +/// +/// Defines how to push and pop job data off a queue +public protocol HBJobQueueDriver: AsyncSequence, Sendable where Element == HBQueuedJob { + associatedtype JobID: CustomStringConvertible & Sendable + + /// Called when JobQueueHandler is initialised with this queue + func onInit() async throws + /// Push Job onto queue + /// - Returns: Identifier of queued job + func push(_ buffer: ByteBuffer) async throws -> JobID + /// This is called to say job has finished processing and it can be deleted + func finished(jobId: JobID) async throws + /// This is called to say job has failed to run and should be put aside + func failed(jobId: JobID, error: any Error) async throws + /// stop serving jobs + func stop() async + /// shutdown queue + func shutdownGracefully() async +} + +extension HBJobQueueDriver { + // default version of onInit doing nothing + public func onInit() async throws {} +} diff --git a/Sources/HummingbirdJobs/JobQueueError.swift b/Sources/HummingbirdJobs/JobQueueError.swift index 4d24665cd..3105e9672 100644 --- a/Sources/HummingbirdJobs/JobQueueError.swift +++ b/Sources/HummingbirdJobs/JobQueueError.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -17,11 +17,14 @@ public struct JobQueueError: Error, Equatable { /// failed to decode job. Possibly because it hasn't been registered or data that was expected /// is not available public static var decodeJobFailed: Self { .init(.decodeJobFailed) } + /// failed to decode job as the job id is not recognised + public static var unrecognisedJobId: Self { .init(.unrecognisedJobId) } /// failed to get job from queue public static var dequeueError: Self { .init(.dequeueError) } private enum QueueError { case decodeJobFailed + case unrecognisedJobId case dequeueError } diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift index 0af6e43ba..4986d83a8 100644 --- a/Sources/HummingbirdJobs/JobQueueHandler.swift +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,36 +12,44 @@ // //===----------------------------------------------------------------------===// -import AsyncAlgorithms -import Hummingbird import Logging import ServiceLifecycle /// Object handling a single job queue -public final class HBJobQueueHandler: Service { - public init(queue: Queue, numWorkers: Int, logger: Logger) { +final class HBJobQueueHandler: Service { + init(queue: Queue, numWorkers: Int, logger: Logger) { self.queue = queue self.numWorkers = numWorkers self.logger = logger + self.jobRegistry = .init() } - public func run() async throws { + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + func registerJob(_ job: HBJobDefinition) { + self.jobRegistry.registerJob(job: job) + } + + func run() async throws { try await self.queue.onInit() try await withGracefulShutdownHandler { try await withThrowingTaskGroup(of: Void.self) { group in var iterator = self.queue.makeAsyncIterator() for _ in 0..: Service { } } - func getNextJob(_ queueIterator: inout Queue.AsyncIterator) async throws -> HBQueuedJob? { - while true { - do { - let job = try await queueIterator.next() - return job - } catch let error as JobQueueError where error == JobQueueError.decodeJobFailed { - self.logger.error("Job failed to decode.") - } - } - } - - func runJob(_ queuedJob: HBQueuedJob) async { + func runJob(_ queuedJob: HBQueuedJob) async throws { var logger = logger logger[metadataKey: "hb_job_id"] = .stringConvertible(queuedJob.id) - logger[metadataKey: "hb_job_type"] = .string(String(describing: type(of: queuedJob.job))) + let job: any HBJob + do { + job = try self.jobRegistry.decode(queuedJob.jobBuffer) + } catch let error as JobQueueError where error == .unrecognisedJobId { + logger.debug("Failed to find Job with ID while decoding") + try await self.queue.failed(jobId: queuedJob.id, error: error) + return + } catch { + logger.debug("Job failed to decode") + try await self.queue.failed(jobId: queuedJob.id, error: JobQueueError.decodeJobFailed) + return + } + logger[metadataKey: "hb_job_type"] = .string(job.name) - let job = queuedJob.job - var count = type(of: job).maxRetryCount + var count = job.maxRetryCount logger.debug("Starting Job") do { while true { do { - try await job.execute(logger: self.logger) + try await job.execute(context: .init(logger: logger)) break } catch let error as CancellationError { logger.debug("Job cancelled") @@ -103,6 +111,7 @@ public final class HBJobQueueHandler: Service { } } + private let jobRegistry: HBJobRegistry private let queue: Queue private let numWorkers: Int let logger: Logger diff --git a/Sources/HummingbirdJobs/JobRegistry.swift b/Sources/HummingbirdJobs/JobRegistry.swift new file mode 100644 index 000000000..455749b2f --- /dev/null +++ b/Sources/HummingbirdJobs/JobRegistry.swift @@ -0,0 +1,122 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import NIOConcurrencyHelpers +import NIOCore + +/// Registry for job types +struct HBJobRegistry: Sendable { + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob( + job: HBJobDefinition + ) { + let builder: @Sendable (Decoder) throws -> any HBJob = { decoder in + let parameters = try Parameters(from: decoder) + return try HBJobInstance(job: job, parameters: parameters) + } + self.builderTypeMap.withLockedValue { + precondition($0[job.id.name] == nil, "There is a job already registered under id \"\(job.id.name)\"") + $0[job.id.name] = builder + } + } + + func decode(_ buffer: ByteBuffer) throws -> any HBJob { + return try JSONDecoder().decode(HBAnyCodableJob.self, from: buffer, userInfoConfiguration: self).job + } + + func decode(from decoder: Decoder) throws -> any HBJob { + let container = try decoder.container(keyedBy: _HBJobCodingKey.self) + let key = container.allKeys.first! + let childDecoder = try container.superDecoder(forKey: key) + let jobDefinitionBuilder = try self.builderTypeMap.withLockedValue { + guard let job = $0[key.stringValue] else { throw JobQueueError.unrecognisedJobId } + return job + } + return try jobDefinitionBuilder(childDecoder) + } + + let builderTypeMap: NIOLockedValueBox < [String: @Sendable (Decoder) throws -> any HBJob]> = .init([:]) +} + +/// Internal job instance type +internal struct HBJobInstance: HBJob { + /// job definition + let job: HBJobDefinition + /// job parameters + let parameters: Parameters + + /// get i + var id: HBJobIdentifier { self.job.id } + var maxRetryCount: Int { self.job.maxRetryCount } + + func execute(context: HBJobContext) async throws { + try await self.job.execute(self.parameters, context: context) + } + + init(job: HBJobDefinition, parameters: Parameters) throws { + self.job = job + self.parameters = parameters + } +} + +/// Add codable support for decoding/encoding any HBJob +internal struct HBAnyCodableJob: DecodableWithUserInfoConfiguration, Sendable { + typealias DecodingConfiguration = HBJobRegistry + + init(from decoder: Decoder, configuration register: DecodingConfiguration) throws { + self.job = try register.decode(from: decoder) + } + + /// Job data + let job: any HBJob + + /// Initialize a queue job + init(_ job: any HBJob) { + self.job = job + } + + private enum CodingKeys: String, CodingKey { + case job + } +} + +internal struct _HBJobCodingKey: CodingKey { + public var stringValue: String + public var intValue: Int? + + public init?(stringValue: String) { + self.stringValue = stringValue + self.intValue = nil + } + + public init?(intValue: Int) { + self.stringValue = "\(intValue)" + self.intValue = intValue + } + + public init(stringValue: String, intValue: Int?) { + self.stringValue = stringValue + self.intValue = intValue + } + + internal init(index: Int) { + self.stringValue = "Index \(index)" + self.intValue = index + } +} diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift index b565ce8b5..e17d86eb4 100644 --- a/Sources/HummingbirdJobs/MemoryJobQueue.swift +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -14,9 +14,10 @@ import Collections import Foundation +import NIOCore -/// In memory implementation of job queue driver. Stores jobs in a circular buffer -public final class HBMemoryJobQueue: HBJobQueue { +/// In memory implementation of job queue driver. Stores job data in a circular buffer +public final class HBMemoryQueue: HBJobQueueDriver { public typealias Element = HBQueuedJob public typealias JobID = UUID @@ -45,8 +46,8 @@ public final class HBMemoryJobQueue: HBJobQueue { /// - job: Job /// - eventLoop: Eventloop to run process on (ignored in this case) /// - Returns: Queued job - @discardableResult public func push(_ job: HBJob) async throws -> JobID { - try await self.queue.push(job) + @discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID { + return try await self.queue.push(buffer) } public func finished(jobId: JobID) async throws { @@ -55,14 +56,14 @@ public final class HBMemoryJobQueue: HBJobQueue { public func failed(jobId: JobID, error: any Error) async throws { if let job = await self.queue.clearAndReturnPendingJob(jobId: jobId) { - self.onFailedJob(.init(id: jobId, job: job), error) + self.onFailedJob(.init(id: jobId, jobBuffer: job), error) } } /// Internal actor managing the job queue fileprivate actor Internal { - var queue: Deque - var pendingJobs: [JobID: HBJob] + var queue: Deque> + var pendingJobs: [JobID: ByteBuffer] var isStopped: Bool init() { @@ -71,18 +72,17 @@ public final class HBMemoryJobQueue: HBJobQueue { self.pendingJobs = .init() } - func push(_ job: HBJob) throws -> JobID { - let queuedJob = HBQueuedJob(id: JobID(), job: job) - let jsonData = try JSONEncoder().encode(queuedJob) - self.queue.append(jsonData) - return queuedJob.id + func push(_ jobBuffer: ByteBuffer) throws -> JobID { + let id = JobID() + self.queue.append(HBQueuedJob(id: id, jobBuffer: jobBuffer)) + return id } func clearPendingJob(jobId: JobID) { self.pendingJobs[jobId] = nil } - func clearAndReturnPendingJob(jobId: JobID) -> HBJob? { + func clearAndReturnPendingJob(jobId: JobID) -> ByteBuffer? { let instance = self.pendingJobs[jobId] self.pendingJobs[jobId] = nil return instance @@ -93,14 +93,9 @@ public final class HBMemoryJobQueue: HBJobQueue { if self.isStopped { return nil } - if let data = queue.popFirst() { - do { - let job = try JSONDecoder().decode(HBQueuedJob.self, from: data) - self.pendingJobs[job.id] = job.job - return job - } catch { - throw JobQueueError.decodeJobFailed - } + if let request = queue.popFirst() { + self.pendingJobs[request.id] = request.jobBuffer + return request } try await Task.sleep(for: .milliseconds(100)) } @@ -117,7 +112,7 @@ public final class HBMemoryJobQueue: HBJobQueue { } } -extension HBMemoryJobQueue { +extension HBMemoryQueue { public struct AsyncIterator: AsyncIteratorProtocol { fileprivate let queue: Internal @@ -130,3 +125,12 @@ extension HBMemoryJobQueue { .init(queue: self.queue) } } + +extension HBJobQueueDriver where Self == HBMemoryQueue { + /// Return In memory driver for Job Queue + /// - Parameters: + /// - onFailedJob: Closure called when a job fails + public static var memory: HBMemoryQueue { + .init() + } +} diff --git a/Sources/HummingbirdJobs/QueuedJob.swift b/Sources/HummingbirdJobs/QueuedJob.swift index 097670d0e..62e59d86c 100644 --- a/Sources/HummingbirdJobs/QueuedJob.swift +++ b/Sources/HummingbirdJobs/QueuedJob.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2023 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -13,50 +13,18 @@ //===----------------------------------------------------------------------===// import Foundation +import NIOCore -/// Add codable support for decoding/encoding any HBJob -public struct HBAnyCodableJob: Codable, Sendable { - /// Job data - public let job: HBJob - - /// Initialize a queue job - public init(_ job: HBJob) { - self.job = job - } - - public init(from decoder: Decoder) throws { - let container = try decoder.container(keyedBy: CodingKeys.self) - let jobDecoder = try container.superDecoder(forKey: .job) - self.job = try HBJobRegister.decode(from: jobDecoder) - } - - public func encode(to encoder: Encoder) throws { - var container = encoder.container(keyedBy: CodingKeys.self) - let jobEncoder = container.superEncoder(forKey: .job) - try HBJobRegister.encode(job: self.job, to: jobEncoder) - } - - private enum CodingKeys: String, CodingKey { - case job - } -} - -/// Queued job. Includes job, plus the id for the job +/// Queued job. Includes job data, plus the id for the job public struct HBQueuedJob: Sendable { - /// Job id + /// Job instance id public let id: JobID /// Job data - private let _job: HBAnyCodableJob - /// Job data - public var job: HBJob { self._job.job } - /// Job data in a codable form - public var anyCodableJob: HBAnyCodableJob { self._job } + public let jobBuffer: ByteBuffer /// Initialize a queue job - public init(id: JobID, job: HBJob) { - self._job = .init(job) + public init(id: JobID, jobBuffer: ByteBuffer) { + self.jobBuffer = jobBuffer self.id = id } } - -extension HBQueuedJob: Codable where JobID: Codable {} diff --git a/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift b/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift new file mode 100644 index 000000000..9747de690 --- /dev/null +++ b/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift @@ -0,0 +1,51 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import NIOCore +import NIOFoundationCompat + +/// Implementation of DecodableWithConfiguration which extracts the configuration from the userInfo array +/// +/// This is necessary as Linux Foundation does not have support for setting DecodableWithConfiguration +/// configuration from the JSONDecoder +protocol DecodableWithUserInfoConfiguration: Decodable, DecodableWithConfiguration {} + +/// Implement `init(from: Decoder)`` by extracting configuration from the userInfo dictionary. +extension DecodableWithUserInfoConfiguration { + init(from decoder: Decoder) throws { + guard let configuration = decoder.userInfo[.configuration] as? DecodingConfiguration else { + throw DecodingError.valueNotFound(DecodingConfiguration.self, .init(codingPath: decoder.codingPath, debugDescription: "Failed to find Decoding configuration")) + } + try self.init(from: decoder, configuration: configuration) + } +} + +extension CodingUserInfoKey { + /// Coding UserInfo key used to store DecodableWithUserInfoConfiguration configuration + static var configuration: Self { return .init(rawValue: "_configuration_")! } +} + +extension JSONDecoder { + /// Version of JSONDecoder that sets up configuration userInfo for the DecodableWithUserInfoConfiguration + /// protocol + func decode( + _ type: T.Type, + from buffer: ByteBuffer, + userInfoConfiguration: T.DecodingConfiguration + ) throws -> T where T: DecodableWithUserInfoConfiguration { + self.userInfo[.configuration] = userInfoConfiguration + return try self.decode(type, from: buffer) + } +} diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift index 2811c9819..ef56cedf1 100644 --- a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2021 the Hummingbird authors +// Copyright (c) 2021-2024 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -15,6 +15,8 @@ import Atomics import HummingbirdJobs import HummingbirdXCT +import Logging +import NIOConcurrencyHelpers import ServiceLifecycle import XCTest @@ -27,7 +29,7 @@ extension XCTestExpectation { final class HummingbirdJobsTests: XCTestCase { func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async { - #if (os(Linux) && swift(<5.10)) || swift(<5.8) + #if (os(Linux) && swift(<5.9)) || swift(<5.8) super.wait(for: expectations, timeout: timeout) #else await fulfillment(of: expectations, timeout: timeout) @@ -39,13 +41,13 @@ final class HummingbirdJobsTests: XCTestCase { /// Creates test client, runs test function abd ensures everything is /// shutdown correctly public func testJobQueue( - _ jobQueueHandler: HBJobQueueHandler, + _ jobQueue: Service, _ test: () async throws -> Void ) async throws { try await withThrowingTaskGroup(of: Void.self) { group in let serviceGroup = ServiceGroup( configuration: .init( - services: [jobQueueHandler], + services: [jobQueue], gracefulShutdownSignals: [.sigterm, .sigint], logger: Logger(label: "JobQueueService") ) @@ -59,158 +61,133 @@ final class HummingbirdJobsTests: XCTestCase { } func testBasic() async throws { - struct TestJob: HBJob { - static let name = "testBasic" - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - - let value: Int - func execute(logger: Logger) async throws { - print(self.value) - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - Self.expectation.fulfill() - } + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + let jobQueue = HBJobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + let job = HBJobDefinition(id: "testBasic") { (parameters: Int, context) in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() } - TestJob.register() - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 1, - logger: Logger(label: "HummingbirdJobsTests") - ) - try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob(value: 1)) - try await jobQueue.push(TestJob(value: 2)) - try await jobQueue.push(TestJob(value: 3)) - try await jobQueue.push(TestJob(value: 4)) - try await jobQueue.push(TestJob(value: 5)) - try await jobQueue.push(TestJob(value: 6)) - try await jobQueue.push(TestJob(value: 7)) - try await jobQueue.push(TestJob(value: 8)) - try await jobQueue.push(TestJob(value: 9)) - try await jobQueue.push(TestJob(value: 10)) - - await self.wait(for: [TestJob.expectation], timeout: 5) + jobQueue.registerJob(job) + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: job.id, parameters: 1) + try await jobQueue.push(id: job.id, parameters: 2) + try await jobQueue.push(id: job.id, parameters: 3) + try await jobQueue.push(id: job.id, parameters: 4) + try await jobQueue.push(id: job.id, parameters: 5) + try await jobQueue.push(id: job.id, parameters: 6) + try await jobQueue.push(id: job.id, parameters: 7) + try await jobQueue.push(id: job.id, parameters: 8) + try await jobQueue.push(id: job.id, parameters: 9) + try await jobQueue.push(id: job.id, parameters: 10) + + await self.wait(for: [expectation], timeout: 5) } } func testMultipleWorkers() async throws { - struct TestJob: HBJob { - static let name = "testBasic" - static let runningJobCounter = ManagedAtomic(0) - static let maxRunningJobCounter = ManagedAtomic(0) - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - - let value: Int - func execute(logger: Logger) async throws { - let runningJobs = Self.runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) - if runningJobs > Self.maxRunningJobCounter.load(ordering: .relaxed) { - Self.maxRunningJobCounter.store(runningJobs, ordering: .relaxed) - } - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - print(self.value) - Self.expectation.fulfill() - Self.runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) + let jobIdentifer = HBJobIdentifier(#function) + let runningJobCounter = ManagedAtomic(0) + let maxRunningJobCounter = ManagedAtomic(0) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + + let jobQueue = HBJobQueue(.memory, numWorkers: 4, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(jobIdentifer) { parameters, context in + let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) + if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { + maxRunningJobCounter.store(runningJobs, ordering: .relaxed) } + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + context.logger.info("Parameters=\(parameters)") + expectation.fulfill() + runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) } - TestJob.register() + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer, parameters: 1) + try await jobQueue.push(id: jobIdentifer, parameters: 2) + try await jobQueue.push(id: jobIdentifer, parameters: 3) + try await jobQueue.push(id: jobIdentifer, parameters: 4) + try await jobQueue.push(id: jobIdentifer, parameters: 5) + try await jobQueue.push(id: jobIdentifer, parameters: 6) + try await jobQueue.push(id: jobIdentifer, parameters: 7) + try await jobQueue.push(id: jobIdentifer, parameters: 8) + try await jobQueue.push(id: jobIdentifer, parameters: 9) + try await jobQueue.push(id: jobIdentifer, parameters: 10) - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 4, - logger: Logger(label: "HummingbirdJobsTests") - ) - try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob(value: 1)) - try await jobQueue.push(TestJob(value: 2)) - try await jobQueue.push(TestJob(value: 3)) - try await jobQueue.push(TestJob(value: 4)) - try await jobQueue.push(TestJob(value: 5)) - try await jobQueue.push(TestJob(value: 6)) - try await jobQueue.push(TestJob(value: 7)) - try await jobQueue.push(TestJob(value: 8)) - try await jobQueue.push(TestJob(value: 9)) - try await jobQueue.push(TestJob(value: 10)) - - await self.wait(for: [TestJob.expectation], timeout: 5) - - XCTAssertGreaterThan(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 1) - XCTAssertLessThanOrEqual(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 4) + await self.wait(for: [expectation], timeout: 5) + + XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1) + XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4) } } func testErrorRetryCount() async throws { + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) let failedJobCount = ManagedAtomic(0) struct FailedError: Error {} - - struct TestJob: HBJob { - static let name = "testErrorRetryCount" - static let maxRetryCount = 3 - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) - func execute(logger: Logger) async throws { - Self.expectation.fulfill() - throw FailedError() - } - } - TestJob.register() var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace - let jobQueue = HBMemoryJobQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) } - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 4, + let jobQueue = HBJobQueue( + HBMemoryQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) }, logger: logger ) - try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob()) + jobQueue.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in + expectation.fulfill() + throw FailedError() + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer, parameters: 0) - await self.wait(for: [TestJob.expectation], timeout: 5) + await self.wait(for: [expectation], timeout: 5) } XCTAssertEqual(failedJobCount.load(ordering: .relaxed), 1) } - func testJobSerialization() throws { - struct TestJob: HBJob, Equatable { - static let name = "testJobSerialization" - let value: Int - func execute(logger: Logger) async throws {} - } - TestJob.register() - let job = TestJob(value: 2) - let codableJob = HBAnyCodableJob(job) - let data = try JSONEncoder().encode(codableJob) - let codableJob2 = try JSONDecoder().decode(HBAnyCodableJob.self, from: data) - XCTAssertEqual(codableJob2.job as? TestJob, job) + func testJobSerialization() async throws { + struct TestJobParameters: Codable { + let id: Int + let message: String + } + let expectation = XCTestExpectation(description: "TestJob.execute was called") + let jobIdentifer = HBJobIdentifier(#function) + let jobQueue = HBJobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(jobIdentifer) { parameters, _ in + XCTAssertEqual(parameters.id, 23) + XCTAssertEqual(parameters.message, "Hello!") + expectation.fulfill() + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) + + await self.wait(for: [expectation], timeout: 5) + } } /// Test job is cancelled on shutdown func testShutdownJob() async throws { - struct TestJob: HBJob { - static let name = "testShutdownJob" - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - func execute(logger: Logger) async throws { - Self.expectation.fulfill() - try await Task.sleep(for: .milliseconds(1000)) - } - } - TestJob.register() + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) let cancelledJobCount = ManagedAtomic(0) var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .trace - let jobQueue = HBMemoryJobQueue { _, error in - if error is CancellationError { - cancelledJobCount.wrappingIncrement(by: 1, ordering: .relaxed) - } - } - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, + let jobQueue = HBJobQueue( + HBMemoryQueue { _, error in + if error is CancellationError { + cancelledJobCount.wrappingIncrement(by: 1, ordering: .relaxed) + } + }, numWorkers: 4, logger: logger ) - try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob()) - await self.wait(for: [TestJob.expectation], timeout: 5) + jobQueue.registerJob(jobIdentifer) { _, _ in + expectation.fulfill() + try await Task.sleep(for: .milliseconds(1000)) + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer, parameters: 0) + await self.wait(for: [expectation], timeout: 5) } XCTAssertEqual(cancelledJobCount.load(ordering: .relaxed), 1) @@ -218,33 +195,68 @@ final class HummingbirdJobsTests: XCTestCase { /// test job fails to decode but queue continues to process func testFailToDecode() async throws { - struct TestJob1: HBJob { - static let name = "testFailToDecode" - func execute(logger: Logger) async throws {} - } - struct TestJob2: HBJob { - static let name = "testFailToDecode" - static var value: String? - let value: String - func execute(logger: Logger) async throws { - Self.value = self.value - } + let string: NIOLockedValueBox = .init("") + let jobIdentifer1 = HBJobIdentifier(#function) + let jobIdentifer2 = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) + + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .debug + let jobQueue = HBJobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(jobIdentifer2) { parameters, _ in + string.withLockedValue { $0 = parameters } + expectation.fulfill() + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer1, parameters: 2) + try await jobQueue.push(id: jobIdentifer2, parameters: "test") + await self.wait(for: [expectation], timeout: 5) } - TestJob2.register() + string.withLockedValue { + XCTAssertEqual($0, "test") + } + } - let jobQueue = HBMemoryJobQueue() - let jobQueueHandler = HBJobQueueHandler( - queue: jobQueue, - numWorkers: 1, - logger: Logger(label: "HummingbirdJobsTests") - ) - try await testJobQueue(jobQueueHandler) { - try await jobQueue.push(TestJob1()) - try await jobQueue.push(TestJob2(value: "test")) - // stall to give job chance to start running - try await Task.sleep(for: .milliseconds(500)) + func testMultipleJobQueueHandlers() async throws { + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) + let job = HBJobDefinition(id: jobIdentifer) { parameters, context in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() } + let logger = { + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .debug + return logger + }() + let jobQueue = HBJobQueue(.memory, numWorkers: 2, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(job) + let jobQueue2 = HBJobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue2.registerJob(job) - XCTAssertEqual(TestJob2.value, "test") + try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [jobQueue, jobQueue2], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + ) + group.addTask { + try await serviceGroup.run() + } + do { + for i in 0..<200 { + try await jobQueue.push(id: jobIdentifer, parameters: i) + } + await self.wait(for: [expectation], timeout: 5) + await serviceGroup.triggerGracefulShutdown() + } catch { + XCTFail("\(String(reflecting: error))") + await serviceGroup.triggerGracefulShutdown() + throw error + } + } } }