-
Notifications
You must be signed in to change notification settings - Fork 0
/
EventSourceGenerator.ts
61 lines (49 loc) · 2.52 KB
/
EventSourceGenerator.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * Async iterator over one named event of an event source.
 *
 * Each call to `next()` waits for a single occurrence of the event and yields
 * the event's `data`. When iteration is stopped the iterator reports the
 * number of messages it delivered as its return value.
 */
class EventSourceGenerator<T>
    implements AsyncIterator<T, number, void> {

    /** Create an async iteration of events.
     * @param source Event source to generate an iterator over.
     * @param event Name of the event to listen for on `source`. */
    constructor(private source: EventSource, private event: string) {
        this.running = true;
    }

    /** False once `return()` has been called; `next()` then reports done. */
    private running: boolean;

    /** The listener currently registered on the source, paired with the
     * resolver of the promise handed out by the outstanding `next()` call.
     * Null while no `next()` is waiting. */
    private pending: {
        listener: (e: Event) => void;
        settle: (result: IteratorResult<T, number>) => void;
    } | null = null;

    /** Number of events delivered to the consumer so far. */
    private messageCount = 0;

    /** Build the terminal result carrying the delivered-message count. */
    private doneResult(): IteratorReturnResult<number> {
        return { done: true, value: this.messageCount };
    }

    /** Stop listening for further events.
     * Marks the iterator as finished, unsubscribes the listener of any
     * outstanding `next()` call and settles that call's promise as done, so
     * nothing is left waiting. This tidies up if the loop is exited early
     * with break or return.
     * @returns Resolves with `done: true` and the number of messages delivered. */
    return(): Promise<IteratorResult<T, number>> {
        this.running = false;

        const waiting = this.pending;
        if (waiting) {
            this.pending = null;
            this.source.removeEventListener(this.event, waiting.listener);
            // Settle the outstanding next() so its awaiter is not left hanging.
            waiting.settle(this.doneResult());
        }

        return Promise.resolve(this.doneResult());
    }

    /** Return a promise that resolves when the next event fires, or when
     * `return()` is called. Called each time for-await-of iterates as long as
     * done is false.
     * @returns Resolves with the event's `data`, or with `done: true` and the
     * delivered-message count once the iterator has been stopped. Rejects if
     * a previous `next()` call is still waiting for its event. */
    next(): Promise<IteratorResult<T, number>> {
        // Only one wait may be active; reject without disturbing the existing one.
        if (this.pending)
            return Promise.reject(new Error('Cannot iterate to the next promise while previous is still active.'));

        if (!this.running)
            return Promise.resolve(this.doneResult());

        return new Promise<IteratorResult<T, number>>(resolve => {
            const listener = (e: Event): void => {
                // Ignore a stale invocation that no longer belongs to the active wait.
                if (this.pending?.listener !== listener)
                    return;

                // Always stop listening, though once: true will handle this normally.
                this.source.removeEventListener(this.event, listener);
                // Drop the reference so the settled promise can be collected.
                this.pending = null;
                this.messageCount++;
                resolve({ value: (e as MessageEvent<T>).data, done: false });
            };

            // Hold on to the listener and resolver so return() can stop the wait.
            this.pending = { listener, settle: resolve };
            this.source.addEventListener(this.event, listener, { once: true, passive: true });
        });
    }

    /** Iterate the event asynchronously */
    [Symbol.asyncIterator](): this { return this; }
}