Conbini provides convenience `Publisher`s, operators, and `Subscriber`s to squeeze the most out of Apple's Combine framework.
- `retry(on:intervals:)` attempts to recreate a failed subscription with the upstream publisher a given number of times. Furthermore, it waits the specified number of seconds between failed attempts.

      let publisher = apiCallPublisher.retry(on: queue, intervals: [0.5, 2, 5])
      // Same functionality as retry(3), but waiting 0.5, 2, and 5 seconds after each failed attempt.

  This operator accepts any scheduler conforming to `Scheduler` (e.g. `DispatchQueue`, `RunLoop`, etc.). You can also optionally tweak the tolerance and scheduler options.

- `then(maxDemand:_:)` ignores all values and executes the provided publisher once a successful completion is received. If a failed completion is emitted, it is forwarded downstream.

      let publisher = setConfigurationOnServer.then { subscribeToWebsocket.publisher }

  This operator optionally lets you control backpressure with its `maxDemand` parameter. The parameter behaves like `flatMap`'s `maxPublishers`, which specifies the maximum demand requested to the upstream at any given time.

- `handleEnd(_:)` executes (only once) the provided closure when the publisher completes (whether successfully or with a failure) or when the publisher gets cancelled. It performs the same operation that the standard `handleEvents(receiveSubscription:receiveOutput:receiveCompletion:receiveCancel:receiveRequest:)` would perform if you added similar closures to `receiveCompletion` and `receiveCancel`.

      let publisher = upstream.handleEnd { (completion) in
          switch completion {
          case .none: break                        // The publisher got cancelled.
          case .finished: break                    // The publisher finished successfully.
          case .failure(let error): print(error)   // The publisher generated an error.
          }
      }
- `asyncMap(_:)` transforms elements received from upstream (similar to `map`), but the result is returned in a promise instead of using the `return` statement.

      let publisher = [1, 2].publisher.asyncMap { (value, promise) in
          queue.async {
              let newValue = String(value * 10)
              promise(newValue)
          }
      }

  This operator also provides a `try` variant accepting a result (instead of a value).

- `sequentialMap(_:)` transforms elements received from upstream (as `asyncMap` does), with the twist that it allows you to call the `promise` callback multiple times, effectively transforming one value into many results.

      let publisher = [1, 2].publisher.sequentialMap { (value, promise) in
          queue.async {
              promise(value * 10 + 1, .continue)
              promise(value * 10 + 2, .continue)
              promise(value * 10 + 3, .finished)
          }
      }
      // Downstream will receive: [11, 12, 13, 21, 22, 23]

  The `SequentialMap` publisher executes one upstream value at a time. It doesn't request or fetch a previously sent upstream value until the `transform` closure is fully done and `promise(..., .finished)` has been called.
  This operator also provides a `try` variant accepting a result (instead of a value).
- `result(onEmpty:_:)` subscribes to the receiving publisher and executes the provided closure when a value is received. In case of failure, the handler is executed with that failure.

      let cancellable = serverRequest.result { (result) in
          switch result {
          case .success(let value): ...
          case .failure(let error): ...
          }
      }

  The operator lets you optionally generate an error (which will be consumed by your `handler`) for cases where upstream completes without a value.

- `sink(fixedDemand:)` subscribes upstream and requests exactly `fixedDemand` values (after which the subscriber completes). The subscriber may receive zero to `fixedDemand` values before completing, but never more than that.

      let cancellable = upstream.sink(fixedDemand: 5, receiveCompletion: { (completion) in ... }) { (value) in ... }
- `sink(maxDemand:)` subscribes upstream requesting `maxDemand` values and always keeping the same backpressure.

      let cancellable = upstream.sink(maxDemand: 3) { (value) in ... }
- `Deferred...` publishers accept a closure that is executed once a greater-than-zero demand is requested. There are several flavors:

  - `DeferredValue` emits a single value and then completes. The value is not provided/cached; instead, a closure generates it. The closure is executed once a positive demand is received.

        let publisher = DeferredValue<Int,CustomError> { return intenseProcessing() }

    A `Try` variant is also offered, enabling you to `throw` from within the closure. It loses the concrete error type (i.e. it gets converted to `Swift.Error`).

  - `DeferredResult` offers the same functionality as `DeferredValue`, but the closure generates a `Result` instead.

        let publisher = DeferredResult {
            guard someExpression else { return .failure(CustomError()) }
            return .success(someValue)
        }

  - `DeferredComplete` offers the same functionality as `DeferredValue`, but the closure only generates a completion event.

        let publisher = DeferredComplete { return errorOrNil }

    A `Try` variant is also offered, enabling you to `throw` from within the closure, but it loses the concrete error type (i.e. it gets converted to `Swift.Error`).

  - `DeferredPassthrough` is similar to wrapping a `Passthrough` subject in a `Deferred` closure, with the difference that the `Passthrough` given to the closure is already wired into the publisher chain and can start sending values right away. Also, memory management is taken care of, and every new subscriber receives a new subject (closure re-execution).

        let publisher = DeferredPassthrough { (subject) in
            subject.send(something)
            subject.send(somethingElse)
            subject.send(completion: .finished)
        }

  There are several reasons for these publishers to exist instead of using other `Combine`-provided publishers such as `Just`, `Future`, or `Deferred`:

  - `Future` publishers execute their provided closure right away (upon initialization) and then cache the returned value. That value is then forwarded to any future subscription.
  - `Deferred...` closures wait for a subscription and a greater-than-zero demand before executing. This also means that the closure re-executes for every new subscriber.
  - `Deferred` is the most similar in functionality, but it only accepts a publisher.
- `DelayedRetry` provides the functionality of the `retry(on:intervals:)` operator.
- `Then` provides the functionality of the `then` operator.
- `HandleEnd` provides the functionality of the `handleEnd(_:)` operator.
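
The contrast drawn above between `Future` (eager, cached) and deferred execution (lazy, repeated per subscriber) can be observed with standard Combine types alone. The sketch below uses the standard `Deferred`, which runs its closure upon subscription; per the description above, Conbini's `Deferred...` publishers additionally wait for a positive demand, which this example does not show. `compareEagerAndDeferred` is a hypothetical name used only for illustration.

```swift
import Combine

/// Records the order in which the closures run.
func compareEagerAndDeferred() -> [String] {
    var log: [String] = []

    // `Future` runs its closure immediately, at initialization.
    let eager = Future<Int, Never> { promise in
        log.append("future closure")
        promise(.success(1))
    }
    log.append("future created")

    // The standard `Deferred` postpones the work until a subscriber arrives
    // and repeats it for every new subscriber.
    let deferred = Deferred {
        Future<Int, Never> { promise in
            log.append("deferred closure")
            promise(.success(2))
        }
    }
    log.append("deferred created")

    let tokens = [
        eager.sink { _ in },     // Replays the cached value; the closure does not run again.
        deferred.sink { _ in },  // First execution of the deferred closure.
        deferred.sink { _ in }   // Second execution, for the second subscriber.
    ]
    tokens.forEach { $0.cancel() }
    return log
}
```

The returned log should show the `Future` closure running before `"future created"` is recorded, while the deferred closure runs only after both publishers exist, once per subscriber.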
- `Publishers.BufferingStrategy` has been extended with a `.fatalError(message:file:line:)` option to stop execution if the buffer is filled. This is useful during development and debugging, and for cases when you are sure the buffer will never be filled.

      publisher.buffer(size: 10, prefetch: .keepFull, whenFull: .fatalError())
- `FixedSink` requests a fixed amount of values upon subscription and, once it has received them all, completes/cancels the pipeline. The values are requested through backpressure, so no more than the allowed amount of values are generated upstream.

      let subscriber = FixedSink(demand: 5) { (value) in ... }
      upstream.subscribe(subscriber)
- `GraduatedSink` requests a fixed amount of values upon subscription and always keeps the same demand by asking for one more value upon input reception. The standard `Subscribers.Sink` requests an `.unlimited` amount of values upon subscription. This might not be what we want, since sometimes control over in-flight values is desirable (e.g. allowing only *n* in-flight API calls at the same time).

      let subscriber = GraduatedSink(maxDemand: 3) { (value) in ... }
      upstream.subscribe(subscriber)
The names for these subscribers are not very good/accurate. Any suggestion is appreciated.
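
To make the two demand policies described above concrete, here is a minimal sketch of a custom subscriber built only from standard Combine protocols. `DemandSink`, its `replenish` flag, and `collect(demand:replenish:)` are hypothetical names; the code is not Conbini's `FixedSink` or `GraduatedSink`, and it assumes single-threaded use.

```swift
import Combine

/// Hypothetical subscriber; it only illustrates the two demand policies.
final class DemandSink<Input>: Subscriber, Cancellable {
    typealias Failure = Never

    private let demand: Int
    private let replenish: Bool
    private let receiveValue: (Input) -> Void
    private var subscription: Subscription?
    private var received = 0

    /// - Parameters:
    ///   - demand: Number of values requested upon subscription.
    ///   - replenish: `true` asks for one more value per input (constant demand);
    ///     `false` cancels upstream once `demand` values have arrived.
    init(demand: Int, replenish: Bool, receiveValue: @escaping (Input) -> Void) {
        precondition(demand > 0, "The demand must be positive")
        self.demand = demand
        self.replenish = replenish
        self.receiveValue = receiveValue
    }

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(demand))  // Initial backpressure window.
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        received += 1
        receiveValue(input)
        if replenish { return .max(1) }     // Refill the slot just consumed.
        if received == demand { cancel() }  // Fixed policy: stop after `demand` values.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }

    func cancel() {
        subscription?.cancel()
        subscription = nil
    }
}

/// Feeds the values 1 through 5 into a `DemandSink` and returns what it received.
func collect(demand: Int, replenish: Bool) -> [Int] {
    var values: [Int] = []
    let sink = DemandSink<Int>(demand: demand, replenish: replenish) { values.append($0) }
    (1...5).publisher.subscribe(sink)
    return values
}
```

With a demand of 2, the fixed policy should stop after the first two values, while the replenishing policy should eventually receive all five without ever having more than two requests outstanding.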
Conbini provides convenience subscribers to ease code testing. These subscribers make the test wait until a specific expectation is fulfilled (or make the test fail in a negative case). Furthermore, if a timeout elapses or an expectation is not fulfilled, the affected test line will be correctly marked in red in Xcode.
- `expectsCompletion` subscribes to a publisher making the running test wait for a successful completion while ignoring all emitted values.

      publisherChain.expectsCompletion(timeout: 0.8, on: test)
- `expectsFailure` subscribes to a publisher making the running test wait for a failed completion while ignoring all emitted values.

      publisherChain.expectsFailure(timeout: 0.8, on: test)
- `expectsOne` subscribes to a publisher making the running test wait for a single value and a successful completion. If more than one value is emitted or the publisher fails, the subscription gets cancelled and the test fails.

      let emittedValue = publisherChain.expectsOne(timeout: 0.8, on: test)
- `expectsAll` subscribes to a publisher making the running test wait for zero or more values and a successful completion.

      let emittedValues = publisherChain.expectsAll(timeout: 0.8, on: test)
- `expectsAtLeast` subscribes to a publisher making the running test wait for at least the provided amount of values. Once the provided amount of values is received, the publisher gets cancelled and the values are returned.

      let emittedValues = publisherChain.expectsAtLeast(values: 5, timeout: 0.8, on: test)

  This operator/subscriber accepts an optional closure to check every value received.

      let emittedValues = publisherChain.expectsAtLeast(values: 5, timeout: 0.8, on: test) { (value) in XCTAssert... }
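
For readers curious how a test can block on a publisher and still flag the calling line, the following sketch shows one way to do it with plain XCTest and Combine. It is an assumption-laden illustration, not the code behind `expectsAll`: `waitForAllValues(of:timeout:file:line:)` is a hypothetical free function, and it assumes values are delivered on the thread running the test.

```swift
import Combine
import XCTest

/// Hypothetical helper (not ConbiniForTesting's implementation): blocks until
/// the publisher completes and returns every value it emitted.
func waitForAllValues<P: Publisher>(
    of publisher: P,
    timeout: TimeInterval,
    file: StaticString = #filePath,
    line: UInt = #line
) -> [P.Output] {
    let finished = XCTestExpectation(description: "The publisher completes successfully")
    var values: [P.Output] = []

    let cancellable = publisher.sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion {
                // Forwarding `file` and `line` marks the caller's line as failed.
                XCTFail("The publisher failed: \(error)", file: file, line: line)
            }
            finished.fulfill()
        },
        receiveValue: { values.append($0) }
    )

    // A bare `XCTWaiter` returns its result instead of recording a failure,
    // so the timeout can also be attributed to the caller's line.
    if XCTWaiter().wait(for: [finished], timeout: timeout) != .completed {
        XCTFail("The publisher did not complete in time", file: file, line: line)
    }
    cancellable.cancel()
    return values
}
```

Inside a test method this could be used as `let values = waitForAllValues(of: publisherChain, timeout: 0.8)`, followed by the usual `XCTAssert...` checks on `values`.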
The testing conveniences depend on XCTest, which is not available on regular execution. That is why Conbini is offered in two flavors:
- `import Conbini` includes all code except the testing conveniences.
- `import ConbiniForTesting` includes the testing functionality only.

The rule of thumb is to use `import Conbini` in your regular code (e.g. within your framework or app) and write `import ConbiniForTesting` within your test target files.
- Apple's Combine documentation.
- The Combine book is an excellent Ray Wenderlich book about the Combine framework.
- Cocoa with love has a great series of articles about the inner workings of Combine: 1. Protocols, 2. Sharing, 3. Asynchrony.
- OpenCombine is an open source implementation of Apple's Combine framework.
- CombineX is an open source implementation of Apple's Combine framework.
This framework's name references both the `Combine` framework and the helpful Japanese convenience stores 😄