diff --git a/amqpstorm/base.py b/amqpstorm/base.py index 1ead1931..0fbe45fe 100644 --- a/amqpstorm/base.py +++ b/amqpstorm/base.py @@ -20,8 +20,8 @@ class Stateful(object): OPENING = 2 OPEN = 3 - def __init__(self): - self._lock = threading.Lock() + def __init__(self, lock_type=threading.Lock): + self._lock = lock_type() self._state = self.CLOSED self._exceptions = [] diff --git a/amqpstorm/channel.py b/amqpstorm/channel.py index b0d829b3..05c1e973 100644 --- a/amqpstorm/channel.py +++ b/amqpstorm/channel.py @@ -1,7 +1,7 @@ """AMQPStorm Connection.Channel.""" import logging -from time import sleep +import time from pamqp import specification from pamqp.header import ContentHeader @@ -12,6 +12,7 @@ from amqpstorm.base import IDLE_WAIT from amqpstorm.basic import Basic from amqpstorm.compatibility import try_utf8_decode +from amqpstorm.exception import AMQPError from amqpstorm.exception import AMQPChannelError from amqpstorm.exception import AMQPConnectionError from amqpstorm.exception import AMQPInvalidArgument @@ -157,7 +158,7 @@ def build_inbound_messages(self, break_on_empty=False, to_tuple=False, message_impl=message_impl) if not message: self.check_for_errors() - sleep(IDLE_WAIT) + time.sleep(IDLE_WAIT) if break_on_empty and not self._inbound: break continue @@ -184,7 +185,7 @@ def close(self, reply_code=200, reply_text=''): elif not compatibility.is_string(reply_text): raise AMQPInvalidArgument('reply_text should be a string') try: - if self._connection.is_closed or self.is_closed: + if self._connection.is_closed or not self.is_open: self.stop_consuming() LOGGER.debug('Channel #%d forcefully Closed', self.channel_id) return @@ -357,7 +358,7 @@ def start_consuming(self, to_tuple=False, auto_decode=True): auto_decode=auto_decode ) if self.consumer_tags: - sleep(IDLE_WAIT) + time.sleep(IDLE_WAIT) continue break @@ -488,7 +489,7 @@ def _build_message_body(self, body_size): while len(body) < body_size: if not self._inbound: self.check_for_errors() - sleep(IDLE_WAIT) + time.sleep(IDLE_WAIT) continue body_piece = self._inbound.pop(0) if not body_piece.value: @@ -502,12 +503,12 @@ def _close_channel(self, frame_in): :param specification.Channel.Close frame_in: Channel Close frame. :return: """ - if self._connection.is_open: + self.set_state(self.CLOSING) + if not self._connection.is_closed: try: self.write_frame(specification.Channel.CloseOk()) - except AMQPConnectionError: + except AMQPError: pass - self.set_state(self.CLOSED) self.remove_consumer_tag() if self._inbound: del self._inbound[:] @@ -519,3 +520,4 @@ def _close_channel(self, frame_in): ), reply_code=frame_in.reply_code )) + self.set_state(self.CLOSED) diff --git a/amqpstorm/connection.py b/amqpstorm/connection.py index 7ed6b540..c22e9081 100644 --- a/amqpstorm/connection.py +++ b/amqpstorm/connection.py @@ -1,6 +1,7 @@ """AMQPStorm Connection.""" import logging +import threading import time from time import sleep @@ -72,7 +73,7 @@ class Connection(Stateful): ] def __init__(self, hostname, username, password, port=5672, **kwargs): - super(Connection, self).__init__() + super(Connection, self).__init__(lock_type=threading.RLock) self.parameters = { 'hostname': hostname, 'username': username, diff --git a/amqpstorm/io.py b/amqpstorm/io.py index 5d44a119..70e73cf7 100644 --- a/amqpstorm/io.py +++ b/amqpstorm/io.py @@ -279,7 +279,7 @@ def _receive(self): # NOTE(visobet): Retry if the non-blocking socket does not # have any meaningful data ready. pass - except (IOError, OSError) as why: + except (IOError, OSError, ValueError) as why: if why.args[0] not in (EWOULDBLOCK, EAGAIN): self._exceptions.append(AMQPConnectionError(why)) if self._running.is_set(): diff --git a/amqpstorm/tests/functional/test_reliability.py b/amqpstorm/tests/functional/test_reliability.py index 9a3bc5fe..a99098f4 100644 --- a/amqpstorm/tests/functional/test_reliability.py +++ b/amqpstorm/tests/functional/test_reliability.py @@ -143,7 +143,7 @@ def test_functional_close_performance(self): self.connection.close() self.assertLess(time.time() - start_time, 3) - @setup(new_connection=False, queue=False) + @setup(new_connection=False) def test_functional_close_after_channel_close_forced_by_server(self): """Make sure the channel is closed instantly when the remote server closes it. @@ -151,19 +151,25 @@ def test_functional_close_after_channel_close_forced_by_server(self): :return: """ for _ in range(10): - self.connection = self.connection = Connection(HOST, USERNAME, - PASSWORD) - self.channel = self.connection.channel(rpc_timeout=360) - + connection = Connection( + HOST, USERNAME, PASSWORD + ) + channel = connection.channel() + channel.confirm_deliveries() + try: + channel.basic.publish( + body=self.message, + routing_key=self.queue_name, + exchange='invalid' + ) + except (AMQPConnectionError, AMQPChannelError): + pass start_time = time.time() - self.channel.basic.publish(body=self.message, - routing_key=self.queue_name, - exchange='invalid') - self.channel.close() + channel.close() self.assertLess(time.time() - start_time, 3) start_time = time.time() - self.connection.close() + connection.close() self.assertLess(time.time() - start_time, 3) @setup(new_connection=False)