Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Enable pytype on Pub/Sub Lite repo and fix all errors #214

Merged
merged 3 commits into from Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/admin_client.py
Expand Up @@ -18,7 +18,7 @@
from google.api_core.client_options import ClientOptions
from google.api_core.operation import Operation
from google.auth.credentials import Credentials
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error

from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.internal.constructable_from_service_account import (
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/admin_client_interface.py
Expand Up @@ -27,7 +27,7 @@
)
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error


class AdminClientInterface(ABC):
Expand Down
Expand Up @@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager


class AckSetTracker(AsyncContextManager):
class AckSetTracker(AsyncContextManager, metaclass=ABCMeta):
"""
An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
is aggregated.
Expand Down
Expand Up @@ -88,6 +88,9 @@ def _make_dynamic_assigner(
credentials: Optional[Credentials],
base_metadata: Optional[Mapping[str, str]],
) -> Assigner:
if base_metadata is None:
base_metadata = {}

def assignment_connection_factory(
requests: AsyncIterator[PartitionAssignmentRequest],
):
Expand Down
Expand Up @@ -51,7 +51,7 @@ def str(self) -> str:
return json.dumps({"generation": self.generation, "offset": self.offset})

@staticmethod
def parse(payload: str) -> "_AckId":
def parse(payload: str) -> "_AckId": # pytype: disable=invalid-annotation
loaded = json.loads(payload)
return _AckId(
generation=int(loaded["generation"]), offset=int(loaded["offset"]),
Expand Down
Expand Up @@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager, Mapping, ContextManager
from concurrent import futures


class AsyncSinglePublisher(AsyncContextManager):
class AsyncSinglePublisher(AsyncContextManager, metaclass=ABCMeta):
"""
An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an
async context. Any publish failures are permanent.
Expand All @@ -43,9 +43,10 @@ async def publish(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()


class SinglePublisher(ContextManager):
class SinglePublisher(ContextManager, metaclass=ABCMeta):
"""
A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent.

Expand All @@ -70,3 +71,4 @@ def publish(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager, Callable, Set, Optional

from google.cloud.pubsub_v1.subscriber.message import Message
Expand All @@ -24,7 +24,7 @@
)


class AsyncSingleSubscriber(AsyncContextManager):
class AsyncSingleSubscriber(AsyncContextManager, metaclass=ABCMeta):
"""
A Cloud Pub/Sub asynchronous subscriber.

Expand Down
Expand Up @@ -15,7 +15,7 @@
import datetime

from google.api_core.exceptions import InvalidArgument
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.timestamp_pb2 import Timestamp # pytype: disable=pyi-error
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub import MessageTransformer
Expand Down
Expand Up @@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from concurrent.futures import Future
from typing import ContextManager, Mapping, Union, AsyncContextManager

from google.cloud.pubsublite.types import TopicPath


class AsyncPublisherClientInterface(AsyncContextManager):
class AsyncPublisherClientInterface(AsyncContextManager, metaclass=ABCMeta):
"""
An AsyncPublisherClientInterface publishes messages similar to Google Pub/Sub, but must be used in an
async context. Any publish failures are unlikely to succeed if retried.
Expand Down Expand Up @@ -50,9 +50,10 @@ async def publish(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()
dpcollins-google marked this conversation as resolved.
Show resolved Hide resolved


class PublisherClientInterface(ContextManager):
class PublisherClientInterface(ContextManager, metaclass=ABCMeta):
"""
A PublisherClientInterface publishes messages similar to Google Pub/Sub.
Any publish failures are unlikely to succeed if retried.
Expand Down Expand Up @@ -84,3 +85,4 @@ def publish(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import (
ContextManager,
Union,
Expand All @@ -33,7 +33,7 @@
)


class AsyncSubscriberClientInterface(AsyncContextManager):
class AsyncSubscriberClientInterface(AsyncContextManager, metaclass=ABCMeta):
"""
An AsyncSubscriberClientInterface reads messages similar to Google Pub/Sub, but must be used in an
async context.
Expand Down Expand Up @@ -64,12 +64,13 @@ async def subscribe(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()


MessageCallback = Callable[[Message], None]


class SubscriberClientInterface(ContextManager):
class SubscriberClientInterface(ContextManager, metaclass=ABCMeta):
"""
A SubscriberClientInterface reads messages similar to Google Pub/Sub.
Any subscribe failures are unlikely to succeed if retried.
Expand Down Expand Up @@ -103,3 +104,4 @@ def subscribe(
Raises:
GoogleApiCallError: On a permanent failure.
"""
raise NotImplementedError()
15 changes: 8 additions & 7 deletions google/cloud/pubsublite/internal/wire/admin_client_impl.py
Expand Up @@ -16,7 +16,7 @@

from google.api_core.exceptions import InvalidArgument
from google.api_core.operation import Operation
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error

from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.types import (
Expand All @@ -37,6 +37,7 @@
Reservation,
TimeTarget,
SeekSubscriptionRequest,
CreateSubscriptionRequest,
)


Expand Down Expand Up @@ -88,12 +89,12 @@ def create_subscription(
) -> Subscription:
path = SubscriptionPath.parse(subscription.name)
return self._underlying.create_subscription(
request={
"parent": str(path.to_location_path()),
"subscription": subscription,
"subscription_id": path.name,
"skip_backlog": (starting_offset == BacklogLocation.END),
}
request=CreateSubscriptionRequest(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is potentially a breaking behavior change due to some known issues in proto-plus if you have any users that have passed in a subscription as a dictionary

googleapis/proto-plus-python#161 comes to mind, but I think there was another one specifically about mixing dictionaries and protos, but I'm having trouble finding it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. I'm okay with this- the type hints tell the user the exact type they need to pass. I don't think this library has wide enough usage that its expected to be bug-compatable with past versions: this is only a breaking change if proto-plus-python has a bug preventing usage of a dict where it should be usable, and the user is ignoring the type hint provided on the method.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked closely enough to know whether it's a breaking change or not, but if it is, we should update the issue title to not use chore accordingly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

parent=str(path.to_location_path()),
subscription=subscription,
subscription_id=path.name,
skip_backlog=(starting_offset == BacklogLocation.END),
)
)

def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
Expand Down
6 changes: 3 additions & 3 deletions google/cloud/pubsublite/internal/wire/assigner.py
Expand Up @@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager, Set

from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite.types.partition import Partition


class Assigner(AsyncContextManager):
class Assigner(AsyncContextManager, metaclass=ABCMeta):
"""
An assigner will deliver a continuous stream of assignments when called into. Perform all necessary work with the
assignment before attempting to get the next one.
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/internal/wire/assigner_impl.py
Expand Up @@ -28,7 +28,7 @@
ConnectionReinitializer,
)
from google.cloud.pubsublite.internal.wire.connection import Connection
from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite.types.partition import Partition
from google.cloud.pubsublite_v1.types import (
PartitionAssignmentRequest,
PartitionAssignment,
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/pubsublite/internal/wire/committer.py
Expand Up @@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager

from google.cloud.pubsublite_v1 import Cursor


class Committer(AsyncContextManager):
class Committer(AsyncContextManager, metaclass=ABCMeta):
"""
A Committer is able to commit subscribers' completed offsets.
"""
Expand Down
9 changes: 6 additions & 3 deletions google/cloud/pubsublite/internal/wire/connection.py
Expand Up @@ -13,13 +13,15 @@
# limitations under the License.

from typing import Generic, TypeVar, AsyncContextManager
from abc import abstractmethod
from abc import abstractmethod, ABCMeta

Request = TypeVar("Request")
Response = TypeVar("Response")


class Connection(Generic[Request, Response], AsyncContextManager["Connection"]):
class Connection(
AsyncContextManager["Connection"], Generic[Request, Response], metaclass=ABCMeta
):
"""
A connection to an underlying stream. Only one call to 'read' may be outstanding at a time.
"""
Expand All @@ -45,8 +47,9 @@ async def read(self) -> Response:
raise NotImplementedError()


class ConnectionFactory(Generic[Request, Response]):
class ConnectionFactory(Generic[Request, Response], metaclass=ABCMeta):
"""A factory for producing Connections."""

@abstractmethod
async def new(self) -> Connection[Request, Response]:
raise NotImplementedError()
Expand Up @@ -16,7 +16,7 @@
import random

from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite.types.partition import Partition
from google.cloud.pubsublite_v1.types import PubSubMessage


Expand Down
Expand Up @@ -16,7 +16,7 @@
from typing import Set

from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite.types.partition import Partition


class FixedSetAssigner(Assigner):
Expand Down
20 changes: 16 additions & 4 deletions google/cloud/pubsublite/internal/wire/gapic_connection.py
Expand Up @@ -12,7 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import AsyncIterator, TypeVar, Optional, Callable, AsyncIterable, Awaitable
from typing import (
cast,
AsyncIterator,
TypeVar,
Optional,
Callable,
AsyncIterable,
Awaitable,
)
import asyncio

from google.api_core.exceptions import GoogleAPICallError, FailedPrecondition
Expand All @@ -34,7 +42,7 @@ class GapicConnection(
):
"""A Connection wrapping a gapic AsyncIterator[Request/Response] pair."""

_write_queue: "asyncio.Queue[WorkItem[Request]]"
_write_queue: "asyncio.Queue[WorkItem[Request, None]]"
_response_it: Optional[AsyncIterator[Response]]

def __init__(self):
Expand All @@ -50,8 +58,12 @@ async def write(self, request: Request) -> None:
await self.await_unless_failed(item.response_future)

async def read(self) -> Response:
if self._response_it is None:
self.fail(FailedPrecondition("GapicConnection not initialized."))
raise self.error()
try:
return await self.await_unless_failed(self._response_it.__anext__())
response_it = cast(AsyncIterator[Response], self._response_it)
return await self.await_unless_failed(response_it.__anext__())
except StopAsyncIteration:
self.fail(FailedPrecondition("Server sent unprompted half close."))
except GoogleAPICallError as e:
Expand All @@ -65,7 +77,7 @@ async def __aexit__(self, exc_type, exc_value, traceback) -> None:
pass

async def __anext__(self) -> Request:
item: WorkItem[Request] = await self.await_unless_failed(
item: WorkItem[Request, None] = await self.await_unless_failed(
self._write_queue.get()
)
item.response_future.set_result(None)
Expand Down
Expand Up @@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from abc import abstractmethod, ABCMeta
from typing import AsyncContextManager


class PartitionCountWatcher(AsyncContextManager):
class PartitionCountWatcher(AsyncContextManager, metaclass=ABCMeta):
@abstractmethod
async def get_partition_count(self) -> int:
raise NotImplementedError()
Expand Up @@ -32,7 +32,7 @@ class PartitionCountWatcherImpl(PartitionCountWatcher, PermanentFailable):
_any_success: bool
_thread: ThreadPoolExecutor
_queue: asyncio.Queue
_poll_partition_loop: asyncio.Future
_partition_loop_poller: asyncio.Future

def __init__(
self, admin: AdminClientInterface, topic_path: TopicPath, duration: float
Expand All @@ -46,13 +46,13 @@ def __init__(
async def __aenter__(self):
self._thread = ThreadPoolExecutor(max_workers=1)
self._queue = asyncio.Queue(maxsize=1)
self._poll_partition_loop = asyncio.ensure_future(
self._partition_loop_poller = asyncio.ensure_future(
self.run_poller(self._poll_partition_loop)
)

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._poll_partition_loop.cancel()
await wait_ignore_cancelled(self._poll_partition_loop)
self._partition_loop_poller.cancel()
await wait_ignore_cancelled(self._partition_loop_poller)
self._thread.shutdown(wait=False)

def _get_partition_count_sync(self) -> int:
Expand Down