Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

how to wait for async operations inside iterator of: Stream.reduce #707

Open
adrian-gierakowski opened this issue Aug 5, 2020 · 2 comments

Comments

@adrian-gierakowski
Copy link

When consuming a stream with .reduce, I'd like to be able to do some async operations inside the iterator/reducer function and have the stream wait for their completion before consuming next element.

Here's what I'm doing right now:

const H = require('highland')
const Promise = require('aigle')

const reducer = async (memoPromise, x) => {
  console.log('got x:', x)
  const memo = await memoPromise
  console.log('processing x:', x)
  return Promise.delay(10).then(() => memo + x)
}

H([1, 2, 3])
  .reduce(reducer, 0)
  .toPromise(Promise)
  .then(console.log)

However the stream is being consumed as fast as possible, without waiting for async operation inside the reducer to complete. The output of running the above is:

got x: 1
got x: 2
got x: 3
processing x: 1
processing x: 2
processing x: 3
6

while I'd like it to be:

got x: 1
processing x: 1
got x: 2
processing x: 2
got x: 3
processing x: 3
6
@richardscarrott
Copy link

You can use wrapAsync which converts a promise returning function (e.g. an async fn) into a stream which emits a single value, e.g.

const wrapAsync = (
  fn
) => {
  return (...args) => {
    let promise;
    try {
      promise = fn.apply(_, args);
      if (!_.isObject(promise) || !_.isFunction(promise.then)) {
        return _.fromError(
          new Error('Wrapped function did not return a promise')
        );
      }
      return _(promise);
    } catch (e) {
      return _.fromError(e);
    }
  };
};
_([1, 2, 3])
  .flatMap(wrapAsync(async (val) => {
     await sleep(4000);
     return val * 2;
  }))
  .toArray((result) => console.log('DONE', result));

@adrian-gierakowski
Copy link
Author

@richardscarrott thanks for the tip. I am already using a similar approach with flatMap, however I'd like to use the same technique with reduce so that I can pass some state along as I'm processing individual elements. I know I can achieve a similar result by using flatMap with some mutable state in the outer scope but I'd prefer to avoid it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants