diff --git a/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py new file mode 100644 index 00000000..7828ee8d --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py @@ -0,0 +1,26 @@ +from typing import Mapping + +from google.pubsub_v1 import PubsubMessage + +from google.cloud.pubsublite.cloudpubsub.message_transforms import from_cps_publish_message +from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher +from google.cloud.pubsublite.internal.wire.publisher import Publisher + + +class AsyncPublisherImpl(AsyncPublisher): + _publisher: Publisher + + def __init__(self, publisher: Publisher): + self._publisher = publisher + + async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str: + cps_message = PubsubMessage(data=data, ordering_key=ordering_key, attributes=attrs) + psl_message = from_cps_publish_message(cps_message) + return (await self._publisher.publish(psl_message)).encode() + + def __aenter__(self): + self._publisher.__aenter__() + return self + + def __aexit__(self, exc_type, exc_value, traceback): + self._publisher.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py b/google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py new file mode 100644 index 00000000..7840bac3 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py @@ -0,0 +1,23 @@ +from asyncio import AbstractEventLoop, new_event_loop, run_coroutine_threadsafe +from concurrent.futures import Future +from threading import Thread +from typing import ContextManager + + +class ManagedEventLoop(ContextManager): + _loop: AbstractEventLoop + _thread: Thread + + def __init__(self): + self._loop = new_event_loop() + self._thread = Thread(target=lambda: self._loop.run_forever()) + + def __enter__(self): + self._thread.start() + + def __exit__(self, __exc_type, __exc_value, __traceback): + self._loop.call_soon_threadsafe(self._loop.stop) + self._thread.join() + + def submit(self, coro) -> Future: + return run_coroutine_threadsafe(coro, self._loop) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py new file mode 100644 index 00000000..25419580 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py @@ -0,0 +1,26 @@ +from concurrent.futures import Future +from typing import Mapping + +from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ManagedEventLoop +from google.cloud.pubsublite.cloudpubsub.publisher import Publisher, AsyncPublisher + + +class PublisherImpl(Publisher): + _managed_loop: ManagedEventLoop + _underlying: AsyncPublisher + + def __init__(self, underlying: AsyncPublisher): + self._managed_loop = ManagedEventLoop() + self._underlying = underlying + + def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> 'Future[str]': + return self._managed_loop.submit(self._underlying.publish(data=data, ordering_key=ordering_key, **attrs)) + + def __enter__(self): + self._managed_loop.__enter__() + self._managed_loop.submit(self._underlying.__aenter__()).result() + return self + + def __exit__(self, __exc_type, __exc_value, __traceback): + self._managed_loop.submit(self._underlying.__aexit__(__exc_type, __exc_value, __traceback)).result() + self._managed_loop.__exit__(__exc_type, __exc_value, __traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher.py b/google/cloud/pubsublite/cloudpubsub/publisher.py new file mode 100644 index 00000000..26644fba --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/publisher.py @@ -0,0 +1,50 @@ +from abc import abstractmethod +from typing import AsyncContextManager, Mapping, ContextManager +from concurrent import futures + + +class AsyncPublisher(AsyncContextManager): + """ + An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an + async context. Any publish failures are permanent. + """ + + @abstractmethod + async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str: + """ + Publish a message. + + Args: + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. + + Returns: + An ack id, which can be decoded using PublishMetadata.decode. + + Raises: + GoogleApiCallError: On a permanent failure. + """ + + +class Publisher(ContextManager): + """ + A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent. + """ + + @abstractmethod + def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> 'futures.Future[str]': + """ + Publish a message. + + Args: + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. + + Returns: + A future completed with an ack id, which can be decoded using PublishMetadata.decode. + + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/setup.py b/setup.py index 51667806..2810f54d 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ "google-api-core >= 1.22.0", "absl-py >= 0.9.0", "proto-plus >= 0.4.0", + "google-cloud-pubsub >= 1.7.0", "grpcio", "setuptools" ] diff --git a/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py b/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py new file mode 100644 index 00000000..afb59d1f --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py @@ -0,0 +1,25 @@ +from asynctest.mock import MagicMock +import pytest + +from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl +from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher + + +@pytest.fixture() +def async_publisher(): + publisher = MagicMock(spec=AsyncPublisher) + publisher.__aenter__.return_value = publisher + return publisher + + +@pytest.fixture() +def publisher(async_publisher): + return PublisherImpl(async_publisher) + + +def test_proxies_to_async(async_publisher, publisher: Publisher): + with publisher: + async_publisher.__aenter__.assert_called_once() + publisher.publish(data=b'abc', ordering_key='zyx', xyz='xyz').result() + async_publisher.publish.assert_called_once_with(data=b'abc', ordering_key='zyx', xyz='xyz') + async_publisher.__aexit__.assert_called_once()