Skip to content

Commit

Permalink
feat: Implement transforms to/from Cloud Pub/Sub Messages (#20)
Browse files Browse the repository at this point in the history
* feat: Implement AckSetTracker which tracks message acknowledgements.

Note that it is awkward to structure this like the java version, as there is no "AsyncCallable" type in python.

* fix: Fix comments on ack_set_tracker.

* feat: Implement transforms to/from Pub/Sub messages and Pub/Sub Lite messages.

* fix: Change test to handle a bug in proto-plus-python.

The following code returns 18005 seconds as the time is timezone adjusted: PubSubMessage(event_time=Timestamp(seconds=5).ToDatetime())

* fix: Replace try-blocks with pytest.raises.

* fix: Replace pickle encoding/decoding with specific per-type encoding/decoding.

Co-authored-by: Daniel Collins <dpcollins@google.com>
  • Loading branch information
hannahrogers-google and dpcollins-google committed Sep 22, 2020
1 parent 7f88458 commit 903070d
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 0 deletions.
69 changes: 69 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/message_transforms.py
@@ -0,0 +1,69 @@
import datetime

from google.api_core.exceptions import InvalidArgument
from google.protobuf.timestamp_pb2 import Timestamp
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage

PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time"


def encode_attribute_event_time(dt: datetime.datetime) -> str:
ts = Timestamp()
ts.FromDatetime(dt)
return ts.ToJsonString()


def decode_attribute_event_time(attr: str) -> datetime.datetime:
try:
ts = Timestamp()
ts.FromJsonString(attr)
return ts.ToDatetime()
except ValueError:
raise InvalidArgument("Invalid value for event time attribute.")


def _parse_attributes(values: AttributeValues) -> str:
if not len(values.values) == 1:
raise InvalidArgument("Received an unparseable message with multiple values for an attribute.")
value: bytes = values.values[0]
try:
return value.decode('utf-8')
except UnicodeError:
raise InvalidArgument("Received an unparseable message with a non-utf8 attribute.")


def to_cps_subscribe_message(source: SequencedMessage) -> PubsubMessage:
message: PubsubMessage = to_cps_publish_message(source.message)
message.message_id = str(source.cursor.offset)
message.publish_time = source.publish_time
return message


def to_cps_publish_message(source: PubSubMessage) -> PubsubMessage:
out = PubsubMessage()
try:
out.ordering_key = source.key.decode('utf-8')
except UnicodeError:
raise InvalidArgument("Received an unparseable message with a non-utf8 key.")
if PUBSUB_LITE_EVENT_TIME in source.attributes:
raise InvalidArgument("Special timestamp attribute exists in wire message. Unable to parse message.")
out.data = source.data
for key, values in source.attributes.items():
out.attributes[key] = _parse_attributes(values)
if 'event_time' in source:
out.attributes[PUBSUB_LITE_EVENT_TIME] = encode_attribute_event_time(source.event_time)
return out


def from_cps_publish_message(source: PubsubMessage) -> PubSubMessage:
out = PubSubMessage()
if PUBSUB_LITE_EVENT_TIME in source.attributes:
out.event_time = decode_attribute_event_time(source.attributes[PUBSUB_LITE_EVENT_TIME])
out.data = source.data
out.key = source.ordering_key.encode('utf-8')
for key, value in source.attributes.items():
if key != PUBSUB_LITE_EVENT_TIME:
out.attributes[key] = AttributeValues(values=[value.encode('utf-8')])
return out
13 changes: 13 additions & 0 deletions google/cloud/pubsublite/publish_metadata.py
@@ -1,8 +1,21 @@
from typing import NamedTuple
import json

from google.cloud.pubsublite_v1.types.common import Cursor
from google.cloud.pubsublite.partition import Partition


class PublishMetadata(NamedTuple):
partition: Partition
cursor: Cursor

def encode(self) -> str:
return json.dumps({
'partition': self.partition.value,
'offset': self.cursor.offset
})

@staticmethod
def decode(source: str) -> 'PublishMetadata':
loaded = json.loads(source)
return PublishMetadata(partition=Partition(loaded['partition']), cursor=Cursor(offset=loaded['offset']))
69 changes: 69 additions & 0 deletions tests/unit/pubsublite/cloudpubsub/message_transforms_test.py
@@ -0,0 +1,69 @@
import datetime

import pytest
from google.api_core.exceptions import InvalidArgument
from google.protobuf.timestamp_pb2 import Timestamp
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.message_transforms import PUBSUB_LITE_EVENT_TIME, to_cps_subscribe_message, \
encode_attribute_event_time, from_cps_publish_message
from google.cloud.pubsublite_v1 import SequencedMessage, Cursor, PubSubMessage, AttributeValues

NOT_UTF8 = bytes.fromhex('ffff')


def test_invalid_subscribe_transform_key():
with pytest.raises(InvalidArgument):
to_cps_subscribe_message(
SequencedMessage(message=PubSubMessage(key=NOT_UTF8), publish_time=Timestamp(), cursor=Cursor(offset=10),
size_bytes=10))


def test_invalid_subscribe_contains_magic_attribute():
with pytest.raises(InvalidArgument):
to_cps_subscribe_message(SequencedMessage(
message=PubSubMessage(key=b'def', attributes={PUBSUB_LITE_EVENT_TIME: AttributeValues(values=[b'abc'])}),
publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10))


def test_invalid_subscribe_contains_multiple_attributes():
with pytest.raises(InvalidArgument):
to_cps_subscribe_message(SequencedMessage(
message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[b'abc', b''])}),
publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10))


def test_invalid_subscribe_contains_non_utf8_attributes():
with pytest.raises(InvalidArgument):
to_cps_subscribe_message(SequencedMessage(
message=PubSubMessage(key=b'def', attributes={'xyz': AttributeValues(values=[NOT_UTF8])}),
publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10))


def test_subscribe_transform_correct():
expected = PubsubMessage(
data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc',
PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(
Timestamp(seconds=55).ToDatetime())},
message_id=str(10), publish_time=Timestamp(seconds=10))
result = to_cps_subscribe_message(SequencedMessage(
message=PubSubMessage(data=b'xyz', key=b'def', event_time=Timestamp(seconds=55),
attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])}),
publish_time=Timestamp(seconds=10), cursor=Cursor(offset=10), size_bytes=10))
assert result == expected


def test_publish_invalid_event_time():
with pytest.raises(InvalidArgument):
from_cps_publish_message(PubsubMessage(attributes={PUBSUB_LITE_EVENT_TIME: 'probably not an encoded proto'}))


def test_publish_valid_transform():
now = datetime.datetime.now()
expected = PubSubMessage(data=b'xyz', key=b'def', event_time=now,
attributes={'x': AttributeValues(values=[b'abc']), 'y': AttributeValues(values=[b'abc'])})
result = from_cps_publish_message(PubsubMessage(
data=b'xyz', ordering_key='def', attributes={'x': 'abc', 'y': 'abc',
PUBSUB_LITE_EVENT_TIME: encode_attribute_event_time(
now)}))
assert result == expected

0 comments on commit 903070d

Please sign in to comment.