Skip to content

Commit

Permalink
Use RLock instead to handle potential race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Nov 20, 2021
1 parent 10265bc commit 7074ae1
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 22 deletions.
4 changes: 2 additions & 2 deletions amqpstorm/base.py
Expand Up @@ -20,8 +20,8 @@ class Stateful(object):
OPENING = 2
OPEN = 3

def __init__(self):
self._lock = threading.Lock()
def __init__(self, lock_type=threading.Lock):
self._lock = lock_type()
self._state = self.CLOSED
self._exceptions = []

Expand Down
18 changes: 10 additions & 8 deletions amqpstorm/channel.py
@@ -1,7 +1,7 @@
"""AMQPStorm Connection.Channel."""

import logging
from time import sleep
import time

from pamqp import specification
from pamqp.header import ContentHeader
Expand All @@ -12,6 +12,7 @@
from amqpstorm.base import IDLE_WAIT
from amqpstorm.basic import Basic
from amqpstorm.compatibility import try_utf8_decode
from amqpstorm.exception import AMQPError
from amqpstorm.exception import AMQPChannelError
from amqpstorm.exception import AMQPConnectionError
from amqpstorm.exception import AMQPInvalidArgument
Expand Down Expand Up @@ -157,7 +158,7 @@ def build_inbound_messages(self, break_on_empty=False, to_tuple=False,
message_impl=message_impl)
if not message:
self.check_for_errors()
sleep(IDLE_WAIT)
time.sleep(IDLE_WAIT)
if break_on_empty and not self._inbound:
break
continue
Expand All @@ -184,7 +185,7 @@ def close(self, reply_code=200, reply_text=''):
elif not compatibility.is_string(reply_text):
raise AMQPInvalidArgument('reply_text should be a string')
try:
if self._connection.is_closed or self.is_closed:
if self._connection.is_closed or not self.is_open:
self.stop_consuming()
LOGGER.debug('Channel #%d forcefully Closed', self.channel_id)
return
Expand Down Expand Up @@ -357,7 +358,7 @@ def start_consuming(self, to_tuple=False, auto_decode=True):
auto_decode=auto_decode
)
if self.consumer_tags:
sleep(IDLE_WAIT)
time.sleep(IDLE_WAIT)
continue
break

Expand Down Expand Up @@ -488,7 +489,7 @@ def _build_message_body(self, body_size):
while len(body) < body_size:
if not self._inbound:
self.check_for_errors()
sleep(IDLE_WAIT)
time.sleep(IDLE_WAIT)
continue
body_piece = self._inbound.pop(0)
if not body_piece.value:
Expand All @@ -502,12 +503,12 @@ def _close_channel(self, frame_in):
:param specification.Channel.Close frame_in: Channel Close frame.
:return:
"""
if self._connection.is_open:
self.set_state(self.CLOSING)
if not self._connection.is_closed:
try:
self.write_frame(specification.Channel.CloseOk())
except AMQPConnectionError:
except AMQPError:
pass
self.set_state(self.CLOSED)
self.remove_consumer_tag()
if self._inbound:
del self._inbound[:]
Expand All @@ -519,3 +520,4 @@ def _close_channel(self, frame_in):
),
reply_code=frame_in.reply_code
))
self.set_state(self.CLOSED)
3 changes: 2 additions & 1 deletion amqpstorm/connection.py
@@ -1,6 +1,7 @@
"""AMQPStorm Connection."""

import logging
import threading
import time
from time import sleep

Expand Down Expand Up @@ -72,7 +73,7 @@ class Connection(Stateful):
]

def __init__(self, hostname, username, password, port=5672, **kwargs):
super(Connection, self).__init__()
super(Connection, self).__init__(lock_type=threading.RLock)
self.parameters = {
'hostname': hostname,
'username': username,
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/io.py
Expand Up @@ -279,7 +279,7 @@ def _receive(self):
# NOTE(visobet): Retry if the non-blocking socket does not
# have any meaningful data ready.
pass
except (IOError, OSError) as why:
except (IOError, OSError, ValueError) as why:
if why.args[0] not in (EWOULDBLOCK, EAGAIN):
self._exceptions.append(AMQPConnectionError(why))
if self._running.is_set():
Expand Down
26 changes: 16 additions & 10 deletions amqpstorm/tests/functional/test_reliability.py
Expand Up @@ -143,27 +143,33 @@ def test_functional_close_performance(self):
self.connection.close()
self.assertLess(time.time() - start_time, 3)

@setup(new_connection=False, queue=False)
@setup(new_connection=False)
def test_functional_close_after_channel_close_forced_by_server(self):
"""Make sure the channel is closed instantly when the remote server
closes it.
:return:
"""
for _ in range(10):
self.connection = self.connection = Connection(HOST, USERNAME,
PASSWORD)
self.channel = self.connection.channel(rpc_timeout=360)

connection = Connection(
HOST, USERNAME, PASSWORD
)
channel = connection.channel()
channel.confirm_deliveries()
try:
channel.basic.publish(
body=self.message,
routing_key=self.queue_name,
exchange='invalid'
)
except (AMQPConnectionError, AMQPChannelError):
pass
start_time = time.time()
self.channel.basic.publish(body=self.message,
routing_key=self.queue_name,
exchange='invalid')
self.channel.close()
channel.close()
self.assertLess(time.time() - start_time, 3)

start_time = time.time()
self.connection.close()
connection.close()
self.assertLess(time.time() - start_time, 3)

@setup(new_connection=False)
Expand Down

0 comments on commit 7074ae1

Please sign in to comment.