When the writer throws a non-skippable exception (e.g. StringIndexOutOfBoundsException), the processor goes into an infinite loop #4536

Open
punitsingh2 opened this issue Jan 25, 2024 · 3 comments
Labels
status: waiting-for-reporter, type: bug

Comments

@punitsingh2

punitsingh2 commented Jan 25, 2024

When the writer throws a non-skippable exception (e.g. StringIndexOutOfBoundsException), the processor goes into an infinite loop.

@Bean
@JobScope
public Step getKeysFromDB(@Value("#{jobParameters[name]}") String name) {

    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("life-ins-core-thread-" + "data-");
    simpleAsyncTaskExecutor.setTaskDecorator(new Slf4JTaskDecorator());

    return new StepBuilder("", jobRepository)
            .partitioner(STEP_GET_KEYS_FROM_SOURCE_DB, rangePartitioner)
            .partitionHandler(null)
            .step(processData())
            .gridSize(1)
            .taskExecutor(simpleAsyncTaskExecutor)
            .build();
}
  
  
private Step processData() {

    SimpleStepBuilder simpleStepBuilder = new StepBuilder("slaveStep", jobRepository)
            .chunk(1, transactionManager);

    simpleStepBuilder.listener(preProcessingWriter);
    simpleStepBuilder.listener(postProcessingWriter);
    simpleStepBuilder.listener(batchDetailItemWriter);
    simpleStepBuilder.listener(postProcessingRWriter);

    simpleStepBuilder
            .reader(sourceDbKeysReader)
            .processor(stagingToBatchDetailProcessor)
            .writer(sequenceWriterNeeded())
            .listener(coreItemProcessorListener)
            .listener(coreItemWriterListener)
            .listener(new CoreChunkListener())
            .listener(coreStepExecutionListener);

    return simpleStepBuilder.faultTolerant()
            .retryLimit(2)
            .retry(BatchSystemException.class)
            .skipLimit(2)
            .skip(SkippableException.class)
            .build();
}
  
private CompositeItemWriter sequenceWriterNeeded() {

    CompositeItemWriter compositeWriter = new CompositeItemWriter();

    List<ItemWriter> listWriter = new ArrayList<>();
    listWriter.add(preProcessingWriter);
    listWriter.add(batchDetailItemWriter);
    listWriter.add(postProcessingWriter);
    listWriter.add(postProcessingRWriter);
    compositeWriter.setDelegates(listWriter);
    return compositeWriter;
}

This is the processor:

@Service
@StepScope
public class StagingToBatchDetailProcessor implements ItemProcessor<HashMap<String, Object>, HashMap<String, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(StagingToBatchDetailProcessor.class);

    private ApplicationContext applicationContext;

    public StagingToBatchDetailProcessor(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    public HashMap<String, Object> process(HashMap<String, Object> item) throws Exception {
        logger.info("inside StagingToBatchDetailProcessor::process");
        item.put("last_output", "item2");
        return item;
    }

}

If I look at the code of FaultTolerantChunkProcessor, only the retryCallback is executed; it never goes into the recoveryCallback.

try {
    batchRetryTemplate.execute(retryCallback, recoveryCallback,
            new DefaultRetryState(inputs, rollbackClassifier));
}
catch (Exception e) {
    RetryContext context = contextHolder.get();
    if (!batchRetryTemplate.canRetry(context)) {
        /*
         * BATCH-1761: we need advance warning of the scan about to start in
         * the next transaction, so we can change the processing behaviour.
         */
        data.scanning(true);
    }
    throw e;
}
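
To make sure I understand the stateful retry flow this snippet relies on, here is a small stand-alone spring-retry sketch I put together (class and key names are made up; this is not the Batch code itself). With a stable state key the template rethrows the exception on each attempt and only calls the recoveryCallback once the policy is exhausted; if the key changes between attempts (for example because its hashCode changed), a fresh context is created each time and the recoveryCallback is never reached:

import org.springframework.retry.RetryCallback;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.DefaultRetryState;
import org.springframework.retry.support.RetryTemplate;

public class StatefulRetryDemo {

    public static void main(String[] args) {
        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(new SimpleRetryPolicy(2)); // at most 2 attempts for the same key

        Object key = "chunk-key"; // stable key -> the retry context survives between calls

        for (int call = 1; call <= 3; call++) {
            try {
                String result = template.execute(
                        (RetryCallback<String, RuntimeException>) ctx -> {
                            // simulates the writer failing on every attempt
                            throw new RuntimeException("writer failure");
                        },
                        ctx -> "recovered",          // runs only once the retries are exhausted
                        new DefaultRetryState(key)); // stateful retry keyed by 'key'
                System.out.println("call " + call + " returned: " + result);
            }
            catch (RuntimeException e) {
                // with stateful retry the exception is rethrown so the transaction can roll back
                System.out.println("call " + call + " rethrown: " + e.getMessage());
            }
        }
        // prints: call 1 rethrown, call 2 rethrown, call 3 returned: recovered
    }
}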

Spring Boot 3.1.6 and Java 17

Steps to reproduce
It happens after migrating from Spring Boot 2.7.13 to 3.1.6.

Expected behavior
If a non-skippable error is thrown from the item writer, it should go into the recoveryCallback (i.e. exhausted) instead of the retryCallback. Here, from StepContextRepeatCallback, it should go to FaultTolerantChunkProcessor, but it goes back to stagingToBatchDetailProcessor, then a scan happens, which causes the infinite loop.

During the migration, the only change we made was to use @JobScope for late binding.
When I debug, I find that retryContextCache.get(key) always returns null even though I can see that the value is there; something seems wrong with the hashCode. This condition is always satisfied: while (canRetry(retryPolicy, context) && !context.isExhaustedOnly())
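
Here is a quick illustration, in plain Java outside Spring Batch, of what I believe is happening with the cache key: a java.util.HashMap computes its hashCode from its entries, so mutating the item inside the processor changes the hash of the input chunk used as the retry state key (the chunk's hash is derived from its items), and a HashMap-based cache can no longer find the entry it stored under the old hash:

import java.util.HashMap;

public class HashCodeMutationDemo {

    public static void main(String[] args) {
        HashMap<String, Object> item = new HashMap<>();
        item.put("final_output", "item1");

        // hash at the time the retry context would be registered in the cache
        int hashBefore = item.hashCode();

        // the same mutation the processor performs on the shared item instance
        item.put("last_output", "item2");

        // the hash has changed, so a lookup keyed on the mutated object misses the
        // entry stored under the old hash, which matches the null I see in the debugger
        int hashAfter = item.hashCode();

        System.out.println(hashBefore + " vs " + hashAfter + " -> same = " + (hashBefore == hashAfter));
    }
}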

This infinite loop is resolved only when using faultTolerant().processorNonTransactional() or when returning a new object from the processor, as below:

@Override
public HashMap<String, Object> process(HashMap<String, Object> item) throws Exception {
    logger.info("inside StagingToBatchDetailProcessor::process");
    HashMap<String, Object> obj = new HashMap<String, Object>();
    obj.put("final_output", item.get("final_output"));

    item.put("last_output", "item2");
    return obj;
}
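
For completeness, this is roughly how the first workaround looks in the step configuration. This is only a condensed sketch reusing the same fields as processData() above, with the listeners omitted:

private Step processDataWithNonTransactionalProcessor() {
    // sketch only: same reader/processor/writer as processData() above; the
    // processor is marked as non-transactional so it is not re-invoked on rollback
    return new StepBuilder("slaveStep", jobRepository)
            .<HashMap<String, Object>, HashMap<String, Object>>chunk(1, transactionManager)
            .reader(sourceDbKeysReader)
            .processor(stagingToBatchDetailProcessor)
            .writer(sequenceWriterNeeded())
            .faultTolerant()
            .processorNonTransactional()
            .retryLimit(2)
            .retry(BatchSystemException.class)
            .skipLimit(2)
            .skip(SkippableException.class)
            .build();
}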

I have checked everything but don't see any issue. I am just wondering what difference the two solutions make: processorNonTransactional versus returning a new object from the processor. I don't want to go with either solution, because I am not sure how running the processor outside the transaction will impact us, and creating a new object may lead to an out-of-memory issue. The data is payment-related and needs to be processed.

punitsingh2 added the status: waiting-for-triage and type: bug labels on Jan 25, 2024
@fmbenhassine
Contributor

This might be related to the way items are identified during retry (i.e. whether the item type correctly implements equals/hashCode). Can you please package a minimal example that reproduces the issue? This will help us identify the issue and find the best way to fix it. Thank you upfront.

fmbenhassine added the status: waiting-for-reporter label and removed the status: waiting-for-triage label on Feb 9, 2024
@punitsingh2
Author

@fmbenhassine I got a workaround by creating a Chunk.java class under the package org.springframework.batch.item in which I only removed the hashCode method; then it works fine.

Do you think the hashCode logic should be changed?

@fmbenhassine
Contributor

Yes, hashCode is important as it is used to identify items from the cache during the retry. The item processor should be idempotent in a fault-tolerant step (there is a note about this here), which does not seem to be the case in your setup. But since this is not always possible to achieve, the processorNonTransactional parameter can be used.

As a side note, the step you shared is probably doing too much with all these listeners and writers, and the fact that you use a HashMap as the item type and store last_output in it, etc., is possibly a sign of that. I could be wrong, but I would recommend breaking the logic into smaller steps with a staging area in between (this makes it easier to implement fault tolerance, for example with the process-indicator pattern). A rough sketch of that pattern follows below.
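
Here is a rough sketch of the process-indicator pattern (table and column names are hypothetical): the reader only picks up rows whose flag is not set, and a dedicated writer flips the flag for the items that were written, so a restart naturally resumes with the remaining rows:

import java.util.Map;
import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;

// hypothetical staging table with an id, a payload and a processed flag
@Configuration
public class ProcessIndicatorConfig {

    @Bean
    public JdbcCursorItemReader<Map<String, Object>> stagingReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<Map<String, Object>>()
                .name("stagingReader")
                .dataSource(dataSource)
                .sql("SELECT id, payload FROM staging WHERE processed = false")
                .rowMapper(new ColumnMapRowMapper())
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Map<String, Object>> processedFlagWriter(DataSource dataSource) {
        // would typically be the last delegate of the composite writer, so the flag is
        // only flipped once the business writers have succeeded in the same transaction
        return new JdbcBatchItemWriterBuilder<Map<String, Object>>()
                .dataSource(dataSource)
                .sql("UPDATE staging SET processed = true WHERE id = :id")
                .itemSqlParameterSourceProvider(MapSqlParameterSource::new)
                .build();
    }
}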
