Skip to content

Commit

Permalink
Merge pull request #22 from eandersson/clean-up-pass
Browse files Browse the repository at this point in the history
Clean up pass
  • Loading branch information
eandersson committed Jun 12, 2016
2 parents 40c5842 + 0952288 commit 35284ae
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 269 deletions.
142 changes: 0 additions & 142 deletions amqpstorm/base.py
@@ -1,10 +1,6 @@
"""AMQP-Storm Base."""

import threading
import time
from uuid import uuid4

from amqpstorm.exception import AMQPChannelError

IDLE_WAIT = 0.01
FRAME_MAX = 131072
Expand Down Expand Up @@ -83,144 +79,6 @@ def check_for_errors(self):
pass


class Rpc(object):
"""AMQP Channel.rpc"""

def __init__(self, adapter, timeout=360):
"""
:param Stateful adapter: Connection or Channel.
:param int|float timeout: Rpc timeout.
"""
self._lock = threading.Lock()
self._adapter = adapter
self._timeout = timeout
self._response = {}
self._request = {}

@property
def lock(self):
return self._lock

def on_frame(self, frame_in):
"""On RPC Frame.
:param pamqp_spec.Frame frame_in: Amqp frame.
:return:
"""
if frame_in.name not in self._request:
return False

uuid = self._request[frame_in.name]
if self._response[uuid]:
self._response[uuid].append(frame_in)
else:
self._response[uuid] = [frame_in]
return True

def register_request(self, valid_responses):
"""Register a RPC request.
:param list valid_responses: List of possible Responses that
we should be waiting for.
:return:
"""
uuid = str(uuid4())
self._response[uuid] = []
for action in valid_responses:
self._request[action] = uuid
return uuid

def remove(self, uuid):
"""Remove any data related to a specific RPC request.
:param str uuid: Rpc Identifier.
:return:
"""
self.remove_request(uuid)
self.remove_response(uuid)

def remove_request(self, uuid):
"""Remove any RPC request(s) using this uuid.
:param str uuid: Rpc Identifier.
:return:
"""
for key in list(self._request):
if self._request[key] == uuid:
del self._request[key]

def remove_response(self, uuid):
"""Remove a RPC Response using this uuid.
:param str uuid: Rpc Identifier.
:return:
"""
if uuid in self._response:
del self._response[uuid]

def get_request(self, uuid, raw=False, multiple=False):
"""Get a RPC request.
:param str uuid: Rpc Identifier
:param bool raw: If enabled return the frame as is, else return
result as a dictionary.
:param bool multiple: Are we expecting multiple frames.
:return:
"""
if uuid not in self._response:
return
self._wait_for_request(uuid)
frame = self._get_response_frame(uuid)
if not multiple:
self.remove(uuid)
result = None
if raw:
result = frame
elif frame is not None:
result = dict(frame)
return result

def _get_response_frame(self, uuid):
"""Get a response frame.
:param str uuid: Rpc Identifier
:return:
"""
frame = None
frames = self._response.get(uuid, None)
if frames:
frame = frames.pop(0)
return frame

def _wait_for_request(self, uuid):
"""Wait for RPC request to arrive.
:param str uuid: Rpc Identifier.
:return:
"""
start_time = time.time()
while not self._response[uuid]:
self._adapter.check_for_errors()
if time.time() - start_time > self._timeout:
self._raise_rpc_timeout_error(uuid)
time.sleep(IDLE_WAIT)

def _raise_rpc_timeout_error(self, uuid):
"""Gather information and raise an Rpc exception.
:param str uuid: Rpc Identifier.
:return:
"""
requests = []
for key, value in self._request.items():
if value == uuid:
requests.append(key)
self.remove(uuid)
message = ('rpc requests %s (%s) took too long'
% (uuid, ', '.join(requests)))
raise AMQPChannelError(message)


class BaseChannel(Stateful):
"""AMQP BaseChannel"""

Expand Down
6 changes: 3 additions & 3 deletions amqpstorm/basic.py
Expand Up @@ -12,7 +12,6 @@
from amqpstorm.base import Handler
from amqpstorm.exception import AMQPChannelError
from amqpstorm.exception import AMQPInvalidArgument
from amqpstorm.exception import AMQPMessageError
from amqpstorm.message import Message

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -400,8 +399,9 @@ def _create_content_body(body):
for offset in compatibility.RANGE(0, frames):
start_frame = FRAME_MAX * offset
end_frame = start_frame + FRAME_MAX
if end_frame > len(body):
end_frame = len(body)
body_len = len(body)
if end_frame > body_len:
end_frame = body_len
yield pamqp_body.ContentBody(body[start_frame:end_frame])

