From d4c955a3d41bfdeba6bd9e99fe9ce5c1d00cea8d Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 18 Sep 2020 10:16:34 -0400 Subject: [PATCH 1/6] feat: Implement AckSetTracker which tracks message acknowledgements. Note that it is awkward to structure this like the java version, as there is no "AsyncCallable" type in python. --- .../cloud/pubsublite/cloudpubsub/__init__.py | 0 .../cloudpubsub/internal/__init__.py | 0 .../cloudpubsub/internal/ack_set_tracker.py | 31 +++++++++++ .../internal/ack_set_tracker_impl.py | 51 +++++++++++++++++++ tests/unit/pubsublite/cloudpubsub/__init__.py | 0 .../cloudpubsub/internal/__init__.py | 0 .../internal/ack_set_tracker_impl_test.py | 46 +++++++++++++++++ 7 files changed, 128 insertions(+) create mode 100644 google/cloud/pubsublite/cloudpubsub/__init__.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/__init__.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py create mode 100644 tests/unit/pubsublite/cloudpubsub/__init__.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/__init__.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py diff --git a/google/cloud/pubsublite/cloudpubsub/__init__.py b/google/cloud/pubsublite/cloudpubsub/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google/cloud/pubsublite/cloudpubsub/internal/__init__.py b/google/cloud/pubsublite/cloudpubsub/internal/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py new file mode 100644 index 00000000..de8cbc4c --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py @@ -0,0 +1,31 @@ +from abc import abstractmethod +from typing import AsyncContextManager + + +class AckSetTracker(AsyncContextManager): + """ + An AckSetTracker tracks disjoint acknowledged messages and commits them when necessary. + """ + @abstractmethod + def track(self, offset: int): + """ + Track the provided offset. + + Args: + offset: the offset to track. + + Raises: + GoogleAPICallError: On an invalid offset to track. + """ + + @abstractmethod + async def ack(self, offset: int): + """ + Acknowledge the message with the provided offset. The offset must have previously been tracked. + + Args: + offset: the offset to acknowledge. + + Returns: + GoogleAPICallError: On a commit failure. + """ \ No newline at end of file diff --git a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py new file mode 100644 index 00000000..b722e313 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py @@ -0,0 +1,51 @@ +import queue +from collections import deque +from typing import Optional + +from google.api_core.exceptions import FailedPrecondition +from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker +from google.cloud.pubsublite.internal.wire.committer import Committer +from google.cloud.pubsublite_v1 import Cursor + + +class AckSetTrackerImpl(AckSetTracker): + _committer: Committer + + _receipts: "deque[int]" + _acks: "queue.PriorityQueue[int]" + + def __init__(self, committer: Committer): + self._committer = committer + self._receipts = deque() + self._acks = queue.PriorityQueue() + + def track(self, offset: int): + if len(self._receipts) > 0: + last = self._receipts.pop() + if last >= offset: + raise FailedPrecondition(f"Tried to track message {offset} which is before last tracked message {last}.") + self._receipts.append(last) + self._receipts.append(offset) + + async def ack(self, offset: int): + self._acks.put_nowait(offset) + prefix_acked_offset: Optional[int] = None + while len(self._receipts) != 0 and not self._acks.empty(): + receipt = self._receipts.popleft() + ack = self._acks.get_nowait() + if receipt == ack: + prefix_acked_offset = receipt + continue + self._receipts.append(receipt) + self._acks.put(ack) + break + if prefix_acked_offset is None: + return + # Convert from last acked to first unacked. + await self._committer.commit(Cursor(offset=prefix_acked_offset+1)) + + async def __aenter__(self): + await self._committer.__aenter__() + + async def __aexit__(self, exc_type, exc_value, traceback): + await self._committer.__aexit__(exc_type, exc_value, traceback) diff --git a/tests/unit/pubsublite/cloudpubsub/__init__.py b/tests/unit/pubsublite/cloudpubsub/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/pubsublite/cloudpubsub/internal/__init__.py b/tests/unit/pubsublite/cloudpubsub/internal/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py b/tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py new file mode 100644 index 00000000..6b5bf8f7 --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py @@ -0,0 +1,46 @@ +from asynctest.mock import MagicMock, CoroutineMock, call +import pytest + +# All test coroutines will be treated as marked. +from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker +from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import AckSetTrackerImpl +from google.cloud.pubsublite.internal.wire.committer import Committer +from google.cloud.pubsublite_v1 import Cursor + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture() +def committer(): + committer = MagicMock(spec=Committer) + committer.__aenter__.return_value = committer + return committer + + +@pytest.fixture() +def tracker(committer): + return AckSetTrackerImpl(committer) + + +async def test_track_and_aggregate_acks(committer, tracker: AckSetTracker): + async with tracker: + committer.__aenter__.assert_called_once() + tracker.track(offset=1) + tracker.track(offset=3) + tracker.track(offset=5) + tracker.track(offset=7) + + committer.commit.assert_has_calls([]) + await tracker.ack(offset=3) + committer.commit.assert_has_calls([]) + await tracker.ack(offset=5) + committer.commit.assert_has_calls([]) + await tracker.ack(offset=1) + committer.commit.assert_has_calls([call(Cursor(offset=6))]) + + tracker.track(offset=8) + await tracker.ack(offset=7) + committer.commit.assert_has_calls([call(Cursor(offset=6)), call(Cursor(offset=8))]) + committer.__aexit__.assert_called_once() + + From 8a083177106186e7f476dd9435ea920f68adcc5c Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 21 Sep 2020 15:22:03 -0400 Subject: [PATCH 2/6] fix: Fix comments on ack_set_tracker. --- .../cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py | 5 +++-- .../pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py | 5 +++-- .../cloudpubsub/internal/ack_set_tracker_impl_test.py | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py index de8cbc4c..b2d838d4 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py @@ -4,7 +4,8 @@ class AckSetTracker(AsyncContextManager): """ - An AckSetTracker tracks disjoint acknowledged messages and commits them when necessary. + An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets + is aggregated. """ @abstractmethod def track(self, offset: int): @@ -28,4 +29,4 @@ async def ack(self, offset: int): Returns: GoogleAPICallError: On a commit failure. - """ \ No newline at end of file + """ diff --git a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py index b722e313..45f0cd56 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py @@ -21,13 +21,14 @@ def __init__(self, committer: Committer): def track(self, offset: int): if len(self._receipts) > 0: - last = self._receipts.pop() + last = self._receipts[0] if last >= offset: raise FailedPrecondition(f"Tried to track message {offset} which is before last tracked message {last}.") - self._receipts.append(last) self._receipts.append(offset) async def ack(self, offset: int): + # Note: put_nowait is used here and below to ensure that the below logic is executed without yielding + # to another coroutine in the event loop. The queue is unbounded so it will never throw. self._acks.put_nowait(offset) prefix_acked_offset: Optional[int] = None while len(self._receipts) != 0 and not self._acks.empty(): diff --git a/tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py b/tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py index 6b5bf8f7..f7089044 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py @@ -1,4 +1,4 @@ -from asynctest.mock import MagicMock, CoroutineMock, call +from asynctest.mock import MagicMock, call import pytest # All test coroutines will be treated as marked. From 26ead2d0d0b8278738233cd6a4339ff99f8734b8 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 21 Sep 2020 15:46:11 -0400 Subject: [PATCH 3/6] feat: Implement transforms to/from Pub/Sub messages and Pub/Sub Lite messages. --- .../cloudpubsub/message_transforms.py | 70 ++++++++++++++++++ google/cloud/pubsublite/internal/b64_utils.py | 11 +++ google/cloud/pubsublite/publish_metadata.py | 11 ++- .../cloudpubsub/message_transforms_test.py | 72 +++++++++++++++++++ 4 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 google/cloud/pubsublite/cloudpubsub/message_transforms.py create mode 100644 google/cloud/pubsublite/internal/b64_utils.py create mode 100644 tests/unit/pubsublite/cloudpubsub/message_transforms_test.py diff --git a/google/cloud/pubsublite/cloudpubsub/message_transforms.py b/google/cloud/pubsublite/cloudpubsub/message_transforms.py new file mode 100644 index 00000000..9d450d2b --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/message_transforms.py @@ -0,0 +1,70 @@ +import datetime +from typing import cast + +from google.api_core.exceptions import InvalidArgument +from google.protobuf.timestamp_pb2 import Timestamp +from google.pubsub_v1 import PubsubMessage + +from google.cloud.pubsublite.internal.b64_utils import to_b64_string, from_b64_string +from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage + +PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time" + + +def encode_attribute_event_time(dt: datetime.datetime) -> str: + ts = Timestamp() + ts.FromDatetime(dt) + return to_b64_string(ts) + + +def decode_attribute_event_time(attr: str) -> datetime.datetime: + try: + ts = cast(Timestamp, from_b64_string(attr)) + return ts.ToDatetime() + except ValueError: + raise InvalidArgument("Invalid value for event time attribute.") + + +def _parse_attributes(values: AttributeValues) -> str: + if not len(values.values) == 1: + raise InvalidArgument("Received an unparseable message with multiple values for an attribute.") + value: bytes = values.values[0] + try: + return value.decode('utf-8') + except UnicodeError: + raise InvalidArgument("Received an unparseable message with a non-utf8 attribute.") + + +def to_cps_subscribe_message(source: SequencedMessage) -> PubsubMessage: + message: PubsubMessage = to_cps_publish_message(source.message) + message.message_id = str(source.cursor.offset) + message.publish_time = source.publish_time + return message + + +def to_cps_publish_message(source: PubSubMessage) -> PubsubMessage: + out = PubsubMessage() + try: + out.ordering_key = source.key.decode('utf-8') + except UnicodeError: + raise InvalidArgument("Received an unparseable message with a non-utf8 key.") + if PUBSUB_LITE_EVENT_TIME in source.attributes: + raise InvalidArgument("Special timestamp attribute exists in wire message. Unable to parse message.") + out.data = source.data + for key, values in source.attributes.items(): + out.attributes[key] = _parse_attributes(values) + if 'event_time' in source: + out.attributes[PUBSUB_LITE_EVENT_TIME] = encode_attribute_event_time(source.event_time) + return out + + +def from_cps_publish_message(source: PubsubMessage) -> PubSubMessage: + out = PubSubMessage() + if PUBSUB_LITE_EVENT_TIME in source.attributes: + out.event_time = decode_attribute_event_time(source.attributes[PUBSUB_LITE_EVENT_TIME]) + out.data = source.data + out.key = source.ordering_key.encode('utf-8') + for key, value in source.attributes.items(): + if key != PUBSUB_LITE_EVENT_TIME: + out.attributes[key] = AttributeValues(values=[value.encode('utf-8')]) + return out diff --git a/google/cloud/pubsublite/internal/b64_utils.py b/google/cloud/pubsublite/internal/b64_utils.py new file mode 100644 index 00000000..1a5dd15c --- /dev/null +++ b/google/cloud/pubsublite/internal/b64_utils.py @@ -0,0 +1,11 @@ +import base64 +import pickle + + +def to_b64_string(src: object) -> str: + return base64.b64encode(pickle.dumps(src)).decode('utf-8') + + +def from_b64_string(src: str) -> object: + return pickle.loads(base64.b64decode(src.encode('utf-8'))) + diff --git a/google/cloud/pubsublite/publish_metadata.py b/google/cloud/pubsublite/publish_metadata.py index 6b37211f..4feb8adf 100644 --- a/google/cloud/pubsublite/publish_metadata.py +++ b/google/cloud/pubsublite/publish_metadata.py @@ -1,4 +1,6 @@ -from typing import NamedTuple +from typing import NamedTuple, cast + +from google.cloud.pubsublite.internal.b64_utils import to_b64_string, from_b64_string from google.cloud.pubsublite_v1.types.common import Cursor from google.cloud.pubsublite.partition import Partition @@ -6,3 +8,10 @@ class PublishMetadata(NamedTuple): partition: Partition cursor: Cursor + + def encode(self) -> str: + return to_b64_string(self) + + @staticmethod + def decode(source: str) -> 'PublishMetadata': + return cast(PublishMetadata, from_b64_string(source)) diff --git a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py new file mode 100644 index 00000000..ef83d723 --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py @@ -0,0 +1,72 @@ +import grpc +from google.api_core.exceptions import GoogleAPICallError +from google.protobuf.timestamp_pb2 import Timestamp +from google.pubsub_v1 import PubsubMessage + +from google.cloud.pubsublite.cloudpubsub.message_transforms import PUBSUB_LITE_EVENT_TIME, to_cps_subscribe_message, \ + encode_attribute_event_time, from_cps_publish_message +from google.cloud.pubsublite_v1 import SequencedMessage, Cursor, PubSubMessage, AttributeValues + +NOT_UTF8 = bytes.fromhex('ffff') + + +def test_invalid_subscribe_transform_key(): + try: + to_cps_subscribe_message( + SequencedMessage(message=PubSubMessage(key=NOT_UTF8), publish_time=Timestamp(), cursor=Cursor(offset=10), + size_bytes=10)) + assert False + except GoogleAPICallError as e: + assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT + + +def test_invalid_subscribe_contains_magic_attribute(): + try: + to_cps_subscribe_message(SequencedMessage( + message=PubSubMessage(key=b'def', attributes={PUBSUB_LITE_EVENT_TIME: AttributeValues(values=[b'abc'])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + assert False + except GoogleAPICallError as e: + assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT + + +def test_invalid_subscribe_contains_multiple_attributes(): + try: + to_cps_subscribe_message(SequencedMessage( + message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[b'abc', b''])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + assert False + except GoogleAPICallError as e: + assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT + + +def test_invalid_subscribe_contains_non_utf8_attributes(): + try: + to_cps_subscribe_message(SequencedMessage( + message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[NOT_UTF8])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + assert False + except GoogleAPICallError as e: + assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT + + +def test_subscribe_transform_correct(): + expected = PubsubMessage(data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc', PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(Timestamp(seconds=55).ToDatetime())}, message_id=str(10), publish_time=Timestamp(seconds=10)) + result = to_cps_subscribe_message(SequencedMessage( + message=PubSubMessage(data=b'xyz', key=b'def', event_time=Timestamp(seconds=55), attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + assert result == expected + + +def test_publish_invalid_event_time(): + try: + from_cps_publish_message(PubsubMessage(attributes={PUBSUB_LITE_EVENT_TIME: 'probably not an encoded proto'})) + assert False + except GoogleAPICallError as e: + assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT + + +def test_publish_valid_transform(): + expected = PubSubMessage(data=b'xyz', key=b'def', event_time=Timestamp(seconds=55), attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}) + result = from_cps_publish_message(PubsubMessage(data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc', PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(Timestamp(seconds=55).ToDatetime())})) + assert result == expected From 7a43e2a81a01a1fa03a960d691c542f84c2eae5e Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 22 Sep 2020 09:40:23 -0400 Subject: [PATCH 4/6] fix: Change test to handle a bug in proto-plus-python. The following code returns 18005 seconds as the time is timezone adjusted: PubSubMessage(event_time=Timestamp(seconds=5).ToDatetime()) --- .../cloudpubsub/message_transforms_test.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py index ef83d723..8617d3bc 100644 --- a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py +++ b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py @@ -1,8 +1,9 @@ +import datetime + import grpc from google.api_core.exceptions import GoogleAPICallError from google.protobuf.timestamp_pb2 import Timestamp from google.pubsub_v1 import PubsubMessage - from google.cloud.pubsublite.cloudpubsub.message_transforms import PUBSUB_LITE_EVENT_TIME, to_cps_subscribe_message, \ encode_attribute_event_time, from_cps_publish_message from google.cloud.pubsublite_v1 import SequencedMessage, Cursor, PubSubMessage, AttributeValues @@ -51,10 +52,15 @@ def test_invalid_subscribe_contains_non_utf8_attributes(): def test_subscribe_transform_correct(): - expected = PubsubMessage(data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc', PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(Timestamp(seconds=55).ToDatetime())}, message_id=str(10), publish_time=Timestamp(seconds=10)) + expected = PubsubMessage( + data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc', + PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time( + Timestamp(seconds=55).ToDatetime())}, + message_id=str(10), publish_time=Timestamp(seconds=10)) result = to_cps_subscribe_message(SequencedMessage( - message=PubSubMessage(data=b'xyz', key=b'def', event_time=Timestamp(seconds=55), attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}), - publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) + message=PubSubMessage(data=b'xyz', key=b'def', event_time=Timestamp(seconds=55), + attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}), + publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) assert result == expected @@ -67,6 +73,11 @@ def test_publish_invalid_event_time(): def test_publish_valid_transform(): - expected = PubSubMessage(data=b'xyz', key=b'def', event_time=Timestamp(seconds=55), attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}) - result = from_cps_publish_message(PubsubMessage(data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc', PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(Timestamp(seconds=55).ToDatetime())})) + now = datetime.datetime.now() + expected = PubSubMessage(data=b'xyz', key=b'def', event_time=now, + attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}) + result = from_cps_publish_message(PubsubMessage( + data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc', + PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time( + now)})) assert result == expected From c432d1dac86aaba62303c33c7277279790d060e8 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 22 Sep 2020 09:49:20 -0400 Subject: [PATCH 5/6] fix: Replace try-blocks with pytest.raises. --- .../cloudpubsub/message_transforms_test.py | 30 +++++-------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py index 8617d3bc..e133baaf 100644 --- a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py +++ b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py @@ -1,9 +1,10 @@ import datetime -import grpc -from google.api_core.exceptions import GoogleAPICallError +import pytest +from google.api_core.exceptions import InvalidArgument from google.protobuf.timestamp_pb2 import Timestamp from google.pubsub_v1 import PubsubMessage + from google.cloud.pubsublite.cloudpubsub.message_transforms import PUBSUB_LITE_EVENT_TIME, to_cps_subscribe_message, \ encode_attribute_event_time, from_cps_publish_message from google.cloud.pubsublite_v1 import SequencedMessage, Cursor, PubSubMessage, AttributeValues @@ -12,43 +13,31 @@ def test_invalid_subscribe_transform_key(): - try: + with pytest.raises(InvalidArgument): to_cps_subscribe_message( SequencedMessage(message=PubSubMessage(key=NOT_UTF8), publish_time=Timestamp(), cursor=Cursor(offset=10), size_bytes=10)) - assert False - except GoogleAPICallError as e: - assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT def test_invalid_subscribe_contains_magic_attribute(): - try: + with pytest.raises(InvalidArgument): to_cps_subscribe_message(SequencedMessage( message=PubSubMessage(key=b'def', attributes={PUBSUB_LITE_EVENT_TIME: AttributeValues(values=[b'abc'])}), publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) - assert False - except GoogleAPICallError as e: - assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT def test_invalid_subscribe_contains_multiple_attributes(): - try: + with pytest.raises(InvalidArgument): to_cps_subscribe_message(SequencedMessage( message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[b'abc', b''])}), publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) - assert False - except GoogleAPICallError as e: - assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT def test_invalid_subscribe_contains_non_utf8_attributes(): - try: + with pytest.raises(InvalidArgument): to_cps_subscribe_message(SequencedMessage( message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[NOT_UTF8])}), publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10)) - assert False - except GoogleAPICallError as e: - assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT def test_subscribe_transform_correct(): @@ -65,11 +54,8 @@ def test_subscribe_transform_correct(): def test_publish_invalid_event_time(): - try: + with pytest.raises(InvalidArgument): from_cps_publish_message(PubsubMessage(attributes={PUBSUB_LITE_EVENT_TIME: 'probably not an encoded proto'})) - assert False - except GoogleAPICallError as e: - assert e.grpc_status_code == grpc.StatusCode.INVALID_ARGUMENT def test_publish_valid_transform(): From 9c35ac82608fae0fd266a7f47d4cb0db248211c6 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 22 Sep 2020 10:41:06 -0400 Subject: [PATCH 6/6] fix: Replace pickle encoding/decoding with specific per-type encoding/decoding. --- .../pubsublite/cloudpubsub/message_transforms.py | 7 +++---- google/cloud/pubsublite/internal/b64_utils.py | 11 ----------- google/cloud/pubsublite/publish_metadata.py | 12 ++++++++---- 3 files changed, 11 insertions(+), 19 deletions(-) delete mode 100644 google/cloud/pubsublite/internal/b64_utils.py diff --git a/google/cloud/pubsublite/cloudpubsub/message_transforms.py b/google/cloud/pubsublite/cloudpubsub/message_transforms.py index 9d450d2b..6db67546 100644 --- a/google/cloud/pubsublite/cloudpubsub/message_transforms.py +++ b/google/cloud/pubsublite/cloudpubsub/message_transforms.py @@ -1,11 +1,9 @@ import datetime -from typing import cast from google.api_core.exceptions import InvalidArgument from google.protobuf.timestamp_pb2 import Timestamp from google.pubsub_v1 import PubsubMessage -from google.cloud.pubsublite.internal.b64_utils import to_b64_string, from_b64_string from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time" @@ -14,12 +12,13 @@ def encode_attribute_event_time(dt: datetime.datetime) -> str: ts = Timestamp() ts.FromDatetime(dt) - return to_b64_string(ts) + return ts.ToJsonString() def decode_attribute_event_time(attr: str) -> datetime.datetime: try: - ts = cast(Timestamp, from_b64_string(attr)) + ts = Timestamp() + ts.FromJsonString(attr) return ts.ToDatetime() except ValueError: raise InvalidArgument("Invalid value for event time attribute.") diff --git a/google/cloud/pubsublite/internal/b64_utils.py b/google/cloud/pubsublite/internal/b64_utils.py deleted file mode 100644 index 1a5dd15c..00000000 --- a/google/cloud/pubsublite/internal/b64_utils.py +++ /dev/null @@ -1,11 +0,0 @@ -import base64 -import pickle - - -def to_b64_string(src: object) -> str: - return base64.b64encode(pickle.dumps(src)).decode('utf-8') - - -def from_b64_string(src: str) -> object: - return pickle.loads(base64.b64decode(src.encode('utf-8'))) - diff --git a/google/cloud/pubsublite/publish_metadata.py b/google/cloud/pubsublite/publish_metadata.py index 4feb8adf..83c50515 100644 --- a/google/cloud/pubsublite/publish_metadata.py +++ b/google/cloud/pubsublite/publish_metadata.py @@ -1,6 +1,6 @@ -from typing import NamedTuple, cast +from typing import NamedTuple +import json -from google.cloud.pubsublite.internal.b64_utils import to_b64_string, from_b64_string from google.cloud.pubsublite_v1.types.common import Cursor from google.cloud.pubsublite.partition import Partition @@ -10,8 +10,12 @@ class PublishMetadata(NamedTuple): cursor: Cursor def encode(self) -> str: - return to_b64_string(self) + return json.dumps({ + 'partition': self.partition.value, + 'offset': self.cursor.offset + }) @staticmethod def decode(source: str) -> 'PublishMetadata': - return cast(PublishMetadata, from_b64_string(source)) + loaded = json.loads(source) + return PublishMetadata(partition=Partition(loaded['partition']), cursor=Cursor(offset=loaded['offset']))