-
Notifications
You must be signed in to change notification settings - Fork 468
/
createEpicMiddleware.ts
119 lines (104 loc) · 3.95 KB
/
createEpicMiddleware.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import type { Dispatch, Middleware, MiddlewareAPI } from 'redux';
import { Subject, from, queueScheduler } from 'rxjs';
import { map, mergeMap, observeOn, subscribeOn } from 'rxjs/operators';
import { StateObservable } from './StateObservable';
import type { Epic } from './epic';
import { warn } from './utils/console';
/**
 * Options accepted by `createEpicMiddleware`.
 *
 * The type parameter `D` is the type of the `dependencies` value.
 */
interface Options<D = any> {
/**
 * Optional value that `createEpicMiddleware` passes, unchanged, as the third
 * argument to every root epic it runs. Left `undefined` when omitted.
 */
dependencies?: D;
}
/**
 * A Redux middleware that additionally exposes `run` for supplying a root
 * epic.
 *
 * All four type parameters are forwarded to the `Epic` type accepted by `run`;
 * `State` is also used as the state type of the underlying Redux `Middleware`.
 * `Output` must be assignable to `Input` and defaults to it.
 */
export interface EpicMiddleware<
Input = unknown,
Output extends Input = Input,
State = void,
Dependencies = any
// eslint-disable-next-line @typescript-eslint/ban-types
> extends Middleware<{}, State> {
/**
 * Hands a root epic to the middleware.
 *
 * @param rootEpic - The epic to run.
 */
run(rootEpic: Epic<Input, Output, State, Dependencies>): void;
}
/**
 * Creates a Redux middleware that runs epics against a store's actions and
 * state.
 *
 * Every epic passed to `epicMiddleware.run(rootEpic)` is invoked with an
 * action stream, a `StateObservable` and `options.dependencies`; whatever the
 * epic's returned stream emits is dispatched to the store. Internal delivery
 * is scheduled on a queue scheduler private to this middleware instance.
 *
 * @param options - Optional settings; `options.dependencies` is forwarded to
 *   every epic as its third argument.
 * @returns The middleware function, augmented with a `run` method.
 * @throws TypeError In non-production builds, when `options` is a function
 *   (the removed `createEpicMiddleware(rootEpic)` call style).
 */
export function createEpicMiddleware<
  Input = unknown,
  Output extends Input = Input,
  State = void,
  Dependencies = any
>(
  options: Options<Dependencies> = {}
): EpicMiddleware<Input, Output, State, Dependencies> {
  // This isn't great. RxJS doesn't publicly export the constructor for
  // QueueScheduler nor QueueAction, so we reach in. We need to do this because
  // we don't want our internal queuing mechanism to be on the same queue as any
  // other RxJS code outside of redux-observable internals.
  const QueueScheduler: any = queueScheduler.constructor;
  const uniqueQueueScheduler: typeof queueScheduler = new QueueScheduler(
    (queueScheduler as any).schedulerActionCtor
  );

  if (process.env.NODE_ENV !== 'production' && typeof options === 'function') {
    throw new TypeError(
      'Providing your root Epic to `createEpicMiddleware(rootEpic)` is no longer supported, instead use `epicMiddleware.run(rootEpic)`\n\nLearn more: https://redux-observable.js.org/MIGRATION.html#setting-up-the-middleware'
    );
  }

  // Root epics are pushed here by `run` and picked up by the pipeline below.
  const epic$ = new Subject<Epic<Input, Output, State, Dependencies>>();

  // Most recently attached store. Only consulted by the development-time
  // checks (double association, premature `run`); the pipeline itself uses
  // its own per-invocation reference.
  let store: MiddlewareAPI<Dispatch<any>, State>;

  const epicMiddleware: EpicMiddleware<Input, Output, State, Dependencies> = (
    _store
  ) => {
    if (process.env.NODE_ENV !== 'production' && store) {
      // https://github.com/redux-observable/redux-observable/issues/389
      warn(
        'this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da'
      );
    }
    store = _store;

    // Pin the store for this invocation. The shared `store` binding is
    // overwritten if the middleware is attached to another store, and this
    // pipeline must keep reading state from the same store it dispatches to.
    const attachedStore: MiddlewareAPI<Dispatch<any>, State> = _store;

    const actionSubject$ = new Subject<Input>();
    const stateSubject$ = new Subject<State>();

    const action$ = actionSubject$
      .asObservable()
      .pipe(observeOn(uniqueQueueScheduler));
    const state$ = new StateObservable(
      stateSubject$.pipe(observeOn(uniqueQueueScheduler)),
      attachedStore.getState()
    );

    const result$ = epic$.pipe(
      map((epic) => {
        const output$ = epic(action$, state$, options.dependencies!);

        // A falsy return value almost always means a forgotten `return`.
        if (!output$) {
          throw new TypeError(
            `Your root Epic "${
              epic.name || '<anonymous>'
            }" does not return a stream. Double check you\'re not missing a return statement!`
          );
        }

        return output$;
      }),
      mergeMap((output$) =>
        from(output$).pipe(
          subscribeOn(uniqueQueueScheduler),
          observeOn(uniqueQueueScheduler)
        )
      )
    );

    // Everything emitted by the epics is dispatched back into the store.
    result$.subscribe(attachedStore.dispatch);

    return (next) => {
      return (action) => {
        // Downstream middleware gets the action first,
        // which includes their reducers, so state is
        // updated before epics receive the action
        const result = next(action);

        // It's important to update the state$ before we emit
        // the action because otherwise it would be stale
        stateSubject$.next(attachedStore.getState());
        actionSubject$.next(action as Input);

        return result;
      };
    };
  };

  epicMiddleware.run = (rootEpic) => {
    if (process.env.NODE_ENV !== 'production' && !store) {
      warn(
        'epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.'
      );
    }
    epic$.next(rootEpic);
  };

  return epicMiddleware;
}