Skip to content

Commit

Permalink
feat: remove Watch from async interface
Browse files Browse the repository at this point in the history
  • Loading branch information
rafilong committed Jul 28, 2020
1 parent 2fcd15c commit d11d572
Show file tree
Hide file tree
Showing 7 changed files with 0 additions and 313 deletions.
35 changes: 0 additions & 35 deletions google/cloud/firestore_v1/async_collection.py
Expand Up @@ -22,8 +22,6 @@
_item_to_document_ref,
)
from google.cloud.firestore_v1 import async_query
from google.cloud.firestore_v1.watch import Watch
from google.cloud.firestore_v1 import async_document


class AsyncCollectionReference(BaseCollectionReference):
Expand Down Expand Up @@ -162,36 +160,3 @@ async def stream(self, transaction=None):
query = async_query.AsyncQuery(self)
async for d in query.stream(transaction=transaction):
yield d

def on_snapshot(self, callback):
"""Monitor the documents in this collection.
This starts a watch on this collection using a background thread. The
provided callback is run on the snapshot of the documents.
Args:
callback (Callable[[:class:`~google.cloud.firestore.collection.CollectionSnapshot`], NoneType]):
a callback to run when a change occurs.
Example:
from google.cloud import firestore_v1
db = firestore_v1.Client()
collection_ref = db.collection(u'users')
def on_snapshot(collection_snapshot, changes, read_time):
for doc in collection_snapshot.documents:
print(u'{} => {}'.format(doc.id, doc.to_dict()))
# Watch this collection
collection_watch = collection_ref.on_snapshot(on_snapshot)
# Terminate this watch
collection_watch.unsubscribe()
"""
return Watch.for_query(
self._query(),
callback,
async_document.DocumentSnapshot,
async_document.AsyncDocumentReference,
)
37 changes: 0 additions & 37 deletions google/cloud/firestore_v1/async_document.py
Expand Up @@ -23,7 +23,6 @@
from google.api_core import exceptions
from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.types import common
from google.cloud.firestore_v1.watch import Watch


class AsyncDocumentReference(BaseDocumentReference):
Expand Down Expand Up @@ -385,39 +384,3 @@ async def collections(self, page_size=None):
# iterator.document = self
# iterator.item_to_value = _item_to_collection_ref
# return iterator

def on_snapshot(self, callback):
"""Watch this document.
This starts a watch on this document using a background thread. The
provided callback is run on the snapshot.
Args:
callback(Callable[[:class:`~google.cloud.firestore.document.DocumentSnapshot`], NoneType]):
a callback to run when a change occurs
Example:
.. code-block:: python
from google.cloud import firestore_v1
db = firestore_v1.Client()
collection_ref = db.collection(u'users')
def on_snapshot(document_snapshot, changes, read_time):
doc = document_snapshot
print(u'{} => {}'.format(doc.id, doc.to_dict()))
doc_ref = db.collection(u'users').document(
u'alovelace' + unique_resource_id())
# Watch this document
doc_watch = doc_ref.on_snapshot(on_snapshot)
# Terminate this watch
doc_watch.unsubscribe()
"""
return Watch.for_document(
self, callback, DocumentSnapshot, AsyncDocumentReference
)
38 changes: 0 additions & 38 deletions google/cloud/firestore_v1/async_query.py
Expand Up @@ -27,8 +27,6 @@
)

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1 import async_document
from google.cloud.firestore_v1.watch import Watch


class AsyncQuery(BaseQuery):
Expand Down Expand Up @@ -169,39 +167,3 @@ async def stream(self, transaction=None):
)
if snapshot is not None:
yield snapshot

def on_snapshot(self, callback):
"""Monitor the documents in this collection that match this query.
This starts a watch on this query using a background thread. The
provided callback is run on the snapshot of the documents.
Args:
callback(Callable[[:class:`~google.cloud.firestore.query.QuerySnapshot`], NoneType]):
a callback to run when a change occurs.
Example:
.. code-block:: python
from google.cloud import firestore_v1
db = firestore_v1.Client()
query_ref = db.collection(u'users').where("user", "==", u'Ada')
def on_snapshot(docs, changes, read_time):
for doc in docs:
print(u'{} => {}'.format(doc.id, doc.to_dict()))
# Watch this query
query_watch = query_ref.on_snapshot(on_snapshot)
# Terminate this watch
query_watch.unsubscribe()
"""
return Watch.for_query(
self,
callback,
async_document.DocumentSnapshot,
async_document.AsyncDocumentReference,
)
184 changes: 0 additions & 184 deletions tests/system/test_system_async.py
Expand Up @@ -21,7 +21,6 @@
import re

