You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I have a test program that consumes from a queue and then throws an exception. It does not Ack/Nack/Reject anything explicitly.
When I wrap it in ResilientStream.run and publish a message to the queue I can see that the program is restarted once. The msg in the queue becomes "unacked". When I terminate the program the message becomes "ready" in the queue and just stays there. The program does not seem to pull it again after the restart delay.
Is this the expected behaviour? My expectation was that with each restart it would also pull a message from RMQ. Otherwise what's the point of restarting it if it won't be doing anything anymore? Can this behaviour be achieved somehow?
The text was updated successfully, but these errors were encountered:
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
It depends on how you construct your program I believe.
Restarting just the consumer stream from createAckerConsumer means re-using the same connection and the same channel. The broker is still awaiting an ack from that channel.
If you what to use ResilientStream your stream should also spawn and release the channel I think.
I believe restarting the channel would be enough, if not include also the creation of the connection under the resilient stream.
Try something like this:
valconfig:Fs2RabbitConfig=???valrun:IO[Nothing] =RabbitClient
.default[IO](config)
.resource
.evalMap { connection =>ResilientStream.run {
for {
givenAMQPChannel<- fs2.Stream.resource(connection.createConnectionChannel)
(acker, stream) <- fs2.Stream.eval(connection.createAckerConsumer(QueueName("queue_name")))
program <- stream.evalMap(msg =>IO(println(msg)) >> acker(Ack(msg.deliveryTag)))
} yield program
}
}
.useForever
Let me know if this works and how ( just channel ? )
I have a test program that consumes from a queue and then throws an exception. It does not Ack/Nack/Reject anything explicitly.
When I wrap it in
ResilientStream.run
and publish a message to the queue I can see that the program is restarted once. The msg in the queue becomes "unacked". When I terminate the program the message becomes "ready" in the queue and just stays there. The program does not seem to pull it again after the restart delay.Is this the expected behaviour? My expectation was that with each restart it would also pull a message from RMQ. Otherwise what's the point of restarting it if it won't be doing anything anymore? Can this behaviour be achieved somehow?
The text was updated successfully, but these errors were encountered: