Skip to content

Commit

Permalink
feat: Implement make_publisher which creates a routing publisher. (#11)
Browse files Browse the repository at this point in the history
* feat: Implement RoutingPublisher which routes between publishers.

* fix: Add documentation for many files.
  • Loading branch information
dpcollins-google committed Aug 12, 2020
1 parent 7aa39a1 commit baeb0f6
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 4 deletions.
5 changes: 5 additions & 0 deletions google/cloud/pubsublite/endpoints.py
@@ -0,0 +1,5 @@
from google.cloud.pubsublite.location import CloudRegion


def regional_endpoint(region: CloudRegion):
return f"{region}-pubsublite.googleapis.com"
5 changes: 4 additions & 1 deletion google/cloud/pubsublite/internal/wire/gapic_connection.py
Expand Up @@ -47,7 +47,10 @@ class GapicConnectionFactory(ConnectionFactory[Request, Response]):
"""A ConnectionFactory that produces GapicConnections."""
_producer = Callable[[AsyncIterator[Request]], AsyncIterable[Response]]

def New(self) -> Connection[Request, Response]:
def __init__(self, producer: Callable[[AsyncIterator[Request]], AsyncIterable[Response]]):
self._producer = producer

def new(self) -> Connection[Request, Response]:
conn = GapicConnection[Request, Response]()
response_iterable = self._producer(conn)
conn.set_response_it(response_iterable.__aiter__())
Expand Down
62 changes: 62 additions & 0 deletions google/cloud/pubsublite/internal/wire/make_publisher.py
@@ -0,0 +1,62 @@
from typing import AsyncIterator, Mapping, Optional, MutableMapping

from google.cloud.pubsublite.endpoints import regional_endpoint
from google.cloud.pubsublite.internal.wire.default_routing_policy import DefaultRoutingPolicy
from google.cloud.pubsublite.internal.wire.gapic_connection import GapicConnectionFactory
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_publisher import RoutingPublisher
from google.cloud.pubsublite.internal.wire.single_partition_publisher import SinglePartitionPublisher
from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.paths import TopicPath
from google.cloud.pubsublite.routing_metadata import topic_routing_metadata
from google.cloud.pubsublite_v1 import InitialPublishRequest, PublishRequest
from google.cloud.pubsublite_v1.services.publisher_service import async_client
from google.cloud.pubsublite_v1.services.admin_service.client import AdminServiceClient
from google.cloud.pubsublite_v1.types.admin import GetTopicPartitionsRequest
from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials


def make_publisher(
topic: TopicPath,
batching_delay_secs: float = .05,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None) -> Publisher:
"""
Make a new publisher for the given topic.
Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Returns:
A new Publisher.
Throws:
GoogleApiCallException on any error determining topic structure.
"""
if client_options is None:
client_options = ClientOptions(api_endpoint=regional_endpoint(topic.location.region))
client = async_client.PublisherServiceAsyncClient(
credentials=credentials, client_options=client_options) # type: ignore

admin_client = AdminServiceClient(credentials=credentials, client_options=client_options)
partitions = admin_client.get_topic_partitions(GetTopicPartitionsRequest(name=str(topic)))

clients: MutableMapping[Partition, Publisher] = {}

for partition in range(partitions.partition_count):
partition = Partition(partition)

def connection_factory(requests: AsyncIterator[PublishRequest]):
final_metadata = merge_metadata(metadata, topic_routing_metadata(topic, partition))
return client.publish(requests, metadata=list(final_metadata.items()))

clients[partition] = SinglePartitionPublisher(InitialPublishRequest(topic=str(topic), partition=partition.value),
batching_delay_secs, GapicConnectionFactory(connection_factory))
return RoutingPublisher(DefaultRoutingPolicy(partitions.partition_count), clients)
15 changes: 15 additions & 0 deletions google/cloud/pubsublite/internal/wire/merge_metadata.py
@@ -0,0 +1,15 @@
from typing import Mapping, Optional


def merge_metadata(a: Optional[Mapping[str, str]], b: Optional[Mapping[str, str]]) -> Mapping[str, str]:
"""
Merge the two sets of metadata if either exists. The second map overwrites the first.
"""
result = {}
if a:
for k, v in a.items():
result[k] = v
if b:
for k, v in b.items():
result[k] = v
return result
3 changes: 3 additions & 0 deletions google/cloud/pubsublite/internal/wire/publisher.py
Expand Up @@ -4,6 +4,9 @@


class Publisher(ABC):
"""
A Pub/Sub Lite asynchronous wire protocol publisher.
"""
@abstractmethod
async def __aenter__(self):
raise NotImplementedError()
Expand Down
34 changes: 34 additions & 0 deletions google/cloud/pubsublite/internal/wire/pubsub_context.py
@@ -0,0 +1,34 @@
from base64 import b64encode
from typing import Mapping, Optional, NamedTuple

from absl import logging
import pkg_resources
from google.protobuf import struct_pb2


class _Semver(NamedTuple):
major: int
minor: int


def _version() -> _Semver:
version = pkg_resources.get_distribution("google-cloud-pubsublite").version
splits = version.split(".")
if len(splits) != 3:
logging.info(f"Failed to extract semver from {version}.")
return _Semver(0, 0)
return _Semver(int(splits[0]), int(splits[1]))


def pubsub_context(framework: Optional[str] = None) -> Mapping[str, str]:
"""Construct the pubsub context mapping for the given framework."""
context = struct_pb2.Struct()
context.fields["language"] = struct_pb2.Value(string_value="PYTHON")
if framework:
context.fields["framework"] = struct_pb2.Value(string_value=framework)
version = _version()
context.fields["major_version"] = struct_pb2.Value(number_value=version.major)
context.fields["minor_version"] = struct_pb2.Value(number_value=version.minor)
encoded = b64encode(context.SerializeToString())
return {"x-goog-pubsub-context": encoded}

6 changes: 3 additions & 3 deletions google/cloud/pubsublite/internal/wire/routing_publisher.py
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Mapping

from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
Expand All @@ -9,9 +9,9 @@

class RoutingPublisher(Publisher):
_routing_policy: RoutingPolicy
_publishers: Dict[Partition, Publisher]
_publishers: Mapping[Partition, Publisher]

def __init__(self, routing_policy: RoutingPolicy, publishers: Dict[Partition, Publisher]):
def __init__(self, routing_policy: RoutingPolicy, publishers: Mapping[Partition, Publisher]):
self._routing_policy = routing_policy
self._publishers = publishers

Expand Down
13 changes: 13 additions & 0 deletions google/cloud/pubsublite/location.py
@@ -0,0 +1,13 @@
from typing import NamedTuple


class CloudRegion(NamedTuple):
name: str


class CloudZone(NamedTuple):
region: CloudRegion
zone_id: str

def __str__(self):
return f"{self.region.name}-{self.zone_id}"
21 changes: 21 additions & 0 deletions google/cloud/pubsublite/paths.py
@@ -0,0 +1,21 @@
from typing import NamedTuple

from google.cloud.pubsublite.location import CloudZone


class TopicPath(NamedTuple):
project_number: int
location: CloudZone
name: str

def __str__(self):
return f"projects/{self.project_number}/locations/{self.location}/topics/{self.name}"


class SubscriptionPath(NamedTuple):
project_number: int
location: CloudZone
name: str

def __str__(self):
return f"projects/{self.project_number}/locations/{self.location}/subscriptions/{self.name}"
17 changes: 17 additions & 0 deletions google/cloud/pubsublite/routing_metadata.py
@@ -0,0 +1,17 @@
from typing import Mapping, Union
from urllib.parse import urlencode

from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.paths import TopicPath, SubscriptionPath

_PARAMS_HEADER = "x-goog-request-params"


def topic_routing_metadata(topic: TopicPath, partition: Partition) -> Mapping[str, str]:
encoded = urlencode(topic)
return {_PARAMS_HEADER: f"partition={partition.value}&topic={encoded}"}


def subscription_routing_metadata(subscription: SubscriptionPath, partition: Partition) -> Mapping[str, str]:
encoded = urlencode(subscription)
return {_PARAMS_HEADER: f"partition={partition.value}&subscription={encoded}"}
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -31,6 +31,7 @@
"google-api-core >= 1.22.0",
"absl-py >= 0.9.0",
"proto-plus >= 0.4.0",
"setuptools"
]

setuptools.setup(
Expand Down

0 comments on commit baeb0f6

Please sign in to comment.