Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JdbcMetadataStore does not handle concurrent transactions correctly #3609

Open
JulienCharon opened this issue Aug 2, 2021 · 6 comments
Open

Comments

@JulienCharon
Copy link

JulienCharon commented Aug 2, 2021

Versions

  • Sprint Boot 2.5.2
  • Sprint Integration 5.5.1 and 5.5.2
  • PostgreSQL 11.2and 13

Describe the bug
I have following File Integration Flow setup in a Spring Boot App:

  @Bean
  public IntegrationFlow fileFlow(
      Consumer consumer,
      ConcurrentMetadataStore metadataStore,
      Executor filePollerExecutor) {
    var fileFilter =
        new ChainFileListFilter<>(
            List.of(
                new LastModifiedFileListFilter(10L),
                new FileSystemPersistentAcceptOnceFileListFilter(
                    metadataStore, "myPrefix")));
    return IntegrationFlows.from(
            Files.inboundAdapter(sourceDirectory)
                .filter(fileFilter),
            poller ->
                poller.poller(
                    pm ->
                        pm.fixedRate(100L)
                            .taskExecutor(filePollerExecutor)
							.transactional()))
        .transform(Files.toStringTransformer(StandardCharsets.UTF_8.toString(), true))
        .handle(
            String.class,
            (p, h) -> {
              consumer.consumer(p);
              return null;
            })
        .get();
  }

  @Bean
  Executor filePollerExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(100);
    executor.initialize();
    return executor;
  }

  @Bean
  ConcurrentMetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
  }

When I copy 500 files into the directory that is being polled, most of the are processed. But a couple of them are not, I can see a couple of exceptions like:

2021-08-02 14:34:17.319 ERROR 25316 --- [llerExecutor-10] o.s.integration.handler.LoggingHandler :

org.springframework.messaging.MessagingException: nested exception is org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:427)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1542)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:667)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:713)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:744)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:757)
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:879)
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:906)
at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.putIfAbsent(JdbcMetadataStore.java:156)
at org.springframework.integration.jdbc.metadata.JdbcMetadataStore$$FastClassBySpringCGLIB$$1bf04d6a.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at org.springframework.integration.jdbc.metadata.JdbcMetadataStore$$EnhancerBySpringCGLIB$$32961232.putIfAbsent()
at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.accept(AbstractPersistentAcceptOnceFileListFilter.java:83)
at org.springframework.integration.file.filters.AbstractFileListFilter.filterFiles(AbstractFileListFilter.java:38)
at org.springframework.integration.file.filters.ChainFileListFilter.filterFiles(ChainFileListFilter.java:59)
at org.springframework.integration.file.filters.CompositeFileListFilter.filterFiles(CompositeFileListFilter.java:144)
at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:95)
at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:375)
at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:349)
at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:94)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor270.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at jdk.proxy3/jdk.proxy3.$Proxy219.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
... 5 more
Caused by: org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164)
at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:114)
at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java)
at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:722)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651)
... 47 more
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "int_metadata_store_pk"
Detail: Key (metadata_key, region)=(business-case_C:\myDir\myFile-xyz.xml, DEFAULT) already exists.
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164)
at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:130)
at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61)
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java)
at org.springframework.jdbc.core.JdbcTemplate.lambda$update$2(JdbcTemplate.java:965)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:960)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1015)
at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.tryToPutIfAbsent(JdbcMetadataStore.java:167)
at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.putIfAbsent(JdbcMetadataStore.java:148)
... 41 more

Although it looks like all files where processed correctly, when I query the int_metadata_store table, I can see that a couple of them have the value 0 in the metadata_value column. If I copy those files to the directory being polled again, they are processed again.

Expected behavior

All files should be processed, no exceptions should be thrown and there should be no entries with metadata_value "0" int_metadata_store table.

Looks similar to the behavior described in #3282

@JulienCharon JulienCharon added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Aug 2, 2021
@artembilan
Copy link
Member

This looks more like this issue: #3576.

Please, see the solution in the end. Probably you have the same problem.

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Aug 3, 2021
@JulienCharon
Copy link
Author

JulienCharon commented Aug 3, 2021

@artembilan: thank you very much for you input.
I'm afraid that's not the reason. I debugged through org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator and I can see that there is an exception that is correctly translated to a DuplicateKeyException.
However, there is another Exception with error code 25P02 coming from PostgreSQL saying that the current transaction is already aborted. But that one occurs when executing a select statement (SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?).
After analyzing JdbcMetadataStore#putIfAbsent it seems clear to me what's happening:

  1. tryToPutIfAbsent(...) is called
  2. The insert statement causes an exception in the database which is translated to a DuplicateKeyException. When the exception is thrown from the database (error code 23505), the transaction is aborted
  3. Since tryToPutIfAbsent(...) returns zero, the query on line 156, which is SELECT METADATA_VALUE FROM %sMETADATA_STORE WHERE METADATA_KEY=? AND REGION=?, is executed, causing the database to throw another exception (error code 25P02) because the transaction is already aborted

@artembilan
Copy link
Member

What is your TransactionManager?
The JdbcMetadataStore.putIfAbsent() is marked with @Transactional. So, if it is declared as a bean and there is an appropriate TransactionManager (typically a DataSourceTransactionManager), the outer transaction is started, fully around this method. Not local withing a target DB.
I mean that DuplicateKeyException must not mark this application-level transaction as aborted since DB has not started it to control, therefore it must not abort it or commit.

Do I miss anything related to the application-level transaction management?

@JulienCharon
Copy link
Author

I'm using Spring Boot and didn't configure a TransactionManager manually. org.springframework.orm.jpa.JpaTransactionManager is configured automatically.
I'm far from being an expert in transaction management, so I guess you're right. But how can that behavior be explained then?

@artembilan
Copy link
Member

OK. It probably would be great if you are able to share with us a simple Spring Boot project which let us to reproduce issue and debug the situation.

@artembilan
Copy link
Member

Perhaps we need to revise this issue for a similar fix we did in the DefaultLockRepository: #3733.

So, this JdbcMetadataStore will be free from @Transactional and you will be able to inject an appropriate TransactionManager which works well exactly with JDBC.

@artembilan artembilan added this to the 6.0.x milestone Apr 19, 2022
@artembilan artembilan modified the milestones: 6.0.x, 6.0.0 Nov 16, 2022
@artembilan artembilan self-assigned this Nov 16, 2022
@artembilan artembilan modified the milestones: 6.0.0, Backlog Nov 16, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants