Skip to content

Commit

Permalink
Jobs Refactor (#391)
Browse files Browse the repository at this point in the history
* Register jobs with an id and a closure

* Add precondition to check job isn't registered twice

* Move job registry inside queue handler

* HBJobQueue is now a struct containing a driver and handler

* comments

* Add DecodableWithUserInfoConfiguration

* Init HBJobIdentifier from string literal

* Make HBJobDefinition.id public, Rename HBJobRegister -> HBJobRegistry

* Remove Hummingbird dependency from HummingbirdJobs

* Add HBJobQueueDriver.memory

* Update preprocessor check in HummingbirdJobsTests.wait

* Use ByteBuffer instead of Data

* Add dependent modules to HummingbirdJobs
  • Loading branch information
adam-fowler committed Mar 5, 2024
1 parent 5928003 commit b5fd7d4
Show file tree
Hide file tree
Showing 14 changed files with 619 additions and 335 deletions.
8 changes: 7 additions & 1 deletion Package.swift
Expand Up @@ -21,6 +21,7 @@ let package = Package(
dependencies: [
.package(url: "https://github.com/apple/swift-async-algorithms.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"),
.package(url: "https://github.com/apple/swift-http-types.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0"..<"3.0.0"),
Expand Down Expand Up @@ -70,8 +71,12 @@ let package = Package(
.target(
name: "HummingbirdJobs",
dependencies: [
.byName(name: "Hummingbird"),
.product(name: "Collections", package: "swift-collections"),
.product(name: "Logging", package: "swift-log"),
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOFoundationCompat", package: "swift-nio"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
],
swiftSettings: swiftSettings
),
Expand Down Expand Up @@ -138,6 +143,7 @@ let package = Package(
.testTarget(name: "HummingbirdJobsTests", dependencies: [
.byName(name: "HummingbirdJobs"),
.byName(name: "HummingbirdXCT"),
.product(name: "Atomics", package: "swift-atomics"),
]),
.testTarget(name: "HummingbirdRouterTests", dependencies: [
.byName(name: "HummingbirdRouter"),
Expand Down
90 changes: 14 additions & 76 deletions Sources/HummingbirdJobs/Job.swift
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2023 the Hummingbird authors
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -12,83 +12,21 @@
//
//===----------------------------------------------------------------------===//

import Foundation
import Logging
import NIOConcurrencyHelpers
import NIOCore

/// Protocol for job description
///
/// For a job to be decodable, it has to be registered. Call `MyJob.register()` to register a job.
public protocol HBJob: Codable, Sendable {
/// Unique Job name
static var name: String { get }

/// Maximum times this job should be retried if it fails
static var maxRetryCount: Int { get }

/// Execute job
/// - Returns: EventLoopFuture that is fulfulled when job is done
func execute(logger: Logger) async throws
/// Protocol for a Job
public protocol HBJob: Sendable {
/// Parameters job requries
associatedtype Parameters: Codable & Sendable
/// Job Type identifier
var id: HBJobIdentifier<Parameters> { get }
/// Maximum number of times a job will be retried before being classed as failed
var maxRetryCount: Int { get }
/// Function to execute the job
func execute(context: HBJobContext) async throws
}

extension HBJob {
/// maximum times this job should be retried
public static var maxRetryCount: Int { return 0 }

/// register job
public static func register() {
HBJobRegister.register(job: Self.self)
}
}

/// Register Jobs, for decoding and encoding
enum HBJobRegister {
static func decode(from decoder: Decoder) throws -> HBJob {
let container = try decoder.container(keyedBy: _HBJobCodingKey.self)
let key = container.allKeys.first!
let childDecoder = try container.superDecoder(forKey: key)
let jobType = try HBJobRegister.nameTypeMap.withLockedValue {
guard let job = $0[key.stringValue] else { throw JobQueueError.decodeJobFailed }
return job
}
return try jobType.init(from: childDecoder)
}

static func encode(job: HBJob, to encoder: Encoder) throws {
var container = encoder.container(keyedBy: _HBJobCodingKey.self)
let childEncoder = container.superEncoder(forKey: .init(stringValue: type(of: job).name, intValue: nil))
try job.encode(to: childEncoder)
}

static func register(job: HBJob.Type) {
self.nameTypeMap.withLockedValue { $0[job.name] = job }
}

static let nameTypeMap: NIOLockedValueBox<[String: HBJob.Type]> = .init([:])
}

internal struct _HBJobCodingKey: CodingKey {
public var stringValue: String
public var intValue: Int?

public init?(stringValue: String) {
self.stringValue = stringValue
self.intValue = nil
}

public init?(intValue: Int) {
self.stringValue = "\(intValue)"
self.intValue = intValue
}

public init(stringValue: String, intValue: Int?) {
self.stringValue = stringValue
self.intValue = intValue
}

internal init(index: Int) {
self.stringValue = "Index \(index)"
self.intValue = index
/// name of job type
public var name: String {
id.name
}
}
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2021 the Hummingbird authors
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -12,4 +12,8 @@
//
//===----------------------------------------------------------------------===//

@_exported @_documentation(visibility: internal) import struct Logging.Logger
import Logging

public struct HBJobContext {
public let logger: Logger
}
30 changes: 30 additions & 0 deletions Sources/HummingbirdJobs/JobDefinition.swift
@@ -0,0 +1,30 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// Job definition type
public struct HBJobDefinition<Parameters: Codable & Sendable>: Sendable {
public let id: HBJobIdentifier<Parameters>
let maxRetryCount: Int
let _execute: @Sendable (Parameters, HBJobContext) async throws -> Void

public init(id: HBJobIdentifier<Parameters>, maxRetryCount: Int = 0, execute: @escaping @Sendable (Parameters, HBJobContext) async throws -> Void) {
self.id = id
self.maxRetryCount = maxRetryCount
self._execute = execute
}

func execute(_ parameters: Parameters, context: HBJobContext) async throws {
try await self._execute(parameters, context)
}
}
42 changes: 42 additions & 0 deletions Sources/HummingbirdJobs/JobIdentifier.swift
@@ -0,0 +1,42 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// Identifier for a Job type
///
/// The identifier includes the type of the parameters required by the job to ensure
/// the wrong parameters are not passed to this job
///
/// Extend this type to include your own job identifiers
/// ```
/// extension HBJobIdentifier<String> {
/// static var myJob: Self { .init("my-job") }
/// }
/// ```
public struct HBJobIdentifier<Parameters>: Sendable, Hashable, ExpressibleByStringLiteral {
let name: String
/// Initialize a HBJobIdentifier
///
/// - Parameters:
/// - name: Unique name for identifier
/// - parameters: Parameter type associated with Job
public init(_ name: String, parameters: Parameters.Type = Parameters.self) { self.name = name }

/// Initialize a HBJobIdentifier from a string literal
///
/// This can only be used in a situation where the Parameter type is defined elsewhere
/// - Parameter string:
public init(stringLiteral string: String) {
self.name = string
}
}
94 changes: 73 additions & 21 deletions Sources/HummingbirdJobs/JobQueue.swift
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2023 the Hummingbird authors
// Copyright (c) 2021-2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -13,31 +13,83 @@
//===----------------------------------------------------------------------===//

import Foundation
import Hummingbird
import Logging
import NIOCore
import NIOFoundationCompat
import ServiceLifecycle

/// Job queue protocol.
/// Job queue
///
/// Defines how to push and pop jobs off a queue
public protocol HBJobQueue: AsyncSequence, Sendable where Element == HBQueuedJob<JobID> {
associatedtype JobID: CustomStringConvertible & Sendable
/// Wrapper type to bring together a job queue implementation and a job queue
/// handler. Before you can push jobs onto a queue you should register it
/// with the queue via either ``HBJobQueue.registerJob(id:maxRetryCount:execute:)`` or
/// ``HBJobQueue.registerJob(_:)``.
public struct HBJobQueue<Queue: HBJobQueueDriver>: Service {
/// underlying driver for queue
public let queue: Queue
let handler: HBJobQueueHandler<Queue>
let allocator: ByteBufferAllocator

/// Called when JobQueueHandler is initialised with this queue
func onInit() async throws
/// Push Job onto queue
public init(_ queue: Queue, numWorkers: Int = 1, logger: Logger) {
self.queue = queue
self.handler = .init(queue: queue, numWorkers: numWorkers, logger: logger)
self.allocator = .init()
}

/// Push Job onto queue
/// - Parameters:
/// - id: Job identifier
/// - parameters: parameters for the job
/// - Returns: Identifier of queued job
@discardableResult func push(_ job: HBJob) async throws -> JobID
/// This is called to say job has finished processing and it can be deleted
func finished(jobId: JobID) async throws
/// This is called to say job has failed to run and should be put aside
func failed(jobId: JobID, error: any Error) async throws
/// stop serving jobs
func stop() async
/// shutdown queue
func shutdownGracefully() async
@discardableResult public func push<Parameters: Codable & Sendable>(id: HBJobIdentifier<Parameters>, parameters: Parameters) async throws -> Queue.JobID {
let jobRequest = HBJobRequest(id: id, parameters: parameters)
let buffer = try JSONEncoder().encodeAsByteBuffer(jobRequest, allocator: self.allocator)
return try await self.queue.push(buffer)
}

/// Register job type
/// - Parameters:
/// - id: Job Identifier
/// - maxRetryCount: Maximum number of times job is retried before being flagged as failed
/// - execute: Job code
public func registerJob<Parameters: Codable & Sendable>(
_ id: HBJobIdentifier<Parameters>,
maxRetryCount: Int = 0,
execute: @escaping @Sendable (
Parameters,
HBJobContext
) async throws -> Void
) {
let job = HBJobDefinition<Parameters>(id: id, maxRetryCount: maxRetryCount, execute: execute)
self.registerJob(job)
}

/// Register job type
/// - Parameters:
/// - job: Job definition
public func registerJob(_ job: HBJobDefinition<some Codable & Sendable>) {
self.handler.registerJob(job)
}

/// Run queue handler
public func run() async throws {
try await self.handler.run()
}
}

extension HBJobQueue {
// default version of onInit doing nothing
public func onInit() async throws {}
/// Type used internally to encode a request
struct HBJobRequest<Parameters: Codable & Sendable>: Encodable, Sendable {
let id: HBJobIdentifier<Parameters>
let parameters: Parameters

public init(id: HBJobIdentifier<Parameters>, parameters: Parameters) {
self.id = id
self.parameters = parameters
}

public func encode(to encoder: Encoder) throws {
var container = encoder.container(keyedBy: _HBJobCodingKey.self)
let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil))
try self.parameters.encode(to: childEncoder)
}
}
43 changes: 43 additions & 0 deletions Sources/HummingbirdJobs/JobQueueDriver.swift
@@ -0,0 +1,43 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import Logging
import NIOCore

/// Job queue protocol.
///
/// Defines how to push and pop job data off a queue
public protocol HBJobQueueDriver: AsyncSequence, Sendable where Element == HBQueuedJob<JobID> {
associatedtype JobID: CustomStringConvertible & Sendable

/// Called when JobQueueHandler is initialised with this queue
func onInit() async throws
/// Push Job onto queue
/// - Returns: Identifier of queued job
func push(_ buffer: ByteBuffer) async throws -> JobID
/// This is called to say job has finished processing and it can be deleted
func finished(jobId: JobID) async throws
/// This is called to say job has failed to run and should be put aside
func failed(jobId: JobID, error: any Error) async throws
/// stop serving jobs
func stop() async
/// shutdown queue
func shutdownGracefully() async
}

extension HBJobQueueDriver {
// default version of onInit doing nothing
public func onInit() async throws {}
}

0 comments on commit b5fd7d4

Please sign in to comment.