Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fromNodeStream #178

Open
aj0strow opened this issue Jan 13, 2016 · 5 comments
Open

fromNodeStream #178

aj0strow opened this issue Jan 13, 2016 · 5 comments

Comments

@aj0strow
Copy link
Contributor

Like fromNodeCallback and fromPromise.

It's nice to lift node streams into kefir streams. When a stream is in object mode, it's basically a kefir observable anyway with data, error, and end events.

Kefir.fromNodeStream = function (stream) {
  return Kefir.stream(function (emitter) {
    stream.on("data", emitter.emit)
    stream.on("error", emitter.error)
    stream.on("end", emitter.end)
  }
}

I'm lifting node database streams in a project, so maybe others want it too?

@rpominov
Copy link
Member

I didn't work with node streams yet (I do frontend mostly), but as far as I understand they are a bit different from event streams, because they represent streams of data. I'm not yet sure how well data streams and event streams fit together.

Rx has implementation of similar method (methods?) in https://github.com/Reactive-Extensions/rx-node , perhaps we could borrow ideas from there.

@aj0strow
Copy link
Contributor Author

Yeah it only makes sense for object mode:

https://nodejs.org/api/stream.html#stream_object_mode

For example lots of libs have object streams, like knex for postgres.

http://knexjs.org/#Streams-stream

It's very similar to fromEvents helper but it would map multiple events at once. What do you think of a generic event emitter helper, and then maybe a readable stream helper?

// interop for node event emitter

Kefir.fromEventEmitter(emitter, { value: "data", end: "end" })
observable.toEventEmitter({ value: "data", end: "finish" })

// maybe node stream helper

Kefir.fromReadableStream(nodeStream)

Or maybe just one or the other?

@hallettj
Copy link
Collaborator

I was just typing up a test case before I found this issue. For my use, the fromEvents would work nicely for streams - except that the Kefir stream does not end when the Node stream ends.

import * as Kefir from 'kefir'
import { Readable } from 'stream'

const nodeStream = new Readable()
const kefirStream = Kefir.fromEvents(nodeStream, 'data')

kefirStream.onValue(data => console.log('data', data))
kefirStream.onEnd(() => console.log('end (kefir callback)')) // <-- never runs

nodeStream.on('end', () => console.log('end (node callback)')) // <-- does run

nodeStream.push('some data', 'utf8')
nodeStream.push(null)  // signals end of stream

In my view, data from a stream is just an event with a Buffer value. It might be nice to have a helper to concatenate all buffer values from a stream; but it would also easy to use scan and last to do that, if the Kefir stream ended at the appropriate time.

I would like to request one of the following enhancements:

  • fromEvents treats an end event specially, and ends the resulting Kefir stream (there is already precedent for special handling for error events).
  • or, a new method that works just like fromEvents, but that treats an end event specially.

Edit: Oh, I see that @aj0strow already submitted a pull request that does just what I want! Please consider my comment to be a vote in favor of #179 .

@rpominov
Copy link
Member

Hm, I didn't realize fromEvents can be used to create stream from a node stream. That's cool. Making it end shouldn't be a problem, we can use takeUntilBy:

const kefirStream = Kefir.fromEvents(nodeStream, 'data')
  .takeUntilBy(Kefir.fromEvents(nodeStream, 'end'))

@hallettj will this work for you?

@hallettj
Copy link
Collaborator

Yes, that does look good. I had not thought of using takeUntilBy - thanks!
On Jan 26, 2016 3:49 PM, "Roman Pominov" notifications@github.com wrote:

Hm, I didn't realize fromEvents can be used to create stream from a node
stream. That's cool. Making it end shouldn't be a problem, we can use
takeUntilBy:

const kefirStream = Kefir.fromEvents(nodeStream, 'data')
.takeUntilBy(Kefir.fromEvents(nodeStream, 'end'))

@hallettj https://github.com/hallettj will this work for you?


Reply to this email directly or view it on GitHub
https://github.com/rpominov/kefir/issues/178#issuecomment-175296254.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants