diff --git a/google/cloud/pubsublite/cloudpubsub/message_transforms.py b/google/cloud/pubsublite/cloudpubsub/message_transforms.py
new file mode 100644
index 00000000..6db67546
--- /dev/null
+++ b/google/cloud/pubsublite/cloudpubsub/message_transforms.py
@@ -0,0 +1,120 @@
+import datetime
+
+from google.api_core.exceptions import InvalidArgument
+from google.protobuf.timestamp_pb2 import Timestamp
+from google.pubsub_v1 import PubsubMessage
+
+from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage
+
+PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time"
+
+
+def encode_attribute_event_time(dt: datetime.datetime) -> str:
+    """Encode an event time as the string stored in the event time attribute.
+
+    The datetime is converted to a protobuf Timestamp and rendered with
+    Timestamp.ToJsonString(). Per the protobuf documentation, a naive datetime
+    is assumed to be in UTC.
+    """
+    ts = Timestamp()
+    ts.FromDatetime(dt)
+    return ts.ToJsonString()
+
+
+def decode_attribute_event_time(attr: str) -> datetime.datetime:
+    """Decode a string produced by encode_attribute_event_time.
+
+    Returns:
+        A timezone-aware datetime in UTC.
+
+    Raises:
+        InvalidArgument: If attr cannot be parsed as a timestamp.
+    """
+    try:
+        ts = Timestamp()
+        ts.FromJsonString(attr)
+        # ToDatetime() yields a naive datetime expressed in UTC. Tag it as UTC so
+        # that datetime.timestamp(), which reads naive values as local time, does
+        # not shift the event time by the local UTC offset.
+        return ts.ToDatetime().replace(tzinfo=datetime.timezone.utc)
+    except ValueError as e:
+        raise InvalidArgument("Invalid value for event time attribute.") from e
+
+
+def _parse_attributes(values: AttributeValues) -> str:
+    """Return the single value of an attribute decoded as a utf-8 string.
+
+    Raises:
+        InvalidArgument: If the attribute does not hold exactly one value, or
+            that value is not valid utf-8.
+    """
+    if len(values.values) != 1:
+        raise InvalidArgument("Received an unparseable message with multiple values for an attribute.")
+    value: bytes = values.values[0]
+    try:
+        return value.decode('utf-8')
+    except UnicodeError as e:
+        raise InvalidArgument("Received an unparseable message with a non-utf8 attribute.") from e
+
+
+def to_cps_subscribe_message(source: SequencedMessage) -> PubsubMessage:
+    """Convert a received SequencedMessage into a PubsubMessage.
+
+    The message id is the string form of the cursor offset and the publish
+    time is copied from source.
+
+    Raises:
+        InvalidArgument: If the wrapped message cannot be converted.
+    """
+    message: PubsubMessage = to_cps_publish_message(source.message)
+    message.message_id = str(source.cursor.offset)
+    message.publish_time = source.publish_time
+    return message
+
+
+def to_cps_publish_message(source: PubSubMessage) -> PubsubMessage:
+    """Convert a Pub/Sub Lite PubSubMessage into a PubsubMessage.
+
+    The key becomes the ordering key, every attribute must hold exactly one
+    utf-8 value, and the event time (when set) is stored under the
+    PUBSUB_LITE_EVENT_TIME attribute.
+
+    Raises:
+        InvalidArgument: If the key or an attribute value is not valid utf-8,
+            an attribute does not hold exactly one value, or source already
+            carries the PUBSUB_LITE_EVENT_TIME attribute.
+    """
+    out = PubsubMessage()
+    try:
+        out.ordering_key = source.key.decode('utf-8')
+    except UnicodeError as e:
+        raise InvalidArgument("Received an unparseable message with a non-utf8 key.") from e
+    if PUBSUB_LITE_EVENT_TIME in source.attributes:
+        raise InvalidArgument("Special timestamp attribute exists in wire message. Unable to parse message.")
+    out.data = source.data
+    for key, values in source.attributes.items():
+        out.attributes[key] = _parse_attributes(values)
+    if 'event_time' in source:
+        out.attributes[PUBSUB_LITE_EVENT_TIME] = encode_attribute_event_time(source.event_time)
+    return out
+
+
+def from_cps_publish_message(source: PubsubMessage) -> PubSubMessage:
+    """Convert a PubsubMessage into a Pub/Sub Lite PubSubMessage.
+
+    The ordering key becomes the key, each attribute becomes a single utf-8
+    encoded value, and the PUBSUB_LITE_EVENT_TIME attribute (when present) is
+    decoded into event_time instead of being copied as an attribute.
+
+    Raises:
+        InvalidArgument: If the PUBSUB_LITE_EVENT_TIME attribute is invalid.
+    """
+    out = PubSubMessage()
+    if PUBSUB_LITE_EVENT_TIME in source.attributes:
+        out.event_time = decode_attribute_event_time(source.attributes[PUBSUB_LITE_EVENT_TIME])
+    out.data = source.data
+    out.key = source.ordering_key.encode('utf-8')
+    for key, value in source.attributes.items():
+        if key != PUBSUB_LITE_EVENT_TIME:
+            out.attributes[key] = AttributeValues(values=[value.encode('utf-8')])
+    return out
diff --git a/google/cloud/pubsublite/publish_metadata.py b/google/cloud/pubsublite/publish_metadata.py
index 6b37211f..83c50515 100644
--- a/google/cloud/pubsublite/publish_metadata.py
+++ b/google/cloud/pubsublite/publish_metadata.py
@@ -1,4 +1,6 @@
 from typing import NamedTuple
