From 46d9075287faf95a9e0c251b0ca446ee254fa22b Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 21 Sep 2020 15:46:11 -0400 Subject: [PATCH 1/3] feat: Implement transforms to/from Pub/Sub messages and Pub/Sub Lite messages. --- google/cloud/pubsublite/internal/b64_utils.py | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 google/cloud/pubsublite/internal/b64_utils.py diff --git a/google/cloud/pubsublite/internal/b64_utils.py b/google/cloud/pubsublite/internal/b64_utils.py new file mode 100644 index 00000000..1a5dd15c --- /dev/null +++ b/google/cloud/pubsublite/internal/b64_utils.py @@ -0,0 +1,11 @@ +import base64 +import pickle + + +def to_b64_string(src: object) -> str: + return base64.b64encode(pickle.dumps(src)).decode('utf-8') + + +def from_b64_string(src: str) -> object: + return pickle.loads(base64.b64decode(src.encode('utf-8'))) + From 088f474a6c502f894c6081264c3b1e907b060f59 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 21 Sep 2020 16:59:28 -0400 Subject: [PATCH 2/3] feat: Implement Pub/Sub Publisher --- .../internal/async_publisher_impl.py | 26 ++++++++++ .../internal/managed_event_loop.py | 23 +++++++++ .../cloudpubsub/internal/publisher_impl.py | 26 ++++++++++ .../cloud/pubsublite/cloudpubsub/publisher.py | 50 +++++++++++++++++++ setup.py | 1 + .../internal/publisher_impl_test.py | 25 ++++++++++ 6 files changed, 151 insertions(+) create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py create mode 100644 google/cloud/pubsublite/cloudpubsub/publisher.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py diff --git a/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py new file mode 100644 index 00000000..7828ee8d --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py @@ -0,0 +1,26 @@ +from typing import Mapping + +from google.pubsub_v1 import PubsubMessage + +from google.cloud.pubsublite.cloudpubsub.message_transforms import from_cps_publish_message +from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher +from google.cloud.pubsublite.internal.wire.publisher import Publisher + + +class AsyncPublisherImpl(AsyncPublisher): + _publisher: Publisher + + def __init__(self, publisher: Publisher): + self._publisher = publisher + + async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str: + cps_message = PubsubMessage(data=data, ordering_key=ordering_key, attributes=attrs) + psl_message = from_cps_publish_message(cps_message) + return (await self._publisher.publish(psl_message)).encode() + + def __aenter__(self): + self._publisher.__aenter__() + return self + + def __aexit__(self, exc_type, exc_value, traceback): + self._publisher.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py b/google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py new file mode 100644 index 00000000..7840bac3 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py @@ -0,0 +1,23 @@ +from asyncio import AbstractEventLoop, new_event_loop, run_coroutine_threadsafe +from concurrent.futures import Future +from threading import Thread +from typing import ContextManager + + +class ManagedEventLoop(ContextManager): + _loop: AbstractEventLoop + _thread: Thread + + def __init__(self): + self._loop = new_event_loop() + self._thread = Thread(target=lambda: self._loop.run_forever()) + + def __enter__(self): + self._thread.start() + + def __exit__(self, __exc_type, __exc_value, __traceback): + self._loop.call_soon_threadsafe(self._loop.stop) + self._thread.join() + + def submit(self, coro) -> Future: + return run_coroutine_threadsafe(coro, self._loop) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py new file mode 100644 index 00000000..25419580 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py @@ -0,0 +1,26 @@ +from concurrent.futures import Future +from typing import Mapping + +from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ManagedEventLoop +from google.cloud.pubsublite.cloudpubsub.publisher import Publisher, AsyncPublisher + + +class PublisherImpl(Publisher): + _managed_loop: ManagedEventLoop + _underlying: AsyncPublisher + + def __init__(self, underlying: AsyncPublisher): + self._managed_loop = ManagedEventLoop() + self._underlying = underlying + + def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> 'Future[str]': + return self._managed_loop.submit(self._underlying.publish(data=data, ordering_key=ordering_key, **attrs)) + + def __enter__(self): + self._managed_loop.__enter__() + self._managed_loop.submit(self._underlying.__aenter__()).result() + return self + + def __exit__(self, __exc_type, __exc_value, __traceback): + self._managed_loop.submit(self._underlying.__aexit__(__exc_type, __exc_value, __traceback)).result() + self._managed_loop.__exit__(__exc_type, __exc_value, __traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher.py b/google/cloud/pubsublite/cloudpubsub/publisher.py new file mode 100644 index 00000000..26644fba --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/publisher.py @@ -0,0 +1,50 @@ +from abc import abstractmethod +from typing import AsyncContextManager, Mapping, ContextManager +from concurrent import futures + + +class AsyncPublisher(AsyncContextManager): + """ + An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an + async context. Any publish failures are permanent. + """ + + @abstractmethod + async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str: + """ + Publish a message. + + Args: + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. + + Returns: + An ack id, which can be decoded using PublishMetadata.decode. + + Raises: + GoogleApiCallError: On a permanent failure. + """ + + +class Publisher(ContextManager): + """ + A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent. + """ + + @abstractmethod + def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> 'futures.Future[str]': + """ + Publish a message. + + Args: + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. + + Returns: + A future completed with an ack id, which can be decoded using PublishMetadata.decode. + + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/setup.py b/setup.py index 51667806..2810f54d 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ "google-api-core >= 1.22.0", "absl-py >= 0.9.0", "proto-plus >= 0.4.0", + "google-cloud-pubsub >= 1.7.0", "grpcio", "setuptools" ] diff --git a/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py b/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py new file mode 100644 index 00000000..afb59d1f --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py @@ -0,0 +1,25 @@ +from asynctest.mock import MagicMock +import pytest + +from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl +from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher + + +@pytest.fixture() +def async_publisher(): + publisher = MagicMock(spec=AsyncPublisher) + publisher.__aenter__.return_value = publisher + return publisher + + +@pytest.fixture() +def publisher(async_publisher): + return PublisherImpl(async_publisher) + + +def test_proxies_to_async(async_publisher, publisher: Publisher): + with publisher: + async_publisher.__aenter__.assert_called_once() + publisher.publish(data=b'abc', ordering_key='zyx', xyz='xyz').result() + async_publisher.publish.assert_called_once_with(data=b'abc', ordering_key='zyx', xyz='xyz') + async_publisher.__aexit__.assert_called_once() From 974f8904cf9e58a43ef5e429d72d1e44b8753d48 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 22 Sep 2020 11:48:54 -0400 Subject: [PATCH 3/3] fix: Re-delete b64_utils after merge. --- google/cloud/pubsublite/internal/b64_utils.py | 11 ----------- 1 file changed, 11 deletions(-) delete mode 100644 google/cloud/pubsublite/internal/b64_utils.py diff --git a/google/cloud/pubsublite/internal/b64_utils.py b/google/cloud/pubsublite/internal/b64_utils.py deleted file mode 100644 index 1a5dd15c..00000000 --- a/google/cloud/pubsublite/internal/b64_utils.py +++ /dev/null @@ -1,11 +0,0 @@ -import base64 -import pickle - - -def to_b64_string(src: object) -> str: - return base64.b64encode(pickle.dumps(src)).decode('utf-8') - - -def from_b64_string(src: str) -> object: - return pickle.loads(base64.b64decode(src.encode('utf-8'))) -