Handling prefetched messages upon lost connection? #26

Open
jraede opened this issue Apr 22, 2017 · 3 comments

jraede commented Apr 22, 2017

Awesome package, this helps us a lot. Just one question that's come up while testing various edge cases:

If prefetch is set to > 1, then you could conceivably have the following scenario:

  1. Multiple messages are published and delivered to the consumer, pending processing
  2. Connection is lost (server blip, let's say). Cony gracefully handles reconnection, BUT
  3. The amqp.Delivery values still waiting in the deliveries channel reference an AMQP channel that is now CLOSED.

How would you suggest handling this case? If we remake the deliveries channel would that imply a NACK on the rabbit server?

Contributor

kron4eg commented Apr 23, 2017

@jraede hello, and thank you for the kind words!

It's a bit more complicated than that, although the prefetch problem can still strike. Each Consumer has its own chan amqp.Delivery, which is not a reference to what amqp.Channel.Consume() returns. Our Consumer copies messages from the "per session" channel returned by amqp into its own persistent chan amqp.Delivery. Proof: https://github.com/assembla/cony/blob/master/consumer.go#L92 .

Frankly, I don't know how to "fix" this. If you have ideas, please post them.
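
The copying described above can be sketched with plain Go channels. This is only an illustration, not cony's code: the `delivery` and `consumer` types, the `session` field, and the `collect` helper are hypothetical stand-ins, and each simulated "disconnect" is just a closed channel. It shows why a reader of the persistent channel cannot tell that a message arrived on a session that has since ended.

```go
package main

import "fmt"

// delivery is a hypothetical stand-in for amqp.Delivery. The session field
// records which simulated AMQP channel the message arrived on.
type delivery struct {
	session int
	body    string
}

// consumer mimics the shape described above: one persistent channel that
// outlives every per-session channel.
type consumer struct {
	deliveries chan delivery
}

// serve copies messages from a per-session channel into the persistent one.
// It returns when the session channel is closed (a simulated disconnect).
func (c *consumer) serve(session <-chan delivery) {
	for d := range session {
		c.deliveries <- d
	}
}

// collect runs two sessions back to back, each with two prefetched
// messages, and returns everything read from the persistent channel.
func collect() []delivery {
	c := &consumer{deliveries: make(chan delivery)}

	go func() {
		for s := 1; s <= 2; s++ {
			session := make(chan delivery, 2) // buffer models prefetch = 2
			session <- delivery{session: s, body: "a"}
			session <- delivery{session: s, body: "b"}
			close(session) // simulated connection loss
			c.serve(session)
		}
	}()

	var got []delivery
	for i := 0; i < 4; i++ {
		got = append(got, <-c.deliveries)
	}
	return got
}

func main() {
	for _, d := range collect() {
		fmt.Printf("session %d: %s\n", d.session, d.body)
	}
}
```

The reader receives all four messages through the same channel, including the two from the first session, which by then is already closed. In the real library, those would be the deliveries that can no longer be acknowledged.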

Author

jraede commented Apr 23, 2017

Assuming I'm correct and RabbitMQ will redeliver those messages to another channel (even the same consumer on the new reconnected channel), could we just remake the cony.Consumer.deliveries in consumer.serve? e.g.:

func (c *Consumer) serve(client mqDeleter, ch mqChannel) {
	if c.reportErr(ch.Qos(c.qos, 0, false)) {
		return
	}

	// If prefetch > 1, c.deliveries might have messages in it
	// that are associated with a closed RabbitMQ channel (if e.g.
	// there was a reconnection event). Rabbit will already deliver
	// those messages to a new channel so we need to remove
	// references to them here.
	c.deliveries = make(chan amqp.Delivery)

	deliveries, err2 := ch.Consume(c.q.Name,
		c.tag,       // consumer tag
		c.autoAck,   // autoAck,
		c.exclusive, // exclusive,
		c.noLocal,   // noLocal,
		false,       // noWait,
		nil,         // args Table
	)
	if c.reportErr(err2) {
		return
	}

	for {
		select {
		case <-c.stop:
			client.deleteConsumer(c)
			ch.Close()
			return
		case d, ok := <-deliveries: // deliveries will be closed once channel is closed (disconnected from network)
			if !ok {
				return
			}
			c.deliveries <- d
		}
	}
}

Contributor

kron4eg commented Apr 23, 2017

Assuming the user's code consumes it as in the basic example:

for cli.Loop() {
	select {
	case msg := <-cns.Deliveries():
		handleMessage(msg)
	}
}

meaning that the user calls Consumer.Deliveries() on every iteration to get the channel, then yes, it's possible. But there is a problem :) The Consumer.Deliveries() documentation says:

Deliveries return deliveries shipped to this consumer this channel never closed, even on disconnects

Code like this will break, because it captures the channel once and would keep reading from the old one after a reconnect:

messages := cns.Deliveries()
for cli.Loop() {
	select {
	case msg := <-messages:
		handleMessage(msg)
	}
}
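
The difference between the two loops can be reproduced without RabbitMQ. The sketch below is a simplified model, not cony itself: `consumer`, `reconnect`, `tryRecv`, and `demo` are hypothetical names, the channel is buffered only so the example can run in a single goroutine, and the synchronization that replacing the field would need in real code is ignored.

```go
package main

import "fmt"

// consumer is a hypothetical model of a consumer whose delivery channel
// is replaced on every new session, as proposed earlier in the thread.
type consumer struct {
	deliveries chan string
}

// Deliveries returns whatever channel is current at the time of the call.
func (c *consumer) Deliveries() <-chan string { return c.deliveries }

// reconnect models the proposed change: discard the old channel and
// create a fresh one for the new session.
func (c *consumer) reconnect() { c.deliveries = make(chan string, 1) }

// tryRecv performs a non-blocking receive.
func tryRecv(ch <-chan string) (string, bool) {
	select {
	case m := <-ch:
		return m, true
	default:
		return "", false
	}
}

// demo reports whether a message published after a reconnect is visible
// through a channel cached before it, and through a freshly fetched one.
func demo() (cachedOK, freshOK bool) {
	c := &consumer{deliveries: make(chan string, 1)}

	cached := c.Deliveries() // captured once, like the second loop

	c.reconnect()
	c.deliveries <- "after reconnect"

	_, cachedOK = tryRecv(cached)         // still the old channel
	_, freshOK = tryRecv(c.Deliveries()) // fetched again, like the first loop
	return cachedOK, freshOK
}

func main() {
	cachedOK, freshOK := demo()
	fmt.Println(cachedOK, freshOK) // prints "false true"
}
```

A caller that cached the channel never sees the new message, which is exactly the behavior the "never closed, even on disconnects" guarantee currently lets users rely on not happening.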
