Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support await / AsyncSequence #830

Open
NachoSoto opened this issue Jun 7, 2021 · 13 comments
Open

Support await / AsyncSequence #830

NachoSoto opened this issue Jun 7, 2021 · 13 comments
Labels

Comments

@NachoSoto
Copy link
Member

NachoSoto commented Jun 7, 2021

I haven’t tried the new betas yet, but I imagine the prevalence of @MainActor in iOS 15 will bring issues to current uses of ReactiveSwift.
Specifically, I imagine you won’t be able to just do this:

producer
  .observe(on: UIScheduler())
  .start { [label] in label.text = $0  }

Feature request:

for await value in producer.start() {
  self.label.text = value
}

Unfortunately for try await would mean we lose error type information, so I would propose this API produces Result<Value, Error>, or a sequence of Value if Error is Never.

I’ll probably work on this throughout the week, but other thoughts are appreciated!

References:

Other ideas:

  • Being able to create SignalProducers using async:
SignalProducer<Int, MyError> { observer, disposable in
  observer.send(value: await f1())

  guard !disposable.isDisposed else { return }

  do {
    observer.send(value: try await f2())
  } catch {
    observer.send(error: error)
  }
}
  • Or simply just one async function:
let producer = SignalProducer<Int, Error>(asyncFunction: f)
  • New collect overloads:
let values: [Int] = await producer.collect()
let result: Result<[Int], MyError> = try await producer.collect()
@andersio
Copy link
Member

andersio commented Jun 8, 2021

It should be modelled as a conversion to AsyncSequence conforming type. asSequence() for fluency presumably.

It could also be SignalProducer(Convertible) inheriting AsyncSequence, though I am not sure how the operator name clash will pan out.

The nuance is likely around dealing with the scenario of a fast producer against a slow consumer. To achieve the deliver exactly once default of today’s RAS, we probably need to use unlimited buffering with the (supposedly available) AsyncStream.

(That is assuming we throw away all the blocking options)

@andersio
Copy link
Member

andersio commented Jun 11, 2021

Another thing worth watching is whether Swift Concurrency will end up backward deployed.

Async-await is really a natural way to express backpressure (callee can defer return of control flow by holding onto the continuation), especially personally having had a hands-on experience of it in Kotlin Coroutines. So that’s honestly how I would see proper backpressure being introduced into RAS, versus the Combine/Reactive Streams model (works but sometimes brittle).

(especially relevant since the proposed AsyncSequence stuff did not grow into a full fledged FRP toolbox, which means there are still values in using community libraries like RAS).

Either way, for now, we should bet on basically simple interops.

@danya61
Copy link

danya61 commented Jul 4, 2021

It became known that async / await mechanism is not backward deployed :( It's require new runtime. It's a pity

@NachoSoto
Copy link
Member Author

NachoSoto commented Jul 20, 2021

Edit: fixed implementation for RC:

I quickly prototyped this on Beta 3 and it works:

@available(iOS 15.0, *)
@available(tvOS 15.0, *)
@available(watchOS 8.0, *)
@available(macCatalystApplicationExtension 15.0, *)
extension SignalProducer {
    public func start() -> AsyncStream<Result<Value, Error>> {
        return AsyncStream(Result<Value, Error>.self) { continuation in
            let disposable = self.start { event in
                switch event {
                case let .value(value): continuation.yield(.success(value))
                case let .failed(error): continuation.yield(.failure(error))
                case .completed, .interrupted: continuation.finish()
                }
            }
            
            continuation.onTermination = { @Sendable _ in disposable.dispose() }
        }
    }
}

@available(iOS 15.0, *)
@available(tvOS 15.0, *)
@available(watchOS 6.0, *)
@available(macCatalystApplicationExtension 15.0, *)
func f() async {
	let p = SignalProducer<Int, Never>([1, 2, 3])

	for try await x in p.start() {
		print(x)
	}
}

Unfortunately we don't have a way to test these things right now until Quick/Quick#1084.

@NachoSoto
Copy link
Member Author

It became known that async / await mechanism is not backward deployed :( It's require new runtime. It's a pity

Luckily that's no longer true! :)

@danya61
Copy link

danya61 commented Sep 24, 2021

@NachoSoto who knows how long this process will take...

@NachoSoto
Copy link
Member Author

What process?

@mluisbrown
Copy link
Contributor

@NachoSoto I guess @danya61 is referring to this PR which I think is the last piece in back porting concurrency to older OSes. (looks like it will be iOS 13, macOS 10.15).

@danya61
Copy link

danya61 commented Sep 29, 2021

What process?

Back-deploy concurrency

@apps4everyone
Copy link
Contributor

maybe https://github.com/ReactiveX/RxSwift/releases/tag/6.5.0 can serve as an example / template ?

@jonasman
Copy link

Edit: fixed implementation for RC:

I quickly prototyped this on Beta 3 and it works:

@available(iOS 15.0, *)
@available(tvOS 15.0, *)
@available(watchOS 8.0, *)
@available(macCatalystApplicationExtension 15.0, *)
extension SignalProducer {
    public func start() -> AsyncStream<Result<Value, Error>> {
        return AsyncStream(Result<Value, Error>.self) { continuation in
            let disposable = self.start { event in
                switch event {
                case let .value(value): continuation.yield(.success(value))
                case let .failed(error): continuation.yield(.failure(error))
                case .completed, .interrupted: continuation.finish()
                }
            }
            
            continuation.onTermination = { @Sendable _ in disposable.dispose() }
        }
    }
}

@available(iOS 15.0, *)
@available(tvOS 15.0, *)
@available(watchOS 6.0, *)
@available(macCatalystApplicationExtension 15.0, *)
func f() async {
	let p = SignalProducer<Int, Never>([1, 2, 3])

	for try await x in p.start() {
		print(x)
	}
}

Unfortunately we don't have a way to test these things right now until Quick/Quick#1084.

What about using AsyncThrowingStream to handle the errors without using Result?

@NachoSoto
Copy link
Member Author

We would use the Error type information.

@DarkoDamjanovic
Copy link

DarkoDamjanovic commented Jun 30, 2022

Hi!

@andersio
Let me first say: thank you very much for your great work! I learned the reactive paradigm mostly by using ReactiveSwift and enjoyed it a lot.

But in our team we have concerns mixing async/await with ReactiveSwift. According to this WWDC talk it's unsafe to use locking primitives like semaphores in the context of async continuations. I can see usage of DispatchSemaphore, PthreadLock, etc... in ReactiveSwift sources. Do you consider this a possible issue?

Thanks in advance.

Br, Darko

Screen Shot 2022-06-30 at 09 11 08

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

7 participants