Skip to content

Commit

Permalink
Added delivery_tag property to Message
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Nov 8, 2017
1 parent 5488b83 commit f36f6e5
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 41 deletions.
20 changes: 15 additions & 5 deletions amqpstorm/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def ack(self):
raise AMQPMessageError(
'Message.ack only available on incoming messages'
)
self._channel.basic.ack(delivery_tag=self._method['delivery_tag'])
self._channel.basic.ack(delivery_tag=self.delivery_tag)

def nack(self, requeue=True):
"""Negative Acknowledgement.
Expand All @@ -126,7 +126,7 @@ def nack(self, requeue=True):
raise AMQPMessageError(
'Message.nack only available on incoming messages'
)
self._channel.basic.nack(delivery_tag=self._method['delivery_tag'],
self._channel.basic.nack(delivery_tag=self.delivery_tag,
requeue=requeue)

def reject(self, requeue=True):
Expand All @@ -143,7 +143,7 @@ def reject(self, requeue=True):
raise AMQPMessageError(
'Message.reject only available on incoming messages'
)
self._channel.basic.reject(delivery_tag=self._method['delivery_tag'],
self._channel.basic.reject(delivery_tag=self.delivery_tag,
requeue=requeue)

def publish(self, routing_key, exchange='', mandatory=False,
Expand Down Expand Up @@ -316,14 +316,24 @@ def reply_to(self, value):
@property
def redelivered(self):
"""Indicates if this message may have been delivered before (but not
acknowledged)"
acknowledged).
:rtype: bool or None
:rtype: bool|None
"""
if not self._method:
return None
return self._method.get('redelivered')

@property
def delivery_tag(self):
"""Server-assigned delivery tag.
:rtype: int|None
"""
if not self._method:
return None
return self._method.get('delivery_tag')

def json(self):
"""Deserialize the message body, if it is JSON.
Expand Down
75 changes: 39 additions & 36 deletions amqpstorm/tests/functional/basic_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class BasicFunctionalTests(TestFunctionalFramework):
@setup()
def test_functional_basic_qos(self):
result = self.channel.basic.qos(prefetch_count=100)
self.assertEqual(result, {})
self.assertFalse(result)

@setup(queue=True)
def test_functional_basic_get(self):
Expand Down Expand Up @@ -37,34 +37,36 @@ def test_functional_basic_recover(self):
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

self.assertEqual(self.channel.basic.recover(requeue=True), {})
self.assertFalse(self.channel.basic.recover(requeue=True))

@setup(queue=True)
def test_functional_basic_ack(self):
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(self.queue_name, to_dict=True)

result = self.channel.basic.ack(
delivery_tag=message['method']['delivery_tag'])
message = self.channel.basic.get(self.queue_name)
result = self.channel.basic.ack(delivery_tag=message.delivery_tag)

self.assertEqual(result, None)
self.assertIsNone(result)

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name, to_dict=True))
self.assertFalse(self.channel.basic.get(self.queue_name))

@setup(queue=True)
def test_functional_basic_ack_multiple(self):
message = None
self.channel.queue.declare(self.queue_name)

for _ in range(5):
self.channel.basic.publish(self.message, self.queue_name)

for _ in range(5):
message = self.channel.basic.get(self.queue_name, to_dict=True)
message = self.channel.basic.get(self.queue_name)

self.assertIsNotNone(message)

self.channel.basic.ack(
delivery_tag=message['method']['delivery_tag'],
delivery_tag=message.delivery_tag,
multiple=True
)

Expand All @@ -74,35 +76,38 @@ def test_functional_basic_ack_multiple(self):
self.channel.open()

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name, to_dict=True))
self.assertFalse(self.channel.basic.get(self.queue_name))

@setup(queue=True)
def test_functional_basic_nack(self):
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(self.queue_name, to_dict=True)

message = self.channel.basic.get(self.queue_name)
result = self.channel.basic.nack(
requeue=False,
delivery_tag=message['method']['delivery_tag'])
delivery_tag=message.delivery_tag
)

