Skip to content

Commit

Permalink
Update heartbeat interval implementation [#127]
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Jan 21, 2024
1 parent 00cdac1 commit 6ea6807
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 17 deletions.
6 changes: 3 additions & 3 deletions amqpstorm/connection.py
Expand Up @@ -22,7 +22,7 @@

LOGGER = logging.getLogger(__name__)

DEFAULT_HEARTBEAT_INTERVAL = 60
DEFAULT_HEARTBEAT_TIMEOUT = 60
DEFAULT_SOCKET_TIMEOUT = 10
DEFAULT_VIRTUAL_HOST = '/'

Expand Down Expand Up @@ -57,7 +57,7 @@ class Connection(Stateful):
:param str password: Password
:param int port: Server port
:param str virtual_host: Virtual host
:param int heartbeat: RabbitMQ Heartbeat interval
:param int heartbeat: RabbitMQ Heartbeat timeout
:param int,float timeout: Socket timeout
:param bool ssl: Enable SSL
:param dict ssl_options: SSL kwargs
Expand All @@ -80,7 +80,7 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
'password': password,
'port': port,
'virtual_host': kwargs.get('virtual_host', DEFAULT_VIRTUAL_HOST),
'heartbeat': kwargs.get('heartbeat', DEFAULT_HEARTBEAT_INTERVAL),
'heartbeat': kwargs.get('heartbeat', DEFAULT_HEARTBEAT_TIMEOUT),
'timeout': kwargs.get('timeout', DEFAULT_SOCKET_TIMEOUT),
'ssl': kwargs.get('ssl', False),
'ssl_options': kwargs.get('ssl_options', {}),
Expand Down
4 changes: 2 additions & 2 deletions amqpstorm/heartbeat.py
Expand Up @@ -11,7 +11,7 @@
class Heartbeat(object):
"""Internal Heartbeat handler."""

def __init__(self, interval, send_heartbeat_impl, timer=threading.Timer):
def __init__(self, timeout, send_heartbeat_impl, timer=threading.Timer):
self.send_heartbeat_impl = send_heartbeat_impl
self.timer_impl = timer
self._lock = threading.Lock()
Expand All @@ -20,7 +20,7 @@ def __init__(self, interval, send_heartbeat_impl, timer=threading.Timer):
self._exceptions = None
self._reads_since_check = 0
self._writes_since_check = 0
self._interval = interval
self._interval = None if timeout is None else max(timeout / 2, 0)
self._threshold = 0

def register_read(self):
Expand Down
16 changes: 8 additions & 8 deletions amqpstorm/tests/unit/test_heartbeat.py
Expand Up @@ -43,15 +43,15 @@ def test_heartbeat_stop(self):
self.assertFalse(heartbeat._running.is_set())
self.assertIsNone(heartbeat._timer)

def test_heartbeat_interval(self):
def test_heartbeat_timeout(self):
heartbeat = Heartbeat(60, fake_function)

self.assertEqual(heartbeat._interval, 60)
self.assertEqual(heartbeat._interval, 30)
self.assertEqual(heartbeat._threshold, 0)

heartbeat = Heartbeat(360, fake_function)

self.assertEqual(heartbeat._interval, 360)
self.assertEqual(heartbeat._interval, 180)
self.assertEqual(heartbeat._threshold, 0)

def test_heartbeat_no_interval(self):
Expand Down Expand Up @@ -229,15 +229,15 @@ def test_heartbeat_raise_exception(self):

self.assertRaisesRegex(
AMQPConnectionError,
'Connection dead, no heartbeat or data received in >= 120s',
'Connection dead, no heartbeat or data received in >= 60s',
heartbeat._raise_or_append_exception
)

heartbeat = Heartbeat(120, None)

self.assertRaisesRegex(
AMQPConnectionError,
'Connection dead, no heartbeat or data received in >= 240',
'Connection dead, no heartbeat or data received in >= 120s',
heartbeat._raise_or_append_exception
)

Expand All @@ -252,14 +252,14 @@ def check(exception):

self.assertRaisesRegex(
AMQPConnectionError,
'Connection dead, no heartbeat or data received in >= 120s',
'Connection dead, no heartbeat or data received in >= 60s',
check, heartbeat._exceptions
)

heartbeat._interval = 120
heartbeat._interval = 120 // 2

self.assertRaisesRegex(
AMQPConnectionError,
'Connection dead, no heartbeat or data received in >= 240',
'Connection dead, no heartbeat or data received in >= 120s',
check, heartbeat._exceptions
)
4 changes: 2 additions & 2 deletions amqpstorm/tests/unit/uri_connection/test_uri_connection.py
@@ -1,7 +1,7 @@
import ssl

from amqpstorm import UriConnection
from amqpstorm.connection import DEFAULT_HEARTBEAT_INTERVAL
from amqpstorm.connection import DEFAULT_HEARTBEAT_TIMEOUT
from amqpstorm.connection import DEFAULT_SOCKET_TIMEOUT
from amqpstorm.connection import DEFAULT_VIRTUAL_HOST
from amqpstorm.tests.utility import TestFramework
Expand All @@ -20,7 +20,7 @@ def test_uri_default(self):
DEFAULT_VIRTUAL_HOST)
self.assertEqual(connection.parameters['port'], 5672)
self.assertEqual(connection.parameters['heartbeat'],
DEFAULT_HEARTBEAT_INTERVAL)
DEFAULT_HEARTBEAT_TIMEOUT)
self.assertEqual(connection.parameters['timeout'],
DEFAULT_SOCKET_TIMEOUT)
self.assertFalse(connection.parameters['ssl'])
Expand Down
4 changes: 2 additions & 2 deletions amqpstorm/uri_connection.py
Expand Up @@ -6,7 +6,7 @@
from amqpstorm.compatibility import ssl
from amqpstorm.compatibility import urlparse
from amqpstorm.connection import Connection
from amqpstorm.connection import DEFAULT_HEARTBEAT_INTERVAL
from amqpstorm.connection import DEFAULT_HEARTBEAT_TIMEOUT
from amqpstorm.connection import DEFAULT_SOCKET_TIMEOUT
from amqpstorm.connection import DEFAULT_VIRTUAL_HOST
from amqpstorm.exception import AMQPConnectionError
Expand Down Expand Up @@ -83,7 +83,7 @@ def _parse_uri_options(self, parsed_uri, use_ssl=False, ssl_options=None):
'ssl': use_ssl,
'virtual_host': vhost,
'heartbeat': int(kwargs.pop('heartbeat',
[DEFAULT_HEARTBEAT_INTERVAL])[0]),
[DEFAULT_HEARTBEAT_TIMEOUT])[0]),
'timeout': int(kwargs.pop('timeout',
[DEFAULT_SOCKET_TIMEOUT])[0])
}
Expand Down

0 comments on commit 6ea6807

Please sign in to comment.