fix: Rename PublishMetadata to MessageMetadata (#92)
* fix: Rename PublishMetadata to MessageMetadata

* fix: lint
dpcollins-google committed Feb 9, 2021
1 parent 85944e7 commit a744441
Showing 16 changed files with 38 additions and 39 deletions.
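
The rename is mechanical: every use of PublishMetadata becomes MessageMetadata, and the encoded message-id format is untouched. Below is a minimal caller-side sketch of the renamed API; PublisherClient, the project number, and the zone are illustrative assumptions, since client construction is not part of this diff.

# Hedged sketch: publish one message and decode its id with the renamed type.
# Only the MessageMetadata import and decode() usage come from this commit;
# PublisherClient and the topic values are assumptions for illustration.
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, MessageMetadata, TopicPath

location = CloudZone(CloudRegion("us-central1"), "a")
topic_path = TopicPath(1234567890, location, "my-topic")

with PublisherClient() as publisher_client:
    api_future = publisher_client.publish(topic_path, "hello".encode("utf-8"))
    message_id = api_future.result()  # blocks; use add_done_callback() for async handling
    metadata = MessageMetadata.decode(message_id)  # formerly PublishMetadata.decode
    print(f"Published to partition {metadata.partition.value} at offset {metadata.cursor.offset}.")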
@@ -38,7 +38,7 @@ async def publish(
**attrs: Additional attributes to send.
Returns:
- An ack id, which can be decoded using PublishMetadata.decode.
+ An ack id, which can be decoded using MessageMetadata.decode.
Raises:
GoogleApiCallError: On a permanent failure.
@@ -65,7 +65,7 @@ def publish(
**attrs: Additional attributes to send.
Returns:
- A future completed with an ack id, which can be decoded using PublishMetadata.decode.
+ A future completed with an ack id, which can be decoded using MessageMetadata.decode.
Raises:
GoogleApiCallError: On a permanent failure.
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/cloudpubsub/message_transforms.py
@@ -19,7 +19,7 @@
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub import MessageTransformer
- from google.cloud.pubsublite.types import Partition, PublishMetadata
+ from google.cloud.pubsublite.types import Partition, MessageMetadata
from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage

PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time"
@@ -63,7 +63,7 @@ def add_id_to_message(source: SequencedMessage):
raise InvalidArgument(
"Message after transforming has the message_id field set."
)
- message.message_id = PublishMetadata(partition, source.cursor).encode()
+ message.message_id = MessageMetadata(partition, source.cursor).encode()
return message

return MessageTransformer.of_callable(add_id_to_message)
@@ -45,7 +45,7 @@ async def publish(
**attrs: Additional attributes to send.
Returns:
- An ack id, which can be decoded using PublishMetadata.decode.
+ An ack id, which can be decoded using MessageMetadata.decode.
Raises:
GoogleApiCallError: On a permanent failure.
@@ -78,7 +78,7 @@ def publish(
**attrs: Additional attributes to send.
Returns:
- A future completed with an ack id, which can be decoded using PublishMetadata.decode.
+ A future completed with an ack id, which can be decoded using MessageMetadata.decode.
Raises:
GoogleApiCallError: On a permanent failure.
@@ -21,7 +21,7 @@
)
from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
- from google.cloud.pubsublite.types import PublishMetadata, Partition
+ from google.cloud.pubsublite.types import MessageMetadata, Partition
from google.cloud.pubsublite_v1 import PubSubMessage


@@ -87,7 +87,7 @@ async def _handle_partition_count_update(self, partition_count: int):
self._publishers.update(new_publishers)
self._routing_policy = routing_policy

- async def publish(self, message: PubSubMessage) -> PublishMetadata:
+ async def publish(self, message: PubSubMessage) -> MessageMetadata:
partition = self._routing_policy.route(message)
assert partition in self._publishers
publisher = self._publishers[partition]
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/internal/wire/publisher.py
@@ -15,7 +15,7 @@
from abc import abstractmethod
from typing import AsyncContextManager
from google.cloud.pubsublite_v1.types import PubSubMessage
- from google.cloud.pubsublite.types import PublishMetadata
+ from google.cloud.pubsublite.types import MessageMetadata


class Publisher(AsyncContextManager):
@@ -24,7 +24,7 @@ class Publisher(AsyncContextManager):
"""

@abstractmethod
- async def publish(self, message: PubSubMessage) -> PublishMetadata:
+ async def publish(self, message: PubSubMessage) -> MessageMetadata:
"""
Publish the provided message.
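
For reference, after this change the internal wire Publisher contract is publish(PubSubMessage) -> MessageMetadata. A hedged, in-memory sketch of an implementation of that contract; FakePublisher is hypothetical and exists only to illustrate the renamed return type.

# Hypothetical in-memory Publisher, not part of this commit; it only shows the
# renamed MessageMetadata return type required by the abstract publish() above.
from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.types import MessageMetadata, Partition
from google.cloud.pubsublite_v1.types import Cursor, PubSubMessage


class FakePublisher(Publisher):
    def __init__(self):
        self._next_offset = 0

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def publish(self, message: PubSubMessage) -> MessageMetadata:
        # Pretend every message lands on partition 0 at the next sequential offset.
        metadata = MessageMetadata(Partition(0), Cursor(offset=self._next_offset))
        self._next_offset += 1
        return metadata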
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/internal/wire/routing_publisher.py
@@ -17,7 +17,7 @@

from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
- from google.cloud.pubsublite.types import Partition, PublishMetadata
+ from google.cloud.pubsublite.types import Partition, MessageMetadata
from google.cloud.pubsublite_v1 import PubSubMessage


@@ -42,7 +42,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
for publisher in self._publishers.values():
await publisher.__aexit__(exc_type, exc_val, exc_tb)

- async def publish(self, message: PubSubMessage) -> PublishMetadata:
+ async def publish(self, message: PubSubMessage) -> MessageMetadata:
partition = self._routing_policy.route(message)
assert partition in self._publishers
return await self._publishers[partition].publish(message)
@@ -33,7 +33,7 @@
SerialBatcher,
BatchTester,
)
- from google.cloud.pubsublite.types import Partition, PublishMetadata
+ from google.cloud.pubsublite.types import Partition, MessageMetadata
from google.cloud.pubsublite_v1.types import (
PubSubMessage,
Cursor,
@@ -159,11 +159,11 @@ async def _flush(self):
logging.debug(f"Failed publish on stream: {e}")
self._fail_if_retrying_failed()

- async def publish(self, message: PubSubMessage) -> PublishMetadata:
+ async def publish(self, message: PubSubMessage) -> MessageMetadata:
cursor_future = self._batcher.add(message)
if self._batcher.should_flush():
await self._flush()
- return PublishMetadata(self._partition, await cursor_future)
+ return MessageMetadata(self._partition, await cursor_future)

async def reinitialize(
self, connection: Connection[PublishRequest, PublishResponse]
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/types/__init__.py
@@ -16,7 +16,7 @@
from .location import CloudRegion, CloudZone
from .partition import Partition
from .paths import LocationPath, TopicPath, SubscriptionPath
- from .publish_metadata import PublishMetadata
+ from .message_metadata import MessageMetadata
from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL

__all__ = (
@@ -25,7 +25,7 @@
"FlowControlSettings",
"LocationPath",
"Partition",
- "PublishMetadata",
+ "MessageMetadata",
"SubscriptionPath",
"TopicPath",
)
@@ -19,7 +19,7 @@
from google.cloud.pubsublite.types.partition import Partition


- class PublishMetadata(NamedTuple):
+ class MessageMetadata(NamedTuple):
partition: Partition
cursor: Cursor

@@ -29,9 +29,9 @@ def encode(self) -> str:
)

@staticmethod
- def decode(source: str) -> "PublishMetadata":
+ def decode(source: str) -> "MessageMetadata":
loaded = json.loads(source)
- return PublishMetadata(
+ return MessageMetadata(
partition=Partition(loaded["partition"]),
cursor=Cursor(offset=loaded["offset"]),
)
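
Aside from the names, encode() and decode() are unchanged in this diff, so the JSON message-id format produced before the rename still round-trips. A small sketch of that round trip; the literal partition and offset values are illustrative.

# Round-trip sketch for the renamed type: encode() yields the JSON string used as
# a Cloud Pub/Sub message_id, and decode() parses it back into metadata.
from google.cloud.pubsublite.types import MessageMetadata, Partition
from google.cloud.pubsublite_v1.types import Cursor

metadata = MessageMetadata(Partition(1), Cursor(offset=10))
message_id = metadata.encode()
decoded = MessageMetadata.decode(message_id)
print(decoded.partition.value, decoded.cursor.offset)  # 1 10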
4 changes: 2 additions & 2 deletions noxfile.py
@@ -26,7 +26,7 @@
BLACK_VERSION = "black==19.10b0"
BLACK_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"]

- DEFAULT_PYTHON_VERSION = "3.8"
+ DEFAULT_PYTHON_VERSION = "3.6"
SYSTEM_TEST_PYTHON_VERSIONS = ["3.8"]
UNIT_TEST_PYTHON_VERSIONS = ["3.6", "3.7", "3.8", "3.9"]

@@ -45,7 +45,7 @@ def lint(session):
session.run("flake8", "google", "tests")


- @nox.session(python="3.6")
+ @nox.session(python=DEFAULT_PYTHON_VERSION)
def blacken(session):
"""Run black.
6 changes: 3 additions & 3 deletions samples/snippets/publish_with_batch_settings_example.py
@@ -31,7 +31,7 @@ def publish_with_batch_settings(
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
- PublishMetadata,
+ MessageMetadata,
TopicPath,
)

@@ -62,9 +62,9 @@ def publish_with_batch_settings(
api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
# result() blocks. To resolve API futures asynchronously, use add_done_callback().
message_id = api_future.result()
- publish_metadata = PublishMetadata.decode(message_id)
+ message_metadata = MessageMetadata.decode(message_id)
print(
- f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
+ f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)

print(
6 changes: 3 additions & 3 deletions samples/snippets/publish_with_custom_attributes_example.py
@@ -28,7 +28,7 @@ def publish_with_custom_attributes(project_number, cloud_region, zone_id, topic_
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
- PublishMetadata,
+ MessageMetadata,
TopicPath,
)

@@ -49,9 +49,9 @@
)
# result() blocks. To resolve api futures asynchronously, use add_done_callback().
message_id = api_future.result()
- publish_metadata = PublishMetadata.decode(message_id)
+ message_metadata = MessageMetadata.decode(message_id)
print(
- f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
+ f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)

print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")
6 changes: 3 additions & 3 deletions samples/snippets/publish_with_ordering_key_example.py
@@ -30,7 +30,7 @@ def publish_with_odering_key(
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
- PublishMetadata,
+ MessageMetadata,
TopicPath,
)

@@ -56,9 +56,9 @@
)
# result() blocks. To resolve api futures asynchronously, use add_done_callback().
message_id = api_future.result()
- publish_metadata = PublishMetadata.decode(message_id)
+ message_metadata = MessageMetadata.decode(message_id)
print(
- f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
+ f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)

print(
6 changes: 3 additions & 3 deletions samples/snippets/publisher_example.py
@@ -28,7 +28,7 @@ def publish_messages(project_number, cloud_region, zone_id, topic_id):
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
- PublishMetadata,
+ MessageMetadata,
TopicPath,
)

@@ -47,9 +47,9 @@
api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
# result() blocks. To resolve API futures asynchronously, use add_done_callback().
message_id = api_future.result()
- publish_metadata = PublishMetadata.decode(message_id)
+ message_metadata = MessageMetadata.decode(message_id)
print(
- f"Published a message to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
+ f"Published a message to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)
# [END pubsublite_quickstart_publisher]
5 changes: 2 additions & 3 deletions samples/snippets/subscriber_example.py
@@ -21,10 +21,9 @@

import argparse

+ from google.cloud.pubsublite.types import MessageMetadata
from google.pubsub_v1 import PubsubMessage

- from google.cloud.pubsublite.types import PublishMetadata


def receive_messages(
project_number, cloud_region, zone_id, subscription_id, timeout=90
@@ -60,7 +59,7 @@ def receive_messages(

def callback(message: PubsubMessage):
message_data = message.data.decode("utf-8")
- metadata = PublishMetadata.decode(message.message_id)
+ metadata = MessageMetadata.decode(message.message_id)
print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
message.ack()
4 changes: 2 additions & 2 deletions tests/unit/pubsublite/cloudpubsub/message_transforms_test.py
@@ -27,7 +27,7 @@
from_cps_publish_message,
add_id_to_cps_subscribe_transformer,
)
- from google.cloud.pubsublite.types import Partition, PublishMetadata
+ from google.cloud.pubsublite.types import Partition, MessageMetadata
from google.cloud.pubsublite_v1 import (
SequencedMessage,
Cursor,
@@ -166,7 +166,7 @@ def test_wrapped_successful():
Timestamp(seconds=55).ToDatetime()
),
},
- message_id=PublishMetadata(Partition(1), Cursor(offset=10)).encode(),
+ message_id=MessageMetadata(Partition(1), Cursor(offset=10)).encode(),
publish_time=Timestamp(seconds=10),
)
result = wrapped.transform(
