Cannot use data stream with time_series mode #723

Open
jpthomasset opened this issue Oct 24, 2023 · 2 comments
@jpthomasset

When using a data stream with "index.mode": "time_series", the connector fails to index data because it adds a document id to the index request, which is not supported in time_series mode.

Error:

[2023-10-24 09:24:02,348] ERROR Encountered an illegal document error 'ElasticsearchException[Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse]]; nested: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=_id must be unset or set to [JqqsQRVQfUc-4MsEAAABi1_9F4I] but was [10955520-4510-493f-be59-84dcca6e5e37] because [.ds-metrics-package-real-time-analytics.package-state-metrics.events-2023.10.23-000001] is in time_series mode]];'. To ignore future records like this, change the configuration 'behavior.on.malformed.documents' to 'IGNORE'. (io.confluent.connect.elasticsearch.ElasticsearchClient) 

A solution would be to allow unsetting or ignoring the id when creating the IndexRequest.
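For illustration, here is a minimal sketch against a hypothetical time_series data stream named metrics-example (the stream name and fields are placeholders). Indexing succeeds only when _id is left unset, because Elasticsearch derives it from the time-series dimensions and @timestamp:

POST metrics-example/_bulk
{ "create": { } }
{ "@timestamp": "2023-10-24T09:24:02Z", "host": "a", "cpu": 0.42 }

The same request with { "create": { "_id": "..." } } is rejected with the illegal_argument_exception shown above.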

A potential workaround would be to merge PR #679.

@morrone

morrone commented Jan 19, 2024

Agreed! We needed to add an Elasticsearch ingest pipeline to strip the id, though for other reasons. Not ideal. Giving us a connector configuration option to opt out of the id would be great. I don't know yet whether our pipeline to strip the id will work with index.mode set to time_series.
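For reference, one possible shape for such a pipeline (a sketch only: whether the remove processor may target the _id metadata field depends on the Elasticsearch version, and the pipeline must still be attached to the target index, e.g. via index.default_pipeline):

PUT _ingest/pipeline/strip-id
{
  "description": "Drop the client-supplied _id so the data stream can derive its own",
  "processors": [
    { "remove": { "field": "_id", "ignore_missing": true } }
  ]
}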

@jpthomasset
Author

I ended up forking the repo and adding an extra setting that allows not setting the id when creating the index request: jpthomasset@7b44693
It's a very basic implementation, but it solved my problem for an experiment.

Building the image is then done with:

ARG PLUGINS_FOLDER=plugins

FROM maven:3.9.5-eclipse-temurin-11 AS maven-builder
COPY ./custom-elasticsearch-plugin /mvn-build
WORKDIR /mvn-build

RUN git clone https://github.com/jpthomasset/kafka-connect-elasticsearch.git --branch unset-key --single-branch

WORKDIR /mvn-build/kafka-connect-elasticsearch
RUN mvn -s /mvn-build/m2-settings.xml -P standalone package

FROM docker-registry-proxy.internal.stuart.com/confluentinc/cp-server-connect-base:7.5.0

ARG PLUGINS_FOLDER
RUN mkdir -p /usr/share/java/kafka-connect-storage-common
# Install S3 sink, transform plugins
RUN     confluent-hub install --no-prompt --component-dir /usr/share/java/kafka-connect-storage-common confluentinc/kafka-connect-s3:latest \
    &&  confluent-hub install --no-prompt --component-dir /usr/share/java/kafka-connect-storage-common confluentinc/connect-transforms:latest

# Disable standard elastic search plugin
#    &&  confluent-hub install --no-prompt --component-dir /usr/share/java/kafka-connect-storage-common confluentinc/kafka-connect-elasticsearch:latest

# Add custom elasticsearch plugin with feature to not push id
COPY --from=maven-builder /mvn-build/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-*-SNAPSHOT-standalone.jar /usr/share/java/kafka-connect-storage-common/

ENV CONNECT_REST_ADVERTISED_PORT 8084
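
Assuming the Dockerfile above is saved in the current directory, the image is built with the usual command (the tag name is a placeholder):

docker build -t custom-connect-elasticsearch:latest .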

You can then add a key.unset setting to your Kafka connector configuration:

{
    "name": "some-connector-name",
    "config": {
        [...]
        "key.unset": "true",
        [...]
    }
}
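
Note that key.unset comes from the fork linked above, not from the official Confluent connector.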
