diff --git a/README.md b/README.md
index 512703f..65064f5 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ import PackageDescription
let package = Package(
/* Your package name, supported platforms, and generated products go here */
dependencies: [
- .package(url: "https://github.com/dehesa/Conbini.git", from: "0.5.1")
+ .package(url: "https://github.com/dehesa/Conbini.git", from: "0.6.0")
],
targets: [
.target(name: /* Your target name here */, dependencies: ["Conbini"])
@@ -97,25 +97,6 @@ let publisher = setConfigurationOnServer.then {
This operator optionally lets you control backpressure with its `maxDemand` parameter. The parameter behaves like `flatMap`'s `maxPublishers`, which specifies the maximum demand requested to the upstream at any given time.
-
-
-asyncMap(_:)
-
-It transforms elements received from upstream (similar to `map`), but the result is returned from a promise instead of using the `return` statement. Furthermore, promises can be called multipled times effectively transforming one upstream value into many outputs.
-
-```swift
-let publisher = [1, 10, 100].publisher.asyncMap { (value, isCancelled, promise) in
- queue.asyncAfter(deadline: ....) {
- guard isCancelled else { return }
- promise(newValue1, .continue)
- promise(newValue2, .continue)
- promise(newValue3, .finished)
- }
-}
-```
-
-This operator also provides a `try` variant accepting a result (instead of a value).
-
@@ -399,8 +380,6 @@ let emittedValue = publisherChain.expectsOne(timeout: 0.8, on: test)
- Apple's [Combine documentation](https://developer.apple.com/documentation/combine).
- [The Combine book](https://store.raywenderlich.com/products/combine-asynchronous-programming-with-swift) is an excellent Ray Wenderlich book about the Combine framework.
-- [Cocoa with love](https://www.cocoawithlove.com) has a great series of articles about the inner workings of Combine: [1. Protocols](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-1.html), [2. Sharing](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-2.html), [3. Asynchrony](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-3.html).
+- [Cocoa with love](https://www.cocoawithlove.com) has a great series of articles about the inner workings of Combine: [1](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-1.html), [2](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-2.html), [3](https://www.cocoawithlove.com/blog/twenty-two-short-tests-of-combine-part-3.html).
- [OpenCombine](https://github.com/broadwaylamb/OpenCombine) is an open source implementation of Apple's Combine framework.
- [CombineX](https://github.com/cx-org/CombineX) is an open source implementation of Apple's Combine framework.
-
-> This framework name references both the `Combine` framework and the helpful Japanese convenience stores 😄
diff --git a/sources/conbini/operators/AsyncMapOp.swift b/sources/conbini/operators/AsyncMapOp.swift
deleted file mode 100644
index 5c20a48..0000000
--- a/sources/conbini/operators/AsyncMapOp.swift
+++ /dev/null
@@ -1,47 +0,0 @@
-import Combine
-
-extension Publisher {
- /// Transforms all elements from the upstream publisher with a provided closure, giving the option to send several ouput values from the closure.
- ///
- /// The publisher will only complete if the upstream has completed and all the value transformation have returned at some point `promise(transformedValue, .finished)`.
- ///
- /// Also, it is worth noticing that `promise`s return a `Permission` enum value indicating whether the subscriber wants to receive more values.
- /// ```
- /// [1, 10, 100].publisher.asyncMap { (value, isCancelled, promise) in
- /// queue.asyncAfter(deadline: ...) {
- /// guard !isCancelled else { return }
- ///
- /// promise(String(value), .continue)
- /// promise(String(value*2), .continue)
- /// promise(String(value*4), .finished)
- /// }
- /// }
- /// ```
- /// - parameter parallel: The maximum number of values being processed at a time. Since the processing is returned in a promise, many upstream values can be processed at a single time point.
- /// - parameter transform: A closure that takes the upstream emitted value and expects a promise to be called with the transformed result.
- /// - returns: A publisher with output `T` and failure `Upstream.Failure`.
- @inlinable public func asyncMap(parallel: Subscribers.Demand, _ transform: @escaping Publishers.AsyncMap.Closure) -> Publishers.AsyncMap {
- .init(upstream: self, parallel: parallel, transform: transform)
- }
-
- /// Transforms all elements from the upstream publisher with a provided a closure, giving the option to send several output values or an error (through the `Result` type) from the closure.
- ///
- /// The publisher will only complete if the upstream has completed and all the value transformation have returned at some point `promise(transformedValue, .finished)`.
- ///
- /// Also, it is worth noticing that `promise`s return a `Permission` enum value indicating whether the subscriber wants to receive more values.
- /// ```
- /// [0, 1, 2].publisher.asyncTryMap { (value, isCancelled, promise) in
- /// queue.async {
- /// promise(.success((value * 2, .continue)))
- /// promise(.success((value * 4, .continue)))
- /// promise(.failure(error))
- /// }
- /// }
- /// ```
- /// - parameter parallel: The maximum number of values being processed at a time. Since the processing is returned in a promise, many upstream values can be processed at a single time point.
- /// - parameter transform: A closure that takes the upstream emitted value and expects a promise to be called with the transformed result.
- /// - returns: A publisher with output `T` and failure `Swift.Error`.
- @inlinable public func asyncTryMap(parallel: Subscribers.Demand, _ transform: @escaping Publishers.AsyncTryMap.Closure) -> Publishers.AsyncTryMap {
- .init(upstream: self, parallel: parallel, transform: transform)
- }
-}
diff --git a/sources/conbini/publishers/AsyncMap.swift b/sources/conbini/publishers/AsyncMap.swift
deleted file mode 100644
index 9565e44..0000000
--- a/sources/conbini/publishers/AsyncMap.swift
+++ /dev/null
@@ -1,251 +0,0 @@
-import Combine
-
-extension Publishers {
- /// Transforms all elements from the upstream publisher with a provided closure.
- ///
- /// This publisher only fails if the upstream fails. Also, remember that asynchronous and a parallel setting different than one may return the values in an unexpected order.
- public struct AsyncMap: Publisher where Upstream:Publisher {
- public typealias Failure = Upstream.Failure
- /// The closure type used to return the result of the transformation.
- /// - parameter result: The transformation result.
- /// - parameter request: Whether the closure want to continue sending values or it is done.
- /// - returns: Enum indicating whether the closure can keep calling this promise.
- public typealias Promise = (_ result: Output, _ request: Publishers.Async.Request) -> Publishers.Async.Permission
- /// Checks whether the publisher is already cancelled or it is still operating.
- /// - returns: Boolean indicating whether the publisher has been already cancelled (`true`) or it is still active (`false`).
- public typealias CancelCheck = () -> Bool
- /// The closure type being stored for value transformation.
- /// - parameter value: The value received from the upstream.
- /// - parameter promise: The promise to call once the transformation is done.
- public typealias Closure = (_ value: Upstream.Output, _ isCancelled: @escaping CancelCheck, _ promise: @escaping Promise) -> Void
-
- /// The upstream publisher.
- public let upstream: Upstream
- /// The maximum number of parallel requests allowed.
- public let parallel: Subscribers.Demand
- /// The closure generating the downstream value.
- /// - note: The closure is kept in the publisher; therefore, if you keep the publisher around any reference in the closure will be kept too.
- public let closure: Closure
- /// Creates a publisher that transforms the incoming values. This transformation is asynchronous and it may entail several output values.
- ///
- /// The `parallel` parameter indicates how many upstream values shall be processed at the same time. It is also important to notice that the downstream demand is uphold. Therefore, if the downstream requests 3 values, and there are already 2 promises generating several values (while not completing), the following upstream values will be ignored.
- ///
- /// The only cases were values are promised not to be ignored are:
- /// - If `parallel` is `.max(1)` and the promise user looks for the `Permission` response before sending new values.
- /// - If a promise is just sending one output (similar to a `map` publisher).
- /// - precondition: `parallel` must be greater than zero.
- /// - parameter upstream: The event emitter to the publisher being created.
- /// - parameter parallel: The maximum number of parallel upstream value processing.
- /// - parameter transform: Closure in charge of transforming the values.
- @inlinable public init(upstream: Upstream, parallel: Subscribers.Demand, transform: @escaping Closure) {
- precondition(parallel > 0)
- self.upstream = upstream
- self.parallel = parallel
- self.closure = transform
- }
-
- public func receive(subscriber: S) where S:Subscriber, S.Input==Output, S.Failure==Failure {
- let conduit = Conduit(downstream: subscriber, parallel: self.parallel, closure: self.closure)
- self.upstream.subscribe(conduit)
- }
- }
-}
-
-fileprivate extension Publishers.AsyncMap {
- /// Subscription representing an activated `AsyncMap` publisher.
- final class Conduit: Subscription, Subscriber where Downstream:Subscriber, Downstream.Input==Output, Downstream.Failure==Failure {
- typealias Input = Upstream.Output
- typealias Failure = Upstream.Failure
-
- /// Enum listing all possible conduit states.
- @Lock private var state: State<_WaitConfiguration,_ActiveConfiguration>
-
- /// Creates a representation of an `AsyncMap` publisher.
- init(downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Closure) {
- self.state = .awaitingSubscription(.init(downstream: downstream, parallel: parallel, closure: closure))
- }
-
- deinit {
- self.cancel()
- self._state.deinitialize()
- }
-
- func receive(subscription: Subscription) {
- guard let config = self._state.activate(atomic: { .init(upstream: subscription, downstream: $0.downstream, parallel: $0.parallel, closure: $0.closure) }) else {
- return subscription.cancel()
- }
- config.downstream.receive(subscription: self)
- }
-
- func request(_ demand: Subscribers.Demand) {
- guard demand > 0 else { return }
-
- self._state.lock()
-
- guard let config = self.state.activeConfiguration else { return self._state.unlock() }
- config.demand.expected += demand
-
- guard case (let subscription?, let d) = config.calculateUpstreamDemand(), d > 0 else { return self._state.unlock() }
- config.demand.requested += d
-
- self._state.unlock()
-
- subscription.request(d)
- }
-
- func receive(_ input: Upstream.Output) -> Subscribers.Demand {
- self._state.lock()
-
- guard let config = self.state.activeConfiguration else { self._state.unlock(); return .none }
- config.demand.requested -= 1
-
- // If there are already a maximum amount of parallel processing values, ignore the inconming value.
- guard config.demand.processing < config.parallel else { self._state.unlock(); return .none }
- config.demand.processing += 1
-
- guard case (.some, let d) = config.calculateUpstreamDemand() else { fatalError("\nAn input was received, although the upstream already disappeared\n") }
- config.demand.requested += d
- let closure = config.closure
- let (isCancelled, promise) = self._makeClosures()
- self._state.unlock()
-
- closure(input, isCancelled, promise)
- return d
- }
-
- func receive(completion: Subscribers.Completion) {
- self._state.lock()
-
- guard let config = self.state.activeConfiguration else { return self._state.unlock() }
- config.upstream = nil
- config.demand.requested = .none
-
- if case .finished = completion, config.demand.processing > 0 { return self._state.unlock() }
- self.state = .terminated
- self._state.unlock()
- config.downstream.receive(completion: completion)
- }
-
- func cancel() {
- guard case .active(let config) = self._state.terminate() else { return }
- config.upstream?.cancel()
- }
- }
-}
-
-private extension Publishers.AsyncMap.Conduit {
- /// - precondition: When this function is called `self` is within the lock and in an active state.
- func _makeClosures() -> (check: Publishers.AsyncMap.CancelCheck, promise: Publishers.AsyncMap.Promise) {
- typealias P = Publishers.AsyncMap
- var isFinished = false
-
- let isCancelled: P.CancelCheck = { [weak self] in
- guard let self = self else { return true }
- self._state.lock()
- let result = self.state.isTerminated || isFinished
- self._state.unlock()
- return result
- }
-
- let promise: P.Promise = { [weak self] (value, request) in
- guard let self = self else {
- isFinished = true
- return .forbidden
- }
-
- self._state.lock()
- guard let config = self.state.activeConfiguration else {
- isFinished = true
- self._state.unlock()
- return .forbidden
- }
-
- let downstream = config.downstream
- config.demand.expected -= 1
- self._state.unlock()
-
- let receivedDemand = downstream.receive(value)
-
- self._state.lock()
- guard self.state.isActive else {
- isFinished = true
- self._state.unlock()
- return .forbidden
- }
-
- config.demand.expected += receivedDemand
- config.demand.processing -= 1
-
- let (s, d) = config.calculateUpstreamDemand()
- if case .continue = request, config.demand.expected > 0 {
- config.demand.processing += 1
- self._state.unlock()
- if let subscription = s, d > 1 { subscription.request(d - 1) }
- return .allowed
- }
-
- isFinished = true
-
- if let subscription = s {
- config.demand.requested += d
- self._state.unlock()
- if d > 0 { subscription.request(d) }
- } else {
- self._state.unlock()
- if config.demand.processing <= 0 { downstream.receive(completion: .finished) }
- }
-
- return .forbidden
- }
-
- return (isCancelled, promise)
- }
-}
-
-private extension Publishers.AsyncMap.Conduit {
- /// Values needed for the subscription awaiting state.
- struct _WaitConfiguration {
- let downstream: Downstream
- let parallel: Subscribers.Demand
- let closure: Publishers.AsyncMap.Closure
- }
-
- /// Values needed for the subscription active state.
- final class _ActiveConfiguration {
- /// The subscription used to manage the upstream back-pressure.
- var upstream: Subscription?
- /// The subscriber receiving the input and completion.
- let downstream: Downstream
- /// The maximum number of parallel requests allowed.
- let parallel: Subscribers.Demand
- /// The closure being called for each upstream value emitted.
- let closure: Publishers.AsyncMap.Closure
- /// The values requested by the downstream and the values being processed at the moment.
- var demand: (requested: Subscribers.Demand, expected: Subscribers.Demand, processing: Int)
-
- /// Designated initializer providing the requried upstream and downstream.
- init(upstream: Subscription, downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Publishers.AsyncMap.Closure) {
- self.upstream = upstream
- self.downstream = downstream
- self.parallel = parallel
- self.closure = closure
- self.demand = (.none, .none, 0)
- }
-
- /// Calculates how many values conduit wants.
- /// - remark: This method must be called within the conduit lock. It expects atomic access.
- /// - returns: The upstream subscription and the demand to perform. If `nil`, no operation shall be performed.
- func calculateUpstreamDemand() -> (subscription: Subscription?, demand: Subscribers.Demand) {
- // If there is no upstream or the upstream has been previously asked for an unlimited number of values, there is no need to request anything else.
- guard let upstream = self.upstream else { return (nil, .none) }
- guard let requested = self.demand.requested.max else { return (upstream, .none) }
-
- let inflight = requested + self.demand.processing
- // If the number of inflight requests is the same as the allowed parallel ones, don't ask for more.
- guard inflight < self.parallel else { return (upstream, .none) }
-
- let result = min(self.parallel, self.demand.expected) - inflight
- return (upstream, result)
- }
- }
-}
diff --git a/sources/conbini/publishers/AsyncTryMap.swift b/sources/conbini/publishers/AsyncTryMap.swift
deleted file mode 100644
index e8cc409..0000000
--- a/sources/conbini/publishers/AsyncTryMap.swift
+++ /dev/null
@@ -1,290 +0,0 @@
-import Combine
-
-extension Publishers {
- /// Transforms all elements from the upstream publisher with a provided closure.
- ///
- /// This publisher only fails if the upstream fails. Also, remember that asynchronous and a parallel setting different than one may return the values in an unexpected order.
- public struct AsyncTryMap: Publisher where Upstream:Publisher {
- public typealias Failure = Swift.Error
- /// The closure type used to return the result of the transformation.
- /// - parameter result: The transformation result.
- /// - parameter request: Whether the closure want to continue sending values or it is done.
- /// - returns: Enum indicating whether the closure can keep calling this promise.
- public typealias Promise = (_ result: Result<(value: Output, request: Publishers.Async.Request),Swift.Error>) -> Publishers.Async.Permission
- /// Checks whether the publisher is already cancelled or it is still operating.
- /// - returns: Boolean indicating whether the publisher has been already cancelled (`true`) or it is still active (`false`).
- public typealias CancelCheck = () -> Bool
- /// The closure type being stored for value transformation.
- /// - parameter value: The value received from the upstream.
- /// - parameter promise: The promise to call once the transformation is done.
- public typealias Closure = (_ value: Upstream.Output, _ isCancelled: @escaping CancelCheck, _ promise: @escaping Promise) -> Void
-
- /// The upstream publisher.
- public let upstream: Upstream
- /// The maximum number of parallel requests allowed.
- public let parallel: Subscribers.Demand
- /// The closure generating the downstream value.
- /// - note: The closure is kept in the publisher; therefore, if you keep the publisher around any reference in the closure will be kept too.
- public let closure: Closure
- /// Creates a publisher that transforms the incoming values. This transformation is asynchronous and it may entail several output values.
- ///
- /// The `parallel` parameter indicates how many upstream values shall be processed at the same time. It is also important to notice that the downstream demand is uphold. Therefore, if the downstream requests 3 values, and there are already 2 promises generating several values (while not completing), the following upstream values will be ignored.
- ///
- /// The only cases were values are promised not to be ignored are:
- /// - If `parallel` is `.max(1)` and the promise user looks for the `Permission` response before sending new values.
- /// - If a promise is just sending one output (similar to a `map` publisher).
- /// - precondition: `parallel` must be greater than zero.
- /// - parameter upstream: The event emitter to the publisher being created.
- /// - parameter parallel: The maximum number of parallel upstream value processing.
- /// - parameter transform: Closure in charge of transforming the values.
- @inlinable public init(upstream: Upstream, parallel: Subscribers.Demand, transform: @escaping Closure) {
- precondition(parallel > 0)
- self.upstream = upstream
- self.parallel = parallel
- self.closure = transform
- }
-
- public func receive(subscriber: S) where S:Subscriber, S.Input==Output, S.Failure==Failure {
- let conduit = Conduit(downstream: subscriber, parallel: self.parallel, closure: self.closure)
- self.upstream.subscribe(conduit)
- }
- }
-}
-
-extension Publishers.AsyncTryMap {
- /// Indication whether the transformation closure will continue emitting values (i.e. `.continue`) or it is done (i.e. `finished`).
- public enum Request: Equatable {
- /// The transformation closure will continue emitting values. Failing to do so will make the publisher to never complete nor process further upstream values.
- case `continue`
- /// The transformation closure is done and further upstream values may be processed.
- case finished
- }
-
- /// The permission returned by a promise.
- public enum Permission: ExpressibleByBooleanLiteral, Equatable {
- /// The transformation closure is allowed to send a new value.
- case allowed
- /// The transformation closure is forbidden to send a new value. If it tries to do so, it will get ignored.
- case forbidden
-
- public init(booleanLiteral value: BooleanLiteralType) {
- switch value {
- case true: self = .allowed
- case false: self = .forbidden
- }
- }
- }
-}
-
-fileprivate extension Publishers.AsyncTryMap {
- /// Subscription representing an activated `AsyncTryMap` publisher.
- final class Conduit: Subscription, Subscriber where Downstream:Subscriber, Downstream.Input==Output, Downstream.Failure==Failure {
- typealias Input = Upstream.Output
- typealias Failure = Upstream.Failure
-
- /// Enum listing all possible conduit states.
- @Lock private var state: State<_WaitConfiguration,_ActiveConfiguration>
-
- /// Creates a representation of an `AsyncTryMap` publisher.
- init(downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Closure) {
- self.state = .awaitingSubscription(.init(downstream: downstream, parallel: parallel, closure: closure))
- }
-
- deinit {
- self.cancel()
- self._state.deinitialize()
- }
-
- func receive(subscription: Subscription) {
- guard let config = self._state.activate(atomic: { .init(upstream: subscription, downstream: $0.downstream, parallel: $0.parallel, closure: $0.closure) }) else {
- return subscription.cancel()
- }
- config.downstream.receive(subscription: self)
- }
-
- func request(_ demand: Subscribers.Demand) {
- guard demand > 0 else { return }
-
- self._state.lock()
-
- guard let config = self.state.activeConfiguration else { return self._state.unlock() }
- config.demand.expected += demand
-
- guard case (let subscription?, let d) = config.calculateUpstreamDemand(), d > 0 else { return self._state.unlock() }
- config.demand.requested += d
-
- self._state.unlock()
-
- subscription.request(d)
- }
-
- func receive(_ input: Upstream.Output) -> Subscribers.Demand {
- self._state.lock()
-
- guard let config = self.state.activeConfiguration else { self._state.unlock(); return .none }
- config.demand.requested -= 1
-
- // If there are already a maximum amount of parallel processing values, ignore the inconming value.
- guard config.demand.processing < config.parallel else { self._state.unlock(); return .none }
- config.demand.processing += 1
-
- guard case (.some, let d) = config.calculateUpstreamDemand() else { fatalError("\nAn input was received, although the upstream already disappeared\n") }
- config.demand.requested += d
- let closure = config.closure
- let (isCancelled, promise) = self._makeClosures()
- self._state.unlock()
-
- closure(input, isCancelled, promise)
- return d
- }
-
- func receive(completion: Subscribers.Completion) {
- self._state.lock()
-
- guard let config = self.state.activeConfiguration else { return self._state.unlock() }
- config.upstream = nil
- config.demand.requested = .none
-
- if case .finished = completion, config.demand.processing > 0 { return self._state.unlock() }
- self.state = .terminated
- self._state.unlock()
- config.downstream.receive(completion: completion.mapError { $0 })
- }
-
- func cancel() {
- guard case .active(let config) = self._state.terminate() else { return }
- config.upstream?.cancel()
- }
- }
-}
-
-private extension Publishers.AsyncTryMap.Conduit {
- /// - precondition: When this function is called `self` is within the lock and in an active state.
- func _makeClosures() -> (check: Publishers.AsyncTryMap.CancelCheck, promise: Publishers.AsyncTryMap.Promise) {
- typealias P = Publishers.AsyncTryMap
- var isFinished = false
-
- let isCancelled: P.CancelCheck = { [weak self] in
- guard let self = self else { return true }
- self._state.lock()
- let result = self.state.isTerminated || isFinished
- self._state.unlock()
- return result
- }
-
- let promise: P.Promise = { [weak self] (result) in
- guard let self = self else {
- isFinished = true
- return .forbidden
- }
-
- self._state.lock()
- guard let config = self.state.activeConfiguration else {
- isFinished = true
- self._state.unlock()
- return .forbidden
- }
-
- let downstream = config.downstream
- config.demand.expected -= 1
-
- let (value, request): (Output, Publishers.Async.Request)
- switch result {
- case .success(let result):
- (value, request) = result
- self._state.unlock()
- case .failure(let error):
- isFinished = true
- config.upstream = nil
- config.demand.requested = .none
- self.state = .terminated
- self._state.unlock()
- downstream.receive(completion: .failure(error))
- return .forbidden
- }
-
- let receivedDemand = downstream.receive(value)
-
- self._state.lock()
- guard self.state.isActive else {
- isFinished = true
- self._state.unlock()
- return .forbidden
- }
-
- config.demand.expected += receivedDemand
- config.demand.processing -= 1
-
- let (s, d) = config.calculateUpstreamDemand()
- if case .continue = request, config.demand.expected > 0 {
- config.demand.processing += 1
- self._state.unlock()
- if let subscription = s, d > 1 { subscription.request(d - 1) }
- return .allowed
- }
-
- isFinished = true
-
- if let subscription = s {
- config.demand.requested += d
- self._state.unlock()
- if d > 0 { subscription.request(d) }
- } else {
- self._state.unlock()
- if config.demand.processing <= 0 { downstream.receive(completion: .finished) }
- }
-
- return .forbidden
- }
-
- return (isCancelled, promise)
- }
-}
-
-private extension Publishers.AsyncTryMap.Conduit {
- /// Values needed for the subscription awaiting state.
- struct _WaitConfiguration {
- let downstream: Downstream
- let parallel: Subscribers.Demand
- let closure: Publishers.AsyncTryMap.Closure
- }
-
- /// Values needed for the subscription active state.
- final class _ActiveConfiguration {
- /// The subscription used to manage the upstream back-pressure.
- var upstream: Subscription?
- /// The subscriber receiving the input and completion.
- let downstream: Downstream
- /// The maximum number of parallel requests allowed.
- let parallel: Subscribers.Demand
- /// The closure being called for each upstream value emitted.
- let closure: Publishers.AsyncTryMap.Closure
- /// The values requested by the downstream and the values being processed at the moment.
- var demand: (requested: Subscribers.Demand, expected: Subscribers.Demand, processing: Int)
-
- /// Designated initializer providing the requried upstream and downstream.
- init(upstream: Subscription, downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Publishers.AsyncTryMap.Closure) {
- self.upstream = upstream
- self.downstream = downstream
- self.parallel = parallel
- self.closure = closure
- self.demand = (.none, .none, 0)
- }
-
- /// Calculates how many values conduit wants.
- /// - remark: This method must be called within the conduit lock. It expects atomic access.
- /// - returns: The upstream subscription and the demand to perform. If `nil`, no operation shall be performed.
- func calculateUpstreamDemand() -> (subscription: Subscription?, demand: Subscribers.Demand) {
- // If there is no upstream or the upstream has been previously asked for an unlimited number of values, there is no need to request anything else.
- guard let upstream = self.upstream else { return (nil, .none) }
- guard let requested = self.demand.requested.max else { return (upstream, .none) }
-
- let inflight = requested + self.demand.processing
- // If the number of inflight requests is the same as the allowed parallel ones, don't ask for more.
- guard inflight < self.parallel else { return (upstream, .none) }
-
- let result = min(self.parallel, self.demand.expected) - inflight
- return (upstream, result)
- }
- }
-}
diff --git a/tests/conbiniTests/publishers/AsyncMapTests.swift b/tests/conbiniTests/publishers/AsyncMapTests.swift
deleted file mode 100644
index 2515310..0000000
--- a/tests/conbiniTests/publishers/AsyncMapTests.swift
+++ /dev/null
@@ -1,299 +0,0 @@
-import XCTest
-import Conbini
-import Combine
-
-/// Tests the correct behavior of the `AsyncMap` publisher.
-final class AsyncMapTests: XCTestCase {
- /// A custom error to send as a dummy.
- private struct CustomError: Swift.Error {}
- /// A convenience storage of cancellables.
- private var _cancellables = Set()
-
- override func setUp() {
- self.continueAfterFailure = false
- self._cancellables.removeAll()
- }
-}
-
-extension AsyncMapTests {
- /// Test async unlimited map operation (just one value asynchronous transformation).
- func testAsyncMapUnlimitedSingleValue() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- Just(1).asyncMap(parallel: .unlimited) { (value, isCancelled, promise) in
- queue.async {
- XCTAssertFalse(isCancelled())
- XCTAssertEqual(promise(value * 10, .finished), .forbidden)
- XCTAssertTrue(isCancelled())
- }
- }.receive(on: DispatchQueue.main)
- .sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received.count, 1)
- XCTAssertEqual(received[0], 10)
- }
-
- /// Test async unlimited map operation (array of asynchronous transformation).
- func testAsyncMapUnlimitedArray() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- [1, 2, 3, 4].publisher
- .asyncMap(parallel: .unlimited) { (value, isCancelled, promise) in
- queue.async {
- XCTAssertFalse(isCancelled())
- XCTAssertEqual(promise(value * 10, .finished), .forbidden)
- XCTAssertTrue(isCancelled())
- }
- }.receive(on: DispatchQueue.main)
- .sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received.sorted(), [10, 20, 30, 40].sorted())
- }
-
- /// Tests async unlimited map operation (each value received generate several values).
- func testAsyncMapUnlimitedMulti() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests", autoreleaseFrequency: .never, target: nil)
-
- var received: [Int] = .init()
- [1, 2, 3, 4].publisher
- .asyncMap(parallel: .unlimited) { (value, _, promise) in
- queue.async { XCTAssertEqual(promise(value * 10 + 0, .continue), .allowed) }
- queue.async { XCTAssertEqual(promise(value * 10 + 1, .continue), .allowed) }
- queue.async { XCTAssertEqual(promise(value * 10 + 2, .finished), .forbidden) }
- }.sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: {
- received.append($0)
- }).store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received, [10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42])
- }
-
- /// Tests async restrict map operation (array of asynchronous transformation).
- func testAsyncMapRestrictedArrayOne() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- [1, 2, 3, 4].publisher
- .asyncMap(parallel: .max(1)) { (value, _, promise) in
- queue.async { XCTAssertEqual(promise(value * 10, .finished), .forbidden) }
- }.sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received, [10, 20, 30, 40])
- }
-
- func testAsyncMapRestrictedArrayTwo() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- [1, 2, 3, 4, 5, 6, 7, 8].publisher
- .asyncMap(parallel: .max(2)) { (value, _, promise) in
- queue.async { XCTAssertEqual(promise(value * 10, .finished), .forbidden) }
- }.receive(on: DispatchQueue.main)
- .sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received.sorted(), [10, 20, 30, 40, 50, 60, 70, 80].sorted())
- }
-
- /// Tests async unlimited map operation (each value received generate several values).
- func testAsyncMapRestrictedMulti() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests", autoreleaseFrequency: .never, target: nil)
-
- var received: [Int] = .init()
- [1, 2, 3, 4].publisher
- .asyncMap(parallel: .max(1)) { (value, _, promise) in
- queue.async { XCTAssertEqual(promise(value * 10 + 0, .continue), .allowed) }
- queue.async { XCTAssertEqual(promise(value * 10 + 1, .continue), .allowed) }
- queue.async { XCTAssertEqual(promise(value * 10 + 2, .finished), .forbidden) }
- }.sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: {
- received.append($0)
- }).store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received, [10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42])
- }
-}
-
-extension AsyncMapTests {
- /// Test async unlimited map operation (just one value asynchronous transformation).
- func testAsyncTryMapUnlimitedSingleValue() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- Just(1).asyncTryMap(parallel: .unlimited) { (value, _, promise) in
- queue.async { XCTAssertEqual(promise(.success((value * 10, .finished))), .forbidden) }
- }.receive(on: DispatchQueue.main)
- .sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received.count, 1)
- XCTAssertEqual(received[0], 10)
- }
-
- /// Test async unlimited map operation (array of asynchronous transformation).
- func testAsyncTryMapUnlimitedArray() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- [1, 2, 3, 4].publisher
- .asyncTryMap(parallel: .unlimited) { (value, _, promise) in
- queue.async { XCTAssertEqual(promise(.success((value * 10, .finished))), .forbidden) }
- }.receive(on: DispatchQueue.main)
- .sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received.sorted(), [10, 20, 30, 40].sorted())
- }
-
- /// Tests async unlimited map operation (each value received generate several values).
- func testAsyncTryMapUnlimitedMulti() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests", autoreleaseFrequency: .never, target: nil)
-
- var received: [Int] = .init()
- [1, 2, 3, 4].publisher
- .asyncTryMap(parallel: .unlimited) { (value, _, promise) in
- queue.async { XCTAssertEqual(promise(.success((value * 10 + 0, .continue))), .allowed) }
- queue.async { XCTAssertEqual(promise(.success((value * 10 + 1, .continue))), .allowed) }
- queue.async { XCTAssertEqual(promise(.success((value * 10 + 2, .finished))), .forbidden) }
- }.sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: {
- received.append($0)
- }).store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received, [10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42])
- }
-
- /// Tests async unlimited map operation (where one single error is published).
- func testAsyncTryMapUnlimitedError() {
- let exp = self.expectation(description: "Publisher fails")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- Just(1).asyncTryMap(parallel: .unlimited) { (value, isCancelled, promise) in
- queue.async {
- XCTAssertFalse(isCancelled())
- XCTAssertEqual(promise(.failure(CustomError())), .forbidden)
- XCTAssertTrue(isCancelled())
- }
- }.receive(on: DispatchQueue.main)
- .sink(receiveCompletion: {
- guard case .failure(let error) = $0, error is CustomError else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertTrue(received.isEmpty)
- }
-
- /// Tests async restrict map operation (array of asynchronous transformation).
- func testAsyncTryMapRestrictedArrayOne() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- [1, 2, 3, 4].publisher
- .asyncTryMap(parallel: .max(1)) { (value, _, promise) in
- queue.async {
- XCTAssertEqual(promise(.success((value * 10, .finished))), .forbidden)
- }
- }.sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received, [10, 20, 30, 40])
- }
-
- func testAsyncTryMapRestrictedArrayTwo() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests.publishers.asyncmap")
-
- var received: [Int] = .init()
- [1, 2, 3, 4, 5, 6, 7, 8].publisher
- .asyncTryMap(parallel: .max(2)) { (value, _, promise) in
- queue.async {
- XCTAssertEqual(promise(.success((value * 10, .finished))), .forbidden)
- }
- }.receive(on: DispatchQueue.main)
- .sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: { received.append($0) })
- .store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received.sorted(), [10, 20, 30, 40, 50, 60, 70, 80].sorted())
- }
-
- /// Tests async unlimited map operation (each value received generate several values).
- func testAsyncTryMapRestrictedMulti() {
- let exp = self.expectation(description: "Publisher completes")
- let queue = DispatchQueue(label: "io.dehesa.conbini.tests", autoreleaseFrequency: .never, target: nil)
-
- var received: [Int] = .init()
- [1, 2, 3, 4].publisher
- .asyncTryMap(parallel: .max(1)) { (value, _, promise) in
- queue.async { XCTAssertEqual(promise(.success((value * 10 + 0, .continue))), .allowed) }
- queue.async { XCTAssertEqual(promise(.success((value * 10 + 1, .continue))), .allowed) }
- queue.async { XCTAssertEqual(promise(.success((value * 10 + 2, .finished))), .forbidden) }
- }.sink(receiveCompletion: {
- guard case .finished = $0 else { return XCTFail() }
- exp.fulfill()
- }, receiveValue: {
- received.append($0)
- }).store(in: &self._cancellables)
-
- self.wait(for: [exp], timeout: 0.2)
- XCTAssertEqual(received, [10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42])
- }
-}