Skip to content

Commit

Permalink
Changes required by Jobs refactor (#13)
Browse files Browse the repository at this point in the history
* Refactor of job queue

* Use 2.x.x-jobs-refactor branch

* Revert HummingbirdRedisJobsTests.wait change

* Fix comment

* Update testJobQueue

* Replace Data with ByteBuffer in JobsQueue

* Changes for HummingbirdTesting
  • Loading branch information
adam-fowler committed Mar 8, 2024
1 parent 5fd9f31 commit ed1a8a4
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 250 deletions.
6 changes: 3 additions & 3 deletions Package.swift
Expand Up @@ -11,7 +11,7 @@ let package = Package(
.library(name: "HummingbirdJobsRedis", targets: ["HummingbirdJobsRedis"]),
],
dependencies: [
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.3"),
.package(url: "https://github.com/hummingbird-project/hummingbird.git", branch: "2.x.x"),
.package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.0"),
],
targets: [
Expand All @@ -28,13 +28,13 @@ let package = Package(
.testTarget(name: "HummingbirdRedisTests", dependencies: [
.byName(name: "HummingbirdRedis"),
.product(name: "Hummingbird", package: "hummingbird"),
.product(name: "HummingbirdXCT", package: "hummingbird"),
.product(name: "HummingbirdTesting", package: "hummingbird"),
]),
.testTarget(name: "HummingbirdJobsRedisTests", dependencies: [
.byName(name: "HummingbirdJobsRedis"),
.product(name: "Hummingbird", package: "hummingbird"),
.product(name: "HummingbirdJobs", package: "hummingbird"),
.product(name: "HummingbirdXCT", package: "hummingbird"),
.product(name: "HummingbirdTesting", package: "hummingbird"),
]),
]
)
2 changes: 1 addition & 1 deletion Sources/HummingbirdJobsRedis/Configuration.swift
Expand Up @@ -15,7 +15,7 @@
import NIOCore
import RediStack

extension HBRedisJobQueue {
extension HBRedisQueue {
/// what to do with failed/processing jobs from last time queue was handled
public enum JobInitialization: Sendable {
case doNothing
Expand Down
68 changes: 42 additions & 26 deletions Sources/HummingbirdJobsRedis/RedisJobQueue.swift
Expand Up @@ -22,8 +22,8 @@ import HummingbirdRedis
import NIOCore
import RediStack

/// Redis implementation of job queues
public final class HBRedisJobQueue: HBJobQueue {
/// Redis implementation of job queue driver
public final class HBRedisQueue: HBJobQueueDriver {
public struct JobID: Sendable, CustomStringConvertible {
let id: String

Expand Down Expand Up @@ -73,7 +73,7 @@ public final class HBRedisJobQueue: HBJobQueue {

/// Initialize redis job queue
/// - Parameters:
/// - redisConnectionPoolGroup: Redis connection pool group
/// - redisConnectionPoolService: Redis connection pool
/// - configuration: configuration
public init(_ redisConnectionPoolService: HBRedisConnectionPoolService, configuration: Configuration = .init()) {
self.redisConnectionPool = redisConnectionPoolService
Expand All @@ -92,15 +92,15 @@ public final class HBRedisJobQueue: HBJobQueue {
try await self.initQueue(queueKey: self.configuration.failedQueueKey, onInit: self.configuration.failedJobsInitialization)
}

/// Push Job onto queue
/// Push job data onto queue
/// - Parameters:
/// - job: Job descriptor
/// - Returns: Queued job identifier
@discardableResult public func push(_ job: any HBJob) async throws -> JobID {
let id = JobID()
try await self.set(jobId: id, job: job)
_ = try await self.redisConnectionPool.lpush(id.redisKey, into: self.configuration.queueKey).get()
return id
/// - data: Job data
/// - Returns: Queued job
@discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID {
let jobInstanceID = JobID()
try await self.set(jobId: jobInstanceID, buffer: buffer)
_ = try await self.redisConnectionPool.lpush(jobInstanceID.redisKey, into: self.configuration.queueKey).get()
return jobInstanceID
}

/// Flag job is done
Expand Down Expand Up @@ -142,8 +142,8 @@ public final class HBRedisJobQueue: HBJobQueue {
throw RedisQueueError.unexpectedRedisKeyType

Check warning on line 142 in Sources/HummingbirdJobsRedis/RedisJobQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdJobsRedis/RedisJobQueue.swift#L142

Added line #L142 was not covered by tests
}
let identifier = JobID(key)
if let job = try await self.get(jobId: identifier) {
return .init(id: identifier, job: job)
if let buffer = try await self.get(jobId: identifier) {
return .init(id: identifier, jobBuffer: buffer)
} else {
throw RedisQueueError.jobMissing(identifier)
}

Check warning on line 149 in Sources/HummingbirdJobsRedis/RedisJobQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdJobsRedis/RedisJobQueue.swift#L148-L149

Added lines #L148 - L149 were not covered by tests
Expand Down Expand Up @@ -186,19 +186,12 @@ public final class HBRedisJobQueue: HBJobQueue {
}
}

func get(jobId: JobID) async throws -> HBJob? {
guard let data = try await self.redisConnectionPool.get(jobId.redisKey, as: Data.self).get() else {
return nil
}
do {
return try JSONDecoder().decode(HBAnyCodableJob.self, from: data).job
} catch {
throw JobQueueError.decodeJobFailed
}
func get(jobId: JobID) async throws -> ByteBuffer? {
return try await self.redisConnectionPool.get(jobId.redisKey).get().byteBuffer
}

func set(jobId: JobID, job: HBJob) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, toJSON: HBAnyCodableJob(job)).get()
func set(jobId: JobID, buffer: ByteBuffer) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, to: buffer).get()
}

func delete(jobId: JobID) async throws {
Expand All @@ -207,9 +200,9 @@ public final class HBRedisJobQueue: HBJobQueue {
}

/// extend HBRedisJobQueue to conform to AsyncSequence
extension HBRedisJobQueue {
extension HBRedisQueue {
public struct AsyncIterator: AsyncIteratorProtocol {
let queue: HBRedisJobQueue
let queue: HBRedisQueue

public func next() async throws -> Element? {
while true {
Expand All @@ -229,3 +222,26 @@ extension HBRedisJobQueue {
return .init(queue: self)
}
}

extension HBJobQueueDriver where Self == HBRedisQueue {
/// Return Redis driver for Job Queue
/// - Parameters:
/// - redisConnectionPoolService: Redis connection pool
/// - configuration: configuration
public static func redis(_ redisConnectionPoolService: HBRedisConnectionPoolService, configuration: HBRedisQueue.Configuration = .init()) -> Self {
.init(redisConnectionPoolService, configuration: configuration)
}
}

// Extend ByteBuffer so that is conforms to `RESPValueConvertible`. Really not sure why
// this isnt available already
extension ByteBuffer: RESPValueConvertible {
public init?(fromRESP value: RESPValue) {
guard let buffer = value.byteBuffer else { return nil }
self = buffer
}

Check warning on line 242 in Sources/HummingbirdJobsRedis/RedisJobQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdJobsRedis/RedisJobQueue.swift#L239-L242

Added lines #L239 - L242 were not covered by tests

public func convertedToRESPValue() -> RESPValue {
return .bulkString(self)
}
}

0 comments on commit ed1a8a4

Please sign in to comment.