From e0a87fc3a46e0b18691f8af2891d02f8ad668aa2 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Fri, 16 Feb 2024 15:59:31 +0000 Subject: [PATCH 1/2] Fix `EventLoopFuture` and `EventLoopPromise` under strict concurrency checking # Motivation We need to tackle the remaining strict concurrency checking related `Sendable` warnings in NIO. The first place to start is making sure that `EventLoopFuture` and `EventLoopPromise` are properly annotated. # Modification In a previous https://github.com/apple/swift-nio/pull/2496, @weissi changed the `@unchecked Sendable` conformances of `EventLoopFuture/Promise` to be conditional on the sendability of the generic `Value` type. After having looked at all the APIs on the future and promise types as well as reading the latest Concurrency evolution proposals, specifically the [Region based Isolation](https://github.com/apple/swift-evolution/blob/main/proposals/0414-region-based-isolation.md), I came to the conclusion that the previous `@unchecked Sendable` annotations were correct. The reasoning for this is: 1. An `EventLoopPromise` and `EventLoopFuture` pair are tied to a specific `EventLoop` 2. An `EventLoop` represents an isolation region and values tied to its isolation are not allowed to be shared outside of it unless they are disconnected from the region 3. The `value` used to succeed a promise often come from outside the isolation domain of the `EventLoop` hence they must be transferred into the promise. 4. The isolation region of the event loop is enforced through `@Sendable` annotations on all closures that receive the value in some kind of transformation e.g. `map()` 5. Any method on `EventLoopFuture` that combines itself with another future must require `Sendable` of the other futures `Value` since we cannot statically enforce that futures are bound to the same event loop i.e. to the same isolation domain Due to the above rules, this PR adds back the `@unchecked Sendable` conformances to both types. Furthermore, this PR revisits every single method on `EventLoopPromise/Future` and adds missing `Sendable` and `@Sendable` annotation where necessary to uphold the above rules. A few important things to call out: - Since `transferring` is currently not available this PR requires a `Sendable` conformance for some methods on `EventLoopPromise/Future` that should rather take a `transffering` argument - To enable the common case where a value from the same event loop is used to succeed a promise I added two additional methods that take a `eventLoopBoundResult` and enforce dynamic isolation checking. We might have to do this for more methods once we adopt those changes in other targets/packages. # Result After this PR has landed our lowest level building block should be inline with what the rest of the language enforces in Concurrency. The `EventLoopFuture.swift` produces no more warnings under strict concurrency checking on the latest 5.10 snapshots. --- Sources/NIOCore/AsyncAwaitSupport.swift | 13 +- .../NIOCore/DispatchQueue+WithFuture.swift | 5 +- Sources/NIOCore/EventLoop.swift | 50 +- .../NIOCore/EventLoopFuture+Deprecated.swift | 68 +- .../EventLoopFuture+WithEventLoop.swift | 13 +- Sources/NIOCore/EventLoopFuture.swift | 587 ++++++++++++------ Tests/NIOPosixTests/EventLoopFutureTest.swift | 8 +- 7 files changed, 470 insertions(+), 274 deletions(-) diff --git a/Sources/NIOCore/AsyncAwaitSupport.swift b/Sources/NIOCore/AsyncAwaitSupport.swift index bcdc7a848d..105ce1c3c5 100644 --- a/Sources/NIOCore/AsyncAwaitSupport.swift +++ b/Sources/NIOCore/AsyncAwaitSupport.swift @@ -18,8 +18,9 @@ extension EventLoopFuture { /// This function can be used to bridge an `EventLoopFuture` into the `async` world. Ie. if you're in an `async` /// function and want to get the result of this future. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + @preconcurrency @inlinable - public func get() async throws -> Value { + public func get() async throws -> Value where Value: Sendable { return try await withUnsafeThrowingContinuation { (cont: UnsafeContinuation, Error>) in self.whenComplete { result in switch result { @@ -60,8 +61,11 @@ extension EventLoopPromise { /// - returns: A `Task` which was created to `await` the `body`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @discardableResult + @preconcurrency @inlinable - public func completeWithTask(_ body: @escaping @Sendable () async throws -> Value) -> Task { + public func completeWithTask( + _ body: @escaping @Sendable () async throws -> Value + ) -> Task where Value: Sendable { Task { do { let value = try await body() @@ -333,8 +337,11 @@ struct AsyncSequenceFromIterator: AsyncSeq @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension EventLoop { + @preconcurrency @inlinable - public func makeFutureWithTask(_ body: @Sendable @escaping () async throws -> Return) -> EventLoopFuture { + public func makeFutureWithTask( + _ body: @Sendable @escaping () async throws -> Return + ) -> EventLoopFuture { let promise = self.makePromise(of: Return.self) promise.completeWithTask(body) return promise.futureResult diff --git a/Sources/NIOCore/DispatchQueue+WithFuture.swift b/Sources/NIOCore/DispatchQueue+WithFuture.swift index f85ea96dbf..beb0e2c74e 100644 --- a/Sources/NIOCore/DispatchQueue+WithFuture.swift +++ b/Sources/NIOCore/DispatchQueue+WithFuture.swift @@ -28,9 +28,10 @@ extension DispatchQueue { /// - callbackMayBlock: The scheduled callback for the IO / task. /// - returns a new `EventLoopFuture` with value returned by the `block` parameter. @inlinable - public func asyncWithFuture( + @preconcurrency + public func asyncWithFuture( eventLoop: EventLoop, - _ callbackMayBlock: @escaping () throws -> NewValue + _ callbackMayBlock: @escaping @Sendable () throws -> NewValue ) -> EventLoopFuture { let promise = eventLoop.makePromise(of: NewValue.self) diff --git a/Sources/NIOCore/EventLoop.swift b/Sources/NIOCore/EventLoop.swift index 50b90ed925..b2fe4c8f14 100644 --- a/Sources/NIOCore/EventLoop.swift +++ b/Sources/NIOCore/EventLoop.swift @@ -713,12 +713,6 @@ extension EventLoop { @inlinable @preconcurrency public func submit(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture { - _submit(task) - } - @usableFromInline typealias SubmitCallback = @Sendable () throws -> T - - @inlinable - func _submit(_ task: @escaping SubmitCallback) -> EventLoopFuture { let promise: EventLoopPromise = makePromise(file: #fileID, line: #line) self.execute { @@ -742,18 +736,15 @@ extension EventLoop { /// - returns: An `EventLoopFuture` identical to the `EventLoopFuture` returned from `task`. @inlinable @preconcurrency - public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { - self._flatSubmit(task) - } - @usableFromInline typealias FlatSubmitCallback = @Sendable () -> EventLoopFuture - - @inlinable - func _flatSubmit(_ task: @escaping FlatSubmitCallback) -> EventLoopFuture { + public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { // TODO: This should take a closure that returns fresh self.submit(task).flatMap { $0 } } /// Schedule a `task` that is executed by this `EventLoop` at the given time. /// + /// - Note: The `T` must be `Sendable` since the isolation domains of the event loop future returned from `task` and + /// this event loop might differ. + /// /// - parameters: /// - task: The asynchronous task to run. As with everything that runs on the `EventLoop`, it must not block. /// - returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait @@ -763,23 +754,11 @@ extension EventLoop { @discardableResult @inlinable @preconcurrency - public func flatScheduleTask( + public func flatScheduleTask( deadline: NIODeadline, file: StaticString = #fileID, line: UInt = #line, _ task: @escaping @Sendable () throws -> EventLoopFuture - ) -> Scheduled { - self._flatScheduleTask(deadline: deadline, file: file, line: line, task) - } - @usableFromInline typealias FlatScheduleTaskDeadlineCallback = () throws -> EventLoopFuture - - @discardableResult - @inlinable - func _flatScheduleTask( - deadline: NIODeadline, - file: StaticString, - line: UInt, - _ task: @escaping FlatScheduleTaskDelayCallback ) -> Scheduled { let promise: EventLoopPromise = self.makePromise(file: file, line: line) let scheduled = self.scheduleTask(deadline: deadline, task) @@ -790,6 +769,9 @@ extension EventLoop { /// Schedule a `task` that is executed by this `EventLoop` after the given amount of time. /// + /// - Note: The `T` must be `Sendable` since the isolation domains of the event loop future returned from `task` and + /// this event loop might differ. + /// /// - parameters: /// - task: The asynchronous task to run. As everything that runs on the `EventLoop`, it must not block. /// - returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait @@ -799,23 +781,11 @@ extension EventLoop { @discardableResult @inlinable @preconcurrency - public func flatScheduleTask( + public func flatScheduleTask( in delay: TimeAmount, file: StaticString = #fileID, line: UInt = #line, _ task: @escaping @Sendable () throws -> EventLoopFuture - ) -> Scheduled { - self._flatScheduleTask(in: delay, file: file, line: line, task) - } - - @usableFromInline typealias FlatScheduleTaskDelayCallback = @Sendable () throws -> EventLoopFuture - - @inlinable - func _flatScheduleTask( - in delay: TimeAmount, - file: StaticString, - line: UInt, - _ task: @escaping FlatScheduleTaskDelayCallback ) -> Scheduled { let promise: EventLoopPromise = self.makePromise(file: file, line: line) let scheduled = self.scheduleTask(in: delay, task) @@ -951,7 +921,7 @@ extension EventLoop { notifying promise: EventLoopPromise?, _ task: @escaping ScheduleRepeatedTaskCallback ) -> RepeatedTask { - let futureTask: (RepeatedTask) -> EventLoopFuture = { repeatedTask in + let futureTask: @Sendable (RepeatedTask) -> EventLoopFuture = { repeatedTask in do { try task(repeatedTask) return self.makeSucceededFuture(()) diff --git a/Sources/NIOCore/EventLoopFuture+Deprecated.swift b/Sources/NIOCore/EventLoopFuture+Deprecated.swift index 75cfb07162..d2e370ebe4 100644 --- a/Sources/NIOCore/EventLoopFuture+Deprecated.swift +++ b/Sources/NIOCore/EventLoopFuture+Deprecated.swift @@ -13,65 +13,99 @@ //===----------------------------------------------------------------------===// extension EventLoopFuture { + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMap(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Value) -> EventLoopFuture) -> EventLoopFuture { + public func flatMap( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Value) -> EventLoopFuture + ) -> EventLoopFuture { return self.flatMap(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapThrowing(file: StaticString = #fileID, - line: UInt = #line, - _ callback: @escaping (Value) throws -> NewValue) -> EventLoopFuture { + public func flatMapThrowing( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Value) throws -> NewValue + ) -> EventLoopFuture { return self.flatMapThrowing(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapErrorThrowing(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) throws -> Value) -> EventLoopFuture { + public func flatMapErrorThrowing( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Error) throws -> Value + ) -> EventLoopFuture { return self.flatMapErrorThrowing(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func map(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Value) -> (NewValue)) -> EventLoopFuture { + public func map( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Value) -> (NewValue) + ) -> EventLoopFuture { return self.map(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapError(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapError( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Error) -> EventLoopFuture + ) -> EventLoopFuture where Value: Sendable { return self.flatMapError(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapResult(file: StaticString = #fileID, - line: UInt = #line, - _ body: @escaping (Value) -> Result) -> EventLoopFuture { + public func flatMapResult( + file: StaticString = #fileID, + line: UInt = #line, + _ body: @escaping @Sendable (Value) -> Result + ) -> EventLoopFuture { return self.flatMapResult(body) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func recover(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) -> Value) -> EventLoopFuture { + public func recover( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Error) -> Value + ) -> EventLoopFuture { return self.recover(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func and(_ other: EventLoopFuture, - file: StaticString = #fileID, - line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + _ other: EventLoopFuture, + file: StaticString = #fileID, + line: UInt = #line + ) -> EventLoopFuture<(Value, OtherValue)> { return self.and(other) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func and(value: OtherValue, - file: StaticString = #fileID, - line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + value: OtherValue, + file: StaticString = #fileID, + line: UInt = #line + ) -> EventLoopFuture<(Value, OtherValue)> { return self.and(value: value) } } diff --git a/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift b/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift index 4d105fa6e5..637f674242 100644 --- a/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift +++ b/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift @@ -41,7 +41,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMapWithEventLoop(_ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapWithEventLoop( + _ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { [eventLoop = self.eventLoop] in switch self._value! { @@ -75,7 +77,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the recovered value. @inlinable @preconcurrency - public func flatMapErrorWithEventLoop(_ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapErrorWithEventLoop( + _ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture + ) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { [eventLoop = self.eventLoop] in switch self._value! { @@ -114,10 +118,11 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. @inlinable @preconcurrency - public func foldWithEventLoop( + public func foldWithEventLoop( _ futures: [EventLoopFuture], with combiningFunction: @escaping @Sendable (Value, OtherValue, EventLoop) -> EventLoopFuture - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { + @Sendable func fold0(eventLoop: EventLoop) -> EventLoopFuture { let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture in diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index 185e00fecc..55e1756ceb 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -178,30 +178,52 @@ public struct EventLoopPromise { /// /// - parameters: /// - value: The successful result of the operation. + @preconcurrency @inlinable - public func succeed(_ value: Value) { + public func succeed(_ value: Value) where Value: Sendable { self._resolve(value: .success(value)) } + /// Deliver a successful result to the associated `EventLoopFuture` object. + /// + /// - Note: The call to this method must happen on the same event loop as this promise was created from. + /// + /// - parameters: + /// - eventLoopBoundValue: The successful result of the operation. + @inlinable + public func succeed(eventLoopBoundValue: Value) { + self._resolve(eventLoopBoundResult: .success(eventLoopBoundValue)) + } + /// Deliver an error to the associated `EventLoopFuture` object. /// /// - parameters: /// - error: The error from the operation. @inlinable public func fail(_ error: Error) { - self._resolve(value: .failure(error)) + if self.futureResult.eventLoop.inEventLoop { + self.futureResult._setError(error)._run() + } else { + self.futureResult.eventLoop.execute { + self.futureResult._setError(error)._run() + } + } } /// Complete the promise with the passed in `EventLoopFuture`. /// /// This method is equivalent to invoking `future.cascade(to: promise)`, /// but sometimes may read better than its cascade counterpart. - /// + /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the passed future and this promise might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - future: The future whose value will be used to succeed or fail this promise. /// - seealso: `EventLoopFuture.cascade(to:)` + @preconcurrency @inlinable - public func completeWith(_ future: EventLoopFuture) { + public func completeWith(_ future: EventLoopFuture) where Value: Sendable { future.cascade(to: self) } @@ -219,11 +241,33 @@ public struct EventLoopPromise { /// /// - parameters: /// - result: The result which will be used to succeed or fail this promise. + @preconcurrency @inlinable - public func completeWith(_ result: Result) { + public func completeWith(_ result: Result) where Value: Sendable { self._resolve(value: result) } + /// Complete the promise with the passed in `Result`. + /// + /// This method is equivalent to invoking: + /// ``` + /// switch result { + /// case .success(let value): + /// promise.succeed(value) + /// case .failure(let error): + /// promise.fail(error) + /// } + /// ``` + /// + /// - Note: The call to this method must happen on the same event loop as this promise was created from. + /// + /// - parameters: + /// - result: The result which will be used to succeed or fail this promise. + @inlinable + public func completeWith(eventLoopBoundResult: Result) { + self._resolve(eventLoopBoundResult: eventLoopBoundResult) + } + /// Fire the associated `EventLoopFuture` on the appropriate event loop. /// /// This method provides the primary difference between the `EventLoopPromise` and most @@ -233,7 +277,7 @@ public struct EventLoopPromise { /// - parameters: /// - value: The value to fire the future with. @inlinable - internal func _resolve(value: Result) { + internal func _resolve(value: Result) where Value: Sendable { if self.futureResult.eventLoop.inEventLoop { self._setValue(value: value)._run() } else { @@ -243,6 +287,23 @@ public struct EventLoopPromise { } } + /// Fire the associated `EventLoopFuture` on the appropriate event loop. + /// + /// This method provides the primary difference between the `EventLoopPromise` and most + /// other `Promise` implementations: specifically, all callbacks fire on the `EventLoop` + /// that was used to create the promise. + /// + /// - Note: The call to this method must happen on the same event loop as this promise was created from. + /// + /// - parameters: + /// - value: The value to fire the future with. + @inlinable + internal func _resolve(eventLoopBoundResult: Result) { + self.futureResult.eventLoop.assertInEventLoop() + + self._setValue(value: eventLoopBoundResult)._run() + } + /// Set the future result and get the associated callbacks. /// /// - parameters: @@ -464,19 +525,18 @@ extension EventLoopFuture { /// /// Note: In a sense, the `EventLoopFuture` is returned before it's created. /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the value of this `EventLoopFuture` and return /// a new `EventLoopFuture`. /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMap(_ callback: @escaping @Sendable (Value) -> EventLoopFuture) -> EventLoopFuture { - self._flatMap(callback) - } - @usableFromInline typealias FlatMapCallback = @Sendable (Value) -> EventLoopFuture - - @inlinable - func _flatMap(_ callback: @escaping FlatMapCallback) -> EventLoopFuture { + public func flatMap( + _ callback: @escaping @Sendable (Value) -> EventLoopFuture + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -507,19 +567,18 @@ extension EventLoopFuture { /// /// If your callback function throws, the returned `EventLoopFuture` will error. /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the value of this `EventLoopFuture` and return /// a new value lifted into a new `EventLoopFuture`. /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMapThrowing(_ callback: @escaping @Sendable (Value) throws -> NewValue) -> EventLoopFuture { - self._flatMapThrowing(callback) - } - @usableFromInline typealias FlatMapThrowingCallback = @Sendable (Value) throws -> NewValue - - @inlinable - func _flatMapThrowing(_ callback: @escaping FlatMapThrowingCallback) -> EventLoopFuture { + public func flatMapThrowing( + _ callback: @escaping @Sendable (Value) throws -> NewValue + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -553,13 +612,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value or a rethrown error. @inlinable @preconcurrency - public func flatMapErrorThrowing(_ callback: @escaping @Sendable (Error) throws -> Value) -> EventLoopFuture { - self._flatMapErrorThrowing(callback) - } - @usableFromInline typealias FlatMapErrorThrowingCallback = @Sendable (Error) throws -> Value - - @inlinable - func _flatMapErrorThrowing(_ callback: @escaping FlatMapErrorThrowingCallback) -> EventLoopFuture { + public func flatMapErrorThrowing( + _ callback: @escaping @Sendable (Error) throws -> Value + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -605,13 +660,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func map(_ callback: @escaping @Sendable (Value) -> (NewValue)) -> EventLoopFuture { - self._map(callback) - } - @usableFromInline typealias MapCallback = @Sendable (Value) -> (NewValue) - - @inlinable - func _map(_ callback: @escaping MapCallback) -> EventLoopFuture { + public func map( + _ callback: @escaping @Sendable (Value) -> (NewValue) + ) -> EventLoopFuture { if NewValue.self == Value.self && NewValue.self == Void.self { self.whenSuccess(callback as! @Sendable (Value) -> Void) return self as! EventLoopFuture @@ -631,19 +682,18 @@ extension EventLoopFuture { /// /// If the callback cannot recover it should return a failed `EventLoopFuture`. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the error value of this `EventLoopFuture` and return /// a new value lifted into a new `EventLoopFuture`. /// - returns: A future that will receive the recovered value. @inlinable @preconcurrency - public func flatMapError(_ callback: @escaping @Sendable (Error) -> EventLoopFuture) -> EventLoopFuture { - self._flatMapError(callback) - } - @usableFromInline typealias FlatMapErrorCallback = @Sendable (Error) -> EventLoopFuture - - @inlinable - func _flatMapError(_ callback: @escaping FlatMapErrorCallback) -> EventLoopFuture { + public func flatMapError( + _ callback: @escaping @Sendable (Error) -> EventLoopFuture + ) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -679,13 +729,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMapResult(_ body: @escaping @Sendable (Value) -> Result) -> EventLoopFuture { - self._flatMapResult(body) - } - @usableFromInline typealias FlatMapResultCallback = @Sendable (Value) -> Result - - @inlinable - func _flatMapResult(_ body: @escaping FlatMapResultCallback) -> EventLoopFuture { + public func flatMapResult( + _ body: @escaping @Sendable (Value) -> Result + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -718,12 +764,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func recover(_ callback: @escaping @Sendable (Error) -> Value) -> EventLoopFuture { - self._recover(callback) - } - @usableFromInline typealias RecoverCallback = @Sendable (Error) -> Value - - @inlinable - func _recover(_ callback: @escaping RecoverCallback) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -736,10 +776,9 @@ extension EventLoopFuture { return next.futureResult } - @usableFromInline typealias AddCallbackCallback = @Sendable () -> CallbackList /// Add a callback. If there's already a value, invoke it and return the resulting list of new callback functions. @inlinable - internal func _addCallback(_ callback: @escaping AddCallbackCallback) -> CallbackList { + internal func _addCallback(_ callback: @escaping @Sendable () -> CallbackList) -> CallbackList { self.eventLoop.assertInEventLoop() if self._value == nil { self._callbacks.append(callback) @@ -754,11 +793,10 @@ extension EventLoopFuture { internal func _whenComplete(_ callback: @escaping @Sendable () -> CallbackList) { self._internalWhenComplete(callback) } - @usableFromInline typealias InternalWhenCompleteCallback = @Sendable () -> CallbackList /// Add a callback. If there's already a value, run as much of the chain as we can. @inlinable - internal func _internalWhenComplete(_ callback: @escaping InternalWhenCompleteCallback) { + internal func _internalWhenComplete(_ callback: @escaping @Sendable () -> CallbackList) { if self.eventLoop.inEventLoop { self._addCallback(callback)._run() } else { @@ -781,12 +819,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenSuccess(_ callback: @escaping @Sendable (Value) -> Void) { - self._whenSuccess(callback) - } - @usableFromInline typealias WhenSuccessCallback = @Sendable (Value) -> Void - - @inlinable - func _whenSuccess(_ callback: @escaping WhenSuccessCallback) { self._whenComplete { if case .success(let t) = self._value! { callback(t) @@ -808,12 +840,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenFailure(_ callback: @escaping @Sendable (Error) -> Void) { - self._whenFailure(callback) - } - @usableFromInline typealias WhenFailureCallback = @Sendable (Error) -> Void - - @inlinable - func _whenFailure(_ callback: @escaping WhenFailureCallback) { self._whenComplete { if case .failure(let e) = self._value! { callback(e) @@ -830,11 +856,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenComplete(_ callback: @escaping @Sendable (Result) -> Void) { - self._publicWhenComplete(callback) - } - @usableFromInline typealias WhenCompleteCallback = @Sendable (Result) -> Void - @inlinable - func _publicWhenComplete(_ callback: @escaping WhenCompleteCallback) { self._whenComplete { callback(self._value!) return CallbackList() @@ -853,6 +874,21 @@ extension EventLoopFuture { } return CallbackList() } + + /// Internal: Set the value and return a list of callbacks that should be invoked as a result. + /// + /// We need a seperate method for setting the error to avoid Sendable checking of `Value` + @inlinable + internal func _setError(_ error: Error) -> CallbackList { + self.eventLoop.assertInEventLoop() + if self._value == nil { + self._value = .failure(error) + let callbacks = self._callbacks + self._callbacks = CallbackList() + return callbacks + } + return CallbackList() + } } // MARK: and @@ -862,8 +898,14 @@ extension EventLoopFuture { /// provided `EventLoopFuture` both succeed. It then provides the pair /// of results. If either one fails, the combined `EventLoopFuture` will fail with /// the first error encountered. + /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the other future might differ i.e. + /// they might be bound to different event loops. + @preconcurrency @inlinable - public func and(_ other: EventLoopFuture) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + _ other: EventLoopFuture + ) -> EventLoopFuture<(Value, OtherValue)> { let promise = EventLoopPromise<(Value, OtherValue)>.makeUnleakablePromise(eventLoop: self.eventLoop) let box: UnsafeMutableTransferBox<(t:Value?, u: OtherValue?)> = .init((nil, nil)) @@ -903,8 +945,11 @@ extension EventLoopFuture { /// Return a new EventLoopFuture that contains this "and" another value. /// This is just syntactic sugar for `future.and(loop.makeSucceedFuture(value))`. + @preconcurrency @inlinable - public func and(value: OtherValue) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + value: OtherValue // TODO: This should be transferring + ) -> EventLoopFuture<(Value, OtherValue)> { return self.and(EventLoopFuture(eventLoop: self.eventLoop, value: value)) } } @@ -931,10 +976,14 @@ extension EventLoopFuture { /// }.cascade(to: userPromise) /// ``` /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the promise might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameter to: The `EventLoopPromise` to fulfill with the results of this future. /// - SeeAlso: `EventLoopPromise.completeWith(_:)` + @preconcurrency @inlinable - public func cascade(to promise: EventLoopPromise?) { + public func cascade(to promise: EventLoopPromise?) where Value: Sendable { guard let promise = promise else { return } self.whenComplete { result in switch result { @@ -955,9 +1004,13 @@ extension EventLoopFuture { /// doWorkReturningInt().map({ $0 >= 0 }).cascade(to: boolPromise) /// ``` /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the promise might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameter to: The `EventLoopPromise` to fulfill when a successful result is available. + @preconcurrency @inlinable - public func cascadeSuccess(to promise: EventLoopPromise?) { + public func cascadeSuccess(to promise: EventLoopPromise?) where Value: Sendable { guard let promise = promise else { return } self.whenSuccess { promise.succeed($0) } } @@ -967,6 +1020,7 @@ extension EventLoopFuture { /// This is an alternative variant of `cascade` that allows you to potentially return early failures in /// error cases, while passing the user `EventLoopPromise` onwards. /// + /// /// - Parameter to: The `EventLoopPromise` that should fail with the error of this `EventLoopFuture`. @inlinable public func cascadeFailure(to promise: EventLoopPromise?) { @@ -989,16 +1043,14 @@ extension EventLoopFuture { /// /// This is also forbidden in async contexts: prefer ``EventLoopFuture/get()``. /// + /// - Note: The `Value` must be `Sendable` since it is shared outside of the isolation domain of the event loop. + /// /// - returns: The value of the `EventLoopFuture` when it completes. /// - throws: The error value of the `EventLoopFuture` if it errors. @available(*, noasync, message: "wait() can block indefinitely, prefer get()", renamed: "get()") + @preconcurrency @inlinable - public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value { - return try self._wait(file: file, line: line) - } - - @inlinable - func _wait(file: StaticString, line: UInt) throws -> Value { + public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value where Value: Sendable { self.eventLoop._preconditionSafeToWait(file: file, line: line) let v: UnsafeMutableTransferBox?> = .init(nil) @@ -1036,25 +1088,20 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall EventLoopFuture. /// + /// - Note: The `Value` and `NewValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - futures: An array of `EventLoopFuture` to wait for. /// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`. /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. @inlinable @preconcurrency - public func fold( + public func fold( _ futures: [EventLoopFuture], with combiningFunction: @escaping @Sendable (Value, OtherValue) -> EventLoopFuture - ) -> EventLoopFuture { - self._fold(futures, with: combiningFunction) - } - @usableFromInline typealias FoldCallback = @Sendable (Value, OtherValue) -> EventLoopFuture - - @inlinable - func _fold( - _ futures: [EventLoopFuture], - with combiningFunction: @escaping FoldCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { + @Sendable func fold0() -> EventLoopFuture { let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture in @@ -1096,6 +1143,9 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall `EventLoopFuture`. /// + /// - Note: The `Value` and `InputValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - initialResult: An initial result to begin the reduction. /// - futures: An array of `EventLoopFuture` to wait for. @@ -1104,23 +1154,12 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the reduced value. @preconcurrency @inlinable - public static func reduce( - _ initialResult: Value, - _ futures: [EventLoopFuture], - on eventLoop: EventLoop, - _ nextPartialResult: @escaping @Sendable (Value, InputValue) -> Value - ) -> EventLoopFuture { - Self._reduce(initialResult, futures, on: eventLoop, nextPartialResult) - } - @usableFromInline typealias ReduceCallback = @Sendable (Value, InputValue) -> Value - - @inlinable - static func _reduce( + public static func reduce( _ initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, - _ nextPartialResult: @escaping ReduceCallback - ) -> EventLoopFuture { + _ nextPartialResult: @escaping @Sendable (Value, InputValue) -> Value + ) -> EventLoopFuture where Value: Sendable { let f0 = eventLoop.makeSucceededFuture(initialResult) let body = f0.fold(futures) { (t: Value, u: InputValue) -> EventLoopFuture in @@ -1141,6 +1180,9 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall `EventLoopFuture`. /// + /// - Note: The `Value` and `InputValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - initialResult: An initial result to begin the reduction. /// - futures: An array of `EventLoopFuture` to wait for. @@ -1149,36 +1191,27 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the combined value. @inlinable @preconcurrency - public static func reduce( + public static func reduce( into initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ updateAccumulatingResult: @escaping @Sendable (inout Value, InputValue) -> Void - ) -> EventLoopFuture { - Self._reduce(into: initialResult, futures, on: eventLoop, updateAccumulatingResult) - } - @usableFromInline typealias ReduceIntoCallback = @Sendable (inout Value, InputValue) -> Void - - @inlinable - static func _reduce( - into initialResult: Value, - _ futures: [EventLoopFuture], - on eventLoop: EventLoop, - _ updateAccumulatingResult: @escaping ReduceIntoCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { let p0 = eventLoop.makePromise(of: Value.self) - var value: Value = initialResult + let value = NIOLoopBoundBox(_value: initialResult, uncheckedEventLoop: eventLoop) let f0 = eventLoop.makeSucceededFuture(()) let future = f0.fold(futures) { (_: (), newValue: InputValue) -> EventLoopFuture in eventLoop.assertInEventLoop() - updateAccumulatingResult(&value, newValue) + var v = value.value + updateAccumulatingResult(&v, newValue) + value.value = v return eventLoop.makeSucceededFuture(()) } future.whenSuccess { eventLoop.assertInEventLoop() - p0.succeed(value) + p0.succeed(value.value) } future.whenFailure { (error) in eventLoop.assertInEventLoop() @@ -1203,7 +1236,10 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on. /// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed. @inlinable - public static func andAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture { + public static func andAllSucceed( + _ futures: [EventLoopFuture], + on eventLoop: EventLoop + ) -> EventLoopFuture { let promise = eventLoop.makePromise(of: Void.self) EventLoopFuture.andAllSucceed(futures, promise: promise) return promise.futureResult @@ -1218,14 +1254,17 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFutures`s to wait for. /// - promise: The `EventLoopPromise` to complete with the result of this call. @inlinable - public static func andAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise) { + public static func andAllSucceed( + _ futures: [EventLoopFuture], + promise: EventLoopPromise + ) { let eventLoop = promise.futureResult.eventLoop if eventLoop.inEventLoop { - self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) + self._reduceSuccesses0(promise, futures, eventLoop) } else { eventLoop.execute { - self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) + self._reduceSuccesses0(promise, futures, eventLoop) } } } @@ -1234,11 +1273,19 @@ extension EventLoopFuture { /// The new `EventLoopFuture` will contain all of the values fulfilled by the futures. /// /// The returned `EventLoopFuture` will fail as soon as any of the futures fails. + /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to wait on for fulfilled values. /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures. - public static func whenAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> { + @preconcurrency + public static func whenAllSucceed( + _ futures: [EventLoopFuture], + on eventLoop: EventLoop + ) -> EventLoopFuture<[Value]> where Value: Sendable { let promise = eventLoop.makePromise(of: [Value].self) EventLoopFuture.whenAllSucceed(futures, promise: promise) return promise.futureResult @@ -1249,10 +1296,17 @@ extension EventLoopFuture { /// /// If the _results of all futures should be collected use `andAllComplete` instead. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFutures`s to wait for. /// - promise: The `EventLoopPromise` to complete with the result of this call. - public static func whenAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise<[Value]>) { + @preconcurrency + public static func whenAllSucceed( + _ futures: [EventLoopFuture], + promise: EventLoopPromise<[Value]> + ) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop let reduced = eventLoop.makePromise(of: Void.self) @@ -1281,7 +1335,6 @@ extension EventLoopFuture { } } - @usableFromInline typealias ReduceSuccessCallback = @Sendable (Int, InputValue) -> Void /// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when /// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`. /// @@ -1292,25 +1345,26 @@ extension EventLoopFuture { _ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, - onValue: @escaping ReduceSuccessCallback - ) { + onValue: @escaping @Sendable (Int, InputValue) -> Void + ) where InputValue: Sendable { eventLoop.assertInEventLoop() - var remainingCount = futures.count - - if remainingCount == 0 { + if futures.count == 0 { promise.succeed(()) return } + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. + @Sendable func processResult(_ index: Int, _ result: Result) { switch result { case .success(let result): onValue(index, result) - remainingCount -= 1 + remainingCount.value -= 1 - if remainingCount == 0 { + if remainingCount.value == 0 { promise.succeed(()) } case .failure(let error): @@ -1334,6 +1388,67 @@ extension EventLoopFuture { } } } + + /// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when + /// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have succeed, the provided promise will succeed. + /// Once any future fails, the provided promise will fail. + @inlinable + internal static func _reduceSuccesses0( + _ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop + ) { + eventLoop.assertInEventLoop() + + if futures.count == 0 { + promise.succeed(()) + return + } + + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + + // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. + @Sendable + func processResult(_ index: Int, _ result: Result) { + switch result { + case .success: + remainingCount.value -= 1 + + if remainingCount.value == 0 { + promise.succeed(()) + } + case .failure(let error): + promise.fail(error) + } + } + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop.inEventLoop, + let result = future._value { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. + switch result { + case .success: + processResult(index, .success(())) + case .failure(let error): + processResult(index, .failure(error)) + } + if case .failure = result { + return // Once the promise is failed, future results do not need to be processed. + } + } else { + // We have to map to `Void` here to avoid sharing the potentially non-Sendable + // value across event loops. + future + .map { _ in () } + .hop(to: eventLoop) + .whenComplete { result in processResult(index, result) } + } + } + } } // MARK: "fail slow" reduce @@ -1350,7 +1465,10 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on. /// - Returns: A new `EventLoopFuture` that succeeds after all futures complete. @inlinable - public static func andAllComplete(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture { + public static func andAllComplete( + _ futures: [EventLoopFuture], + on eventLoop: EventLoop + ) -> EventLoopFuture { let promise = eventLoop.makePromise(of: Void.self) EventLoopFuture.andAllComplete(futures, promise: promise) return promise.futureResult @@ -1366,14 +1484,17 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFuture`s to wait for. /// - promise: The `EventLoopPromise` to succeed when all futures have completed. @inlinable - public static func andAllComplete(_ futures: [EventLoopFuture], promise: EventLoopPromise) { + public static func andAllComplete( + _ futures: [EventLoopFuture], + promise: EventLoopPromise + ) { let eventLoop = promise.futureResult.eventLoop if eventLoop.inEventLoop { - self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceCompletions0(promise, futures, eventLoop) } else { eventLoop.execute { - self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceCompletions0(promise, futures, eventLoop) } } } @@ -1383,15 +1504,21 @@ extension EventLoopFuture { /// /// The returned `EventLoopFuture` always succeeds, regardless of any failures from the waiting futures. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// If it is desired to flatten them into a single `EventLoopFuture` that fails on the first `EventLoopFuture` failure, /// use one of the `reduce` methods instead. /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to gather results from. /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all the results of the provided futures. + @preconcurrency @inlinable - public static func whenAllComplete(_ futures: [EventLoopFuture], - on eventLoop: EventLoop) -> EventLoopFuture<[Result]> { + public static func whenAllComplete( + _ futures: [EventLoopFuture], + on eventLoop: EventLoop + ) -> EventLoopFuture<[Result]> where Value: Sendable { let promise = eventLoop.makePromise(of: [Result].self) EventLoopFuture.whenAllComplete(futures, promise: promise) return promise.futureResult @@ -1401,12 +1528,18 @@ extension EventLoopFuture { /// /// The promise will always be succeeded, regardless of the outcome of the futures. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to gather results from. /// - promise: The `EventLoopPromise` to complete with the result of the futures. + @preconcurrency @inlinable - public static func whenAllComplete(_ futures: [EventLoopFuture], - promise: EventLoopPromise<[Result]>) { + public static func whenAllComplete( + _ futures: [EventLoopFuture], + promise: EventLoopPromise<[Result]> + ) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop let reduced = eventLoop.makePromise(of: Void.self) @@ -1439,34 +1572,33 @@ extension EventLoopFuture { } } - @usableFromInline typealias ReduceCompletions = @Sendable (Int, Result) -> Void - /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when /// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`. /// /// Once all the futures have completed, the provided promise will succeed. @inlinable - internal static func _reduceCompletions0( + internal static func _reduceCompletions0( _ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, - onResult: @escaping ReduceCompletions + onResult: @escaping @Sendable (Int, Result) -> Void ) { eventLoop.assertInEventLoop() - var remainingCount = futures.count - - if remainingCount == 0 { + if futures.count == 0 { promise.succeed(()) return } + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + // Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate. + @Sendable func processResult(_ index: Int, _ result: Result) { onResult(index, result) - remainingCount -= 1 + remainingCount.value -= 1 - if remainingCount == 0 { + if remainingCount.value == 0 { promise.succeed(()) } } @@ -1484,6 +1616,58 @@ extension EventLoopFuture { } } } + + /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when + /// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have completed, the provided promise will succeed. + @inlinable + internal static func _reduceCompletions0( + _ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop + ) { + eventLoop.assertInEventLoop() + + if futures.count == 0 { + promise.succeed(()) + return + } + + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + + // Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate. + @Sendable + func processResult(_ index: Int, _ result: Result) { + remainingCount.value -= 1 + + if remainingCount.value == 0 { + promise.succeed(()) + } + } + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop.inEventLoop, + let result = future._value { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~30% performance improvement in the case of large arrays where all elements are already fulfilled. + switch result { + case .success: + processResult(index, .success(())) + case .failure(let error): + processResult(index, .failure(error)) + } + } else { + // We have to map to `Void` here to avoid sharing the potentially non-Sendable + // value across event loops. + future + .map { _ in () } + .hop(to: eventLoop) + .whenComplete { result in processResult(index, result) } + } + } + } } // MARK: hop @@ -1497,11 +1681,14 @@ extension EventLoopFuture { /// succinctly. It also contains an optimisation for the case when the loop you're hopping *from* is the same as /// the one you're hopping *to*, allowing you to avoid doing allocations in that case. /// + /// - Note: The `Value` must be `Sendable` since it is shared with the isolation domain of the target event loop. + /// /// - parameters: /// - to: The `EventLoop` that the returned `EventLoopFuture` will run on. /// - returns: An `EventLoopFuture` whose callbacks run on `target` instead of the original loop. + @preconcurrency @inlinable - public func hop(to target: EventLoop) -> EventLoopFuture { + public func hop(to target: EventLoop) -> EventLoopFuture where Value: Sendable { if target === self.eventLoop { // We're already on that event loop, nothing to do here. Save an allocation. return self @@ -1524,12 +1711,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func always(_ callback: @escaping @Sendable (Result) -> Void) -> EventLoopFuture { - self._always(callback) - } - @usableFromInline typealias AlwaysCallback = @Sendable (Result) -> Void - - @inlinable - func _always(_ callback: @escaping AlwaysCallback) -> EventLoopFuture { self.whenComplete { result in callback(result) } return self } @@ -1577,8 +1758,11 @@ extension EventLoopFuture { /// - parameters: /// - orReplace: the value of the returned `EventLoopFuture` when then resolved future's value is `Optional.some()`. /// - returns: an new `EventLoopFuture` with new type parameter `NewValue` and the value passed in the `orReplace` parameter. + @preconcurrency @inlinable - public func unwrap(orReplace replacement: NewValue) -> EventLoopFuture where Value == Optional { + public func unwrap( + orReplace replacement: NewValue + ) -> EventLoopFuture where Value == Optional { return self.map { (value) -> NewValue in guard let value = value else { return replacement @@ -1605,14 +1789,6 @@ extension EventLoopFuture { @preconcurrency public func unwrap( orElse callback: @escaping @Sendable () -> NewValue - ) -> EventLoopFuture where Value == Optional { - self._unwrap(orElse: callback) - } - @usableFromInline typealias UnwrapCallback = @Sendable () -> NewValue - - @inlinable - func _unwrap( - orElse callback: @escaping UnwrapCallback ) -> EventLoopFuture where Value == Optional { return self.map { (value) -> NewValue in guard let value = value else { @@ -1632,25 +1808,18 @@ extension EventLoopFuture { /// blockingTask(value) /// } /// + /// - Note: The `Value` and `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: Function that will receive the value of this `EventLoopFuture` and return /// a new `EventLoopFuture`. @inlinable @preconcurrency - public func flatMapBlocking( + public func flatMapBlocking( onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Value) throws -> NewValue - ) -> EventLoopFuture { - self._flatMapBlocking(onto: queue, callbackMayBlock) - } - @usableFromInline typealias FlatMapBlockingCallback = @Sendable (Value) throws -> NewValue - - @inlinable - func _flatMapBlocking( - onto queue: DispatchQueue, - _ callbackMayBlock: @escaping FlatMapBlockingCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { return self.flatMap { result in queue.asyncWithFuture(eventLoop: self.eventLoop) { try callbackMayBlock(result) } } @@ -1664,11 +1833,17 @@ extension EventLoopFuture { /// If you find yourself passing the results from this `EventLoopFuture` to a new `EventLoopPromise` /// in the body of this function, consider using `cascade` instead. /// + /// - Note: The `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: The callback that is called with the successful result of the `EventLoopFuture`. + @preconcurrency @inlinable - public func whenSuccessBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping (Value) -> Void) { + public func whenSuccessBlocking( + onto queue: DispatchQueue, + _ callbackMayBlock: @escaping @Sendable (Value) -> Void + ) where Value: Sendable { self.whenSuccess { value in queue.async { callbackMayBlock(value) } } @@ -1687,13 +1862,10 @@ extension EventLoopFuture { /// - callbackMayBlock: The callback that is called with the failed result of the `EventLoopFuture`. @inlinable @preconcurrency - public func whenFailureBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Error) -> Void) { - self._whenFailureBlocking(onto: queue, callbackMayBlock) - } - @usableFromInline typealias WhenFailureBlockingCallback = @Sendable (Error) -> Void - - @inlinable - func _whenFailureBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping WhenFailureBlockingCallback) { + public func whenFailureBlocking( + onto queue: DispatchQueue, + _ callbackMayBlock: @escaping @Sendable (Error) -> Void + ) { self.whenFailure { err in queue.async { callbackMayBlock(err) } } @@ -1702,18 +1874,17 @@ extension EventLoopFuture { /// Adds an observer callback to this `EventLoopFuture` that is called when the /// `EventLoopFuture` has any result. The observer callback is permitted to block. /// + /// - Note: The `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: The callback that is called when the `EventLoopFuture` is fulfilled. @inlinable @preconcurrency - public func whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Result) -> Void) { - self._whenCompleteBlocking(onto: queue, callbackMayBlock) - } - @usableFromInline typealias WhenCompleteBlocking = @Sendable (Result) -> Void - - @inlinable - func _whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping WhenCompleteBlocking) { + public func whenCompleteBlocking( + onto queue: DispatchQueue, + _ callbackMayBlock: @escaping @Sendable (Result) -> Void + ) where Value: Sendable { self.whenComplete { value in queue.async { callbackMayBlock(value) } } @@ -1825,13 +1996,19 @@ public struct _NIOEventLoopFutureIdentifier: Hashable, Sendable { } } -// EventLoopPromise is a reference type, but by its very nature is Sendable (if its Value is). -extension EventLoopPromise: Sendable where Value: Sendable { } +// The promise and future are both unchecked Sendable following the below isolation rules this is safe +// +// 1. Receiving the value of the future is always done on the EventLoop of the future, hence +// the value is never transferred out of the event loops isolation domain. It only gets transferred +// by certain methods such as `hop()` and those methods are annotated with requiring the Value to be +// Sendable +// 2. The promise is `Sendable` but fulfilling the promise with a value requires the user to +// transfer the value to the promise. This ensures that the value is now isolated to the event loops +// isolation domain. Note: Sendable values can always be transferred + +extension EventLoopPromise: @unchecked Sendable { } -// EventLoopFuture is a reference type, but it is Sendable (if its Value is). However, we enforce -// that by way of the guarantees of the EventLoop protocol, so the compiler cannot -// check it. -extension EventLoopFuture: @unchecked Sendable where Value: Sendable { } +extension EventLoopFuture: @unchecked Sendable { } extension EventLoopPromise where Value == Void { // Deliver a successful result to the associated `EventLoopFuture` object. diff --git a/Tests/NIOPosixTests/EventLoopFutureTest.swift b/Tests/NIOPosixTests/EventLoopFutureTest.swift index d7939adc31..54dbdf4363 100644 --- a/Tests/NIOPosixTests/EventLoopFutureTest.swift +++ b/Tests/NIOPosixTests/EventLoopFutureTest.swift @@ -17,6 +17,7 @@ import Dispatch @testable import NIOCore import NIOEmbedded import NIOPosix +import NIOConcurrencyHelpers enum EventLoopFutureTestError : Error { case example @@ -1380,18 +1381,19 @@ class EventLoopFutureTest : XCTestCase { func testWhenSuccessBlocking() { let eventLoop = EmbeddedEventLoop() let sem = DispatchSemaphore(value: 0) - var nonBlockingRan = false + let nonBlockingRan = NIOLockedValueBox(false) let p = eventLoop.makePromise(of: String.self) p.futureResult.whenSuccessBlocking(onto: DispatchQueue.global()) { sem.wait() // Block in callback XCTAssertEqual($0, "hello") - XCTAssertTrue(nonBlockingRan) + nonBlockingRan.withLockedValue { XCTAssertTrue($0) } + } p.succeed("hello") let p2 = eventLoop.makePromise(of: Bool.self) p2.futureResult.whenSuccess { _ in - nonBlockingRan = true + nonBlockingRan.withLockedValue { $0 = true } } p2.succeed(true) From 5d20621e1fae0f6eb13f8d7282f8c79e89608f1d Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Mon, 19 Feb 2024 15:02:56 +0000 Subject: [PATCH 2/2] George review --- Sources/NIOCore/EventLoop.swift | 2 +- Sources/NIOCore/EventLoopFuture.swift | 73 +++++---------------------- 2 files changed, 14 insertions(+), 61 deletions(-) diff --git a/Sources/NIOCore/EventLoop.swift b/Sources/NIOCore/EventLoop.swift index b2fe4c8f14..4c9d6c6018 100644 --- a/Sources/NIOCore/EventLoop.swift +++ b/Sources/NIOCore/EventLoop.swift @@ -736,7 +736,7 @@ extension EventLoop { /// - returns: An `EventLoopFuture` identical to the `EventLoopFuture` returned from `task`. @inlinable @preconcurrency - public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { // TODO: This should take a closure that returns fresh + public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { self.submit(task).flatMap { $0 } } diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index 55e1756ceb..4e58738844 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -184,17 +184,6 @@ public struct EventLoopPromise { self._resolve(value: .success(value)) } - /// Deliver a successful result to the associated `EventLoopFuture` object. - /// - /// - Note: The call to this method must happen on the same event loop as this promise was created from. - /// - /// - parameters: - /// - eventLoopBoundValue: The successful result of the operation. - @inlinable - public func succeed(eventLoopBoundValue: Value) { - self._resolve(eventLoopBoundResult: .success(eventLoopBoundValue)) - } - /// Deliver an error to the associated `EventLoopFuture` object. /// /// - parameters: @@ -247,27 +236,6 @@ public struct EventLoopPromise { self._resolve(value: result) } - /// Complete the promise with the passed in `Result`. - /// - /// This method is equivalent to invoking: - /// ``` - /// switch result { - /// case .success(let value): - /// promise.succeed(value) - /// case .failure(let error): - /// promise.fail(error) - /// } - /// ``` - /// - /// - Note: The call to this method must happen on the same event loop as this promise was created from. - /// - /// - parameters: - /// - result: The result which will be used to succeed or fail this promise. - @inlinable - public func completeWith(eventLoopBoundResult: Result) { - self._resolve(eventLoopBoundResult: eventLoopBoundResult) - } - /// Fire the associated `EventLoopFuture` on the appropriate event loop. /// /// This method provides the primary difference between the `EventLoopPromise` and most @@ -287,23 +255,6 @@ public struct EventLoopPromise { } } - /// Fire the associated `EventLoopFuture` on the appropriate event loop. - /// - /// This method provides the primary difference between the `EventLoopPromise` and most - /// other `Promise` implementations: specifically, all callbacks fire on the `EventLoop` - /// that was used to create the promise. - /// - /// - Note: The call to this method must happen on the same event loop as this promise was created from. - /// - /// - parameters: - /// - value: The value to fire the future with. - @inlinable - internal func _resolve(eventLoopBoundResult: Result) { - self.futureResult.eventLoop.assertInEventLoop() - - self._setValue(value: eventLoopBoundResult)._run() - } - /// Set the future result and get the associated callbacks. /// /// - parameters: @@ -1435,17 +1386,17 @@ extension EventLoopFuture { processResult(index, .success(())) case .failure(let error): processResult(index, .failure(error)) - } - if case .failure = result { - return // Once the promise is failed, future results do not need to be processed. + return } } else { // We have to map to `Void` here to avoid sharing the potentially non-Sendable // value across event loops. - future - .map { _ in () } - .hop(to: eventLoop) - .whenComplete { result in processResult(index, result) } + future.whenComplete { result in + let voidResult = result.map { _ in } + future.eventLoop.execute { + processResult(index, voidResult) + } + } } } } @@ -1661,10 +1612,12 @@ extension EventLoopFuture { } else { // We have to map to `Void` here to avoid sharing the potentially non-Sendable // value across event loops. - future - .map { _ in () } - .hop(to: eventLoop) - .whenComplete { result in processResult(index, result) } + future.whenComplete { result in + let voidResult = result.map { _ in } + future.eventLoop.execute { + processResult(index, voidResult) + } + } } } }