Skip to content

Commit

Permalink
chore: remove Python 2 compatibility code (#302)
Browse files Browse the repository at this point in the history
Closes #181.

**PR checklist:**
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsub/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
  • Loading branch information
plamut committed Mar 17, 2021
1 parent dfc7e47 commit 5c036e4
Show file tree
Hide file tree
Showing 14 changed files with 27 additions and 50 deletions.
5 changes: 1 addition & 4 deletions google/cloud/pubsub_v1/publisher/_batch/base.py
Expand Up @@ -17,11 +17,8 @@
import abc
import enum

import six


@six.add_metaclass(abc.ABCMeta)
class Batch(object):
class Batch(metaclass=abc.ABCMeta):
"""The base batching class for Pub/Sub publishing.
Although the :class:`~.pubsub_v1.publisher.batch.thread.Batch` class, based
Expand Down
5 changes: 1 addition & 4 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Expand Up @@ -18,8 +18,6 @@
import threading
import time

import six

import google.api_core.exceptions
from google.api_core import gapic_v1
from google.cloud.pubsub_v1.publisher import exceptions
Expand Down Expand Up @@ -287,8 +285,7 @@ def _commit(self):
# IDs. We are trusting that there is a 1:1 mapping, and raise
# an exception if not.
self._status = base.BatchStatus.SUCCESS
zip_iter = six.moves.zip(response.message_ids, self._futures)
for message_id, future in zip_iter:
for message_id, future in zip(response.message_ids, self._futures):
future.set_result(message_id)
else:
# Sanity check: If the number of message IDs is not equal to
Expand Down
5 changes: 1 addition & 4 deletions google/cloud/pubsub_v1/publisher/_sequencer/base.py
Expand Up @@ -16,11 +16,8 @@

import abc

import six


@six.add_metaclass(abc.ABCMeta)
class Sequencer(object):
class Sequencer(metaclass=abc.ABCMeta):
"""The base class for sequencers for Pub/Sub publishing. A sequencer
sequences messages to be published.
"""
Expand Down
8 changes: 3 additions & 5 deletions google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -21,8 +21,6 @@
import threading
import time

import six

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials
from google.oauth2 import service_account
Expand Down Expand Up @@ -295,7 +293,7 @@ def publish(
"""
# Sanity check: Is the data being sent as a bytestring?
# If it is literally anything else, complain loudly about it.
if not isinstance(data, six.binary_type):
if not isinstance(data, bytes):
raise TypeError(
"Data being published to Pub/Sub must be sent as a bytestring."
)
Expand All @@ -308,9 +306,9 @@ def publish(

# Coerce all attributes to text strings.
for k, v in copy.copy(attrs).items():
if isinstance(v, six.text_type):
if isinstance(v, str):
continue
if isinstance(v, six.binary_type):
if isinstance(v, bytes):
attrs[k] = v.decode("utf-8")
continue
raise TypeError(
Expand Down
Expand Up @@ -13,11 +13,10 @@
# limitations under the License.

import logging
import queue
import time
import uuid

from six.moves import queue


__all__ = ("QueueCallbackWorker", "STOP")

Expand Down
4 changes: 1 addition & 3 deletions google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Expand Up @@ -21,8 +21,6 @@
import threading
import time

import six

from google.cloud.pubsub_v1.subscriber._protocol import requests


Expand Down Expand Up @@ -144,7 +142,7 @@ def maintain_leases(self):
cutoff = time.time() - self._manager.flow_control.max_lease_duration
to_drop = [
requests.DropRequest(ack_id, item.size, item.ordering_key)
for ack_id, item in six.iteritems(leased_messages)
for ack_id, item in leased_messages.items()
if item.sent_time < cutoff
]

Expand Down
Expand Up @@ -21,7 +21,6 @@
import uuid

import grpc
import six

from google.api_core import bidi
from google.api_core import exceptions
Expand Down Expand Up @@ -406,7 +405,7 @@ def _send_unary_request(self, request):
deadline = request.modify_deadline_seconds[n]
deadline_to_ack_ids[deadline].append(ack_id)

for deadline, ack_ids in six.iteritems(deadline_to_ack_ids):
for deadline, ack_ids in deadline_to_ack_ids.items():
self._client.modify_ack_deadline(
subscription=self._subscription,
ack_ids=ack_ids,
Expand Down
17 changes: 5 additions & 12 deletions google/cloud/pubsub_v1/subscriber/scheduler.py
Expand Up @@ -20,14 +20,10 @@

import abc
import concurrent.futures
import sys
import queue

import six
from six.moves import queue


@six.add_metaclass(abc.ABCMeta)
class Scheduler(object):
class Scheduler(metaclass=abc.ABCMeta):
"""Abstract base class for schedulers.
Schedulers are used to schedule callbacks asynchronously.
Expand Down Expand Up @@ -65,12 +61,9 @@ def shutdown(self):


def _make_default_thread_pool_executor():
# Python 2.7 and 3.6+ have the thread_name_prefix argument, which is useful
# for debugging.
executor_kwargs = {}
if sys.version_info[:2] == (2, 7) or sys.version_info >= (3, 6):
executor_kwargs["thread_name_prefix"] = "ThreadPoolExecutor-ThreadScheduler"
return concurrent.futures.ThreadPoolExecutor(max_workers=10, **executor_kwargs)
return concurrent.futures.ThreadPoolExecutor(
max_workers=10, thread_name_prefix="ThreadPoolExecutor-ThreadScheduler"
)


class ThreadScheduler(Scheduler):
Expand Down
15 changes: 7 additions & 8 deletions tests/system.py
Expand Up @@ -24,7 +24,6 @@

import mock
import pytest
import six

import google.auth
from google.api_core import exceptions as core_exceptions
Expand Down Expand Up @@ -86,12 +85,12 @@ def test_publish_messages(publisher, topic_path, cleanup):
publisher.publish(
topic_path, b"The hail in Wales falls mainly on the snails.", num=str(i)
)
for i in six.moves.range(500)
for i in range(500)
]

for future in futures:
result = future.result()
assert isinstance(result, six.string_types)
assert isinstance(result, str)


def test_publish_large_messages(publisher, topic_path, cleanup):
Expand Down Expand Up @@ -120,7 +119,7 @@ def test_publish_large_messages(publisher, topic_path, cleanup):
# be no "InvalidArgument: request_size is too large" error.
for future in futures:
result = future.result(timeout=10)
assert isinstance(result, six.string_types) # the message ID
assert isinstance(result, str) # the message ID


def test_subscribe_to_messages(
Expand All @@ -142,7 +141,7 @@ def test_subscribe_to_messages(
# Publish some messages.
futures = [
publisher.publish(topic_path, b"Wooooo! The claaaaaw!", num=str(index))
for index in six.moves.range(50)
for index in range(50)
]

# Make sure the publish completes.
Expand All @@ -154,7 +153,7 @@ def test_subscribe_to_messages(
# that we got everything at least once.
callback = AckCallback()
future = subscriber.subscribe(subscription_path, callback)
for second in six.moves.range(10):
for second in range(10):
time.sleep(1)

# The callback should have fired at least fifty times, but it
Expand Down Expand Up @@ -187,7 +186,7 @@ def test_subscribe_to_messages_async_callbacks(
# Publish some messages.
futures = [
publisher.publish(topic_path, b"Wooooo! The claaaaaw!", num=str(index))
for index in six.moves.range(2)
for index in range(2)
]

# Make sure the publish completes.
Expand All @@ -200,7 +199,7 @@ def test_subscribe_to_messages_async_callbacks(

# Actually open the subscription and hold it open for a few seconds.
future = subscriber.subscribe(subscription_path, callback)
for second in six.moves.range(5):
for second in range(5):
time.sleep(4)

# The callback should have fired at least two times, but it may
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import collections
import queue
import threading

from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
Expand All @@ -22,7 +23,6 @@
from google.pubsub_v1 import types as gapic_types

import mock
from six.moves import queue
import pytest


Expand Down
2 changes: 1 addition & 1 deletion tests/unit/pubsub_v1/subscriber/test_helper_threads.py
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import mock
from six.moves import queue
import queue

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/pubsub_v1/subscriber/test_message.py
Expand Up @@ -13,16 +13,16 @@
# limitations under the License.

import datetime
import queue
import time

import mock
import pytz
from six.moves import queue
from google.protobuf import timestamp_pb2

from google.api_core import datetime_helpers
from google.cloud.pubsub_v1.subscriber import message
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.protobuf import timestamp_pb2
from google.pubsub_v1 import types as gapic_types


Expand Down
2 changes: 1 addition & 1 deletion tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from six.moves import queue
import queue

from google.cloud.pubsub_v1.subscriber import message
from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/pubsub_v1/subscriber/test_scheduler.py
Expand Up @@ -13,10 +13,10 @@
# limitations under the License.

import concurrent.futures
import queue
import threading

import mock
from six.moves import queue

from google.cloud.pubsub_v1.subscriber import scheduler

Expand Down

0 comments on commit 5c036e4

Please sign in to comment.