From 911829d85c6ec36a87b873cbfe34497b1a493dde Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Thu, 22 Jul 2021 13:23:52 -0400 Subject: [PATCH] feat: Add Pub/Sub topic retention fields (#456) --- google/cloud/pubsub_v1/proto/pubsub.proto | 19 +++++++++++++ .../services/publisher/async_client.py | 1 + google/pubsub_v1/services/publisher/client.py | 1 + google/pubsub_v1/types/pubsub.py | 27 +++++++++++++++++++ scripts/fixup_pubsub_v1_keywords.py | 4 +-- tests/unit/gapic/pubsub_v1/test_publisher.py | 1 + 6 files changed, 51 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/proto/pubsub.proto b/google/cloud/pubsub_v1/proto/pubsub.proto index 173c4ce71..c5cb855d6 100644 --- a/google/cloud/pubsub_v1/proto/pubsub.proto +++ b/google/cloud/pubsub_v1/proto/pubsub.proto @@ -196,6 +196,16 @@ message Topic { // Reserved for future use. This field is set only in responses from the // server; it is ignored if it is set in any requests. bool satisfies_pzs = 7; + + // Indicates the minimum duration to retain a message after it is published to + // the topic. If this field is set, messages published to the topic in the + // last `message_retention_duration` are always available to subscribers. For + // instance, it allows any attached subscription to [seek to a + // timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) + // that is up to `message_retention_duration` in the past. If this field is + // not set, message retention is controlled by settings on individual + // subscriptions. Cannot be more than 7 days or less than 10 minutes. + google.protobuf.Duration message_retention_duration = 8; } // A message that is published by publishers and consumed by subscribers. The @@ -740,6 +750,15 @@ message Subscription { // FAILED_PRECONDITION. If the subscription is a push subscription, pushes to // the endpoint will not be made. bool detached = 15; + + // Output only. Indicates the minimum duration for which a message is retained + // after it is published to the subscription's topic. If this field is set, + // messages published to the subscription's topic in the last + // `topic_message_retention_duration` are always available to subscribers. See + // the `message_retention_duration` field in `Topic`. This field is set only + // in responses from the server; it is ignored if it is set in any requests. + google.protobuf.Duration topic_message_retention_duration = 17 + [(google.api.field_behavior) = OUTPUT_ONLY]; } // A policy that specifies how Cloud Pub/Sub retries message delivery. diff --git a/google/pubsub_v1/services/publisher/async_client.py b/google/pubsub_v1/services/publisher/async_client.py index 09e4b0e55..df436e721 100644 --- a/google/pubsub_v1/services/publisher/async_client.py +++ b/google/pubsub_v1/services/publisher/async_client.py @@ -29,6 +29,7 @@ from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore +from google.protobuf import duration_pb2 # type: ignore from google.pubsub_v1.services.publisher import pagers from google.pubsub_v1.types import pubsub from google.pubsub_v1.types import TimeoutType diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index cb4450608..e68443254 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -34,6 +34,7 @@ from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore +from google.protobuf import duration_pb2 # type: ignore from google.pubsub_v1.services.publisher import pagers from google.pubsub_v1.types import pubsub from google.pubsub_v1.types import TimeoutType diff --git a/google/pubsub_v1/types/pubsub.py b/google/pubsub_v1/types/pubsub.py index bfbbcaf87..dadf62c0d 100644 --- a/google/pubsub_v1/types/pubsub.py +++ b/google/pubsub_v1/types/pubsub.py @@ -143,6 +143,18 @@ class Topic(proto.Message): Reserved for future use. This field is set only in responses from the server; it is ignored if it is set in any requests. + message_retention_duration (google.protobuf.duration_pb2.Duration): + Indicates the minimum duration to retain a message after it + is published to the topic. If this field is set, messages + published to the topic in the last + ``message_retention_duration`` are always available to + subscribers. For instance, it allows any attached + subscription to `seek to a + timestamp `__ + that is up to ``message_retention_duration`` in the past. If + this field is not set, message retention is controlled by + settings on individual subscriptions. Cannot be more than 7 + days or less than 10 minutes. """ name = proto.Field(proto.STRING, number=1,) @@ -153,6 +165,9 @@ class Topic(proto.Message): kms_key_name = proto.Field(proto.STRING, number=5,) schema_settings = proto.Field(proto.MESSAGE, number=6, message="SchemaSettings",) satisfies_pzs = proto.Field(proto.BOOL, number=7,) + message_retention_duration = proto.Field( + proto.MESSAGE, number=8, message=duration_pb2.Duration, + ) class PubsubMessage(proto.Message): @@ -541,6 +556,15 @@ class Subscription(proto.Message): ``StreamingPull`` requests will return FAILED_PRECONDITION. If the subscription is a push subscription, pushes to the endpoint will not be made. + topic_message_retention_duration (google.protobuf.duration_pb2.Duration): + Output only. Indicates the minimum duration for which a + message is retained after it is published to the + subscription's topic. If this field is set, messages + published to the subscription's topic in the last + ``topic_message_retention_duration`` are always available to + subscribers. See the ``message_retention_duration`` field in + ``Topic``. This field is set only in responses from the + server; it is ignored if it is set in any requests. """ name = proto.Field(proto.STRING, number=1,) @@ -562,6 +586,9 @@ class Subscription(proto.Message): ) retry_policy = proto.Field(proto.MESSAGE, number=14, message="RetryPolicy",) detached = proto.Field(proto.BOOL, number=15,) + topic_message_retention_duration = proto.Field( + proto.MESSAGE, number=17, message=duration_pb2.Duration, + ) class RetryPolicy(proto.Message): diff --git a/scripts/fixup_pubsub_v1_keywords.py b/scripts/fixup_pubsub_v1_keywords.py index 7262e021e..da668f42f 100644 --- a/scripts/fixup_pubsub_v1_keywords.py +++ b/scripts/fixup_pubsub_v1_keywords.py @@ -42,8 +42,8 @@ class pubsubCallTransformer(cst.CSTTransformer): 'acknowledge': ('subscription', 'ack_ids', ), 'create_schema': ('parent', 'schema', 'schema_id', ), 'create_snapshot': ('name', 'subscription', 'labels', ), - 'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', ), - 'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', 'schema_settings', 'satisfies_pzs', ), + 'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'topic_message_retention_duration', ), + 'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', 'schema_settings', 'satisfies_pzs', 'message_retention_duration', ), 'delete_schema': ('name', ), 'delete_snapshot': ('snapshot', ), 'delete_subscription': ('subscription', ), diff --git a/tests/unit/gapic/pubsub_v1/test_publisher.py b/tests/unit/gapic/pubsub_v1/test_publisher.py index b171fb3e7..ae5654d87 100644 --- a/tests/unit/gapic/pubsub_v1/test_publisher.py +++ b/tests/unit/gapic/pubsub_v1/test_publisher.py @@ -35,6 +35,7 @@ from google.iam.v1 import options_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore from google.oauth2 import service_account +from google.protobuf import duration_pb2 # type: ignore from google.protobuf import field_mask_pb2 # type: ignore from google.protobuf import timestamp_pb2 # type: ignore from google.pubsub_v1.services.publisher import PublisherAsyncClient