Skip to content

Commit

Permalink
Fix remote channel close taking too long
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Jun 13, 2019
1 parent a917edc commit 3a044fd
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Expand Up @@ -4,6 +4,7 @@ Changelog
Version 2.7.1
-------------
- Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
- Fixed an issue with closing Channels taking too long after the server initiated it.

Version 2.7.0
-------------
Expand Down
1 change: 1 addition & 0 deletions README.rst
Expand Up @@ -23,6 +23,7 @@ Changelog
Version 2.7.1
-------------
- Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
- Fixed an issue with closing Channels taking too long after the server initiated it.

Version 2.7.0
-------------
Expand Down
32 changes: 11 additions & 21 deletions amqpstorm/channel.py
Expand Up @@ -457,25 +457,15 @@ def _close_channel(self, frame_in):
:param specification.Channel.Close frame_in: Channel Close frame.
:return:
"""
if frame_in.reply_code != 200:
reply_text = try_utf8_decode(frame_in.reply_text)
message = (
'Channel %d was closed by remote server: %s' %
(
self._channel_id,
reply_text
)
)
exception = AMQPChannelError(message,
reply_code=frame_in.reply_code)
self.exceptions.append(exception)
self.set_state(self.CLOSED)
if self._connection.is_open:

try:
self._connection.write_frame(
self.channel_id, specification.Channel.CloseOk()
)
except AMQPConnectionError:
pass
self.close()
self.remove_consumer_tag()
if self._inbound:
del self._inbound[:]
self.exceptions.append(AMQPChannelError(
'Channel %d was closed by remote server: %s' %
(
self._channel_id,
try_utf8_decode(frame_in.reply_text)
),
reply_code=frame_in.reply_code
))
25 changes: 24 additions & 1 deletion amqpstorm/tests/functional/reliability_tests.py
Expand Up @@ -135,13 +135,36 @@ def test_functional_close_performance(self):
:return:
"""
for _ in range(100):
for _ in range(10):
self.connection = self.connection = Connection(HOST, USERNAME,
PASSWORD)
start_time = time.time()
self.connection.close()
self.assertLess(time.time() - start_time, 3)

@setup(new_connection=False, queue=False)
def test_functional_close_after_channel_close_forced_by_server(self):
"""Make sure the channel is closed instantly when the remote server
closes it.
:return:
"""
for _ in range(10):
self.connection = self.connection = Connection(HOST, USERNAME,
PASSWORD)
self.channel = self.connection.channel(rpc_timeout=360)

start_time = time.time()
self.channel.basic.publish(body=self.message,
routing_key=self.queue_name,
exchange='invalid')
self.channel.close()
self.assertLess(time.time() - start_time, 3)

start_time = time.time()
self.connection.close()
self.assertLess(time.time() - start_time, 3)

@setup(new_connection=False)
def test_functional_uri_connection(self):
self.connection = UriConnection(URI)
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/tests/functional/ssl/reliability_tests.py
Expand Up @@ -136,7 +136,7 @@ def test_functional_ssl_close_performance(self):
:return:
"""
for _ in range(100):
for _ in range(10):
ssl_options = {
'context': ssl.create_default_context(cafile=CAFILE),
'server_hostname': SSL_HOST
Expand Down
7 changes: 1 addition & 6 deletions amqpstorm/tests/unit/channel/channel_exception_tests.py
Expand Up @@ -187,7 +187,7 @@ def test_channel_raise_with_close_reply_code_500(self):
# Set up Fake Channel.
channel._inbound = [1, 2, 3]
channel.set_state(channel.OPEN)
channel._consumer_tags = [1, 2, 3]
channel._consumer_tags = [4, 5, 6]

close_frame = specification.Channel.Close(
reply_code=500,
Expand All @@ -199,11 +199,6 @@ def test_channel_raise_with_close_reply_code_500(self):
self.assertEqual(channel._consumer_tags, [])
self.assertEqual(channel._state, channel.CLOSED)

self.assertIsInstance(
connection.get_last_frame(),
specification.Channel.CloseOk
)

self.assertRaisesRegexp(
AMQPChannelError,
'Channel 0 was closed by remote server: travis-ci',
Expand Down
5 changes: 0 additions & 5 deletions amqpstorm/tests/unit/channel/channel_frame_tests.py
Expand Up @@ -97,11 +97,6 @@ def test_channel_close_frame(self):
)
)

self.assertIsInstance(
connection.get_last_frame(),
specification.Channel.CloseOk
)

self.assertRaisesRegexp(
AMQPChannelError,
'Channel 0 was closed by remote server: travis-ci',
Expand Down
8 changes: 4 additions & 4 deletions amqpstorm/tests/unit/channel/channel_tests.py
Expand Up @@ -68,7 +68,7 @@ def on_close_ok(_, frame_out):
# Set up Fake Channel.
channel._inbound = [1, 2, 3]
channel.set_state(channel.OPEN)
channel._consumer_tags = ['1', '2', '3']
channel._consumer_tags = ['4', '5', '6']

# Close Channel.
channel.close()
Expand All @@ -89,7 +89,7 @@ def on_close_ok(_, frame_out):
# Set up Fake Channel.
channel._inbound = [1, 2, 3]
channel.set_state(channel.OPEN)
channel._consumer_tags = ['1', '2', '3']
channel._consumer_tags = ['4', '5', '6']
channel.exceptions.append(AMQPChannelError('travis-ci'))

# Close Channel.
Expand All @@ -107,7 +107,7 @@ def test_channel_close_when_already_closed(self):
# Set up Fake Channel.
channel._inbound = [1, 2, 3]
channel.set_state(channel.CLOSED)
channel._consumer_tags = ['1', '2', '3']
channel._consumer_tags = ['4', '5', '6']

def state_set(state):
self.assertEqual(state, channel.CLOSED)
Expand Down Expand Up @@ -142,7 +142,7 @@ def test_channel_close_channel(self):
# Set up Fake Channel.
channel._inbound = [1, 2, 3]
channel.set_state(channel.OPEN)
channel._consumer_tags = [1, 2, 3]
channel._consumer_tags = [4, 5, 6]

close_frame = specification.Channel.Close(reply_code=200,
reply_text='travis-ci')
Expand Down

0 comments on commit 3a044fd

Please sign in to comment.