Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring data elasticsearch cannot create index of type Data Stream #2590

Open
patpatpat123 opened this issue Jun 13, 2023 · 3 comments
Open
Labels
type: enhancement A general enhancement

Comments

@patpatpat123
Copy link

patpatpat123 commented Jun 13, 2023

Hello team,

I wanted to reach out with an issue observed while trying to use the method save() and/or saveAll() from spring data elasticsearch, when the index name is on purpose starting with logs-

What I would like to achieve please:

Using the methods save() and saveAll() from spring data elasticsearch, when on purpose, using this

@Document(indexName = "logs-onpurpose-foo")
public record Foo(@JsonProperty(value = "@timestamp") String timestamp, ...

I would expect the documents of type Foo to be saved/created as Data Stream (not just Index) in elasticsearch

Actual:

I am encountering this issue:

defined in @EnableReactiveElasticsearchRepositories declared on MyElasticConfiguration: Failed to instantiate [org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository]: Constructor threw exception
	at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800)
	at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:245)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1352)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1189)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:560)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:941)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:733)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:435)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1305)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1294)
	at com.MyApplication.main(MyApplication.java:18)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'myElasticRepository' defined in com.repository.elastic.MyElasticRepository defined in @EnableReactiveElasticsearchRepositories declared on MyElasticConfiguration: Failed to instantiate [org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository]: Constructor threw exception
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
	at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1417)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1337)
	at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:888)
	at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791)
	... 18 common frames omitted
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository]: Constructor threw exception
	at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:224)
	at org.springframework.data.repository.core.support.RepositoryFactorySupport.lambda$instantiateClass$5(RepositoryFactorySupport.java:571)
	at java.base/java.util.Optional.map(Optional.java:260)
	at org.springframework.data.repository.core.support.RepositoryFactorySupport.instantiateClass(RepositoryFactorySupport.java:571)
	at org.springframework.data.repository.core.support.RepositoryFactorySupport.getTargetRepositoryViaReflection(RepositoryFactorySupport.java:536)
	at org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactory.getTargetRepository(ReactiveElasticsearchRepositoryFactory.java:94)
	at org.springframework.data.repository.core.support.RepositoryFactorySupport.getRepository(RepositoryFactorySupport.java:317)
	at org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport.lambda$afterPropertiesSet$5(RepositoryFactoryBeanSupport.java:279)
	at org.springframework.data.util.Lazy.getNullable(Lazy.java:245)
	at org.springframework.data.util.Lazy.get(Lazy.java:114)
	at org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport.afterPropertiesSet(RepositoryFactoryBeanSupport.java:285)
	at org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactoryBean.afterPropertiesSet(ReactiveElasticsearchRepositoryFactoryBean.java:102)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1816)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1766)
	... 29 common frames omitted
Caused by: org.springframework.data.elasticsearch.UncategorizedElasticsearchException: [es/indices.create] failed: [illegal_argument_exception] cannot create index with name [logs-foo-one], because it matches with template [logs] that creates data streams only, use create data stream api instead
	at org.springframework.data.elasticsearch.client.elc.ElasticsearchExceptionTranslator.translateExceptionIfPossible(ElasticsearchExceptionTranslator.java:102)
	at org.springframework.data.elasticsearch.client.elc.ElasticsearchExceptionTranslator.translateException(ElasticsearchExceptionTranslator.java:63)
	at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:7099)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:122)
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:71)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
	at co.elastic.clients.transport.rest_client.RestClientTransport$1.onSuccess(RestClientTransport.java:185)
	at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:676)
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:399)
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:393)
	at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
	at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
	at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
	at java.base/java.lang.Thread.run(Thread.java:833)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1710)
		at org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository.createIndexAndMappingIfNeeded(SimpleReactiveElasticsearchRepository.java:68)
		at org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository.<init>(SimpleReactiveElasticsearchRepository.java:60)
		at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
		at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
		at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
		at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
		at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
		at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211)
		at org.springframework.data.repository.core.support.RepositoryFactorySupport.lambda$instantiateClass$5(RepositoryFactorySupport.java:571)
		at java.base/java.util.Optional.map(Optional.java:260)
		at org.springframework.data.repository.core.support.RepositoryFactorySupport.instantiateClass(RepositoryFactorySupport.java:571)
		at org.springframework.data.repository.core.support.RepositoryFactorySupport.getTargetRepositoryViaReflection(RepositoryFactorySupport.java:536)
		at org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactory.getTargetRepository(ReactiveElasticsearchRepositoryFactory.java:94)
		at org.springframework.data.repository.core.support.RepositoryFactorySupport.getRepository(RepositoryFactorySupport.java:317)
		at org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport.lambda$afterPropertiesSet$5(RepositoryFactoryBeanSupport.java:279)
		at org.springframework.data.util.Lazy.getNullable(Lazy.java:245)
		at org.springframework.data.util.Lazy.get(Lazy.java:114)
		at org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport.afterPropertiesSet(RepositoryFactoryBeanSupport.java:285)
		at org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactoryBean.afterPropertiesSet(ReactiveElasticsearchRepositoryFactoryBean.java:102)
		at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1816)
		at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1766)
		at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598)
		at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
		at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
		at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
		at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
		at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
		at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
		at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1417)
		at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1337)
		at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:888)
		at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791)
		at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:245)
		at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1352)
		at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1189)
		at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:560)
		at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
		at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
		at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
		at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
		at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
		at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
		at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:941)
		at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608)
		at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:733)
		at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:435)
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1305)
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1294)
		at com.MyApplication.main(MyApplication.java:18)
