Skip to content

Commit

Permalink
feat: Use AsyncIterator wherever possible instead of `AsyncGenerato…
Browse files Browse the repository at this point in the history
…r` (#131)

* feat: Use AsyncIterator wherever possible instead of AsyncGenerator

* version bump

* dependency updates
  • Loading branch information
felipecsl committed Jun 10, 2023
1 parent c41786f commit b86bf06
Show file tree
Hide file tree
Showing 6 changed files with 928 additions and 1,201 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "obgen",
"version": "0.3.0",
"version": "0.4.0",
"description": "Javascript Observables implemented with async generators",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
10 changes: 6 additions & 4 deletions src/asyncObservable.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {
import iteratorToIterable, {
asyncFilterIterator,
asyncMapIterator,
filterIterator,
Expand All @@ -14,9 +14,11 @@ import Observable from "./observable";
export default class AsyncObservable<T> extends Observable<T> {
private readonly buffer: BufferedIterator<T>;

constructor(generatorFn: () => AsyncGenerator<T>) {
super(generatorFn);
this.buffer = BufferedIterator.fromIterables(generatorFn());
constructor(iteratorFn: () => AsyncIterator<T>) {
super(iteratorFn);
this.buffer = BufferedIterator.fromIterables(
iteratorToIterable(iteratorFn)
);
}

/** Iterates over the items in this iterable, draining any previously buffered items */
Expand Down
29 changes: 11 additions & 18 deletions src/deferredObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import iteratorToIterable, {
asyncMapIterator,
filterIterator,
flatMapIterator,
iteratorToGenerator,
mapIterator,
takeIterator,
} from "./internal/util";
Expand All @@ -17,69 +16,63 @@ import Observable from "./observable";
export default class DeferredObservable<T> extends Observable<T> {
private _iterator: AsyncIterator<T> | null = null;

constructor(generatorFn: () => AsyncGenerator<T>) {
super(generatorFn);
constructor(iteratorFn: () => AsyncIterator<T>) {
super(iteratorFn);
}

override iterable(): AsyncIterable<T> {
this._iterator = this.generatorFn();
this._iterator = this.iteratorFn();
const { _iterator } = this;
return iteratorToIterable(() => _iterator);
}

override iterator(): AsyncIterator<T> {
this._iterator = this._iterator || this.generatorFn();
this._iterator = this._iterator || this.iteratorFn();
return this._iterator;
}

override asyncFilter(filterFn: (item: T) => Promise<boolean>): Observable<T> {
const self = this;
return new DeferredObservable(() =>
iteratorToGenerator(asyncFilterIterator(self.iterator(), filterFn))
asyncFilterIterator(self.iterator(), filterFn)
);
}

override asyncMap<O>(mapFn: (item: T) => Promise<O>): Observable<O> {
const self = this;
return new DeferredObservable(() =>
iteratorToGenerator(asyncMapIterator(self.iterator(), mapFn))
asyncMapIterator(self.iterator(), mapFn)
);
}

override filter(filterFn: (item: T) => boolean): Observable<T> {
const self = this;
return new DeferredObservable(() =>
iteratorToGenerator(filterIterator(self.iterator(), filterFn))
filterIterator(self.iterator(), filterFn)
);
}

override map<O>(mapFn: (item: T) => O): Observable<O> {
const self = this;
return new DeferredObservable(() =>
iteratorToGenerator(mapIterator(self.iterator(), mapFn))
);
return new DeferredObservable(() => mapIterator(self.iterator(), mapFn));
}

override flatMap<O>(mapFn: (item: T) => AsyncObservable<O>): Observable<O> {
const self = this;
return new DeferredObservable(() =>
iteratorToGenerator(flatMapIterator(self.iterator(), mapFn))
flatMapIterator(self.iterator(), mapFn)
);
}

override merge(other: Observable<T>): Observable<T> {
return new DeferredObservable(() =>
iteratorToGenerator(
BufferedIterator.fromIterables(this.iterable(), other.iterable())
)
BufferedIterator.fromIterables(this.iterable(), other.iterable())
);
}

override take(num: number): Observable<T> {
const self = this;
return new DeferredObservable(() =>
iteratorToGenerator(takeIterator(self.iterator(), num))
);
return new DeferredObservable(() => takeIterator(self.iterator(), num));
}

override async toArray(): Promise<T[]> {
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ export function asyncDefer<T>(promiseFn: () => Promise<T>): Observable<T> {
* is iterated over.
*/
export function deferredWrap<T>(
iteratorFn: () => AsyncGenerator<T>
iteratorFn: () => AsyncIterator<T>
): Observable<T> {
return new DeferredObservable(iteratorFn);
}

/** Returns a new `Observable` that immediately calls and buffers the provided `iteratorFn` function to emit events */
export function wrap<T>(iteratorFn: () => AsyncGenerator<T>): Observable<T> {
export function wrap<T>(iteratorFn: () => AsyncIterator<T>): Observable<T> {
return new AsyncObservable(iteratorFn);
}

Expand Down
2 changes: 1 addition & 1 deletion src/observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { GenericObserver } from "./genericObserver";
import { isObserver } from "./internal/util";

export default abstract class Observable<T> {
protected constructor(readonly generatorFn: () => AsyncGenerator<T>) {}
protected constructor(readonly iteratorFn: () => AsyncIterator<T>) {}

async subscribe(observer?: GenericObserver<T>): Promise<void> {
for await (const item of this.iterable()) {
Expand Down

0 comments on commit b86bf06

Please sign in to comment.