Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding ability to create subscriptions at HEAD #106

Merged
merged 11 commits into from Mar 22, 2021
9 changes: 7 additions & 2 deletions google/cloud/pubsublite/admin_client.py
Expand Up @@ -30,6 +30,7 @@
SubscriptionPath,
LocationPath,
TopicPath,
BacklogLocation,
)
from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic

Expand Down Expand Up @@ -101,8 +102,12 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
return self._impl.list_topic_subscriptions(topic_path)

@overrides
def create_subscription(self, subscription: Subscription) -> Subscription:
return self._impl.create_subscription(subscription)
def create_subscription(
self,
subscription: Subscription,
starting_offset: BacklogLocation = BacklogLocation.END,
) -> Subscription:
return self._impl.create_subscription(subscription, starting_offset)

@overrides
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
Expand Down
11 changes: 9 additions & 2 deletions google/cloud/pubsublite/admin_client_interface.py
Expand Up @@ -20,6 +20,7 @@
TopicPath,
LocationPath,
SubscriptionPath,
BacklogLocation,
)
from google.cloud.pubsublite_v1 import Topic, Subscription
from google.protobuf.field_mask_pb2 import FieldMask
Expand Down Expand Up @@ -63,8 +64,14 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
"""List the subscriptions that exist for a given topic."""

@abstractmethod
def create_subscription(self, subscription: Subscription) -> Subscription:
"""Create a subscription, returns the created subscription."""
def create_subscription(
self,
subscription: Subscription,
starting_offset: BacklogLocation = BacklogLocation.END,
) -> Subscription:
"""Create a subscription, returns the created subscription. By default
a subscription will only receive messages published after the
subscription was created."""

@abstractmethod
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
Expand Down
16 changes: 12 additions & 4 deletions google/cloud/pubsublite/internal/wire/admin_client_impl.py
Expand Up @@ -22,6 +22,7 @@
SubscriptionPath,
LocationPath,
TopicPath,
BacklogLocation,
)
from google.cloud.pubsublite_v1 import (
Subscription,
Expand Down Expand Up @@ -72,12 +73,19 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
]
return [SubscriptionPath.parse(x) for x in subscription_strings]

def create_subscription(self, subscription: Subscription) -> Subscription:
def create_subscription(
self,
subscription: Subscription,
starting_offset: BacklogLocation = BacklogLocation.END,
) -> Subscription:
path = SubscriptionPath.parse(subscription.name)
return self._underlying.create_subscription(
parent=str(path.to_location_path()),
subscription=subscription,
subscription_id=path.name,
request={
"parent": str(path.to_location_path()),
"subscription": subscription,
"subscription_id": path.name,
"skip_backlog": (starting_offset == BacklogLocation.END),
}
)

def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/pubsublite/types/__init__.py
Expand Up @@ -18,6 +18,7 @@
from .paths import LocationPath, TopicPath, SubscriptionPath
from .message_metadata import MessageMetadata
from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL
from .backlog_location import BacklogLocation

__all__ = (
"CloudRegion",
Expand All @@ -28,4 +29,5 @@
"MessageMetadata",
"SubscriptionPath",
"TopicPath",
"BacklogLocation",
)
24 changes: 24 additions & 0 deletions google/cloud/pubsublite/types/backlog_location.py
@@ -0,0 +1,24 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import enum


class BacklogLocation(enum.Enum):
"""A location with respect to the message backlog. BEGINNING refers to the
location of the oldest retained message. END refers to the location past
all currently published messages, skipping the entire message backlog."""

BEGINNING = 0
END = 1