self.assertEqual(result, None)
self.assertIsNone(result)

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name, to_dict=True))
self.assertFalse(self.channel.basic.get(self.queue_name))

@setup(queue=True)
def test_functional_basic_nack_multiple(self):
message = None
self.channel.queue.declare(self.queue_name)

for _ in range(5):
self.channel.basic.publish(self.message, self.queue_name)

for _ in range(5):
message = self.channel.basic.get(self.queue_name, to_dict=True)
message = self.channel.basic.get(self.queue_name)

self.assertIsNotNone(message)
self.channel.basic.nack(
delivery_tag=message['method']['delivery_tag'],
delivery_tag=message.delivery_tag,
requeue=False,
multiple=True
)
Expand All @@ -113,54 +118,52 @@ def test_functional_basic_nack_multiple(self):
self.channel.open()

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name, to_dict=True))
self.assertFalse(self.channel.basic.get(self.queue_name))

@setup(queue=True)
def test_functional_basic_nack_requeue(self):
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(self.queue_name, to_dict=True)

message = self.channel.basic.get(self.queue_name)
result = self.channel.basic.nack(
requeue=True,
delivery_tag=message['method']['delivery_tag'])
delivery_tag=message.delivery_tag
)

self.assertEqual(result, None)
self.assertIsNone(result)

# Make sure the message was requeued.
self.assertIsInstance(self.channel.basic.get(self.queue_name,
to_dict=True), dict)
self.assertIsNotNone(self.channel.basic.get(self.queue_name))

@setup(queue=True)
def test_functional_basic_reject(self):
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(self.queue_name, to_dict=True)

message = self.channel.basic.get(self.queue_name)
result = self.channel.basic.reject(
requeue=False,
delivery_tag=message['method']['delivery_tag'])
delivery_tag=message.delivery_tag
)

self.assertEqual(result, None)
self.assertIsNone(result)

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name, to_dict=True))
self.assertFalse(self.channel.basic.get(self.queue_name))

@setup(queue=True)
def test_functional_basic_reject_requeue(self):
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(self.queue_name, to_dict=True)

message = self.channel.basic.get(self.queue_name)
result = self.channel.basic.reject(
requeue=True,
delivery_tag=message['method']['delivery_tag'])
delivery_tag=message.delivery_tag
)

self.assertEqual(result, None)
self.assertIsNone(result)

# Make sure the message was requeued.
self.assertIsInstance(self.channel.basic.get(self.queue_name,
to_dict=True), dict)
self.assertIsNotNone(self.channel.basic.get(self.queue_name))
38 changes: 38 additions & 0 deletions amqpstorm/tests/unit/message_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ def test_message_not_redelivered(self):

self.assertFalse(message.redelivered)

def test_message_redelivered_is_none(self):
message = Message.create(body='',
channel=FakeChannel())
message._method = {
'redelivered': None
}

self.assertIsNone(message.redelivered)

def test_message_redelivered_and_method_none(self):
message = Message.create(None, '')
message._method = dict()
Expand All @@ -143,6 +152,35 @@ def test_message_redelivered_and_method_empty(self):

self.assertIsNone(message.redelivered)

def test_message_delivery_tag(self):
message = Message.create(body='',
channel=FakeChannel())
message._method = {
'delivery_tag': 5
}

self.assertEqual(message.delivery_tag, 5)

def test_message_delivery_tag_is_none(self):
message = Message.create(body='',
channel=FakeChannel())
message._method = {
'delivery_tag': None
}

self.assertIsNone(message.delivery_tag)

def test_message_delivery_tag_and_method_none(self):
message = Message.create(None, '')
message._method = dict()

self.assertIsNone(message.delivery_tag)

def test_message_rdelivery_tag_and_method_empty(self):
message = Message.create(None, '')

self.assertIsNone(message.delivery_tag)

def test_message_do_not_override_properties(self):
reply_to = self.message,
correlation_id = str(uuid.uuid4())
Expand Down

0 comments on commit f36f6e5

Please sign in to comment.