java.util.concurrent.CancellationException: Disposed when disposing flux from channel communication in tests #1047

Open
mkrzywanski opened this issue Apr 10, 2022 · 5 comments

Comments

@mkrzywanski

I have created a sample application that uses channel-style communication. Unfortunately, when cancelling the subscription from the tests, I receive an exception:

stacktrace
2022-04-10 20:33:49.106 ERROR 338925 --- [or-http-epoll-4] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
	at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.2.jar:na]
	at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67) ~[rsocket-transport-netty-1.1.1.jar:na]
	at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.2.jar:na]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.16.jar:3.4.16]
	at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.2.jar:na]
	at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49) ~[rsocket-transport-netty-1.1.1.jar:na]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:823) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Expected Behavior

The exception is not thrown on channel subscription cancellation.

Actual Behavior

Exception is thrown.

Steps to Reproduce

Sample application - https://github.com/mkrzywanski/rsocket-reactive-chat. Just run user1ShouldGetMessagesFromUser2 test.

Your Environment

Library versions are listed in the provided project.

  • Platform (e.g. JVM version (java -version) or Node version (node --version)):
    openjdk 17.0.1 2021-10-19
    OpenJDK Runtime Environment Temurin-17.0.1+12 (build 17.0.1+12)
    OpenJDK 64-Bit Server VM Temurin-17.0.1+12 (build 17.0.1+12, mixed mode, sharing)

  • OS and version (e.g. uname -a): Ubuntu 20

@OlegDokuka
Member

@mkrzywanski can you try adding a call to Hooks.onOperatorDebug() to your main method to see the full stack trace of the exception?

@mkrzywanski
Author

Since the error appears in tests, I added the call to my @BeforeAll method, but the stack trace has not changed. The exception also seems to be logged multiple times from different threads.

Adding Hooks.onErrorDropped(throwable -> {}); makes it disappear, though I don't believe this is the right way to go, since it silently swallows every dropped error.
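A narrower variant of that workaround would ignore only the "Disposed" cancellation and keep reporting every other dropped error. The sketch below is a minimal illustration under assumptions: DroppedErrorFilter and its methods are hypothetical names (not part of Reactor or RSocket), and the match on the message "Disposed" is taken from the stack trace above. It uses only the JDK, so it does not address whatever causes the error to be dropped in the first place.

```java
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;

// Hypothetical helper, not part of Reactor or RSocket.
final class DroppedErrorFilter {

    private DroppedErrorFilter() {}

    // Returns true if the throwable, or any throwable in its cause chain,
    // is a CancellationException whose message is exactly "Disposed".
    // The depth limit guards against cyclic cause chains.
    static boolean isDisposedCancellation(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t instanceof CancellationException && "Disposed".equals(t.getMessage())) {
                return true;
            }
        }
        return false;
    }

    // Wraps a fallback handler so that it is skipped for "Disposed"
    // cancellations and invoked for every other dropped error.
    static Consumer<Throwable> ignoringDisposed(Consumer<Throwable> fallback) {
        return error -> {
            if (!isDisposedCancellation(error)) {
                fallback.accept(error);
            }
        };
    }
}
```

In the test setup this could be registered with Hooks.onErrorDropped(DroppedErrorFilter.ignoringDisposed(e -> e.printStackTrace())); and undone afterwards with Hooks.resetOnErrorDropped(). Walking the cause chain matters here because the logged throwable is an ErrorCallbackNotImplemented that wraps the CancellationException.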

@OlegDokuka
Member

OlegDokuka commented Apr 10, 2022

@mkrzywanski Also, can you try to reproduce the same behavior with pure RSocket, to reduce the problem surface?

@mkrzywanski
Author

mkrzywanski commented Apr 10, 2022

I can give it a try, but I am not that familiar with the RSocket API yet, as I only started using it today.

@mkrzywanski
Author

mkrzywanski commented Apr 12, 2022

It seems that the error occurs when cancelling the Flux returned from the MongoDB change stream feature. I tried several configurations, with and without security and with and without the MongoDB change stream, and the only thing that helped was not returning the change stream Flux from my message mapping handler.


2 participants