From 903070df3f57220745cf1588287a3ad6de21a046 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Tue, 22 Sep 2020 11:44:06 -0400 Subject: [PATCH] feat: Implement transforms to/from Cloud Pub/Sub Messages (#20) * feat: Implement AckSetTracker which tracks message acknowledgements. Note that it is awkward to structure this like the java version, as there is no "AsyncCallable" type in python. * fix: Fix comments on ack_set_tracker. * feat: Implement transforms to/from Pub/Sub messages and Pub/Sub Lite messages. * fix: Change test to handle a bug in proto-plus-python. The following code returns 18005 seconds as the time is timezone adjusted: PubSubMessage(event_time=Timestamp(seconds=5).ToDatetime()) * fix: Replace try-blocks with pytest.raises. * fix: Replace pickle encoding/decoding with specific per-type encoding/decoding. Co-authored-by: Daniel Collins --- .../cloudpubsub/message_transforms.py | 69 +++++++++++++++++++ google/cloud/pubsublite/publish_metadata.py | 13 ++++ .../cloudpubsub/message_transforms_test.py | 69 +++++++++++++++++++ 3 files changed, 151 insertions(+) create mode 100644 google/cloud/pubsublite/cloudpubsub/message_transforms.py create mode 100644 tests/unit/pubsublite/cloudpubsub/message_transforms_test.py diff --git a/google/cloud/pubsublite/cloudpubsub/message_transforms.py b/google/cloud/pubsublite/cloudpubsub/message_transforms.py new file mode 100644 index 00000000..6db67546 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/message_transforms.py @@ -0,0 +1,69 @@ +import datetime + +from google.api_core.exceptions import InvalidArgument +from google.protobuf.timestamp_pb2 import Timestamp +from google.pubsub_v1 import PubsubMessage + +from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage + +PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time" + + +def encode_attribute_event_time(dt: datetime.datetime) -> str: + ts = Timestamp() + ts.FromDatetime(dt) + return ts.ToJsonString() + + +def decode_attribute_event_time(attr: str) -> datetime.datetime: + try: + ts = Timestamp() + ts.FromJsonString(attr) + return ts.ToDatetime() + except ValueError: + raise InvalidArgument("Invalid value for event time attribute.") + + +def _parse_attributes(values: AttributeValues) -> str: + if not len(values.values) == 1: + raise InvalidArgument("Received an unparseable message with multiple values for an attribute.") + value: bytes = values.values[0] + try: + return value.decode('utf-8') + except UnicodeError: + raise InvalidArgument("Received an unparseable message with a non-utf8 attribute.") + + +def to_cps_subscribe_message(source: SequencedMessage) -> PubsubMessage: + message: PubsubMessage = to_cps_publish_message(source.message) + message.message_id = str(source.cursor.offset) + message.publish_time = source.publish_time + return message + + +def to_cps_publish_message(source: PubSubMessage) -> PubsubMessage: + out = PubsubMessage() + try: + out.ordering_key = source.key.decode('utf-8') + except UnicodeError: + raise InvalidArgument("Received an unparseable message with a non-utf8 key.") + if PUBSUB_LITE_EVENT_TIME in source.attributes: + raise InvalidArgument("Special timestamp attribute exists in wire message. Unable to parse message.") + out.data = source.data + for key, values in source.attributes.items(): + out.attributes[key] = _parse_attributes(values) + if 'event_time' in source: + out.attributes[PUBSUB_LITE_EVENT_TIME] = encode_attribute_event_time(source.event_time) + return out + + +def from_cps_publish_message(source: PubsubMessage) -> PubSubMessage: + out = PubSubMessage() + if PUBSUB_LITE_EVENT_TIME in source.attributes: + out.event_time = decode_attribute_event_time(source.attributes[PUBSUB_LITE_EVENT_TIME]) + out.data = source.data + out.key = source.ordering_key.encode('utf-8') + for key, value in source.attributes.items(): + if key != PUBSUB_LITE_EVENT_TIME: + out.attributes[key] = AttributeValues(values=[value.encode('utf-8')]) + return out diff --git a/google/cloud/pubsublite/publish_metadata.py b/google/cloud/pubsublite/publish_metadata.py index 6b37211f..83c50515 100644 --- a/google/cloud/pubsublite/publish_metadata.py +++ b/google/cloud/pubsublite/publish_metadata.py @@ -1,4 +1,6 @@ from typing import NamedTuple +import json + from google.cloud.pubsublite_v1.types.common import Cursor from google.cloud.pubsublite.partition import Partition @@ -6,3 +8,14 @@ class PublishMetadata(NamedTuple): partition: Partition cursor: Cursor + + def encode(self) -> str: + return json.dumps({ + 'partition': self.partition.value, + 'offset': self.cursor.offset + }) + + @staticmethod + def decode(source: str) -> 'PublishMetadata': + loaded = json.loads(source) + return PublishMetadata(partition=Partition(loaded['partition']), cursor=Cursor(offset=loaded['offset'])) diff --git a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py new file mode 100644 index 00000000..e133baaf --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py @@ -0,0 +1,69 @@ +import datetime + +import pytest +from google.api_core.exceptions import InvalidArgument +from google.protobuf.timestamp_pb2 import Timestamp +from google.pubsub_v1 import PubsubMessage + +from google.cloud.pubsublite.cloudpubsub.message_transforms import PUBSUB_LITE_EVENT_TIME, to_cps_subscribe_message, \ + encode_attribute_event_time, from_cps_publish_message +from google.cloud.pubsublite_v1 import SequencedMessage, Cursor, PubSubMessage, AttributeValues + +NOT_UTF8 = bytes.fromhex('ffff') + + +def test_invalid_subscribe_transform_key(): + with pytest.raises(InvalidArgument): + to_cps_subscribe_message( + SequencedMessage(message=PubSubMessage(key=NOT_UTF8), publish_time=Timestamp(), cursor=Cursor(offset=10), + size_bytes=10)) + + +def test_invalid_subscribe_contains_magic_attribute(): + with pytest.raises(InvalidArgument): + to_cps_subscribe_message(SequencedMessage( + message=PubSubMessage(key=b'def', attributes={PUBSUB_LITE_EVENT_TIME: AttributeValues(values=[b'abc'])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + + +def test_invalid_subscribe_contains_multiple_attributes(): + with pytest.raises(InvalidArgument): + to_cps_subscribe_message(SequencedMessage( + message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[b'abc', b''])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + + +def test_invalid_subscribe_contains_non_utf8_attributes(): + with pytest.raises(InvalidArgument): + to_cps_subscribe_message(SequencedMessage( + message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[NOT_UTF8])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + + +def test_subscribe_transform_correct(): + expected = PubsubMessage( + data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc', + PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time( + Timestamp(seconds=55).ToDatetime())}, + message_id=str(10), publish_time=Timestamp(seconds=10)) + result = to_cps_subscribe_message(SequencedMessage( + message=PubSubMessage(data=b'xyz', key=b'def', event_time=Timestamp(seconds=55), + attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + assert result == expected + + +def test_publish_invalid_event_time(): + with pytest.raises(InvalidArgument): + from_cps_publish_message(PubsubMessage(attributes={PUBSUB_LITE_EVENT_TIME: 'probably not an encoded proto'})) + + +def test_publish_valid_transform(): + now = datetime.datetime.now() + expected = PubSubMessage(data=b'xyz', key=b'def', event_time=now, + attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}) + result = from_cps_publish_message(PubsubMessage( + data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc', + PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time( + now)})) + assert result == expected