New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement admin client. #17
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
ba3140b
feat: Implement committer
dpcollins-google 8d34653
fix: Patch retrying connection and add comments.
dpcollins-google e370118
feat: Implement assigner which generates subscription-partition assig…
dpcollins-google 0500806
feat: Implement FlowControlBatcher
dpcollins-google 5eab873
Use correct request for comparisson.
dpcollins-google 6c44e92
feat: Implement Subscriber, which handles flow control and batch mess…
dpcollins-google 159d3ef
feat: Implement AdminClient, which helps users perform admin operatio…
dpcollins-google b5a33d0
Merge branch 'master' into admin_client_impl
dpcollins-google File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import List, Optional | ||
|
||
from google.api_core.client_options import ClientOptions | ||
from google.protobuf.field_mask_pb2 import FieldMask | ||
|
||
from google.cloud.pubsublite.endpoints import regional_endpoint | ||
from google.cloud.pubsublite.internal.wire.admin_client_impl import AdminClientImpl | ||
from google.cloud.pubsublite.location import CloudRegion | ||
from google.cloud.pubsublite.paths import TopicPath, LocationPath, SubscriptionPath | ||
from google.cloud.pubsublite_v1 import Topic, Subscription, AdminServiceClient | ||
from google.auth.credentials import Credentials | ||
|
||
|
||
class AdminClient(ABC): | ||
@abstractmethod | ||
def region(self) -> CloudRegion: | ||
"""The region this client is for.""" | ||
|
||
@abstractmethod | ||
def create_topic(self, topic: Topic) -> Topic: | ||
"""Create a topic, returns the created topic.""" | ||
|
||
@abstractmethod | ||
def get_topic(self, topic_path: TopicPath) -> Topic: | ||
"""Get the topic object from the server.""" | ||
|
||
@abstractmethod | ||
def get_topic_partition_count(self, topic_path: TopicPath) -> int: | ||
"""Get the number of partitions in the provided topic.""" | ||
|
||
@abstractmethod | ||
def list_topics(self, location_path: LocationPath) -> List[Topic]: | ||
"""List the Pub/Sub lite topics that exist for a project in a given location.""" | ||
|
||
@abstractmethod | ||
def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic: | ||
"""Update the masked fields of the provided topic.""" | ||
|
||
@abstractmethod | ||
def delete_topic(self, topic_path: TopicPath): | ||
"""Delete a topic and all associated messages.""" | ||
|
||
@abstractmethod | ||
def list_topic_subscriptions(self, topic_path: TopicPath): | ||
"""List the subscriptions that exist for a given topic.""" | ||
|
||
@abstractmethod | ||
def create_subscription(self, subscription: Subscription) -> Subscription: | ||
"""Create a subscription, returns the created subscription.""" | ||
|
||
@abstractmethod | ||
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: | ||
"""Get the subscription object from the server.""" | ||
|
||
@abstractmethod | ||
def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]: | ||
"""List the Pub/Sub lite subscriptions that exist for a project in a given location.""" | ||
|
||
@abstractmethod | ||
def update_subscription(self, subscription: Subscription, update_mask: FieldMask) -> Subscription: | ||
"""Update the masked fields of the provided subscription.""" | ||
|
||
@abstractmethod | ||
def delete_subscription(self, subscription_path: SubscriptionPath): | ||
"""Delete a subscription and all associated messages.""" | ||
|
||
|
||
def make_admin_client(region: CloudRegion, credentials: Optional[Credentials] = None, | ||
client_options: Optional[ClientOptions] = None) -> AdminClient: | ||
if client_options is None: | ||
client_options = ClientOptions(api_endpoint=regional_endpoint(region)) | ||
return AdminClientImpl(AdminServiceClient(client_options=client_options, credentials=credentials), region) |
61 changes: 61 additions & 0 deletions
61
google/cloud/pubsublite/internal/wire/admin_client_impl.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
from typing import List | ||
|
||
from google.protobuf.field_mask_pb2 import FieldMask | ||
|
||
from google.cloud.pubsublite.admin_client import AdminClient | ||
from google.cloud.pubsublite.location import CloudRegion | ||
from google.cloud.pubsublite.paths import SubscriptionPath, LocationPath, TopicPath | ||
from google.cloud.pubsublite_v1 import Subscription, Topic, AdminServiceClient, TopicPartitions | ||
|
||
|
||
class AdminClientImpl(AdminClient): | ||
_underlying: AdminServiceClient | ||
_region: CloudRegion | ||
|
||
def __init__(self, underlying: AdminServiceClient, region: CloudRegion): | ||
self._underlying = underlying | ||
self._region = region | ||
|
||
def region(self) -> CloudRegion: | ||
return self._region | ||
|
||
def create_topic(self, topic: Topic) -> Topic: | ||
path = TopicPath.parse(topic.name) | ||
return self._underlying.create_topic(parent=str(path.to_location_path()), topic=topic, topic_id=path.name) | ||
|
||
def get_topic(self, topic_path: TopicPath) -> Topic: | ||
return self._underlying.get_topic(name=str(topic_path)) | ||
|
||
def get_topic_partition_count(self, topic_path: TopicPath) -> int: | ||
partitions: TopicPartitions = self._underlying.get_topic_partitions(name=str(topic_path)) | ||
return partitions.partition_count | ||
|
||
def list_topics(self, location_path: LocationPath) -> List[Topic]: | ||
return [x for x in self._underlying.list_topics(parent=str(location_path))] | ||
|
||
def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic: | ||
return self._underlying.update_topic(topic=topic, update_mask=update_mask) | ||
|
||
def delete_topic(self, topic_path: TopicPath): | ||
self._underlying.delete_topic(name=str(topic_path)) | ||
|
||
def list_topic_subscriptions(self, topic_path: TopicPath): | ||
subscription_strings = [x for x in self._underlying.list_topic_subscriptions(name=str(topic_path))] | ||
return [SubscriptionPath.parse(x) for x in subscription_strings] | ||
|
||
def create_subscription(self, subscription: Subscription) -> Subscription: | ||
path = SubscriptionPath.parse(subscription.name) | ||
return self._underlying.create_subscription(parent=str(path.to_location_path()), subscription=subscription, | ||
subscription_id=path.name) | ||
|
||
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: | ||
return self._underlying.get_subscription(name=str(subscription_path)) | ||
|
||
def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]: | ||
return [x for x in self._underlying.list_subscriptions(parent=str(location_path))] | ||
|
||
def update_subscription(self, subscription: Subscription, update_mask: FieldMask) -> Subscription: | ||
return self._underlying.update_subscription(subscription=subscription, update_mask=update_mask) | ||
|
||
def delete_subscription(self, subscription_path: SubscriptionPath): | ||
self._underlying.delete_subscription(name=str(subscription_path)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider checking splits[2] is a number
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nack. splits[2] is not a number, it is a single lowercase character. 'us-central1-a' for example.