Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes required by Jobs refactor #13

Merged
merged 7 commits into from Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Expand Up @@ -11,7 +11,7 @@ let package = Package(
.library(name: "HummingbirdJobsRedis", targets: ["HummingbirdJobsRedis"]),
],
dependencies: [
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.3"),
.package(url: "https://github.com/hummingbird-project/hummingbird.git", branch: "2.x.x-jobs-refactor"),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to fix this before merging

.package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.0"),
],
targets: [
Expand Down
2 changes: 1 addition & 1 deletion Sources/HummingbirdJobsRedis/Configuration.swift
Expand Up @@ -15,7 +15,7 @@
import NIOCore
import RediStack

extension HBRedisJobQueue {
extension HBRedisQueue {
/// what to do with failed/processing jobs from last time queue was handled
public enum JobInitialization: Sendable {
case doNothing
Expand Down
56 changes: 30 additions & 26 deletions Sources/HummingbirdJobsRedis/RedisJobQueue.swift
Expand Up @@ -22,8 +22,8 @@ import HummingbirdRedis
import NIOCore
import RediStack

/// Redis implementation of job queues
public final class HBRedisJobQueue: HBJobQueue {
/// Redis implementation of job queue driver
public final class HBRedisQueue: HBJobQueueDriver {
public struct JobID: Sendable, CustomStringConvertible {
let id: String

Expand Down Expand Up @@ -73,7 +73,7 @@ public final class HBRedisJobQueue: HBJobQueue {

/// Initialize redis job queue
/// - Parameters:
/// - redisConnectionPoolGroup: Redis connection pool group
/// - redisConnectionPoolService: Redis connection pool
/// - configuration: configuration
public init(_ redisConnectionPoolService: HBRedisConnectionPoolService, configuration: Configuration = .init()) {
self.redisConnectionPool = redisConnectionPoolService
Expand All @@ -92,15 +92,16 @@ public final class HBRedisJobQueue: HBJobQueue {
try await self.initQueue(queueKey: self.configuration.failedQueueKey, onInit: self.configuration.failedJobsInitialization)
}

/// Push Job onto queue
/// Push job onto queue
/// - Parameters:
/// - job: Job descriptor
/// - Returns: Queued job identifier
@discardableResult public func push(_ job: any HBJob) async throws -> JobID {
let id = JobID()
try await self.set(jobId: id, job: job)
_ = try await self.redisConnectionPool.lpush(id.redisKey, into: self.configuration.queueKey).get()
return id
/// - job: Job
/// - eventLoop: Eventloop to run process on (ignored in this case)
/// - Returns: Queued job
@discardableResult public func push(data: Data) async throws -> JobID {
let jobInstanceID = JobID()
try await self.set(jobId: jobInstanceID, data: data)
_ = try await self.redisConnectionPool.lpush(jobInstanceID.redisKey, into: self.configuration.queueKey).get()
return jobInstanceID
}

/// Flag job is done
Expand Down Expand Up @@ -142,8 +143,8 @@ public final class HBRedisJobQueue: HBJobQueue {
throw RedisQueueError.unexpectedRedisKeyType
}
let identifier = JobID(key)
if let job = try await self.get(jobId: identifier) {
return .init(id: identifier, job: job)
if let data = try await self.get(jobId: identifier) {
return .init(id: identifier, jobData: data)
} else {
throw RedisQueueError.jobMissing(identifier)
}
Expand Down Expand Up @@ -186,19 +187,12 @@ public final class HBRedisJobQueue: HBJobQueue {
}
}

func get(jobId: JobID) async throws -> HBJob? {
guard let data = try await self.redisConnectionPool.get(jobId.redisKey, as: Data.self).get() else {
return nil
}
do {
return try JSONDecoder().decode(HBAnyCodableJob.self, from: data).job
} catch {
throw JobQueueError.decodeJobFailed
}
func get(jobId: JobID) async throws -> Data? {
return try await self.redisConnectionPool.get(jobId.redisKey, as: Data.self).get()
}

func set(jobId: JobID, job: HBJob) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, toJSON: HBAnyCodableJob(job)).get()
func set(jobId: JobID, data: Data) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, to: data).get()
}

func delete(jobId: JobID) async throws {
Expand All @@ -207,9 +201,9 @@ public final class HBRedisJobQueue: HBJobQueue {
}

/// extend HBRedisJobQueue to conform to AsyncSequence
extension HBRedisJobQueue {
extension HBRedisQueue {
public struct AsyncIterator: AsyncIteratorProtocol {
let queue: HBRedisJobQueue
let queue: HBRedisQueue

public func next() async throws -> Element? {
while true {
Expand All @@ -229,3 +223,13 @@ extension HBRedisJobQueue {
return .init(queue: self)
}
}

extension HBJobQueueDriver where Self == HBRedisQueue {
/// Return Redis driver for Job Queue
/// - Parameters:
/// - redisConnectionPoolService: Redis connection pool
/// - configuration: configuration
public static func redis(_ redisConnectionPoolService: HBRedisConnectionPoolService, configuration: HBRedisQueue.Configuration = .init()) -> Self {
.init(redisConnectionPoolService, configuration: configuration)
}
}