This repository has been archived by the owner on Sep 11, 2022. It is now read-only.

Multiple server-named queues after auto_recovery #204

Open
tht opened this issue Sep 23, 2014 · 3 comments

@tht

tht commented Sep 23, 2014

I'm using amqp 1.5 on Linux, and the auto-recovery feature works well with named queues. However, I'm running into problems with server-named queues.

Steps to reproduce:

  • Open a single server-named queue and subscribe to it
  • Restart RabbitMQ server
  • amqp recovers the server-named queue, which gets a new name. queue.on_recovery is called twice for the same queue object. One queue is visible in the RabbitMQ Management UI.
  • Restart RabbitMQ server
  • amqp recovers two server-named queues with two new names. queue.on_recovery is called 4 times for the same queue object. Two queues are visible in the RabbitMQ Management UI.

With every RabbitMQ restart, the queue.on_recovery callback is called more and more often, and the number of queues on the server keeps growing.

Closing the client removes all the server-named queues on RabbitMQ.

I used this test script:

require 'rubygems'
require 'amqp'

config = { :host => 'localhost', :username => 'guest', :password => 'guest' }

EventMachine.run do
  conn = AMQP.connect(config) do |client|
    puts "Initial connect."

    conn.on_recovery do
      puts "Connection restored"
    end

    conn.on_tcp_connection_loss do |connection, settings|
      puts "[network failure] Trying to reconnect..."
      # Don't force the reconnect; retry every 2 seconds
      connection.reconnect(false, 2)
    end

    # Connection up, connect channel, use auto_recovery
    channel = AMQP::Channel.new(conn)
    channel.auto_recovery = true

    AMQP::Queue.new(channel, "", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      puts "#{queue.name} is ready to go. AMQP method: #{declare_ok.inspect}"

      queue.subscribe(:ack => false) do |header,data|
        puts "queue.subscribe called with message #{data}"
      end

      queue.on_recovery do |recovered_queue|
        puts "queue.on_recovery called for queue #{recovered_queue} named #{recovered_queue.name}"
      end
    end
  end
end

Output while running:

$ ruby reconnect_test.rb
Initial connect.
amq.gen-dXrBy45YS1hZD8ffThK_RA is ready to go. AMQP method: #<AMQ::Protocol::Queue::DeclareOk:0x7f6a0b4aff98 @queue="amq.gen-dXrBy45YS1hZD8ffThK_RA", @consumer_count=0, @message_count=0>

// RabbitMQ restarted here...

[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
Connection restored
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-D7mIbnnR2jmyvnhKRb6r0g
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-bylghjGFgEPcLkoj8fIE8A

// RabbitMQ restarted here...

[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
Connection restored
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-bylghjGFgEPcLkoj8fIE8A
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-bylghjGFgEPcLkoj8fIE8A
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg

// RabbitMQ restarted here...

[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
Connection restored
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-w3zp3WJlDS8fGLGYry3UEw
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-w3zp3WJlDS8fGLGYry3UEw
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-w3zp3WJlDS8fGLGYry3UEw

It looks to me like queue.on_recovery gets called once for the old (now invalid) queue name and once for the new one. On top of that, there is one more queue after every RabbitMQ restart. These may be two different issues, or I may just be doing something wrong...

Thanks,
Thomas

@michaelklishin
Member

OK, thanks for the info about how to reproduce it. Your hypothesis makes sense, I'll take a look at it when I have time.

@tht
Author

tht commented Sep 23, 2014

I think I know why queue.on_recovery gets called twice. Inspecting the call stack shows two distinct call paths.

The first call is from the session restore:

/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `exec_callback_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `exec_callback_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/queue.rb:1258:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/channel.rb:1274:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/channel.rb:1274:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/channel.rb:1274:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:836:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:836:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:836:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:858:in `start_automatic_recovery'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:531:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:531:in `register_connection_callback'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/em/deferrable.rb:151:in `call'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/em/deferrable.rb:151:in `set_deferred_status'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/em/deferrable.rb:191:in `succeed'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:682:in `connection_successful'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:1040:in `handle_open_ok'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:1136
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:959:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:959:in `receive_frameset'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:933:in `receive_frame'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:671:in `receive_data'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/eventmachine.rb:187:in `run_machine'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/eventmachine.rb:187:in `run'
reconnect_test.rb:6

The second one comes from the queue restore:

/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `exec_callback_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `exec_callback_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/queue.rb:1217:in `auto_recover'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:59:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:59:in `exec_callback_once_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:59:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:59:in `exec_callback_once_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/queue.rb:928:in `handle_declare_ok'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/queue.rb:1321
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:959:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:959:in `receive_frameset'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:933:in `receive_frame'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:671:in `receive_data'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/eventmachine.rb:187:in `run_machine'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/eventmachine.rb:187:in `run'
reconnect_test.rb:6

The first call (dispatched through the session object) is pretty much useless for a queue, because at that point the queue has not actually been recovered yet (it still has the old, now invalid name). The second call is "the good one".

The first call path goes through run_after_recovery_callbacks in channel.rb. I've removed the line '@queues.each', which clearly improves the situation:

# @private
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)

  ## REMOVED ## @queues.each    { |name, q| q.run_after_recovery_callbacks }
  @exchanges.each { |name, e| e.run_after_recovery_callbacks }
end

Now I no longer see queue.on_recovery calls with the old queue name. I've also retested with named queues, and they still recover without problems.
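
To make the two call paths easier to follow, here is a heavily simplified model in plain Ruby. FakeQueue, FakeChannel, simulate_recovery and the channel_level_pass flag are hypothetical stand-ins, not the amqp gem's real classes or methods. The model only assumes what the stack traces above show: a session-level pass that reaches each queue's after-recovery callbacks through the channel before the queue has been redeclared, and a second dispatch once queue.declare-ok has delivered the new server-generated name. It does not try to reproduce the growing number of calls or the extra queue on the broker.

```ruby
# Hypothetical, simplified model of the two dispatch paths seen in the
# stack traces. This is NOT the amqp gem's actual implementation.
class FakeQueue
  attr_reader :name

  def initialize(name)
    @name = name
    @after_recovery = []
  end

  def on_recovery(&block)
    @after_recovery << block
  end

  # Path 2: declare-ok arrives with a fresh server-generated name,
  # then the after-recovery callbacks run.
  def handle_declare_ok(new_name)
    @name = new_name
    run_after_recovery_callbacks
  end

  def run_after_recovery_callbacks
    @after_recovery.each { |cb| cb.call(self) }
  end
end

class FakeChannel
  def initialize(queues, channel_level_pass)
    @queues = queues
    @channel_level_pass = channel_level_pass
  end

  # Path 1: the session-level pass walks channels, then their queues.
  # channel_level_pass = false models the patch that drops '@queues.each'.
  def run_after_recovery_callbacks
    @queues.each { |q| q.run_after_recovery_callbacks } if @channel_level_pass
  end
end

# Simulates one broker restart; returns the names seen by on_recovery.
def simulate_recovery(channel_level_pass)
  seen = []
  queue = FakeQueue.new("amq.gen-old")
  queue.on_recovery { |q| seen << q.name }
  channel = FakeChannel.new([queue], channel_level_pass)

  channel.run_after_recovery_callbacks   # session pass: queue not redeclared yet
  queue.handle_declare_ok("amq.gen-new") # redeclare completes with a new name
  seen
end

p simulate_recovery(true)   # => ["amq.gen-old", "amq.gen-new"]
p simulate_recovery(false)  # => ["amq.gen-new"]
```

With the channel-level pass enabled, the callback sees both the stale and the new name, which mirrors the duplicated output above; with it disabled, only the new name is reported.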

This does not solve the second problem (one additional server-named queue after every RabbitMQ restart). I'm still trying to figure out where that queue gets created.

@tht
Author

tht commented Sep 24, 2014

Sorry for all the messages here. I dug deeper and found something else that is loosely related to this issue.

In queue.rb:

def auto_recover
  self.exec_callback_yielding_self(:before_recovery)

  if self.server_named?
    old_name = @name.dup
    @name    = AMQ::Protocol::EMPTY_STRING

    #@channel.queues.delete(old_name)
  end

@channel.queues contains queue objects, not names. Trying to delete an old queue by its name is therefore useless, since the string will never match a queue object. It's also unnecessary: there is no duplicate entry in @channel.queues.
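
A quick plain-Ruby illustration of that point, assuming (as described above) that @channel.queues is an Array of queue objects. QueueStub is a hypothetical stand-in for AMQP::Queue. Array#delete removes elements that are == to its argument, so a name string never matches a queue object, whereas filtering on the name (or deleting the object itself) does remove it.

```ruby
# QueueStub is a hypothetical stand-in for AMQP::Queue; only #name matters here.
QueueStub = Struct.new(:name)

old_name = "amq.gen-old"
queues   = [QueueStub.new(old_name)]

# Deleting by name: a String is never == a QueueStub, so nothing is removed.
p queues.delete(old_name)  # => nil
p queues.size              # => 1

# Matching on the queue's name does remove it.
queues.reject! { |q| q.name == old_name }
p queues.size              # => 0
```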
