Skip to content

Commit

Permalink
fix(debounce): Dispose the source disposable (#472)
Browse files Browse the repository at this point in the history
* Dispose the source disposable

When disposing a debounce stream we also need to dispose the source disposable

* add a test case for debounce dispose

* drain the debounced stream in the test

* fix(debounce): assign source disposable directly to debounce disposable

Suggested by mostjs/core#107

* fix linting issue
  • Loading branch information
ppoliani authored and briancavalier committed Aug 11, 2017
1 parent b9dc0e2 commit a8322e9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
6 changes: 2 additions & 4 deletions src/combinator/limit.js
Expand Up @@ -4,7 +4,6 @@

import Stream from '../Stream'
import Pipe from '../sink/Pipe'
import * as dispose from '../disposable/dispose'
import PropagateTask from '../scheduler/PropagateTask'
import Map from '../fusion/Map'

Expand Down Expand Up @@ -84,9 +83,7 @@ function DebounceSink (dt, source, sink, scheduler) {
this.scheduler = scheduler
this.value = void 0
this.timer = null

var sourceDisposable = source.run(this, scheduler)
this.disposable = dispose.all([this, sourceDisposable])
this.disposable = source.run(this, scheduler)
}

DebounceSink.prototype.event = function (t, x) {
Expand All @@ -110,6 +107,7 @@ DebounceSink.prototype.error = function (t, x) {

DebounceSink.prototype.dispose = function () {
this._clearTimer()
return this.disposable.dispose()
}

DebounceSink.prototype._clearTimer = function () {
Expand Down
13 changes: 13 additions & 0 deletions test/limit-test.js
Expand Up @@ -10,6 +10,9 @@ var take = require('../src/combinator/slice').take
var fromArray = require('../src/source/fromArray').fromArray
var core = require('../src/source/core')
var Map = require('../src/fusion/Map').default
var drain = require('../src/combinator/observe').drain
var Stream = require('../src/Stream').default
var FakeDisposeSource = require('./helper/FakeDisposeSource')

var empty = core.empty
var streamOf = core.of
Expand Down Expand Up @@ -95,6 +98,16 @@ describe('debounce', function () {
])
})
})

it('should dispose', function () {
var dispose = this.spy()
var s = new Stream(new FakeDisposeSource(dispose, streamOf(sentinel).source))
var debounced = limit.debounce(1, s)

return drain(debounced).then(function () {
expect(dispose).toHaveBeenCalledOnce()
})
})
})

describe('throttle', function () {
Expand Down

0 comments on commit a8322e9

Please sign in to comment.