Skip to content

Commit

Permalink
feat: Implement RoutingPublisher which routes between publishers. (#10)
Browse files Browse the repository at this point in the history
* feat: Implement SerialBatcher which helps with transforming single writes into batch writes.

* feat: Implement SinglePartitionPublisher which publishes to a single partition and handles retries.

* feat: Implement RoutingPublisher which routes between publishers.
  • Loading branch information
dpcollins-google committed Aug 12, 2020
1 parent fd1d76f commit 7aa39a1
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions google/cloud/pubsublite/internal/wire/routing_publisher.py
@@ -0,0 +1,29 @@
from typing import Dict

from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.publish_metadata import PublishMetadata
from google.cloud.pubsublite_v1 import PubSubMessage


class RoutingPublisher(Publisher):
_routing_policy: RoutingPolicy
_publishers: Dict[Partition, Publisher]

def __init__(self, routing_policy: RoutingPolicy, publishers: Dict[Partition, Publisher]):
self._routing_policy = routing_policy
self._publishers = publishers

async def __aenter__(self):
for publisher in self._publishers.values():
await publisher.__aenter__()

async def __aexit__(self, exc_type, exc_val, exc_tb):
for publisher in self._publishers.values():
await publisher.__aexit__(exc_type, exc_val, exc_tb)

async def publish(self, message: PubSubMessage) -> PublishMetadata:
partition = self._routing_policy.route(message)
assert partition in self._publishers
return await self._publishers[partition].publish(message)

0 comments on commit 7aa39a1

Please sign in to comment.