Skip to content

Commit

Permalink
Fixed a potential recursion error in Connection.close
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Sep 23, 2016
1 parent 4e0ce7c commit 54ab3f3
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,10 @@
Changelog
---------

Version 2.1.2
-------------
- Fixed a potential recursion error in Connection.close.

Version 2.1.1
-------------
- Reduced default TCP Timeout from 30s to 10s.
Expand Down
10 changes: 7 additions & 3 deletions README.rst
@@ -1,11 +1,11 @@
AMQPStorm
-------------
---------
Thread-safe Python RabbitMQ Client & Management library.

|Version| |CodeClimate| |Travis| |Coverage|

Introduction
-------------
------------
AMQPStorm is a library designed to be consistent, stable and thread-safe.

- 100% Unit-test Coverage!
Expand All @@ -19,7 +19,11 @@ Documentation
Additional documentation is available on `ReadTheDocs <https://amqpstorm.readthedocs.org>`_.

Changelog
---------

Version 2.1.2
-------------
- Fixed a potential recursion error in Connection.close.

Version 2.1.1
-------------
Expand Down Expand Up @@ -47,7 +51,7 @@ Version 2.0.0
- channel.basic.get

Credits
-------------
-------
Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.

.. |Version| image:: https://badge.fury.io/py/amqpstorm.svg?
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
@@ -1,5 +1,5 @@
"""AMQPStorm."""
__version__ = '2.1.1' # noqa
__version__ = '2.1.2' # noqa
__author__ = 'eandersson' # noqa

import logging
Expand Down
22 changes: 16 additions & 6 deletions amqpstorm/connection.py
Expand Up @@ -15,6 +15,7 @@
from amqpstorm.channel import Channel
from amqpstorm.channel0 import Channel0
from amqpstorm.exception import AMQPConnectionError
from amqpstorm.exception import AMQPError
from amqpstorm.exception import AMQPInvalidArgument
from amqpstorm.heartbeat import Heartbeat
from amqpstorm.io import EMPTY_BUFFER
Expand Down Expand Up @@ -168,7 +169,8 @@ def close(self):
self._close_remaining_channels()
if not self.is_closed and self._io.socket:
self._channel0.send_close_connection_frame()
self._wait_for_connection_state(Stateful.CLOSED)
self._wait_for_connection_state(Stateful.CLOSED,
raise_on_exception=False)
except AMQPConnectionError:
pass
finally:
Expand All @@ -187,7 +189,8 @@ def open(self):
self._exceptions = []
self._io.open()
self._send_handshake()
self._wait_for_connection_state(state=Stateful.OPEN)
self._wait_for_connection_state(state=Stateful.OPEN,
raise_on_exception=True)
self.heartbeat.start(self._exceptions)
LOGGER.debug('Connection Opened')

Expand Down Expand Up @@ -296,10 +299,12 @@ def _validate_parameters(self):
elif not compatibility.is_integer(self.parameters['heartbeat']):
raise AMQPInvalidArgument('heartbeat should be an integer')

def _wait_for_connection_state(self, state=Stateful.OPEN):
def _wait_for_connection_state(self, state=Stateful.OPEN,
raise_on_exception=True):
"""Wait for a Connection state.
:param int state: State that we expect
:param bool raise_on_exception: Should we raise on exception
:raises AMQPConnectionError: Raises if we reach the connection timeout.
Expand All @@ -308,7 +313,12 @@ def _wait_for_connection_state(self, state=Stateful.OPEN):
start_time = time.time()
timeout = (self.parameters['timeout'] or 10) * 3
while self.current_state != state:
self.check_for_errors()
if time.time() - start_time > timeout:
raise AMQPConnectionError('Connection timed out')
try:
self.check_for_errors()
if time.time() - start_time > timeout:
raise AMQPConnectionError('Connection timed out')
except AMQPError:
if not raise_on_exception:
break
raise
sleep(IDLE_WAIT)
49 changes: 46 additions & 3 deletions amqpstorm/tests/unit/connection_tests.py
Expand Up @@ -218,6 +218,32 @@ def func(conn):

self.assertTrue(connection.is_open)

def test_connection_wait_for_connection_does_raise_on_error(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=0.1,
lazy=True)
connection.set_state(connection.OPENING)

connection.exceptions.append(AMQPConnectionError('travis-ci'))

self.assertRaises(
AMQPConnectionError, connection._wait_for_connection_state,
connection.OPEN, True
)

def test_connection_wait_for_connection_does_not_raise_on_error(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=0.1,
lazy=True)
connection.set_state(connection.OPENING)

connection.exceptions.append(AMQPConnectionError('travis-ci'))

self.assertIsNone(
connection._wait_for_connection_state(
connection.OPEN,
False
)
)

def test_connection_wait_for_connection_raises_on_timeout(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=0.1,
lazy=True)
Expand All @@ -226,9 +252,26 @@ def test_connection_wait_for_connection_raises_on_timeout(self):
io.socket = Mock(name='socket', spec=socket.socket)
connection._io = io

self.assertRaises(AMQPConnectionError,
connection._wait_for_connection_state,
connection.OPEN)
self.assertRaises(
AMQPConnectionError,
connection._wait_for_connection_state,
connection.OPEN
)

def test_connection_wait_for_connection_breaks_on_timeout(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=0.1,
lazy=True)
connection.set_state(connection.OPENING)
io = IO(connection.parameters, [])
io.socket = Mock(name='socket', spec=socket.socket)
connection._io = io

self.assertIsNone(
connection._wait_for_connection_state(
connection.OPEN,
False
)
)

def test_connection_close_channels(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=1,
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -2,7 +2,7 @@

setup(
name='AMQPStorm',
version='2.1.1',
version='2.1.2',
description='Thread-safe Python RabbitMQ Client & Management library.',
long_description=open('README.rst').read(),
author='Erik Olof Gunnar Andersson',
Expand Down

0 comments on commit 54ab3f3

Please sign in to comment.