Skip to content

Commit

Permalink
feat: Implement RoutingPolicy (#5)
Browse files Browse the repository at this point in the history
* feat: Implement python retrying connection, which generically retries stream errors.

* fix: Add asynctest to tests_require.

* feat: Implement DefaultRouingPolicy

* fix: Add class comments.

* feat: Implement python retrying connection, which generically retries stream errors (#4)

* feat: Implement python retrying connection, which generically retries stream errors.

* fix: Add asynctest to tests_require.

* fix: Add class comments.

* feat: Implement DefaultRouingPolicy

* fix: Add class comments.

* docs: Add comment to DefaultRoutingPolicy.
  • Loading branch information
dpcollins-google committed Aug 10, 2020
1 parent 11c9a69 commit f72a2f0
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 0 deletions.
30 changes: 30 additions & 0 deletions google/cloud/pubsublite/internal/wire/default_routing_policy.py
@@ -0,0 +1,30 @@
import hashlib
import random

from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite_v1.types import PubSubMessage


class DefaultRoutingPolicy(RoutingPolicy):
"""
The default routing policy which routes based on sha256 % num_partitions using the key if set or round robin if
unset.
"""
_num_partitions: int
_current_round_robin: Partition

def __init__(self, num_partitions: int):
self._num_partitions = num_partitions
self._current_round_robin = Partition(random.randint(0, num_partitions))

def route(self, message: PubSubMessage) -> Partition:
"""Route the message using the key if set or round robin if unset."""
if not message.key:
result = Partition(self._current_round_robin.value)
self._current_round_robin.value = (self._current_round_robin.value + 1) % self._num_partitions
return result
sha = hashlib.sha256()
sha.update(message.key)
as_int = int.from_bytes(sha.digest(), byteorder='big')
return Partition(as_int % self._num_partitions)
19 changes: 19 additions & 0 deletions google/cloud/pubsublite/internal/wire/routing_policy.py
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod

from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite_v1.types.common import PubSubMessage


class RoutingPolicy(ABC):
"""A policy for how to route messages."""
@abstractmethod
def route(self, message: PubSubMessage) -> Partition:
"""
Route a message to a given partition.
Args:
message: The message to route
Returns: The partition to route to
"""
raise NotImplementedError()
5 changes: 5 additions & 0 deletions google/cloud/pubsublite/partition.py
@@ -0,0 +1,5 @@
from typing import NamedTuple


class Partition(NamedTuple):
value: int
23 changes: 23 additions & 0 deletions tests/unit/pubsublite/internal/wire/default_routing_policy_test.py
@@ -0,0 +1,23 @@
import json
import os

from google.cloud.pubsublite.partition import Partition

from google.cloud.pubsublite.internal.wire.default_routing_policy import DefaultRoutingPolicy
from google.cloud.pubsublite_v1 import PubSubMessage


def test_routing_cases():
policy = DefaultRoutingPolicy(num_partitions=29)
json_list = []
with open(os.path.join(os.path.dirname(__file__), "routing_tests.json")) as f:
for line in f:
if not line.startswith("//"):
json_list.append(line)

loaded = json.loads("\n".join(json_list))
target = {bytes(k, 'utf-8'): Partition(v) for k, v in loaded.items()}
result = {}
for key in target:
result[key] = policy.route(PubSubMessage(key=key))
assert result == target
20 changes: 20 additions & 0 deletions tests/unit/pubsublite/internal/wire/routing_tests.json
@@ -0,0 +1,20 @@
// File defining a map from routing keys to the expected partition in a 29
// partition topic for test purposes. This file should be copied into all client
// library test suites.
{
"oaisdhfoiahsd": 18,
"P(#*YNPOIUDF": 9,
"LCIUNDFPOASIUN":8,
";odsfiupoius": 9,
"OPISUDfpoiu": 2,
"dokjwO:IDf": 21,
"%^&*": 19,
"XXXXXXXXX": 15,
"dpcollins": 28,
"#()&$IJHLOIURF": 2,
"dfasiduyf": 6,
"983u2poer": 3,
"8888888": 6,
"OPUIPOUYPOIOPUIOIPUOUIPJOP": 2,
"x": 16
}

0 comments on commit f72a2f0

Please sign in to comment.