Skip to content

Commit

Permalink
Merge pull request #101 from eandersson/feature/message
Browse files Browse the repository at this point in the history
Re-worked BaseMessage
  • Loading branch information
eandersson committed Jun 11, 2021
2 parents 7a999a8 + e45a688 commit c173dfd
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 14 deletions.
13 changes: 8 additions & 5 deletions amqpstorm/base.py
Expand Up @@ -153,16 +153,19 @@ class BaseMessage(object):
:param str,unicode body: Message body
:param dict method: Message method
:param dict properties: Message properties
:param bool auto_decode: This is not implemented in the base message class.
"""
__slots__ = [
'_body', '_channel', '_method', '_properties'
'_auto_decode', '_body', '_channel', '_method', '_properties'
]

def __init__(self, channel, **message):
def __init__(self, channel, body=None, method=None, properties=None,
auto_decode=None):
self._auto_decode = auto_decode
self._channel = channel
self._body = message.get('body', None)
self._method = message.get('method', None)
self._properties = message.get('properties', {'headers': {}})
self._body = body
self._method = method
self._properties = properties or {}

def __iter__(self):
for attribute in ['_body', '_channel', '_method', '_properties']:
Expand Down
12 changes: 9 additions & 3 deletions amqpstorm/management/basic.py
Expand Up @@ -86,7 +86,13 @@ def get(self, queue, virtual_host='/', requeue=False, to_dict=False,
return response
messages = []
for message in response:
if 'payload' in message:
message['body'] = message.pop('payload')
messages.append(Message(channel=None, auto_decode=True, **message))
body = message.get('body')
if not body:
body = message.get('payload')
messages.append(Message(
channel=None,
body=body,
properties=message.get('properties'),
auto_decode=True,
))
return messages
14 changes: 8 additions & 6 deletions amqpstorm/message.py
Expand Up @@ -27,20 +27,22 @@ class Message(BaseMessage):
message.publish('my_queue')
:param Channel channel: AMQPStorm Channel
:param bool auto_decode: Auto-decode strings when possible. Does not
apply to to_dict, or to_tuple.
:param bytes,str,unicode body: Message payload
:param dict method: Message method
:param dict properties: Message properties
:param bool auto_decode: Auto-decode strings when possible. Does not
apply to to_dict, or to_tuple.
"""
__slots__ = [
'_auto_decode', '_decode_cache'
'_decode_cache'
]

def __init__(self, channel, auto_decode=True, **message):
super(Message, self).__init__(channel, **message)
def __init__(self, channel, body=None, method=None, properties=None,
auto_decode=True):
super(Message, self).__init__(
channel, body, method, properties, auto_decode
)
self._decode_cache = dict()
self._auto_decode = auto_decode

@staticmethod
def create(channel, body, properties=None):
Expand Down
45 changes: 45 additions & 0 deletions amqpstorm/tests/functional/generic_tests.py
Expand Up @@ -4,6 +4,7 @@
from amqpstorm import AMQPMessageError
from amqpstorm import Channel
from amqpstorm import Message
from amqpstorm.base import BaseMessage
from amqpstorm.tests.functional.utility import TestFunctionalFramework
from amqpstorm.tests.functional.utility import setup

Expand Down Expand Up @@ -271,6 +272,34 @@ def test_functional_generator_consume(self):
# Make sure all five messages were downloaded.
self.assertEqual(len(inbound_messages), 5)

@setup(queue=True)
def test_functional_consume_with_custom_message_impl(self):
self.channel.queue.declare(self.queue_name)
self.channel.confirm_deliveries()
for _ in range(5):
self.channel.basic.publish(body=self.message,
routing_key=self.queue_name)
self.channel.basic.consume(queue=self.queue_name,
no_ack=True)
# Sleep for 0.01s to make sure RabbitMQ has time to catch up.
time.sleep(0.01)

# Store and inbound messages.
inbound_messages = []

class CustomMessage(BaseMessage):
pass

for message in self.channel.build_inbound_messages(
break_on_empty=True,
message_impl=CustomMessage):
self.assertIsInstance(message, CustomMessage)
self.assertEqual(message._body, self.message.encode('utf-8'))
inbound_messages.append(message)

# Make sure all five messages were downloaded.
self.assertEqual(len(inbound_messages), 5)

@setup(queue=True)
def test_functional_consume_and_redeliver(self):
self.channel.queue.declare(self.queue_name)
Expand Down Expand Up @@ -385,6 +414,22 @@ def test_functional_publish_and_get(self):
self.assertIsInstance(message.body, bytes)
self.assertEqual(message.body, self.message.encode('utf-8'))

@setup(queue=True)
def test_functional_publish_and_get_with_custom_message_impl(self):
self.channel.queue.declare(self.queue_name)
self.channel.confirm_deliveries()
self.channel.basic.publish(body=self.message,
routing_key=self.queue_name)

class CustomMessage(BaseMessage):
pass

message = self.channel.basic.get(self.queue_name, no_ack=True,
auto_decode=False,
message_impl=CustomMessage)
self.assertIsInstance(message, CustomMessage)
self.assertEqual(message._body, self.message.encode('utf-8'))

@setup(queue=True)
def test_functional_publish_and_get_auto_decode(self):
self.channel.queue.declare(self.queue_name)
Expand Down

0 comments on commit c173dfd

Please sign in to comment.