def _get_content_body(self, uuid_body, body_size):
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/channel.py
Expand Up @@ -9,7 +9,7 @@
from amqpstorm import compatibility
from amqpstorm.base import BaseChannel
from amqpstorm.base import IDLE_WAIT
from amqpstorm.base import Rpc
from amqpstorm.rpc import Rpc
from amqpstorm.basic import Basic
from amqpstorm.exception import AMQPChannelError
from amqpstorm.exception import AMQPConnectionError
Expand Down
1 change: 1 addition & 0 deletions amqpstorm/io.py
Expand Up @@ -52,6 +52,7 @@ def is_ready(self):

class IO(object):
"""AMQP Connection.io"""

def __init__(self, parameters, on_read=None):
self._inbound_thread = None
self._lock = threading.Lock()
Expand Down
146 changes: 146 additions & 0 deletions amqpstorm/rpc.py
@@ -0,0 +1,146 @@
"""AMQP-Storm Rpc."""

import threading
import time
from uuid import uuid4

from amqpstorm.base import IDLE_WAIT
from amqpstorm.exception import AMQPChannelError


class Rpc(object):
"""AMQP Channel.rpc"""

def __init__(self, adapter, timeout=360):
"""
:param Stateful adapter: Connection or Channel.
:param int|float timeout: Rpc timeout.
"""
self._lock = threading.Lock()
self._adapter = adapter
self._timeout = timeout
self._response = {}
self._request = {}

@property
def lock(self):
return self._lock

def on_frame(self, frame_in):
"""On RPC Frame.
:param pamqp_spec.Frame frame_in: Amqp frame.
:return:
"""
if frame_in.name not in self._request:
return False

uuid = self._request[frame_in.name]
if self._response[uuid]:
self._response[uuid].append(frame_in)
else:
self._response[uuid] = [frame_in]
return True

def register_request(self, valid_responses):
"""Register a RPC request.
:param list valid_responses: List of possible Responses that
we should be waiting for.
:return:
"""
uuid = str(uuid4())
self._response[uuid] = []
for action in valid_responses:
self._request[action] = uuid
return uuid

def remove(self, uuid):
"""Remove any data related to a specific RPC request.
:param str uuid: Rpc Identifier.
:return:
"""
self.remove_request(uuid)
self.remove_response(uuid)

def remove_request(self, uuid):
"""Remove any RPC request(s) using this uuid.
:param str uuid: Rpc Identifier.
:return:
"""
for key in list(self._request):
if self._request[key] == uuid:
del self._request[key]

def remove_response(self, uuid):
"""Remove a RPC Response using this uuid.
:param str uuid: Rpc Identifier.
:return:
"""
if uuid in self._response:
del self._response[uuid]

def get_request(self, uuid, raw=False, multiple=False):
"""Get a RPC request.
:param str uuid: Rpc Identifier
:param bool raw: If enabled return the frame as is, else return
result as a dictionary.
:param bool multiple: Are we expecting multiple frames.
:return:
"""
if uuid not in self._response:
return
self._wait_for_request(uuid)
frame = self._get_response_frame(uuid)
if not multiple:
self.remove(uuid)
result = None
if raw:
result = frame
elif frame is not None:
result = dict(frame)
return result

def _get_response_frame(self, uuid):
"""Get a response frame.
:param str uuid: Rpc Identifier
:return:
"""
frame = None
frames = self._response.get(uuid, None)
if frames:
frame = frames.pop(0)
return frame

def _wait_for_request(self, uuid):
"""Wait for RPC request to arrive.
:param str uuid: Rpc Identifier.
:return:
"""
start_time = time.time()
while not self._response[uuid]:
self._adapter.check_for_errors()
if time.time() - start_time > self._timeout:
self._raise_rpc_timeout_error(uuid)
time.sleep(IDLE_WAIT)

def _raise_rpc_timeout_error(self, uuid):
"""Gather information and raise an Rpc exception.
:param str uuid: Rpc Identifier.
:return:
"""
requests = []
for key, value in self._request.items():
if value == uuid:
requests.append(key)
self.remove(uuid)
message = ('rpc requests %s (%s) took too long'
% (uuid, ', '.join(requests)))
raise AMQPChannelError(message)

0 comments on commit 35284ae

Please sign in to comment.