Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mergeAll doesn't wait every stream produces an item #137

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

pavel-khritonenko
Copy link

fixes #135

I haven't managed to fix joinWith method because of some type inference conflicts, so I decided to write an ad-hoc implementation of mergeAll first to review.

@pavel-khritonenko pavel-khritonenko changed the title mergeAll doesn't wait every stream produces an item [WIP] mergeAll doesn't wait every stream produces an item Nov 12, 2016
@pavel-khritonenko pavel-khritonenko changed the title [WIP] mergeAll doesn't wait every stream produces an item mergeAll doesn't wait every stream produces an item Nov 12, 2016
@pavel-khritonenko
Copy link
Author

Unfortunately, I don't know how to write more tests :-/ @polytypic, could you please review it when you have spare time?

@@ -1161,7 +1161,7 @@ module Stream =
val ambAll: Stream<#Stream<'x>> -> Stream<'x>

/// Joins all the streams together with `merge`.
val mergeAll: Stream<#Stream<'x>> -> Stream<'x>
val mergeAll: Stream<Stream<'x>> -> Stream<'x>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... This signature change should be unnecessary.

| Cons (x, xs) -> mergeAll' (merge combined x) xs :> _
|> memo

let mergeAll producer = mergeAll' nil producer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a quick look this seems like it should work correctly. It starts combined from nil, the identity element of merge, and only stops after both the combined and the producer stop.

<|> producer ^=> function
| Nil -> combined >>= function
| Nil -> nilj
| Cons (v, vs) -> consj v (mergeAll' vs nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case that the producer ends, you should just be able to return combined.

| Nil -> producer >>= function
| Nil -> nilj
| Cons (x, xs) -> mergeAll' x xs :> _
| Cons (v, vs) -> consj v (mergeAll' vs producer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case when combined ends is like initial call to mergeAll (except for memoization).

| Nil -> nilj
| Cons (v, vs) -> consj v (mergeAll' vs nil)
| Cons (x, xs) -> mergeAll' (merge combined x) xs :> _
|> memo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here could also just use <|>*.


values ^-> Choice1Of2
<|> timeOutMillis 100 ^-> Choice2Of2
|> run = Choice1Of2 (xs @ [ () ])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good to have tests!

It might be possible to make tests more deterministic, though I've also used non-deterministic global timer based tests myself.

@polytypic
Copy link
Member

polytypic commented Nov 15, 2016

Thanks for the PR! The tests are welcome and mergeAll looks like it should work.

I checked blame and recalled this fairly recent change I made:

ac202a2

That commit broke mergeAll and mergeMap, but I still don't fully understand why. I have a hunch that there might be some important properties that make switch and merge and possibly amb and append to require different joinWith and mapJoin implementations.

The reasons in favour of that change are not very strong. As said in the commit message, and as far as I can see, it only fixes a space leak in a degenerate use case of switchMap (and switchAll).

Perhaps it would be best to keep the new tests from this PR and revert the commit that broke mergeAll instead?

@pavel-khritonenko
Copy link
Author

pavel-khritonenko commented Nov 18, 2016

Perhaps it would be best to keep the new tests from this PR and revert the commit that broke mergeAll instead?

It's hard to say from my point of view, I just want working functionality. Having ad-hoc implementation for mergeAll it's not good, ofc.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Stream.mergeAll blocks until first element appeared in each stream
2 participants