
Result window too large #1507

Open

walidsa3d opened this issue May 18, 2017 · 19 comments

Comments

@walidsa3d

  • [x] Tested with the latest Haystack release
  • [ ] Tested with the current Haystack master branch

When I run the update command with the --remove option:

python manage.py update_index -b 10000 --remove

I get this exception:

Failed to query Elasticsearch using ':': TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [20000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')

But the batch size is only 10000, so where is the problem?

Configuration

  • Operating system version: Ubuntu 16.04
  • Search engine version: 2.4.1
  • Python version: 2.7
  • Django version: 1.10.5
  • Haystack version: 2.6.0
@acdha
Contributor

acdha commented May 18, 2017

Does this happen with a smaller block size?

@walidsa3d
Author

I set index.max_result_window to 50000 and used a smaller batch size (6000), and I still got this error:

Failed to query Elasticsearch using ':': TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [50000] but was [56000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')

@Ravenons

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html

Note that from + size can not be more than the index.max_result_window index setting which defaults to 10,000. See the Scroll API for more efficient ways to do deep scrolling.

I'm still trying to find a solution for this, so that pagination is performed correctly without additional hacks.
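
To make the limit concrete, this is the request shape that trips it (a minimal sketch, assuming an elasticsearch-py client and an index called haystack):

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # hypothetical connection details

# Deep pagination with from/size: Elasticsearch checks from + size against
# index.max_result_window (default 10000), so this fails with the same
# "Result window is too large" TransportError even though only 1000 hits
# are requested.
es.search(index="haystack", body={
    "query": {"match_all": {}},
    "from": 15000,
    "size": 1000,
})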

@notanumber
Contributor

I just configured a brand new server with Elasticsearch 2.4.5 and I'm seeing this intermittently when users search. Sometimes the results are returned, sometimes they get this error, even with the same search terms. This is the exact same code base, except the old server was running Elasticsearch 1.4.0.

Anything I can do to help track down the source?

@MaZZly

MaZZly commented Feb 17, 2018

I've been having this problem for a while and tried looking into it, but stopped after a while.

Until this is resolved, I suggest users facing this problem just switch to e.g. the Solr backend, which doesn't seem to have the problem.

@marksweb
Contributor

marksweb commented Mar 4, 2018

I've just started seeing an error message similar to this after uploading just over 11,000 records! How do you set max_result_window on the index?
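
For reference, it looks like max_result_window is a dynamic index-level setting, so something like this should raise it (a sketch assuming elasticsearch-py and an index named haystack; hosted/AWS domains may not allow it):

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # hypothetical connection details

# Raise the per-index result window (default 10000). It's a dynamic setting,
# so no reindex is needed, but deep from/size paging stays expensive.
es.indices.put_settings(
    index="haystack",
    body={"index": {"max_result_window": 50000}},
)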

@siovene

siovene commented Apr 23, 2018

Any update on this? I still have this problem too.

@afedosenko

Any news?

@marksweb
Contributor

marksweb commented Mar 7, 2019

Just to add a traceback to this, as I'm trying to manage some indexes:

root@32c6a37cff2c:/app# python manage.py update_index results -r -b 500 -v 2
Indexing 11734 Results
  indexed 1 - 500 of 11734 (worker PID: 2082).
  indexed 501 - 1000 of 11734 (worker PID: 2082).
  indexed 1001 - 1500 of 11734 (worker PID: 2082).
  indexed 1501 - 2000 of 11734 (worker PID: 2082).
  indexed 2001 - 2500 of 11734 (worker PID: 2082).
  indexed 2501 - 3000 of 11734 (worker PID: 2082).
  indexed 3001 - 3500 of 11734 (worker PID: 2082).
  indexed 3501 - 4000 of 11734 (worker PID: 2082).
  indexed 4001 - 4500 of 11734 (worker PID: 2082).
  indexed 4501 - 5000 of 11734 (worker PID: 2082).
  indexed 5001 - 5500 of 11734 (worker PID: 2082).
  indexed 5501 - 6000 of 11734 (worker PID: 2082).
  indexed 6001 - 6500 of 11734 (worker PID: 2082).
  indexed 6501 - 7000 of 11734 (worker PID: 2082).
  indexed 7001 - 7500 of 11734 (worker PID: 2082).
  indexed 7501 - 8000 of 11734 (worker PID: 2082).
  indexed 8001 - 8500 of 11734 (worker PID: 2082).
  indexed 8501 - 9000 of 11734 (worker PID: 2082).
  indexed 9001 - 9500 of 11734 (worker PID: 2082).
  indexed 9501 - 10000 of 11734 (worker PID: 2082).
  indexed 10001 - 10500 of 11734 (worker PID: 2082).
  indexed 10501 - 11000 of 11734 (worker PID: 2082).
  indexed 11001 - 11500 of 11734 (worker PID: 2082).
  indexed 11501 - 11734 of 11734 (worker PID: 2082).
Failed to query Elasticsearch using '*:*': TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [10500]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/haystack/backends/elasticsearch_backend.py", line 524, in search
    _source=True)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 539, in search
    doc_type, '_search'), params=params, body=body)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/transport.py", line 327, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/http_requests.py", line 84, in perform_request
    self._raise_error(response.status_code, raw_data)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 114, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
TransportError: TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [10500]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
ERROR - 15:32:43 07-03-2019: elasticsearch_backend - Failed to query Elasticsearch using '*:*': TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [10500]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/haystack/backends/elasticsearch_backend.py", line 524, in search
    _source=True)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 539, in search
    doc_type, '_search'), params=params, body=body)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/transport.py", line 327, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/http_requests.py", line 84, in perform_request
    self._raise_error(response.status_code, raw_data)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 114, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
TransportError: TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [10500]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')

What does Haystack do at the end of an update? The output suggests it has completed the requested updates in the batches specified, but then something else must happen which fails.

Also, we use AWS for the ES backend, and from what I've read they don't support changing max_result_window on an index.
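
From what I can tell from the update_index source, the --remove phase walks the whole index in batch_size slices of a SearchQuerySet, and each slice becomes a from/size request, so the last batches always blow past the window on a big index. A simplified sketch of that phase (not the exact Haystack code, just the shape of it):

from django.utils.encoding import smart_bytes
from haystack.query import SearchQuerySet


def remove_phase(backend, model, database_pks, batch_size, commit=True):
    """Simplified sketch of what update_index --remove seems to do."""
    sqs = SearchQuerySet(using=backend.connection_alias).models(model)
    index_total = sqs.count()
    index_pks = sqs.values_list("pk", "id")

    for start in range(0, index_total, batch_size):
        # Each slice turns into an Elasticsearch query with from=start and
        # size=batch_size, so once start + batch_size passes
        # index.max_result_window (10000 by default) it fails with
        # "Result window is too large", no matter how small batch_size is.
        for pk, rec_id in index_pks[start:start + batch_size]:
            if smart_bytes(pk) not in database_pks:
                backend.remove(rec_id, commit=commit)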

@marksweb
Contributor

Just to add to the info on this: I'm looking at an error logged by Sentry, and it looks like it's only actually looking at a small section of search results, but I may be mistaken.

So the error is the familiar:

TransportError(500, 'search_phase_execution_exception', 'Result window is too large, from + size must be less than or equal to: [10000] but was [17684]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')

It comes from this search call:
https://github.com/django-haystack/django-haystack/blob/v2.6.1/haystack/backends/elasticsearch_backend.py#L521

The search_kwargs for it are (taken more or less straight from Sentry):

{
    'from': 17675, 
    'query': {
        'filtered': {
            'filter': {"terms":{"django_ct":["results.result"]}}, 
            'query': {
                "query_string": {
                        "analyze_wildcard":"True",
                        "auto_generate_phrase_queries":"True",
                        "default_field":"'text'",
                        "default_operator":"'AND'",
                        "fuzzy_max_expansions":"50",
                        "fuzzy_min_sim":"0.5",
                        "query":"'(NOT (position:([* TO \"0\"]) AND enabled:(Y) AND slug:(\"the\\-slug\")'"
                }
            }
        }
    }
}

The kwargs passed to search also make it look as though size should be included in the above search_kwargs.

{
    'end_offset': 17684, 
    'models': [
        <class 'results.models.result.Result'>
    ], 
    'result_class': <class 'haystack.models.SearchResult'>, 
    'sort_by': [
        [
            'name', 
            'asc'
        ]
    ], 
    'start_offset': 17675
}
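
For what it's worth, the numbers line up if the backend computes size from the offsets (which would also explain why size isn't in the kwargs Sentry captured); roughly:

# Rough arithmetic, assuming size = end_offset - start_offset as the backend
# appears to do:
start_offset = 17675
end_offset = 17684

es_from = start_offset               # 'from' in search_kwargs
es_size = end_offset - start_offset  # only 9 hits actually requested

# Elasticsearch validates from + size against index.max_result_window (10000),
# so 17675 + 9 = 17684 is rejected, matching "but was [17684]" in the error.
print(es_from + es_size)  # 17684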

@marksweb
Contributor

marksweb commented Apr 23, 2020

So I've got an index with 3.1 million objects from a model. Recently about 1,200 objects were removed directly from the database, which skipped the signals that would remove them from the index :(

I'm running ES in AWS, so there is no access to the configuration variables and I can't change index.max_result_window.

Running update_index myapp.MyModel --remove doesn't seem to remove stale objects, because this error happens after the updating:

  indexed 3150001 - 3155000 of 3188192 (worker PID: 2610).
  indexed 3155001 - 3160000 of 3188192 (worker PID: 2610).
  indexed 3160001 - 3165000 of 3188192 (worker PID: 2610).
  indexed 3165001 - 3170000 of 3188192 (worker PID: 2610).
  indexed 3170001 - 3175000 of 3188192 (worker PID: 2610).
  indexed 3175001 - 3180000 of 3188192 (worker PID: 2610).
  indexed 3180001 - 3185000 of 3188192 (worker PID: 2610).
  indexed 3185001 - 3188192 of 3188192 (worker PID: 2610).
Failed to query Elasticsearch using '*:*': TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [15000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/haystack/backends/elasticsearch_backend.py", line 524, in search
    _source=True)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 539, in search
    doc_type, '_search'), params=params, body=body)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/transport.py", line 327, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/http_requests.py", line 84, in perform_request
    self._raise_error(response.status_code, raw_data)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 114, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
TransportError: TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [15000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
ERROR Failed to query Elasticsearch using '*:*': TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [15000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/haystack/backends/elasticsearch_backend.py", line 524, in search
    _source=True)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 539, in search
    doc_type, '_search'), params=params, body=body)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/transport.py", line 327, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/http_requests.py", line 84, in perform_request
    self._raise_error(response.status_code, raw_data)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 114, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
TransportError: TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [15000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
[INFO/MainProcess] process shutting down

Against an index with less than 10000 objects I've never had any trouble.

# python manage.py update_index news.LatestNews --remove -v 2
Indexing 405 News
  indexed 1 - 405 of 405 (worker PID: 2957).
[INFO/MainProcess] process shutting down
# 

This is also not related to the updating of objects, because if I target a very specific timeframe that finds no results to update, I still get the error, seemingly at the removal stage of update_index:

# python manage.py update_index results.Results -v 2 --remove --start 2019-11-19T14:00 --end 2019-11-18T00:01
/usr/local/lib/python2.7/site-packages/django/db/models/fields/__init__.py:1430: RuntimeWarning: DateTimeField Results.modified received a naive datetime (2019-11-18 00:01:00) while time zone support is active.
  RuntimeWarning)
/usr/local/lib/python2.7/site-packages/django/db/models/fields/__init__.py:1430: RuntimeWarning: DateTimeField Results.modified received a naive datetime (2019-11-19 14:00:00) while time zone support is active.
  RuntimeWarning)
Indexing 0 Results
Failed to query Elasticsearch using '*:*': TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/haystack/backends/elasticsearch_backend.py", line 524, in search
    _source=True)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 539, in search
    doc_type, '_search'), params=params, body=body)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/transport.py", line 327, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/http_requests.py", line 84, in perform_request
    self._raise_error(response.status_code, raw_data)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 114, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
TransportError: TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
ERROR Failed to query Elasticsearch using '*:*': TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/haystack/backends/elasticsearch_backend.py", line 524, in search
    _source=True)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 539, in search
    doc_type, '_search'), params=params, body=body)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/transport.py", line 327, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/http_requests.py", line 84, in perform_request
    self._raise_error(response.status_code, raw_data)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 114, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
TransportError: TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.')
[INFO/MainProcess] process shutting down
# 

Is there an alternative way to remove objects? Maybe some Python run from a custom management command could get all IDs in the database and remove objects not matching those IDs?
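
Something like this rough sketch is what I have in mind (hypothetical model and index names; it scrolls the raw index with elasticsearch.helpers.scan, so the result-window limit never comes into play):

from django.utils.encoding import smart_bytes
from elasticsearch.helpers import scan
from haystack import connections as haystack_connections

from results.models import Result  # hypothetical model


def remove_stale(using="default", commit=True):
    backend = haystack_connections[using].get_backend()
    index = haystack_connections[using].get_unified_index().get_index(Result)

    # All primary keys currently in the database.
    database_pks = {smart_bytes(pk) for pk in
                    index.index_queryset().values_list("pk", flat=True)}

    # Scroll every indexed document for this model and remove the ones whose
    # django_id no longer exists in the database.
    query = {"query": {"terms": {"django_ct": ["results.result"]}},
             "_source": ["django_id"]}
    for hit in scan(backend.conn, query=query, index=backend.index_name):
        if smart_bytes(hit["_source"]["django_id"]) not in database_pks:
            backend.remove(hit["_id"], commit=commit)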

@acdha
Contributor

acdha commented Apr 23, 2020

I wrote something like that a long time ago for similar reasons, which might be useful. It would need some work to be generally useful, but it might save you some time:

https://gist.github.com/acdha/08d0a760221184bd84cf#file-sync_index-py

@marksweb
Contributor

marksweb commented Apr 24, 2020

Superb, thanks Chris (@acdha)! I was going to start from the remove section of update_index, so this is a big help.
I'll have to go over it next week. When you say it needs some work to be useful, what does it do as it stands?

@acdha
Contributor

acdha commented Apr 24, 2020

Basically it works for me on the project where I needed it, but there are a couple of things which need to be generalized: the two imports from the wdl package, and especially this line:

https://gist.github.com/acdha/08d0a760221184bd84cf#file-sync_index-py-L44

(I think that could just be backend_names = list(haystack_connections.connections_info.keys()))

The two iterators probably also could be replaced with something from more-itertools by now.

@ahmedbilal

ahmedbilal commented Jan 13, 2022

@acdha Any update on this error? I still get it. Elasticsearch 2.4.3

elasticsearch>=2,<3
django-haystack==2.8.1

@afedosenko

afedosenko commented Jan 21, 2022

A very long time ago I solved this problem by using the scroll API.
All you have to do is modify the update_index command a bit and implement a simple custom Elasticsearch2SearchBackend.

You need to add a few lines to the custom update_index command:

if isinstance(backend, CustomElasticsearchSearchBackend):
    for result in backend.scan_models(models={model}, source=["django_id"]):
        if smart_bytes(result["_source"]["django_id"]) not in database_pks:
            stale_records.add(result["_id"])
else:
    # ... original batched SearchQuerySet removal logic (see the full command below) ...

Custom Elasticsearch2SearchBackend:

from elasticsearch.helpers import scan
from haystack.backends.elasticsearch2_backend import Elasticsearch2SearchBackend, Elasticsearch2SearchEngine


class CustomElasticsearchSearchBackend(Elasticsearch2SearchBackend):
    def scan_models(self, models, source=None):
        if not self.setup_complete:
            self.setup()

        search_kwargs = self.build_search_kwargs("*:*", models=models)
        if source:
            search_kwargs["_source"] = source

        for result in scan(self.conn, query=search_kwargs, index=self.index_name, doc_type="modelresult"):
            yield result


class CustomElasticsearchSearchEngine(Elasticsearch2SearchEngine):
    backend = CustomElasticsearchSearchBackend

In settings.py:

HAYSTACK_CONNECTIONS = {
    "default": {
        "ENGINE": "my_app.search_utils.CustomElasticsearchSearchEngine",
      ...
    }
}
Full custom my_app_update_index.py:

# encoding: utf-8
from __future__ import absolute_import, division, print_function, unicode_literals

import multiprocessing
from django.db import close_old_connections
from django.utils.encoding import force_text, smart_bytes
from haystack import connections as haystack_connections
from haystack.exceptions import NotHandled
from haystack.management.commands.update_index import (
    Command as HaystackUpdateCommand,
    do_update,
    update_worker,
)
from haystack.query import SearchQuerySet
from haystack.utils.app_loading import haystack_get_models
from my_app.search_utils import CustomElasticsearchSearchBackend


class Command(HaystackUpdateCommand):
    def update_backend(self, label, using):
        backend = haystack_connections[using].get_backend()
        unified_index = haystack_connections[using].get_unified_index()

        for model in haystack_get_models(label):
            try:
                index = unified_index.get_index(model)
            except NotHandled:
                if self.verbosity >= 2:
                    self.stdout.write("Skipping '%s' - no index." % model)
                continue

            if self.workers > 0:
                # workers resetting connections leads to references to models / connections getting
                # stale and having their connection disconnected from under them. Resetting before
                # the loop continues and it accesses the ORM makes it better.
                close_old_connections()

            qs = index.build_queryset(using=using, start_date=self.start_date, end_date=self.end_date)

            total = qs.count()

            if self.verbosity >= 1:
                self.stdout.write("Indexing %d %s" % (total, force_text(model._meta.verbose_name_plural)))

            batch_size = self.batchsize or backend.batch_size

            if self.workers > 0:
                ghetto_queue = []

            max_pk = None
            for start in range(0, total, batch_size):
                end = min(start + batch_size, total)

                if self.workers == 0:
                    max_pk = do_update(
                        backend,
                        index,
                        qs,
                        start,
                        end,
                        total,
                        verbosity=self.verbosity,
                        commit=self.commit,
                        max_retries=self.max_retries,
                        last_max_pk=max_pk,
                    )
                else:
                    ghetto_queue.append(
                        (
                            model,
                            start,
                            end,
                            total,
                            using,
                            self.start_date,
                            self.end_date,
                            self.verbosity,
                            self.commit,
                            self.max_retries,
                        )
                    )

            if self.workers > 0:
                pool = multiprocessing.Pool(self.workers)

                successful_tasks = pool.map(update_worker, ghetto_queue)

                if len(ghetto_queue) != len(successful_tasks):
                    self.stderr.write(
                        "Queued %d tasks but only %d completed" % (len(ghetto_queue), len(successful_tasks))
                    )
                    for i in ghetto_queue:
                        if i not in successful_tasks:
                            self.stderr.write("Incomplete task: %s" % repr(i))

                pool.close()
                pool.join()

            if self.remove:
                if self.start_date or self.end_date or total <= 0:
                    # They're using a reduced set, which may not incorporate
                    # all pks. Rebuild the list with everything.
                    qs = index.index_queryset().values_list("pk", flat=True)
                    database_pks = set(smart_bytes(pk) for pk in qs)
                else:
                    database_pks = set(smart_bytes(pk) for pk in qs.values_list("pk", flat=True))

                # Since records may still be in the search index but not the local database
                # we'll use that to create batches for processing.
                # See https://github.com/django-haystack/django-haystack/issues/1186
                index_total = SearchQuerySet(using=backend.connection_alias).models(model).count()

                # Retrieve PKs from the index. Note that this cannot be a numeric range query because although
                # pks are normally numeric they can be non-numeric UUIDs or other custom values. To reduce
                # load on the search engine, we only retrieve the pk field, which will be checked against the
                # full list obtained from the database, and the id field, which will be used to delete the
                # record should it be found to be stale.
                index_pks = SearchQuerySet(using=backend.connection_alias).models(model)
                index_pks = index_pks.values_list("pk", "id")

                # We'll collect all of the record IDs which are no longer present in the database and delete
                # them after walking the entire index. This uses more memory than the incremental approach but
                # avoids needing the pagination logic below to account for both commit modes:
                stale_records = set()

                if isinstance(backend, CustomElasticsearchSearchBackend):
                    for result in backend.scan_models(models={model}, source=["django_id"]):
                        if smart_bytes(result["_source"]["django_id"]) not in database_pks:
                            stale_records.add(result["_id"])
                else:
                    for start in range(0, index_total, batch_size):
                        upper_bound = start + batch_size

                        # If the database pk is no longer present, queue the index key for removal:
                        for pk, rec_id in index_pks[start:upper_bound]:
                            if smart_bytes(pk) not in database_pks:
                                stale_records.add(rec_id)

                if stale_records:
                    if self.verbosity >= 1:
                        self.stdout.write("  removing %d stale records." % len(stale_records))

                    for rec_id in stale_records:
                        # Since the PK was not in the database list, we'll delete the record from the search
                        # index:
                        if self.verbosity >= 2:
                            self.stdout.write("  removing %s." % rec_id)

                        backend.remove(rec_id, commit=self.commit)

@siovene

siovene commented Feb 15, 2024

@afedosenko I'm trying to implement your suggestion, but there's no such method as scan_models in my version of haystack (3.1.1). Would you know how I can adapt your solution to that version? Thanks!

@afedosenko

@siovene scan_models is a custom method; you should implement it yourself, as in my example.

from elasticsearch.helpers import scan
from haystack.backends.elasticsearch2_backend import Elasticsearch2SearchBackend, Elasticsearch2SearchEngine


class CustomElasticsearchSearchBackend(Elasticsearch2SearchBackend):
    def scan_models(self, models, source=None):
        if not self.setup_complete:
            self.setup()

        search_kwargs = self.build_search_kwargs("*:*", models=models)
        if source:
            search_kwargs["_source"] = source

        for result in scan(self.conn, query=search_kwargs, index=self.index_name, doc_type="modelresult"):
            yield result


class CustomElasticsearchSearchEngine(Elasticsearch2SearchEngine):
    backend = CustomElasticsearchSearchBackend

or

from elasticsearch.helpers import scan
from haystack.backends.elasticsearch7_backend import Elasticsearch7SearchBackend, Elasticsearch7SearchEngine


class CustomElasticsearchSearchBackend(Elasticsearch7SearchBackend):
    def scan_models(self, models, source=None):
        if not self.setup_complete:
            self.setup()

        search_kwargs = self.build_search_kwargs("*:*", models=models)
        if source:
            search_kwargs["_source"] = source

        for result in scan(self.conn, query=search_kwargs, index=self.index_name):
            yield result


class CustomElasticsearchSearchEngine(Elasticsearch7SearchEngine):
    backend = CustomElasticsearchSearchBackend

@siovene

siovene commented Feb 15, 2024

@afedosenko ok, thanks for the response! I see what you mean now, but in my case the "Result window too large" error is happening even though I'm not running update_index; I'm indexing things one at a time as needed (I use HAYSTACK_SIGNAL_PROCESSOR = 'celery_haystack.signals.CelerySignalProcessor').

I get:

{'type': 'query_phase_execution_exception', 'reason': 'Result window is too large, from + size must be less than or equal to: [10000] but was [610038]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.'}

I thought it was weird, because my max_result_window is 1000000.

I'm seeing this entry using Sentry to monitor errors, and it's happening on a page that queries the SearchIndex, not one that updates it.

Any theories?
