Skip to content

Commit

Permalink
Added support for python 3
Browse files Browse the repository at this point in the history
  • Loading branch information
steve authored and steveniemitz committed Jan 2, 2020
1 parent 651ec0e commit 66e1358
Show file tree
Hide file tree
Showing 41 changed files with 336 additions and 236 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -1,6 +1,7 @@
language: python
python:
- '2.7'
- '3.6'
install: pip install .
script: ./runtests.sh
deploy:
Expand Down
6 changes: 6 additions & 0 deletions README.md
Expand Up @@ -10,6 +10,12 @@ A protocol agnostic RPC client stack for python.
* Robust load balancing and error detection / recovery.
* Service discovery via ZooKeeper

## Installing

```bash
pip install scales-rpc
```

## Getting started
Getting started with scales is very simple. For example, lets use it to do an HTTP GET of www.google.com

Expand Down
File renamed without changes.
4 changes: 3 additions & 1 deletion scales/binary.py
Expand Up @@ -4,14 +4,15 @@
unpack,
Struct
)
import itertools


class Structs(object):
Byte = Struct('!B')
Int16 = Struct('!h')
Int32 = Struct('!i')
Int64 = Struct('!q')


class BinaryReader(object):
def __init__(self, buf):
self._buf = buf
Expand Down Expand Up @@ -41,6 +42,7 @@ def Unpack(self, fmt):
to_read = calcsize(fmt)
return unpack(fmt, self._buf.read(to_read))


class BinaryWriter(object):
def __init__(self, buf):
self._buf = buf
Expand Down
10 changes: 10 additions & 0 deletions scales/compat.py
@@ -0,0 +1,10 @@

try:
from cStringIO import StringIO as BytesIO
except ImportError:
from io import BytesIO

try:
Long = long
except NameError:
Long = int
25 changes: 16 additions & 9 deletions scales/core.py
Expand Up @@ -4,7 +4,7 @@
import functools
import inspect

from urlparse import urlparse, ParseResult
from six.moves.urllib.parse import urlparse, ParseResult

