fix: Address queries not fully satisfying requested offset (#18)
crwilcox committed Apr 7, 2020
1 parent 96fd5b8 commit e7b5fc9
Showing 3 changed files with 127 additions and 1 deletion.
16 changes: 15 additions & 1 deletion google/cloud/datastore/query.py
@@ -498,7 +498,6 @@ def _process_query_results(self, response_pb):
:raises ValueError: If ``more_results`` is an unexpected value.
"""
self._skipped_results = response_pb.batch.skipped_results

if response_pb.batch.more_results == _NO_MORE_RESULTS:
self.next_page_token = None
else:
@@ -540,6 +539,21 @@ def _next_page(self):
response_pb = self.client._datastore_api.run_query(
self._query.project, partition_id, read_options, query=query_pb
)

while (
response_pb.batch.more_results == _NOT_FINISHED
and response_pb.batch.skipped_results < query_pb.offset
):
            # The query isn't finished, most likely because the backend has
            # not yet skipped the full requested offset (a single batch skips
            # at most 1000 results). Don't return any results yet; instead,
            # rerun the query from the skipped cursor, with the offset
            # reduced by the number of results already skipped.
query_pb.start_cursor = response_pb.batch.skipped_cursor
query_pb.offset -= response_pb.batch.skipped_results
response_pb = self.client._datastore_api.run_query(
self._query.project, partition_id, read_options, query=query_pb
)

entity_pbs = self._process_query_results(response_pb)
return page_iterator.Page(self, entity_pbs, self.item_to_value)

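A minimal caller-side sketch of the scenario the loop above addresses, assuming data like the system-test fixtures added below (kind "LargeCharacter", namespace "LargeCharacterEntity", 2500 matching "Stark" entities). An offset of 1100 exceeds what a single run_query batch will skip (at most 1000), so before this change a fetch could yield entities without the full offset being honored; the loop re-issues the query from batch.skipped_cursor until the remaining offset is consumed (an offset of 2450, say, would be satisfied as 1000 + 1000 + 450 skips over three calls).

from google.cloud import datastore

client = datastore.Client()

# Kind, namespace, and filters mirror the system tests added below.
query = client.query(kind="LargeCharacter", namespace="LargeCharacterEntity")
query.add_filter("family", "=", "Stark")
query.add_filter("alive", "=", False)

# Offset larger than one batch can skip.
entities = list(query.fetch(offset=1100, limit=200))
print(len(entities))  # 200 once the 2500 matching entities are stored
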
57 changes: 57 additions & 0 deletions tests/system/test_system.py
@@ -14,6 +14,7 @@

import datetime
import os
import string
import unittest

import requests
@@ -465,6 +466,62 @@ def test_query_distinct_on(self):
self.assertEqual(entities[1]["name"], "Arya")


class TestDatastoreQueryOffsets(TestDatastore):
TOTAL_OBJECTS = 2500
NAMESPACE = "LargeCharacterEntity"
KIND = "LargeCharacter"

@classmethod
def setUpClass(cls):
cls.CLIENT = clone_client(Config.CLIENT)
        # Remove the namespace from the cloned client, since these
        # query tests rely on the entities already being stored.
# cls.CLIENT.namespace = cls.NAMESPACE
cls.CLIENT.namespace = None

# Populating the datastore if necessary.
populate_datastore.add_large_character_entities(client=cls.CLIENT)

@classmethod
def tearDownClass(cls):
# In the emulator, destroy the query entities.
if os.getenv(GCD_DATASET) is not None:
# Use the client for this test instead of the global.
clear_datastore.remove_all_entities(client=cls.CLIENT)

def _base_query(self):
# Use the client for this test instead of the global.
return self.CLIENT.query(kind=self.KIND, namespace=self.NAMESPACE)

def _verify(self, limit, offset, expected):
# Query used for all tests
page_query = self._base_query()
page_query.add_filter("family", "=", "Stark")
page_query.add_filter("alive", "=", False)

iterator = page_query.fetch(limit=limit, offset=offset)
entities = [e for e in iterator]
self.assertEqual(len(entities), expected)

def test_query_in_bounds_offsets(self):
# Verify that with no offset there are the correct # of results
self._verify(limit=None, offset=None, expected=self.TOTAL_OBJECTS)

        # Verify that with an offset but no limit, all remaining results are returned
self._verify(limit=None, offset=900, expected=self.TOTAL_OBJECTS - 900)

        # Verify that an in-bounds offset with a limit of 200 returns exactly 200 items
self._verify(limit=200, offset=1100, expected=200)

def test_query_partially_out_of_bounds_offsets(self):
        # Offset within range; expect 50 results despite the larger limit
self._verify(limit=100, offset=self.TOTAL_OBJECTS - 50, expected=50)

def test_query_out_of_bounds_offsets(self):
        # Offset beyond the total number of items; verify no items are found
self._verify(limit=200, offset=self.TOTAL_OBJECTS + 1000, expected=0)


class TestDatastoreTransaction(TestDatastore):
def test_transaction_via_with_statement(self):
entity = datastore.Entity(key=Config.CLIENT.key("Company", "Google"))
55 changes: 55 additions & 0 deletions tests/system/utils/populate_datastore.py
@@ -18,6 +18,7 @@
from __future__ import print_function

import os
import string
import sys
import time
import uuid
@@ -62,6 +63,60 @@ def print_func(message):
print(message)


def add_large_character_entities(client=None):
TOTAL_OBJECTS = 2500
NAMESPACE = "LargeCharacterEntity"
KIND = "LargeCharacter"
MAX_STRING = (string.ascii_lowercase * 58)[:1500]

client.namespace = NAMESPACE

# Query used for all tests
page_query = client.query(kind=KIND, namespace=NAMESPACE)

def put_objects(count):
remaining = count
current = 0

        # A transaction allows at most 500 operations and is also subject to
        # an overall request size limit; these entities are large (26
        # properties of 1500 characters each), so batch conservatively.
ENTITIES_TO_BATCH = 25
while current < count:
start = current
end = min(current + ENTITIES_TO_BATCH, count)
with client.transaction() as xact:
# The name/ID for the new entity
for i in range(start, end):
name = "character{0:05d}".format(i)
# The Cloud Datastore key for the new entity
task_key = client.key(KIND, name)

# Prepares the new entity
task = datastore.Entity(key=task_key)
task["name"] = "{0:05d}".format(i)
task["family"] = "Stark"
task["alive"] = False

                    # Pad the entity with 26 large string properties so the
                    # stored entities are large.
                    for letter in string.ascii_lowercase:
                        task["space-{}".format(letter)] = MAX_STRING

# Saves the entity
xact.put(task)
current += ENTITIES_TO_BATCH

    # Ensure we have TOTAL_OBJECTS entities for the tests. If not, clean up
    # the kind and add new entities equal to TOTAL_OBJECTS.
all_entities = [e for e in page_query.fetch()]
if len(all_entities) != TOTAL_OBJECTS:
# Cleanup Collection if not an exact match
while all_entities:
entities = all_entities[:500]
all_entities = all_entities[500:]
client.delete_multi([e.key for e in entities])
# Put objects
put_objects(TOTAL_OBJECTS)


def add_characters(client=None):
if client is None:
# Get a client that uses the test dataset.
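
A brief usage sketch for the new helper, mirroring how TestDatastoreQueryOffsets.setUpClass above calls it (the import path shown is illustrative). The helper only repopulates when the number of stored entities differs from TOTAL_OBJECTS.

from google.cloud import datastore

from tests.system.utils import populate_datastore  # illustrative import path

client = datastore.Client()

# Ensures 2500 large "LargeCharacter" entities exist in the
# "LargeCharacterEntity" namespace, repopulating only if the count differs.
populate_datastore.add_large_character_entities(client=client)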
