Stream.buffer with TimeSpan combinator #132

Open
pavel-khritonenko opened this issue Oct 10, 2016 · 9 comments
Comments

@pavel-khritonenko

pavel-khritonenko commented Oct 10, 2016

Hello.

I need to buffer stream events not only by buffer size but by some timeout too.

For example, I need to flush statistics to a DB from an external source in batches of 10000 entries or every 30 seconds (if there are any entries). What is the best way to achieve this? I modified the buffer function in this way and it even works; however, I don't like DateTime.UtcNow here. Is there a more convenient way?

    let inline consj x xs = Job.result (Cons (x, xs))

    let inline lastBuffer i b tail =
      if 0 < i then consj (Array.sub b 0 i) tail else upcast tail

    let rec buffer' i (b: array<_>) (started: DateTime option) timeout xs =
      Job.tryIn xs
       <| function
          | Cons (x, xs) ->
            b.[i] <- x
            let i = i + 1
            match i, started with
            | 1, _ ->
              buffer' i b (Some DateTime.UtcNow) timeout xs
            | i, Some started when i < b.Length && DateTime.UtcNow - started < timeout ->
              buffer' i b (Some started) timeout xs
            | _ ->
              consj (Array.sub b 0 i)
                    (buffer' 0 (Array.zeroCreate b.Length) None timeout xs |> memo)
          | Nil -> lastBuffer i b nil
       <| fun e -> lastBuffer i b (error e)

    let bufferTs (n: int) timeOut (xs: Stream<_>) : Stream<_> =
      if n < 1 then failwith "buffer: n < 1" else
      buffer' 0 (Array.zeroCreate n) None timeOut xs |> memo
@polytypic
Member

polytypic commented Oct 11, 2016

Hi, I'm very busy this week, so I'm not sure when I'll have time to look into this in full detail. After a quick look I believe you are right to be a bit suspicious of the use of UtcNow. The problem is that the explicitly obtained time is only examined after receiving an element. If no element arrives for a long time, then the buffer is not flushed. (This might not happen in your particular case.) To fix that you need to make the timeout a choice alongside the attempt to take an element from the source stream, something like Alt.tryIn xs ... <|> bufferTimeBudget ^=> .... To measure a time over multiple choices you can memoize a timeout ( memo timeOut -> bufferTimeBudget ) at the beginning of each new buffer.

polytypic added a commit that referenced this issue Oct 13, 2016
@polytypic
Member

See the pull request #133 for a draft version of bufferTime. It has not been tested. If someone has time, adding a basic test or two (and fixing any bugs) for it would be nice.

I spent some time drafting code for this and it seems that there are a number of things a time-based buffer should handle properly. First of all, the timeout should be respected, as mentioned in my previous comment. Also, if there are no elements, then no timeout should be started, to avoid busy-waiting (start timeout -> nothing received -> start timeout -> ...). There are potentially many ways to handle this; the approach I chose is to start the timeout when the first element is received. A third issue is that, unlike with plain buffer, the number of elements in each buffer depends on both the timeout and the numerical limit, so allocating a (potentially big) array for the buffer is not a good idea in general.
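Putting those three points together, a rough, untested sketch (my own naming, not the code from #133) could look like the following: the time budget is memoized only once the first element of a buffer arrives, and the buffer grows in a ResizeArray instead of a preallocated array.

    open System
    open Hopac
    open Hopac.Infixes
    open Hopac.Stream

    let bufferTime (maxCount: int) (span: TimeSpan) (xs: Stream<'x>) : Stream<ResizeArray<'x>> =
      // Wait for the first element of a buffer; no timer runs while idle.
      let rec await xs =
        xs ^=> function
              | Cons (x, xs) ->
                let b = ResizeArray [x]
                // Start the time budget only now that a buffer has begun.
                fill b (memo (timeOut span)) xs
              | Nil -> nil :> Job<_>
      // Race the next element against the buffer's remaining time budget.
      and fill (b: ResizeArray<_>) budget xs =
        if maxCount <= b.Count then flush b xs else
        Alt.choose [
          xs ^=> function
                | Cons (x, xs) -> b.Add x; fill b budget xs
                | Nil -> Job.result (Cons (b, nil))
          budget ^=> fun () -> flush b xs ] :> Job<_>
      and flush b xs = Job.result (Cons (b, await xs |> memo))
      await xs |> memo

The key point is that Alt.choose races the source stream against the memoized timeout, so a buffer is flushed even when no further elements arrive within the budget.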

@pavel-khritonenko
Author

Thanks! Will test it today.

@pavel-khritonenko
Author

This code works well, will think how to write tests.

@pavel-khritonenko
Author

pavel-khritonenko commented Oct 20, 2016

I don't know if it's better to open a new issue or comment here.

I have taken a look at the code of the keepPreceding* and keepFollowing* functions. I spent half a day trying to implement functions like cacheBlocking : int -> Stream<'t> -> Stream<'t> and keepFollowing : int -> Stream<'t> -> Stream<'t> and didn't succeed. I've only implemented them using imperative "workers" run by Hopac.start and Job.foreverServer with an intermediate BoundedMb, but I don't think that's the proper way to achieve what I want.

The first one is about implementing blocking back-pressure for streams: it should convert the stream into an "eager" one that consumes the source stream into a bounded cache and blocks when the cache reaches its limit. The second one is much the same except for one thing: it discards elements using a specified handler 't -> #Job<unit>.

The use case: an application receives data from a 3rd-party server and stores it into a database, for example. There are two possible behaviors: it either stops pulling data from the source when the database is unreachable or unresponsive, or it discards the oldest samples of data with a proper handler (it could store them on disk, and I need to handle discarded files to delete them properly).

It's very likely that I'm asking for the wrong thing or trying to abuse the wrong primitives.

@polytypic
Member

First of all I'd like to just say that some of the stream combinators have fairly subtle implementations. Indeed, it has not been a priori obvious (to me) that some of the combinators are even possible. The implementation of (sequential) lazy streams is subtle business already. Adding concurrency and non-deterministic choice to the mix opens a whole new dimension of subtlety.

The stream buffering combinators you describe sound fairly straightforward, but after reading the use case, I'm not sure it needs a stream combinator. Does the use case require a stream transformer (Stream<'x> -> Stream<'y>)? Could it just be a stream consumer (Stream<'x> -> Job<'y>)? If so, would it be enough to write a tail-recursive loop that pulls elements from the source when it wants to, and performs the buffering and storage of elements as it sees fit, without producing an output stream?
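For illustration, an untested sketch of such a consumer loop (storeBatch is a hypothetical user-supplied job that persists a batch) could look like:

    open Hopac
    open Hopac.Infixes
    open Hopac.Stream

    // Sketch: consume a stream directly instead of transforming it.
    let consumeInBatches (batchSize: int)
                         (storeBatch: ResizeArray<'x> -> Job<unit>)
                         (xs: Stream<'x>) : Job<unit> =
      let rec loop (b: ResizeArray<'x>) xs =
        xs >>= function
              | Cons (x, xs) ->
                b.Add x
                if b.Count < batchSize then loop b xs
                else storeBatch b >>= fun () -> loop (ResizeArray ()) xs
              | Nil ->
                // Flush whatever remains when the stream ends.
                if 0 < b.Count then storeBatch b else Job.result ()
      loop (ResizeArray ()) xs

Because the loop pulls from the source only when it is ready, back-pressure falls out for free: nothing demands the next cons cell while a batch is being stored.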

@pavel-khritonenko
Author

pavel-khritonenko commented Oct 21, 2016

The simple use case: the worker gets data from a remote source (RabbitMQ), batches it into chunks of 100k entries (or every 30 seconds), stores them on disk and then tries to push them into a database. The database is sometimes unavailable; however, when it works it can handle the stored amount of data.

So the code is quite simple:

    let flushTimeout = timeOut <| TimeSpan.FromSeconds 30.

    let storeToDisk (chunk, ack) =
        let file = saveToFile chunk
        ack ()
        // Defer the delete until the chunk has been persisted downstream.
        chunk, fun () -> File.Delete file

    let reportDiscard = notImpl ""

    let liveStream =
        subscribe sources
        |> Stream.map (Stream.bufferTime 100000 flushTimeout)
        |> Stream.mergeAll
        |> Stream.mapFun (fun chunks -> ResizeArray.map fst chunks, snd chunks.[chunks.Length - 1])
        |> Stream.mapJob (fun (chunk, ack) -> storeToDisk (chunk, ack))

    let! savedChunks = readSavedChunks ()

    do! savedChunks
        |> Stream.append liveStream
        |> Stream.discardingCache 10000 reportDiscard // or blockingCache
        |> Stream.iterJob (storeToDataBase >=> deleteTempFile)

I would prefer to have a function like Stream.mapPartition that splits a stream into two other streams in some way, but having a function handler is OK.

It has all been implemented in a more imperative way with channels, mailboxes and workers, and I'm looking to simplify the code using Stream combinators.

@pavel-khritonenko
Author

pavel-khritonenko commented Oct 22, 2016

And this is a naive implementation:

    let blockingCache capacity (xs: Stream<'t>) : Stream<'t> =
        if capacity < 1 then failwith "Capacity < 1" else
        let mb = BoundedMb capacity
        Job.iterateServer xs (fun xs ->
            xs ^=> function
                   | Nil -> BoundedMb.put mb None >>=. Alt.never ()
                   | Cons (x, xs) -> BoundedMb.put mb (Some x) >>-. xs)
        >>= Stream.unfoldJob (fun _ -> BoundedMb.take mb >>- Option.map (fun x -> x, ()))
        |> memo

    let discardingCache capacity dj (xs: Stream<'t>) : Stream<'t> =
        if capacity < 2 then failwith "Capacity < 2" else
        let mb = BoundedMb capacity
        let ch = Ch ()

        Job.iterateServer (0, xs) (fun (count, xs) ->
            if count >= capacity then
                BoundedMb.take mb >>= fun x -> dj (Option.get x) >>-. (count - 1, xs)
            else
                Alt.choose [ xs ^=> function
                                    | Cons (x, xs) -> BoundedMb.put mb (Some x) >>-. (count + 1, xs)
                                    | Nil -> BoundedMb.put mb None >>= Alt.never
                             (Ch.take ch ^=> (fun reply -> Job.start (BoundedMb.take mb >>= IVar.fill reply)))
                                         ^->. (count - 1, xs) ] :> _)
        >>= Stream.unfoldJob (fun _ -> job { let reply = IVar ()
                                             do! Ch.send ch reply
                                             let! consOrNil = IVar.read reply
                                             return Option.map (fun x -> x, ()) consOrNil })
        |> memo

@haf
Copy link
Member

haf commented Oct 22, 2016

You may also be interested in RingBuffer @pavel-khritonenko
