Skip to content

Commit

Permalink
Merge pull request #66 from eandersson/feature/reuse_channels
Browse files Browse the repository at this point in the history
Feature/reuse channels
  • Loading branch information
eandersson committed Dec 3, 2018
2 parents c64a687 + bad6f5b commit 6450ef2
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 95 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

Version 2.6.0
-------------
- Re-use closed channel ids [#55] - Thanks mikemrm.
- Changed Poller Timeout to be a constant.
- Improved Connection Close performance.
- Channels is now a publicly available variable in Connections.

Version 2.5.0
-------------
- Upgraded pamqp to v2.0.0.
Expand Down
7 changes: 7 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ Additional documentation is available on `amqpstorm.io <https://www.amqpstorm.io
Changelog
=========

Version 2.6.0
-------------
- Re-use closed channel ids [#55] - Thanks mikemrm.
- Changed Poller Timeout to be a constant.
- Improved Connection Close performance.
- Channels is now a publicly available variable in Connections.

Version 2.5.0
-------------
- Upgraded pamqp to v2.0.0.
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""AMQPStorm."""
__version__ = '2.5.0' # noqa
__version__ = '2.6.0' # noqa
__author__ = 'eandersson' # noqa

import logging
Expand Down
8 changes: 4 additions & 4 deletions amqpstorm/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,9 @@ def build_inbound_messages(self, break_on_empty=False, to_tuple=False,
message = self._build_message(auto_decode=auto_decode)
if not message:
self.check_for_errors()
if break_on_empty:
sleep(IDLE_WAIT * 10)
if not self._inbound:
break
sleep(IDLE_WAIT)
if break_on_empty and not self._inbound:
break
continue
if to_tuple:
yield message.to_tuple()
Expand Down Expand Up @@ -163,6 +161,7 @@ def close(self, reply_code=200, reply_text=''):
reply_text=reply_text),
adapter=self._connection
)
self._connection._remove_channel(self.channel_id)
finally:
if self._inbound:
del self._inbound[:]
Expand Down Expand Up @@ -470,3 +469,4 @@ def _close_channel(self, frame_in):
except AMQPConnectionError:
pass
self.close()
self._connection._remove_channel(self.channel_id)
52 changes: 39 additions & 13 deletions amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
on_read=self._read_buffer)
self._channel0 = Channel0(self)
self._channels = {}
self._last_channel_id = 1
self.heartbeat = Heartbeat(self.parameters['heartbeat'],
self._channel0.send_heartbeat)
if not kwargs.get('lazy', False):
Expand All @@ -78,6 +79,14 @@ def __exit__(self, exception_type, exception_value, _):
LOGGER.warning(message, exception_value)
self.close()

@property
def channels(self):
"""Returns a dictionary of the Channels currently available.
:rtype: dict
"""
return self._channels

@property
def fileno(self):
"""Returns the Socket File number.
Expand Down Expand Up @@ -183,13 +192,13 @@ def close(self):
self.set_state(self.CLOSING)
self.heartbeat.stop()
try:
self._close_remaining_channels()
if not self.is_closed and self.socket:
self._channel0.send_close_connection()
self._wait_for_connection_state(state=Stateful.CLOSED)
except AMQPConnectionError:
pass
finally:
self._close_remaining_channels()
self._io.close()
self.set_state(self.CLOSED)
LOGGER.debug('Connection Closed')
Expand All @@ -204,6 +213,7 @@ def open(self):
self.set_state(self.OPENING)
self._exceptions = []
self._channels = {}
self._last_channel_id = 1
self._io.open()
self._send_handshake()
self._wait_for_connection_state(state=Stateful.OPEN)
Expand Down Expand Up @@ -237,29 +247,34 @@ def write_frames(self, channel_id, frames_out):
self._io.write_to_socket(data_out)

def _close_remaining_channels(self):
"""Close any open channels.
"""Forcefully close all open channels.
:return:
"""
for channel_id in self._channels:
if not self._channels[channel_id].is_open:
continue
for channel_id in list(self._channels):
self._channels[channel_id].set_state(Channel.CLOSED)
self._channels[channel_id].close()
self._remove_channel(channel_id)

def _get_next_available_channel_id(self):
"""Returns the next available available channel id.
:raises AMQPConnectionError: Raises if there is no available channel.
:rtype: int
"""
channel_id = len(self._channels) + 1
if channel_id == self.max_allowed_channels:
raise AMQPConnectionError(
'reached the maximum number of channels %d' %
self.max_allowed_channels)
return channel_id
for index in compatibility.RANGE(self._last_channel_id,
self.max_allowed_channels):
if index in self._channels:
continue
self._last_channel_id = index
return index

if self._last_channel_id != 1:
self._last_channel_id = 1
return self._get_next_available_channel_id()

raise AMQPConnectionError(
'reached the maximum number of channels %d' %
self.max_allowed_channels)

def _handle_amqp_frame(self, data_in):
"""Unmarshal a single AMQP frame and return the result.
Expand Down Expand Up @@ -301,6 +316,17 @@ def _read_buffer(self, data_in):

return data_in

def _remove_channel(self, channel_id):
"""Remove a channel.
:param int channel_id: Channel id
:return:
"""
with self.lock:
if channel_id not in self._channels:
return
del self._channels[channel_id]

def _send_handshake(self):
"""Send a RabbitMQ Handshake.
Expand Down
12 changes: 4 additions & 8 deletions amqpstorm/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@
from errno import EAGAIN
from errno import EINTR
from errno import EWOULDBLOCK
from time import sleep

from amqpstorm import compatibility
from amqpstorm.base import IDLE_WAIT
from amqpstorm.base import MAX_FRAME_SIZE
from amqpstorm.compatibility import ssl
from amqpstorm.exception import AMQPConnectionError

EMPTY_BUFFER = bytes()
LOGGER = logging.getLogger(__name__)
POLL_TIMEOUT = 1.0


class Poller(object):
Expand Down Expand Up @@ -44,7 +43,7 @@ def is_ready(self):
"""
try:
ready, _, _ = self.select.select([self.fileno], [], [],
self.timeout)
POLL_TIMEOUT)
return bool(ready)
except self.select.error as why:
if why.args[0] != EINTR:
Expand Down Expand Up @@ -224,7 +223,6 @@ def _process_incoming_data(self):
if self.poller.is_ready:
self.data_in += self._receive()
self.data_in = self._on_read(self.data_in)
sleep(IDLE_WAIT)

def _receive(self):
"""Receive any incoming socket data.
Expand Down Expand Up @@ -253,7 +251,5 @@ def _read_from_socket(self):
:rtype: bytes
"""
if self.use_ssl:
data_in = self.socket.read(MAX_FRAME_SIZE)
else:
data_in = self.socket.recv(MAX_FRAME_SIZE)
return data_in
return self.socket.read(MAX_FRAME_SIZE)
return self.socket.recv(MAX_FRAME_SIZE)
12 changes: 7 additions & 5 deletions amqpstorm/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
HOST = '127.0.0.1'
USERNAME = 'guest'
PASSWORD = 'guest'
URI = 'amqp://guest:guest@127.0.0.1:5672/%2F'
HTTP_URL = 'http://127.0.0.1:15672'
import os

HOST = os.environ.get('AMQP_HOST', '127.0.0.1')
USERNAME = os.environ.get('AMQP_USERNAME', 'guest')
PASSWORD = os.environ.get('AMQP_PASSWORD', 'guest')
URI = os.environ.get('AMQP_URI', 'amqp://guest:guest@127.0.0.1:5672/%2F')
HTTP_URL = os.environ.get('AMQP_HTTP_URL', 'http://127.0.0.1:15672')
10 changes: 0 additions & 10 deletions amqpstorm/tests/functional/basic_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ def test_functional_basic_ack_multiple(self):
multiple=True
)

# Close AND Open Channel to force potential unacked messages to be
# flushed.
self.channel.close()
self.channel.open()

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name))

Expand Down Expand Up @@ -112,11 +107,6 @@ def test_functional_basic_nack_multiple(self):
multiple=True
)

# Close AND Open Channel to force potential unacked messages to be
# flushed.
self.channel.close()
self.channel.open()

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name))

Expand Down
42 changes: 17 additions & 25 deletions amqpstorm/tests/functional/reliability_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ class ReliabilityFunctionalTests(TestFunctionalFramework):
@setup(new_connection=False, queue=True)
def test_functional_open_new_connection_loop(self):
for _ in range(25):
self.connection = self.connection = Connection(HOST,
USERNAME,
PASSWORD,
timeout=30)
self.connection = self.connection = Connection(HOST, USERNAME,
PASSWORD)
self.channel = self.connection.channel()

# Make sure that it's a new channel.
Expand All @@ -47,8 +45,7 @@ def test_functional_open_new_connection_loop(self):

@setup(new_connection=False, queue=True)
def test_functional_open_close_connection_loop(self):
self.connection = Connection(HOST, USERNAME, PASSWORD, timeout=30,
lazy=True)
self.connection = Connection(HOST, USERNAME, PASSWORD, lazy=True)
for _ in range(25):
self.connection.open()
channel = self.connection.channel()
Expand Down Expand Up @@ -78,9 +75,6 @@ def test_functional_close_gracefully_after_publish_mandatory_fails(self):
for index in range(3):
channel = self.connection.channel()

# Make sure that it's a new channel.
self.assertEqual(index + 1, int(channel))

# Try to publish 25 bad messages.
for _ in range(25):
try:
Expand All @@ -98,10 +92,8 @@ def test_functional_close_gracefully_after_publish_mandatory_fails(self):

@setup(new_connection=False, queue=True)
def test_functional_open_close_channel_loop(self):
self.connection = self.connection = Connection(HOST,
USERNAME,
PASSWORD,
timeout=30)
self.connection = self.connection = Connection(HOST, USERNAME,
PASSWORD)
for _ in range(25):
channel = self.connection.channel()

Expand All @@ -118,11 +110,8 @@ def test_functional_open_close_channel_loop(self):
@setup(new_connection=False, queue=True)
def test_functional_open_multiple_channels(self):
channels = []
self.connection = self.connection = Connection(HOST,
USERNAME,
PASSWORD,
timeout=30,
lazy=True)
self.connection = self.connection = Connection(HOST, USERNAME,
PASSWORD, lazy=True)
for _ in range(5):
self.connection.open()
for index in range(5):
Expand Down Expand Up @@ -173,12 +162,12 @@ def consume_messages(self):
channel = self.connection.channel()
channel.basic.consume(queue=self.queue_name,
no_ack=False)
for message in channel.build_inbound_messages(break_on_empty=False):
for message in channel.build_inbound_messages(
break_on_empty=False):
self.increment_message_count()
message.ack()
if self.messages_consumed == self.messages_to_send:
break
channel.close()

def increment_message_count(self):
with self.lock:
Expand All @@ -203,6 +192,10 @@ def test_functional_publish_and_consume_5k_messages(self):
break
time.sleep(0.1)

for channel in list(self.connection.channels.values()):
channel.stop_consuming()
channel.close()

self.assertEqual(self.messages_consumed, self.messages_to_send,
'test took too long')

Expand All @@ -221,17 +214,16 @@ def publish_messages(self):
@setup(queue=True)
def test_functional_publish_and_consume_until_empty(self):
self.channel.queue.declare(self.queue_name)

publish_thread = threading.Thread(target=self.publish_messages, )
publish_thread.daemon = True
publish_thread.start()
self.channel.confirm_deliveries()
self.publish_messages()

channel = self.connection.channel()
channel.basic.consume(queue=self.queue_name,
no_ack=False)
message_count = 0
for _ in channel.build_inbound_messages(break_on_empty=True):
for message in channel.build_inbound_messages(break_on_empty=True):
message_count += 1
message.ack()

result = channel.queue.declare(self.queue_name, passive=True)
self.assertEqual(result['message_count'], 0)
Expand Down
3 changes: 3 additions & 0 deletions amqpstorm/tests/functional/web_based_tests.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import time

from amqpstorm import Connection
Expand All @@ -8,6 +9,8 @@
from amqpstorm.tests.utility import retry_function_wrapper
from amqpstorm.tests.utility import setup

LOGGER = logging.getLogger(__name__)


class WebFunctionalTests(TestFunctionalFramework):
def configure(self):
Expand Down

0 comments on commit 6450ef2

Please sign in to comment.