Caused by: co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/indices.create] failed: [illegal_argument_exception] cannot create index with name [logs-foo-one], because it matches with template [logs] that creates data streams only, use create data stream api instead
	at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:334)
	at co.elastic.clients.transport.rest_client.RestClientTransport.access$200(RestClientTransport.java:70)
	at co.elastic.clients.transport.rest_client.RestClientTransport$1.onSuccess(RestClientTransport.java:181)
	... 18 common frames omitted

Process finished with exit code 1

And as a result, the document does not end up as Data Stream.

Justification:

It can be interesting for spring data elastic search to save data as data streams. From official elasticsearch documentation, data streams are well suited for metrics, and logs type of data. But also, for some specific time series data.

If nothing from above, having a custom LogsPojo object to be saved as a data stream should be beneficial.

Issue:

Would it be possible to help on this issue, to allow saving to the data stream (not index) logs-foo?

Thank you

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Jun 13, 2023
@sothawo
Copy link
Collaborator

sothawo commented Jun 15, 2023

Spring Data Elasticsearch does not support the creation of data streams. And using a name like <prefix>-* to define that a data stream should be created is not feasible, as these names are valid for normal Elasticsearch indices as well.

To add support for data streams, we'd need additional parameters in the @Document annotation and in addition to that, the bulk/save methods would probably need to be extended to handle the data stream case.

@sothawo sothawo added type: enhancement A general enhancement and removed status: waiting-for-triage An issue we've not yet triaged labels Jun 15, 2023
@patpatpat123
Copy link
Author

Thank you @sothawo for the clear explanation.

Agreed, some sort of @Document(indexName = "myindex", datastream = true) would be really nice.
I hope this enhancement request will see the light one day.
Good day

@patpatpat123
Copy link
Author

Update two months after:

It is confirmed Spring data elasticsearch cannot create index of type data stream.

I did another test. Even if the data stream is created beforehand, meaning, the data stream is created prior to any spring data elasticsearch interaction, via the Index Management WebUI, or via curl, etc , spring data elasticsearch still cannot write into a data stream created (even if the data stream is already there)

reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.elasticsearch.BulkFailureException: Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [{null=only write ops with an op_type of create are allowed in data streams}]
Caused by: org.springframework.data.elasticsearch.BulkFailureException: Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [{null=only write ops with an op_type of create are allowed in data streams}]
	at org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchTemplate.checkForBulkOperationFailure(ReactiveElasticsearchTemplate.java:261)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:125)
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:71)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at co.elastic.clients.transport.rest_client.RestClientTransport$1.onSuccess(RestClientTransport.java:183)
	at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:676)
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:399)
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:393)
	at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
	at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
	at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)

Just wanted to add this observation to this enhancement request.

Thank you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: enhancement A general enhancement
Projects
None yet
Development

No branches or pull requests

3 participants