from google.oauth2 import service_account
import pytest

from google.api_core.exceptions import AlreadyExists
from google.api_core.exceptions import FailedPrecondition
Expand All @@ -32,8 +31,6 @@
from google.cloud import firestore_v1 as firestore
from test_utils.system import unique_resource_id

from time import sleep

FIRESTORE_CREDS = os.environ.get("FIRESTORE_APPLICATION_CREDENTIALS")
FIRESTORE_PROJECT = os.environ.get("GCLOUD_PROJECT")
RANDOM_ID_REGEX = re.compile("^[a-zA-Z0-9]{20}$")
Expand Down Expand Up @@ -1001,184 +998,3 @@ async def test_batch(client, cleanup):
assert snapshot2.update_time == write_result2.update_time

assert not (await document3.get()).exists


async def test_watch_document(client, cleanup):
db = client
collection_ref = db.collection(u"wd-users" + UNIQUE_RESOURCE_ID)
doc_ref = collection_ref.document(u"alovelace")

# Initial setting
await doc_ref.set({u"first": u"Jane", u"last": u"Doe", u"born": 1900})
cleanup(doc_ref.delete)

await asyncio.sleep(1)

# Setup listener
def on_snapshot(docs, changes, read_time):
on_snapshot.called_count += 1

on_snapshot.called_count = 0

doc_ref.on_snapshot(on_snapshot)

# Alter document
await doc_ref.set({u"first": u"Ada", u"last": u"Lovelace", u"born": 1815})

await asyncio.sleep(1)

for _ in range(10):
if on_snapshot.called_count > 0:
break
await asyncio.sleep(1)

if on_snapshot.called_count not in (1, 2):
raise AssertionError(
"Failed to get one or two document changes: count: "
+ str(on_snapshot.called_count)
)


async def test_watch_collection(client, cleanup):
db = client
collection_ref = db.collection(u"wc-users" + UNIQUE_RESOURCE_ID)
doc_ref = collection_ref.document(u"alovelace")

# Initial setting
await doc_ref.set({u"first": u"Jane", u"last": u"Doe", u"born": 1900})
cleanup(doc_ref.delete)

# Setup listener
def on_snapshot(docs, changes, read_time):
on_snapshot.called_count += 1
for doc in [doc for doc in docs if doc.id == doc_ref.id]:
on_snapshot.born = doc.get("born")

on_snapshot.called_count = 0
on_snapshot.born = 0

collection_ref.on_snapshot(on_snapshot)

# delay here so initial on_snapshot occurs and isn't combined with set
await asyncio.sleep(1)

await doc_ref.set({u"first": u"Ada", u"last": u"Lovelace", u"born": 1815})

for _ in range(10):
if on_snapshot.born == 1815:
break
await asyncio.sleep(1)

if on_snapshot.born != 1815:
raise AssertionError(
"Expected the last document update to update born: " + str(on_snapshot.born)
)


async def test_watch_query(client, cleanup):
db = client
collection_ref = db.collection(u"wq-users" + UNIQUE_RESOURCE_ID)
doc_ref = collection_ref.document(u"alovelace")
query_ref = collection_ref.where("first", "==", u"Ada")

# Initial setting
await doc_ref.set({u"first": u"Jane", u"last": u"Doe", u"born": 1900})
cleanup(doc_ref.delete)

await asyncio.sleep(1)

# Setup listener
async def on_snapshot(docs, changes, read_time):
on_snapshot.called_count += 1

# A snapshot should return the same thing as if a query ran now.
query_ran = await collection_ref.where("first", "==", u"Ada").stream()
assert len(docs) == len([i for i in query_ran])

on_snapshot.called_count = 0

query_ref.on_snapshot(on_snapshot)

# Alter document
await doc_ref.set({u"first": u"Ada", u"last": u"Lovelace", u"born": 1815})

for _ in range(10):
if on_snapshot.called_count == 1:
return
await asyncio.sleep(1)

if on_snapshot.called_count != 1:
raise AssertionError(
"Failed to get exactly one document change: count: "
+ str(on_snapshot.called_count)
)


