Skip to content

Commit

Permalink
fix: Assorted fixes to the publish layer and internals. (#39)
Browse files Browse the repository at this point in the history
* fix: Assorted fixes to the publish layer and internals.

* chore: Run blacken
  • Loading branch information
dpcollins-google committed Oct 8, 2020
1 parent 0d6f0f4 commit 4276882
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 14 deletions.
Expand Up @@ -25,9 +25,9 @@ async def publish(
psl_message = from_cps_publish_message(cps_message)
return (await self._publisher.publish(psl_message)).encode()

def __aenter__(self):
self._publisher.__aenter__()
async def __aenter__(self):
await self._publisher.__aenter__()
return self

def __aexit__(self, exc_type, exc_value, traceback):
self._publisher.__aexit__(exc_type, exc_value, traceback)
async def __aexit__(self, exc_type, exc_value, traceback):
await self._publisher.__aexit__(exc_type, exc_value, traceback)
Expand Up @@ -17,15 +17,15 @@ class DefaultRoutingPolicy(RoutingPolicy):

def __init__(self, num_partitions: int):
self._num_partitions = num_partitions
self._current_round_robin = Partition(random.randint(0, num_partitions))
self._current_round_robin = Partition(random.randint(0, num_partitions - 1))

def route(self, message: PubSubMessage) -> Partition:
"""Route the message using the key if set or round robin if unset."""
if not message.key:
result = Partition(self._current_round_robin.value)
self._current_round_robin.value = (
self._current_round_robin.value + 1
) % self._num_partitions
self._current_round_robin = Partition(
(self._current_round_robin.value + 1) % self._num_partitions
)
return result
sha = hashlib.sha256()
sha.update(message.key)
Expand Down
11 changes: 9 additions & 2 deletions google/cloud/pubsublite/internal/wire/permanent_failable.py
Expand Up @@ -9,10 +9,17 @@
class PermanentFailable:
"""A class that can experience permanent failures, with helpers for forwarding these to client actions."""

_failure_task: asyncio.Future
_maybe_failure_task: Optional[asyncio.Future]

def __init__(self):
self._failure_task = asyncio.Future()
self._maybe_failure_task = None

@property
def _failure_task(self) -> asyncio.Future:
"""Get the failure task, initializing it lazily, since it needs to be initialized in the event loop."""
if self._maybe_failure_task is None:
self._maybe_failure_task = asyncio.Future()
return self._maybe_failure_task

async def await_unless_failed(self, awaitable: Awaitable[T]) -> T:
"""
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/pubsublite/location.py
Expand Up @@ -6,6 +6,9 @@
class CloudRegion(NamedTuple):
name: str

def __str__(self):
return self.name


class CloudZone(NamedTuple):
region: CloudRegion
Expand Down
10 changes: 6 additions & 4 deletions google/cloud/pubsublite/routing_metadata.py
Expand Up @@ -8,12 +8,14 @@


def topic_routing_metadata(topic: TopicPath, partition: Partition) -> Mapping[str, str]:
encoded = urlencode(topic)
return {_PARAMS_HEADER: f"partition={partition.value}&topic={encoded}"}
encoded = urlencode({"partition": str(partition.value), "topic": str(topic)})
return {_PARAMS_HEADER: encoded}


def subscription_routing_metadata(
subscription: SubscriptionPath, partition: Partition
) -> Mapping[str, str]:
encoded = urlencode(subscription)
return {_PARAMS_HEADER: f"partition={partition.value}&subscription={encoded}"}
encoded = urlencode(
{"partition": str(partition.value), "subscription": str(subscription)}
)
return {_PARAMS_HEADER: encoded}

0 comments on commit 4276882

Please sign in to comment.