Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes for jobs refactor in Hummingbird #7

Merged
merged 2 commits into from Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Expand Up @@ -10,7 +10,7 @@ let package = Package(
.library(name: "HummingbirdPostgres", targets: ["HummingbirdPostgres"]),
],
dependencies: [
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.3"),
.package(url: "https://github.com/hummingbird-project/hummingbird.git", branch: "2.x.x-jobs-refactor"),
.package(url: "https://github.com/vapor/postgres-nio", from: "1.20.0"),
],
targets: [
Expand Down
50 changes: 34 additions & 16 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
Expand Up @@ -3,10 +3,11 @@ import HummingbirdJobs
@_spi(ConnectionPool) import HummingbirdPostgres
import Logging
import NIOConcurrencyHelpers
import NIOCore
@_spi(ConnectionPool) import PostgresNIO

@_spi(ConnectionPool)
public final class HBPostgresJobQueue: HBJobQueue {
public final class HBPostgresQueue: HBJobQueueDriver {
public typealias JobID = UUID

/// what to do with failed/processing jobs from last time queue was handled
Expand Down Expand Up @@ -47,9 +48,9 @@ public final class HBPostgresJobQueue: HBJobQueue {
public init(
jobTable: String = "_hb_jobs",
jobQueueTable: String = "_hb_job_queue",
pendingJobsInitialization: HBPostgresJobQueue.JobInitialization = .doNothing,
failedJobsInitialization: HBPostgresJobQueue.JobInitialization = .rerun,
processingJobsInitialization: HBPostgresJobQueue.JobInitialization = .rerun,
pendingJobsInitialization: HBPostgresQueue.JobInitialization = .doNothing,
failedJobsInitialization: HBPostgresQueue.JobInitialization = .rerun,
processingJobsInitialization: HBPostgresQueue.JobInitialization = .rerun,
pollTime: Duration = .milliseconds(100)
) {
self.jobTable = jobTable
Expand All @@ -61,12 +62,19 @@ public final class HBPostgresJobQueue: HBJobQueue {
}
}

let client: PostgresClient
let configuration: Configuration
let logger: Logger
/// Postgres client used by Job queue
public let client: PostgresClient
/// Job queue configuration
public let configuration: Configuration
/// Logger used by queue
public let logger: Logger
let isStopped: NIOLockedValueBox<Bool>

/// Initialize a HBPostgresJobQueue
/// - Parameters:
/// - client: Postgres client
/// - configuration: Queue configuration
/// - logger: Logger used by queue
public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) {
self.client = client
self.configuration = configuration
Expand All @@ -82,7 +90,7 @@ public final class HBPostgresJobQueue: HBJobQueue {
"""
CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) (
id uuid PRIMARY KEY,
job json,
job bytea,
status smallint
)
""",
Expand Down Expand Up @@ -116,9 +124,9 @@ public final class HBPostgresJobQueue: HBJobQueue {

/// Push Job onto queue
/// - Returns: Identifier of queued job
@discardableResult public func push(_ job: HBJob) async throws -> JobID {
@discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID {
try await self.client.withConnection { connection in
let queuedJob = HBQueuedJob<JobID>(id: .init(), job: job)
let queuedJob = HBQueuedJob<JobID>(id: .init(), jobBuffer: buffer)
try await add(queuedJob, connection: connection)
try await addToQueue(jobId: queuedJob.id, connection: connection)
return queuedJob.id
Expand Down Expand Up @@ -179,10 +187,10 @@ public final class HBPostgresJobQueue: HBJobQueue {
do {
try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
// if failed to find a job in the job table try getting another index
guard let job = try await stream2.decode(HBAnyCodableJob.self, context: .default).first(where: { _ in true }) else {
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
continue
}
return HBQueuedJob(id: jobId, job: job.job)
return HBQueuedJob(id: jobId, jobBuffer: buffer)
} catch {
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
throw JobQueueError.decodeJobFailed
Expand All @@ -199,7 +207,7 @@ public final class HBPostgresJobQueue: HBJobQueue {
try await connection.query(
"""
INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status)
VALUES (\(job.id), \(job.anyCodableJob), \(Status.pending))
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
""",
logger: self.logger
)
Expand Down Expand Up @@ -262,9 +270,9 @@ public final class HBPostgresJobQueue: HBJobQueue {
}

/// extend HBPostgresJobQueue to conform to AsyncSequence
extension HBPostgresJobQueue {
extension HBPostgresQueue {
public struct AsyncIterator: AsyncIteratorProtocol {
let queue: HBPostgresJobQueue
let queue: HBPostgresQueue

public func next() async throws -> Element? {
while true {
Expand All @@ -285,4 +293,14 @@ extension HBPostgresJobQueue {
}
}

extension HBAnyCodableJob: PostgresCodable {}
@_spi(ConnectionPool)
extension HBJobQueueDriver where Self == HBPostgresQueue {
/// Return Postgres driver for Job Queue
/// - Parameters:
/// - client: Postgres client
/// - configuration: Queue configuration
/// - logger: Logger used by queue
public static func postgres(client: PostgresClient, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) -> Self {
.init(client: client, configuration: configuration, logger: logger)
}
}