
Content-Type header being changed to "text/plain" from "application/json" somewhere in the process of publishing a document to self-hosted Elasticsearch #2092

Open
ghettosamson opened this issue Dec 11, 2023 · 5 comments

Comments

@ghettosamson

🐛 Bug Report

When running my Node.js application deployed on AWS Lambda, the Content-Type header is changed to text/plain, which causes Elasticsearch to reject the document.

To Reproduce

Steps to reproduce the behavior:

  • Deploy the code to AWS Lambda as a Docker container.
  • Invoke a test run with the event below.
  • Publishing the document to Elasticsearch fails with the error shown.

Error below:

status: 'rejected',
  reason: ResponseError: {"error":"Content-Type header [text/plain] is not supported","status":406}
      at SniffingTransport.request (/var/task/node_modules/@elastic/transport/lib/Transport.js:479:27)
      at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
      at async Client.CreateApi [as create] (/var/task/node_modules/@elastic/elasticsearch/lib/api/api/create.js:43:12)
      at async Promise.allSettled (index 1)
      at async Runtime.processEvent [as handler] (/var/task/index.js:104:27) {
    meta: {
      body: [Object],
      statusCode: 406,
      headers: [Object],
      meta: [Object],
      warnings: [Getter]
    }

AWS Event being processed

{
  "Records": [
    {
      "Sns": {
        "Message": {
          "notificationType": "Bounce",
          "bounce": {
            "feedbackId": "e45dcc17-e041-4e07-80cc-d4c0ef13a948-000000",
            "bounceType": "Permanent",
            "bounceSubType": "General",
            "bouncedRecipients": [
              {
                "emailAddress": "bounce@simulator.amazonses.com",
                "action": "failed",
                "status": "5.1.1",
                "diagnosticCode": "smtp; 550 5.1.1 user unknown"
              }
            ],
            "timestamp": "2023-12-08T21:29:48.000Z",
            "remoteMtaIp": "66.164.6.66",
            "reportingMTA": "dns; a8-59.smtp-out.amazonses.com"
          },
          "mail": {
            "timestamp": "2023-12-08T21:29:48.153Z",
            "source": "mebroh-bounce@mydomain.org",
            "sourceArn": "arn:aws:ses:us-east-1:myaccountbroh:identity/mydomain.org",
            "sourceIp": "333.333.18.55",
            "callerIdentity": "myidentity",
            "sendingAccountId": "myaccountbroh",
            "messageId": "433d8581-71da-470f-9789-2cb21c532a4f-000000",
            "destination": [
              "bounce@simulator.amazonses.com"
            ],
            "headersTruncated": false,
            "headers": [
              {
                "name": "From",
                "value": "mebroh-bounce@mydomain.org"
              },
              {
                "name": "To",
                "value": "bounce@simulator.amazonses.com"
              },
              {
                "name": "Subject",
                "value": "Bounce 8 Dec 2023 4:29PM"
              },
              {
                "name": "MIME-Version",
                "value": "1.0"
              },
              {
                "name": "Content-Type",
                "value": "multipart/alternative;  boundary=\"----=_Part_539327_1734740786.1702070988153\""
              }
            ],
            "commonHeaders": {
              "from": [
                "mebroh-bounce@mydomain.org"
              ],
              "to": [
                "bounce@simulator.amazonses.com"
              ],
              "subject": "Bounce 8 Dec 2023 4:29PM"
            }
          }
        }
      }
    }
  ]
}

Here is the entire Dockerfile:

FROM amazon/aws-lambda-nodejs:20
WORKDIR $LAMBDA_TASK_ROOT
# Copy only the files for running in Lambda
COPY package*.json ./
RUN npm ci --omit=dev
COPY ./ ./

CMD [ "index.processEvent" ]

Here is the JavaScript code processing the event:

const {cloudwatch, app, elasticsearch, index} = require('./config');
const {CloudWatchLogs} = require('@aws-sdk/client-cloudwatch-logs');
const client = new CloudWatchLogs(cloudwatch.clientConfig);
const {randomBytes} = require('node:crypto');
const { Client } = require('@elastic/elasticsearch');
const esClient = new Client(elasticsearch);
//console.log('Created new elasticsearch client with node', elasticsearch.node);

/**
 * Receives an event from SES, creates a new CloudWatch log stream, publishes the event to a
 * CloudWatch log group, and publishes that same event to Elasticsearch.
 * @param {Object} event
 * @param {Object} context
 * @return {Object}
 */
async function processEvent(event, context) {
    console.log('Processing awsRequestId and event', context.awsRequestId, event);
    try {
        // First AWS CloudWatch requires us to create a Log Stream
        const date = new Date();
        const buffer = randomBytes(20);
        const logStreamName =
            `${date.getFullYear()}/${date.getMonth() + 1}/${date.getDate()}[$LATEST]${buffer.toString('hex')}`; // yyyy/mm/dd[$LATEST]
        console.info('Creating log stream with name', logStreamName);

        let promises = [];
        // create the new log stream as required by CloudWatch
        promises.push(client.createLogStream({
            logGroupName: cloudwatch.logGroup,
            logStreamName,
        }));

        // get all the Datastreams from Elasticsearch
        promises.push(esClient.indices.getDataStream());

        let settledPromises = await Promise.allSettled(promises);

        // the datastream will be the 2nd promises value
        if (settledPromises[1].status !== 'fulfilled') {
            console.error('The elasticsearch data streams promises did not succeed', settledPromises[1]);
            process.exit(1);
        }
        const ds = settledPromises[1].value;
        console.log('Settled Promise [1]', settledPromises[1]);

        // Find the data stream that matches the current elastic stack version
        let latestDataStream;
        let found = false;
        for (const dataStream of ds.data_streams) {
            if (dataStream.name.includes(index.version)) {
                latestDataStream = dataStream;
                found = true;
                console.log('Found the latest data stream', latestDataStream);
                break;
            }
        }

        if (!found) {
            latestDataStream = ds.data_streams[0];
            console.log('Using Data Stream', latestDataStream);
        }

        const logEvents = [];
        event.Records.forEach(function (record) {
            console.info('Message string', record.Sns.Message);
            const logEvent = {
                message: record.Sns.Message,
                timestamp: date.valueOf(),
            };
            logEvents.push(logEvent);
        });
        const logEventsRequest = {
            logGroupName: cloudwatch.logGroup,
            logEvents: logEvents,
            logStreamName,
        };
        promises = [];
        console.info('Putting the log events', logEventsRequest);
        promises.push(client.putLogEvents(logEventsRequest));

        // Use the data stream selected above (falls back to the first one in the response)
        const {name} = latestDataStream;
        const esDocuments = [];

        // iterate through all the events and publish each one as a separate document to elasticsearch
        logEvents.forEach((logEvent) => {
            // Clone the original document and then add the fields we need, @timestamp and any other custom labels
            const eventDocument = structuredClone(logEvent.message);
            eventDocument['log.source-app'] = app.name;
            eventDocument['@timestamp'] = logEvent.timestamp;
            const documentBuffer = randomBytes(8);

            // Create a new document id for each document created
            const documentId = `ses-event-${documentBuffer.toString('hex')}`;
            console.log(`Creating the new document in elasticsearch with id ${documentId} to stream ${name}`);
            promises.push(esClient.create({
                id: documentId,
                index: name,
                document: eventDocument,
            }));
            esDocuments.push(eventDocument);
         });

        // Now wait for all the promises to be settled
        settledPromises = await Promise.allSettled(promises);
        console.info(`All Promises settled ${settledPromises.length}`);

        // Iterate through the promises to ensure they all succeeded
        for (const settledPromise of settledPromises) {
            if (settledPromise.status !== 'fulfilled') {
                console.error('Promise was not fulfilled', settledPromise);
                return;
            }
            console.log('Promise was fulfilled', settledPromise);
        }

        return {
            esDocuments,
            logEvents,
        };
    } catch (error) {
        console.error('Error sending the log events', error);
        process.exit(1);
    }
}

module.exports = {
    processEvent,
};

elasticsearch client configuration:

const fs = require('node:fs');
const config = {
    cloudwatch: {
        clientConfig: {
            region: process.env.REGION || 'us-east-1',
        },
        logGroup: process.env.LOG_GROUP,
    },
    elasticsearch: {
        //node: process.env.ELASTICSEARCH_HOST,
        node: {
            url: new URL(process.env.ELASTICSEARCH_HOST),
            headers: {
                'content-type': 'application/vnd.elasticsearch+json; compatible-with=8',
            },
        },
        auth: {
            username: process.env.ELASTICSEARCH_USERNAME,
            password: process.env.ELASTICSEARCH_PASSWORD,
        },
        tls: {
            ca: fs.readFileSync(process.env.CA_FILE),
            rejectUnauthorized: true
        }
    },
    app: {
        name: 'ses-event-logger',
    },
    index: {
        version: process.env.ELASTIC_STACK_VERSION
    }
};

module.exports = config;

Expected behavior

I expect the document to successfully be published to elasticsearch instead of getting a header error.

Observed output:

Creating the new document in elasticsearch with id ses-event-f43f4a04ec2a46ca to stream filebeat-8.10.2
All Promises settled 2

When I run this code locally on an M2 Mac with the latest macOS, it works. It also works as a Node app outside of Docker, and inside Docker with the same Dockerfile as above (following AWS's instructions for testing Lambda images locally), but it fails when deployed to AWS Lambda.

Environment

  • node version: 20
  • @elastic/elasticsearch version: >=8.10.0
  • os: Linux
  • Elasticsearch version: 8.11.1 running on AWS Fargate behind an AWS ALB
@JoshMock
Member

@ghettosamson Can you use the observability EventEmitter to listen to request events and paste the output here? Just make sure to scrub any private info from the data before pasting.

I'd also like to know where your Elasticsearch instance is hosted in relation to your Lambda code. On AWS in an EC2 instance? On Elastic Cloud? Any details about what networking or proxies sit between them may be useful for debugging this.

@ghettosamson
Author

ghettosamson commented Dec 13, 2023

@JoshMock I added the following code:

esClient.diagnostic.on('request', (err, result) => {
    if (err) {
        console.log('Received error from request, log below');
        console.error(err);
    } else {
        console.log('Received non-error from request, log below');
        console.info(result);

        if (result?.meta?.connection?.headers) {
            console.log('Logging the headers', result.meta.connection.headers);
        } else {
            console.log('The result.meta.connection.headers was not populated');
        }
    }
});

and here are the log messages

INFO Received non-error from request, log below
INFO	{
  body: undefined,
  statusCode: 0,
  headers: {},
  meta: {
    context: null,
    request: { params: [Object], options: {}, id: 2 },
    name: 'elasticsearch-js',
    connection: {
      url: 'https://myurl:9200/',
      id: 'https://myurl:9200/',
      headers: [Object],
      status: 'alive'
    },
    attempts: 0,
    aborted: false
  },
  warnings: [Getter]
}
INFO	Logging the headers {
  'content-type': 'application/vnd.elasticsearch+json; compatible-with=8',
  authorization: 'Basic abunchofrandomcharacterssendthemtothemoon='
}


ERROR	Promise was not fulfilled {
  status: 'rejected',
  reason: ResponseError: {"error":"Content-Type header [text/plain] is not supported","status":406}
      at SniffingTransport.request (/var/task/node_modules/@elastic/transport/lib/Transport.js:479:27)
      at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
      at async Client.CreateApi [as create] (/var/task/node_modules/@elastic/elasticsearch/lib/api/api/create.js:43:12)
      at async Promise.allSettled (index 1)
      at async Runtime.processEvent [as handler] (/var/task/index.js:123:27) {
    meta: {
      body: [Object],
      statusCode: 406,
      headers: [Object],
      meta: [Object],
      warnings: [Getter]
    }
  }
}

ERROR	reason.meta {
  body: {
    error: 'Content-Type header [text/plain] is not supported',
    status: 406
  },
  statusCode: 406,
  headers: {
    date: 'Wed, 13 Dec 2023 23:08:07 GMT',
    'content-type': 'application/json',
    'content-length': '74',
    connection: 'keep-alive',
    'set-cookie': [
      'AWSALB=dUQ2JX1P/YW/IKgpBcPjeIlT6d3rZLVkHOY7jqRzEYibZrPMHIev4baEc1QZEpNsQNFCgUE7nmvPA8pC; Expires=Wed, 20 Dec 2023 23:08:07 GMT; Path=/',
      'AWSALBCORS=dUQ2JX1P/YW/lT6d3rZLVkHOY7jqRzEYibZrPMHIev4baEc1QZEpNsQNFCgUE7nmvPA8pC; Expires=Wed, 20 Dec 2023 23:08:07 GMT; Path=/; SameSite=None; Secure'
    ],
    'x-elastic-product': 'Elasticsearch'
  },
  meta: {
    context: null,
    request: { params: [Object], options: {}, id: 2 },
    name: 'elasticsearch-js',
    connection: {
      url: 'https://myurl:9200/',
      id: 'https://myurl:9200/',
      headers: [Object],
      status: 'alive'
    },
    attempts: 0,
    aborted: false
  },
  warnings: [Getter]
}

settledPromise.reason.meta.meta.connection.headers {
  'content-type': 'application/vnd.elasticsearch+json; compatible-with=8',
  authorization: 'Basic abunchofrandomcharacterssendthemtothemoon='
}
My Elasticsearch instance is running as a Docker container in Fargate in the same VPC, in one of 3 private subnets, behind an ALB that is also in one of the same 3 private subnets. I don't see the Content-Type header being [text/plain] anywhere in the logs.

@picheljitsu

I ran into the same issue. I think it has something to do with how the body/document parameter is read into the buffer. I've tried multiple techniques to handle the encoding, and it looks like the client attempts to detect the type or the encoding, fails, and thus also fails to set the Content-Type header. Just a guess.

{
  index: 'test',
  type: '_doc',
  body: '\x00\x00\x00\x00\x00\x00\x00\x00\x00{"a":"b"}'
}

Here, I just pass a simple {"a":"b"} object

@picheljitsu

OK, in my case I believe our reverse proxy was replacing the Content-Type header, which caused Elasticsearch to incorrectly parse the request. I noticed that the host field on the error response said the response came from the reverse proxy.

@JoshMock
Member

Reverse proxies are definitely an important place to check when you get an unexpected content type on a response.

@ghettosamson Are you still experiencing this or is yours also caused by a reverse proxy or similar? If it's still an issue, in your sample output, instead of logging result can you serialize to JSON (JSON.stringify) first so that we can see the nested data rather than just [Object]?


3 participants