Indexing with hot_swap or build always fails with Malformed content, found extra data after parsing: START_OBJECT #113

Open
cdvx opened this issue May 18, 2023 · 8 comments

Comments

@cdvx

cdvx commented May 18, 2023

When creating a new index and loading data, the index is created fine, but uploading the data to Elasticsearch returns:

%Elasticsearch.Exception{
     status: 400,
     line: nil,
     col: nil,
     message: "failed to parse",
     type: "mapper_parsing_exception",
     query: nil,
     raw: %{
       "error" => %{
         "caused_by" => %{
           "reason" => "Malformed content, found extra data after parsing: START_OBJECT",
           "type" => "illegal_argument_exception"
         },
         "reason" => "failed to parse",
         "root_cause" => [
           %{
             "reason" => "failed to parse",
             "type" => "mapper_parsing_exception"
           }
         ],
         "type" => "mapper_parsing_exception"
       },
       "status" => 400
     }
   }

From what I can tell, this implies that the NDJSON payload the library builds for the bulk API is faulty. I'd appreciate any guidance if I'm missing something.

Example payload created by the library for the bulk API:

"{\"create\":{\"_id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"\",\"display_locale\":\"en\",\"display_name\":\"Flail Joint - Paralytic\",\"external_id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"239763002\",\"external_id\":\"54779ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Flail Joint - Paralytic\",\"id\":\"175214\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null}]}\n{\"create\":{\"_id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"\",\"display_locale\":\"en\",\"display_name\":\"Floppy Infant Syndrome\",\"external_id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"33010005\",\"external_id\":\"54794ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Floppy Infant Syndrome\",\"id\":\"194010\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"P94.2\",\"external_id\":\"104024ABBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Floppy Infant Syndrome\",\"id\":\"168840\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Congenital hypotonia\"}]}\n{\"create\":{\"_id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"Cyst due to the occlusion of the duct of a follicle or small gland.\",\"display_locale\":\"en\",\"display_name\":\"Follicular Cyst of Ovary\",\"external_id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"2615004\",\"external_id\":\"54834ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Cyst of Ovary\",\"id\":\"174464\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"N83.0\",\"external_id\":\"118576ABBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Cyst of Ovary\",\"id\":\"197238\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Follicular cyst of ovary\"}]}\n{\"create\":{\"_id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"Malignant lymphoma in which the lymphomatous cells are clustered into identifiable nodules within the lymph nodes. The nodules resemble to some extent the germinal centers of lymph node follicles and most likely represent neoplastic proliferation of lymph node-derived follicular center B-lymphocytes. This class of lymphoma usually occurs in older persons, is commonly multinodal, and possibly extranodal. Patients whose lymphomas present a follicular or nodular pattern generally have a more indolent course than those presenting with a diffuse pattern.\",\"display_locale\":\"en\",\"display_name\":\"Follicular Lymphoma\",\"external_id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"308121000\",\"external_id\":\"54839ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Lymphoma\",\"id\":\"175378\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null}]}\n{\"create\":{\"_id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"A contagious skin disease caused by a parasitic mite (Sarcoptes scabiei) and characterized by intense itching.\",\"display_locale\":\"en\",\"display_name\":\"Scabies\",\"external_id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"128869009\",\"external_id\":\"57446ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Scabies\",\"id\":\"1070\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"B86\",\"external_id\":\"87991ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Scabies\",\"id\":\"1375\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Scabies\"}]}\n"
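One rough way to test the suspicion that the payload is malformed: a bulk body must end with a newline, and each create, index, or update action line must be followed by exactly one source line. The module below is a hypothetical, string-level sketch (BulkPayloadCheck is not part of the library, and it does not parse JSON, so it only checks shape). The payload above appears to satisfy both rules: five create/source pairs followed by a trailing newline.

```elixir
defmodule BulkPayloadCheck do
  @moduledoc """
  Hypothetical helper, not part of the elasticsearch library: a rough,
  string-level shape check for a bulk body in which every action is
  create, index or update (each of those needs exactly one source line).
  Delete actions, which have no source line, are not handled.
  """

  @action_prefixes [~s({"create":), ~s({"index":), ~s({"update":)]

  @spec check(String.t()) :: :ok | {:error, atom()}
  def check(body) when is_binary(body) do
    # trim: true drops empty strings, including the one after the final "\n".
    lines = String.split(body, "\n", trim: true)

    cond do
      # The bulk API requires the body to end with a newline.
      not String.ends_with?(body, "\n") ->
        {:error, :missing_trailing_newline}

      # Every action line needs a matching source line.
      rem(length(lines), 2) != 0 ->
        {:error, :unpaired_action_line}

      # Lines 1, 3, 5, ... should be action/metadata lines.
      not Enum.all?(Enum.take_every(lines, 2), &String.starts_with?(&1, @action_prefixes)) ->
        {:error, :unexpected_action_line}

      true ->
        :ok
    end
  end
end
```

BulkPayloadCheck.check(body) returns :ok for a well-shaped body, or an {:error, reason} tuple naming the first rule that fails.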
@cdvx
Author

cdvx commented May 18, 2023

@danielberkompas, if you have the chance, please point me in the right direction. I haven't found much that's helpful online, and I've had no luck for the past week or so.

@fadeojo

fadeojo commented Oct 7, 2023

@cdvx, did you find a solution to this? @danielberkompas, could you please help?

@cdvx
Author

cdvx commented Oct 7, 2023

No, I didn't. I just found a workaround: adding the index to the JSON index file so it's created when loaded.

@krezicoder

Hi @cdvx @danielberkompas, I am also facing the same issue on Elasticsearch 8.x. Could you please help solve this?

@krezicoder

@cdvx, can you share what you added to the JSON index file, or a sample of its format?

@EdmundMai

EdmundMai commented Jan 15, 2024

Has anyone had any luck with this? @krezicoder, did you find a fix?

@lovebes

lovebes commented Jan 18, 2024

Hello everyone, this is caused by Elasticsearch 8.x.

https://stackoverflow.com/questions/33340153/elasticsearch-bulk-index-json-data

Based on that post, 7.x accepts this form, which is what the library calls:

curl -XPOST localhost:9200/index_local/_doc/_bulk -H 'Content-Type: application/x-ndjson' --data-binary @/home/data1.json

However, in 8.x, _doc/ has to be removed:

curl -XPOST localhost:9200/index_local/_bulk -H 'Content-Type: application/x-ndjson' --data-binary @/home/data1.json

My tests via Kibana confirm this.

This code needs to choose the URL path (with or without _doc/) based on some config.
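A minimal sketch of that config switch, assuming you track the cluster's major version yourself (BulkPath and its major_version argument are hypothetical; the library has no such option). The typeless /<index>/_bulk endpoint has been accepted since 7.0, so the switch would only matter for older clusters. The mapper_parsing_exception in the original report also looks consistent with 8.x handling /<index>/_doc/_bulk as a single-document request whose ID is _bulk: the body is parsed as one JSON document, and the remaining lines trigger "found extra data after parsing".

```elixir
defmodule BulkPath do
  @moduledoc """
  Hypothetical helper: chooses the bulk endpoint from an Elasticsearch major
  version that you configure yourself (not an existing library option).
  """

  @spec bulk_path(String.t(), pos_integer()) :: String.t()
  def bulk_path(index_name, major_version)
      when is_binary(index_name) and is_integer(major_version) and major_version < 7 do
    # 6.x and earlier: the typed form that the library sends today.
    "/#{index_name}/_doc/_bulk"
  end

  def bulk_path(index_name, major_version)
      when is_binary(index_name) and is_integer(major_version) do
    # 7.x accepts the typeless endpoint; 8.x needs it.
    "/#{index_name}/_bulk"
  end
end
```

For example, BulkPath.bulk_path("jobs", 8) returns "/jobs/_bulk", which could replace the hard-coded path passed to Elasticsearch.put/3 in a put_bulk_page-style function.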

@EdmundMai

EdmundMai commented Jan 20, 2024

Not sure when @danielberkompas will update this repo, so in case anyone is looking, my workaround was to:

  1. Copy the file @lovebes referenced above and remove the _doc/ from the bulk URL.
  2. Since I wanted to upsert rather than create (I upload the same documents multiple times and want them overwritten instead of erroring out), I changed my Elasticsearch.Document implementation to return the { "doc": { ... }, "doc_as_upsert": true } format and used the "update" bulk action.
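In bulk terms, step 2 turns each entry into an update action line followed by a partial-document line. The sketch below only builds the two maps for one entry (UpsertEntry is a hypothetical name, and encoding each map onto its own line with a JSON library such as Jason is left out). With doc_as_upsert set to true, the doc is indexed as a new document when the ID does not exist yet and merged into the existing document otherwise.

```elixir
defmodule UpsertEntry do
  @moduledoc """
  Hypothetical illustration: the two maps behind one bulk "update" entry
  that uses doc_as_upsert. Each map would be JSON-encoded onto its own line.
  """

  @spec build(String.t(), String.t(), map()) :: {map(), map()}
  def build(index, id, fields) when is_map(fields) do
    # Action/metadata line, e.g. {"update":{"_index":"jobs","_id":"1"}}
    action = %{"update" => %{"_index" => index, "_id" => id}}
    # Source line: partial document, used as the whole document if the ID is new.
    source = %{"doc" => fields, "doc_as_upsert" => true}
    {action, source}
  end
end
```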

Code:

lib/h1bjobs/elasticsearch/bulk.ex

defmodule Elasticsearch.Index.BulkV2 do
  @moduledoc """
  Functions for creating bulk indexing requests.
  """

  alias Elasticsearch.{
    Cluster,
    Document
  }

  require Logger

  @doc """
  Encodes a given variable into an Elasticsearch bulk request. The variable
  must implement `Elasticsearch.Document`.

  ## Examples

      iex> Bulk.encode(Cluster, %Post{id: "my-id"}, "my-index")
      {:ok, \"\"\"
      {"create":{"_index":"my-index","_id":"my-id"}}
      {"doctype":{"name":"post"},"author":null,"title":null}
      \"\"\"}

      iex> Bulk.encode(Cluster, 123, "my-index")
      {:error,
        %Protocol.UndefinedError{description: "",
        protocol: Elasticsearch.Document, value: 123}}
  """
  @spec encode(Cluster.t(), struct, String.t(), String.t()) ::
          {:ok, String.t()}
          | {:error, Exception.t()}
  def encode(cluster, struct, index, action \\ "create") do
    {:ok, encode!(cluster, struct, index, action)}
  rescue
    exception ->
      {:error, exception}
  end

  @doc """
  Same as `encode/3`, but returns the request and raises errors.

  ## Example

      iex> Bulk.encode!(Cluster, %Post{id: "my-id"}, "my-index")
      \"\"\"
      {"create":{"_index":"my-index","_id":"my-id"}}
      {"doctype":{"name":"post"},"author":null,"title":null}
      \"\"\"

      iex> Bulk.encode!(Cluster, 123, "my-index")
      ** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123 of type Integer
  """
  def encode!(cluster, struct, index, action \\ "create") do
    config = Cluster.Config.get(cluster)
    header = header(config, action, index, struct)

    document =
      struct
      |> Document.encode()
      |> config.json_library.encode!()

    "#{header}\n#{document}\n"
  end

  defp header(config, type, index, struct) do
    attrs = %{
      "_index" => index,
      "_id" => Document.id(struct)
    }

    attrs =
      if routing = Document.routing(struct) do
        Map.put(attrs, "_routing", routing)
      else
        attrs
      end

    config.json_library.encode!(%{type => attrs})
  end

  @doc """
  Uploads all the data from the list of `sources` to the given index.
  Data for each `source` will be fetched using the configured `:store`.
  """
  @spec upload(Cluster.t(), index_name :: String.t(), index_config :: map, list) ::
          :ok | {:error, [map]}
  def upload(cluster, index_name, index_config, errors \\ [])
  def upload(_cluster, _index_name, %{sources: []}, []), do: :ok
  def upload(_cluster, _index_name, %{sources: []}, errors), do: {:error, errors}

  def upload(
        cluster,
        index_name,
        %{store: store, sources: [source | tail]} = index_config,
        errors
      )
      when is_atom(store) do
    config = Cluster.Config.get(cluster)
    bulk_page_size = index_config[:bulk_page_size] || 5000
    bulk_wait_interval = index_config[:bulk_wait_interval] || 0
    action = index_config[:bulk_action] || "create"

    errors =
      store.transaction(fn ->
        source
        |> store.stream()
        |> Stream.map(&encode!(config, &1, index_name, action))
        |> Stream.chunk_every(bulk_page_size)
        |> Stream.intersperse(bulk_wait_interval)
        |> Stream.map(&put_bulk_page(config, index_name, &1))
        |> Enum.reduce(errors, &collect_errors(&1, &2, action))
      end)

    upload(config, index_name, %{index_config | sources: tail}, errors)
  end

  defp put_bulk_page(_config, _index_name, wait_interval) when is_integer(wait_interval) do
    Logger.debug("Pausing #{wait_interval}ms between bulk pages")
    :timer.sleep(wait_interval)
  end

  defp put_bulk_page(config, index_name, items) when is_list(items) do
    # No "_doc/" segment: Elasticsearch 8.x does not treat
    # "/<index>/_doc/_bulk" as a bulk endpoint.
    Elasticsearch.put(config, "/#{index_name}/_bulk", Enum.join(items))
  end

  defp collect_errors({:ok, %{"errors" => true} = response}, errors, action) do
    new_errors =
      response["items"]
      |> Enum.filter(&(&1[action]["error"] != nil))
      |> Enum.map(& &1[action])
      |> Enum.map(&Elasticsearch.Exception.exception(response: &1))

    new_errors ++ errors
  end

  defp collect_errors({:error, error}, errors, _action) do
    [error | errors]
  end

  defp collect_errors(_response, errors, _action) do
    errors
  end
end

lib/h1bjobs/elasticsearch/models/job_listing.ex

defimpl Elasticsearch.Document, for: H1bjobs.JobListing do
  def id(job_listing), do: job_listing.id
  def routing(_), do: false

  def encode(job_listing) do
    %{
      doc: %{
        description: job_listing.description,
      },
      doc_as_upsert: true
    }
  end
end

And then I run it like this:

Elasticsearch.Index.BulkV2.upload(MyApp.ElasticsearchCluster, "jobs", %{store: MyApp.ElasticsearchStore, sources: [MyApp.JobListing], bulk_action: "update"})

5 participants