diff --git a/google/cloud/datastore/query.py b/google/cloud/datastore/query.py
index 4b3daa66..78a153cb 100644
--- a/google/cloud/datastore/query.py
+++ b/google/cloud/datastore/query.py
@@ -498,7 +498,6 @@ def _process_query_results(self, response_pb):
         :raises ValueError: If ``more_results`` is an unexpected value.
         """
         self._skipped_results = response_pb.batch.skipped_results
-
         if response_pb.batch.more_results == _NO_MORE_RESULTS:
             self.next_page_token = None
         else:
@@ -540,6 +539,21 @@ def _next_page(self):
         response_pb = self.client._datastore_api.run_query(
             self._query.project, partition_id, read_options, query=query_pb
         )
+
+        while (
+            response_pb.batch.more_results == _NOT_FINISHED
+            and response_pb.batch.skipped_results < query_pb.offset
+        ):
+            # We haven't finished processing. A likely reason is we haven't
+            # skipped all of the results yet. Don't return any results.
+            # Instead, rerun query, adjusting offsets. Datastore doesn't process
+            # more than 1000 skipped results in a query.
+            query_pb.start_cursor = response_pb.batch.skipped_cursor
+            query_pb.offset -= response_pb.batch.skipped_results
+            response_pb = self.client._datastore_api.run_query(
+                self._query.project, partition_id, read_options, query=query_pb
+            )
+
         entity_pbs = self._process_query_results(response_pb)
         return page_iterator.Page(self, entity_pbs, self.item_to_value)
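The loop added to `_next_page` above works around the fact that a single `RunQuery` call skips at most 1000 results: while the batch reports `NOT_FINISHED` and fewer results were skipped than the requested offset, the query is re-issued from `skipped_cursor` with the remaining offset. The sketch below isolates that pattern outside the library; `run_query_once`, `NOT_FINISHED`, and the returned batch fields are hypothetical stand-ins for the RPC and response protobuf, not part of the google-cloud-datastore API.

```python
# Hypothetical sketch of the offset-draining loop added in _next_page above.
# `run_query_once` stands in for one RunQuery RPC and is assumed to return an
# object with `more_results`, `skipped_results`, and `skipped_cursor` fields.

NOT_FINISHED = "NOT_FINISHED"  # assumed sentinel, mirroring _NOT_FINISHED


def drain_offset(run_query_once, start_cursor, offset):
    """Re-issue the query until the requested offset has been skipped."""
    batch = run_query_once(start_cursor=start_cursor, offset=offset)
    while batch.more_results == NOT_FINISHED and batch.skipped_results < offset:
        # Not done skipping yet: resume from where skipping stopped and only
        # ask for the part of the offset that is still outstanding.
        start_cursor = batch.skipped_cursor
        offset -= batch.skipped_results
        batch = run_query_once(start_cursor=start_cursor, offset=offset)
    return batch  # the first batch that can actually carry result entities
```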
diff --git a/tests/system/test_system.py b/tests/system/test_system.py
index 482b0b80..ef0de3a2 100644
--- a/tests/system/test_system.py
+++ b/tests/system/test_system.py
@@ -14,6 +14,7 @@
 import datetime
 import os
+import string
 import unittest
 
 import requests
@@ -465,6 +466,62 @@ def test_query_distinct_on(self):
         self.assertEqual(entities[1]["name"], "Arya")
 
 
+class TestDatastoreQueryOffsets(TestDatastore):
+    TOTAL_OBJECTS = 2500
+    NAMESPACE = "LargeCharacterEntity"
+    KIND = "LargeCharacter"
+
+    @classmethod
+    def setUpClass(cls):
+        cls.CLIENT = clone_client(Config.CLIENT)
+        # Remove the namespace from the cloned client, since these
+        # query tests rely on the entities to be already stored
+        # cls.CLIENT.namespace = cls.NAMESPACE
+        cls.CLIENT.namespace = None
+
+        # Populate the datastore if necessary.
+        populate_datastore.add_large_character_entities(client=cls.CLIENT)
+
+    @classmethod
+    def tearDownClass(cls):
+        # In the emulator, destroy the query entities.
+        if os.getenv(GCD_DATASET) is not None:
+            # Use the client for this test instead of the global.
+            clear_datastore.remove_all_entities(client=cls.CLIENT)
+
+    def _base_query(self):
+        # Use the client for this test instead of the global.
+        return self.CLIENT.query(kind=self.KIND, namespace=self.NAMESPACE)
+
+    def _verify(self, limit, offset, expected):
+        # Query used for all tests
+        page_query = self._base_query()
+        page_query.add_filter("family", "=", "Stark")
+        page_query.add_filter("alive", "=", False)
+
+        iterator = page_query.fetch(limit=limit, offset=offset)
+        entities = [e for e in iterator]
+        self.assertEqual(len(entities), expected)
+
+    def test_query_in_bounds_offsets(self):
+        # Verify that with no offset there is the correct number of results
+        self._verify(limit=None, offset=None, expected=self.TOTAL_OBJECTS)
+
+        # Verify that with no limit the remaining results come back (offset provided)
+        self._verify(limit=None, offset=900, expected=self.TOTAL_OBJECTS - 900)
+
+        # Offset beyond the 1000 skipped-results limit; verify 200 items are found
+        self._verify(limit=200, offset=1100, expected=200)
+
+    def test_query_partially_out_of_bounds_offsets(self):
+        # Offset within range; expect 50 despite the larger limit
+        self._verify(limit=100, offset=self.TOTAL_OBJECTS - 50, expected=50)
+
+    def test_query_out_of_bounds_offsets(self):
+        # Offset beyond the number of items; verify no items are found
+        self._verify(limit=200, offset=self.TOTAL_OBJECTS + 1000, expected=0)
+
+
 class TestDatastoreTransaction(TestDatastore):
     def test_transaction_via_with_statement(self):
         entity = datastore.Entity(key=Config.CLIENT.key("Company", "Google"))
diff --git a/tests/system/utils/populate_datastore.py b/tests/system/utils/populate_datastore.py
index 2c266a8a..e2baa5b3 100644
--- a/tests/system/utils/populate_datastore.py
+++ b/tests/system/utils/populate_datastore.py
@@ -18,6 +18,7 @@
 from __future__ import print_function
 
 import os
+import string
 import sys
 import time
 import uuid
@@ -62,6 +63,60 @@ def print_func(message):
         print(message)
 
 
+def add_large_character_entities(client=None):
+    TOTAL_OBJECTS = 2500
+    NAMESPACE = "LargeCharacterEntity"
+    KIND = "LargeCharacter"
+    MAX_STRING = (string.ascii_lowercase * 58)[:1500]
+
+    client.namespace = NAMESPACE
+
+    # Query used for all tests
+    page_query = client.query(kind=KIND, namespace=NAMESPACE)
+
+    def put_objects(count):
+        remaining = count
+        current = 0
+
+        # Can only do 500 operations in a transaction with an overall
+        # size limit.
+        ENTITIES_TO_BATCH = 25
+        while current < count:
+            start = current
+            end = min(current + ENTITIES_TO_BATCH, count)
+            with client.transaction() as xact:
+                # The name/ID for the new entity
+                for i in range(start, end):
+                    name = "character{0:05d}".format(i)
+                    # The Cloud Datastore key for the new entity
+                    task_key = client.key(KIND, name)
+
+                    # Prepares the new entity
+                    task = datastore.Entity(key=task_key)
+                    task["name"] = "{0:05d}".format(i)
+                    task["family"] = "Stark"
+                    task["alive"] = False
+
+                    for i in string.ascii_lowercase:
+                        task["space-{}".format(i)] = MAX_STRING
+
+                    # Saves the entity
+                    xact.put(task)
+            current += ENTITIES_TO_BATCH
+
+    # Ensure we have TOTAL_OBJECTS entities for the tests. If not, clean up
+    # the kind and add new entities equal to TOTAL_OBJECTS.
+    all_entities = [e for e in page_query.fetch()]
+    if len(all_entities) != TOTAL_OBJECTS:
+        # Clean up the kind if the count is not an exact match
+        while all_entities:
+            entities = all_entities[:500]
+            all_entities = all_entities[500:]
+            client.delete_multi([e.key for e in entities])
+        # Put objects
+        put_objects(TOTAL_OBJECTS)
+
+
 def add_characters(client=None):
     if client is None:
         # Get a client that uses the test dataset.
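From the caller's side, the effect of the fix is that `fetch` with an offset past 1000 now returns the expected entities instead of an empty page, which is what the system tests above assert. A hedged usage sketch follows; the project comes from the environment, and the kind, namespace, and counts simply mirror the test data populated by `add_large_character_entities`.

```python
# Illustrative only: assumes 2500 matching "LargeCharacter" entities exist,
# as populated by add_large_character_entities() in the system tests above.
from google.cloud import datastore

client = datastore.Client()  # project picked up from the environment

query = client.query(kind="LargeCharacter", namespace="LargeCharacterEntity")
query.add_filter("family", "=", "Stark")
query.add_filter("alive", "=", False)

# Before this change, an offset larger than ~1000 could yield zero results,
# because only the first RunQuery call's skipped results were honored.
entities = list(query.fetch(limit=200, offset=1100))
print(len(entities))  # expected: 200 when 2500 matching entities exist
```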