Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

java.util.concurrent.CancellationException: Disposed when disposing flux from channel communication in tests #1047

Open
mkrzywanski opened this issue Apr 10, 2022 · 5 comments

Comments

@mkrzywanski
Copy link

I have created a sample application which uses channel style communication. Unfortunatelly when cancelling the subscription from the tests I receive an exception :

stacktrace
  022-04-10 20:33:49.106 ERROR 338925 --- [or-http-epoll-4] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
	at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.2.jar:na]
	at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67) ~[rsocket-transport-netty-1.1.1.jar:na]
	at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.2.jar:na]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.16.jar:3.4.16]
	at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.2.jar:na]
	at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49) ~[rsocket-transport-netty-1.1.1.jar:na]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:823) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Expected Behavior

The exception is not thrown on channel subscription cancellation.

Actual Behavior

Exception is thrown.

Steps to Reproduce

Sample application - https://github.com/mkrzywanski/rsocket-reactive-chat. Just run user1ShouldGetMessagesFromUser2 test.

Your Environment

Library versions are listed in the provided project.

  • Platform (eg. JVM version (javar -version) or Node version (node --version)):
    openjdk 17.0.1 2021-10-19
    OpenJDK Runtime Environment Temurin-17.0.1+12 (build 17.0.1+12)
    OpenJDK 64-Bit Server VM Temurin-17.0.1+12 (build 17.0.1+12, mixed mode, sharing)

  • OS and version (eg uname -a): Ubunut 20

@OlegDokuka
Copy link
Member

@mkrzywanski can your try to add to your main method a line with Hooks.onOperatorDebug to see the full stack trace of the exception

@mkrzywanski
Copy link
Author

Since it appears in tests, I have added it in my @BeforeAll method but the stacktrace has not changed, it remains the same. Also the exception seems to be logged multiple times from different threads.

Adding Hooks.onErrorDropped(throwable -> {}); makes it disappear tho, but this is not a way to go I believe.

@OlegDokuka
Copy link
Member

OlegDokuka commented Apr 10, 2022

@mkrzywanski Also, can you try to reproduce the same just with a pure rsocket to reduce the problem surface?

@mkrzywanski
Copy link
Author

mkrzywanski commented Apr 10, 2022

I can give it a try, but I am not that familiar with rsocket api yet as I started using it today.

@mkrzywanski
Copy link
Author

mkrzywanski commented Apr 12, 2022

It seems that the error is caused when cancelling the Flux returned from mongodb changeStream feature. I tried several configurations with and without security/mongodb change stream and only not returning mongodb changeStream flux from my message mapping handler helped.

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants