-
Notifications
You must be signed in to change notification settings - Fork 2
/
AsyncMap.swift
251 lines (209 loc) · 12 KB
/
AsyncMap.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import Combine
extension Publishers {
    /// A publisher that transforms each upstream element through a closure which reports its results asynchronously.
    ///
    /// The failure type is the upstream's failure type, so this publisher only fails when the upstream fails.
    /// Keep in mind that an asynchronous transformation combined with a `parallel` setting other than one may deliver values downstream in an unexpected order.
    public struct AsyncMap<Upstream,Output>: Publisher where Upstream:Publisher {
        public typealias Failure = Upstream.Failure

        /// The closure handed to the transformation so it can deliver a result.
        /// - parameter result: The outcome of the transformation.
        /// - parameter request: Whether the transformation wants to keep sending values or it is done.
        /// - returns: Enum indicating whether the transformation is allowed to call this promise again.
        public typealias Promise = (_ result: Output, _ request: Publishers.Async.Request) -> Publishers.Async.Permission

        /// A query telling whether the publisher has been cancelled or it is still operating.
        /// - returns: `true` if the publisher has already been cancelled; `false` if it is still active.
        public typealias CancelCheck = () -> Bool

        /// The transformation closure stored by the publisher.
        /// - parameter value: The value received from the upstream.
        /// - parameter isCancelled: Closure to query whether the work for this value is still wanted.
        /// - parameter promise: The promise to call once (or every time) a transformation result is ready.
        public typealias Closure = (_ value: Upstream.Output, _ isCancelled: @escaping CancelCheck, _ promise: @escaping Promise) -> Void

        /// The publisher whose values are being transformed.
        public let upstream: Upstream
        /// The maximum number of upstream values processed at the same time.
        public let parallel: Subscribers.Demand
        /// The closure producing the downstream values.
        /// - note: The publisher retains the closure; anything captured by the closure lives for as long as the publisher does.
        public let closure: Closure

        /// Creates a publisher that transforms the incoming values. The transformation is asynchronous and a single upstream value may produce several output values.
        ///
        /// `parallel` sets how many upstream values are processed at the same time. The downstream demand is upheld: if the downstream requests 3 values and there are already 2 promises generating several values each (without completing), the following upstream values will be ignored.
        ///
        /// Values are only guaranteed not to be ignored when:
        /// - `parallel` is `.max(1)` and the promise user checks the returned `Permission` before sending new values.
        /// - every promise sends exactly one output (similar to a `map` publisher).
        /// - precondition: `parallel` must be greater than zero.
        /// - parameter upstream: The publisher emitting the values to transform.
        /// - parameter parallel: The maximum number of upstream values processed at the same time.
        /// - parameter transform: Closure in charge of transforming the values.
        @inlinable public init(upstream: Upstream, parallel: Subscribers.Demand, transform: @escaping Closure) {
            precondition(parallel > 0)
            self.closure = transform
            self.parallel = parallel
            self.upstream = upstream
        }

