
Introduce retries while creating stream message decoder for more robustness #13036

Merged
merged 3 commits into apache:master on May 17, 2024

Conversation

tibrewalpratik17
Contributor

We have intermittently seen issues in our clusters while creating the StreamMessageDecoder. Stack trace:

java.lang.RuntimeException: Caught exception while creating StreamMessageDecoder from stream config: 
StreamConfig{_type='kafka', _topicName='<redacted>', _consumerTypes=[LOWLEVEL], _consumerFactoryClassName='<redacted>', 
_offsetCriteria='OffsetCriteria{_offsetType=LARGEST, _offsetString='largest'}', _connectionTimeoutMillis=30000, _fetchTimeoutMillis=5000, _idleTimeoutMillis=180000, _flushThresholdRows=80000000, 
_flushThresholdTimeMillis=86400000, _flushSegmentDesiredSizeBytes=209715200, _flushAutotuneInitialRows=100000, _decoderClass='redacted', _decoderProperties={}, _groupId='null', _topicConsumptionRateLimit=-1.0, 
_tableNameWithType='redacted'}
at org.apache.pinot.spi.stream.StreamDecoderProvider.create(StreamDecoderProvider.java:48)
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1424)
at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:446)
at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:228)
at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:168)
at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:83)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350)
at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278)
at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97)
at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

This stops consumption on one of the replicas, and once the other replica starts committing, the stopped replica always ends up in ERROR state. The only way to fix this is to reset that replica's segment.
Having one replica not consuming is also dangerous: if the other replica's host restarts or goes down for any reason, it can lead to data loss.

Having a retry policy during StreamMessageDecoder.create() may help reduce the chances of such scenarios.
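
As a rough illustration (not the actual change in this PR), the retry could wrap decoder creation with Pinot's RetryPolicies utility; the attempt count, backoff values, and the create(StreamConfig, Set<String>) overload below are assumptions:

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;

public class DecoderCreationRetrySketch {
  // Hypothetical helper: retry decoder creation a few times so a transient failure
  // does not immediately push the segment into ERROR state.
  static StreamMessageDecoder createWithRetries(StreamConfig streamConfig, Set<String> fieldsToRead)
      throws Exception {
    AtomicReference<StreamMessageDecoder> decoder = new AtomicReference<>();
    // Up to 5 attempts with exponential backoff starting at 1s (illustrative values).
    RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0).attempt(() -> {
      try {
        decoder.set(StreamDecoderProvider.create(streamConfig, fieldsToRead));
        return true;  // decoder created, stop retrying
      } catch (Exception e) {
        return false; // transient failure, retry after backoff
      }
    });
    return decoder.get();
  }
}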

@tibrewalpratik17 tibrewalpratik17 changed the title Introduce retries while creating stream message decoder to make syste… Introduce retries while creating stream message decoder for more robustness Apr 30, 2024
@codecov-commenter

codecov-commenter commented Apr 30, 2024

Codecov Report

Attention: Patch coverage is 66.66667% with 4 lines in your changes missing coverage. Please review.

Project coverage is 62.17%. Comparing base (59551e4) to head (5e14a73).
Report is 448 commits behind head on master.

Files Patch % Lines
...a/manager/realtime/RealtimeSegmentDataManager.java 66.66% 4 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13036      +/-   ##
============================================
+ Coverage     61.75%   62.17%   +0.42%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2515      +79     
  Lines        133233   137867    +4634     
  Branches      20636    21335     +699     
============================================
+ Hits          82274    85723    +3449     
- Misses        44911    45755     +844     
- Partials       6048     6389     +341     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 62.13% <66.66%> (+0.42%) ⬆️
java-21 62.05% <66.66%> (+0.43%) ⬆️
skip-bytebuffers-false 62.16% <66.66%> (+0.41%) ⬆️
skip-bytebuffers-true 62.01% <66.66%> (+34.28%) ⬆️
temurin 62.17% <66.66%> (+0.42%) ⬆️
unittests 62.17% <66.66%> (+0.42%) ⬆️
unittests1 46.73% <66.66%> (-0.16%) ⬇️
unittests2 27.80% <0.00%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown.


@swaminathanmanish
Contributor

We have intermittently seen issues in our clusters while creating the StreamMessageDecoder. [...] Having a retry policy during StreamMessageDecoder.create() may help reduce the chances of such scenarios.

What kind of failures are these? Are all these transient failures that are likely to go away on retries?

@tibrewalpratik17
Contributor Author

tibrewalpratik17 commented Apr 30, 2024

What kind of failures are these? Are all these transient failures that are likely to go away on retries?

Yes, so far we have mostly seen transient failures. We use internal kafka-clients, so we occasionally see client timeouts or similar errors on this code path, which result in segment errors.

One example is a JDK bug we found in our internal client, very similar to the user-agent issue called out in #10894. This particular bug often results in an NPE during StreamDecoderProvider.create and can be worked around by retries.
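
To sketch how only known-transient errors (for example, the NPE or client timeouts mentioned above) could be retried while other failures still surface immediately, here is an illustrative helper; the retry policy values and the exception classification are assumptions, not what this PR implements:

import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.spi.utils.retry.RetryPolicies;

public class SelectiveRetrySketch {
  // Hypothetical: retry only failures that look transient; rethrow everything else
  // so genuine misconfigurations fail fast instead of being retried.
  static <T> T withSelectiveRetries(Callable<T> op) throws Exception {
    AtomicReference<T> result = new AtomicReference<>();
    AtomicReference<Exception> fatal = new AtomicReference<>();
    RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(() -> {
      try {
        result.set(op.call());
        return true;           // success, stop retrying
      } catch (NullPointerException | TimeoutException e) {
        return false;          // looks transient, retry after the fixed delay
      } catch (Exception e) {
        fatal.set(e);
        return true;           // not transient: stop retrying and rethrow below
      }
    });
    if (fatal.get() != null) {
      throw fatal.get();
    }
    return result.get();
  }
}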

@tibrewalpratik17
Contributor Author

hey @swaminathanmanish can you help with review?

@tibrewalpratik17
Contributor Author

hey @swaminathanmanish bump on reviewing this

@Jackie-Jiang Jackie-Jiang requested review from KKcorps and removed request for swaminathanmanish May 11, 2024 06:17
Contributor

@snleee snleee left a comment


LGTM otherwise

@xiangfu0 xiangfu0 merged commit d4aa66c into apache:master May 17, 2024
20 checks passed