Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement transforms to/from Cloud Pub/Sub Messages #20

Merged
merged 6 commits into from Sep 22, 2020
Empty file.
Empty file.
32 changes: 32 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py
@@ -0,0 +1,32 @@
from abc import abstractmethod
from typing import AsyncContextManager


class AckSetTracker(AsyncContextManager):
    """
    An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
    is aggregated.
    """

    @abstractmethod
    def track(self, offset: int):
        """
        Track the provided offset.

        Args:
          offset: the offset to track.

        Raises:
          GoogleAPICallError: On an invalid offset to track.
        """

    @abstractmethod
    async def ack(self, offset: int):
        """
        Acknowledge the message with the provided offset. The offset must have previously been tracked.

        Args:
          offset: the offset to acknowledge.

        Raises:
          GoogleAPICallError: On a commit failure.
        """
@@ -0,0 +1,52 @@
import queue
from collections import deque
from typing import Optional

from google.api_core.exceptions import FailedPrecondition
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
from google.cloud.pubsublite.internal.wire.committer import Committer
from google.cloud.pubsublite_v1 import Cursor


class AckSetTrackerImpl(AckSetTracker):
    """
    AckSetTracker that commits through a Committer once a contiguous prefix of tracked offsets has been acked.

    Tracked offsets are kept in tracking order and acked offsets in a min-priority queue. Whenever the smallest
    pending ack equals the oldest tracked offset, both are dropped and the committed prefix advances.
    """
    _committer: Committer

    # Tracked offsets in strictly increasing order; the head is the oldest offset not yet committed.
    _receipts: "deque[int]"
    # Acked offsets that have not yet been folded into a committed prefix, smallest first.
    _acks: "queue.PriorityQueue[int]"

    def __init__(self, committer: Committer):
        """
        Args:
          committer: the committer used to commit cursors and whose lifetime is tied to this tracker.
        """
        self._committer = committer
        self._receipts = deque()
        self._acks = queue.PriorityQueue()

    def track(self, offset: int):
        """
        Track the provided offset. Offsets must be tracked in strictly increasing order.

        Args:
          offset: the offset to track.

        Raises:
          FailedPrecondition: If the offset is not greater than the most recently tracked offset.
        """
        if self._receipts:
            # Compare against the tail of the deque: that is the most recently tracked offset.
            last = self._receipts[-1]
            if last >= offset:
                raise FailedPrecondition(f"Tried to track message {offset} which is before last tracked message {last}.")
        self._receipts.append(offset)

    async def ack(self, offset: int):
        """
        Acknowledge the provided offset and commit if a contiguous prefix of tracked offsets is now acked.

        Args:
          offset: the offset to acknowledge.
        """
        # Note: put_nowait is used here and below to ensure that the below logic is executed without yielding
        # to another coroutine in the event loop. The queue is unbounded so it will never throw.
        self._acks.put_nowait(offset)
        prefix_acked_offset: Optional[int] = None
        while self._receipts and not self._acks.empty():
            receipt = self._receipts.popleft()
            ack = self._acks.get_nowait()
            if receipt == ack:
                prefix_acked_offset = receipt
                continue
            # The oldest tracked offset is still unacked: put both values back where they came from.
            # The receipt must return to the FRONT so the tracking order is preserved.
            self._receipts.appendleft(receipt)
            self._acks.put_nowait(ack)
            break
        if prefix_acked_offset is None:
            return
        # Convert from last acked to first unacked.
        await self._committer.commit(Cursor(offset=prefix_acked_offset + 1))

    async def __aenter__(self):
        """Enter the underlying committer and return this tracker so `async with ... as t` binds it."""
        await self._committer.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Exit the underlying committer, forwarding the exception details unchanged."""
        await self._committer.__aexit__(exc_type, exc_value, traceback)
69 changes: 69 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/message_transforms.py
@@ -0,0 +1,69 @@
import datetime

from google.api_core.exceptions import InvalidArgument
from google.protobuf.timestamp_pb2 import Timestamp
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage

# Attribute key under which a Pub/Sub Lite event time is carried on a Cloud Pub/Sub message.
# The key is reserved: to_cps_publish_message rejects wire messages that already contain it.
PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time"


def encode_attribute_event_time(dt: datetime.datetime) -> str:
    """
    Encode an event time as the JSON string form of a protobuf Timestamp.

    Args:
      dt: the event time to encode.

    Returns:
      The string produced by Timestamp.ToJsonString for the given datetime.
    """
    stamp = Timestamp()
    stamp.FromDatetime(dt)
    encoded = stamp.ToJsonString()
    return encoded


def decode_attribute_event_time(attr: str) -> datetime.datetime:
    """
    Decode an event time attribute value back into a datetime.

    Args:
      attr: the attribute value, expected to be the JSON string form of a protobuf Timestamp.

    Returns:
      The decoded datetime.

    Raises:
      InvalidArgument: If the value cannot be parsed as a timestamp.
    """
    ts = Timestamp()
    try:
        ts.FromJsonString(attr)
        return ts.ToDatetime()
    except ValueError as e:
        # Chain the parse failure so the underlying reason stays visible in tracebacks.
        raise InvalidArgument("Invalid value for event time attribute.") from e


def _parse_attributes(values: AttributeValues) -> str:
    """
    Convert the values of one Pub/Sub Lite attribute into a single Cloud Pub/Sub attribute string.

    Args:
      values: the attribute values; exactly one UTF-8 encoded value is required.

    Returns:
      The single value decoded as UTF-8.

    Raises:
      InvalidArgument: If there is not exactly one value, or the value is not valid UTF-8.
    """
    count = len(values.values)
    if count == 0:
        # Cloud Pub/Sub attributes are single strings, so an attribute with no values cannot be represented.
        raise InvalidArgument("Received an unparseable message with no values for an attribute.")
    if count != 1:
        raise InvalidArgument("Received an unparseable message with multiple values for an attribute.")
    value: bytes = values.values[0]
    try:
        return value.decode('utf-8')
    except UnicodeError as e:
        # Chain the decode failure so the offending byte position stays visible in tracebacks.
        raise InvalidArgument("Received an unparseable message with a non-utf8 attribute.") from e


def to_cps_subscribe_message(source: SequencedMessage) -> PubsubMessage:
    """
    Convert a received Pub/Sub Lite SequencedMessage into a Cloud Pub/Sub message.

    Args:
      source: the sequenced message to convert.

    Returns:
      The converted message, with message_id set to the string form of the cursor offset and publish_time copied
      from the source.
    """
    cps_message: PubsubMessage = to_cps_publish_message(source.message)
    # The Pub/Sub Lite offset, rendered as a string, serves as the message id.
    offset_as_id = str(source.cursor.offset)
    cps_message.message_id = offset_as_id
    cps_message.publish_time = source.publish_time
    return cps_message


def to_cps_publish_message(source: PubSubMessage) -> PubsubMessage:
    """
    Convert a Pub/Sub Lite PubSubMessage into a Cloud Pub/Sub PubsubMessage.

    The key becomes the ordering key, each attribute must carry exactly one UTF-8 value, and an event time (when
    present) is stored under the PUBSUB_LITE_EVENT_TIME attribute.

    Args:
      source: the Pub/Sub Lite message to convert.

    Returns:
      The equivalent Cloud Pub/Sub message.

    Raises:
      InvalidArgument: If the key or an attribute is not valid UTF-8, an attribute does not have exactly one value,
        or the message already uses the reserved PUBSUB_LITE_EVENT_TIME attribute.
    """
    out = PubsubMessage()
    try:
        out.ordering_key = source.key.decode('utf-8')
    except UnicodeError as e:
        # Chain the decode failure so the offending byte position stays visible in tracebacks.
        raise InvalidArgument("Received an unparseable message with a non-utf8 key.") from e
    # The reserved attribute is written below from event_time, so it must not already be present.
    if PUBSUB_LITE_EVENT_TIME in source.attributes:
        raise InvalidArgument("Special timestamp attribute exists in wire message. Unable to parse message.")
    out.data = source.data
    for key, values in source.attributes.items():
        out.attributes[key] = _parse_attributes(values)
    if 'event_time' in source:
        out.attributes[PUBSUB_LITE_EVENT_TIME] = encode_attribute_event_time(source.event_time)
    return out


def from_cps_publish_message(source: PubsubMessage) -> PubSubMessage:
    """
    Convert a Cloud Pub/Sub PubsubMessage into a Pub/Sub Lite PubSubMessage.

    The PUBSUB_LITE_EVENT_TIME attribute, when present, is decoded into event_time and is not copied as a regular
    attribute. Every other attribute is stored as a single UTF-8 encoded value.

    Args:
      source: the Cloud Pub/Sub message to convert.

    Returns:
      The equivalent Pub/Sub Lite message.
    """
    lite_message = PubSubMessage()
    if PUBSUB_LITE_EVENT_TIME in source.attributes:
        encoded_event_time = source.attributes[PUBSUB_LITE_EVENT_TIME]
        lite_message.event_time = decode_attribute_event_time(encoded_event_time)
    lite_message.data = source.data
    lite_message.key = source.ordering_key.encode('utf-8')
    for name, text in source.attributes.items():
        if name == PUBSUB_LITE_EVENT_TIME:
            # Already consumed above as the event time.
            continue
        lite_message.attributes[name] = AttributeValues(values=[text.encode('utf-8')])
    return lite_message
13 changes: 13 additions & 0 deletions google/cloud/pubsublite/publish_metadata.py
@@ -1,8 +1,21 @@
from typing import NamedTuple
import json

from google.cloud.pubsublite_v1.types.common import Cursor
from google.cloud.pubsublite.partition import Partition


class PublishMetadata(NamedTuple):
    """The partition and cursor of a published message, with a JSON string encoding."""
    partition: Partition
    cursor: Cursor

    def encode(self) -> str:
        """Return a JSON object string holding the partition value and the cursor offset."""
        payload = {
            'partition': self.partition.value,
            'offset': self.cursor.offset,
        }
        return json.dumps(payload)

    @staticmethod
    def decode(source: str) -> 'PublishMetadata':
        """Rebuild a PublishMetadata from a string produced by encode."""
        fields = json.loads(source)
        partition = Partition(fields['partition'])
        cursor = Cursor(offset=fields['offset'])
        return PublishMetadata(partition=partition, cursor=cursor)
Empty file.
Empty file.
@@ -0,0 +1,46 @@
from asynctest.mock import MagicMock, call
import pytest

# All test coroutines will be treated as marked.
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import AckSetTrackerImpl
from google.cloud.pubsublite.internal.wire.committer import Committer
from google.cloud.pubsublite_v1 import Cursor

# Apply the asyncio marker to every test in this module.
pytestmark = pytest.mark.asyncio


@pytest.fixture()
def committer():
    """Provide a mock Committer whose __aenter__ is configured to return the mock itself."""
    mock_committer = MagicMock(spec=Committer)
    mock_committer.__aenter__.return_value = mock_committer
    return mock_committer


@pytest.fixture()
def tracker(committer):
    """Provide an AckSetTrackerImpl wired to the mock committer fixture."""
    tracker_under_test = AckSetTrackerImpl(committer)
    return tracker_under_test


async def test_track_and_aggregate_acks(committer, tracker: AckSetTracker):
    """Acks arriving out of order must only trigger a commit once a contiguous prefix has been acked."""
    async with tracker:
        committer.__aenter__.assert_called_once()
        tracker.track(offset=1)
        tracker.track(offset=3)
        tracker.track(offset=5)
        tracker.track(offset=7)

        # assert_has_calls([]) passes no matter what was called, so use assert_not_called to really
        # verify that nothing is committed while offset 1 is still unacked.
        committer.commit.assert_not_called()
        await tracker.ack(offset=3)
        committer.commit.assert_not_called()
        await tracker.ack(offset=5)
        committer.commit.assert_not_called()
        # Acking 1 completes the prefix 1, 3, 5, so the first unacked position is 6.
        await tracker.ack(offset=1)
        committer.commit.assert_has_calls([call(Cursor(offset=6))])

        tracker.track(offset=8)
        await tracker.ack(offset=7)
        committer.commit.assert_has_calls([call(Cursor(offset=6)), call(Cursor(offset=8))])
    committer.__aexit__.assert_called_once()


69 changes: 69 additions & 0 deletions tests/unit/pubsublite/cloudpubsub/message_transforms_test.py
@@ -0,0 +1,69 @@
import datetime

import pytest
from google.api_core.exceptions import InvalidArgument
from google.protobuf.timestamp_pb2 import Timestamp
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.message_transforms import PUBSUB_LITE_EVENT_TIME, to_cps_subscribe_message, \
encode_attribute_event_time, from_cps_publish_message
from google.cloud.pubsublite_v1 import SequencedMessage, Cursor, PubSubMessage, AttributeValues

# Two 0xFF bytes, which are never valid in UTF-8; used to trigger decode failures.
NOT_UTF8 = bytes.fromhex('ffff')


def test_invalid_subscribe_transform_key():
    """A key that is not valid UTF-8 must make the subscribe transform raise InvalidArgument."""
    with pytest.raises(InvalidArgument):
        wire_message = PubSubMessage(key=NOT_UTF8)
        sequenced = SequencedMessage(
            message=wire_message,
            publish_time=Timestamp(),
            cursor=Cursor(offset=10),
            size_bytes=10,
        )
        to_cps_subscribe_message(sequenced)


def test_invalid_subscribe_contains_magic_attribute():
    """A wire message already carrying the reserved event time attribute must be rejected."""
    with pytest.raises(InvalidArgument):
        reserved_attributes = {PUBSUB_LITE_EVENT_TIME: AttributeValues(values=[b'abc'])}
        wire_message = PubSubMessage(key=b'def', attributes=reserved_attributes)
        sequenced = SequencedMessage(
            message=wire_message,
            publish_time=Timestamp(seconds=10),
            cursor=Cursor(offset=10),
            size_bytes=10,
        )
        to_cps_subscribe_message(sequenced)


def test_invalid_subscribe_contains_multiple_attributes():
    """An attribute with more than one value must make the subscribe transform raise InvalidArgument."""
    with pytest.raises(InvalidArgument):
        multi_valued = {'xyz': AttributeValues(values=[b'abc', b''])}
        wire_message = PubSubMessage(key=b'def', attributes=multi_valued)
        sequenced = SequencedMessage(
            message=wire_message,
            publish_time=Timestamp(seconds=10),
            cursor=Cursor(offset=10),
            size_bytes=10,
        )
        to_cps_subscribe_message(sequenced)


def test_invalid_subscribe_contains_non_utf8_attributes():
    """An attribute value that is not valid UTF-8 must make the subscribe transform raise InvalidArgument."""
    with pytest.raises(InvalidArgument):
        undecodable = {'xyz': AttributeValues(values=[NOT_UTF8])}
        wire_message = PubSubMessage(key=b'def', attributes=undecodable)
        sequenced = SequencedMessage(
            message=wire_message,
            publish_time=Timestamp(seconds=10),
            cursor=Cursor(offset=10),
            size_bytes=10,
        )
        to_cps_subscribe_message(sequenced)


def test_subscribe_transform_correct():
    """A well-formed sequenced message converts to the expected Cloud Pub/Sub message, event time included."""
    event_time_attr = encode_attribute_event_time(Timestamp(seconds=55).ToDatetime())
    expected = PubsubMessage(
        data=b'xyz',
        ordering_key='def',
        attributes={'x': 'abc', 'y': 'abc', PUBSUB_LITE_EVENT_TIME: event_time_attr},
        message_id=str(10),
        publish_time=Timestamp(seconds=10),
    )
    wire_message = PubSubMessage(
        data=b'xyz',
        key=b'def',
        event_time=Timestamp(seconds=55),
        attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])},
    )
    sequenced = SequencedMessage(
        message=wire_message,
        publish_time=Timestamp(seconds=10),
        cursor=Cursor(offset=10),
        size_bytes=10,
    )
    result = to_cps_subscribe_message(sequenced)
    assert result == expected


def test_publish_invalid_event_time():
    """An event time attribute that is not an encoded timestamp must raise InvalidArgument."""
    with pytest.raises(InvalidArgument):
        bad_attributes = {PUBSUB_LITE_EVENT_TIME: 'probably not an encoded proto'}
        from_cps_publish_message(PubsubMessage(attributes=bad_attributes))


def test_publish_valid_transform():
    """A Cloud Pub/Sub message with an encoded event time converts to the expected Pub/Sub Lite message."""
    now = datetime.datetime.now()
    expected = PubSubMessage(
        data=b'xyz',
        key=b'def',
        event_time=now,
        attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])},
    )
    cps_attributes = {
        'x': 'abc',
        'y': 'abc',
        PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(now),
    }
    cps_message = PubsubMessage(data=b'xyz', ordering_key='def', attributes=cps_attributes)
    result = from_cps_publish_message(cps_message)
    assert result == expected