        /// Wraps the subscriber in a `Conduit` and subscribes that conduit to the upstream.
        public func receive<S>(subscriber: S) where S:Subscriber, S.Input==Output, S.Failure==Failure {
            let bridge = Conduit(downstream: subscriber, parallel: parallel, closure: closure)
            upstream.subscribe(bridge)
        }
    }
}
fileprivate extension Publishers.AsyncMap {
    /// The object standing between upstream and downstream for one activated `AsyncMap` publisher.
    ///
    /// It is the subscriber attached to the upstream and, at the same time, the subscription handed to the downstream.
    final class Conduit<Downstream>: Subscription, Subscriber where Downstream:Subscriber, Downstream.Input==Output, Downstream.Failure==Failure {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure

        /// The current conduit state, guarded by a lock.
        @Lock private var state: State<_WaitConfiguration,_ActiveConfiguration>

        /// Creates the conduit in the "awaiting subscription" state.
        init(downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Closure) {
            self.state = .awaitingSubscription(.init(downstream: downstream, parallel: parallel, closure: closure))
        }

        deinit {
            self.cancel()
            self._state.deinitialize()
        }

        /// Moves the conduit to the active state and forwards itself as the downstream's subscription.
        /// If the activation doesn't yield a configuration, the received subscription is cancelled right away.
        func receive(subscription: Subscription) {
            let activated = self._state.activate(atomic: { waiting in
                .init(upstream: subscription, downstream: waiting.downstream, parallel: waiting.parallel, closure: waiting.closure)
            })
            guard let active = activated else {
                return subscription.cancel()
            }
            active.downstream.receive(subscription: self)
        }

        /// Registers the downstream demand and, when the bookkeeping allows it, asks the upstream for more values.
        func request(_ demand: Subscribers.Demand) {
            guard demand > 0 else { return }

            self._state.lock()
            guard let active = self.state.activeConfiguration else {
                self._state.unlock()
                return
            }

            active.demand.expected += demand
            let (source, extra) = active.calculateUpstreamDemand()
            guard let upstream = source, extra > 0 else {
                self._state.unlock()
                return
            }

            active.demand.requested += extra
            self._state.unlock()
            // The upstream is always called outside the lock.
            upstream.request(extra)
        }

        /// Starts the transformation of an upstream value, unless the conduit is not active or the parallel limit is already reached.
        /// - returns: The additional demand for the upstream.
        func receive(_ input: Upstream.Output) -> Subscribers.Demand {
            self._state.lock()
            guard let active = self.state.activeConfiguration else {
                self._state.unlock()
                return .none
            }

            active.demand.requested -= 1
            // When the maximum number of values is already being processed, the incoming value is ignored.
            guard active.demand.processing < active.parallel else {
                self._state.unlock()
                return .none
            }
            active.demand.processing += 1

            let (source, extra) = active.calculateUpstreamDemand()
            if source == nil {
                fatalError("\nAn input was received, although the upstream already disappeared\n")
            }
            active.demand.requested += extra

            let transform = active.closure
            let (isCancelled, promise) = self._makeClosures()
            self._state.unlock()

            // The user closure is executed outside the lock.
            transform(input, isCancelled, promise)
            return extra
        }

        /// Handles the upstream completion.
        ///
        /// A failure is forwarded immediately. A successful completion is only forwarded here when no value is being processed.
        func receive(completion: Subscribers.Completion<Failure>) {
            self._state.lock()
            guard let active = self.state.activeConfiguration else {
                self._state.unlock()
                return
            }

            active.upstream = nil
            active.demand.requested = .none

            let hasPendingWork: Bool
            switch completion {
            case .finished: hasPendingWork = active.demand.processing > 0
            case .failure: hasPendingWork = false
            }
            if hasPendingWork {
                // The conduit stays active; the running promises are expected to forward the completion.
                self._state.unlock()
                return
            }

            self.state = .terminated
            self._state.unlock()
            active.downstream.receive(completion: completion)
        }

        /// Terminates the conduit and, if it was active, cancels the upstream subscription.
        func cancel() {
            if case .active(let active) = self._state.terminate() {
                active.upstream?.cancel()
            }
        }
    }
}
private extension Publishers.AsyncMap.Conduit {
    /// Builds the two closures handed to the user transformation for a single upstream value.
    ///
    /// Both closures capture the conduit weakly and share an `isFinished` flag that is raised as soon as the promise is not allowed to send further values.
    /// - precondition: When this function is called `self` is within the lock and in an active state.
    /// - returns: The cancellation check and the promise to pass to the transformation closure.
    func _makeClosures() -> (check: Publishers.AsyncMap<Upstream,Output>.CancelCheck, promise: Publishers.AsyncMap<Upstream,Output>.Promise) {
        typealias P = Publishers.AsyncMap<Upstream,Output>
        var isFinished = false

        // Reports `true` when the conduit is gone, terminated, or this promise has already finished.
        let isCancelled: P.CancelCheck = { [weak self] in
            guard let self = self else { return true }
            self._state.lock()
            let result = self.state.isTerminated || isFinished
            self._state.unlock()
            return result
        }

        let promise: P.Promise = { [weak self] (value, request) in
            guard let self = self else {
                isFinished = true
                return .forbidden
            }

            self._state.lock()
            guard let config = self.state.activeConfiguration else {
                isFinished = true
                self._state.unlock()
                return .forbidden
            }

            let downstream = config.downstream
            config.demand.expected -= 1
            self._state.unlock()

            // The downstream is always called outside the lock.
            let receivedDemand = downstream.receive(value)

            self._state.lock()
            // The conduit may have been cancelled/terminated while the downstream was being called.
            guard self.state.isActive else {
                isFinished = true
                self._state.unlock()
                return .forbidden
            }

            config.demand.expected += receivedDemand
            // Temporarily release this promise's processing slot to figure out the upstream demand.
            config.demand.processing -= 1
            let (s, d) = config.calculateUpstreamDemand()

            if case .continue = request, config.demand.expected > 0 {
                // The promise keeps running, so it takes its slot back; one unit of `d` belongs to that slot.
                config.demand.processing += 1
                guard let subscription = s, d > 1 else {
                    self._state.unlock()
                    return .allowed
                }

                let extra = d - 1
                // Track the request like every other upstream request; otherwise later demand
                // calculations over-request and the surplus values are dropped by the parallel limit.
                config.demand.requested += extra
                self._state.unlock()
                subscription.request(extra)
                return .allowed
            }

            isFinished = true
            if let subscription = s {
                config.demand.requested += d
                self._state.unlock()
                if d > 0 { subscription.request(d) }
            } else {
                // The upstream already finished. Decide within the lock whether this was the last running promise,
                // so that exactly one promise terminates the conduit and forwards the completion.
                let isLastPromise = config.demand.processing <= 0
                if isLastPromise { self.state = .terminated }
                self._state.unlock()
                if isLastPromise { downstream.receive(completion: .finished) }
            }
            return .forbidden
        }

        return (isCancelled, promise)
    }
}
private extension Publishers.AsyncMap.Conduit {
    /// Everything the conduit stores while it waits for the upstream subscription.
    struct _WaitConfiguration {
        let downstream: Downstream
        let parallel: Subscribers.Demand
        let closure: Publishers.AsyncMap<Upstream,Output>.Closure
    }

