Add toAsyncIterator support. #6332
Replies: 10 comments
-
Related: the notes from the last core-team meeting - see the last item.
-
@cartant The notes provided me with the idea for this. I should add an option that handles the buffer issue.
-
@SanderElias I'm with you on allowing an imperative style for observables. I have a question, though. In your implementation
-
@somombo Have a closer look at my sample. I'm using a takeUntil to do exactly what you are asking for.
-
Oh sorry, I had completely misunderstood the role of the
-
Okay, so with this understanding in mind, I propose that the API for this functionality look something like this:
interface Observable {
  /* ... */
  toAsyncIterator(destroyer?: Observable<any>)
  /* ... */
}
Note that the method
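A minimal sketch of how the proposed `destroyer` parameter could behave, written as a free function rather than a prototype method. Everything here is an assumption for illustration: `Source`, `Signal`, and `syncSource` are hypothetical names, and `Source` is a small structural stand-in for an rxjs `Observable` (which should satisfy it) so the sketch runs without rxjs installed. Emissions are queued, so values delivered synchronously are not lost, and an emission from `destroyer` ends the iteration after already-queued values are drained.

```typescript
// Hypothetical structural stand-ins; not part of rxjs.
interface Unsubscribable { unsubscribe(): void; }
interface Source<T> {
  subscribe(observer: {
    next: (value: T) => void;
    error: (err: unknown) => void;
    complete: () => void;
  }): Unsubscribable;
}

type Signal<T> =
  | { kind: "next"; value: T }
  | { kind: "error"; error: unknown }
  | { kind: "done" };

async function* toAsyncIterator<T>(
  source: Source<T>,
  destroyer?: Source<unknown>,
): AsyncGenerator<T, void, undefined> {
  const buffer: Signal<T>[] = [];
  let wake: (() => void) | undefined;
  // Queue every signal and wake the consumer if it is waiting.
  const push = (signal: Signal<T>) => {
    buffer.push(signal);
    wake?.();
    wake = undefined;
  };

  const subscriptions: Unsubscribable[] = [];
  subscriptions.push(source.subscribe({
    next: value => push({ kind: "next", value }),
    error: error => push({ kind: "error", error }),
    complete: () => push({ kind: "done" }),
  }));
  if (destroyer) {
    // Any emission from the destroyer is treated as "stop iterating".
    subscriptions.push(destroyer.subscribe({
      next: () => push({ kind: "done" }),
      error: error => push({ kind: "error", error }),
      complete: () => {},
    }));
  }

  try {
    while (true) {
      if (buffer.length === 0) {
        await new Promise<void>(resolve => { wake = resolve; });
      }
      const signal = buffer.shift()!;
      if (signal.kind === "done") return;
      if (signal.kind === "error") throw signal.error;
      yield signal.value;
    }
  } finally {
    // Runs on completion, error, destroyer, or an early `break` by the consumer.
    subscriptions.forEach(s => s.unsubscribe());
  }
}

// Hypothetical helper: emits all values synchronously, then completes.
function syncSource<T>(values: T[]): Source<T> {
  return {
    subscribe(observer) {
      values.forEach(v => observer.next(v));
      observer.complete();
      return { unsubscribe() {} };
    },
  };
}
```

With this shape, `for await (const v of toAsyncIterator(syncSource([1, 2, 3]))) { ... }` visits each value in order. Note that the buffer is unbounded, so a fast producer with a slow consumer can grow memory; that trade-off is the "buffer issue" raised earlier in the thread.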
-
Hi, what about adding
for await (const result of anObservable$) {
  // Do some stuff
}
-
Are there any plans to land this in 7.0.0, given that #1624 is there already?
-
@alisahinozcelik: how about something like this?
import { Observable } from "rxjs";

export interface AsyncIterableObservable<T> extends Observable<T>, AsyncIterable<T> {}

(Observable.prototype as AsyncIterableObservable<any>)[Symbol.asyncIterator] = async function* <T>(this: Observable<T>) {
  type Result = { value: T | undefined, done: boolean };
  let resolve: (value: Result | PromiseLike<Result>) => void;
  let reject: (reason: any) => void;
  // Creates a fresh promise and captures its resolve/reject functions.
  const setPromise = () =>
    new Promise<Result>((_resolve, _reject) => {
      resolve = _resolve;
      reject = _reject;
    });
  let lastPromise = setPromise();
  const subscription = this.subscribe({
    next: emitted => {
      resolve({ value: emitted, done: false });
      lastPromise = setPromise();
    },
    error: err => reject(err),
    complete: () => resolve({ value: undefined, done: true })
  });
  // Still have to decide what to do with the error case.
  // For now, it will be forwarded to the consumer
  // (they need to implement a try/catch).
  try {
    while (true) {
      const result = await lastPromise;
      if (result.done) {
        return;
      }
      yield result.value as T;
    }
  } finally {
    subscription.unsubscribe();
  }
}
But please note this will not work for observables that emit synchronously, so the next thing requires
import { asyncScheduler, from } from "rxjs";
import { observeOn } from "rxjs/operators";

async function obsTest() {
  const obs = from([1, 2, 3, 4]).pipe(
    observeOn(asyncScheduler)
  ) as AsyncIterableObservable<number>;
  for await (const result of obs) {
    console.log(result);
  }
  console.log("ended");
}
Also, moving
I wonder how you all think this should be handled. Is it supposed to buffer or something?
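To make the buffering question concrete, here is a small sketch that reproduces the single-promise strategy from the snippet above with plain promises and no rxjs. The names `Producer`, `collectSingleSlot`, `syncProducer`, and `asyncProducer` are hypothetical. It illustrates why synchronous emissions are dropped: every emission replaces the stored promise before the consumer has awaited it, so only the last promise (the completion) is ever observed. Spacing the emissions out with timers, roughly what `observeOn(asyncScheduler)` achieves, lets the consumer keep up.

```typescript
type Result = { value: number | undefined; done: boolean };
// Hypothetical producer shape: calls `next` for each value, then `complete`.
type Producer = (next: (v: number) => void, complete: () => void) => void;

// Single-slot strategy: each emission resolves the current promise and
// immediately swaps in a fresh one.
async function collectSingleSlot(producer: Producer): Promise<number[]> {
  let resolve!: (r: Result) => void;
  const setPromise = () => new Promise<Result>(r => { resolve = r; });
  let lastPromise = setPromise();
  producer(
    v => { resolve({ value: v, done: false }); lastPromise = setPromise(); },
    () => resolve({ value: undefined, done: true }),
  );
  const out: number[] = [];
  while (true) {
    const result = await lastPromise;
    if (result.done) return out;
    out.push(result.value as number);
  }
}

// Emits everything before the consumer gets a chance to await.
const syncProducer: Producer = (next, complete) => {
  [1, 2, 3].forEach(v => next(v));
  complete();
};

// Emits one value per timer tick, so the consumer awaits between emissions.
const asyncProducer: Producer = (next, complete) => {
  const values = [1, 2, 3];
  const step = (i: number) => setTimeout(() => {
    if (i < values.length) { next(values[i]); step(i + 1); } else { complete(); }
  }, 0);
  step(0);
};
```

Under these assumptions, `collectSingleSlot(syncProducer)` resolves to an empty array while `collectSingleSlot(asyncProducer)` resolves to `[1, 2, 3]`. Queuing emissions instead of keeping a single promise removes the dependence on scheduling, at the cost of an unbounded buffer.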
-
Had issues with teardown when the above is used in subscriptions-transport-ws, so I revamped it with an imperative implementation:
import { Observable, Subscriber, Notification, NotificationKind } from "rxjs";
import { materialize } from "rxjs/operators";

export interface AsyncIterableObservable<T> extends Observable<T>, AsyncIterable<T> {}

(Observable.prototype as AsyncIterableObservable<any>)[Symbol.asyncIterator] = function <T>(this: Observable<T>) {
  type RetainedPromiseFuncs = {
    resolve: (value: IteratorResult<T>) => void,
    reject: (reason?: any) => void
  };
  // pullQueue: consumers waiting for a value; pushQueue: emissions waiting for a consumer.
  const pullQueue: RetainedPromiseFuncs[] = [];
  const pushQueue: Notification<T>[] = [];
  // Settles one pending promise according to the kind of notification.
  const resolveEmission = (promiseFnPair: RetainedPromiseFuncs, emission: Notification<T>) => {
    const { resolve, reject } = promiseFnPair;
    switch (emission.kind) {
      case NotificationKind.NEXT: return resolve({ value: emission.value as T, done: false });
      case NotificationKind.ERROR: return reject(emission.error);
      case NotificationKind.COMPLETE: return resolve({ value: undefined, done: true });
    }
  };
  const subscription = this.pipe(
    materialize()
  ).subscribe((emission: Notification<T>) => {
    const promiseFns = pullQueue.shift();
    if (promiseFns) {
      resolveEmission(promiseFns, emission);
    } else {
      pushQueue.push(emission);
    }
  });
  const pull: () => Promise<IteratorResult<T>> = () => new Promise(
    (resolve, reject) => {
      const emission = pushQueue.shift();
      if (emission) {
        resolveEmission({ resolve, reject }, emission);
      } else {
        pullQueue.push({ resolve, reject });
      }
    }
  );
  // Unsubscribes once, completes any waiting consumers, and clears both queues.
  const teardown = () => {
    if (subscription.closed) {
      return;
    }
    subscription.unsubscribe();
    pullQueue.forEach(({ resolve }) => resolve({ value: undefined, done: true }));
    pullQueue.length = 0;
    pushQueue.length = 0;
  };
  return {
    next() {
      return subscription.closed
        ? (this.return as (() => Promise<IteratorReturnResult<void>>))()
        : pull();
    },
    return() {
      teardown();
      return Promise.resolve({ value: undefined, done: true });
    },
    throw(error: any) {
      teardown();
      return Promise.reject(error);
    },
    [Symbol.asyncIterator]() {
      return this;
    }
  }
}
Used originally with:
export function fromAsyncIterator<T>(input: AsyncIterable<T>): AsyncIterableObservable<T> {
  const ai = input[Symbol.asyncIterator]();
  // Pulls one value at a time and forwards it to the subscriber until the iterator is done.
  const getNextValue: (subscriber: Subscriber<T>) => Promise<void> = subscriber => ai.next().then(
    result => {
      if (result.done) {
        subscriber.complete();
      } else {
        subscriber.next(result.value);
        getNextValue(subscriber);
      }
    },
    err => subscriber.error(err));
  return new Observable(subscriber => {
    getNextValue(subscriber);
    return () => ai.return ? ai.return() : undefined;
  }) as AsyncIterableObservable<T>;
}
Let me know if you have any thoughts.
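On the teardown concern: the `return()` method matters because `for await` calls it automatically when the loop exits early (for example via `break`), which is the natural place to unsubscribe. The sketch below shows that language behavior in isolation, without rxjs; `countingIterable` and `takeThree` are hypothetical names, and the `log.push` call stands in for where `subscription.unsubscribe()` would run.

```typescript
// Hypothetical endless async iterable that records when return() is invoked.
function countingIterable(log: string[]): AsyncIterable<number> {
  return {
    [Symbol.asyncIterator]() {
      let n = 0;
      return {
        async next(): Promise<IteratorResult<number>> {
          return { value: n++, done: false };
        },
        async return(): Promise<IteratorResult<number>> {
          // In an Observable-backed iterator, unsubscribe here.
          log.push("return called");
          return { value: undefined, done: true };
        },
      };
    },
  };
}

// Consumes three values, then leaves the loop early.
async function takeThree(log: string[]): Promise<number[]> {
  const seen: number[] = [];
  for await (const value of countingIterable(log)) {
    seen.push(value);
    if (seen.length === 3) break; // exiting early makes for-await call return()
  }
  return seen;
}
```

Calling `takeThree(log)` resolves to `[0, 1, 2]` and leaves a single `"return called"` entry in `log`, so a consumer that breaks out of the loop does not need to clean up manually.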
-
Feature Request
Add a .toAsyncIterator() method to rxjs. This will improve interop with ES2018 and adds an easy way to consume observables in a more readable form.
Describe the solution you'd like
Adding this to the Observable prototype in rxjs makes it easier to consume observables as an async iterator.
Here is the function I used in the above sample.
Describe alternatives you've considered
The alternative is to keep the function to myself, or perhaps to add it to rxjs as a utility function.
Additional context
Supporting async iterators makes it possible to consume observables in a more imperative way. That makes observables more approachable for developers who have less of a functional background.