Skip to content

Commit

Permalink
Use ByteBuffer instead of Data
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Mar 4, 2024
1 parent 2a28feb commit 8149677
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
Expand Up @@ -3,6 +3,7 @@ import HummingbirdJobs
@_spi(ConnectionPool) import HummingbirdPostgres
import Logging
import NIOConcurrencyHelpers
import NIOCore
@_spi(ConnectionPool) import PostgresNIO

@_spi(ConnectionPool)
Expand Down Expand Up @@ -123,9 +124,9 @@ public final class HBPostgresQueue: HBJobQueueDriver {

/// Push Job onto queue
/// - Returns: Identifier of queued job
@discardableResult public func push(data: Data) async throws -> JobID {
@discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID {
try await self.client.withConnection { connection in
let queuedJob = HBQueuedJob<JobID>(id: .init(), jobData: data)
let queuedJob = HBQueuedJob<JobID>(id: .init(), jobBuffer: buffer)
try await add(queuedJob, connection: connection)
try await addToQueue(jobId: queuedJob.id, connection: connection)
return queuedJob.id
Expand Down Expand Up @@ -186,10 +187,10 @@ public final class HBPostgresQueue: HBJobQueueDriver {
do {
try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
// if failed to find a job in the job table try getting another index
guard let data = try await stream2.decode(Data.self, context: .default).first(where: { _ in true }) else {
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
continue
}
return HBQueuedJob(id: jobId, jobData: data)
return HBQueuedJob(id: jobId, jobBuffer: buffer)
} catch {
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
throw JobQueueError.decodeJobFailed
Expand All @@ -206,7 +207,7 @@ public final class HBPostgresQueue: HBJobQueueDriver {
try await connection.query(
"""
INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status)
VALUES (\(job.id), \(job.jobData), \(Status.pending))
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
""",
logger: self.logger
)
Expand Down

0 comments on commit 8149677

Please sign in to comment.