Skip to content

Commit

Permalink
Merge pull request #17 from eandersson/bug16
Browse files Browse the repository at this point in the history
Releasing 1.3.3
  • Loading branch information
eandersson committed Jun 8, 2016
2 parents 48265cd + e6e3dc9 commit 49ffb01
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 8 deletions.
13 changes: 8 additions & 5 deletions CHANGELOG
@@ -1,17 +1,20 @@
# Changelog

### Version 1.3.3
- Fixed bug causing messages without a body to not be consumed properly (#16) - Thanks Adam Mills.

### Version 1.3.2
- Fixed a minor bug in the Poller error handling.
- Fixed an issue where network corruption caused a connection error to throw the wrong exception.
- Fixed minor bug in the Poller error handling.
- Fixed issue where network corruption could caused a connection error to throw the wrong exception.

### Version 1.3.1
- Fixed a SSL bug that could trigger an exception when running multiple threads. [#14]
- Fixed a bug when using channel.basic.get to retrieve large payloads.
- Fixed SSL bug that could trigger an exception when running multiple threads (#14) - Thanks Adam Mills.
- Fixed bug when using channel.basic.get to retrieve large payloads.
- Reduced default RPC timeout from 360s to 120s.

### Version 1.3.0
- Removed noisy logging.
- Fixed Runtime exception caused by listener trying to join itself.
- Fixed Runtime exception caused by listener trying to join itself (#11) - Thanks ramonz.
- Channels are no longer closed after RabbitMQ throws a recoverable exception.
- Added Error mapping based on the AMQP 0.9.1 specifications (when applicable).
Introduced three new variables to the AMQP-Storm Exceptions.
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
@@ -1,5 +1,5 @@
"""AMQP-Storm."""
__version__ = '1.3.2' # noqa
__version__ = '1.3.3' # noqa
__author__ = 'eandersson' # noqa

import logging
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/channel.py
Expand Up @@ -339,7 +339,7 @@ def _build_message(self):
:rtype: Message
"""
with self.lock:
if len(self._inbound) < 3:
if len(self._inbound) < 2:
return None
basic_deliver = self._inbound.pop(0)
if not isinstance(basic_deliver, pamqp_spec.Basic.Deliver):
Expand Down
65 changes: 65 additions & 0 deletions amqpstorm/tests/functional/generic_tests.py
Expand Up @@ -124,6 +124,39 @@ def tearDown(self):
self.connection.close()


class PublishAndGetEmptyMessagesTest(unittest.TestCase):
def setUp(self):
self.queue_name = 'test.basic.get_empty'
self.connection = Connection(HOST, USERNAME, PASSWORD)
self.channel = self.connection.channel()
self.channel.queue.declare(self.queue_name)
self.channel.queue.purge(self.queue_name)

def test_functional_publish_and_get_five_empty_messages(self):
# Publish 5 Messages.
for _ in range(5):
self.channel.basic.publish(body=b'',
routing_key=self.queue_name)

# Sleep for 0.5s to make sure RabbitMQ has time to catch up.
time.sleep(0.5)

# Get 5 messages.
inbound_messages = []
for _ in range(5):
payload = self.channel.basic.get(self.queue_name, to_dict=False)
self.assertIsInstance(payload, Message)
self.assertEqual(payload.body, b'')
inbound_messages.append(payload)

self.assertEqual(len(inbound_messages), 5)

def tearDown(self):
self.channel.queue.delete(self.queue_name)
self.channel.close()
self.connection.close()


class PublishAndGetLargeMessageTest(unittest.TestCase):
def setUp(self):
self.connection = Connection(HOST, USERNAME, PASSWORD)
Expand Down Expand Up @@ -183,6 +216,38 @@ def tearDown(self):
self.connection.close()


class PublishEmptyMessagesAndConsumeTest(unittest.TestCase):
def setUp(self):
self.queue_name = 'test.basic.empty_messages'
self.connection = Connection(HOST, USERNAME, PASSWORD)
self.channel = self.connection.channel()
self.channel.confirm_deliveries()
self.channel.queue.declare(self.queue_name)
self.channel.queue.purge(self.queue_name)

def test_functional_publish_5_empty_messages(self):
body = b''
messages_to_publish = 5

self.channel.basic.consume(queue=self.queue_name,
no_ack=True)
# Publish 5 Messages.
for _ in range(messages_to_publish):
self.channel.basic.publish(body=body,
routing_key=self.queue_name)

inbound_messages = []
for message in self.channel.build_inbound_messages(break_on_empty=True):
self.assertEqual(message.body, body)
inbound_messages.append(message)
self.assertEqual(len(inbound_messages), messages_to_publish)

def tearDown(self):
self.channel.queue.delete(self.queue_name)
self.channel.close()
self.connection.close()


class PublishLargeMessagesAndGetTest(unittest.TestCase):
def setUp(self):
self.connection = Connection(HOST, USERNAME, PASSWORD)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -5,7 +5,7 @@
"""

setup(name='AMQPStorm',
version='1.3.2',
version='1.3.3',
description='Thread-safe Python RabbitMQ Client Library based on pamqp.',
long_description=long_description,
author='Erik Olof Gunnar Andersson',
Expand Down

0 comments on commit 49ffb01

Please sign in to comment.