Skip to content

Commit

Permalink
fix: Enable pytype on Pub/Sub Lite repo and fix all errors (#214)
Browse files Browse the repository at this point in the history
* chore: Enable pytype on Pub/Sub Lite repo and fix all errors

* chore: fix GapicConnection typing

* chore: no empty string notimplementederror
  • Loading branch information
dpcollins-google committed Sep 13, 2021
1 parent f441d31 commit df58fdf
Show file tree
Hide file tree
Showing 31 changed files with 127 additions and 85 deletions.
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/admin_client.py
Expand Up @@ -18,7 +18,7 @@
from google.api_core.client_options import ClientOptions
from google.api_core.operation import Operation
from google.auth.credentials import Credentials
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error

from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.internal.constructable_from_service_account import (
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/admin_client_interface.py
Expand Up @@ -27,7 +27,7 @@
)
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error


class AdminClientInterface(ABC):
Expand Down
Expand Up @@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager


class AckSetTracker(AsyncContextManager):
class AckSetTracker(AsyncContextManager, metaclass=ABCMeta):
"""
An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
is aggregated.
Expand Down
Expand Up @@ -88,6 +88,9 @@ def _make_dynamic_assigner(
credentials: Optional[Credentials],
base_metadata: Optional[Mapping[str, str]],
) -> Assigner:
if base_metadata is None:
base_metadata = {}

def assignment_connection_factory(
requests: AsyncIterator[PartitionAssignmentRequest],
):
Expand Down
Expand Up @@ -51,7 +51,7 @@ def str(self) -> str:
return json.dumps({"generation": self.generation, "offset": self.offset})

@staticmethod
def parse(payload: str) -> "_AckId":
def parse(payload: str) -> "_AckId": # pytype: disable=invalid-annotation
loaded = json.loads(payload)
return _AckId(
generation=int(loaded["generation"]), offset=int(loaded["offset"]),
Expand Down
Expand Up @@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager, Mapping, ContextManager
from concurrent import futures


class AsyncSinglePublisher(AsyncContextManager):
class AsyncSinglePublisher(AsyncContextManager, metaclass=ABCMeta):
"""
An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an
async context. Any publish failures are permanent.
Expand All @@ -43,9 +43,10 @@ async def publish(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()


class SinglePublisher(ContextManager):
class SinglePublisher(ContextManager, metaclass=ABCMeta):
"""
A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent.
Expand All @@ -70,3 +71,4 @@ def publish(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager, Callable, Set, Optional

from google.cloud.pubsub_v1.subscriber.message import Message
Expand All @@ -24,7 +24,7 @@
)


class AsyncSingleSubscriber(AsyncContextManager):
class AsyncSingleSubscriber(AsyncContextManager, metaclass=ABCMeta):
"""
A Cloud Pub/Sub asynchronous subscriber.
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/cloudpubsub/message_transforms.py
Expand Up @@ -15,7 +15,7 @@
import datetime

from google.api_core.exceptions import InvalidArgument
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.timestamp_pb2 import Timestamp # pytype: disable=pyi-error
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub import MessageTransformer
Expand Down
Expand Up @@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from concurrent.futures import Future
from typing import ContextManager, Mapping, Union, AsyncContextManager

from google.cloud.pubsublite.types import TopicPath


class AsyncPublisherClientInterface(AsyncContextManager):
class AsyncPublisherClientInterface(AsyncContextManager, metaclass=ABCMeta):
"""
An AsyncPublisherClientInterface publishes messages similar to Google Pub/Sub, but must be used in an
async context. Any publish failures are unlikely to succeed if retried.
Expand Down Expand Up @@ -50,9 +50,10 @@ async def publish(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()


class PublisherClientInterface(ContextManager):
class PublisherClientInterface(ContextManager, metaclass=ABCMeta):
"""
A PublisherClientInterface publishes messages similar to Google Pub/Sub.
Any publish failures are unlikely to succeed if retried.
Expand Down Expand Up @@ -84,3 +85,4 @@ def publish(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import (
ContextManager,
Union,
Expand All @@ -33,7 +33,7 @@
)


class AsyncSubscriberClientInterface(AsyncContextManager):
class AsyncSubscriberClientInterface(AsyncContextManager, metaclass=ABCMeta):
"""
An AsyncSubscriberClientInterface reads messages similar to Google Pub/Sub, but must be used in an
async context.
Expand Down Expand Up @@ -64,12 +64,13 @@ async def subscribe(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()


MessageCallback = Callable[[Message], None]


class SubscriberClientInterface(ContextManager):
class SubscriberClientInterface(ContextManager, metaclass=ABCMeta):
"""
A SubscriberClientInterface reads messages similar to Google Pub/Sub.
Any subscribe failures are unlikely to succeed if retried.
Expand Down Expand Up @@ -103,3 +104,4 @@ def subscribe(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()
15 changes: 8 additions & 7 deletions google/cloud/pubsublite/internal/wire/admin_client_impl.py
Expand Up @@ -16,7 +16,7 @@

from google.api_core.exceptions import InvalidArgument
from google.api_core.operation import Operation
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error

from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.types import (
Expand All @@ -37,6 +37,7 @@
Reservation,
TimeTarget,
SeekSubscriptionRequest,
CreateSubscriptionRequest,
)


Expand Down Expand Up @@ -88,12 +89,12 @@ def create_subscription(
) -> Subscription:
path = SubscriptionPath.parse(subscription.name)
return self._underlying.create_subscription(
request={
"parent": str(path.to_location_path()),
"subscription": subscription,
"subscription_id": path.name,
"skip_backlog": (starting_offset == BacklogLocation.END),
}
request=CreateSubscriptionRequest(
parent=str(path.to_location_path()),
subscription=subscription,
subscription_id=path.name,
skip_backlog=(starting_offset == BacklogLocation.END),
)
)

def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
Expand Down
6 changes: 3 additions & 3 deletions google/cloud/pubsublite/internal/wire/assigner.py
Expand Up @@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager, Set

from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite.types.partition import Partition


class Assigner(AsyncContextManager):
class Assigner(AsyncContextManager, metaclass=ABCMeta):
"""
An assigner will deliver a continuous stream of assignments when called into. Perform all necessary work with the
assignment before attempting to get the next one.
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/internal/wire/assigner_impl.py
Expand Up @@ -28,7 +28,7 @@
ConnectionReinitializer,
)
from google.cloud.pubsublite.internal.wire.connection import Connection
from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite.types.partition import Partition
from google.cloud.pubsublite_v1.types import (
PartitionAssignmentRequest,
PartitionAssignment,
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/internal/wire/committer.py
Expand Up @@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager

from google.cloud.pubsublite_v1 import Cursor


class Committer(AsyncContextManager):
class Committer(AsyncContextManager, metaclass=ABCMeta):
"""
A Committer is able to commit subscribers' completed offsets.
"""
Expand Down
9 changes: 6 additions & 3 deletions google/cloud/pubsublite/internal/wire/connection.py
Expand Up @@ -13,13 +13,15 @@
# limitations under the License.

from typing import Generic, TypeVar, AsyncContextManager
from abc import abstractmethod
from abc import abstractmethod, ABCMeta

Request = TypeVar("Request")
Response = TypeVar("Response")


class Connection(Generic[Request, Response], AsyncContextManager["Connection"]):
class Connection(
AsyncContextManager["Connection"], Generic[Request, Response], metaclass=ABCMeta
):
"""
A connection to an underlying stream. Only one call to 'read' may be outstanding at a time.
"""
Expand All @@ -45,8 +47,9 @@ async def read(self) -> Response:
raise NotImplementedError()


class ConnectionFactory(Generic[Request, Response]):
class ConnectionFactory(Generic[Request, Response], metaclass=ABCMeta):
"""A factory for producing Connections."""

@abstractmethod
async def new(self) -> Connection[Request, Response]:
raise NotImplementedError()
Expand Up @@ -16,7 +16,7 @@
import random

from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite.types.partition import Partition
from google.cloud.pubsublite_v1.types import PubSubMessage


Expand Down
Expand Up @@ -16,7 +16,7 @@
from typing import Set

from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite.types.partition import Partition


class FixedSetAssigner(Assigner):
Expand Down
20 changes: 16 additions & 4 deletions google/cloud/pubsublite/internal/wire/gapic_connection.py
Expand Up @@ -12,7 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import AsyncIterator, TypeVar, Optional, Callable, AsyncIterable, Awaitable
from typing import (
cast,
AsyncIterator,
TypeVar,
Optional,
Callable,
AsyncIterable,
Awaitable,
)
import asyncio

from google.api_core.exceptions import GoogleAPICallError, FailedPrecondition
Expand All @@ -34,7 +42,7 @@ class GapicConnection(
):
"""A Connection wrapping a gapic AsyncIterator[Request/Response] pair."""

_write_queue: "asyncio.Queue[WorkItem[Request]]"
_write_queue: "asyncio.Queue[WorkItem[Request, None]]"
_response_it: Optional[AsyncIterator[Response]]

def __init__(self):
Expand All @@ -50,8 +58,12 @@ async def write(self, request: Request) -> None:
await self.await_unless_failed(item.response_future)

async def read(self) -> Response:
if self._response_it is None:
self.fail(FailedPrecondition("GapicConnection not initialized."))
raise self.error()
try:
return await self.await_unless_failed(self._response_it.__anext__())
response_it = cast(AsyncIterator[Response], self._response_it)
return await self.await_unless_failed(response_it.__anext__())
except StopAsyncIteration:
self.fail(FailedPrecondition("Server sent unprompted half close."))
except GoogleAPICallError as e:
Expand All @@ -65,7 +77,7 @@ async def __aexit__(self, exc_type, exc_value, traceback) -> None:
pass

async def __anext__(self) -> Request:
item: WorkItem[Request] = await self.await_unless_failed(
item: WorkItem[Request, None] = await self.await_unless_failed(
self._write_queue.get()
)
item.response_future.set_result(None)
Expand Down
Expand Up @@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager


class PartitionCountWatcher(AsyncContextManager):
class PartitionCountWatcher(AsyncContextManager, metaclass=ABCMeta):
@abstractmethod
async def get_partition_count(self) -> int:
raise NotImplementedError()
Expand Up @@ -32,7 +32,7 @@ class PartitionCountWatcherImpl(PartitionCountWatcher, PermanentFailable):
_any_success: bool
_thread: ThreadPoolExecutor
_queue: asyncio.Queue
_poll_partition_loop: asyncio.Future
_partition_loop_poller: asyncio.Future

def __init__(
self, admin: AdminClientInterface, topic_path: TopicPath, duration: float
Expand All @@ -46,13 +46,13 @@ def __init__(
async def __aenter__(self):
self._thread = ThreadPoolExecutor(max_workers=1)
self._queue = asyncio.Queue(maxsize=1)
self._poll_partition_loop = asyncio.ensure_future(
self._partition_loop_poller = asyncio.ensure_future(
self.run_poller(self._poll_partition_loop)
)

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._poll_partition_loop.cancel()
await wait_ignore_cancelled(self._poll_partition_loop)
self._partition_loop_poller.cancel()
await wait_ignore_cancelled(self._partition_loop_poller)
self._thread.shutdown(wait=False)

def _get_partition_count_sync(self) -> int:
Expand Down

0 comments on commit df58fdf

Please sign in to comment.