Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove @_spi(ConnectionPool), use PostgresClient.query #12

Merged
merged 3 commits into from Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Expand Up @@ -11,7 +11,7 @@ let package = Package(
],
dependencies: [
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-beta.1"),
.package(url: "https://github.com/vapor/postgres-nio", from: "1.20.0"),
.package(url: "https://github.com/vapor/postgres-nio", from: "1.21.0"),
],
targets: [
.target(
Expand Down
Expand Up @@ -14,7 +14,7 @@

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

struct CreateJobQueue: PostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
Expand Down
Expand Up @@ -14,7 +14,7 @@

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

struct CreateJobs: PostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
Expand Down
43 changes: 21 additions & 22 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
Expand Up @@ -14,13 +14,12 @@

import Foundation
import HummingbirdJobs
@_spi(ConnectionPool) import HummingbirdPostgres
import HummingbirdPostgres
import Logging
import NIOConcurrencyHelpers
import NIOCore
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

@_spi(ConnectionPool)
public final class PostgresQueue: JobQueueDriver {
public typealias JobID = UUID

Expand Down Expand Up @@ -121,16 +120,12 @@ public final class PostgresQueue: JobQueueDriver {

/// This is called to say job has finished processing and it can be deleted
public func finished(jobId: JobID) async throws {
_ = try await self.client.withConnection { connection in
try await self.delete(jobId: jobId, connection: connection)
}
try await self.delete(jobId: jobId)
}

/// This is called to say job has failed to run and should be put aside
public func failed(jobId: JobID, error: Error) async throws {
_ = try await self.client.withConnection { connection in
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
}
try await self.setStatus(jobId: jobId, status: .failed)
}

/// stop serving jobs
Expand Down Expand Up @@ -199,8 +194,8 @@ public final class PostgresQueue: JobQueueDriver {
)
}

func delete(jobId: JobID, connection: PostgresConnection) async throws {
try await connection.query(
func delete(jobId: JobID) async throws {
try await self.client.query(
"DELETE FROM _hb_pg_jobs WHERE id = \(jobId)",
logger: self.logger
)
Expand All @@ -222,18 +217,23 @@ public final class PostgresQueue: JobQueueDriver {
)
}

func setStatus(jobId: JobID, status: Status) async throws {
try await self.client.query(
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
logger: self.logger
)
}

func getJobs(withStatus status: Status) async throws -> [JobID] {
return try await self.client.withConnection { connection in
let stream = try await connection.query(
"SELECT id FROM _hb_pg_jobs WHERE status = \(status)",
logger: self.logger
)
var jobs: [JobID] = []
for try await id in stream.decode(JobID.self, context: .default) {
jobs.append(id)
}
return jobs
let stream = try await self.client.query(
"SELECT id FROM _hb_pg_jobs WHERE status = \(status)",
logger: self.logger
)
var jobs: [JobID] = []
for try await id in stream.decode(JobID.self, context: .default) {
jobs.append(id)
}
return jobs
}

func updateJobsOnInit(withStatus status: Status, onInit: JobInitialization, connection: PostgresConnection) async throws {
Expand Down Expand Up @@ -280,7 +280,6 @@ extension PostgresQueue {
}
}

@_spi(ConnectionPool)
extension JobQueueDriver where Self == PostgresQueue {
/// Return Postgres driver for Job Queue
/// - Parameters:
Expand Down
2 changes: 1 addition & 1 deletion Sources/HummingbirdPostgres/CreatePersistTable.swift
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

struct CreatePersistTable: PostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
Expand Down
2 changes: 1 addition & 1 deletion Sources/HummingbirdPostgres/Migration.swift
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

/// Protocol for a database migration
///
Expand Down
6 changes: 3 additions & 3 deletions Sources/HummingbirdPostgres/Migrations.swift
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

/// Database migration support
public actor PostgresMigrations {
Expand Down Expand Up @@ -63,7 +63,7 @@ public actor PostgresMigrations {
/// - client: Postgres client
/// - logger: Logger to use
/// - dryRun: Should migrations actually be applied, or should we just report what would be applied and reverted
@_spi(ConnectionPool)

public func apply(client: PostgresClient, groups: [MigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, completeMigrations: true, dryRun: dryRun)
}
Expand All @@ -73,7 +73,7 @@ public actor PostgresMigrations {
/// - client: Postgres client
/// - logger: Logger to use
/// - dryRun: Should migrations actually be reverted, or should we just report what would be reverted
@_spi(ConnectionPool)

public func revert(client: PostgresClient, groups: [MigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, completeMigrations: false, dryRun: dryRun)
}
Expand Down
86 changes: 38 additions & 48 deletions Sources/HummingbirdPostgres/PostgresPersistDriver.swift
Expand Up @@ -16,7 +16,7 @@ import AsyncAlgorithms
import Foundation
import Hummingbird
import NIOCore
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

extension PSQLError {
public var serverError: PostgresError.Code? {
Expand Down Expand Up @@ -56,7 +56,7 @@ public final class PostgresPersistDriver: PersistDriver {
/// - Parameters:
/// - client: Postgres client
/// - tidyUpFrequequency: How frequently cleanup expired database entries should occur
@_spi(ConnectionPool)

public init(client: PostgresClient, migrations: PostgresMigrations, tidyUpFrequency: Duration = .seconds(600), logger: Logger) async {
self.client = client
self.logger = logger
Expand All @@ -68,72 +68,62 @@ public final class PostgresPersistDriver: PersistDriver {
/// Create new key. This doesn't check for the existence of this key already so may fail if the key already exists
public func create(key: String, value: some Codable, expires: Duration?) async throws {
let expires = expires.map { Date.now + Double($0.components.seconds) } ?? Date.distantFuture
try await self.client.withConnection { connection in
do {
try await connection.query(
"INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))",
logger: self.logger
)
} catch let error as PSQLError {
if error.serverError == .uniqueViolation {
throw PersistError.duplicate
} else {
throw error
}
do {
try await self.client.query(
"INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))",
logger: self.logger
)
} catch let error as PSQLError {
if error.serverError == .uniqueViolation {
throw PersistError.duplicate
} else {
throw error
}
}
}

/// Set value for key.
public func set(key: String, value: some Codable, expires: Duration?) async throws {
let expires = expires.map { Date.now + Double($0.components.seconds) } ?? Date.distantFuture
_ = try await self.client.withConnection { connection in
try await connection.query(
"""
INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))
ON CONFLICT (id)
DO UPDATE SET data = \(WrapperObject(value)), expires = \(expires)
""",
logger: self.logger
)
}
try await self.client.query(
"""
INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))
ON CONFLICT (id)
DO UPDATE SET data = \(WrapperObject(value)), expires = \(expires)
""",
logger: self.logger
)
}

/// Get value for key
public func get<Object: Codable>(key: String, as object: Object.Type) async throws -> Object? {
try await self.client.withConnection { connection in
let stream = try await connection.query(
"SELECT data, expires FROM _hb_pg_persist WHERE id = \(key)",
logger: self.logger
)
guard let result = try await stream.decode((WrapperObject<Object>, Date).self)
.first(where: { _ in true })
else {
return nil
}
guard result.1 > .now else { return nil }
return result.0.value
let stream = try await self.client.query(
"SELECT data, expires FROM _hb_pg_persist WHERE id = \(key)",
logger: self.logger
)
guard let (object, expires) = try await stream.decode((WrapperObject<Object>, Date).self)
.first(where: { _ in true })
else {
return nil
}
guard expires > .now else { return nil }
return object.value
}

/// Remove key
public func remove(key: String) async throws {
_ = try await self.client.withConnection { connection in
try await connection.query(
"DELETE FROM _hb_pg_persist WHERE id = \(key)",
logger: self.logger
)
}
try await self.client.query(
"DELETE FROM _hb_pg_persist WHERE id = \(key)",
logger: self.logger
)
}

/// tidy up database by cleaning out expired keys
func tidy() async throws {
_ = try await self.client.withConnection { connection in
try await connection.query(
"DELETE FROM _hb_pg_persist WHERE expires < \(Date.now)",
logger: self.logger
)
}
try await self.client.query(
"DELETE FROM _hb_pg_persist WHERE expires < \(Date.now)",
logger: self.logger
)
}
}

Expand Down
6 changes: 3 additions & 3 deletions Tests/HummingbirdPostgresTests/JobsTests.swift
Expand Up @@ -15,11 +15,11 @@
import Atomics
import Hummingbird
import HummingbirdJobs
@testable @_spi(ConnectionPool) import HummingbirdPostgres
@testable @_spi(ConnectionPool) import HummingbirdJobsPostgres
@testable import HummingbirdJobsPostgres
@testable import HummingbirdPostgres
import HummingbirdTesting
import NIOConcurrencyHelpers
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO
import ServiceLifecycle
import XCTest

Expand Down
4 changes: 2 additions & 2 deletions Tests/HummingbirdPostgresTests/MigrationTests.swift
@@ -1,7 +1,7 @@
import Atomics
@testable @_spi(ConnectionPool) import HummingbirdPostgres
@testable import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO
import XCTest

final class MigrationTests: XCTestCase {
Expand Down
4 changes: 2 additions & 2 deletions Tests/HummingbirdPostgresTests/PersistTests.swift
Expand Up @@ -13,10 +13,10 @@
//===----------------------------------------------------------------------===//

import Hummingbird
@_spi(ConnectionPool) import HummingbirdPostgres
import HummingbirdPostgres
import HummingbirdTesting
import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO
import ServiceLifecycle
import XCTest

Expand Down
2 changes: 1 addition & 1 deletion Tests/HummingbirdPostgresTests/TestUtils.swift
@@ -1,5 +1,5 @@
import Hummingbird
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO
import ServiceLifecycle

/// Manage the lifecycle of a PostgresClient
Expand Down