Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rsocket error "java.lang.IllegalStateException: Source has to be ASYNC fuseable" with Spring boot 3.2.x & JDK17 #1100

Open
abdulrahimseera opened this issue Jan 3, 2024 · 2 comments

Comments

@abdulrahimseera
Copy link

abdulrahimseera commented Jan 3, 2024

Expected Behavior

The Rocket server should respond to the client.
Hello from RSocket consumer

Actual Behavior

Throwing exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable

Error stack trace

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable
Caused by: java.lang.IllegalStateException: Source has to be ASYNC fuseable
    at io.rsocket.resume.InMemoryResumableFramesStore$FramesSubscriber.onSubscribe(InMemoryResumableFramesStore.java:528) ~[rsocket-core-1.1.3.jar:na]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.rsocket.internal.UnboundedProcessor.subscribe(UnboundedProcessor.java:414) ~[rsocket-core-1.1.3.jar:na]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4512) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4339) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.rsocket.resume.ResumableDuplexConnection.<init>(ResumableDuplexConnection.java:83) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.ServerSetup$ResumableServerSetup.acceptRSocketSetup(ServerSetup.java:124) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.RSocketServer.acceptSetup(RSocketServer.java:418) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.RSocketServer.accept(RSocketServer.java:386) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.RSocketServer.lambda$acceptor$0(RSocketServer.java:370) ~[rsocket-core-1.1.3.jar:na]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:332) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:181) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Steps to Reproduce

Sample project to reproduce the issue: https://github.com/abdulrahimseera/rsocket-spring-boot-3.2.x

Needs to run both services and then call "http://localhost:3333/hello/rsocket".

  1. rsocket-client
  2. rsocket-server

Possible Solution

Your Environment

  • RSocket version(s) used: rsocket-core 1.1.3 (spring-boot-starter-rsocket)
  • Other relevant libraries versions (eg. netty, ...): rsocket-transport-netty 1.1.3 & rsocket-micrometer 1.1.4
  • Platform (eg. JVM version (javar -version) or Node version (node --version)): JDK 17 with Spring boot 3.2.1
  • OS and version (eg uname -a): MacOs Venture Version 13.6.3 (22G436)
@abdulrahimseera
Copy link
Author

Oleh Dokuka

@vasyl-pryimak
Copy link

@abdulrahimseera Hey! I had the same problem.
The fix was to downgrade reactor-core lib to 3.5.16.
implementation("io.projectreactor:reactor-core:3.5.16")

The problem was in Mono subscribe method. In 3.6.x version they changed lines
publisher.subscribe(subscriber); to

CoreSubscriber subscriber = Operators.restoreContextOnSubscriberIfPublisherNonInternal(publisher, (CoreSubscriber)subscriber);
        publisher.subscribe(subscriber);

Operators.restoreContextOnSubscriberIfPublisherNonInternal returns FuseableContextWriteRestoringThreadLocalsSubscriber
In that class method requestFusion returns 0.

And that is why we have that exception in rsocket from InMemoryResumableFramesStore
See onSubscribe method qs.requestFusion(ANY); this return 0 from FuseableContextWriteRestoringThreadLocalsSubscriber

Didn't dig further, but hope this helps.

@OlegDokuka fyi

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants