Skip to content

Commit

Permalink
feat: adding ability to create subscriptions at HEAD (#106)
Browse files Browse the repository at this point in the history
* feat: adding ability to create subscriptions at head

* fix: lint errors

* fix: remove absl dependency

* fix: lint

* feat: use default keyword args

* fix: rename offset location to backlog location

* fix: broken samples
  • Loading branch information
hannahrogers-google committed Mar 22, 2021
1 parent 62e376c commit 4d03d3a
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 8 deletions.
9 changes: 7 additions & 2 deletions google/cloud/pubsublite/admin_client.py
Expand Up @@ -30,6 +30,7 @@
SubscriptionPath,
LocationPath,
TopicPath,
BacklogLocation,
)
from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic

Expand Down Expand Up @@ -101,8 +102,12 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
return self._impl.list_topic_subscriptions(topic_path)

@overrides
def create_subscription(self, subscription: Subscription) -> Subscription:
return self._impl.create_subscription(subscription)
def create_subscription(
self,
subscription: Subscription,
starting_offset: BacklogLocation = BacklogLocation.END,
) -> Subscription:
return self._impl.create_subscription(subscription, starting_offset)

@overrides
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
Expand Down
11 changes: 9 additions & 2 deletions google/cloud/pubsublite/admin_client_interface.py
Expand Up @@ -20,6 +20,7 @@
TopicPath,
LocationPath,
SubscriptionPath,
BacklogLocation,
)
from google.cloud.pubsublite_v1 import Topic, Subscription
from google.protobuf.field_mask_pb2 import FieldMask
Expand Down Expand Up @@ -63,8 +64,14 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
"""List the subscriptions that exist for a given topic."""

@abstractmethod
def create_subscription(self, subscription: Subscription) -> Subscription:
"""Create a subscription, returns the created subscription."""
def create_subscription(
self,
subscription: Subscription,
starting_offset: BacklogLocation = BacklogLocation.END,
) -> Subscription:
"""Create a subscription, returns the created subscription. By default
a subscription will only receive messages published after the
subscription was created."""

@abstractmethod
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
Expand Down
16 changes: 12 additions & 4 deletions google/cloud/pubsublite/internal/wire/admin_client_impl.py
Expand Up @@ -22,6 +22,7 @@
SubscriptionPath,
LocationPath,
TopicPath,
BacklogLocation,
)
from google.cloud.pubsublite_v1 import (
Subscription,
Expand Down Expand Up @@ -72,12 +73,19 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
]
return [SubscriptionPath.parse(x) for x in subscription_strings]

def create_subscription(self, subscription: Subscription) -> Subscription:
def create_subscription(
self,
subscription: Subscription,
starting_offset: BacklogLocation = BacklogLocation.END,
) -> Subscription:
path = SubscriptionPath.parse(subscription.name)
return self._underlying.create_subscription(
parent=str(path.to_location_path()),
subscription=subscription,
subscription_id=path.name,
request={
"parent": str(path.to_location_path()),
"subscription": subscription,
"subscription_id": path.name,
"skip_backlog": (starting_offset == BacklogLocation.END),
}
)

def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/pubsublite/types/__init__.py
Expand Up @@ -18,6 +18,7 @@
from .paths import LocationPath, TopicPath, SubscriptionPath
from .message_metadata import MessageMetadata
from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL
from .backlog_location import BacklogLocation

__all__ = (
"CloudRegion",
Expand All @@ -28,4 +29,5 @@
"MessageMetadata",
"SubscriptionPath",
"TopicPath",
"BacklogLocation",
)
24 changes: 24 additions & 0 deletions google/cloud/pubsublite/types/backlog_location.py
@@ -0,0 +1,24 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import enum


class BacklogLocation(enum.Enum):
"""A location with respect to the message backlog. BEGINNING refers to the
location of the oldest retained message. END refers to the location past
all currently published messages, skipping the entire message backlog."""

BEGINNING = 0
END = 1

0 comments on commit 4d03d3a

Please sign in to comment.