Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream.mergeAll blocks until first element appeared in each stream #135

Open
pavel-khritonenko opened this issue Nov 2, 2016 · 4 comments · May be fixed by #137
Open

Stream.mergeAll blocks until first element appeared in each stream #135

pavel-khritonenko opened this issue Nov 2, 2016 · 4 comments · May be fixed by #137
Labels

Comments

@pavel-khritonenko
Copy link

pavel-khritonenko commented Nov 2, 2016

It seems there is bug in Stream.mergeAll method. In this example I create 5 channels with 3 jobs whose push work items to first 3 channels. Then I define indefinite Stream for each channel and merge them with Stream.mergeAll combinator.

In the result, I have only 3 first items taken from Stream and then the application is blocked. There are two commented lines, and uncommenting any of these lines (with commenting line below) fixes the problem. In the first comment I send the single message to each "idle" channel, in the second comment I use Seq.fold over the sequence of streams instead of using ofSeq and mergeAll combinators.

    open Hopac
    open Hopac.Infixes
    open System
    open System.Threading
    
    let ctrlC = 
        let ch = Ch()
        Console.CancelKeyPress.Add(ignore >> Ch.give ch >> run)
        ch
    
    let inline (^) a b = a b
    
    let mutable produceCounter = 0
    
    let startProducer number = 
        let ch = Ch()
    
        if number < 4 then
            Job.foreverServer (timeOutMillis 100 >>= fun _ -> 
                                   let dt = DateTime.UtcNow
                                   let counter = Interlocked.Increment &produceCounter
                                   Ch.give ch (counter, number, dt) >>-
                                        fun _ -> printfn "Pushed %O" (counter, number, dt))
            // else Ch.send ch (0, 0, DateTime.UtcNow) :> _
            else Job.result ()
        >>-. Ch.take ch
    
    [<EntryPoint>]
    let main argv = 
    
        let mutable index = 0
    
        run ^ job { 
                  let! producers = [1..5] |> Seq.map startProducer |> Job.conCollect
    
                  do! producers
                      |> Seq.map Stream.indefinitely
                      // |> Seq.fold (fun s n -> s |> Stream.merge n) Stream.never
                      |> Stream.ofSeq |> Stream.mergeAll
                      |> Stream.iterJob(fun (ind, _, _) -> job {
                            let number = Interlocked.Increment(&index)
                            printfn "Writing chunk with content: %d..." ind })
                      |> Job.start
                  do! ctrlC
                  return 0
              }
@pavel-khritonenko pavel-khritonenko changed the title Stream.mergeAll blocks until first message Stream.mergeAll blocks until first element appeared in every streams Nov 2, 2016
@pavel-khritonenko pavel-khritonenko changed the title Stream.mergeAll blocks until first element appeared in every streams Stream.mergeAll blocks until first element appeared in every stream Nov 2, 2016
@pavel-khritonenko pavel-khritonenko changed the title Stream.mergeAll blocks until first element appeared in every stream Stream.mergeAll blocks until first element appeared in each stream Nov 2, 2016
@polytypic
Copy link
Member

Hmm... This does look like a potential bug. Perhaps in Stream.joinWith. ATM, I'm a bit too busy with other projects to look into this in detail.

@polytypic polytypic added the bug label Nov 5, 2016
@pavel-khritonenko
Copy link
Author

pavel-khritonenko commented Nov 11, 2016

@polytypic, I almost fixed the issue but I had a question during the writing tests. I have infinite streams in my case so I've never asked myself about it. What is the expected result from the merge of Stream.once () and Stream.never? I expect the result is Stream.once () but I receive result similar Stream.append (Stream.once ()) Stream.never (the stream with only one () value that never returns Nil)

@polytypic
Copy link
Member

I think that is correct. Consider the difference between nil and never. More to the point, never is not the identity element of merge.

@pavel-khritonenko
Copy link
Author

pavel-khritonenko commented Nov 11, 2016

So merge waits all streams are finished, ok. Thanks for comment, will send PR soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants