
how to provide backpressure (lazy signal producer that fetches more data as the data is consumed) #2808

Closed
samidalouche opened this issue Apr 20, 2016 · 6 comments

@samidalouche

I am trying to figure out how to create a SignalProducer that repeatedly fetches the next chunk of data as the previous chunk is consumed.
I have an implementation; however, the backpressure mechanism doesn't work if observeOn is called on the producer. What seems to happen is that the producer keeps fetching new data even though the client hasn't finished consuming the stream.

Any idea how I could achieve what I want? Is that actually possible?

Thanks!

public func streamMessages(from startOffset: Offset = Offset(value: 0),
                           toExclusive endOffsetOpt: Offset? = .None,
                           includeTransient: Bool = true) -> SignalProducer<Message, NoError> {

    // Produces one chunk of messages, but only once a message at or past
    // `from` is known to be available.
    func streamMessagesChunk(from: Offset) -> SignalProducer<Message, NoError> {
        func waitForNewMessageAvailable(from: Offset) -> SignalProducer<Offset?, NoError> {
            return self.lastOffsetIncludingTransient(includeTransient).producer
                .filter { offsetOpt in offsetOpt.map { offset in offset >= from } ?? false }
                .take(1)
        }

        let streamMessagesProducer = self.fetchMessages(10, from: from, includeTransientMessages: includeTransient)
            .flatMap(.Concat) { messages in SignalProducer<Message, NoError>(values: messages) }

        return waitForNewMessageAvailable(from)
            .then(streamMessagesProducer)
    }

    // Streams one chunk, then recurses for the next chunk once the current
    // one completes, forwarding all events to the outer observer.
    func streamNextBatch(from: Offset, observer: Observer<Message, NoError>, observerDisposable: CompositeDisposable) {
        func hasReachedEndOffset(currentOffset: Offset) -> Bool {
            return endOffsetOpt.map { endOffset in endOffset == currentOffset } ?? false
        }

        print("StreamNextBatch \(from)")

        streamMessagesChunk(from).startWithSignal { signal, signalDisposable in
            var lastOffset: Offset = from
            let disposableHandle = observerDisposable.addDisposable(signalDisposable)

            signal.observe { switch $0 {
                case let .Failed(error): observer.sendFailed(error)
                case .Interrupted: observer.sendInterrupted()
                case .Completed:
                    disposableHandle.remove()
                    streamNextBatch(lastOffset.next, observer: observer, observerDisposable: observerDisposable)
                case .Next(let message):
                    if hasReachedEndOffset(message.offset) {
                        disposableHandle.remove()
                        observer.sendCompleted()
                    } else {
                        lastOffset = message.offset
                        observer.sendNext(message)
                    }
                }
            }
        }
    }

    return SignalProducer<Message, NoError> { observer, observerDisposable in
        streamNextBatch(startOffset, observer: observer, observerDisposable: observerDisposable)
    }
}

func testShouldStreamMessagesWaitingForFutureMessages() {
    let expectation = self.expectationWithDescription("Test")
    let messages = (0...50000).map { value in self.createKafkaData(UInt64(value)) }
    let nextMessages = (50001...65000).map { value in self.createKafkaData(UInt64(value)) }

    try! self.sut.publishMessages(messages, persist: false).get()

    let messageCountFuture = self.sut
        .streamMessages(from: Offset(value: 45), toExclusive: Offset(value: 60000), includeTransient: true)
        .observeOn(QueueScheduler())
        .map { m in print("sleeping at \(m.data)"); sleep(1); return 1 }
        .reduce(0, +)
        .toFuture()

    messageCountFuture.onSuccess { count in
        expect(count) == 15
        expectation.fulfill()
    }

    try! self.sut.publishMessages(nextMessages, persist: false).get()

    self.waitForExpectationsWithTimeout(30, handler: nil)
}

func createKafkaData(number: UInt64) -> String {
    return "message \(number)"
}
@samidalouche samidalouche changed the title Question: how to provide backpressure how to provide backpressure Apr 20, 2016
@samidalouche samidalouche changed the title how to provide backpressure how to provide backpressure (lazy signal producer that fetches more data as the data is consumed) Apr 20, 2016
@mdiep
Contributor

mdiep commented May 7, 2016

Sorry for the long delay in responding. We've been working through a lot of old issues as some contributors have dropped off.

Sadly, AFAIK there's not currently a way to provide backpressure across schedulers. 😞

Do you know if any other reactive frameworks provide a way to do this? I definitely think it's an interesting concept.

@samidalouche
Author

As far as I know, the frameworks from the Rx family do not generally support backpressure, but one particular implementation of streams (akka-streams) does support it: http://www.smartjava.org/content/visualizing-back-pressure-and-reactive-streams-akka-streams-statsd-grafana-and-influxdb

The idea is that data flows downstream and demand flows upstream, so the recipient is always in control of the maximum incoming data rate.
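
A minimal sketch of that pull model in RAC 4-era Swift, assuming a hypothetical fetchChunk function and Chunk type (neither is a ReactiveCocoa API): handling one chunk is what requests the next, so the producer can never run ahead of the consumer.

import ReactiveCocoa

// Hypothetical chunk type, for illustration only.
struct Chunk {
    let values: [String]
    let nextOffset: Int
    let isLast: Bool
}

// Demand flows upstream: each chunk is fetched only after the previous
// one has been delivered, so the recipient controls the incoming rate.
func pullChunks(from start: Int, fetchChunk: Int -> SignalProducer<Chunk, NoError>)
    -> SignalProducer<Chunk, NoError> {
    return SignalProducer { observer, disposable in
        func requestNext(offset: Int) { // one unit of demand
            disposable += fetchChunk(offset).startWithNext { chunk in
                observer.sendNext(chunk) // data flows downstream
                if chunk.isLast {
                    observer.sendCompleted()
                } else {
                    requestNext(chunk.nextOffset) // demand flows upstream
                }
            }
        }
        requestNext(start)
    }
}

Note that this still has the limitation discussed above: sendNext only hands the value off for delivery, so as soon as observeOn is involved, "delivered" no longer means "consumed".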

@samidalouche
Author

RxSwift does not seem to have backpressure either, but it looks like its defer operator would be enough to address the same need: http://reactivex.io/documentation/operators/defer.html

How hard would it be to have such a defer operator in ReactiveCocoa?
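
For reference, a defer in RAC 4 terms could be a thin wrapper that invokes a factory at start time. A minimal sketch, not an existing ReactiveCocoa operator:

// Sketch of a defer-style operator for RAC 4: the factory runs only when
// the returned producer is started, once per start.
func deferProducer<T, E: ErrorType>(factory: () -> SignalProducer<T, E>) -> SignalProducer<T, E> {
    return SignalProducer { observer, disposable in
        factory().startWithSignal { signal, innerDisposable in
            disposable.addDisposable(innerDisposable)
            signal.observe(observer)
        }
    }
}

Since SignalProducers are already cold (the work happens once per start()), the only thing defer adds is delaying the creation of the producer itself until subscription time.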

@neilpa
Member

neilpa commented May 11, 2016

Not hard. I did something similar in Rex, although it sounds like you're asking for a generalized version: instead of a time interval for the delay, you would want a trigger signal/producer parameter to defer the subscription.
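
Something along these lines, presumably; a sketch of the generalized version, not an existing Rex or ReactiveCocoa operator:

// Defer the subscription until trigger sends its first value, then switch
// to producer. This is the same shape as the waitForNewMessageAvailable(_:)
// plus then(_:) combination in the code above.
func deferUntil<T, U, E: ErrorType>(trigger: SignalProducer<U, E>,
                                    producer: SignalProducer<T, E>) -> SignalProducer<T, E> {
    return trigger.take(1).then(producer)
}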

@samidalouche
Author

@neilpa we would need the subscription to be deferred until the consumer has finished consuming the stream. This works well with ReactiveCocoa (example above) until you start observing on a different scheduler.
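
The decoupling is easy to reproduce. A minimal sketch with RAC 4-era APIs:

// observeOn re-enqueues each event on the target scheduler, so upstream
// sees .Completed (and would fetch the next chunk) long before the slow
// consumer has drained the values that were already delivered.
SignalProducer<Int, NoError>(values: Array(1...5))
    .on(completed: { print("upstream completed; next chunk would be fetched now") })
    .observeOn(QueueScheduler())
    .startWithNext { value in
        sleep(1) // slow consumer
        print("consumed \(value)")
    }

The "upstream completed" line typically prints before most of the "consumed" lines, which is exactly the premature fetching described above.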

@mdiep
Contributor

mdiep commented May 19, 2016

I'm going to close this since your question has been answered. If you'd like to make a proposal for something, feel free to open a new issue or PR!

@mdiep mdiep closed this as completed May 19, 2016