    /// Everything the conduit stores while it is active.
    final class _ActiveConfiguration {
        /// The upstream subscription used to manage back-pressure; the conduit sets it to `nil` once the upstream completes.
        var upstream: Subscription?
        /// The subscriber receiving the transformed values and the completion.
        let downstream: Downstream
        /// The maximum number of upstream values processed at the same time.
        let parallel: Subscribers.Demand
        /// The closure called for every upstream value that gets processed.
        let closure: Publishers.AsyncMap<Upstream,Output>.Closure
        /// Demand bookkeeping: values requested from the upstream, values expected by the downstream, and values being processed right now.
        var demand: (requested: Subscribers.Demand, expected: Subscribers.Demand, processing: Int)

        /// Designated initializer; all demand counters start at zero.
        init(upstream: Subscription, downstream: Downstream, parallel: Subscribers.Demand, closure: @escaping Publishers.AsyncMap<Upstream,Output>.Closure) {
            self.demand = (requested: .none, expected: .none, processing: 0)
            self.closure = closure
            self.parallel = parallel
            self.downstream = downstream
            self.upstream = upstream
        }

        /// Works out how many more values should be requested from the upstream.
        /// - remark: This method must be called within the conduit lock. It expects atomic access.
        /// - returns: The upstream subscription (`nil` when there is no upstream anymore) and the demand to request from it.
        func calculateUpstreamDemand() -> (subscription: Subscription?, demand: Subscribers.Demand) {
            // Without an upstream there is nobody to ask.
            guard let subscription = upstream else {
                return (nil, .none)
            }
            // An unlimited amount has already been requested; nothing else can be asked for.
            guard let pending = demand.requested.max else {
                return (subscription, .none)
            }

            // Values on their way from the upstream plus values currently being transformed.
            let busy = pending + demand.processing
            // All parallel slots are taken (or about to be); don't ask for more.
            guard busy < parallel else {
                return (subscription, .none)
            }

            let ceiling = min(parallel, demand.expected)
            return (subscription, ceiling - busy)
        }
    }
}