+import json
+
 from google.cloud.pubsublite_v1.types.common import Cursor
 from google.cloud.pubsublite.partition import Partition
 
@@ -6,3 +8,16 @@
 class PublishMetadata(NamedTuple):
     partition: Partition
     cursor: Cursor
+
+    def encode(self) -> str:
+        """Serialize this metadata to a JSON string with 'partition' and 'offset' keys."""
+        return json.dumps({
+            'partition': self.partition.value,
+            'offset': self.cursor.offset
+        })
+
+    @staticmethod
+    def decode(source: str) -> 'PublishMetadata':
+        """Rebuild a PublishMetadata from a string produced by encode()."""
+        loaded = json.loads(source)
+        return PublishMetadata(partition=Partition(loaded['partition']), cursor=Cursor(offset=loaded['offset']))
diff --git a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py
new file mode 100644
index 00000000..e133baaf
--- /dev/null
+++ b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py
@@ -0,0 +1,71 @@
+import datetime
+
+import pytest
+from google.api_core.exceptions import InvalidArgument
+from google.protobuf.timestamp_pb2 import Timestamp
+from google.pubsub_v1 import PubsubMessage
+
+from google.cloud.pubsublite.cloudpubsub.message_transforms import PUBSUB_LITE_EVENT_TIME, to_cps_subscribe_message, \
+    encode_attribute_event_time, from_cps_publish_message
+from google.cloud.pubsublite_v1 import SequencedMessage, Cursor, PubSubMessage, AttributeValues
+
+NOT_UTF8 = bytes.fromhex('ffff')
+
+
+def test_invalid_subscribe_transform_key():
+    with pytest.raises(InvalidArgument):
+        to_cps_subscribe_message(
+            SequencedMessage(message=PubSubMessage(key=NOT_UTF8), publish_time=Timestamp(), cursor=Cursor(offset=10),
+                             size_bytes=10))
+
+
+def test_invalid_subscribe_contains_magic_attribute():
+    with pytest.raises(InvalidArgument):
+        to_cps_subscribe_message(SequencedMessage(
+            message=PubSubMessage(key=b'def', attributes={PUBSUB_LITE_EVENT_TIME: AttributeValues(values=[b'abc'])}),
+            publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10))
+
+
+def test_invalid_subscribe_contains_multiple_attributes():
+    with pytest.raises(InvalidArgument):
+        to_cps_subscribe_message(SequencedMessage(
+            message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[b'abc', b''])}),
+            publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10))
+
+
+def test_invalid_subscribe_contains_non_utf8_attributes():
+    with pytest.raises(InvalidArgument):
+        to_cps_subscribe_message(SequencedMessage(
+            message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[NOT_UTF8])}),
+            publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10))
+
+
+def test_subscribe_transform_correct():
+    expected = PubsubMessage(
+        data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc',
+                                                     PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(
+                                                         Timestamp(seconds=55).ToDatetime())},
+        message_id=str(10), publish_time=Timestamp(seconds=10))
+    result = to_cps_subscribe_message(SequencedMessage(
+        message=PubSubMessage(data=b'xyz', key=b'def', event_time=Timestamp(seconds=55),
+                              attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}),
+        publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10))
+    assert result == expected
+
+
+def test_publish_invalid_event_time():
+    with pytest.raises(InvalidArgument):
+        from_cps_publish_message(PubsubMessage(attributes={PUBSUB_LITE_EVENT_TIME: 'probably not an encoded proto'}))
+
+
+def test_publish_valid_transform():
+    # Use an aware datetime: a naive value is read as UTC by Timestamp.FromDatetime but
+    # as local time by datetime.timestamp(), which would make this test zone-dependent.
+    now = datetime.datetime.now(tz=datetime.timezone.utc)
+    expected = PubSubMessage(data=b'xyz', key=b'def', event_time=now,
+                             attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])})
+    result = from_cps_publish_message(PubsubMessage(
+        data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc',
+                                                     PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(
+                                                         now)}))
+    assert result == expected