-
Notifications
You must be signed in to change notification settings - Fork 12
/
paths.py
83 lines (68 loc) · 2.69 KB
/
paths.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from typing import NamedTuple
from google.api_core.exceptions import InvalidArgument
from google.cloud.pubsublite.types.location import CloudZone
class LocationPath(NamedTuple):
    """Path naming a location inside a project.

    Renders as ``projects/{project_number}/locations/{location}``.
    """

    # Number placed in the ``projects/`` segment of the path.
    project_number: int
    # Zone placed in the ``locations/`` segment of the path.
    location: CloudZone

    def __str__(self) -> str:
        # The tuple has exactly these two fields, in this order.
        project_number, location = self
        return "projects/{}/locations/{}".format(project_number, location)
class TopicPath(NamedTuple):
    """Path naming a single topic.

    Renders as
    ``projects/{project_number}/locations/{location}/topics/{name}``.
    """

    # Number placed in the ``projects/`` segment of the path.
    project_number: int
    # Zone placed in the ``locations/`` segment of the path.
    location: CloudZone
    # Topic name; the final segment of the path.
    name: str

    def __str__(self):
        return f"projects/{self.project_number}/locations/{self.location}/topics/{self.name}"

    def to_location_path(self) -> "LocationPath":
        """Return the LocationPath for this topic's project and location."""
        return LocationPath(self.project_number, self.location)

    @staticmethod
    def parse(to_parse: str) -> "TopicPath":
        """Parse a topic path string into a TopicPath.

        Args:
            to_parse: A string shaped like
                ``projects/{project_number}/locations/{location}/topics/{name}``.

        Returns:
            The TopicPath described by ``to_parse``.

        Raises:
            InvalidArgument: If ``to_parse`` does not consist of exactly the
                six expected segments, or if its project number is not
                written purely as decimal digits. Anything raised by
                ``CloudZone.parse`` for the location segment propagates
                unchanged.
        """
        # Built once so every rejection reports the same text.
        message = (
            "Topic path must be formatted like projects/{project_number}/locations/{location}/topics/{name} but was instead "
            + to_parse
        )
        splits = to_parse.split("/")
        if (
            len(splits) != 6
            or splits[0] != "projects"
            or splits[2] != "locations"
            or splits[4] != "topics"
        ):
            raise InvalidArgument(message)

        project_segment = splits[1]
        # int() on its own also accepts a sign, surrounding whitespace, "_"
        # separators and non-ASCII digits. Such a segment is not a project
        # number and would not survive a parse -> str round trip, so only
        # plain decimal digits are allowed through.
        is_plain_decimal = bool(project_segment) and all(
            ch in "0123456789" for ch in project_segment
        )
        if not is_plain_decimal:
            raise InvalidArgument(message)
        try:
            project_number = int(project_segment)
        except ValueError as err:
            # Still reachable for digit strings that exceed the interpreter's
            # integer string conversion length limit; keep the root cause.
            raise InvalidArgument(message) from err
        return TopicPath(project_number, CloudZone.parse(splits[3]), splits[5])
class SubscriptionPath(NamedTuple):
    """Path naming a single subscription.

    Renders as
    ``projects/{project_number}/locations/{location}/subscriptions/{name}``.
    """

    # Number placed in the ``projects/`` segment of the path.
    project_number: int
    # Zone placed in the ``locations/`` segment of the path.
    location: CloudZone
    # Subscription name; the final segment of the path.
    name: str

    def __str__(self):
        return f"projects/{self.project_number}/locations/{self.location}/subscriptions/{self.name}"

    def to_location_path(self) -> "LocationPath":
        """Return the LocationPath for this subscription's project and location."""
        return LocationPath(self.project_number, self.location)

    @staticmethod
    def parse(to_parse: str) -> "SubscriptionPath":
        """Parse a subscription path string into a SubscriptionPath.

        Args:
            to_parse: A string shaped like
                ``projects/{project_number}/locations/{location}/subscriptions/{name}``.

        Returns:
            The SubscriptionPath described by ``to_parse``.

        Raises:
            InvalidArgument: If ``to_parse`` does not consist of exactly the
                six expected segments, or if its project number is not
                written purely as decimal digits. Anything raised by
                ``CloudZone.parse`` for the location segment propagates
                unchanged.
        """
        # Built once so every rejection reports the same text.
        message = (
            "Subscription path must be formatted like projects/{project_number}/locations/{location}/subscriptions/{name} but was instead "
            + to_parse
        )
        splits = to_parse.split("/")
        if (
            len(splits) != 6
            or splits[0] != "projects"
            or splits[2] != "locations"
            or splits[4] != "subscriptions"
        ):
            raise InvalidArgument(message)

        project_segment = splits[1]
        # int() on its own also accepts a sign, surrounding whitespace, "_"
        # separators and non-ASCII digits. Such a segment is not a project
        # number and would not survive a parse -> str round trip, so only
        # plain decimal digits are allowed through.
        is_plain_decimal = bool(project_segment) and all(
            ch in "0123456789" for ch in project_segment
        )
        if not is_plain_decimal:
            raise InvalidArgument(message)
        try:
            project_number = int(project_segment)
        except ValueError as err:
            # Still reachable for digit strings that exceed the interpreter's
            # integer string conversion length limit; keep the root cause.
            raise InvalidArgument(message) from err
        return SubscriptionPath(project_number, CloudZone.parse(splits[3]), splits[5])