From 9ef995dd511d6dec6b469b36ceb9e69029b7b4b7 Mon Sep 17 00:00:00 2001 From: Evan Palmer Date: Mon, 14 Dec 2020 12:16:01 -0500 Subject: [PATCH] addressing comments --- .gitignore | 2 +- google/cloud/pubsublite/testing/test_utils.py | 16 +++++++++++++++- .../wire/partition_count_watcher_impl_test.py | 16 +++------------- .../partition_count_watching_publisher_test.py | 15 ++++----------- 4 files changed, 23 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index 71f99fe7..efad5203 100644 --- a/.gitignore +++ b/.gitignore @@ -19,7 +19,6 @@ develop-eggs .installed.cfg lib lib64 -venv __pycache__ # Installer logs @@ -51,6 +50,7 @@ docs.metadata # Virtual environment env/ +venv/ coverage.xml sponge_log.xml diff --git a/google/cloud/pubsublite/testing/test_utils.py b/google/cloud/pubsublite/testing/test_utils.py index f5077203..7655fcb8 100644 --- a/google/cloud/pubsublite/testing/test_utils.py +++ b/google/cloud/pubsublite/testing/test_utils.py @@ -13,7 +13,8 @@ # limitations under the License. import asyncio -from typing import List, Union, Any, TypeVar, Generic, Optional +import threading +from typing import List, Union, Any, TypeVar, Generic, Optional, Callable from asynctest import CoroutineMock @@ -62,3 +63,16 @@ def wire_queues(mock: CoroutineMock) -> QueuePair: class Box(Generic[T]): val: Optional[T] + + +def run_on_thread(func: Callable[[], T]) -> T: + box = Box() + + def set_box(): + box.val = func() + + # Initialize watcher on another thread with a different event loop. + thread = threading.Thread(target=set_box) + thread.start() + thread.join() + return box.val diff --git a/tests/unit/pubsublite/internal/wire/partition_count_watcher_impl_test.py b/tests/unit/pubsublite/internal/wire/partition_count_watcher_impl_test.py index ee72e848..a01ce1ee 100644 --- a/tests/unit/pubsublite/internal/wire/partition_count_watcher_impl_test.py +++ b/tests/unit/pubsublite/internal/wire/partition_count_watcher_impl_test.py @@ -22,7 +22,7 @@ PartitionCountWatcherImpl, ) from google.cloud.pubsublite.internal.wire.publisher import Publisher -from google.cloud.pubsublite.testing.test_utils import Box +from google.cloud.pubsublite.testing.test_utils import Box, run_on_thread from google.cloud.pubsublite.types import Partition, TopicPath, CloudZone, CloudRegion from google.api_core.exceptions import GoogleAPICallError @@ -36,8 +36,7 @@ def mock_publishers(): @pytest.fixture() def topic(): - zone = CloudZone(region=CloudRegion("a"), zone_id="a") - return TopicPath(project_number=1, location=zone, name="c") + return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic") @pytest.fixture() @@ -48,16 +47,7 @@ def mock_admin(): @pytest.fixture() def watcher(mock_admin, topic): - box = Box() - - def set_box(): - box.val = PartitionCountWatcherImpl(mock_admin, topic, 0.001) - - # Initialize watcher on another thread with a different event loop. - thread = threading.Thread(target=set_box) - thread.start() - thread.join() - return box.val + return run_on_thread(lambda: PartitionCountWatcherImpl(mock_admin, topic, 0.001)) async def test_init(watcher, mock_admin, topic): diff --git a/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py b/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py index 7c840bd1..79f5a07d 100644 --- a/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py +++ b/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py @@ -23,7 +23,7 @@ ) from google.cloud.pubsublite.internal.wire.publisher import Publisher from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy -from google.cloud.pubsublite.testing.test_utils import Box, wire_queues +from google.cloud.pubsublite.testing.test_utils import Box, wire_queues, run_on_thread from google.cloud.pubsublite.types import Partition from google.cloud.pubsublite_v1 import PubSubMessage from google.api_core.exceptions import GoogleAPICallError @@ -49,18 +49,11 @@ def mock_watcher(): @pytest.fixture() def publisher(mock_watcher, mock_publishers, mock_policies): - box = Box() - - def set_box(): - box.val = PartitionCountWatchingPublisher( + return run_on_thread( + lambda: PartitionCountWatchingPublisher( mock_watcher, lambda p: mock_publishers[p], lambda c: mock_policies[c] ) - - # Initialize publisher on another thread with a different event loop. - thread = threading.Thread(target=set_box) - thread.start() - thread.join() - return box.val + ) async def test_init(mock_watcher, publisher):