Skip to content

Commit

Permalink
Merge pull request #24 from eandersson/bug/heartbeat
Browse files Browse the repository at this point in the history
Bug/heartbeat
  • Loading branch information
eandersson committed Jul 6, 2016
2 parents 35284ae + c4d9953 commit 6524e9a
Show file tree
Hide file tree
Showing 25 changed files with 392 additions and 216 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Expand Up @@ -13,8 +13,7 @@ install:
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2; fi
- pip install -r requirements.txt
- pip install -r test-requirements.txt
- if [[ $TRAVIS_PYTHON_VERSION == '3.2' ]]; then pip install coverage==3.7.1; fi
script: nosetests -v -l DEBUG --logging-level=DEBUG --with-coverage --cover-package=amqpstorm
script: nosetests -v -l DEBUG --logging-level=DEBUG --with-coverage --cover-package=amqpstorm --with-timer --timer-top-n 25
after_success:
- bash <(curl -s https://codecov.io/bash)
services:
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG
@@ -1,5 +1,10 @@
# Changelog

### Version 1.4.0
- All classes are now slotted.
- New improved Heartbeat Monitor.
- If no data has been sent within the Heartbeat interval, the client will now send a Heartbeat to the server. - Thanks David Schneider.

### Version 1.3.4
- Dropped Python 3.2 Support.
- Fixed incorrect SSL warning when adding heartbeat or timeout to uri string (#18) - Thanks Adam Mills.
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
@@ -1,5 +1,5 @@
"""AMQP-Storm."""
__version__ = '1.3.4' # noqa
__version__ = '1.4.0' # noqa
__author__ = 'eandersson' # noqa

import logging
Expand Down
18 changes: 16 additions & 2 deletions amqpstorm/base.py
@@ -1,13 +1,20 @@
"""AMQP-Storm Base."""

import locale
import threading

AUTH_MECHANISM = 'PLAIN'
IDLE_WAIT = 0.01
FRAME_MAX = 131072
MAX_CHANNELS = 65535
LOCALE = locale.getdefaultlocale()[0] or 'en_US'


class Stateful(object):
"""Stateful"""
__slots__ = [
'_exceptions', '_lock', '_state'
]
CLOSED = 0
CLOSING = 1
OPENING = 2
Expand Down Expand Up @@ -81,6 +88,9 @@ def check_for_errors(self):

class BaseChannel(Stateful):
"""AMQP BaseChannel"""
__slots__ = [
'_channel_id', '_consumer_tags'
]

def __init__(self, channel_id):
super(BaseChannel, self).__init__()
Expand Down Expand Up @@ -129,7 +139,9 @@ def remove_consumer_tag(self, tag=None):

class BaseMessage(object):
"""AMQP BaseMessage"""
__slots__ = ['_body', '_channel', '_method', '_properties']
__slots__ = [
'_body', '_channel', '_method', '_properties'
]

def __init__(self, channel, **message):
"""
Expand Down Expand Up @@ -169,7 +181,9 @@ def to_tuple(self):

class Handler(object):
"""Operations Handler (e.g. Queue, Exchange)"""
__slots__ = ['_channel']
__slots__ = [
'_channel'
]

def __init__(self, channel):
self._channel = channel
1 change: 1 addition & 0 deletions amqpstorm/basic.py
Expand Up @@ -19,6 +19,7 @@

class Basic(Handler):
"""AMQP Channel.basic"""
__slots__ = []

def qos(self, prefetch_count=0, prefetch_size=0, global_=False):
"""Specify quality of service.
Expand Down
35 changes: 13 additions & 22 deletions amqpstorm/channel.py
Expand Up @@ -26,6 +26,10 @@

class Channel(BaseChannel):
"""Connection.channel"""
__slots__ = [
'confirming_deliveries', 'consumer_callback', 'rpc', '_basic',
'_connection', '_exchange', '_inbound', '_queue'
]

def __init__(self, channel_id, connection, rpc_timeout):
super(Channel, self).__init__(channel_id)
Expand All @@ -34,9 +38,9 @@ def __init__(self, channel_id, connection, rpc_timeout):
self.consumer_callback = None
self._inbound = []
self._connection = connection
self._basic = None
self._exchange = None
self._queue = None
self._basic = Basic(self)
self._exchange = Exchange(self)
self._queue = Queue(self)

def __enter__(self):
return self
Expand All @@ -58,23 +62,23 @@ def basic(self):
:rtype: Basic
"""
return self._lazy_load_handler('_basic', Basic)
return self._basic

@property
def exchange(self):
"""RabbitMQ Exchange Operations.
:rtype: Exchange
"""
return self._lazy_load_handler('_exchange', Exchange)
return self._exchange

@property
def queue(self):
"""RabbitMQ Queue Operations.
:rtype: Queue
"""
return self._lazy_load_handler('_queue', Queue)
return self._queue

def open(self):
"""Open Channel.
Expand Down Expand Up @@ -340,10 +344,10 @@ def _build_message(self):
with self.lock:
if len(self._inbound) < 2:
return None
result = self._build_message_headers()
if not result:
headers = self._build_message_headers()
if not headers:
return None
basic_deliver, content_header = result
basic_deliver, content_header = headers
body = self._build_message_body(content_header.body_size)

message = Message(channel=self,
Expand Down Expand Up @@ -387,16 +391,3 @@ def _build_message_body(self, body_size):
break
body += body_piece.value
return body

def _lazy_load_handler(self, name, handler_class):
"""Lazy load operations (e.g. Queue, Exchange)
:param name:
:param Handler handler_class: Handler (e.g. Queue)
:return:
"""
handler = getattr(self, name)
if not handler:
handler = handler_class(self)
setattr(self, name, handler)
return handler
21 changes: 14 additions & 7 deletions amqpstorm/channel0.py
@@ -1,6 +1,5 @@
"""AMQP-Storm Connection.Channel0."""

import locale
import logging
import platform

Expand All @@ -9,15 +8,16 @@
from pamqp.specification import Connection as pamqp_connection

from amqpstorm import __version__
from amqpstorm.base import AUTH_MECHANISM
from amqpstorm.base import FRAME_MAX
from amqpstorm.base import LOCALE
from amqpstorm.base import MAX_CHANNELS
from amqpstorm.base import Stateful
from amqpstorm.exception import AMQPConnectionError
from amqpstorm.compatibility import try_utf8_decode

AUTH_MECHANISM = 'PLAIN'
LOCALE = locale.getdefaultlocale()[0] or 'en_US'

LOGGER = logging.getLogger(__name__)
MAX_CHANNELS = 65535


class Channel0(object):
Expand All @@ -39,8 +39,7 @@ def on_frame(self, frame_in):
"""
LOGGER.debug('Frame Received: %s', frame_in.name)
if frame_in.name == 'Heartbeat':
self._connection.heartbeat.register_heartbeat()
self._write_frame(Heartbeat())
self.send_heartbeat()
elif frame_in.name == 'Connection.Start':
self.server_properties = frame_in.server_properties
self._send_start_ok_frame(frame_in)
Expand All @@ -63,6 +62,13 @@ def on_frame(self, frame_in):
else:
LOGGER.error('[Channel0] Unhandled Frame: %s', frame_in.name)

def send_heartbeat(self):
"""Send Heartbeat frame.
:return:
"""
self._write_frame(Heartbeat())

def send_close_connection_frame(self):
"""Send Connection Close frame.
Expand Down Expand Up @@ -107,11 +113,12 @@ def _write_frame(self, frame_out):
:return:
"""
self._connection.write_frame(0, frame_out)
LOGGER.debug('Frame Sent: %s', frame_out.name)

def _send_start_ok_frame(self, frame_in):
"""Send Start OK frame.
:param pamqp_spec.Frame frame_out: Amqp frame.
:param pamqp_spec.Connection.StartOk frame_in: Amqp frame.
:return:
"""
if 'PLAIN' not in try_utf8_decode(frame_in.mechanisms):
Expand Down
17 changes: 12 additions & 5 deletions amqpstorm/connection.py
Expand Up @@ -25,6 +25,9 @@

class Connection(Stateful):
"""AMQP Connection"""
__slots__ = [
'heartbeat', 'parameters', '_channel0', '_channels', '_io'
]

def __init__(self, hostname, username, password, port=5672, **kwargs):
"""
Expand Down Expand Up @@ -56,10 +59,12 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
'ssl_options': kwargs.get('ssl_options', {})
}
self._validate_parameters()
self.heartbeat = Heartbeat(self.parameters['heartbeat'])
self._io = IO(self.parameters, on_read=self._read_buffer)
self._io = IO(self.parameters, exceptions=self._exceptions,
on_read=self._read_buffer)
self._channel0 = Channel0(self)
self._channels = {}
self.heartbeat = Heartbeat(self.parameters['heartbeat'],
self._channel0.send_heartbeat)
if not kwargs.get('lazy', False):
self.open()

Expand Down Expand Up @@ -116,7 +121,7 @@ def open(self):
LOGGER.debug('Connection Opening')
self.set_state(self.OPENING)
self._exceptions = []
self._io.open(self._exceptions)
self._io.open()
self._send_handshake()
self._wait_for_connection_to_open()
self.heartbeat.start(self._exceptions)
Expand Down Expand Up @@ -180,6 +185,7 @@ def write_frame(self, channel_id, frame_out):
:return:
"""
frame_data = pamqp_frame.marshal(frame_out, channel_id)
self.heartbeat.register_write()
self._io.write_to_socket(frame_data)

def write_frames(self, channel_id, multiple_frames):
Expand All @@ -192,6 +198,7 @@ def write_frames(self, channel_id, multiple_frames):
frame_data = EMPTY_BUFFER
for single_frame in multiple_frames:
frame_data += pamqp_frame.marshal(single_frame, channel_id)
self.heartbeat.register_write()
self._io.write_to_socket(frame_data)

def _validate_parameters(self):
Expand Down Expand Up @@ -247,7 +254,7 @@ def _read_buffer(self, buffer):
if frame_in is None:
break

self.heartbeat.register_beat()
self.heartbeat.register_read()
if channel_id == 0:
self._channel0.on_frame(frame_in)
else:
Expand All @@ -270,7 +277,7 @@ def _handle_amqp_frame(self, data_in):
pass
except pamqp_spec.AMQPFrameError as why:
LOGGER.error('AMQPFrameError: %r', why, exc_info=True)
except (UnicodeDecodeError, ValueError) as why:
except ValueError as why:
LOGGER.error(why, exc_info=True)
self.exceptions.append(AMQPConnectionError(why))
return data_in, None, None
Expand Down
1 change: 1 addition & 0 deletions amqpstorm/exchange.py
Expand Up @@ -13,6 +13,7 @@

class Exchange(Handler):
"""AMQP Channel.exchange"""
__slots__ = []

def declare(self, exchange='', exchange_type='direct', passive=False,
durable=False, auto_delete=False, arguments=None):
Expand Down

0 comments on commit 6524e9a

Please sign in to comment.