Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
palmere-google committed Dec 14, 2020
1 parent 1e8e5a5 commit 9ef995d
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Expand Up @@ -19,7 +19,6 @@ develop-eggs
.installed.cfg
lib
lib64
venv
__pycache__

# Installer logs
Expand Down Expand Up @@ -51,6 +50,7 @@ docs.metadata

# Virtual environment
env/
venv/
coverage.xml
sponge_log.xml

Expand Down
16 changes: 15 additions & 1 deletion google/cloud/pubsublite/testing/test_utils.py
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.

import asyncio
from typing import List, Union, Any, TypeVar, Generic, Optional
import threading
from typing import List, Union, Any, TypeVar, Generic, Optional, Callable

from asynctest import CoroutineMock

Expand Down Expand Up @@ -62,3 +63,16 @@ def wire_queues(mock: CoroutineMock) -> QueuePair:

class Box(Generic[T]):
val: Optional[T]


def run_on_thread(func: Callable[[], T]) -> T:
box = Box()

def set_box():
box.val = func()

# Initialize watcher on another thread with a different event loop.
thread = threading.Thread(target=set_box)
thread.start()
thread.join()
return box.val
Expand Up @@ -22,7 +22,7 @@
PartitionCountWatcherImpl,
)
from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.testing.test_utils import Box
from google.cloud.pubsublite.testing.test_utils import Box, run_on_thread
from google.cloud.pubsublite.types import Partition, TopicPath, CloudZone, CloudRegion
from google.api_core.exceptions import GoogleAPICallError

Expand All @@ -36,8 +36,7 @@ def mock_publishers():

@pytest.fixture()
def topic():
zone = CloudZone(region=CloudRegion("a"), zone_id="a")
return TopicPath(project_number=1, location=zone, name="c")
return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic")


@pytest.fixture()
Expand All @@ -48,16 +47,7 @@ def mock_admin():

@pytest.fixture()
def watcher(mock_admin, topic):
box = Box()

def set_box():
box.val = PartitionCountWatcherImpl(mock_admin, topic, 0.001)

# Initialize watcher on another thread with a different event loop.
thread = threading.Thread(target=set_box)
thread.start()
thread.join()
return box.val
return run_on_thread(lambda: PartitionCountWatcherImpl(mock_admin, topic, 0.001))


async def test_init(watcher, mock_admin, topic):
Expand Down
Expand Up @@ -23,7 +23,7 @@
)
from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
from google.cloud.pubsublite.testing.test_utils import Box, wire_queues
from google.cloud.pubsublite.testing.test_utils import Box, wire_queues, run_on_thread
from google.cloud.pubsublite.types import Partition
from google.cloud.pubsublite_v1 import PubSubMessage
from google.api_core.exceptions import GoogleAPICallError
Expand All @@ -49,18 +49,11 @@ def mock_watcher():

@pytest.fixture()
def publisher(mock_watcher, mock_publishers, mock_policies):
box = Box()

def set_box():
box.val = PartitionCountWatchingPublisher(
return run_on_thread(
lambda: PartitionCountWatchingPublisher(
mock_watcher, lambda p: mock_publishers[p], lambda c: mock_policies[c]
)

# Initialize publisher on another thread with a different event loop.
thread = threading.Thread(target=set_box)
thread.start()
thread.join()
return box.val
)


async def test_init(mock_watcher, publisher):
Expand Down

0 comments on commit 9ef995d

Please sign in to comment.