Skip to content

Commit

Permalink
Retry + delay operator added
Browse files Browse the repository at this point in the history
  • Loading branch information
dehesa committed Feb 14, 2020
1 parent 6d29c8c commit f35da73
Show file tree
Hide file tree
Showing 18 changed files with 412 additions and 27 deletions.
19 changes: 14 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ Conbini provides convenience `Publisher`s, operators, and `Subscriber`s to squee

### Publisher Operators

- `then` ignores all values and executes the provided publisher once a successful completion is received.
- `retry(on:intervals:)` attempts to recreate a failed subscription with the upstream publisher a given amount of times and waiting the specified number of seconds between failed attempts.

```swift
let apiCalls.retry(on: queue, intervals: [0.5, 2, 5])
// Same functionality to retry(3), but waiting between attemps 0.5, 2, and 3 seconds after a failed attempt.
```

- `then(maxDemand:_:)` ignores all values and executes the provided publisher once a successful completion is received.
<br>If a failed completion is emitted, it is forwarded downstream.

```swift
Expand All @@ -21,7 +28,7 @@ Conbini provides convenience `Publisher`s, operators, and `Subscriber`s to squee

This operator optionally lets you control backpressure with its `maxDemand` parameter. The parameter behaves like `flatMap`'s `maxPublishers`, which specifies the maximum demand requested to the upstream at any given time.

- `handleEnd` executes (only once) the provided closure when the publisher completes (whether successfully or with a failure) or when the publisher gets cancelled.
- `handleEnd(_:)` executes (only once) the provided closure when the publisher completes (whether successfully or with a failure) or when the publisher gets cancelled.
<br> It performs the same operation that the standard `handleEvents(receiveSubscription:receiveOutput:receiveCompletion:receiveCancel:receiveRequest:)` would perform if you add similar closures to `receiveCompletion` and `receiveCancel`.

```swift
Expand All @@ -34,7 +41,7 @@ Conbini provides convenience `Publisher`s, operators, and `Subscriber`s to squee
}
```

- `asyncMap` transforms elements received from upstream (similar to `map`), but the result is returned in a promise instead of using the `return` statement.
- `asyncMap(_:)` transforms elements received from upstream (similar to `map`), but the result is returned in a promise instead of using the `return` statement.

```swift
let publisher = [1, 2].publisher.asyncMap { (value, promise) in
Expand All @@ -47,7 +54,7 @@ Conbini provides convenience `Publisher`s, operators, and `Subscriber`s to squee

This operator also provides a `try` variant accepting a result (instead of a value).

- `sequentialMap` transform elements received from upstream (as `asyncMap`) with the twist that it allows you to call multiple times the `promise` callback; effectively transforming one value into many results.
- `sequentialMap(_:)` transform elements received from upstream (as `asyncMap`) with the twist that it allows you to call multiple times the `promise` callback; effectively transforming one value into many results.

```swift
let publisher = [1, 2].publisher.sequentialMap { (value, promise) in
Expand All @@ -65,7 +72,7 @@ Conbini provides convenience `Publisher`s, operators, and `Subscriber`s to squee

### Subscriber Operators

- `result` subscribes to the receiving publisher and executes the provided closure when a value is received.
- `result(onEmpty:_:)` subscribes to the receiving publisher and executes the provided closure when a value is received.
<br>In case of failure, the handler is executed with such failure.

```swift
Expand Down Expand Up @@ -142,7 +149,9 @@ Conbini provides convenience `Publisher`s, operators, and `Subscriber`s to squee
`Deferred...` closures await for subscriptions and a _greater-than-zero_ demand before executing. This also means, the closure will re-execute for any new subscriber.
- `Deferred` is the most similar in functionality, but it only accepts a publisher.

- `DelayedRetry` provides the functionality of the `retry(on:intervals:)` operator.
- `Then` provides the functionality of the `then` operator.
- `HandleEnd` provides the functionality of the `handleEnd(_:)` operator.

### Extra Functionality

Expand Down
14 changes: 14 additions & 0 deletions Sources/Conbini/Operators/RetryOp.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import Combine
import Foundation

extension Publisher {
/// Attempts to recreate a failed subscription with the upstream publisher using a specified number of attempts to establish the connection and a given amount of seconds between attempts.
/// - parameter scheduler: The scheduler used to wait for the specific intervals.
/// - parameter tolerance: The tolerance used when scheduling a new attempt after a failure. A default implies the minimum tolerance.
/// - parameter options: The options for the given scheduler.
/// - parameter intervals: The amount of seconds to wait after a failure occurrence. Negative values are considered zero.
/// - returns: A publisher that attemps to recreate its subscription to a failed upstream publisher a given amount of times and waiting a given amount of seconds between attemps.
public func retry<S>(on scheduler: S, tolerance: S.SchedulerTimeType.Stride? = nil, options: S.SchedulerOptions? = nil, intervals: [TimeInterval]) -> Publishers.DelayedRetry<Self,S> where S:Scheduler {
.init(upstream: self, scheduler: scheduler, options: options, intervals: intervals)
}
}
2 changes: 1 addition & 1 deletion Sources/Conbini/Publishers/DeferredComplete.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ extension DeferredComplete {
@Lock private var state: State<Void,Configuration>

init(downstream: Downstream, closure: @escaping Closure) {
self._state = .init(wrappedValue: .active(.init(downstream: downstream, closure: closure)))
self.state = .active(.init(downstream: downstream, closure: closure))
}

func request(_ demand: Subscribers.Demand) {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Conbini/Publishers/DeferredFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ extension DeferredFuture {
@Lock private var state: State<Void,Configuration>

init(downstream: Downstream, closure: @escaping Closure) {
self._state = .init(wrappedValue: .active(.init(downstream: downstream, step: .awaitingDemand(closure: closure))))
self.state = .active(.init(downstream: downstream, step: .awaitingDemand(closure: closure)))
}

func request(_ demand: Subscribers.Demand) {
Expand Down
4 changes: 2 additions & 2 deletions Sources/Conbini/Publishers/DeferredPassthrough.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ extension DeferredPassthrough {

/// Designated initializer passing all the needed info (except the upstream subscription).
init(upstream: PassthroughSubject<Output,Failure>, downstream: Downstream, closure: @escaping Closure) {
self._state = .init(wrappedValue: .awaitingSubscription(.init(upstream: upstream, downstream: downstream, closure: closure)))
self.state = .awaitingSubscription(.init(upstream: upstream, downstream: downstream, closure: closure))
}

deinit {
self.cancel()
}

func receive(subscription: Subscription) {
guard let config = self._state.activate(locking: { .init(upstream: subscription, downstream: $0.downstream, setup: ($0.upstream, $0.closure)) }) else {
guard let config = self._state.activate(atomic: { .init(upstream: subscription, downstream: $0.downstream, setup: ($0.upstream, $0.closure)) }) else {
return subscription.cancel()
}
config.downstream.receive(subscription: self)
Expand Down
2 changes: 1 addition & 1 deletion Sources/Conbini/Publishers/DeferredResult.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extension DeferredResult {
@Lock private var state: State<Void,Configuration>

init(downstream: Downstream, closure: @escaping Closure) {
self._state = .init(wrappedValue: .active(.init(downstream: downstream, closure: closure)))
self.state = .active(.init(downstream: downstream, closure: closure))
}

func request(_ demand: Subscribers.Demand) {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Conbini/Publishers/DeferredTryComplete.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ extension DeferredTryComplete {
@Lock private var state: State<Void,Configuration>

init(downstream: Downstream, closure: @escaping Closure) {
self._state = .init(wrappedValue: .active(.init(downstream: downstream, closure: closure)))
self.state = .active(.init(downstream: downstream, closure: closure))
}

func request(_ demand: Subscribers.Demand) {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Conbini/Publishers/DeferredTryValue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ extension DeferredTryValue {
/// - parameter downstream: Downstream subscriber receiving the data from this instance.
/// - parameter closure: Closure in charge of generating the emitted value.
init(downstream: Downstream, closure: @escaping Closure) {
self._state = .init(wrappedValue: .active(.init(downstream: downstream, closure: closure)))
self.state = .active(.init(downstream: downstream, closure: closure))
}

func request(_ demand: Subscribers.Demand) {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Conbini/Publishers/DeferredValue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extension DeferredValue {
/// - parameter downstream: Downstream subscriber receiving the data from this instance.
/// - parameter closure: Closure in charge of generating the emitted value.
init(downstream: Downstream, closure: @escaping Closure) {
self._state = .init(wrappedValue: .active(.init(downstream: downstream, closure: closure)))
self.state = .active(.init(downstream: downstream, closure: closure))
}

func request(_ demand: Subscribers.Demand) {
Expand Down
4 changes: 2 additions & 2 deletions Sources/Conbini/Publishers/HandleEnd.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ extension Publishers.HandleEnd {
}

func receive(subscription: Subscription) {
guard let config = self._state.activate(locking: { ActiveConfiguration(upstream: subscription, closure: $0.closure, downstream: $0.downstream) }) else {
guard let config = self._state.activate(atomic: { .init(upstream: subscription, closure: $0.closure, downstream: $0.downstream) }) else {
return subscription.cancel()
}
config.downstream.receive(subscription: self)
Expand All @@ -55,8 +55,8 @@ extension Publishers.HandleEnd {
func request(_ demand: Subscribers.Demand) {
self._state.lock()
guard let config = self.state.activeConfiguration else { return self._state.unlock() }
config.upstream.request(demand)
self._state.unlock()
config.upstream.request(demand)
}

func receive(_ input: Upstream.Output) -> Subscribers.Demand {
Expand Down

0 comments on commit f35da73

Please sign in to comment.