from .constants import (SinkProperties, SinkRole)
from .dispatch import MessageDispatcher
Expand Down Expand Up @@ -38,6 +38,13 @@ class ClientProxyBuilder(object):
"""
_PROXY_TYPE_CACHE = {}

@staticmethod
def _method_name(m):
if hasattr(m, 'func_name'):
return m.func_name
else:
return m.__name__

@staticmethod
def _BuildServiceProxy(Iface):
"""Build a proxy class that intercepts all user methods on [Iface]
Expand All @@ -46,24 +53,24 @@ def _BuildServiceProxy(Iface):
Args:
Iface - An interface to proxy
"""

def ProxyMethod(method_name, orig_method, async=False):
def ProxyMethod(method_name, orig_method, asynchronous=False):
@functools.wraps(orig_method)
def _ProxyMethod(self, *args, **kwargs):
ar = self._dispatcher.DispatchMethodCall(method_name, args, kwargs)
return ar if async else ar.get()
return ar if asynchronous else ar.get()
return _ProxyMethod

is_user_method = lambda m: (inspect.ismethod(m)
and not inspect.isbuiltin(m)
and not m.func_name.startswith('__')
and not m.func_name.endswith('__'))
def is_user_method(m):
return ((inspect.ismethod(m) or inspect.isfunction(m))
and not inspect.isbuiltin(m)
and not ClientProxyBuilder._method_name(m).startswith('__')
and not ClientProxyBuilder._method_name(m).endswith('__'))

# Get all methods defined on the interface.
iface_methods = { m[0]: ProxyMethod(*m)
for m in inspect.getmembers(Iface, is_user_method) }
iface_methods.pop('__init__', None)
iface_methods.update({ m[0] + "_async": ProxyMethod(*m, async=True)
iface_methods.update({ m[0] + "_async": ProxyMethod(*m, asynchronous=True)
for m in inspect.getmembers(Iface, is_user_method) })

# Create a proxy class to intercept the interface's methods.
Expand Down
14 changes: 11 additions & 3 deletions scales/dispatch.py
Expand Up @@ -4,7 +4,7 @@

import gevent

from .async import AsyncResult
from .asynchronous import AsyncResult
from .constants import MessageProperties, SinkProperties
from .message import (
Deadline,
Expand All @@ -23,13 +23,20 @@
VarzBase
)

class InternalError(Exception): pass

class InternalError(Exception):
pass


class ScalesError(Exception):
def __init__(self, ex, msg):
self.inner_exception = ex
super(ScalesError, self).__init__(msg)

class ServiceClosedError(Exception): pass

class ServiceClosedError(Exception):
pass


class _AsyncResponseSink(ClientMessageSink):
@staticmethod
Expand Down Expand Up @@ -94,6 +101,7 @@ def AsyncProcessResponse(self, sink_stack, context, stream, msg):
ar.set_exception(InternalError('Unknown response message of type %s'
% msg.__class__))


class MessageDispatcher(ClientMessageSink):
"""Handles dispatching incoming and outgoing messages to a client sink stack."""

Expand Down
2 changes: 1 addition & 1 deletion scales/http/sink.py
Expand Up @@ -5,7 +5,7 @@
import requests
from requests import exceptions

from ..async import AsyncResult
from ..asynchronous import AsyncResult
from ..constants import ChannelState, SinkProperties
from ..sink import (ClientMessageSink, SinkProvider)
from ..message import (Deadline, MethodReturnMessage, TimeoutError)
Expand Down
2 changes: 2 additions & 0 deletions scales/kafka/builder.py
Expand Up @@ -8,10 +8,12 @@
from ..loadbalancer import HeapBalancerSink
from ..resurrector import ResurrectorSink


class _KafkaIface(object):
def Put(self, topic, payloads=[], acks=1):
pass


class Kafka(object):
@staticmethod
def _get_sink_key(properties):
Expand Down
6 changes: 2 additions & 4 deletions scales/kafka/protocol.py
@@ -1,5 +1,3 @@
from cStringIO import StringIO

from struct import pack, Struct
from collections import namedtuple
from ..binary import (
Expand Down Expand Up @@ -87,7 +85,7 @@ class NoBrokerForTopicException(Exception): pass

class KafkaProtocol(object):
MSG_STRUCT = Struct('!BBii')
MSG_HEADER = Struct('!qii')
MSG_HEADER = Struct('!qiI')
PRODUCE_HEADER = Struct('!hii')

def DeserializeMessage(self, buf, msg_type):
Expand Down Expand Up @@ -173,7 +171,7 @@ def _SerializeProduceRequest(self, msg, buf, headers):
crc = zlib.crc32(p, crc)

# Write the header
writer.WriteStruct(self.MSG_HEADER, 0, len(header) + len(p) + 4, crc)
writer.WriteStruct(self.MSG_HEADER, 0, len(header) + len(p) + 4, crc & 0xffffffff)
# Write the message data
writer.WriteRaw(header)
writer.WriteRaw(p)
Expand Down
6 changes: 3 additions & 3 deletions scales/kafka/sink.py
@@ -1,8 +1,8 @@
from collections import namedtuple
from cStringIO import StringIO
from struct import (pack, unpack)
import time

from ..compat import BytesIO
from ..dispatch import MessageDispatcher
from ..loadbalancer.serverset import StaticServerSetProvider
from ..loadbalancer.zookeeper import Member
Expand Down Expand Up @@ -62,7 +62,7 @@ def _BuildHeader(self, tag, msg_type, data_len):
return header

def _ProcessReply(self, stream):
tag, = unpack('!i', str(stream.read(4)))
tag, = unpack('!i', stream.read(4))
self._ProcessTaggedReply(tag, stream)


Expand Down Expand Up @@ -262,7 +262,7 @@ def __init__(self, next_provider, sink_properties, global_properties):
self.next_sink = next_provider.CreateSink(global_properties)

def AsyncProcessRequest(self, sink_stack, msg, stream, headers):
buf = StringIO()
buf = BytesIO()
headers = {}

try:
Expand Down
3 changes: 2 additions & 1 deletion scales/loadbalancer/aperture.py
Expand Up @@ -13,7 +13,7 @@
import random

from .heap import HeapBalancerSink
from ..async import AsyncResult
from ..asynchronous import AsyncResult
from ..constants import (ChannelState, SinkProperties, SinkRole)
from ..sink import SinkProvider
from ..timer_queue import LOW_RESOLUTION_TIMER_QUEUE, LOW_RESOLUTION_TIME_SOURCE
Expand Down Expand Up @@ -230,6 +230,7 @@ def _AdjustAperture(self, amount):
elif aperture_load <= self._min_load and aperture_size > self._min_size:
self._ContractAperture()


ApertureBalancerSink.Builder = SinkProvider(
ApertureBalancerSink,
SinkRole.LoadBalancer,
Expand Down
2 changes: 1 addition & 1 deletion scales/loadbalancer/base.py
Expand Up @@ -10,7 +10,7 @@
import gevent
from gevent.event import Event

from ..async import AsyncResult
from ..asynchronous import AsyncResult
from ..constants import (
ChannelState,
SinkProperties,
Expand Down
8 changes: 4 additions & 4 deletions scales/loadbalancer/heap.py
Expand Up @@ -23,7 +23,7 @@
LoadBalancerSink,
NoMembersError
)
from ..async import AsyncResult
from ..asynchronous import AsyncResult
from ..constants import (
ChannelState,
Int,
Expand Down Expand Up @@ -68,9 +68,9 @@ def FixUp(heap, i):
i - The index to start at.
"""
while True:
if i != 1 and heap[i] < heap[i/2]:
Heap.Swap(heap, i, i/2)
i /= 2 # FixUp(heap, i/2)
if i != 1 and heap[i] < heap[i//2]:
Heap.Swap(heap, i, i//2)
i //= 2 # FixUp(heap, i/2)
else:
break

Expand Down
3 changes: 2 additions & 1 deletion scales/loadbalancer/serverset.py
@@ -1,4 +1,5 @@
from abc import (ABCMeta, abstractmethod)
from six import string_types

class ServerSetProvider(ABCMeta('ABCMeta', (object,), {})):
"""Base class for providing a set of servers, as well as optionally
Expand Down Expand Up @@ -84,7 +85,7 @@ def __init__(self,
in the znode.
"""
self._zk_client = None
if isinstance(zk_servers_or_client, basestring):
if isinstance(zk_servers_or_client, string_types):
self._zk_client = self._GetZooKeeperClient(zk_servers_or_client, zk_timeout)
self._owns_zk_client = True
else:
Expand Down
13 changes: 6 additions & 7 deletions scales/message.py
Expand Up @@ -3,6 +3,8 @@
import sys
import traceback

from .compat import Long

class Deadline(object):
KEY = "__Deadline"
EVENT_KEY = "__Deadline_Event"
Expand All @@ -13,8 +15,8 @@ def __init__(self, timeout):
timeout - The timeout in seconds
"""
import time
self._ts = long(time.time()) * 1000000000 # Nanoseconds
self._timeout = long(timeout * 1000000000)
self._ts = Long(time.time()) * 1000000000 # Nanoseconds
self._timeout = Long(timeout * 1000000000)


class ClientError(Exception): pass
Expand Down Expand Up @@ -53,11 +55,10 @@ def public_properties(self):
"""Returns:
A dict of properties intended to be transported to the server
with the method call."""
return { k: v for k,v in self.properties.iteritems()
return { k: v for k, v in self.properties.items()
if not k.startswith('__') }



class MethodCallMessage(Message):
"""A MethodCallMessage represents a method being invoked on a service."""
__slots__ = ('service', 'method', 'args', 'kwargs')
Expand Down Expand Up @@ -126,9 +127,7 @@ def __init__(self, return_value=None, error=None):
frame = tb.tb_frame

stack = traceback.format_list(traceback.extract_stack(frame))
error_module = getattr(error, '__module__', '<builtin>')
error_name = '%s.%s' % (error_module, error.__class__.__name__)
stack = stack + traceback.format_exception_only(error_name, error.message)
stack = stack + traceback.format_exception_only(error.__class__, error)
self.stack = stack
# Prevent circular references
del frame
Expand Down
10 changes: 6 additions & 4 deletions scales/mux/sink.py
Expand Up @@ -3,15 +3,15 @@
import logging
import time
from struct import unpack
from cStringIO import StringIO

import gevent
from gevent.queue import Queue

from ..async import (
from ..asynchronous import (
AsyncResult,
NamedGreenlet
)
from ..compat import BytesIO
from ..constants import ChannelState
from ..message import (
Deadline,
Expand All @@ -35,6 +35,8 @@
ROOT_LOG = logging.getLogger('scales.mux')

class Tag(object):
__slots__ = ('_tag',)

KEY = "__Tag"

def __init__(self, tag):
Expand Down Expand Up @@ -300,10 +302,10 @@ def _RecvLoop(self):
"""
while self.isActive:
try:
sz, = unpack('!i', str(self._socket.readAll(4)))
sz, = unpack('!i', self._socket.readAll(4))
with self._varz.recv_time.Measure():
with self._varz.recv_latency.Measure():
buf = StringIO(self._socket.readAll(sz))
buf = BytesIO(self._socket.readAll(sz))
self._varz.messages_recv()
gevent.spawn(self._ProcessReply, buf)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion scales/pool/singleton.py
@@ -1,5 +1,5 @@
from .base import PoolSink
from ..async import AsyncResult
from ..asynchronous import AsyncResult
from ..constants import (ChannelState, SinkRole)
from ..sink import SinkProvider, SinkProperties

Expand Down
2 changes: 1 addition & 1 deletion scales/pool/watermark.py
Expand Up @@ -4,7 +4,7 @@
import gevent

from .base import PoolSink
from ..async import AsyncResult
from ..asynchronous import AsyncResult
from ..constants import (Int, ChannelState, SinkProperties, SinkRole)
from ..sink import (
ClientMessageSink,
Expand Down
2 changes: 1 addition & 1 deletion scales/redis/sink.py
Expand Up @@ -3,7 +3,7 @@
import gevent
import redis

from ..async import (
from ..asynchronous import (
AsyncResult,
NoopTimeout
)
Expand Down

0 comments on commit 66e1358

Please sign in to comment.