Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Rename PublishMetadata to MessageMetadata #92

Merged
merged 2 commits into from Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -38,7 +38,7 @@ async def publish(
**attrs: Additional attributes to send.

Returns:
An ack id, which can be decoded using PublishMetadata.decode.
An ack id, which can be decoded using MessageMetadata.decode.

Raises:
GoogleApiCallError: On a permanent failure.
Expand All @@ -65,7 +65,7 @@ def publish(
**attrs: Additional attributes to send.

Returns:
A future completed with an ack id, which can be decoded using PublishMetadata.decode.
A future completed with an ack id, which can be decoded using MessageMetadata.decode.

Raises:
GoogleApiCallError: On a permanent failure.
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/cloudpubsub/message_transforms.py
Expand Up @@ -19,7 +19,7 @@
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub import MessageTransformer
from google.cloud.pubsublite.types import Partition, PublishMetadata
from google.cloud.pubsublite.types import Partition, MessageMetadata
from google.cloud.pubsublite_v1 import AttributeValues, SequencedMessage, PubSubMessage

PUBSUB_LITE_EVENT_TIME = "x-goog-pubsublite-event-time"
Expand Down Expand Up @@ -63,7 +63,7 @@ def add_id_to_message(source: SequencedMessage):
raise InvalidArgument(
"Message after transforming has the message_id field set."
)
message.message_id = PublishMetadata(partition, source.cursor).encode()
message.message_id = MessageMetadata(partition, source.cursor).encode()
return message

return MessageTransformer.of_callable(add_id_to_message)
Expand Down
Expand Up @@ -45,7 +45,7 @@ async def publish(
**attrs: Additional attributes to send.

Returns:
An ack id, which can be decoded using PublishMetadata.decode.
An ack id, which can be decoded using MessageMetadata.decode.

Raises:
GoogleApiCallError: On a permanent failure.
Expand Down Expand Up @@ -78,7 +78,7 @@ def publish(
**attrs: Additional attributes to send.

Returns:
A future completed with an ack id, which can be decoded using PublishMetadata.decode.
A future completed with an ack id, which can be decoded using MessageMetadata.decode.

Raises:
GoogleApiCallError: On a permanent failure.
Expand Down
Expand Up @@ -21,7 +21,7 @@
)
from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
from google.cloud.pubsublite.types import PublishMetadata, Partition
from google.cloud.pubsublite.types import MessageMetadata, Partition
from google.cloud.pubsublite_v1 import PubSubMessage


Expand Down Expand Up @@ -87,7 +87,7 @@ async def _handle_partition_count_update(self, partition_count: int):
self._publishers.update(new_publishers)
self._routing_policy = routing_policy

async def publish(self, message: PubSubMessage) -> PublishMetadata:
async def publish(self, message: PubSubMessage) -> MessageMetadata:
partition = self._routing_policy.route(message)
assert partition in self._publishers
publisher = self._publishers[partition]
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/internal/wire/publisher.py
Expand Up @@ -15,7 +15,7 @@
from abc import abstractmethod
from typing import AsyncContextManager
from google.cloud.pubsublite_v1.types import PubSubMessage
from google.cloud.pubsublite.types import PublishMetadata
from google.cloud.pubsublite.types import MessageMetadata


class Publisher(AsyncContextManager):
Expand All @@ -24,7 +24,7 @@ class Publisher(AsyncContextManager):
"""

@abstractmethod
async def publish(self, message: PubSubMessage) -> PublishMetadata:
async def publish(self, message: PubSubMessage) -> MessageMetadata:
"""
Publish the provided message.

Expand Down
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/internal/wire/routing_publisher.py
Expand Up @@ -17,7 +17,7 @@

from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
from google.cloud.pubsublite.types import Partition, PublishMetadata
from google.cloud.pubsublite.types import Partition, MessageMetadata
from google.cloud.pubsublite_v1 import PubSubMessage


Expand All @@ -42,7 +42,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
for publisher in self._publishers.values():
await publisher.__aexit__(exc_type, exc_val, exc_tb)

async def publish(self, message: PubSubMessage) -> PublishMetadata:
async def publish(self, message: PubSubMessage) -> MessageMetadata:
partition = self._routing_policy.route(message)
assert partition in self._publishers
return await self._publishers[partition].publish(message)
Expand Up @@ -33,7 +33,7 @@
SerialBatcher,
BatchTester,
)
from google.cloud.pubsublite.types import Partition, PublishMetadata
from google.cloud.pubsublite.types import Partition, MessageMetadata
from google.cloud.pubsublite_v1.types import (
PubSubMessage,
Cursor,
Expand Down Expand Up @@ -159,11 +159,11 @@ async def _flush(self):
logging.debug(f"Failed publish on stream: {e}")
self._fail_if_retrying_failed()

async def publish(self, message: PubSubMessage) -> PublishMetadata:
async def publish(self, message: PubSubMessage) -> MessageMetadata:
cursor_future = self._batcher.add(message)
if self._batcher.should_flush():
await self._flush()
return PublishMetadata(self._partition, await cursor_future)
return MessageMetadata(self._partition, await cursor_future)

async def reinitialize(
self, connection: Connection[PublishRequest, PublishResponse]
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/types/__init__.py
Expand Up @@ -16,7 +16,7 @@
from .location import CloudRegion, CloudZone
from .partition import Partition
from .paths import LocationPath, TopicPath, SubscriptionPath
from .publish_metadata import PublishMetadata
from .message_metadata import MessageMetadata
from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL

__all__ = (
Expand All @@ -25,7 +25,7 @@
"FlowControlSettings",
"LocationPath",
"Partition",
"PublishMetadata",
"MessageMetadata",
"SubscriptionPath",
"TopicPath",
)
Expand Up @@ -19,7 +19,7 @@
from google.cloud.pubsublite.types.partition import Partition


class PublishMetadata(NamedTuple):
class MessageMetadata(NamedTuple):
partition: Partition
cursor: Cursor

Expand All @@ -29,9 +29,9 @@ def encode(self) -> str:
)

@staticmethod
def decode(source: str) -> "PublishMetadata":
def decode(source: str) -> "MessageMetadata":
loaded = json.loads(source)
return PublishMetadata(
return MessageMetadata(
partition=Partition(loaded["partition"]),
cursor=Cursor(offset=loaded["offset"]),
)
4 changes: 2 additions & 2 deletions noxfile.py
Expand Up @@ -26,7 +26,7 @@
BLACK_VERSION = "black==19.10b0"
BLACK_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"]

DEFAULT_PYTHON_VERSION = "3.8"
DEFAULT_PYTHON_VERSION = "3.6"
SYSTEM_TEST_PYTHON_VERSIONS = ["3.8"]
UNIT_TEST_PYTHON_VERSIONS = ["3.6", "3.7", "3.8", "3.9"]

Expand All @@ -45,7 +45,7 @@ def lint(session):
session.run("flake8", "google", "tests")


@nox.session(python="3.6")
@nox.session(python=DEFAULT_PYTHON_VERSION)
def blacken(session):
"""Run black.

Expand Down
6 changes: 3 additions & 3 deletions samples/snippets/publish_with_batch_settings_example.py
Expand Up @@ -31,7 +31,7 @@ def publish_with_batch_settings(
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
MessageMetadata,
TopicPath,
)

Expand Down Expand Up @@ -62,9 +62,9 @@ def publish_with_batch_settings(
api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
# result() blocks. To resolve API futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
message_metadata = MessageMetadata.decode(message_id)
print(
f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)

print(
Expand Down
6 changes: 3 additions & 3 deletions samples/snippets/publish_with_custom_attributes_example.py
Expand Up @@ -28,7 +28,7 @@ def publish_with_custom_attributes(project_number, cloud_region, zone_id, topic_
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
MessageMetadata,
TopicPath,
)

Expand All @@ -49,9 +49,9 @@ def publish_with_custom_attributes(project_number, cloud_region, zone_id, topic_
)
# result() blocks. To resolve api futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
message_metadata = MessageMetadata.decode(message_id)
print(
f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)

print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")
Expand Down
6 changes: 3 additions & 3 deletions samples/snippets/publish_with_ordering_key_example.py
Expand Up @@ -30,7 +30,7 @@ def publish_with_odering_key(
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
MessageMetadata,
TopicPath,
)

Expand All @@ -56,9 +56,9 @@ def publish_with_odering_key(
)
# result() blocks. To resolve api futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
message_metadata = MessageMetadata.decode(message_id)
print(
f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)

print(
Expand Down
6 changes: 3 additions & 3 deletions samples/snippets/publisher_example.py
Expand Up @@ -28,7 +28,7 @@ def publish_messages(project_number, cloud_region, zone_id, topic_id):
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
MessageMetadata,
TopicPath,
)

Expand All @@ -47,9 +47,9 @@ def publish_messages(project_number, cloud_region, zone_id, topic_id):
api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
# result() blocks. To resolve API futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
message_metadata = MessageMetadata.decode(message_id)
print(
f"Published a message to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
f"Published a message to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)
# [END pubsublite_quickstart_publisher]

Expand Down
5 changes: 2 additions & 3 deletions samples/snippets/subscriber_example.py
Expand Up @@ -21,10 +21,9 @@

import argparse

from google.cloud.pubsublite.types import MessageMetadata
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.types import PublishMetadata


def receive_messages(
project_number, cloud_region, zone_id, subscription_id, timeout=90
Expand Down Expand Up @@ -60,7 +59,7 @@ def receive_messages(

def callback(message: PubsubMessage):
message_data = message.data.decode("utf-8")
metadata = PublishMetadata.decode(message.message_id)
metadata = MessageMetadata.decode(message.message_id)
print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
message.ack()

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/pubsublite/cloudpubsub/message_transforms_test.py
Expand Up @@ -27,7 +27,7 @@
from_cps_publish_message,
add_id_to_cps_subscribe_transformer,
)
from google.cloud.pubsublite.types import Partition, PublishMetadata
from google.cloud.pubsublite.types import Partition, MessageMetadata
from google.cloud.pubsublite_v1 import (
SequencedMessage,
Cursor,
Expand Down Expand Up @@ -166,7 +166,7 @@ def test_wrapped_successful():
Timestamp(seconds=55).ToDatetime()
),
},
message_id=PublishMetadata(Partition(1), Cursor(offset=10)).encode(),
message_id=MessageMetadata(Partition(1), Cursor(offset=10)).encode(),
publish_time=Timestamp(seconds=10),
)
result = wrapped.transform(
Expand Down