From 178634cbb1d77d78363b91634f941c401f5f8c16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcos=20S=C3=A1nchez-Dehesa=20Carballo?= Date: Wed, 26 Aug 2020 23:14:44 +0200 Subject: [PATCH] AsyncMap & AsyncTryMap removal --- README.md | 25 +- sources/conbini/operators/AsyncMapOp.swift | 47 --- sources/conbini/publishers/AsyncMap.swift | 251 --------------- sources/conbini/publishers/AsyncTryMap.swift | 290 ----------------- .../publishers/AsyncMapTests.swift | 299 ------------------ 5 files changed, 2 insertions(+), 910 deletions(-) delete mode 100644 sources/conbini/operators/AsyncMapOp.swift delete mode 100644 sources/conbini/publishers/AsyncMap.swift delete mode 100644 sources/conbini/publishers/AsyncTryMap.swift delete mode 100644 tests/conbiniTests/publishers/AsyncMapTests.swift diff --git a/README.md b/README.md index 512703f..65064f5 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ import PackageDescription let package = Package( /* Your package name, supported platforms, and generated products go here */ dependencies: [ - .package(url: "https://github.com/dehesa/Conbini.git", from: "0.5.1") + .package(url: "https://github.com/dehesa/Conbini.git", from: "0.6.0") ], targets: [ .target(name: /* Your target name here */, dependencies: ["Conbini"]) @@ -97,25 +97,6 @@ let publisher = setConfigurationOnServer.then { This operator optionally lets you control backpressure with its `maxDemand` parameter. The parameter behaves like `flatMap`'s `maxPublishers`, which specifies the maximum demand requested to the upstream at any given time. -

- -
asyncMap(_:)

- -It transforms elements received from upstream (similar to `map`), but the result is returned from a promise instead of using the `return` statement. Furthermore, promises can be called multipled times effectively transforming one upstream value into many outputs. - -```swift -let publisher = [1, 10, 100].publisher.asyncMap { (value, isCancelled, promise) in - queue.asyncAfter(deadline: ....) { - guard isCancelled else { return } - promise(newValue1, .continue) - promise(newValue2, .continue) - promise(newValue3, .finished) - } -} -``` - -This operator also provides a `try` variant accepting a result (instead of a value). -

@@ -399,8 +380,6 @@ let emittedValue = publisherChain.expectsOne(timeout: 0.8, on: test) - Apple's [Combine documentation](https://developer.apple.com/documentation/combine). - [The Combine book](https://store.raywenderlich.com/products/combine-asynchronous-programming-with-swift) is an excellent Ray Wenderlich book about the Combine framework. -- [Cocoa with love](https://www.cocoawithlove.com) has a great series of articles about the inner workings of Combine: [1. Protocols](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-1.html), [2. Sharing](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-2.html), [3. Asynchrony](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-3.html). +- [Cocoa with love](https://www.cocoawithlove.com) has a great series of articles about the inner workings of Combine: [1](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-1.html), [2](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-2.html), [3](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-3.html). - [OpenCombine](https://github.com/broadwaylamb/OpenCombine) is an open source implementation of Apple's Combine framework. - [CombineX](https://github.com/cx-org/CombineX) is an open source implementation of Apple's Combine framework. - -> This framework name references both the `Combine` framework and the helpful Japanese convenience stores 😄 diff --git a/sources/conbini/operators/AsyncMapOp.swift b/sources/conbini/operators/AsyncMapOp.swift deleted file mode 100644 index 5c20a48..0000000 --- a/sources/conbini/operators/AsyncMapOp.swift +++ /dev/null @@ -1,47 +0,0 @@ -import Combine - -extension Publisher { - /// Transforms all elements from the upstream publisher with a provided closure, giving the option to send several ouput values from the closure. - /// - /// The publisher will only complete if the upstream has completed and all the value transformation have returned at some point `promise(transformedValue, .finished)`. - /// - /// Also, it is worth noticing that `promise`s return a `Permission` enum value indicating whether the subscriber wants to receive more values. - /// ``` - /// [1, 10, 100].publisher.asyncMap { (value, isCancelled, promise) in - /// queue.asyncAfter(deadline: ...) { - /// guard !isCancelled else { return } - /// - /// promise(String(value), .continue) - /// promise(String(value*2), .continue) - /// promise(String(value*4), .finished) - /// } - /// } - /// ``` - /// - parameter parallel: The maximum number of values being processed at a time. Since the processing is returned in a promise, many upstream values can be processed at a single time point. - /// - parameter transform: A closure that takes the upstream emitted value and expects a promise to be called with the transformed result. - /// - returns: A publisher with output `T` and failure `Upstream.Failure`. - @inlinable public func asyncMap(parallel: Subscribers.Demand, _ transform: @escaping Publishers.AsyncMap.Closure) -> Publishers.AsyncMap { - .init(upstream: self, parallel: parallel, transform: transform) - } - - /// Transforms all elements from the upstream publisher with a provided a closure, giving the option to send several output values or an error (through the `Result` type) from the closure. - /// - /// The publisher will only complete if the upstream has completed and all the value transformation have returned at some point `promise(transformedValue, .finished)`. - /// - /// Also, it is worth noticing that `promise`s return a `Permission` enum value indicating whether the subscriber wants to receive more values. - /// ``` - /// [0, 1, 2].publisher.asyncTryMap { (value, isCancelled, promise) in - /// queue.async { - /// promise(.success((value * 2, .continue))) - /// promise(.success((value * 4, .continue))) - /// promise(.failure(error)) - /// } - /// } - /// ``` - /// - parameter parallel: The maximum number of values being processed at a time. Since the processing is returned in a promise, many upstream values can be processed at a single time point. - /// - parameter transform: A closure that takes the upstream emitted value and expects a promise to be called with the transformed result. - /// - returns: A publisher with output `T` and failure `Swift.Error`. - @inlinable public func asyncTryMap(parallel: Subscribers.Demand, _ transform: @escaping Publishers.AsyncTryMap.Closure) -> Publishers.AsyncTryMap { - .init(upstream: self, parallel: parallel, transform: transform) - } -} diff --git a/sources/conbini/publishers/AsyncMap.swift b/sources/conbini/publishers/AsyncMap.swift deleted file mode 100644 index 9565e44..0000000 --- a/sources/conbini/publishers/AsyncMap.swift +++ /dev/null @@ -1,251 +0,0 @@ -import Combine - -extension Publishers { - /// Transforms all elements from the upstream publisher with a provided closure. - /// - /// This publisher only fails if the upstream fails. Also, remember that asynchronous and a parallel setting different than one may return the values in an unexpected order. - public struct AsyncMap: Publisher where Upstream:Publisher { - public typealias Failure = Upstream.Failure - /// The closure type used to return the result of the transformation. - /// - parameter result: The transformation result. - /// - parameter request: Whether the closure want to continue sending values or it is done. - /// - returns: Enum indicating whether the closure can keep calling this promise. - public typealias Promise = (_ result: Output, _ request: Publishers.Async.Request) -> Publishers.Async.Permission - /// Checks whether the publisher is already cancelled or it is still operating. - /// - returns: Boolean indicating whether the publisher has been already cancelled (`true`) or it is still active (`false`). - public typealias CancelCheck = () -> Bool - /// The closure type being stored for value transformation. - /// - parameter value: The value received from the upstream. - /// - parameter promise: The promise to call once the transformation is done. - public typealias Closure = (_ value: Upstream.Output, _ isCancelled: @escaping CancelCheck, _ promise: @escaping Promise) -> Void - - /// The upstream publisher. - public let upstream: Upstream - /// The maximum number of parallel requests allowed. - public let parallel: Subscribers.Demand - /// The closure generating the downstream value. - /// - note: The closure is kept in the publisher; therefore, if you keep the publisher around any reference in the closure will be kept too. - public let closure: Closure - /// Creates a publisher that transforms the incoming values. This transformation is asynchronous and it may entail several output values. - /// - /// The `parallel` parameter indicates how many upstream values shall be processed at the same time. It is also important to notice that the downstream demand is uphold. Therefore, if the downstream requests 3 values, and there are already 2 promises generating several values (while not completing), the following upstream values will be ignored. - /// - /// The only cases were values are promised not to be ignored are: - /// - If `parallel` is `.max(1)` and the promise user looks for the `Permission` response before sending new values. - /// - If a promise is just sending one output (similar to a `map` publisher). - /// - precondition: `parallel` must be greater than zero. - /// - parameter upstream: The event emitter to the publisher being created. - /// - parameter parallel: The maximum number of parallel upstream value processing. - /// - parameter transform: Closure in charge of transforming the values. - @inlinable public init(upstream: Upstream, parallel: Subscribers.Demand, transform: @escaping Closure) { - precondition(parallel > 0) - self.upstream = upstream - self.parallel = parallel - self.closure = transform - } - - public func receive(subscriber: S) where S:Subscriber, S.Input==Output, S.Failure==Failure { - let conduit = Conduit(downstream: subscriber, parallel: self.parallel, closure: self.closure) - self.upstream.subscribe(conduit) - } - } -} - -fileprivate extension Publishers.AsyncMap { - /// Subscription representing an activated `AsyncMap` publisher. - final class Conduit: Subscription, Subscriber where Downstream:Subscriber, Downstream.Input==Output, Downstream.Failure==Failure { - typealias Input = Upstream.Output - typealias Failure = Upstream.Failure - - /// Enum listing all possible conduit states. - @Lock private var state: State<_WaitConfiguration,_ActiveConfiguration> - - /// Creates a representation of an `AsyncMap` publisher. - init(downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Closure) { - self.state = .awaitingSubscription(.init(downstream: downstream, parallel: parallel, closure: closure)) - } - - deinit { - self.cancel() - self._state.deinitialize() - } - - func receive(subscription: Subscription) { - guard let config = self._state.activate(atomic: { .init(upstream: subscription, downstream: $0.downstream, parallel: $0.parallel, closure: $0.closure) }) else { - return subscription.cancel() - } - config.downstream.receive(subscription: self) - } - - func request(_ demand: Subscribers.Demand) { - guard demand > 0 else { return } - - self._state.lock() - - guard let config = self.state.activeConfiguration else { return self._state.unlock() } - config.demand.expected += demand - - guard case (let subscription?, let d) = config.calculateUpstreamDemand(), d > 0 else { return self._state.unlock() } - config.demand.requested += d - - self._state.unlock() - - subscription.request(d) - } - - func receive(_ input: Upstream.Output) -> Subscribers.Demand { - self._state.lock() - - guard let config = self.state.activeConfiguration else { self._state.unlock(); return .none } - config.demand.requested -= 1 - - // If there are already a maximum amount of parallel processing values, ignore the inconming value. - guard config.demand.processing < config.parallel else { self._state.unlock(); return .none } - config.demand.processing += 1 - - guard case (.some, let d) = config.calculateUpstreamDemand() else { fatalError("\nAn input was received, although the upstream already disappeared\n") } - config.demand.requested += d - let closure = config.closure - let (isCancelled, promise) = self._makeClosures() - self._state.unlock() - - closure(input, isCancelled, promise) - return d - } - - func receive(completion: Subscribers.Completion) { - self._state.lock() - - guard let config = self.state.activeConfiguration else { return self._state.unlock() } - config.upstream = nil - config.demand.requested = .none - - if case .finished = completion, config.demand.processing > 0 { return self._state.unlock() } - self.state = .terminated - self._state.unlock() - config.downstream.receive(completion: completion) - } - - func cancel() { - guard case .active(let config) = self._state.terminate() else { return } - config.upstream?.cancel() - } - } -} - -private extension Publishers.AsyncMap.Conduit { - /// - precondition: When this function is called `self` is within the lock and in an active state. - func _makeClosures() -> (check: Publishers.AsyncMap.CancelCheck, promise: Publishers.AsyncMap.Promise) { - typealias P = Publishers.AsyncMap - var isFinished = false - - let isCancelled: P.CancelCheck = { [weak self] in - guard let self = self else { return true } - self._state.lock() - let result = self.state.isTerminated || isFinished - self._state.unlock() - return result - } - - let promise: P.Promise = { [weak self] (value, request) in - guard let self = self else { - isFinished = true - return .forbidden - } - - self._state.lock() - guard let config = self.state.activeConfiguration else { - isFinished = true - self._state.unlock() - return .forbidden - } - - let downstream = config.downstream - config.demand.expected -= 1 - self._state.unlock() - - let receivedDemand = downstream.receive(value) - - self._state.lock() - guard self.state.isActive else { - isFinished = true - self._state.unlock() - return .forbidden - } - - config.demand.expected += receivedDemand - config.demand.processing -= 1 - - let (s, d) = config.calculateUpstreamDemand() - if case .continue = request, config.demand.expected > 0 { - config.demand.processing += 1 - self._state.unlock() - if let subscription = s, d > 1 { subscription.request(d - 1) } - return .allowed - } - - isFinished = true - - if let subscription = s { - config.demand.requested += d - self._state.unlock() - if d > 0 { subscription.request(d) } - } else { - self._state.unlock() - if config.demand.processing <= 0 { downstream.receive(completion: .finished) } - } - - return .forbidden - } - - return (isCancelled, promise) - } -} - -private extension Publishers.AsyncMap.Conduit { - /// Values needed for the subscription awaiting state. - struct _WaitConfiguration { - let downstream: Downstream - let parallel: Subscribers.Demand - let closure: Publishers.AsyncMap.Closure - } - - /// Values needed for the subscription active state. - final class _ActiveConfiguration { - /// The subscription used to manage the upstream back-pressure. - var upstream: Subscription? - /// The subscriber receiving the input and completion. - let downstream: Downstream - /// The maximum number of parallel requests allowed. - let parallel: Subscribers.Demand - /// The closure being called for each upstream value emitted. - let closure: Publishers.AsyncMap.Closure - /// The values requested by the downstream and the values being processed at the moment. - var demand: (requested: Subscribers.Demand, expected: Subscribers.Demand, processing: Int) - - /// Designated initializer providing the requried upstream and downstream. - init(upstream: Subscription, downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Publishers.AsyncMap.Closure) { - self.upstream = upstream - self.downstream = downstream - self.parallel = parallel - self.closure = closure - self.demand = (.none, .none, 0) - } - - /// Calculates how many values conduit wants. - /// - remark: This method must be called within the conduit lock. It expects atomic access. - /// - returns: The upstream subscription and the demand to perform. If `nil`, no operation shall be performed. - func calculateUpstreamDemand() -> (subscription: Subscription?, demand: Subscribers.Demand) { - // If there is no upstream or the upstream has been previously asked for an unlimited number of values, there is no need to request anything else. - guard let upstream = self.upstream else { return (nil, .none) } - guard let requested = self.demand.requested.max else { return (upstream, .none) } - - let inflight = requested + self.demand.processing - // If the number of inflight requests is the same as the allowed parallel ones, don't ask for more. - guard inflight < self.parallel else { return (upstream, .none) } - - let result = min(self.parallel, self.demand.expected) - inflight - return (upstream, result) - } - } -} diff --git a/sources/conbini/publishers/AsyncTryMap.swift b/sources/conbini/publishers/AsyncTryMap.swift deleted file mode 100644 index e8cc409..0000000 --- a/sources/conbini/publishers/AsyncTryMap.swift +++ /dev/null @@ -1,290 +0,0 @@ -import Combine - -extension Publishers { - /// Transforms all elements from the upstream publisher with a provided closure. - /// - /// This publisher only fails if the upstream fails. Also, remember that asynchronous and a parallel setting different than one may return the values in an unexpected order. - public struct AsyncTryMap: Publisher where Upstream:Publisher { - public typealias Failure = Swift.Error - /// The closure type used to return the result of the transformation. - /// - parameter result: The transformation result. - /// - parameter request: Whether the closure want to continue sending values or it is done. - /// - returns: Enum indicating whether the closure can keep calling this promise. - public typealias Promise = (_ result: Result<(value: Output, request: Publishers.Async.Request),Swift.Error>) -> Publishers.Async.Permission - /// Checks whether the publisher is already cancelled or it is still operating. - /// - returns: Boolean indicating whether the publisher has been already cancelled (`true`) or it is still active (`false`). - public typealias CancelCheck = () -> Bool - /// The closure type being stored for value transformation. - /// - parameter value: The value received from the upstream. - /// - parameter promise: The promise to call once the transformation is done. - public typealias Closure = (_ value: Upstream.Output, _ isCancelled: @escaping CancelCheck, _ promise: @escaping Promise) -> Void - - /// The upstream publisher. - public let upstream: Upstream - /// The maximum number of parallel requests allowed. - public let parallel: Subscribers.Demand - /// The closure generating the downstream value. - /// - note: The closure is kept in the publisher; therefore, if you keep the publisher around any reference in the closure will be kept too. - public let closure: Closure - /// Creates a publisher that transforms the incoming values. This transformation is asynchronous and it may entail several output values. - /// - /// The `parallel` parameter indicates how many upstream values shall be processed at the same time. It is also important to notice that the downstream demand is uphold. Therefore, if the downstream requests 3 values, and there are already 2 promises generating several values (while not completing), the following upstream values will be ignored. - /// - /// The only cases were values are promised not to be ignored are: - /// - If `parallel` is `.max(1)` and the promise user looks for the `Permission` response before sending new values. - /// - If a promise is just sending one output (similar to a `map` publisher). - /// - precondition: `parallel` must be greater than zero. - /// - parameter upstream: The event emitter to the publisher being created. - /// - parameter parallel: The maximum number of parallel upstream value processing. - /// - parameter transform: Closure in charge of transforming the values. - @inlinable public init(upstream: Upstream, parallel: Subscribers.Demand, transform: @escaping Closure) { - precondition(parallel > 0) - self.upstream = upstream - self.parallel = parallel - self.closure = transform - } - - public func receive(subscriber: S) where S:Subscriber, S.Input==Output, S.Failure==Failure { - let conduit = Conduit(downstream: subscriber, parallel: self.parallel, closure: self.closure) - self.upstream.subscribe(conduit) - } - } -} - -extension Publishers.AsyncTryMap { - /// Indication whether the transformation closure will continue emitting values (i.e. `.continue`) or it is done (i.e. `finished`). - public enum Request: Equatable { - /// The transformation closure will continue emitting values. Failing to do so will make the publisher to never complete nor process further upstream values. - case `continue` - /// The transformation closure is done and further upstream values may be processed. - case finished - } - - /// The permission returned by a promise. - public enum Permission: ExpressibleByBooleanLiteral, Equatable { - /// The transformation closure is allowed to send a new value. - case allowed - /// The transformation closure is forbidden to send a new value. If it tries to do so, it will get ignored. - case forbidden - - public init(booleanLiteral value: BooleanLiteralType) { - switch value { - case true: self = .allowed - case false: self = .forbidden - } - } - } -} - -fileprivate extension Publishers.AsyncTryMap { - /// Subscription representing an activated `AsyncTryMap` publisher. - final class Conduit: Subscription, Subscriber where Downstream:Subscriber, Downstream.Input==Output, Downstream.Failure==Failure { - typealias Input = Upstream.Output - typealias Failure = Upstream.Failure - - /// Enum listing all possible conduit states. - @Lock private var state: State<_WaitConfiguration,_ActiveConfiguration> - - /// Creates a representation of an `AsyncTryMap` publisher. - init(downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Closure) { - self.state = .awaitingSubscription(.init(downstream: downstream, parallel: parallel, closure: closure)) - } - - deinit { - self.cancel() - self._state.deinitialize() - } - - func receive(subscription: Subscription) { - guard let config = self._state.activate(atomic: { .init(upstream: subscription, downstream: $0.downstream, parallel: $0.parallel, closure: $0.closure) }) else { - return subscription.cancel() - } - config.downstream.receive(subscription: self) - } - - func request(_ demand: Subscribers.Demand) { - guard demand > 0 else { return } - - self._state.lock() - - guard let config = self.state.activeConfiguration else { return self._state.unlock() } - config.demand.expected += demand - - guard case (let subscription?, let d) = config.calculateUpstreamDemand(), d > 0 else { return self._state.unlock() } - config.demand.requested += d - - self._state.unlock() - - subscription.request(d) - } - - func receive(_ input: Upstream.Output) -> Subscribers.Demand { - self._state.lock() - - guard let config = self.state.activeConfiguration else { self._state.unlock(); return .none } - config.demand.requested -= 1 - - // If there are already a maximum amount of parallel processing values, ignore the inconming value. - guard config.demand.processing < config.parallel else { self._state.unlock(); return .none } - config.demand.processing += 1 - - guard case (.some, let d) = config.calculateUpstreamDemand() else { fatalError("\nAn input was received, although the upstream already disappeared\n") } - config.demand.requested += d - let closure = config.closure - let (isCancelled, promise) = self._makeClosures() - self._state.unlock() - - closure(input, isCancelled, promise) - return d - } - - func receive(completion: Subscribers.Completion) { - self._state.lock() - - guard let config = self.state.activeConfiguration else { return self._state.unlock() } - config.upstream = nil - config.demand.requested = .none - - if case .finished = completion, config.demand.processing > 0 { return self._state.unlock() } - self.state = .terminated - self._state.unlock() - config.downstream.receive(completion: completion.mapError { $0 }) - } - - func cancel() { - guard case .active(let config) = self._state.terminate() else { return } - config.upstream?.cancel() - } - } -} - -private extension Publishers.AsyncTryMap.Conduit { - /// - precondition: When this function is called `self` is within the lock and in an active state. - func _makeClosures() -> (check: Publishers.AsyncTryMap.CancelCheck, promise: Publishers.AsyncTryMap.Promise) { - typealias P = Publishers.AsyncTryMap - var isFinished = false - - let isCancelled: P.CancelCheck = { [weak self] in - guard let self = self else { return true } - self._state.lock() - let result = self.state.isTerminated || isFinished - self._state.unlock() - return result - } - - let promise: P.Promise = { [weak self] (result) in - guard let self = self else { - isFinished = true - return .forbidden - } - - self._state.lock() - guard let config = self.state.activeConfiguration else { - isFinished = true - self._state.unlock() - return .forbidden - } - - let downstream = config.downstream - config.demand.expected -= 1 - - let (value, request): (Output, Publishers.Async.Request) - switch result { - case .success(let result): - (value, request) = result - self._state.unlock() - case .failure(let error): - isFinished = true - config.upstream = nil - config.demand.requested = .none - self.state = .terminated - self._state.unlock() - downstream.receive(completion: .failure(error)) - return .forbidden - } - - let receivedDemand = downstream.receive(value) - - self._state.lock() - guard self.state.isActive else { - isFinished = true - self._state.unlock() - return .forbidden - } - - config.demand.expected += receivedDemand - config.demand.processing -= 1 - - let (s, d) = config.calculateUpstreamDemand() - if case .continue = request, config.demand.expected > 0 { - config.demand.processing += 1 - self._state.unlock() - if let subscription = s, d > 1 { subscription.request(d - 1) } - return .allowed - } - - isFinished = true - - if let subscription = s { - config.demand.requested += d - self._state.unlock() - if d > 0 { subscription.request(d) } - } else { - self._state.unlock() - if config.demand.processing <= 0 { downstream.receive(completion: .finished) } - } - - return .forbidden - } - - return (isCancelled, promise) - } -} - -private extension Publishers.AsyncTryMap.Conduit { - /// Values needed for the subscription awaiting state. - struct _WaitConfiguration { - let downstream: Downstream - let parallel: Subscribers.Demand - let closure: Publishers.AsyncTryMap.Closure - } - - /// Values needed for the subscription active state. - final class _ActiveConfiguration { - /// The subscription used to manage the upstream back-pressure. - var upstream: Subscription? - /// The subscriber receiving the input and completion. - let downstream: Downstream - /// The maximum number of parallel requests allowed. - let parallel: Subscribers.Demand - /// The closure being called for each upstream value emitted. - let closure: Publishers.AsyncTryMap.Closure - /// The values requested by the downstream and the values being processed at the moment. - var demand: (requested: Subscribers.Demand, expected: Subscribers.Demand, processing: Int) - - /// Designated initializer providing the requried upstream and downstream. - init(upstream: Subscription, downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Publishers.AsyncTryMap.Closure) { - self.upstream = upstream - self.downstream = downstream - self.parallel = parallel - self.closure = closure - self.demand = (.none, .none, 0) - } - - /// Calculates how many values conduit wants. - /// - remark: This method must be called within the conduit lock. It expects atomic access. - /// - returns: The upstream subscription and the demand to perform. If `nil`, no operation shall be performed. - func calculateUpstreamDemand() -> (subscription: Subscription?, demand: Subscribers.Demand) { - // If there is no upstream or the upstream has been previously asked for an unlimited number of values, there is no need to request anything else. - guard let upstream = self.upstream else { return (nil, .none) } - guard let requested = self.demand.requested.max else { return (upstream, .none) } - - let inflight = requested + self.demand.processing - // If the number of inflight requests is the same as the allowed parallel ones, don't ask for more. - guard inflight < self.parallel else { return (upstream, .none) } - - let result = min(self.parallel, self.demand.expected) - inflight - return (upstream, result) - } - } -} diff --git a/tests/conbiniTests/publishers/AsyncMapTests.swift b/tests/conbiniTests/publishers/AsyncMapTests.swift deleted file mode 100644 index 2515310..0000000 --- a/tests/conbiniTests/publishers/AsyncMapTests.swift +++ /dev/null @@ -1,299 +0,0 @@ -import XCTest -import Conbini -import Combine - -/// Tests the correct behavior of the `AsyncMap` publisher. -final class AsyncMapTests: XCTestCase { - /// A custom error to send as a dummy. - private struct CustomError: Swift.Error {} - /// A convenience storage of cancellables. - private var _cancellables = Set() - - override func setUp() { - self.continueAfterFailure = false - self._cancellables.removeAll() - } -} - -extension AsyncMapTests { - /// Test async unlimited map operation (just one value asynchronous transformation). - func testAsyncMapUnlimitedSingleValue() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - Just(1).asyncMap(parallel: .unlimited) { (value, isCancelled, promise) in - queue.async { - XCTAssertFalse(isCancelled()) - XCTAssertEqual(promise(value * 10, .finished), .forbidden) - XCTAssertTrue(isCancelled()) - } - }.receive(on: DispatchQueue.main) - .sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received.count, 1) - XCTAssertEqual(received[0], 10) - } - - /// Test async unlimited map operation (array of asynchronous transformation). - func testAsyncMapUnlimitedArray() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - [1, 2, 3, 4].publisher - .asyncMap(parallel: .unlimited) { (value, isCancelled, promise) in - queue.async { - XCTAssertFalse(isCancelled()) - XCTAssertEqual(promise(value * 10, .finished), .forbidden) - XCTAssertTrue(isCancelled()) - } - }.receive(on: DispatchQueue.main) - .sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received.sorted(), [10, 20, 30, 40].sorted()) - } - - /// Tests async unlimited map operation (each value received generate several values). - func testAsyncMapUnlimitedMulti() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests", autoreleaseFrequency: .never, target: nil) - - var received: [Int] = .init() - [1, 2, 3, 4].publisher - .asyncMap(parallel: .unlimited) { (value, _, promise) in - queue.async { XCTAssertEqual(promise(value * 10 + 0, .continue), .allowed) } - queue.async { XCTAssertEqual(promise(value * 10 + 1, .continue), .allowed) } - queue.async { XCTAssertEqual(promise(value * 10 + 2, .finished), .forbidden) } - }.sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { - received.append($0) - }).store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received, [10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42]) - } - - /// Tests async restrict map operation (array of asynchronous transformation). - func testAsyncMapRestrictedArrayOne() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - [1, 2, 3, 4].publisher - .asyncMap(parallel: .max(1)) { (value, _, promise) in - queue.async { XCTAssertEqual(promise(value * 10, .finished), .forbidden) } - }.sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received, [10, 20, 30, 40]) - } - - func testAsyncMapRestrictedArrayTwo() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - [1, 2, 3, 4, 5, 6, 7, 8].publisher - .asyncMap(parallel: .max(2)) { (value, _, promise) in - queue.async { XCTAssertEqual(promise(value * 10, .finished), .forbidden) } - }.receive(on: DispatchQueue.main) - .sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received.sorted(), [10, 20, 30, 40, 50, 60, 70, 80].sorted()) - } - - /// Tests async unlimited map operation (each value received generate several values). - func testAsyncMapRestrictedMulti() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests", autoreleaseFrequency: .never, target: nil) - - var received: [Int] = .init() - [1, 2, 3, 4].publisher - .asyncMap(parallel: .max(1)) { (value, _, promise) in - queue.async { XCTAssertEqual(promise(value * 10 + 0, .continue), .allowed) } - queue.async { XCTAssertEqual(promise(value * 10 + 1, .continue), .allowed) } - queue.async { XCTAssertEqual(promise(value * 10 + 2, .finished), .forbidden) } - }.sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { - received.append($0) - }).store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received, [10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42]) - } -} - -extension AsyncMapTests { - /// Test async unlimited map operation (just one value asynchronous transformation). - func testAsyncTryMapUnlimitedSingleValue() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - Just(1).asyncTryMap(parallel: .unlimited) { (value, _, promise) in - queue.async { XCTAssertEqual(promise(.success((value * 10, .finished))), .forbidden) } - }.receive(on: DispatchQueue.main) - .sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received.count, 1) - XCTAssertEqual(received[0], 10) - } - - /// Test async unlimited map operation (array of asynchronous transformation). - func testAsyncTryMapUnlimitedArray() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - [1, 2, 3, 4].publisher - .asyncTryMap(parallel: .unlimited) { (value, _, promise) in - queue.async { XCTAssertEqual(promise(.success((value * 10, .finished))), .forbidden) } - }.receive(on: DispatchQueue.main) - .sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received.sorted(), [10, 20, 30, 40].sorted()) - } - - /// Tests async unlimited map operation (each value received generate several values). - func testAsyncTryMapUnlimitedMulti() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests", autoreleaseFrequency: .never, target: nil) - - var received: [Int] = .init() - [1, 2, 3, 4].publisher - .asyncTryMap(parallel: .unlimited) { (value, _, promise) in - queue.async { XCTAssertEqual(promise(.success((value * 10 + 0, .continue))), .allowed) } - queue.async { XCTAssertEqual(promise(.success((value * 10 + 1, .continue))), .allowed) } - queue.async { XCTAssertEqual(promise(.success((value * 10 + 2, .finished))), .forbidden) } - }.sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { - received.append($0) - }).store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received, [10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42]) - } - - /// Tests async unlimited map operation (where one single error is published). - func testAsyncTryMapUnlimitedError() { - let exp = self.expectation(description: "Publisher fails") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - Just(1).asyncTryMap(parallel: .unlimited) { (value, isCancelled, promise) in - queue.async { - XCTAssertFalse(isCancelled()) - XCTAssertEqual(promise(.failure(CustomError())), .forbidden) - XCTAssertTrue(isCancelled()) - } - }.receive(on: DispatchQueue.main) - .sink(receiveCompletion: { - guard case .failure(let error) = $0, error is CustomError else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertTrue(received.isEmpty) - } - - /// Tests async restrict map operation (array of asynchronous transformation). - func testAsyncTryMapRestrictedArrayOne() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - [1, 2, 3, 4].publisher - .asyncTryMap(parallel: .max(1)) { (value, _, promise) in - queue.async { - XCTAssertEqual(promise(.success((value * 10, .finished))), .forbidden) - } - }.sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received, [10, 20, 30, 40]) - } - - func testAsyncTryMapRestrictedArrayTwo() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap") - - var received: [Int] = .init() - [1, 2, 3, 4, 5, 6, 7, 8].publisher - .asyncTryMap(parallel: .max(2)) { (value, _, promise) in - queue.async { - XCTAssertEqual(promise(.success((value * 10, .finished))), .forbidden) - } - }.receive(on: DispatchQueue.main) - .sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { received.append($0) }) - .store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received.sorted(), [10, 20, 30, 40, 50, 60, 70, 80].sorted()) - } - - /// Tests async unlimited map operation (each value received generate several values). - func testAsyncTryMapRestrictedMulti() { - let exp = self.expectation(description: "Publisher completes") - let queue = DispatchQueue(label: "io.dehesa.conbini.tests", autoreleaseFrequency: .never, target: nil) - - var received: [Int] = .init() - [1, 2, 3, 4].publisher - .asyncTryMap(parallel: .max(1)) { (value, _, promise) in - queue.async { XCTAssertEqual(promise(.success((value * 10 + 0, .continue))), .allowed) } - queue.async { XCTAssertEqual(promise(.success((value * 10 + 1, .continue))), .allowed) } - queue.async { XCTAssertEqual(promise(.success((value * 10 + 2, .finished))), .forbidden) } - }.sink(receiveCompletion: { - guard case .finished = $0 else { return XCTFail() } - exp.fulfill() - }, receiveValue: { - received.append($0) - }).store(in: &self._cancellables) - - self.wait(for: [exp], timeout: 0.2) - XCTAssertEqual(received, [10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42]) - } -}