Skip to content

Commit

Permalink
feat: Implement Publisher API (#21)
Browse files Browse the repository at this point in the history
* feat: Implement transforms to/from Pub/Sub messages and Pub/Sub Lite messages.

* feat: Implement Pub/Sub Publisher

* fix: Re-delete b64_utils after merge.

Co-authored-by: Daniel Collins <dpcollins@google.com>
  • Loading branch information
hannahrogers-google and dpcollins-google committed Sep 23, 2020
1 parent 903070d commit 58fda6f
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 0 deletions.
@@ -0,0 +1,26 @@
from typing import Mapping

from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.message_transforms import from_cps_publish_message
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher
from google.cloud.pubsublite.internal.wire.publisher import Publisher


class AsyncPublisherImpl(AsyncPublisher):
_publisher: Publisher

def __init__(self, publisher: Publisher):
self._publisher = publisher

async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str:
cps_message = PubsubMessage(data=data, ordering_key=ordering_key, attributes=attrs)
psl_message = from_cps_publish_message(cps_message)
return (await self._publisher.publish(psl_message)).encode()

def __aenter__(self):
self._publisher.__aenter__()
return self

def __aexit__(self, exc_type, exc_value, traceback):
self._publisher.__aexit__(exc_type, exc_value, traceback)
23 changes: 23 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py
@@ -0,0 +1,23 @@
from asyncio import AbstractEventLoop, new_event_loop, run_coroutine_threadsafe
from concurrent.futures import Future
from threading import Thread
from typing import ContextManager


class ManagedEventLoop(ContextManager):
_loop: AbstractEventLoop
_thread: Thread

def __init__(self):
self._loop = new_event_loop()
self._thread = Thread(target=lambda: self._loop.run_forever())

def __enter__(self):
self._thread.start()

def __exit__(self, __exc_type, __exc_value, __traceback):
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join()

def submit(self, coro) -> Future:
return run_coroutine_threadsafe(coro, self._loop)
26 changes: 26 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py
@@ -0,0 +1,26 @@
from concurrent.futures import Future
from typing import Mapping

from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ManagedEventLoop
from google.cloud.pubsublite.cloudpubsub.publisher import Publisher, AsyncPublisher


class PublisherImpl(Publisher):
_managed_loop: ManagedEventLoop
_underlying: AsyncPublisher

def __init__(self, underlying: AsyncPublisher):
self._managed_loop = ManagedEventLoop()
self._underlying = underlying

def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> 'Future[str]':
return self._managed_loop.submit(self._underlying.publish(data=data, ordering_key=ordering_key, **attrs))

def __enter__(self):
self._managed_loop.__enter__()
self._managed_loop.submit(self._underlying.__aenter__()).result()
return self

def __exit__(self, __exc_type, __exc_value, __traceback):
self._managed_loop.submit(self._underlying.__aexit__(__exc_type, __exc_value, __traceback)).result()
self._managed_loop.__exit__(__exc_type, __exc_value, __traceback)
50 changes: 50 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/publisher.py
@@ -0,0 +1,50 @@
from abc import abstractmethod
from typing import AsyncContextManager, Mapping, ContextManager
from concurrent import futures


class AsyncPublisher(AsyncContextManager):
"""
An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an
async context. Any publish failures are permanent.
"""

@abstractmethod
async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str:
"""
Publish a message.
Args:
data: The bytestring payload of the message
ordering_key: The key to enforce ordering on, or "" for no ordering.
**attrs: Additional attributes to send.
Returns:
An ack id, which can be decoded using PublishMetadata.decode.
Raises:
GoogleApiCallError: On a permanent failure.
"""


class Publisher(ContextManager):
"""
A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent.
"""

@abstractmethod
def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> 'futures.Future[str]':
"""
Publish a message.
Args:
data: The bytestring payload of the message
ordering_key: The key to enforce ordering on, or "" for no ordering.
**attrs: Additional attributes to send.
Returns:
A future completed with an ack id, which can be decoded using PublishMetadata.decode.
Raises:
GoogleApiCallError: On a permanent failure.
"""
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -31,6 +31,7 @@
"google-api-core >= 1.22.0",
"absl-py >= 0.9.0",
"proto-plus >= 0.4.0",
"google-cloud-pubsub >= 1.7.0",
"grpcio",
"setuptools"
]
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py
@@ -0,0 +1,25 @@
from asynctest.mock import MagicMock
import pytest

from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher


@pytest.fixture()
def async_publisher():
publisher = MagicMock(spec=AsyncPublisher)
publisher.__aenter__.return_value = publisher
return publisher


@pytest.fixture()
def publisher(async_publisher):
return PublisherImpl(async_publisher)


def test_proxies_to_async(async_publisher, publisher: Publisher):
with publisher:
async_publisher.__aenter__.assert_called_once()
publisher.publish(data=b'abc', ordering_key='zyx', xyz='xyz').result()
async_publisher.publish.assert_called_once_with(data=b'abc', ordering_key='zyx', xyz='xyz')
async_publisher.__aexit__.assert_called_once()

0 comments on commit 58fda6f

Please sign in to comment.