Asynchronous write fails when writing data to multiple buckets #202

Closed
dmajic opened this issue Mar 1, 2021 · 22 comments · Fixed by #358
Labels
bug Something isn't working
Milestone

Comments

@dmajic

dmajic commented Mar 1, 2021

Hi all,

I have the following problem when sending data points to InfluxDB using the asynchronous WriteApi.
The problem occurs only when processing data points that should be sent to different InfluxDB buckets
in parallel. In that case, the POST request to InfluxDB is never sent and the client doesn't generate any
error (WriteErrorEvent, WriteRetriableErrorEvent, ...). The only way we can detect that data was not sent
to InfluxDB is that WriteSuccessEvent is never published.

Our application receives a list of events through a REST API, processes these events (processing is simple and fast) and converts each event into an InfluxDB point.

Data points are relatively simple:

  • max 5 tags
  • 1 field

The problem occurs when we're processing multiple requests at the same time:

  • Request A - has more than ~500 points, sending points to bucket A
  • Request B - has more than ~500 points, sending points to bucket B

Processing of both requests starts at nearly the same time (the HTTP requests are received at nearly the same time) and we process all points in parallel (multiple threads).
We use the following method to send data points to InfluxDB (we send them point by point):

  • writeApi.writePoint(bucket, organisation, point);

We use the default WriteOptions (batch size = 1000, max number of retries = 3, ...)

Steps to reproduce:

  1. Create at least two buckets in InfluxDB
  2. Create a Java application in which:
  • Thread A processes request A (containing > 500 data points that need to be sent to bucket A)
  • Thread B processes request B (containing > 500 data points that need to be sent to bucket B)
  • Threads A and B start at nearly the same time
  • Both threads process each data point by starting a new thread for it (for the test, a thread pool with 1000 threads can be used for processing events)
  • Each "event processing thread" blocks until it receives WriteSuccessEvent, because we have to be sure that the point is written to InfluxDB

Also, if we decrease the number of threads that process events in parallel to 50, everything works as expected. With more than 100 threads, processing gets stuck.
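
The "block until WriteSuccessEvent" pattern described in the steps can hang forever when the event is never published. A minimal sketch of a bounded wait follows, using only the JDK. The names WriteConfirmation and ConfirmationDemo are hypothetical and are not part of the InfluxDB client; markSuccess() stands in for whatever a real WriteSuccessEvent listener would call, and the 50 ms timeout and pool size are arbitrary assumptions chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: one instance per written point (or per batch). */
final class WriteConfirmation {
    private final CountDownLatch confirmed = new CountDownLatch(1);

    /** Call from the success listener, e.g. a WriteSuccessEvent handler. */
    void markSuccess() {
        confirmed.countDown();
    }

    /** Returns true if success was signalled in time, false if the write looks stuck. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return confirmed.await(timeout, unit);
    }
}

final class ConfirmationDemo {
    /**
     * Runs `tasks` simulated writes on a pool of `threads` workers and
     * returns how many of them timed out waiting for confirmation.
     */
    static int countStuck(int tasks, int threads, boolean signalSuccess) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger stuck = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                WriteConfirmation confirmation = new WriteConfirmation();
                if (signalSuccess) {
                    confirmation.markSuccess(); // stands in for the success event arriving
                }
                try {
                    if (!confirmation.await(50, TimeUnit.MILLISECONDS)) {
                        stuck.incrementAndGet(); // no confirmation: report instead of hanging
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stuck.get();
    }
}
```

With a timeout, a worker whose confirmation never arrives can log or count the failure instead of blocking a pool thread indefinitely. This does not fix the stuck batch itself; it only makes the symptom observable.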

Expected behavior:
All created points written to InfluxDB

Actual behavior:
Points are not written to InfluxDB. The POST request for writing points to InfluxDB is never created. Since the POST request is never generated (as if the batch were stuck), no event is published that could tell whether the write succeeded or failed.

Specifications:

  • Client Version: 1.15
  • InfluxDB Version: 2.0.2
  • JDK Version: openjdk 11.0.9
  • Platform: CentOS 8

Please let me know if you need any additional information.

Thank you and best regards,
Domagoj

@bednar bednar added the bug Something isn't working label Mar 1, 2021
@bednar
Contributor

bednar commented Mar 1, 2021

@dmajic, thanks for using our client and for your description, we will take a look.

@bednar
Contributor

bednar commented Mar 2, 2021

Hi @dmajic,

Just for clarification...

The "event processing thread" calls writeApi.writePoint(point, ... and is also hooked to writeApi.listenEvents(WriteSuccessEvent.class, .... Is that true?

Regards

@dmajic
Author

dmajic commented Mar 2, 2021


Hi @bednar ,

correct. All threads that call writePoint method are "hooked to WriteSuccessEvent".

BR,
Domagoj

@bednar bednar removed the bug Something isn't working label Apr 7, 2021
@dmajic
Author

dmajic commented Apr 26, 2021

Hi @bednar ,

could you please send an update regarding this issue? Have you managed to reproduce the described behaviour?
I see that you've removed the bug label.

Thank you and best regards,
Domagoj

@gemron
Contributor

gemron commented Feb 8, 2022

@bednar I found the same problem. When multiple threads write batches of data to two buckets at the same time, the data cannot be written successfully. When I use a different WriteApi instance for each bucket, the writes to both buckets succeed at the same time.
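
The workaround described above (a separate WriteApi instance per bucket) can be sketched roughly as follows. PerBucketWriters is a hypothetical helper, not part of the client; the type parameter W stands in for WriteApi, and in a real application the factory would presumably be something like influxDBClient::makeWriteApi. Whether this avoids the problem in every case is not confirmed in this thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
 * Hypothetical registry that hands out one writer instance per bucket.
 * W stands in for the client's WriteApi type.
 */
final class PerBucketWriters<W> {
    private final Map<String, W> writers = new ConcurrentHashMap<>();
    private final Supplier<W> factory;

    PerBucketWriters(Supplier<W> factory) {
        this.factory = factory;
    }

    /** Returns the writer dedicated to this bucket, creating it on first use. */
    W forBucket(String bucket) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent callers
        // asking for the same bucket all receive the same instance.
        return writers.computeIfAbsent(bucket, b -> factory.get());
    }

    /** Number of distinct buckets seen so far. */
    int size() {
        return writers.size();
    }
}
```

Each thread would then call forBucket(bucket) and write through the returned instance, so points for different buckets never share a writer.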

@akash007ganga

@bednar I found the same problem. I used a singleton instance of WriteApi to write to two different buckets. Data is written properly for an unpredictable period (sometimes 7 hours, sometimes 15 minutes, etc.) and then writing stops on its own. No error or success event is fired. If I restart my application (JVM), it starts writing again. Please let me know what the way forward is. I am using influxdb-client-java v4.1.0.

@dometec

dometec commented May 8, 2022

I think I have the same problem with 6.0.0. But I also get an exception, and it happens within just a minute:

Exception in thread "executor-thread-188" io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#3) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)
	at io.reactivex.rxjava3.processors.UnicastProcessor.drainRegular(UnicastProcessor.java:309)
	at io.reactivex.rxjava3.processors.UnicastProcessor.drain(UnicastProcessor.java:384)
	at io.reactivex.rxjava3.processors.UnicastProcessor.onNext(UnicastProcessor.java:444)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowBoundary$WindowBoundaryMainSubscriber.drain(FlowableWindowBoundary.java:227)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowBoundary$WindowBoundaryMainSubscriber.onNext(FlowableWindowBoundary.java:104)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowablePublishMulticast$MulticastProcessor.drain(FlowablePublishMulticast.java:401)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowablePublishMulticast$MulticastProcessor.onNext(FlowablePublishMulticast.java:218)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy$OnBackpressureBufferStrategySubscriber.drain(FlowableOnBackpressureBufferStrategy.java:234)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy$OnBackpressureBufferStrategySubscriber.onNext(FlowableOnBackpressureBufferStrategy.java:145)
	at io.reactivex.rxjava3.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
	at io.reactivex.rxjava3.processors.PublishProcessor.onNext(PublishProcessor.java:243)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
	at io.reactivex.rxjava3.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.request(LambdaSubscriber.java:114)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:217)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:213)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:52)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15808)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15768)
	at com.influxdb.client.internal.AbstractWriteClient.write(AbstractWriteClient.java:243)
	at com.influxdb.client.internal.AbstractWriteClient.lambda$writePoints$14(AbstractWriteClient.java:226)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFilter$FilterSubscriber.tryOnNext(FlowableFilter.java:75)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable$IteratorConditionalSubscription.fastPath(FlowableFromIterable.java:321)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:129)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.request(LambdaSubscriber.java:114)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:217)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:213)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:52)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:66)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:38)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15808)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15768)
	at com.influxdb.client.internal.AbstractWriteClient.writePoints(AbstractWriteClient.java:223)
	at com.influxdb.client.internal.WriteApiImpl.writePoints(WriteApiImpl.java:190)
	at com.influxdb.client.internal.WriteApiImpl.writePoint(WriteApiImpl.java:156)
	at com.influxdb.client.internal.WriteApiImpl.writePoint(WriteApiImpl.java:143)
	at metrics.ingestor.service.InfluxDb.ingest(InfluxDb.java:108)

@White-Lee

I got the same problem with 6.0.0, and found an issue in RxJava: #7100.

@bednar
Contributor

bednar commented May 9, 2022

Hi @White-Lee,

what does your code for ingesting data into InfluxDB look like? How much data does your ingestor produce?

Regards

@White-Lee

Hi bednar,

The test code looks like this. I hope it helps.

        // One WriteApi instance is shared by both writer threads below.
        WriteApi writer = influxDBClient.makeWriteApi();

        // Writer thread 1: sends 129 records, then pauses 805 ms, forever.
        new Thread(() -> {
            while (true) {
                List<Point> points = new ArrayList<>();
                for (int i = 0; i < 129; i++) {
                    // Every point carries a different tag key ("tag0", "tag1", ...).
                    points.add(Point.measurement("xxx")
                            .addField("id", i)
                            .addTag("tag" + i, i + "")
                            .time(System.currentTimeMillis(), WritePrecision.MS));
                }
                // Convert the points to line protocol and hand them to the shared writer.
                List<String> s = points.stream().map(Point::toLineProtocol).collect(Collectors.toList());
                writer.writeRecords(WritePrecision.MS, s);
                try {
                    Thread.sleep(805);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        // Writer thread 2: same workload with a slightly different pause (759 ms),
        // so the two threads drift in and out of phase.
        new Thread(() -> {
            while (true) {
                List<Point> points = new ArrayList<>();
                for (int i = 0; i < 129; i++) {
                    points.add(Point.measurement("xxx")
                            .addField("id", i)
                            .addTag("tag" + i, i + "")
                            .time(System.currentTimeMillis(), WritePrecision.MS));
                }
                List<String> s = points.stream().map(Point::toLineProtocol).collect(Collectors.toList());
                writer.writeRecords(WritePrecision.MS, s);
                try {
                    Thread.sleep(759);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

The problem appeared after the test program ran for 2 hours.

stack

ERROR com.influxdb.client.write.events.WriteErrorEvent.logEvent - The error occurred during writing of data
com.influxdb.exceptions.InfluxException: Unable to emit a new group (#2) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
	at com.influxdb.internal.AbstractRestClient.toInfluxException(AbstractRestClient.java:101)
	at com.influxdb.client.internal.AbstractWriteClient.lambda$new$12(AbstractWriteClient.java:184)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableDoFinally$DoFinallySubscriber.onError(FlowableDoFinally.java:90)
	at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94)
	at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapMaybe$ConcatMapMaybeSubscriber.drain(FlowableConcatMapMaybe.java:178)
	at io.reactivex.rxjava3.internal.operators.mixed.ConcatMapXMainSubscriber.onError(ConcatMapXMainSubscriber.java:116)
	at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94)
	at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle$ConcatMapSingleSubscriber.drain(FlowableConcatMapSingle.java:173)
	at io.reactivex.rxjava3.internal.operators.mixed.ConcatMapXMainSubscriber.onError(ConcatMapXMainSubscriber.java:116)
	at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94)
	at io.reactivex.rxjava3.internal.util.HalfSerializer.onError(HalfSerializer.java:68)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.innerError(FlowableConcatMap.java:212)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapInner.onError(FlowableConcatMap.java:575)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onError(FlowableGroupBy.java:218)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)
	at io.reactivex.rxjava3.processors.UnicastProcessor.drainRegular(UnicastProcessor.java:309)
	at io.reactivex.rxjava3.processors.UnicastProcessor.drain(UnicastProcessor.java:384)
	at io.reactivex.rxjava3.processors.UnicastProcessor.onNext(UnicastProcessor.java:444)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowBoundary$WindowBoundaryMainSubscriber.drain(FlowableWindowBoundary.java:227)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowBoundary$WindowBoundaryMainSubscriber.onNext(FlowableWindowBoundary.java:104)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowablePublishMulticast$MulticastProcessor.drain(FlowablePublishMulticast.java:401)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowablePublishMulticast$MulticastProcessor.onNext(FlowablePublishMulticast.java:218)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy$OnBackpressureBufferStrategySubscriber.drain(FlowableOnBackpressureBufferStrategy.java:234)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy$OnBackpressureBufferStrategySubscriber.onNext(FlowableOnBackpressureBufferStrategy.java:145)
	at io.reactivex.rxjava3.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
	at io.reactivex.rxjava3.processors.PublishProcessor.onNext(PublishProcessor.java:243)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.fastPath(FlowableFromIterable.java:185)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:129)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.request(LambdaSubscriber.java:114)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:217)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:213)
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:52)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:69)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15808)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15768)

Specifications:

Client Version: 6.0.0
InfluxDB Version: 2.1.1
JDK Version: openjdk 1.8.0_271
Platform: windows 10, CentOS 7.4

@linghengqian

linghengqian commented May 14, 2022

  • To reproduce something like Caused by: io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#1) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed., it seems to be enough to write large batches of data to a single bucket by calling the same WriteApi instance from multiple threads (for example, through a JDK 8 parallelStream).

  • I'm trying to get around this issue with WriteApiBlocking, and it does seem to be related to https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-3.0/080da79549b91abbd5af1c58cfe984e471454dcf#groupby-backpressure-example ?

  • So my wild guess is that this problem could be solved by changing a .flatMap(v -> v, 1) somewhere to something like .flatMap(v -> v, Integer.MAX_VALUE). Even "feat: update libraries, migrate to RxJava3 #298" mentions that the unit tests used to have such a bug.

  • Specifications, Client Version: 6.0.0, InfluxDB Version: 2.2.0, JDK Version: 17.0.3, Platform: Docker

@bednar
Contributor

bednar commented May 18, 2022

@linghengqian, @White-Lee Thank you for the detailed information. We will take a look ASAP.

@robertjgtoth

robertjgtoth commented May 31, 2022

I can reproduce this same issue with 6.0.1 by simply stopping the remote instance of InfluxDB while writes are ongoing. I have a simple Java application running in docker that periodically writes data to a single bucket in a single thread. If I stop the InfluxDB docker container while this application is running, I see the same issue. The write API does not recover if the InfluxDB container is restarted.

Client: 6.0.1, InfluxDB: 2.1, JDK: 17.0.2, Platform: Docker

Relevant log output:

20:55:02.006 [RxNewThreadScheduler-1] INFO  com.mycompany.MyApplication - Successfully wrote 80 points to influx.
20:55:03.020 [RxNewThreadScheduler-1] INFO  com.mycompany.MyApplication - Successfully wrote 80 points to influx.
May 31, 2022 8:55:04 PM com.influxdb.client.write.events.WriteRetriableErrorEvent logEvent
WARNING: The retriable error occurred during writing of data. Reason: 'unexpected end of stream on http://influxdb:8086/...'. Retry in: 9.777s.
May 31, 2022 8:55:07 PM com.influxdb.client.write.events.WriteErrorEvent logEvent
SEVERE: The error occurred during writing of data
com.influxdb.exceptions.InfluxException: Unable to emit a new group (#2) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
        at com.influxdb.internal.AbstractRestClient.toInfluxException(AbstractRestClient.java:101)
        at com.influxdb.client.internal.AbstractWriteClient.lambda$new$12(AbstractWriteClient.java:184)
        at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableDoFinally$DoFinallySubscriber.onError(FlowableDoFinally.java:90)
        at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94)
        at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapMaybe$ConcatMapMaybeSubscriber.drain(FlowableConcatMapMaybe.java:178)
        at io.reactivex.rxjava3.internal.operators.mixed.ConcatMapXMainSubscriber.onError(ConcatMapXMainSubscriber.java:116)
        at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94)
        at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle$ConcatMapSingleSubscriber.drain(FlowableConcatMapSingle.java:173)
        at io.reactivex.rxjava3.internal.operators.mixed.ConcatMapXMainSubscriber.onError(ConcatMapXMainSubscriber.java:116)
        at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94)
        at io.reactivex.rxjava3.internal.util.HalfSerializer.onError(HalfSerializer.java:68)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.innerError(FlowableConcatMap.java:212)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapInner.onError(FlowableConcatMap.java:575)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onError(FlowableGroupBy.java:218)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)
        at io.reactivex.rxjava3.processors.UnicastProcessor.drainRegular(UnicastProcessor.java:309)
        at io.reactivex.rxjava3.processors.UnicastProcessor.drain(UnicastProcessor.java:384)
        at io.reactivex.rxjava3.processors.UnicastProcessor.onNext(UnicastProcessor.java:444)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowBoundary$WindowBoundaryMainSubscriber.drain(FlowableWindowBoundary.java:227)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowBoundary$WindowBoundaryMainSubscriber.onNext(FlowableWindowBoundary.java:104)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowablePublishMulticast$MulticastProcessor.drain(FlowablePublishMulticast.java:401)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowablePublishMulticast$MulticastProcessor.onNext(FlowablePublishMulticast.java:218)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy$OnBackpressureBufferStrategySubscriber.drain(FlowableOnBackpressureBufferStrategy.java:234)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy$OnBackpressureBufferStrategySubscriber.onNext(FlowableOnBackpressureBufferStrategy.java:145)
        at io.reactivex.rxjava3.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
        at io.reactivex.rxjava3.processors.PublishProcessor.onNext(PublishProcessor.java:243)
        at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
        at io.reactivex.rxjava3.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55)
        at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
        at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.request(LambdaSubscriber.java:114)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:217)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:213)
        at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:52)
        at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15808)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15768)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:213)
        at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:52)
        at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
        at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:66)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:38)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15808)
        at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15768)
        at com.influxdb.client.internal.AbstractWriteClient.writePoints(AbstractWriteClient.java:223)
        at com.influxdb.client.internal.WriteApiImpl.writePoints(WriteApiImpl.java:190)
        at com.influxdb.client.internal.WriteApiImpl.writePoint(WriteApiImpl.java:156)
        at com.influxdb.client.internal.WriteApiImpl.writePoint(WriteApiImpl.java:143)
        at com.influxdb.client.internal.WriteApiImpl.writePoint(WriteApiImpl.java:135)
        at com.mycompany.MyApplication.writePointAsync(MyApplication.java:204)
        at com.mycompany.MyApplication.lambda$convertAndQueue$4(MyApplication.java:180)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#2) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
        ... 63 more

@bednar
Contributor

bednar commented Jun 1, 2022

Hi All,

I am currently working on this issue and I hope that I have found the solution. I've prepared a fixed version of the client in #358. The development version is deployed to the snapshot repository as version 6.2.0.groupBy-SNAPSHOT.

Any feedback will be appreciated 👂

Regards

@robertjgtoth

robertjgtoth commented Jun 1, 2022

@bednar This fixes my issue (stopping influx server while writes are ongoing). The write API successfully recovers and resumes writing after the influx server is restarted.

Thank you!

@White-Lee

Hi bednar,
It seems that my problem has been solved, and the test case has not reported any errors. Thanks for your work.

@bednar
Contributor

bednar commented Jun 2, 2022

@robertjgtoth, @White-Lee thanks for your feedback 👍

We have to implement additional test cases before we are ready to merge it into the master branch... stay tuned.

@linghengqian

I tested 6.2.0.groupBy-SNAPSHOT with a dataset of 800 million records, and it solves my problem above with multithreaded insertion. I am very much looking forward to the next release, thanks for your work!

@xiegeng

xiegeng commented Jun 17, 2022

I have the same problem: my InfluxDB client loses data after it has been running and writing data for 2 hours, and reports "Unable to emit a new group (#2) due to lack of requests".

With 6.2.0.groupBy-SNAPSHOT, this issue still exists. Please check whether anything is wrong.

Client: 6.2.0.groupBy-SNAPSHOT,
InfluxDB: 2.2,
JDK: 18.0.301,
Platform: Windows

pom.xml, code and output log are attached.

thanks.

related log:

2022-06-17 11:32:18.632 INFO [RxNewThreadScheduler-2] okhttp3.OkHttpClient : <-- 204 No Content http://192.168.1.80:8086/api/v2/write?org=zcs-org&bucket=zcs-xiaojihan&precision=ms (1063ms, 0-byte body)
2022-06-17 11:32:18.633 ERROR[RxNewThreadScheduler-2] com.influxdb.client.write.events.WriteErrorEvent : The error occurred during writing of data
com.influxdb.exceptions.InfluxException: Unable to emit a new group (#2) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
	at com.influxdb.internal.AbstractRestClient.toInfluxException(AbstractRestClient.java:101) ~[influxdb-client-core-6.2.0.groupBy-20220603.093958-4.jar:6.2.0.groupBy-SNAPSHOT]
	at com.influxdb.client.internal.AbstractWriteClient.lambda$new$12(AbstractWriteClient.java:189) ~[influxdb-client-java-6.2.0.groupBy-20220603.093958-4.jar:6.2.0.groupBy-SNAPSHOT]
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableDoFinally$DoFinallySubscriber.onError(FlowableDoFinally.java:90) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapMaybe$ConcatMapMaybeSubscriber.drain(FlowableConcatMapMaybe.java:178) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.mixed.ConcatMapXMainSubscriber.onError(ConcatMapXMainSubscriber.java:116) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.checkTerminate(FlowableFlatMap.java:540) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:365) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:357) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onError(FlowableFlatMap.java:318) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.util.AtomicThrowable.tryTerminateConsumer(AtomicThrowable.java:94) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.checkTerminate(FlowableFlatMap.java:540) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:455) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.tryEmit(FlowableFlatMap.java:301) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onNext(FlowableFlatMap.java:627) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FlowableSwitchIfEmpty.java:59) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableTake$TakeSubscriber.onNext(FlowableTake.java:74) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableTimer$TimerSubscriber.run(FlowableTimer.java:76) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Scheduler$DisposeTask.run(Scheduler.java:644) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:65) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:56) ~[rxjava-3.1.4.jar:?]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[?:1.8.0_301]
	at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_301]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_301]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_301]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_301]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_301]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
Caused by: io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#2) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.processors.UnicastProcessor.drainRegular(UnicastProcessor.java:309) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.processors.UnicastProcessor.drain(UnicastProcessor.java:384) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.processors.UnicastProcessor.onNext(UnicastProcessor.java:444) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowBoundary$WindowBoundaryMainSubscriber.drain(FlowableWindowBoundary.java:227) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowBoundary$WindowBoundaryMainSubscriber.onNext(FlowableWindowBoundary.java:104) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowablePublishMulticast$MulticastProcessor.drain(FlowablePublishMulticast.java:401) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowablePublishMulticast$MulticastProcessor.onNext(FlowablePublishMulticast.java:218) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy$OnBackpressureBufferStrategySubscriber.drain(FlowableOnBackpressureBufferStrategy.java:234) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy$OnBackpressureBufferStrategySubscriber.onNext(FlowableOnBackpressureBufferStrategy.java:145) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.processors.PublishProcessor.onNext(PublishProcessor.java:243) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.request(LambdaSubscriber.java:114) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:217) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:213) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:52) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15808) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15768) ~[rxjava-3.1.4.jar:?]
	at com.influxdb.client.internal.AbstractWriteClient.write(AbstractWriteClient.java:248) ~[influxdb-client-java-6.2.0.groupBy-20220603.093958-4.jar:6.2.0.groupBy-SNAPSHOT]
	at com.influxdb.client.internal.AbstractWriteClient.lambda$writePoints$14(AbstractWriteClient.java:231) ~[influxdb-client-java-6.2.0.groupBy-20220603.093958-4.jar:6.2.0.groupBy-SNAPSHOT]
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFilter$FilterSubscriber.tryOnNext(FlowableFilter.java:75) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable$IteratorConditionalSubscription.fastPath(FlowableFromIterable.java:321) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:129) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.request(LambdaSubscriber.java:114) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:217) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:213) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:52) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:66) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:38) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15808) ~[rxjava-3.1.4.jar:?]
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15768) ~[rxjava-3.1.4.jar:?]
	at com.influxdb.client.internal.AbstractWriteClient.writePoints(AbstractWriteClient.java:228) ~[influxdb-client-java-6.2.0.groupBy-20220603.093958-4.jar:6.2.0.groupBy-SNAPSHOT]
	at com.influxdb.client.internal.WriteApiImpl.writePoints(WriteApiImpl.java:190) ~[influxdb-client-java-6.2.0.groupBy-20220603.093958-4.jar:6.2.0.groupBy-SNAPSHOT]
	at com.influxdb.client.internal.WriteApiImpl.writePoint(WriteApiImpl.java:156) ~[influxdb-client-java-6.2.0.groupBy-20220603.093958-4.jar:6.2.0.groupBy-SNAPSHOT]
	at com.influxdb.client.internal.WriteApiImpl.writePoint(WriteApiImpl.java:143) ~[influxdb-client-java-6.2.0.groupBy-20220603.093958-4.jar:6.2.0.groupBy-SNAPSHOT]
	at com.zcs.datastorage.service.influxDB.InfluxDBService.write(InfluxDBService.java:175) ~[classes/:?]
	at com.zcs.datastorage.Mq.consumer.InfluxDBConsumerService.onMessage(InfluxDBConsumerService.java:42) ~[classes/:?]
	at com.zcs.datastorage.Mq.consumer.InfluxDBConsumerService.onMessage(InfluxDBConsumerService.java:17) ~[classes/:?]
	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:350) ~[rocketmq-spring-boot-2.0.4.jar:2.0.4]
	at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:412) ~[rocketmq-client-4.8.0.jar:4.8.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_301]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[?:1.8.0_301]
	at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_301]
	... 3 more

pom.xml.txt
InfluxDBService.java.txt
stdout-20220617.txt
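
The `FlowableGroupBy` frames in the trace above suggest that the client groups pending writes by destination before batching them. As a rough illustration of that idea only, and not the client's actual RxJava pipeline, the following plain-Java sketch groups line-protocol entries by bucket so that each batch targets a single bucket. The `BucketBatcher` class, the `Entry` type, the `groupByBucket` method, and the bucket names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BucketBatcher {

    /** Hypothetical value type: a target bucket plus one line-protocol entry. */
    public static final class Entry {
        final String bucket;
        final String line;

        public Entry(String bucket, String line) {
            this.bucket = bucket;
            this.line = line;
        }
    }

    /** Groups entries by bucket, preserving arrival order within each bucket. */
    public static Map<String, List<String>> groupByBucket(List<Entry> entries) {
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (Entry e : entries) {
            batches.computeIfAbsent(e.bucket, k -> new ArrayList<>()).add(e.line);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Entry> mixed = Arrays.asList(
                new Entry("bucket-a", "cpu,host=h1 value=1"),
                new Entry("bucket-b", "mem,host=h1 value=2"),
                new Entry("bucket-a", "cpu,host=h2 value=3"));
        // Each map entry is one batch that targets exactly one bucket.
        groupByBucket(mixed).forEach((bucket, lines) ->
                System.out.println(bucket + ": " + lines.size() + " line(s)"));
    }
}
```

In this sketch, every distinct bucket gets its own batch. The error message in the log concerns the case where such a group cannot be handed to the downstream consumer.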

@bednar
Contributor

bednar commented Jun 17, 2022

@linghengqian thanks for testing

@bednar
Contributor

bednar commented Jun 17, 2022

@xiegeng thanks for testing. Please try the latest SNAPSHOT; your issue was fixed by ba23663.

@bednar
Contributor

bednar commented Jun 29, 2022

Hi All,

PR #358 is ready for review before it is merged into master. The development version is deployed to the snapshot repository as version 6.2.0.groupBy-SNAPSHOT.

Any feedback and testing will be appreciated 👂

Regards

@bednar bednar added the bug Something isn't working label Jun 29, 2022
@bednar bednar added this to the 6.4.0 milestone Jul 19, 2022