How is ResilientStream supposed to work in the context of RMQ? #893

Open
poohsen opened this issue Oct 19, 2023 · 2 comments

Comments

@poohsen
Contributor

poohsen commented Oct 19, 2023

I have a test program that consumes from a queue and then throws an exception. It does not Ack/Nack/Reject anything explicitly.

When I wrap it in ResilientStream.run and publish a message to the queue I can see that the program is restarted once. The msg in the queue becomes "unacked". When I terminate the program the message becomes "ready" in the queue and just stays there. The program does not seem to pull it again after the restart delay.

Is this the expected behaviour? My expectation was that with each restart it would also pull a message from RMQ. Otherwise what's the point of restarting it if it won't be doing anything anymore? Can this behaviour be achieved somehow?


stale bot commented Mar 17, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix label Mar 17, 2024
@geirolz

geirolz commented May 3, 2024

It depends on how you construct your program, I believe.
Restarting just the consumer stream from createAckerConsumer means re-using the same connection and the same channel. The broker is still awaiting an ack from that channel.
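
To make the point above concrete, here is a toy in-memory model of per-channel unacked bookkeeping. It is only a sketch: `ToyQueue`, `deliver`, and `closeChannel` are hypothetical names invented for illustration, not RabbitMQ or fs2-rabbit APIs. It assumes the standard AMQP behaviour that an unacknowledged delivery stays tied to its channel and is requeued only once that channel (or its connection) closes.

```scala
import scala.collection.mutable

// Toy stand-in for a broker queue. NOT RabbitMQ or fs2-rabbit code.
final class ToyQueue {
  private val ready   = mutable.Queue.empty[String]
  private val unacked = mutable.Map.empty[Int, mutable.ListBuffer[String]]

  def publish(msg: String): Unit = ready.enqueue(msg)

  // Hand the next ready message to a channel and hold it as unacked there.
  def deliver(channelId: Int): Option[String] =
    if (ready.isEmpty) None
    else {
      val msg = ready.dequeue()
      unacked.getOrElseUpdate(channelId, mutable.ListBuffer.empty) += msg
      Some(msg)
    }

  // Closing a channel moves its unacked messages back to the ready state.
  def closeChannel(channelId: Int): Unit =
    unacked.remove(channelId).foreach(msgs => ready ++= msgs)

  def readyCount: Int   = ready.size
  def unackedCount: Int = unacked.valuesIterator.map(_.size).sum
}

object ToyDemo {
  def main(args: Array[String]): Unit = {
    val q = new ToyQueue
    q.publish("m1")

    // The consumer receives m1 on channel 1, then fails without acking.
    println(q.deliver(channelId = 1))
    // A restarted consumer on the same channel has nothing to receive.
    println(q.deliver(channelId = 1))
    // Once the channel is closed, m1 becomes ready again.
    q.closeChannel(1)
    // A consumer on a fresh channel can now receive m1.
    println(q.deliver(channelId = 2))
  }
}
```

In this model, restarting the consumer on the same channel changes nothing for the message that is already unacked, which matches the "unacked until the program terminates" behaviour described in the question.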

If you want to use ResilientStream, your stream should also acquire and release the channel, I think.
I believe recreating the channel would be enough; if not, also move the creation of the connection inside the resilient stream.

Try something like this:

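    // Import paths assume fs2-rabbit 5.x; adjust them for your version.
    import cats.effect.IO
    import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
    import dev.profunktor.fs2rabbit.interpreter.RabbitClient
    import dev.profunktor.fs2rabbit.model.{AMQPChannel, QueueName}
    import dev.profunktor.fs2rabbit.model.AckResult.Ack
    import dev.profunktor.fs2rabbit.resiliency.ResilientStream

    val config: Fs2RabbitConfig = ???
    val run: IO[Nothing] = RabbitClient
      .default[IO](config)
      .resource
      .evalMap { connection =>
        ResilientStream.run {
          for {
            // The channel is acquired inside the resilient stream, so each restart gets a fresh one.
            given AMQPChannel <- fs2.Stream.resource(connection.createConnectionChannel)
            (acker, stream) <- fs2.Stream.eval(connection.createAckerConsumer(QueueName("queue_name")))
            // Print each message, then ack it.
            program <- stream.evalMap(msg => IO(println(msg)) >> acker(Ack(msg.deliveryTag)))
          } yield program
        }
      }
      .useForever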

Let me know if this works, and whether recreating just the channel is enough.
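
If recreating the channel alone turns out not to be enough, the fallback mentioned above (creating the connection under the resilient stream as well) might look roughly like the sketch below. It reuses only the calls from the snippet above plus `fs2.Stream.resource`; the import paths assume fs2-rabbit 5.x, the `given` pattern requires Scala 3, and the code is untested against a real broker.

```scala
import cats.effect.IO
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model.{AMQPChannel, QueueName}
import dev.profunktor.fs2rabbit.model.AckResult.Ack
import dev.profunktor.fs2rabbit.resiliency.ResilientStream

val config: Fs2RabbitConfig = ???

// Assumption: acquiring the client inside the retried stream means every
// restart releases the old client and channel and acquires new ones.
val run: IO[Unit] =
  ResilientStream.run {
    for {
      client <- fs2.Stream.resource(RabbitClient.default[IO](config).resource)
      given AMQPChannel <- fs2.Stream.resource(client.createConnectionChannel)
      (acker, stream) <- fs2.Stream.eval(client.createAckerConsumer(QueueName("queue_name")))
      // Print each message, then ack it.
      _ <- stream.evalMap(msg => IO(println(msg)) >> acker(Ack(msg.deliveryTag)))
    } yield ()
  }
```

The trade-off is that every restart pays for a full client setup, so this is probably only worth it if the channel-only version still leaves the message stuck as unacked.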

@stale stale bot removed the wontfix label May 3, 2024