Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SCF multithreading issue #1136

Open
GioTem opened this issue Apr 15, 2024 · 4 comments
Open

SCF multithreading issue #1136

GioTem opened this issue Apr 15, 2024 · 4 comments

Comments

@GioTem
Copy link

GioTem commented Apr 15, 2024

Hi everyone,
I've developed a server application that receives data in parallel from a client. I've noticed that when I send two packets simultaneously, they are processed in parallel up to my function composition. However, I deliberately added a sleep(To simulate the blocking operations present in the real case, such as file writing) at the end of my function composition to verify that the data were actually being processed in parallel. Unfortunately, the logs show that all packets are being processed every 3 seconds, as if they were being executed sequentially.
Could someone help me understand what's happening?

My code
`

@Bean
public TcpConnectionFactoryFactoryBean tcpSourceConnectionFactory() {
    TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
    factoryBean.setType("server");
    factoryBean.setPort(12345);
    factoryBean.setUsingNio(false);
    factoryBean.setDeserializer(new MyDeserializer());
    return factoryBean;
}

@Bean
public TcpInboundGateway gateway(@Qualifier("tcpSourceConnectionFactory") AbstractConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setAutoStartup(false);
    return gateway;
}

@Bean
public Publisher<Message<String>> tcpSupplierFlow(TcpInboundGateway gateway) {
    return IntegrationFlow.from(gateway)
            .headerFilter(IpHeaders.LOCAL_ADDRESS)
            .toReactivePublisher();
}

@Bean
public Supplier<Flux<Message<String>>> tcpSupplier(
        Publisher<Message<String>> tcpSupplierFlow,
        TcpInboundGateway gateway) {
    return () -> Flux.from(tcpSupplierFlow)
            .doOnSubscribe(subscription -> gateway.start());
}

@Bean
public ApplicationListener<TcpConnectionCloseEvent> onClose() {
    return event -> logger.warn("Closing connection...");
}

@Bean
public Function<Message<String>, Message<String>> messageHandler() {
    return m -> {
        String message = m.getPayload();
        if (!message.isEmpty()) {
            try {
                logger.error("RECEIVED MESSAGE: %s | receivedTimestamp %s".formatted(m.getPayload(), new Date().getTime()));
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return MessageBuilder.withPayload(m.getPayload()).build();
    };
}

`

Output Logs
2024-04-15T15:44:38.755+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Accepted connection from 127.0.0.1:64720 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] o.s.i.i.tcp.connection.TcpNetConnection : New connection 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : tcpSourceConnectionFactory: Added new connection: 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.i.tcp.connection.TcpNetConnection : 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea Reading... 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Accepted connection from 127.0.0.1:64719 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] o.s.i.i.tcp.connection.TcpNetConnection : New connection 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : tcpSourceConnectionFactory: Added new connection: 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.i.tcp.connection.TcpNetConnection : 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a Reading... 2024-04-15T15:44:38.763+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=8d07d3e2-5008-a609-1987-f1057e262712, ip_hostname=127.0.0.1, timestamp=1713188678763}] 2024-04-15T15:44:38.763+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=0b49f68f-5f2b-eea4-45a1-5cdedecd39fa, ip_hostname=127.0.0.1, timestamp=1713188678763}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=5083e134-7990-7ef2-0379-364bde276e6b, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.t.MessageTransformingHandler : bean 'tcpSupplierFlow.header-filter#1' for component 'tcpSupplierFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow' received message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.t.MessageTransformingHandler : bean 'tcpSupplierFlow.header-filter#1' for component 'tcpSupplierFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow' received message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=5083e134-7990-7ef2-0379-364bde276e6b, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_address=127.0.0.1, id=3693d126-16c9-059d-e820-7af715e1337a, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.767+02:00 DEBUG 26152 --- [pool-5-thread-3] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}] to: class org.springframework.messaging.support.GenericMessage 2024-04-15T15:44:38.767+02:00 DEBUG 26152 --- [pool-5-thread-3] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function: messageHandler<org.springframework.messaging.Message<java.lang.String>, org.springframework.messaging.Message<java.lang.String>>with input type: org.springframework.messaging.Message<java.lang.String> 2024-04-15T15:44:38.767+02:00 ERROR 26152 --- [pool-5-thread-3] com.example.demo.Conf$$SpringCGLIB$$0 : RECEIVED MESSAGE: Hello server from threadId 23 | timestamp 1713188678755�n | receivedTimestamp 1713188678767 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplier|messageHandler_integrationflow.channel#0'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.router.MethodInvokingRouter : bean 'tcpSupplier|messageHandler_integrationflow.router#0' for component 'tcpSupplier|messageHandler_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' received message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel : preSend on channel 'bean 'output'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@231e4dda received message: GenericMessage [payload=byte[57], headers={contentType=application/json, id=c38b7db9-1727-a2b3-d95c-9a5fcedafaec, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.a.outbound.AmqpOutboundEndpoint : org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4331b295 received message: GenericMessage [payload=byte[57], headers={contentType=application/json, id=ae45cb45-95bd-67eb-3de3-7a858b37b37f, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.channel.FluxMessageChannel : postSend (sent=true) on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[contentType] WILL be mapped, matched pattern=* 2024-04-15T15:44:41.769+02:00 INFO 26152 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [rabbitmq:5672] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] c.r.client.impl.ConsumerWorkService : Creating executor service with 16 thread(s) for consumer work service 2024-04-15T15:44:41.785+02:00 DEBUG 26152 --- [pool-5-thread-2] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_address=127.0.0.1, id=3693d126-16c9-059d-e820-7af715e1337a, ip_hostname=127.0.0.1, timestamp=1713188678765}] to: class org.springframework.messaging.support.GenericMessage 2024-04-15T15:44:41.785+02:00 DEBUG 26152 --- [pool-5-thread-2] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function: messageHandler<org.springframework.messaging.Message<java.lang.String>, org.springframework.messaging.Message<java.lang.String>>with input type: org.springframework.messaging.Message<java.lang.String> 2024-04-15T15:44:41.785+02:00 ERROR 26152 --- [pool-5-thread-2] com.example.demo.Conf$$SpringCGLIB$$0 : RECEIVED MESSAGE: Hello server from threadId 24 | timestamp 1713188678755�n | receivedTimestamp 1713188681785

In my test, I opened two socket connections on the client and sent a packet for each connection. As seen from the logs, the server processes the packet with threadId 24 with a delay of 3 seconds (from 2024-04-15T15:44:38.767+02:00 to 2024-04-15T15:44:41.785+02:00).

@olegz
Copy link
Contributor

olegz commented Apr 25, 2024

Composed function is a single function. It is not a chain of things to be executed.
It's as if you wrote all your code in a single function instead of multiple functions and then compose.
So look at it as such

@GioTem
Copy link
Author

GioTem commented Apr 26, 2024

Thank you for your response. I understand that function composition is seen as a single function. However, I can't understand why requests are serialized. After all, the tcpSupplierFlow is outside the function composition and therefore should handle requests in parallel. Can you give me some explanation about it?

@olegz
Copy link
Contributor

olegz commented Apr 29, 2024

Perhaps I am missing something. It would be simpler if you create a sample app that reproduces the issue and push it to github somewhere so I can run it and better understand what you are trying to accomplish

@olegz
Copy link
Contributor

olegz commented Jun 6, 2024

Any followup?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants