Skip to content

Commit

Permalink
Release 1.2.3
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Aug 21, 2015
1 parent c000c2b commit 3caf61a
Show file tree
Hide file tree
Showing 29 changed files with 88 additions and 33 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Expand Up @@ -6,7 +6,6 @@ python:
- 3.2
- 3.3
- 3.4
- "nightly"
- pypy
- pypy3
install:
Expand Down
8 changes: 4 additions & 4 deletions CHANGELOG
@@ -1,9 +1,9 @@
# Changelog

### Version 1.2.3-dev
- Added Client-side Heartbeat monitor.
- Added a Connection Timeout to Connection.open.
- Fixed potential bug in the Socket Poller Error handling.
### Version 1.2.3
- Added a Client-side Heartbeat monitor.
- Added Timeout to Connection.open.
- Fixed potential bug in the Socket Poller error handling.

### Version 1.2.2
- Added shortcuts for common properties in the Message class, e.g. message.app_id.
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/connection.py
Expand Up @@ -54,13 +54,13 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
'ssl': kwargs.get('ssl', False),
'ssl_options': kwargs.get('ssl_options', {})
}
self._validate_parameters()
self.io = IO(self.parameters,
on_read=self._read_buffer,
on_error=self._handle_socket_error)
self.heartbeat = Heartbeat(self.parameters['heartbeat'])
self._channel0 = Channel0(self)
self._channels = {}
self._validate_parameters()
if not kwargs.get('lazy', False):
self.open()

Expand Down
3 changes: 1 addition & 2 deletions amqpstorm/io.py
Expand Up @@ -66,10 +66,9 @@ def is_ready(self):


class IO(Stateful):
lock = threading.Lock()

def __init__(self, parameters, on_read=None, on_error=None):
super(IO, self).__init__()
self.lock = threading.Lock()
self.socket = None
self.poller = None
self.inbound_thread = None
Expand Down
2 changes: 1 addition & 1 deletion examples/__init__.py
Expand Up @@ -3,4 +3,4 @@
HOST = '127.0.0.1'
USERNAME = 'guest'
PASSWORD = 'guest'
URI = 'amqp://guest:guest@127.0.0.1:5672/%2F'
URI = 'amqp://guest:guest@127.0.0.1:5672/%2F'
1 change: 0 additions & 1 deletion examples/basic_error_handling.py
Expand Up @@ -9,7 +9,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion examples/basic_get.py
Expand Up @@ -8,7 +8,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)

QUEUE_NAME = 'simple_queue'
Expand Down
1 change: 0 additions & 1 deletion examples/basic_publisher.py
Expand Up @@ -8,7 +8,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion examples/basic_publisher_confirms.py
Expand Up @@ -9,7 +9,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down
2 changes: 1 addition & 1 deletion examples/basic_rpc_server.py
Expand Up @@ -9,7 +9,6 @@
from examples import USERNAME
from examples import PASSWORD


CONNECTION = amqpstorm.Connection(HOST, USERNAME, PASSWORD)
CHANNEL = CONNECTION.channel()
CHANNEL.queue.declare(queue='rpc_queue')
Expand Down Expand Up @@ -38,6 +37,7 @@ def on_request(body, channel, header, properties):
body=str(response))
channel.basic.ack(delivery_tag=header['delivery_tag'])


if __name__ == '__main__':
CHANNEL.basic.qos(prefetch_count=1)
CHANNEL.basic.consume(on_request, queue='rpc_queue')
Expand Down
1 change: 0 additions & 1 deletion examples/basic_ssl_publisher.py
Expand Up @@ -9,7 +9,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion examples/basic_threaded_consumer.py
Expand Up @@ -27,7 +27,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion examples/basic_threaded_consumer2.py
Expand Up @@ -25,7 +25,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down
2 changes: 0 additions & 2 deletions examples/basic_threaded_publisher.py
Expand Up @@ -10,7 +10,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down Expand Up @@ -54,4 +53,3 @@ def send_messages(connection):
time.sleep(1)

CONNECTION.close()

1 change: 0 additions & 1 deletion examples/flask_threaded_rpc_client.py
Expand Up @@ -13,7 +13,6 @@
from examples import USERNAME
from examples import PASSWORD


app = Flask(__name__)


Expand Down
1 change: 0 additions & 1 deletion examples/simple_generator_consumer.py
Expand Up @@ -8,7 +8,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion examples/simple_publisher.py
Expand Up @@ -8,7 +8,6 @@
from examples import USERNAME
from examples import PASSWORD


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 1 addition & 0 deletions examples/simple_rpc_client.py
Expand Up @@ -64,6 +64,7 @@ def _on_response(self, message):
return
self.response = message.body


