Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw exception on errorConsumer, client will not finish #1105

Open
levil3 opened this issue Feb 4, 2024 · 0 comments
Open

throw exception on errorConsumer, client will not finish #1105

levil3 opened this issue Feb 4, 2024 · 0 comments

Comments

@levil3
Copy link

levil3 commented Feb 4, 2024

I tried throwing an exception in the errorConsumer on the server, so that the client's request seems to not be released, and will not do doOnComplete() and map()

Expected Behavior

I expect it to print 10 times "complete 1"

Actual Behavior

But it didn't print even once

Steps to Reproduce

my code is :

  • server
public static void main(String[] args) throws InterruptedException {
        RSocket rsocket = new RSocket() {
            @Override
            public Flux<Payload> requestStream(Payload p) {
                return Flux.create(emitter -> Mono.just("test").subscribe(payload -> {
                    throw new RuntimeException();
                }, throwable -> {
                    throw new RuntimeException();
                }, emitter::complete));
            }
        };

        LoopResources resources = LoopResources.create("test", 1, 2, true);
        TcpServer server = TcpServer.create().host("localhost").port(8090).runOn(resources);
        RSocketServer.create(SocketAcceptor.with(rsocket))
                .bind(TcpServerTransport.create(server))
                .subscribe();

        // wait
        TimeUnit.MINUTES.sleep(10);
}
  • client
public static void main(String[] args) throws InterruptedException {
        RSocket rSocket = RSocketConnector.create()
                .payloadDecoder(PayloadDecoder.ZERO_COPY)
                .connect(TcpClientTransport.create(8090))
                .doOnSuccess(rSocket1 -> {
                    System.out.println("connect server success!");
                })
                .block();

        for (int i = 0; i < 10; ++i) {
            Payload payload = ByteBufPayload.create("0");
            rSocket.requestStream(payload).map(Payload::getDataUtf8).doOnComplete(() -> {
                System.out.println("complete 1");
            }).map(e -> {
                System.out.println("doMap");
                return e;
            }).blockLast();
        }

        Thread.sleep(TimeUnit.SECONDS.toNanos(1000));

        // dispose
        rSocket.dispose();
}

Possible Solution

Your Environment

  • RSocket version(s) used: v1.1.4
  • jdk: 1.8
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant