-
Notifications
You must be signed in to change notification settings - Fork 231
/
limit.js
120 lines (100 loc) · 2.97 KB
/
limit.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import Stream from '../Stream'
import Pipe from '../sink/Pipe'
import PropagateTask from '../scheduler/PropagateTask'
import Map from '../fusion/Map'
/**
* Limit the rate of events by suppressing events that occur too often
* @param {Number} period time to suppress events
* @param {Stream} stream
* @returns {Stream}
*/
export function throttle (period, stream) {
return new Stream(throttleSource(period, stream.source))
}
function throttleSource (period, source) {
return source instanceof Map ? commuteMapThrottle(period, source)
: source instanceof Throttle ? fuseThrottle(period, source)
: new Throttle(period, source)
}
function commuteMapThrottle (period, source) {
return Map.create(source.f, throttleSource(period, source.source))
}
function fuseThrottle (period, source) {
return new Throttle(Math.max(period, source.period), source.source)
}
function Throttle (period, source) {
this.period = period
this.source = source
}
Throttle.prototype.run = function (sink, scheduler) {
return this.source.run(new ThrottleSink(this.period, sink), scheduler)
}
function ThrottleSink (period, sink) {
this.time = 0
this.period = period
this.sink = sink
}
ThrottleSink.prototype.event = function (t, x) {
if (t >= this.time) {
this.time = t + this.period
this.sink.event(t, x)
}
}
ThrottleSink.prototype.end = Pipe.prototype.end
ThrottleSink.prototype.error = Pipe.prototype.error
/**
* Wait for a burst of events to subside and emit only the last event in the burst
* @param {Number} period events occuring more frequently than this
* will be suppressed
* @param {Stream} stream stream to debounce
* @returns {Stream} new debounced stream
*/
export function debounce (period, stream) {
return new Stream(new Debounce(period, stream.source))
}
function Debounce (dt, source) {
this.dt = dt
this.source = source
}
Debounce.prototype.run = function (sink, scheduler) {
return new DebounceSink(this.dt, this.source, sink, scheduler)
}
function DebounceSink (dt, source, sink, scheduler) {
this.dt = dt
this.sink = sink
this.scheduler = scheduler
this.value = void 0
this.timer = null
this.disposable = source.run(this, scheduler)
}
DebounceSink.prototype.event = function (t, x) {
this._clearTimer()
this.value = x
this.timer = this.scheduler.delay(this.dt, PropagateTask.event(x, this.sink))
}
DebounceSink.prototype.end = function (t, x) {
if (this._clearTimer()) {
this.sink.event(t, this.value)
this.value = void 0
}
this.sink.end(t, x)
}
DebounceSink.prototype.error = function (t, x) {
this._clearTimer()
this.sink.error(t, x)
}
DebounceSink.prototype.dispose = function () {
this._clearTimer()
return this.disposable.dispose()
}
DebounceSink.prototype._clearTimer = function () {
if (this.timer === null) {
return false
}
this.timer.dispose()
this.timer = null
return true
}