diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py index 96c03b0e..04f6e0f6 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py @@ -38,7 +38,7 @@ async def publish( **attrs: Additional attributes to send. Returns: - An ack id, which can be decoded using PublishMetadata.decode. + An ack id, which can be decoded using MessageMetadata.decode. Raises: GoogleApiCallError: On a permanent failure. @@ -65,7 +65,7 @@ def publish( **attrs: Additional attributes to send. Returns: - A future completed with an ack id, which can be decoded using PublishMetadata.decode. + A future completed with an ack id, which can be decoded using MessageMetadata.decode. Raises: GoogleApiCallError: On a permanent failure. diff --git a/google/cloud/pubsublite/cloudpubsub/message_transforms.py b/google/cloud/pubsublite/cloudpubsub/message_transforms.py index e3d9a895..3eeed468 100644 --- a/google/cloud/pubsublite/cloudpubsub/message_transforms.py +++ b/google/cloud/pubsublite/cloudpubsub/message_transforms.py @@ -19,7 +19,7 @@ from google.pubsub_v1 import PubsubMessage from google.cloud.pubsublite.cloudpubsub import MessageTransformer -from google.cloud.pubsublite.types import Partition, PublishMetadata +from google.cloud.pubsublite.types import Partition, MessageMetadata from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time" @@ -63,7 +63,7 @@ def add_id_to_message(source: SequencedMessage): raise InvalidArgument( "Message after transforming has the message_id field set." ) - message.message_id = PublishMetadata(partition, source.cursor).encode() + message.message_id = MessageMetadata(partition, source.cursor).encode() return message return MessageTransformer.of_callable(add_id_to_message) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py index 3b19a351..5d15a601 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py @@ -45,7 +45,7 @@ async def publish( **attrs: Additional attributes to send. Returns: - An ack id, which can be decoded using PublishMetadata.decode. + An ack id, which can be decoded using MessageMetadata.decode. Raises: GoogleApiCallError: On a permanent failure. @@ -78,7 +78,7 @@ def publish( **attrs: Additional attributes to send. Returns: - A future completed with an ack id, which can be decoded using PublishMetadata.decode. + A future completed with an ack id, which can be decoded using MessageMetadata.decode. Raises: GoogleApiCallError: On a permanent failure. diff --git a/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py b/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py index 55618de1..ba3a962e 100644 --- a/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py +++ b/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py @@ -21,7 +21,7 @@ ) from google.cloud.pubsublite.internal.wire.publisher import Publisher from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy -from google.cloud.pubsublite.types import PublishMetadata, Partition +from google.cloud.pubsublite.types import MessageMetadata, Partition from google.cloud.pubsublite_v1 import PubSubMessage @@ -87,7 +87,7 @@ async def _handle_partition_count_update(self, partition_count: int): self._publishers.update(new_publishers) self._routing_policy = routing_policy - async def publish(self, message: PubSubMessage) -> PublishMetadata: + async def publish(self, message: PubSubMessage) -> MessageMetadata: partition = self._routing_policy.route(message) assert partition in self._publishers publisher = self._publishers[partition] diff --git a/google/cloud/pubsublite/internal/wire/publisher.py b/google/cloud/pubsublite/internal/wire/publisher.py index d9a14a94..35b581bb 100644 --- a/google/cloud/pubsublite/internal/wire/publisher.py +++ b/google/cloud/pubsublite/internal/wire/publisher.py @@ -15,7 +15,7 @@ from abc import abstractmethod from typing import AsyncContextManager from google.cloud.pubsublite_v1.types import PubSubMessage -from google.cloud.pubsublite.types import PublishMetadata +from google.cloud.pubsublite.types import MessageMetadata class Publisher(AsyncContextManager): @@ -24,7 +24,7 @@ class Publisher(AsyncContextManager): """ @abstractmethod - async def publish(self, message: PubSubMessage) -> PublishMetadata: + async def publish(self, message: PubSubMessage) -> MessageMetadata: """ Publish the provided message. diff --git a/google/cloud/pubsublite/internal/wire/routing_publisher.py b/google/cloud/pubsublite/internal/wire/routing_publisher.py index e6db82e1..630b2712 100644 --- a/google/cloud/pubsublite/internal/wire/routing_publisher.py +++ b/google/cloud/pubsublite/internal/wire/routing_publisher.py @@ -17,7 +17,7 @@ from google.cloud.pubsublite.internal.wire.publisher import Publisher from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy -from google.cloud.pubsublite.types import Partition, PublishMetadata +from google.cloud.pubsublite.types import Partition, MessageMetadata from google.cloud.pubsublite_v1 import PubSubMessage @@ -42,7 +42,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): for publisher in self._publishers.values(): await publisher.__aexit__(exc_type, exc_val, exc_tb) - async def publish(self, message: PubSubMessage) -> PublishMetadata: + async def publish(self, message: PubSubMessage) -> MessageMetadata: partition = self._routing_policy.route(message) assert partition in self._publishers return await self._publishers[partition].publish(message) diff --git a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py index 90f6f46a..411d9b13 100644 --- a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py +++ b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py @@ -33,7 +33,7 @@ SerialBatcher, BatchTester, ) -from google.cloud.pubsublite.types import Partition, PublishMetadata +from google.cloud.pubsublite.types import Partition, MessageMetadata from google.cloud.pubsublite_v1.types import ( PubSubMessage, Cursor, @@ -159,11 +159,11 @@ async def _flush(self): logging.debug(f"Failed publish on stream: {e}") self._fail_if_retrying_failed() - async def publish(self, message: PubSubMessage) -> PublishMetadata: + async def publish(self, message: PubSubMessage) -> MessageMetadata: cursor_future = self._batcher.add(message) if self._batcher.should_flush(): await self._flush() - return PublishMetadata(self._partition, await cursor_future) + return MessageMetadata(self._partition, await cursor_future) async def reinitialize( self, connection: Connection[PublishRequest, PublishResponse] diff --git a/google/cloud/pubsublite/types/__init__.py b/google/cloud/pubsublite/types/__init__.py index d6e3e4ec..6304fd3c 100644 --- a/google/cloud/pubsublite/types/__init__.py +++ b/google/cloud/pubsublite/types/__init__.py @@ -16,7 +16,7 @@ from .location import CloudRegion, CloudZone from .partition import Partition from .paths import LocationPath, TopicPath, SubscriptionPath -from .publish_metadata import PublishMetadata +from .message_metadata import MessageMetadata from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL __all__ = ( @@ -25,7 +25,7 @@ "FlowControlSettings", "LocationPath", "Partition", - "PublishMetadata", + "MessageMetadata", "SubscriptionPath", "TopicPath", ) diff --git a/google/cloud/pubsublite/types/publish_metadata.py b/google/cloud/pubsublite/types/message_metadata.py similarity index 90% rename from google/cloud/pubsublite/types/publish_metadata.py rename to google/cloud/pubsublite/types/message_metadata.py index c746eda1..cd6b2dc5 100644 --- a/google/cloud/pubsublite/types/publish_metadata.py +++ b/google/cloud/pubsublite/types/message_metadata.py @@ -19,7 +19,7 @@ from google.cloud.pubsublite.types.partition import Partition -class PublishMetadata(NamedTuple): +class MessageMetadata(NamedTuple): partition: Partition cursor: Cursor @@ -29,9 +29,9 @@ def encode(self) -> str: ) @staticmethod - def decode(source: str) -> "PublishMetadata": + def decode(source: str) -> "MessageMetadata": loaded = json.loads(source) - return PublishMetadata( + return MessageMetadata( partition=Partition(loaded["partition"]), cursor=Cursor(offset=loaded["offset"]), ) diff --git a/noxfile.py b/noxfile.py index a2791746..776269dd 100644 --- a/noxfile.py +++ b/noxfile.py @@ -26,7 +26,7 @@ BLACK_VERSION = "black==19.10b0" BLACK_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"] -DEFAULT_PYTHON_VERSION = "3.8" +DEFAULT_PYTHON_VERSION = "3.6" SYSTEM_TEST_PYTHON_VERSIONS = ["3.8"] UNIT_TEST_PYTHON_VERSIONS = ["3.6", "3.7", "3.8", "3.9"] @@ -45,7 +45,7 @@ def lint(session): session.run("flake8", "google", "tests") -@nox.session(python="3.6") +@nox.session(python=DEFAULT_PYTHON_VERSION) def blacken(session): """Run black. diff --git a/samples/snippets/publish_with_batch_settings_example.py b/samples/snippets/publish_with_batch_settings_example.py index 8035bbc5..5f0a1067 100644 --- a/samples/snippets/publish_with_batch_settings_example.py +++ b/samples/snippets/publish_with_batch_settings_example.py @@ -31,7 +31,7 @@ def publish_with_batch_settings( from google.cloud.pubsublite.types import ( CloudRegion, CloudZone, - PublishMetadata, + MessageMetadata, TopicPath, ) @@ -62,9 +62,9 @@ def publish_with_batch_settings( api_future = publisher_client.publish(topic_path, data.encode("utf-8")) # result() blocks. To resolve API futures asynchronously, use add_done_callback(). message_id = api_future.result() - publish_metadata = PublishMetadata.decode(message_id) + message_metadata = MessageMetadata.decode(message_id) print( - f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}." + f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}." ) print( diff --git a/samples/snippets/publish_with_custom_attributes_example.py b/samples/snippets/publish_with_custom_attributes_example.py index 38c6b1b7..45a0ce29 100644 --- a/samples/snippets/publish_with_custom_attributes_example.py +++ b/samples/snippets/publish_with_custom_attributes_example.py @@ -28,7 +28,7 @@ def publish_with_custom_attributes(project_number, cloud_region, zone_id, topic_ from google.cloud.pubsublite.types import ( CloudRegion, CloudZone, - PublishMetadata, + MessageMetadata, TopicPath, ) @@ -49,9 +49,9 @@ def publish_with_custom_attributes(project_number, cloud_region, zone_id, topic_ ) # result() blocks. To resolve api futures asynchronously, use add_done_callback(). message_id = api_future.result() - publish_metadata = PublishMetadata.decode(message_id) + message_metadata = MessageMetadata.decode(message_id) print( - f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}." + f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}." ) print(f"Finished publishing a message with custom attributes to {str(topic_path)}.") diff --git a/samples/snippets/publish_with_ordering_key_example.py b/samples/snippets/publish_with_ordering_key_example.py index 54919901..5b3680e1 100644 --- a/samples/snippets/publish_with_ordering_key_example.py +++ b/samples/snippets/publish_with_ordering_key_example.py @@ -30,7 +30,7 @@ def publish_with_odering_key( from google.cloud.pubsublite.types import ( CloudRegion, CloudZone, - PublishMetadata, + MessageMetadata, TopicPath, ) @@ -56,9 +56,9 @@ def publish_with_odering_key( ) # result() blocks. To resolve api futures asynchronously, use add_done_callback(). message_id = api_future.result() - publish_metadata = PublishMetadata.decode(message_id) + message_metadata = MessageMetadata.decode(message_id) print( - f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}." + f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}." ) print( diff --git a/samples/snippets/publisher_example.py b/samples/snippets/publisher_example.py index ab240c66..04d7e713 100644 --- a/samples/snippets/publisher_example.py +++ b/samples/snippets/publisher_example.py @@ -28,7 +28,7 @@ def publish_messages(project_number, cloud_region, zone_id, topic_id): from google.cloud.pubsublite.types import ( CloudRegion, CloudZone, - PublishMetadata, + MessageMetadata, TopicPath, ) @@ -47,9 +47,9 @@ def publish_messages(project_number, cloud_region, zone_id, topic_id): api_future = publisher_client.publish(topic_path, data.encode("utf-8")) # result() blocks. To resolve API futures asynchronously, use add_done_callback(). message_id = api_future.result() - publish_metadata = PublishMetadata.decode(message_id) + message_metadata = MessageMetadata.decode(message_id) print( - f"Published a message to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}." + f"Published a message to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}." ) # [END pubsublite_quickstart_publisher] diff --git a/samples/snippets/subscriber_example.py b/samples/snippets/subscriber_example.py index a8c9b5d3..5f5e2c10 100644 --- a/samples/snippets/subscriber_example.py +++ b/samples/snippets/subscriber_example.py @@ -21,10 +21,9 @@ import argparse +from google.cloud.pubsublite.types import MessageMetadata from google.pubsub_v1 import PubsubMessage -from google.cloud.pubsublite.types import PublishMetadata - def receive_messages( project_number, cloud_region, zone_id, subscription_id, timeout=90 @@ -60,7 +59,7 @@ def receive_messages( def callback(message: PubsubMessage): message_data = message.data.decode("utf-8") - metadata = PublishMetadata.decode(message.message_id) + metadata = MessageMetadata.decode(message.message_id) print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.") message.ack() diff --git a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py index 43e71e90..206a228d 100644 --- a/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py +++ b/tests/unit/pubsublite/cloudpubsub/message_transforms_test.py @@ -27,7 +27,7 @@ from_cps_publish_message, add_id_to_cps_subscribe_transformer, ) -from google.cloud.pubsublite.types import Partition, PublishMetadata +from google.cloud.pubsublite.types import Partition, MessageMetadata from google.cloud.pubsublite_v1 import ( SequencedMessage, Cursor, @@ -166,7 +166,7 @@ def test_wrapped_successful(): Timestamp(seconds=55).ToDatetime() ), }, - message_id=PublishMetadata(Partition(1), Cursor(offset=10)).encode(), + message_id=MessageMetadata(Partition(1), Cursor(offset=10)).encode(), publish_time=Timestamp(seconds=10), ) result = wrapped.transform(