Indexing with hot_swap or build always fails with Malformed content, found extra data after parsing: START_OBJECT #113

cdvx opened this issue May 18, 2023 · 8 comments



cdvx commented May 18, 2023

When creating a new index and loading data, the index is created fine, but uploading the data to Elasticsearch returns:

     status: 400,
     line: nil,
     col: nil,
     message: "failed to parse",
     type: "mapper_parsing_exception",
     query: nil,
     raw: %{
       "error" => %{
         "caused_by" => %{
           "reason" => "Malformed content, found extra data after parsing: START_OBJECT",
           "type" => "illegal_argument_exception"
         },
         "reason" => "failed to parse",
         "root_cause" => [
           %{
             "reason" => "failed to parse",
             "type" => "mapper_parsing_exception"
           }
         ],
         "type" => "mapper_parsing_exception"
       },
       "status" => 400
     }

From what I can tell, this implies that the NDJSON payload the library creates for the bulk API is faulty. I would appreciate any guidance if I'm missing something.
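One way to rule the payload in or out is a quick structural check of the NDJSON. The following is a minimal sketch, not part of the library: the `NdjsonCheck` module and its function names are hypothetical, and it only checks the shape (trailing newline, action line followed by a source line), not full JSON validity.

```elixir
defmodule NdjsonCheck do
  # Bulk actions that are followed by a source line ("delete" has none).
  @actions ["index", "create", "update"]

  # Returns :ok when the payload ends with a newline and consists of
  # action/source line pairs; otherwise returns {:error, reason}.
  def check(payload) when is_binary(payload) do
    if String.ends_with?(payload, "\n") do
      payload
      |> String.split("\n", trim: true)
      |> check_pairs()
    else
      {:error, :missing_trailing_newline}
    end
  end

  defp check_pairs([]), do: :ok

  defp check_pairs([action, source | rest]) do
    if action_line?(action) and String.starts_with?(source, "{") do
      check_pairs(rest)
    else
      {:error, {:bad_pair, action}}
    end
  end

  defp check_pairs([dangling]), do: {:error, {:dangling_line, dangling}}

  # An action line is assumed to start with {"<action>": as in the payload above.
  defp action_line?(line) do
    Enum.any?(@actions, fn name ->
      String.starts_with?(line, "{\"" <> name <> "\":")
    end)
  end
end
```

If a payload like the one in this issue passes such a check, the request URL or headers are a more likely culprit than the body.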

Example payload created for the bulk API by the library:

"{\"create\":{\"_id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"\",\"display_locale\":\"en\",\"display_name\":\"Flail Joint - Paralytic\",\"external_id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"239763002\",\"external_id\":\"54779ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Flail Joint - Paralytic\",\"id\":\"175214\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null}]}\n{\"create\":{\"_id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"\",\"display_locale\":\"en\",\"display_name\":\"Floppy Infant Syndrome\",\"external_id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"33010005\",\"external_id\":\"54794ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Floppy Infant Syndrome\",\"id\":\"194010\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"P94.2\",\"external_id\":\"104024ABBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Floppy Infant Syndrome\",\"id\":\"168840\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Congenital hypotonia\"}]}\n{\"create\":{\"_id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"Cyst due to the occlusion of the duct of a follicle or small gland.\",\"display_locale\":\"en\",\"display_name\":\"Follicular Cyst of Ovary\",\"external_id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"2615004\",\"external_id\":\"54834ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Cyst of 
Ovary\",\"id\":\"174464\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"N83.0\",\"external_id\":\"118576ABBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Cyst of Ovary\",\"id\":\"197238\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Follicular cyst of ovary\"}]}\n{\"create\":{\"_id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"Malignant lymphoma in which the lymphomatous cells are clustered into identifiable nodules within the lymph nodes. The nodules resemble to some extent the germinal centers of lymph node follicles and most likely represent neoplastic proliferation of lymph node-derived follicular center B-lymphocytes. This class of lymphoma usually occurs in older persons, is commonly multinodal, and possibly extranodal. Patients whose lymphomas present a follicular or nodular pattern generally have a more indolent course than those presenting with a diffuse pattern.\",\"display_locale\":\"en\",\"display_name\":\"Follicular Lymphoma\",\"external_id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"308121000\",\"external_id\":\"54839ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Lymphoma\",\"id\":\"175378\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null}]}\n{\"create\":{\"_id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"A contagious skin disease caused by a parasitic mite (Sarcoptes scabiei) and characterized by intense 
itching.\",\"display_locale\":\"en\",\"display_name\":\"Scabies\",\"external_id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"128869009\",\"external_id\":\"57446ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Scabies\",\"id\":\"1070\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"B86\",\"external_id\":\"87991ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Scabies\",\"id\":\"1375\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Scabies\"}]}\n"

cdvx commented May 18, 2023

@danielberkompas if you have the chance, please help point me in the right direction. I haven't found anything helpful online and have had no luck for the past week or so.


fadeojo commented Oct 7, 2023

@cdvx did you find a solution to this? @danielberkompas please can you help?


cdvx commented Oct 7, 2023

> @cdvx did you find a solution to this? @danielberkompas please can you help?

No, I didn't. I just found a workaround: adding the index to the JSON index file so it's created when loaded.


Hi @cdvx @danielberkompas, I am also facing the same issue on Elasticsearch 8.x. Can you please help solve this?


> > @cdvx did you find a solution to this? @danielberkompas please can you help?
>
> No, I didn't. I just found a workaround: adding the index to the JSON index file so it's created when loaded.

@cdvx can you share what you added to the JSON index file, or a sample of the file's format?


EdmundMai commented Jan 15, 2024

Anyone have any luck on this? @krezicoder did you find a fix?


lovebes commented Jan 18, 2024

Hello everyone - this is because of Elasticsearch 8.x

Based on the post, 7.x wants this, which is also what the library calls:

curl -XPOST localhost:9200/index_local/_doc/_bulk -H 'Content-Type: application/x-ndjson' --data-binary @/home/data1.json

However, in 8.x, _doc/ should be removed:

curl -XPOST localhost:9200/index_local/_bulk -H 'Content-Type: application/x-ndjson' --data-binary @/home/data1.json

My tests via Kibana confirm this.

This code needs to change the prefix based on some config.
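A minimal sketch of what such a switch could look like. The `BulkPath` module and the `major_version` argument are hypothetical and not part of the library; the version would have to come from application config or from the cluster itself.

```elixir
defmodule BulkPath do
  # Hypothetical helper: picks the bulk endpoint for a given index based on
  # the Elasticsearch major version supplied by the caller.
  def bulk_path(index_name, major_version) when is_integer(major_version) do
    if major_version >= 8 do
      # 8.x has no mapping types, so the path carries no type segment.
      "/#{index_name}/_bulk"
    else
      # The typed path the library currently calls.
      "/#{index_name}/_doc/_bulk"
    end
  end
end
```

For example, `BulkPath.bulk_path("index_local", 8)` returns `"/index_local/_bulk"`. Since 7.x should also accept the typeless path, always using `/<index>/_bulk` may be the simpler fix if support for older versions is not needed.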


EdmundMai commented Jan 20, 2024

Not sure when @danielberkompas will be updating this repo... so if anyone is looking, my workaround was to:

  1. Copy and paste the file @lovebes pasted above and remove the _doc/ in the URL
  2. Since I wanted to upsert rather than create (I upload the same documents multiple times and want them overwritten instead of erroring out), I had to change my Elasticsearch.Document implementation to return the { "doc": { ... }, "doc_as_upsert": true } format



defmodule Elasticsearch.Index.BulkV2 do
  @moduledoc """
  Functions for creating bulk indexing requests.
  """

  alias Elasticsearch.{
    Cluster,
    Document
  }

  require Logger

  @doc """
  Encodes a given variable into an Elasticsearch bulk request. The variable
  must implement `Elasticsearch.Document`.

  ## Examples

      iex> Bulk.encode(Cluster, %Post{id: "my-id"}, "my-index")
      {:ok, \"\"\"
      ...
      \"\"\"}

      iex> Bulk.encode(Cluster, 123, "my-index")
      {:error,
        %Protocol.UndefinedError{description: "",
        protocol: Elasticsearch.Document, value: 123}}
  """
  @spec encode(Cluster.t(), struct, String.t(), String.t()) ::
          {:ok, String.t()}
          | {:error, Error.t()}
  def encode(cluster, struct, index, action \\ "create") do
    {:ok, encode!(cluster, struct, index, action)}
  rescue
    exception ->
      {:error, exception}
  end

  @doc """
  Same as `encode/3`, but returns the request and raises errors.

  ## Example

      iex> Bulk.encode!(Cluster, %Post{id: "my-id"}, "my-index")
      \"\"\"
      ...
      \"\"\"

      iex> Bulk.encode!(Cluster, 123, "my-index")
      ** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123 of type Integer
  """
  def encode!(cluster, struct, index, action \\ "create") do
    config = Cluster.Config.get(cluster)
    header = header(config, action, index, struct)

    document =
      struct
      |> Document.encode()
      |> config.json_library.encode!()

    # One bulk entry: an action line followed by a source line, each newline-terminated.
    "#{header}\n#{document}\n"
  end

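  # Builds the action line, e.g. {"create":{"_id":"...","_index":"..."}}.
  defp header(config, type, index, struct) do
    attrs = %{
      "_index" => index,
      "_id" => Document.id(struct)
    }

    attrs =
      if routing = Document.routing(struct) do
        Map.put(attrs, "_routing", routing)
      else
        attrs
      end

    config.json_library.encode!(%{type => attrs})
  end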

  @doc """
  Uploads all the data from the list of `sources` to the given index.
  Data for each `source` will be fetched using the configured `:store`.
  """
  @spec upload(Cluster.t(), index_name :: String.t(), Elasticsearch.Store.t(), list) ::
          :ok | {:error, [map]}
  def upload(cluster, index_name, index_config, errors \\ [])
  def upload(_cluster, _index_name, %{sources: []}, []), do: :ok
  def upload(_cluster, _index_name, %{sources: []}, errors), do: {:error, errors}

  def upload(
        cluster,
        index_name,
        %{store: store, sources: [source | tail]} = index_config,
        errors
      )
      when is_atom(store) do
    config = Cluster.Config.get(cluster)
    bulk_page_size = index_config[:bulk_page_size] || 5000
    bulk_wait_interval = index_config[:bulk_wait_interval] || 0
    action = index_config[:bulk_action] || "create"

    errors =
      store.transaction(fn ->
        source
        |> store.stream()
        |> Stream.map(&encode!(config, &1, index_name, action))
        |> Stream.chunk_every(bulk_page_size)
        |> Stream.intersperse(bulk_wait_interval)
        |> Stream.map(&put_bulk_page(config, index_name, &1))
        |> Enum.reduce(errors, &collect_errors(&1, &2, action))
      end)

    upload(config, index_name, %{index_config | sources: tail}, errors)
  end

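  # Integers in the stream are the interspersed wait intervals, not data pages.
  defp put_bulk_page(_config, _index_name, wait_interval) when is_integer(wait_interval) do
    Logger.debug("Pausing #{wait_interval}ms between bulk pages")
    :timer.sleep(wait_interval)
  end

  # Note the path has no `_doc/` segment, which is the change from the original module.
  defp put_bulk_page(config, index_name, items) when is_list(items) do
    Elasticsearch.put(config, "/#{index_name}/_bulk", Enum.join(items))
  end

  defp collect_errors({:ok, %{"errors" => true} = response}, errors, action) do
    new_errors =
      response["items"]
      |> Enum.filter(&(&1[action]["error"] != nil))
      |> Enum.map(& &1[action])
      |> Enum.map(&Elasticsearch.Exception.exception(response: &1))

    new_errors ++ errors
  end

  defp collect_errors({:error, error}, errors, _action) do
    [error | errors]
  end

  defp collect_errors(_response, errors, _action) do
    errors
  end
end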

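defimpl Elasticsearch.Document, for: H1bjobs.JobListing do
  # Assumed: the record's `id` field is used as the document ID.
  def id(job_listing), do: job_listing.id
  def routing(_), do: false

  # Wraps the fields in the body expected by the bulk "update" action.
  def encode(job_listing) do
    %{
      doc: %{
        description: job_listing.description
        # remaining fields omitted
      },
      doc_as_upsert: true
    }
  end
end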

And then I run it like this:

Elasticsearch.Index.BulkV2.upload(MyApp.ElasticsearchCluster, :jobs, %{store: MyApp.ElasticsearchStore, sources: [MyApp.JobListing], bulk_action: "update"}, [Exception])
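For reference, each document uploaded this way should end up as a two-line bulk entry: an `update` action line followed by the `doc` / `doc_as_upsert` body. A minimal sketch of that shape, using a hypothetical `UpsertPair` module that is not part of the library; it assumes `doc_json` is already an encoded JSON object and that `index` and `id` need no JSON escaping.

```elixir
defmodule UpsertPair do
  # Hypothetical illustration of one bulk "update" entry.
  # `doc_json` must already be a JSON-encoded object string.
  def build(index, id, doc_json) do
    action = ~s({"update":{"_index":"#{index}","_id":"#{id}"}})
    body = ~s({"doc":#{doc_json},"doc_as_upsert":true})

    # Both lines are newline-terminated, as the bulk API requires.
    action <> "\n" <> body <> "\n"
  end
end
```

With `doc_as_upsert` set to true, re-uploading a document with an existing `_id` updates it in place instead of failing the way a `create` action would.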
