fix: Address queries not fully satisfying requested offset #18

Merged
merged 8 commits on Apr 7, 2020
16 changes: 15 additions & 1 deletion google/cloud/datastore/query.py
@@ -498,7 +498,6 @@ def _process_query_results(self, response_pb):
:raises ValueError: If ``more_results`` is an unexpected value.
"""
self._skipped_results = response_pb.batch.skipped_results

if response_pb.batch.more_results == _NO_MORE_RESULTS:
self.next_page_token = None
else:
@@ -540,6 +539,21 @@ def _next_page(self):
response_pb = self.client._datastore_api.run_query(
self._query.project, partition_id, read_options, query=query_pb
)

while (
response_pb.batch.more_results == _NOT_FINISHED
and response_pb.batch.skipped_results < query_pb.offset
):
# The query isn't finished, most likely because we haven't skipped
# all of the requested results yet. Don't return any results yet;
# instead, rerun the query with an adjusted cursor and offset, since
# Datastore doesn't process more than 1000 skipped results per query.
query_pb.start_cursor = response_pb.batch.skipped_cursor
query_pb.offset -= response_pb.batch.skipped_results
response_pb = self.client._datastore_api.run_query(
self._query.project, partition_id, read_options, query=query_pb
)

entity_pbs = self._process_query_results(response_pb)
return page_iterator.Page(self, entity_pbs, self.item_to_value)
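
For context: Datastore skips at most 1000 results per run_query call (as the comment above notes), so a fetch with an offset greater than 1000 could previously yield entities before the full offset had been consumed. A minimal usage sketch of the fixed behavior (the client setup, kind, and entity counts are assumptions for illustration, not part of this diff):

# Sketch only; assumes a project with more than 1300 "LargeCharacter" entities.
from google.cloud import datastore

client = datastore.Client()
query = client.query(kind="LargeCharacter")

# With the retry loop above, the iterator keeps re-issuing the query,
# advancing the cursor and shrinking the remaining offset, until the
# full offset of 1100 has been skipped before any entity is returned.
entities = list(query.fetch(limit=200, offset=1100))
assert len(entities) == 200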

57 changes: 57 additions & 0 deletions tests/system/test_system.py
@@ -14,6 +14,7 @@

import datetime
import os
import string
import unittest

import requests
@@ -465,6 +466,62 @@ def test_query_distinct_on(self):
self.assertEqual(entities[1]["name"], "Arya")


class TestDatastoreQueryOffsets(TestDatastore):
TOTAL_OBJECTS = 2500
NAMESPACE = "LargeCharacterEntity"
KIND = "LargeCharacter"

@classmethod
def setUpClass(cls):
cls.CLIENT = clone_client(Config.CLIENT)
# Remove the namespace from the cloned client, since these
# query tests rely on the entities already being stored
# cls.CLIENT.namespace = cls.NAMESPACE
cls.CLIENT.namespace = None

# Populate the datastore if necessary.
populate_datastore.add_large_character_entities(client=cls.CLIENT)

@classmethod
def tearDownClass(cls):
# In the emulator, destroy the query entities.
if os.getenv(GCD_DATASET) is not None:
# Use the client for this test instead of the global.
clear_datastore.remove_all_entities(client=cls.CLIENT)

def _base_query(self):
# Use the client for this test instead of the global.
return self.CLIENT.query(kind=self.KIND, namespace=self.NAMESPACE)

def _verify(self, limit, offset, expected):
# Query used for all tests
page_query = self._base_query()
page_query.add_filter("family", "=", "Stark")
page_query.add_filter("alive", "=", False)

iterator = page_query.fetch(limit=limit, offset=offset)
entities = [e for e in iterator]
self.assertEqual(len(entities), expected)

def test_query_in_bounds_offsets(self):
# Verify that with no offset the correct number of results is returned
self._verify(limit=None, offset=None, expected=self.TOTAL_OBJECTS)

# Verify that with no limit, an offset still returns the remaining results
self._verify(limit=None, offset=900, expected=self.TOTAL_OBJECTS - 900)

# Offset well within bounds; verify the limit of 200 items is honored
self._verify(limit=200, offset=1100, expected=200)

def test_query_partially_out_of_bounds_offsets(self):
# Offset within range; expect 50 results despite the larger limit
self._verify(limit=100, offset=self.TOTAL_OBJECTS - 50, expected=50)

def test_query_out_of_bounds_offsets(self):
# Offset beyond the total number of items; verify no items are found
self._verify(limit=200, offset=self.TOTAL_OBJECTS + 1000, expected=0)


class TestDatastoreTransaction(TestDatastore):
def test_transaction_via_with_statement(self):
entity = datastore.Entity(key=Config.CLIENT.key("Company", "Google"))
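The expected counts asserted above follow from simple limit/offset arithmetic over the 2500 matching entities. A minimal sketch of the rule the tests encode (the helper is hypothetical, for illustration only):

# Hypothetical helper mirroring the assertions in the tests above.
def expected_count(total, limit, offset):
    remaining = max(total - (offset or 0), 0)
    return remaining if limit is None else min(limit, remaining)

assert expected_count(2500, None, None) == 2500  # no limit, no offset
assert expected_count(2500, None, 900) == 1600   # offset only
assert expected_count(2500, 200, 1100) == 200    # in-bounds offset with limit
assert expected_count(2500, 100, 2450) == 50     # partially out of bounds
assert expected_count(2500, 200, 3500) == 0      # fully out of bounds
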
55 changes: 55 additions & 0 deletions tests/system/utils/populate_datastore.py
@@ -18,6 +18,7 @@
from __future__ import print_function

import os
import string
import sys
import time
import uuid
@@ -62,6 +63,60 @@ def print_func(message):
print(message)


def add_large_character_entities(client=None):
TOTAL_OBJECTS = 2500
NAMESPACE = "LargeCharacterEntity"
KIND = "LargeCharacter"
MAX_STRING = (string.ascii_lowercase * 58)[:1500]

client.namespace = NAMESPACE

# Query used to count the entities that are already stored
page_query = client.query(kind=KIND, namespace=NAMESPACE)

def put_objects(count):
current = 0

# A transaction can contain at most 500 operations and is also
# subject to an overall size limit, so batch conservatively.
ENTITIES_TO_BATCH = 25
while current < count:
start = current
end = min(current + ENTITIES_TO_BATCH, count)
with client.transaction() as xact:
# The name/ID for the new entity
for i in range(start, end):
name = "character{0:05d}".format(i)
# The Cloud Datastore key for the new entity
task_key = client.key(KIND, name)

# Prepares the new entity
task = datastore.Entity(key=task_key)
task["name"] = "{0:05d}".format(i)
task["family"] = "Stark"
task["alive"] = False

for letter in string.ascii_lowercase:
task["space-{}".format(letter)] = MAX_STRING

# Saves the entity
xact.put(task)
current += ENTITIES_TO_BATCH

# Ensure we have TOTAL_OBJECTS entities for the tests. If not, remove
# the existing entities of this kind and repopulate from scratch.
all_entities = [e for e in page_query.fetch()]
if len(all_entities) != TOTAL_OBJECTS:
# Cleanup Collection if not an exact match
while all_entities:
entities = all_entities[:500]
all_entities = all_entities[500:]
client.delete_multi([e.key for e in entities])
# Put objects
put_objects(TOTAL_OBJECTS)


def add_characters(client=None):
if client is None:
# Get a client that uses the test dataset.
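
As an aside, the transactional batching above is one way to stay under Datastore's limit of 500 mutations per commit; the library's put_multi supports the same chunking without a transaction. A minimal sketch under that assumption (the helper below is illustrative, not part of this PR):

# Sketch only; large entities may need batches much smaller than 500.
def put_in_batches(client, entities, batch_size=500):
    # Datastore commits are capped at 500 mutations per request.
    for start in range(0, len(entities), batch_size):
        client.put_multi(entities[start:start + batch_size])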