diff --git a/google/cloud/pubsublite/internal/wire/assigner_impl.py b/google/cloud/pubsublite/internal/wire/assigner_impl.py index 13db5b89..b4ce786e 100644 --- a/google/cloud/pubsublite/internal/wire/assigner_impl.py +++ b/google/cloud/pubsublite/internal/wire/assigner_impl.py @@ -15,7 +15,7 @@ import asyncio from typing import Optional, Set -from absl import logging +import logging from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors from google.cloud.pubsublite.internal.wire.assigner import Assigner @@ -36,6 +36,8 @@ PartitionAssignmentAck, ) +_LOGGER = logging.getLogger(__name__) + # Maximum bytes per batch at 3.5 MiB to avoid GRPC limit of 4 MiB _MAX_BYTES = int(3.5 * 1024 * 1024) @@ -120,7 +122,7 @@ async def get_assignment(self) -> Set[Partition]: self._outstanding_assignment = False except GoogleAPICallError as e: # If there is a failure to ack, keep going. The stream likely restarted. - logging.debug( + _LOGGER.debug( f"Assignment ack attempt failed due to stream failure: {e}" ) return await self._connection.await_unless_failed(self._new_assignment.get()) diff --git a/google/cloud/pubsublite/internal/wire/committer_impl.py b/google/cloud/pubsublite/internal/wire/committer_impl.py index 115845fe..df7dab18 100644 --- a/google/cloud/pubsublite/internal/wire/committer_impl.py +++ b/google/cloud/pubsublite/internal/wire/committer_impl.py @@ -15,7 +15,7 @@ import asyncio from typing import Optional, List, Iterable -from absl import logging +import logging from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors from google.cloud.pubsublite.internal.wire.committer import Committer @@ -41,6 +41,9 @@ from google.cloud.pubsublite.internal.wire.work_item import WorkItem +_LOGGER = logging.getLogger(__name__) + + class CommitterImpl( Committer, ConnectionReinitializer[ @@ -149,7 +152,7 @@ async def _flush(self): try: await self._connection.write(req) except GoogleAPICallError as e: - logging.debug(f"Failed commit on stream: {e}") + _LOGGER.debug(f"Failed commit on stream: {e}") self._fail_if_retrying_failed() async def commit(self, cursor: Cursor) -> None: diff --git a/google/cloud/pubsublite/internal/wire/pubsub_context.py b/google/cloud/pubsublite/internal/wire/pubsub_context.py index 032f5fc7..5cbad6f6 100644 --- a/google/cloud/pubsublite/internal/wire/pubsub_context.py +++ b/google/cloud/pubsublite/internal/wire/pubsub_context.py @@ -15,11 +15,14 @@ from base64 import b64encode from typing import Mapping, Optional, NamedTuple -from absl import logging +import logging import pkg_resources from google.protobuf import struct_pb2 +_LOGGER = logging.getLogger(__name__) + + class _Semver(NamedTuple): major: int minor: int @@ -29,7 +32,7 @@ def _version() -> _Semver: version = pkg_resources.get_distribution("google-cloud-pubsublite").version splits = version.split(".") if len(splits) != 3: - logging.info(f"Failed to extract semver from {version}.") + _LOGGER.info(f"Failed to extract semver from {version}.") return _Semver(0, 0) return _Semver(int(splits[0]), int(splits[1])) diff --git a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py index 411d9b13..43b04127 100644 --- a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py +++ b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py @@ -15,7 +15,7 @@ import asyncio from typing import Optional, List, Iterable -from absl import logging +import logging from google.cloud.pubsub_v1.types import BatchSettings from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors @@ -43,6 +43,8 @@ ) from google.cloud.pubsublite.internal.wire.work_item import WorkItem +_LOGGER = logging.getLogger(__name__) + # Maximum bytes per batch at 3.5 MiB to avoid GRPC limit of 4 MiB _MAX_BYTES = int(3.5 * 1024 * 1024) @@ -156,7 +158,7 @@ async def _flush(self): try: await self._connection.write(aggregate) except GoogleAPICallError as e: - logging.debug(f"Failed publish on stream: {e}") + _LOGGER.debug(f"Failed publish on stream: {e}") self._fail_if_retrying_failed() async def publish(self, message: PubSubMessage) -> MessageMetadata: diff --git a/setup.py b/setup.py index 53bd470e..b638ab6b 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,6 @@ dependencies = [ "google-api-core >= 1.22.0", - "absl-py >= 0.9.0", "proto-plus >= 0.4.0", "google-cloud-pubsub >= 2.1.0", "grpcio", diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index 5886f65d..8bf36590 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -6,9 +6,8 @@ # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 google-api-core==1.22.0 -absl-py==0.9.0 proto-plus==0.4.0 google-cloud-pubsub==2.1.0 grpcio==100000.0.0 setuptools==100000.0.0 -overrides==100000.0.0 \ No newline at end of file +overrides==100000.0.0