On error resume #2

Open
Swoorup opened this issue May 6, 2020 · 5 comments · May be fixed by #3

Comments

@Swoorup

Swoorup commented May 6, 2020

I am currently using pollStreamForever like this:

let stream = pollStreamForever db streamKey position PollOptions.Default
stream |> Observable.retry // After a disconnect and reconnect, this replays every element from the originally given position

Is it possible to resume from where the stream left off when an error occurs?
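
One way to get this behaviour without changing the library is to remember the last position seen and rebuild the stream from there on each retry. The following is only a sketch under assumptions: `resumeOnError` is a hypothetical helper, not part of FSharp.Control.Redis.Streams, and it assumes System.Reactive is referenced (the `#r` line applies when running as a script).

```fsharp
#r "nuget: System.Reactive" // only needed when run as a script with dotnet fsi

open System
open System.Reactive.Linq

/// Hypothetical helper (not a library function): on error, resubscribe
/// starting from the position after the last element that was delivered,
/// instead of replaying from the original position.
let resumeOnError
    (nextPosition: 'a -> 'pos)
    (start: 'pos)
    (source: 'pos -> IObservable<'a>)
    : IObservable<'a> =
    Observable.Defer(Func<IObservable<'a>>(fun () ->
        // The position is created per subscription, so each subscriber
        // tracks its own progress.
        let position = ref start
        Observable
            .Defer(Func<IObservable<'a>>(fun () -> source position.Value))
            .Do(Action<'a>(fun item -> position.Value <- nextPosition item))
            .Retry()))
```

With the library, `source` would presumably be `fun pos -> pollStreamForever db streamKey pos PollOptions.Default`, and `nextPosition` something like `fun (entry: StreamEntry) -> EntryId.CalculateNextPositionIncr entry.Id`. Note that `Retry()` with no arguments retries indefinitely and without any delay, so a backoff may be worth adding.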


@Swoorup
Author

Swoorup commented May 6, 2020

Another option would be to report connection issues through OnNext instead of OnError, wrapping each item in an F# Result:

module temp.RedisRx

open System
open System.Threading
open System.Reactive.Linq
open System.Reactive
open System.Threading.Tasks
open FSharp.Control.Reactive
open FSharp.Control.Redis.Streams.Core
open FSharp.Control.Tasks
open StackExchange.Redis

module Observable =
  let internal taskUnfold (fn: 's -> CancellationToken -> Task<('s * 'e) option>) (state: 's) =
    Observable.Create(fun (obs: IObserver<_>) ->
      let cts = new CancellationTokenSource()
      let ct = cts.Token
      task {
        let mutable innerState = state
        let mutable isFinished = false
        try
            // Stop as soon as either condition holds: cancellation was requested or fn returned None
            while not ct.IsCancellationRequested && not isFinished do
              try 
                let! result = fn innerState ct
                match result with
                | Some (newState, output) ->
                    innerState <- newState
                    obs.OnNext <| Ok output
                | None ->
                    isFinished <- true
              with e ->
                obs.OnNext <| Error e
            obs.OnCompleted()
        finally
          cts.Dispose()
      }
      |> ignore

      new Disposables.CancellationDisposable(cts) :> IDisposable)

  let flattenArray (observable : IObservable<Result<StreamEntry [],exn>>) =
      observable.SelectMany (fun x ->
          match x with
          | Ok o -> o |> Array.map (Ok) :> seq<_>
          | Error e -> [Error e] :> seq<_>)


let pollStreamForever (redisdb : IDatabase) (streamName : RedisKey) (startingPosition : RedisValue) (pollOptions : PollOptions) =
    Observable.taskUnfold (fun (nextPosition, pollDelay) ct -> task {
        let! (response : StreamEntry []) = redisdb.StreamRangeAsync(streamName, minId = Nullable(nextPosition), count = (Option.toNullable pollOptions.CountToPullATime))
        match response with
        | EmptyArray ->
            let nextPollDelay = pollOptions.CalculateNextPollDelay pollDelay
            do! Task.Delay pollDelay
            return Some ((nextPosition, nextPollDelay ) , Array.empty )
        | entries ->
            let lastEntry = Seq.last entries
            let nextPosition = EntryId.CalculateNextPositionIncr lastEntry.Id
            let nextPollDelay = TimeSpan.Zero
            return Some ((nextPosition, nextPollDelay), entries )

    }) (startingPosition, TimeSpan.Zero)
    |> Observable.flattenArray
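
A consumer of this Result-based stream then decides per item what to do. A minimal sketch, assuming only FSharp.Core (`subscribeResults` is a hypothetical name, and the two callbacks stand in for whatever processing and logging the application does):

```fsharp
open System

/// Hypothetical consumer: handles entries and failures from a single
/// subscription, so a cold polling source is only subscribed to once.
let subscribeResults
    (onEntry: 'a -> unit)
    (onFailure: exn -> unit)
    (source: IObservable<Result<'a, exn>>)
    : IDisposable =
    source
    |> Observable.subscribe (fun item ->
        match item with
        | Ok entry -> onEntry entry   // normal element
        | Error e -> onFailure e)     // e.g. log it; the stream keeps going
```

Because failures arrive through OnNext, the subscription stays alive after a connection problem and the position held inside `taskUnfold` is not lost.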

@TheAngryByrd
Owner

We discussed this in Slack. Creating a new function like pollStreamForeverSafe that returns a Result would be the best fix here.

Could you open a PR for this, also adding this function for the Hopac and Akka Streams implementations?

@Swoorup
Author

Swoorup commented May 6, 2020

Slack convo:

sytherax  25 minutes ago
I did try hacking around code with consumer groups. In my case, each server instance can pop on and off, and each of them has to read the stream individually. So I put them in different consumer groups and read only unacknowledged messages, which appears to do the job. But it seems like I need separate code to clean up unused consumer groups.

theangrybyrd:penguin:  24 minutes ago
 But it seems like I need separate code to clean up unused consumer groups.

theangrybyrd:penguin:  24 minutes ago
yep, that’s why i havent released anything yet =\

sytherax  24 minutes ago
lol

sytherax  23 minutes ago
I find it a bit confusing, reading the protocol, as to why you would manually send an acknowledgement though

theangrybyrd:penguin:  23 minutes ago
I had a similar problem to this when lining up a Process to an Rx Stream (stdout/stderr), the amount of people that use stderr for no good reason basically led me to your second option (had to use Choice back then)

sytherax  22 minutes ago
so i reverted to solution keeping stream position in the client itself.

sytherax  19 minutes ago
doesn’t it make sense though, pollStreamForever to return Result<_, exn> instead?

sytherax  19 minutes ago
you could probably filter the exception to very specific exception in the implementation of pollStreamForever

theangrybyrd:penguin:  18 minutes ago
In theory, no
In practice, probably

sytherax  18 minutes ago
or have the first try catch inside pollStreamForever instead

sytherax  18 minutes ago
https://netflixtechblog.com/android-rx-onerror-guidelines-e68e8dc7383f

Android Rx onError Guidelines, by Ed Ballot (Medium, 4 min read, Oct 26th, 2019)

theangrybyrd:penguin:  18 minutes ago
In theory the underlying stream tech should be able to accommodate this kind of weird behavior.

sytherax  18 minutes ago
The divergent understanding is partially because the name "onError" is a bit misleading. The item emitted by onError() is not a simple error, but a throwable that can cause significant damage if not caught. Our app has a global handler that prevents it from crashing outright, but an uncaught exception can still leave parts of the app in an unpredictable state.

sytherax  16 minutes ago
exactly, it doesn’t make sense when the underlying stream has reconnect support.

theangrybyrd:penguin:  15 minutes ago
maybe we should just catch that specific exception and ignore it and assume it will retry the connect?

theangrybyrd:penguin:  14 minutes ago
as you said with having the try catch in there

theangrybyrd:penguin:  13 minutes ago
I could see being silent also being an annoying behavior =\

sytherax  13 minutes ago
I am a bit torn on this: if it is silent, you could be disconnected forever without knowing.

theangrybyrd:penguin:  13 minutes ago
right

theangrybyrd:penguin:  12 minutes ago
I think the best answer for now is to add a second function like pollStreamForeverSafe where it returns the Result

sytherax  12 minutes ago
Yeah I agree

sytherax  11 minutes ago
Leaving it as a Result will let the users decide what to do with Error type, log it or just ignore it

theangrybyrd:penguin:  11 minutes ago
:thumbsup:
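
With a Result-returning stream, the "catch that specific exception and ignore it" idea from the conversation becomes an opt-in filter on the caller's side. A minimal sketch, assuming only FSharp.Core; `dropTransient` is a hypothetical name, and which exception types count as transient is left to the caller:

```fsharp
open System

/// Hypothetical filter: drops Error items the caller considers transient
/// and passes everything else through unchanged.
let dropTransient
    (isTransient: exn -> bool)
    (source: IObservable<Result<'a, exn>>)
    : IObservable<Result<'a, exn>> =
    source
    |> Observable.filter (fun item ->
        match item with
        | Error e when isTransient e -> false // swallow, assuming a reconnect will follow
        | _ -> true)                          // keep Ok items and all other errors
```

As noted in the conversation, dropping errors silently can hide a permanent disconnect, so logging or counting the dropped items before filtering may be safer.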

@Swoorup
Author

Swoorup commented May 15, 2020

@TheAngryByrd I have created the pull request. 👍