async def test_watch_query_order(client, cleanup):
db = client
collection_ref = db.collection(u"users")
doc_ref1 = collection_ref.document(u"alovelace" + UNIQUE_RESOURCE_ID)
doc_ref2 = collection_ref.document(u"asecondlovelace" + UNIQUE_RESOURCE_ID)
doc_ref3 = collection_ref.document(u"athirdlovelace" + UNIQUE_RESOURCE_ID)
doc_ref4 = collection_ref.document(u"afourthlovelace" + UNIQUE_RESOURCE_ID)
doc_ref5 = collection_ref.document(u"afifthlovelace" + UNIQUE_RESOURCE_ID)

query_ref = collection_ref.where("first", "==", u"Ada").order_by("last")

# Setup listener
async def on_snapshot(docs, changes, read_time):
try:
if len(docs) != 5:
return
# A snapshot should return the same thing as if a query ran now.
query_ran = query_ref.stream()
query_ran_results = [i async for i in query_ran]
assert len(docs) == len(query_ran_results)

# compare the order things are returned
for snapshot, query in zip(docs, query_ran_results):
assert snapshot.get("last") == query.get(
"last"
), "expect the sort order to match, last"
assert snapshot.get("born") == query.get(
"born"
), "expect the sort order to match, born"
on_snapshot.called_count += 1
on_snapshot.last_doc_count = len(docs)
except Exception as e:
on_snapshot.failed = e

on_snapshot.called_count = 0
on_snapshot.last_doc_count = 0
on_snapshot.failed = None
query_ref.on_snapshot(on_snapshot)

await asyncio.sleep(1)

await doc_ref1.set({u"first": u"Ada", u"last": u"Lovelace", u"born": 1815})
cleanup(doc_ref1.delete)

await doc_ref2.set({u"first": u"Ada", u"last": u"SecondLovelace", u"born": 1815})
cleanup(doc_ref2.delete)

await doc_ref3.set({u"first": u"Ada", u"last": u"ThirdLovelace", u"born": 1815})
cleanup(doc_ref3.delete)

await doc_ref4.set({u"first": u"Ada", u"last": u"FourthLovelace", u"born": 1815})
cleanup(doc_ref4.delete)

await doc_ref5.set({u"first": u"Ada", u"last": u"lovelace", u"born": 1815})
cleanup(doc_ref5.delete)

for _ in range(10):
if on_snapshot.last_doc_count == 5:
break
await asyncio.sleep(1)

if on_snapshot.failed:
raise on_snapshot.failed

if on_snapshot.last_doc_count != 5:
raise AssertionError(
"5 docs expected in snapshot method " + str(on_snapshot.last_doc_count)
)
6 changes: 0 additions & 6 deletions tests/unit/v1/test_async_collection.py
Expand Up @@ -322,12 +322,6 @@ async def test_stream_with_transaction(self, query_class):
query_instance = query_class.return_value
query_instance.stream.assert_called_once_with(transaction=transaction)

@mock.patch("google.cloud.firestore_v1.async_collection.Watch", autospec=True)
def test_on_snapshot(self, watch):
collection = self._make_one("collection")
collection.on_snapshot(None)
watch.for_query.assert_called_once()


def _make_credentials():
import google.auth.credentials
Expand Down
7 changes: 0 additions & 7 deletions tests/unit/v1/test_async_document.py
Expand Up @@ -477,13 +477,6 @@ async def test_collections_wo_page_size(self):
async def test_collections_w_page_size(self):
await self._collections_helper(page_size=10)

@mock.patch("google.cloud.firestore_v1.async_document.Watch", autospec=True)
def test_on_snapshot(self, watch):
client = mock.Mock(_database_string="sprinklez", spec=["_database_string"])
document = self._make_one("yellow", "mellow", client=client)
document.on_snapshot(None)
watch.for_document.assert_called_once()


def _make_credentials():
import google.auth.credentials
Expand Down
6 changes: 0 additions & 6 deletions tests/unit/v1/test_async_query.py
Expand Up @@ -360,12 +360,6 @@ async def test_stream_w_collection_group(self):
metadata=client._rpc_metadata,
)

@mock.patch("google.cloud.firestore_v1.async_query.Watch", autospec=True)
def test_on_snapshot(self, watch):
query = self._make_one(mock.sentinel.parent)
query.on_snapshot(None)
watch.for_query.assert_called_once()


def _make_client(project="project-project"):
from google.cloud.firestore_v1.async_client import AsyncClient
Expand Down

0 comments on commit d11d572

Please sign in to comment.