Skip to content

Commit

Permalink
Merge pull request #32 from eandersson/feature/refactor-unittest
Browse files Browse the repository at this point in the history
Re-factored unit-tests
  • Loading branch information
eandersson committed Nov 18, 2016
2 parents fa78ce1 + c6d4f28 commit 432a081
Show file tree
Hide file tree
Showing 71 changed files with 3,790 additions and 3,673 deletions.
4 changes: 4 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[run]
branch = true
source = amqpstorm
omit = amqpstorm/tests/*
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ language: python
python:
- 2.6
- 2.7
- 2.7.8
- 3.3
- 3.4
- 3.5
- 3.6-dev
- nightly
- pypy
- pypy3
- pypy-5.3.1
install:
- sudo rabbitmq-plugins enable rabbitmq_management
- sudo service rabbitmq-server restart
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2; fi
- pip install -r requirements.txt
- pip install -r test-requirements.txt
script: nosetests -v -l DEBUG --logging-level=DEBUG --with-coverage --cover-package=amqpstorm --with-timer --timer-top-n 25
script: nosetests -v -l DEBUG --logging-level=DEBUG --with-coverage --cover-package=amqpstorm --with-timer --timer-top-n 10
before_script:
- if [[ $TRAVIS_PYTHON_VERSION != '2.6' ]]; then flake8 --ignore=F821 .; fi
after_success:
Expand Down
7 changes: 4 additions & 3 deletions amqpstorm/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,14 @@ def process_data_events(self, to_tuple=False, auto_decode=True):
if not self.consumer_callback:
raise AMQPChannelError('no consumer_callback defined')
for message in self.build_inbound_messages(break_on_empty=True,
to_tuple=to_tuple,
auto_decode=auto_decode):
if not to_tuple:
if to_tuple:
# noinspection PyCallingNonCallable
self.consumer_callback(message)
self.consumer_callback(*message)
continue
# noinspection PyCallingNonCallable
self.consumer_callback(*message.to_tuple())
self.consumer_callback(message)
sleep(IDLE_WAIT)

def rpc_request(self, frame_out):
Expand Down
12 changes: 6 additions & 6 deletions amqpstorm/channel0.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ def on_frame(self, frame_in):
self._set_connection_state(Stateful.OPEN)
elif frame_in.name == 'Connection.Start':
self.server_properties = frame_in.server_properties
self._send_start_ok_frame(frame_in)
self._send_start_ok(frame_in)
elif frame_in.name == 'Connection.Tune':
self._send_tune_ok_frame()
self._send_tune_ok()
self._send_open_connection()
else:
LOGGER.error('[Channel0] Unhandled Frame: %s', frame_in.name)

def send_close_connection_frame(self):
def send_close_connection(self):
"""Send Connection Close frame.
:return:
Expand Down Expand Up @@ -122,10 +122,10 @@ def _plain_credentials(self):
return '\0%s\0%s' % (self._parameters['username'],
self._parameters['password'])

def _send_start_ok_frame(self, frame_in):
def _send_start_ok(self, frame_in):
"""Send Start OK frame.
:param pamqp_spec.Connection.StartOk frame_in: Amqp frame.
:param pamqp_spec.Connection.Start frame_in: Amqp frame.
:return:
"""
if 'PLAIN' not in try_utf8_decode(frame_in.mechanisms):
Expand All @@ -143,7 +143,7 @@ def _send_start_ok_frame(self, frame_in):
locale=LOCALE)
self._write_frame(start_ok_frame)

def _send_tune_ok_frame(self):
def _send_tune_ok(self):
"""Send Tune OK frame.
:return:
Expand Down
3 changes: 1 addition & 2 deletions amqpstorm/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ def get_default_ssl_version():
SSL_VERSIONS['protocol_tlsv1_2'] = ssl.PROTOCOL_TLSv1_2
if hasattr(ssl, 'PROTOCOL_TLSv1_1'):
SSL_VERSIONS['protocol_tlsv1_1'] = ssl.PROTOCOL_TLSv1_1
if hasattr(ssl, 'PROTOCOL_TLSv1'):
SSL_VERSIONS['protocol_tlsv1'] = ssl.PROTOCOL_TLSv1
SSL_VERSIONS['protocol_tlsv1'] = ssl.PROTOCOL_TLSv1

SSL_CERT_MAP = {
'cert_none': ssl.CERT_NONE,
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def close(self):
try:
self._close_remaining_channels()
if not self.is_closed and self.socket:
self._channel0.send_close_connection_frame()
self._channel0.send_close_connection()
self._wait_for_connection_state(state=Stateful.CLOSED)
except AMQPConnectionError:
pass
Expand Down
2 changes: 0 additions & 2 deletions amqpstorm/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ def _create_socket(self, socket_family):
:rtype: socket.socket
"""
sock = socket.socket(socket_family, socket.SOCK_STREAM, 0)
if hasattr(socket, 'SOL_TCP'):
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
sock.settimeout(self._parameters['timeout'] or None)
if self.use_ssl:
if not compatibility.SSL_SUPPORTED:
Expand Down
5 changes: 5 additions & 0 deletions amqpstorm/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
HOST = '127.0.0.1'
USERNAME = 'guest'
PASSWORD = 'guest'
URI = 'amqp://guest:guest@127.0.0.1:5672/%2F'
HTTP_URL = 'http://127.0.0.1:15672'
5 changes: 0 additions & 5 deletions amqpstorm/tests/functional/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
HOST = '127.0.0.1'
USERNAME = 'guest'
PASSWORD = 'guest'
URI = 'amqp://guest:guest@127.0.0.1:5672/%2F'
HTTP_URL = 'http://127.0.0.1:15672'
207 changes: 75 additions & 132 deletions amqpstorm/tests/functional/basic_tests.py
Original file line number Diff line number Diff line change
@@ -1,171 +1,114 @@
import logging
from amqpstorm.tests.utility import TestFunctionalFramework
from amqpstorm.tests.utility import setup

try:
import unittest2 as unittest
except ImportError:
import unittest

from amqpstorm import Connection
from amqpstorm.tests.functional import HOST
from amqpstorm.tests.functional import USERNAME
from amqpstorm.tests.functional import PASSWORD
from amqpstorm.tests.utility import MockLoggingHandler

logging.basicConfig(level=logging.DEBUG)

LOGGER = logging.getLogger(__name__)


class BasicFunctionalTests(unittest.TestCase):
def setUp(self):
self.logging_handler = MockLoggingHandler()
logging.root.addHandler(self.logging_handler)
self.connection = Connection(HOST, USERNAME, PASSWORD)
self.channel = self.connection.channel()

class BasicFunctionalTests(TestFunctionalFramework):
@setup()
def test_functional_basic_qos(self):
result = self.channel.basic.qos(prefetch_count=100)
self.assertEqual(result, {})

@setup(queue=True)
def test_functional_basic_get(self):
payload = 'hello world'
queue = 'test_functional_basic_get'
try:
self.channel.queue.declare(queue)
self.channel.basic.publish(payload, queue)

message = self.channel.basic.get(queue)
self.assertEqual(message.body, payload)
message.ack()
finally:
self.channel.queue.delete(queue)
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(self.queue_name)
self.assertEqual(message.body, self.message)
message.ack()

@setup(queue=True)
def test_functional_basic_cancel(self):
queue = 'test_functional_basic_cancel'
try:
self.channel.queue.declare(queue)
consumer_tag = self.channel.basic.consume(None, queue)
self.channel.queue.declare(self.queue_name)
consumer_tag = self.channel.basic.consume(None, self.queue_name)

result = self.channel.basic.cancel(consumer_tag)
self.assertEqual(result['consumer_tag'], consumer_tag)
finally:
self.channel.queue.delete(queue)
result = self.channel.basic.cancel(consumer_tag)
self.assertEqual(result['consumer_tag'], consumer_tag)

@setup(queue=True)
def test_functional_basic_recover(self):
payload = 'hello world'
queue = 'test_functional_basic_recover'
try:
self.channel.queue.declare(queue)
self.channel.basic.publish(payload, queue)
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

self.assertEqual(self.channel.basic.recover(requeue=True), {})
finally:
self.channel.queue.delete(queue)
self.assertEqual(self.channel.basic.recover(requeue=True), {})

@setup(queue=True)
def test_functional_basic_ack(self):
payload = 'hello world'
queue = 'test_functional_basic_ack'
try:
self.channel.queue.declare(queue)
self.channel.basic.publish(payload, queue)
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(queue, to_dict=True)
message = self.channel.basic.get(self.queue_name, to_dict=True)

result = self.channel.basic.ack(
delivery_tag=message['method']['delivery_tag'])
result = self.channel.basic.ack(
delivery_tag=message['method']['delivery_tag'])

self.assertEqual(result, None)
self.assertEqual(result, None)

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(queue, to_dict=True))
finally:
self.channel.queue.delete(queue)
# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name, to_dict=True))

@setup(queue=True)
def test_functional_basic_nack(self):
payload = 'hello world'
queue = 'test_functional_basic_nack'
try:
self.channel.queue.declare(queue)
self.channel.basic.publish(payload, queue)
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(queue, to_dict=True)
message = self.channel.basic.get(self.queue_name, to_dict=True)

result = self.channel.basic.nack(
requeue=False,
delivery_tag=message['method']['delivery_tag'])
result = self.channel.basic.nack(
requeue=False,
delivery_tag=message['method']['delivery_tag'])

self.assertEqual(result, None)
self.assertEqual(result, None)

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(queue, to_dict=True))
finally:
self.channel.queue.delete(queue)
# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name, to_dict=True))

@setup(queue=True)
def test_functional_basic_nack_requeue(self):
payload = 'hello world'
queue = 'test_functional_basic_nack_requeue'
try:
self.channel.queue.declare(queue)
self.channel.basic.publish(payload, queue)
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(queue, to_dict=True)
message = self.channel.basic.get(self.queue_name, to_dict=True)

result = self.channel.basic.nack(
requeue=True,
delivery_tag=message['method']['delivery_tag'])
result = self.channel.basic.nack(
requeue=True,
delivery_tag=message['method']['delivery_tag'])

self.assertEqual(result, None)
self.assertEqual(result, None)

# Make sure the message was requeued.
self.assertIsInstance(self.channel.basic.get(queue, to_dict=True),
dict)
finally:
self.channel.queue.delete(queue)
# Make sure the message was requeued.
self.assertIsInstance(self.channel.basic.get(self.queue_name,
to_dict=True), dict)

@setup(queue=True)
def test_functional_basic_reject(self):
payload = 'hello world'
queue = 'test_functional_basic_reject'
try:
self.channel.queue.declare(queue)
self.channel.basic.publish(payload, queue)
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(queue, to_dict=True)
message = self.channel.basic.get(self.queue_name, to_dict=True)

result = self.channel.basic.reject(
requeue=False,
delivery_tag=message['method']['delivery_tag'])
result = self.channel.basic.reject(
requeue=False,
delivery_tag=message['method']['delivery_tag'])

self.assertEqual(result, None)
self.assertEqual(result, None)

# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(queue, to_dict=True))
finally:
self.channel.queue.delete(queue)
# Make sure the message wasn't requeued.
self.assertFalse(self.channel.basic.get(self.queue_name, to_dict=True))

@setup(queue=True)
def test_functional_basic_reject_requeue(self):
payload = 'hello world'
queue = 'test_functional_basic_reject'
try:
self.channel.queue.declare(queue)
self.channel.basic.publish(payload, queue)

message = self.channel.basic.get(queue, to_dict=True)

result = self.channel.basic.reject(
requeue=True,
delivery_tag=message['method']['delivery_tag'])

self.assertEqual(result, None)

# Make sure the message was requeued.
self.assertIsInstance(self.channel.basic.get(queue, to_dict=True),
dict)
finally:
self.channel.queue.delete(queue)

def tearDown(self):
self.channel.close()
self.connection.close()
self.assertFalse(self.logging_handler.messages['warning'])
self.assertFalse(self.logging_handler.messages['error'])
self.assertFalse(self.logging_handler.messages['critical'])
self.channel.queue.declare(self.queue_name)
self.channel.basic.publish(self.message, self.queue_name)

message = self.channel.basic.get(self.queue_name, to_dict=True)

result = self.channel.basic.reject(
requeue=True,
delivery_tag=message['method']['delivery_tag'])

self.assertEqual(result, None)

# Make sure the message was requeued.
self.assertIsInstance(self.channel.basic.get(self.queue_name,
to_dict=True), dict)

0 comments on commit 432a081

Please sign in to comment.