if __name__ == '__main__':
fibonacci_rpc = FibonacciRpcClient(HOST, USERNAME, PASSWORD)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -6,7 +6,7 @@
"""

setup(name='AMQP-Storm',
version='1.2.2',
version='1.2.3',
description='Thread-safe Python AMQP Client Library based on pamqp.',
long_description=long_description,
author='Erik Olof Gunnar Andersson',
Expand Down
3 changes: 2 additions & 1 deletion test-requirements.txt
@@ -1,2 +1,3 @@
nose
mock
mock
coverage
2 changes: 1 addition & 1 deletion tests/__init__.py
Expand Up @@ -3,4 +3,4 @@
HOST = '127.0.0.1'
USERNAME = 'guest'
PASSWORD = 'guest'
URI = 'amqp://guest:guest@127.0.0.1:5672/%2F'
URI = 'amqp://guest:guest@127.0.0.1:5672/%2F'
1 change: 0 additions & 1 deletion tests/base_tests.py
Expand Up @@ -14,7 +14,6 @@
from tests.utility import FakeConnection
from tests.utility import TestPayload


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion tests/basic_tests.py
Expand Up @@ -18,7 +18,6 @@

from tests.utility import FakeConnection


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion tests/channel0_tests.py
Expand Up @@ -15,7 +15,6 @@

from tests.utility import FakeConnection


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion tests/channel_tests.py
Expand Up @@ -18,7 +18,6 @@

from tests.utility import FakeConnection


logging.basicConfig(level=logging.DEBUG)


Expand Down
1 change: 0 additions & 1 deletion tests/compatiblity_tests.py
Expand Up @@ -10,7 +10,6 @@

from amqpstorm import compatibility


logging.basicConfig(level=logging.DEBUG)


Expand Down
65 changes: 64 additions & 1 deletion tests/connection_tests.py
Expand Up @@ -16,6 +16,8 @@
from amqpstorm import Connection
from amqpstorm.exception import *

from pamqp.body import ContentBody
from pamqp.specification import Basic as spec_basic

logging.basicConfig(level=logging.DEBUG)

Expand Down Expand Up @@ -57,22 +59,59 @@ def test_invalid_hostname(self):
def test_invalid_username(self):
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
2, 'guest', lazy=True)
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
None, 'guest', lazy=True)

def test_invalid_password(self):
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 3, lazy=True)
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', None, lazy=True)

def test_invalid_virtual_host(self):
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 'guest', virtual_host=4, lazy=True)
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 'guest', virtual_host=None, lazy=True)

def test_invalid_port(self):
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 'guest', port='', lazy=True)
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 'guest', port=None, lazy=True)

def test_invalid_heartbeat(self):
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 'guest', heartbeat='5', lazy=True)
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 'guest', heartbeat=None, lazy=True)

def test_invalid_timeout(self):
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 'guest', timeout='6', lazy=True)
self.assertRaises(AMQPInvalidArgument, Connection, '127.0.0.1',
'guest', 'guest', timeout=None, lazy=True)

def test_server_is_blocked_default_value(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
self.assertEqual(connection.is_blocked, False)

def test_server_properties_default_value(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
self.assertEqual(connection.server_properties, {})

def test_fileno_property(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
connection.set_state(connection.OPENING)
io = IO(connection.parameters)
io.socket = MagicMock(name='socket', spec=socket.socket)
connection.io = io
io.socket.fileno.return_value = 5
self.assertEqual(connection.fileno, 5)

def test_fileno_none_when_connection_closed(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
self.assertIsNone(connection.fileno)

def test_close_state(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
Expand All @@ -84,6 +123,30 @@ def test_open_channel_on_closed_connection(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
self.assertRaises(AMQPConnectionError, connection.channel)

def test_basic_read_buffer(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
cancel_ok_frame = spec_basic.CancelOk().marshal()

self.assertEqual(connection._read_buffer(cancel_ok_frame), b'\x00')

def test_handle_read_buffer_none_returns_none(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
self.assertIsNone(connection._read_buffer(None))

def test_basic_handle_amqp_frame(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
cancel_ok_frame = spec_basic.CancelOk().marshal()

self.assertEqual(connection._handle_amqp_frame(cancel_ok_frame),
(b'\x00', None, None))

def test_handle_amqp_frame_none_returns_none(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
result = connection._handle_amqp_frame('')
self.assertEqual(result[0], '')
self.assertIsNone(result[1])
self.assertIsNone(result[2])

def test_wait_for_connection(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=5,
lazy=True)
Expand All @@ -95,7 +158,7 @@ def test_wait_for_connection(self):
def func(conn):
conn.set_state(conn.OPEN)

threading.Timer(function=func, interval=1, args=(connection, )).start()
threading.Timer(function=func, interval=1, args=(connection,)).start()
connection._wait_for_connection_to_open()

def test_wait_for_connection_raises_on_timeout(self):
Expand Down
1 change: 0 additions & 1 deletion tests/functional_tests.py
Expand Up @@ -30,7 +30,6 @@ def setUp(self):
self.connection = Connection(HOST, USERNAME, PASSWORD, lazy=True)

def test_open_close_loop(self):

for _ in range(10):
self.connection.open()
self.channel = self.connection.channel()
Expand Down
12 changes: 11 additions & 1 deletion tests/heartbeat_tests.py
Expand Up @@ -9,12 +9,12 @@
import unittest

from amqpstorm.heartbeat import Heartbeat
from amqpstorm.exception import AMQPConnectionError

logging.basicConfig(level=logging.DEBUG)


class HeartbeatTests(unittest.TestCase):

def test_heartbeat_start(self):
heartbeat = Heartbeat(1)
heartbeat.start([])
Expand Down Expand Up @@ -48,3 +48,13 @@ def test_basic_raise_on_missed_heartbeats(self):
heartbeat.start(exceptions)
time.sleep(3)
self.assertGreater(len(exceptions), 0)

def test_basic_raise_when_check_for_life_when_exceptions_not_set(self):
heartbeat = Heartbeat(0.5)
heartbeat._beats_since_check = 0
heartbeat._last_heartbeat = time.time() - 100

# Normally the exception should be passed down to the list of
# exceptions in the connection, but if that list for some obscure
# reason is None, we should raise directly in _check_for_life_signs.
self.assertRaises(AMQPConnectionError, heartbeat._check_for_life_signs)

0 comments on commit 3caf61a

Please sign in to comment.