requestChannel: error when subscribing again #204

Open
dholzenburg opened this issue Jan 2, 2022 · 1 comment

dholzenburg commented Jan 2, 2022

When using requestChannel(), one can subscribe to the returned Flowable and later cancel that subscription. Subscribing to the same Flowable a second time does not work.

Expected Behavior

After re-subscribing, one will receive values again with the given subscriber. No warning will occur on console.

Actual Behavior

A warning is logged on console (RSocketClient: re-entrant call to request n before initial channel established.). No new values are received by the subscriber.

Steps to Reproduce

const metadata = ...;
const client = new RSocketClient(...);
const processor = new FlowableProcessor(sub => { });
const inputFlowable = processor.map(i => ({ data: i, metadata: metadata }));
client.connect().then(
	socket => {
		let subscription;
		const subscriber = {
			onNext: value => console.log('Value from Responder', value.data),
			onSubscribe: sub => { subscription = sub; sub.request(0x7fffffff); },
		};

		// responder will multiply the given value by 2
		const channelFlowable = socket.requestChannel(inputFlowable);
		channelFlowable.subscribe(subscriber);

		setTimeout(() => processor.onNext(1), 1000);
		setTimeout(() => processor.onNext(2), 2000);
		setTimeout(() => { console.log("unsubscribe"); subscription.cancel(); }, 3000);
		setTimeout(() => { console.log("subscribe again"); channelFlowable.subscribe(subscriber); }, 4000);
		setTimeout(() => processor.onNext(3), 5000);

		// output on console:
		// Value from Responder 2
		// Value from Responder 4
		// unsubscribe
		// subscribe again
		// RSocketClient: re-entrant call to request n before initial channel established.
	}
);

Workaround

Instead of re-subscribing, call requestChannel() again.
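
The workaround could be wrapped roughly as follows. This is only a sketch: `resubscribableChannel` and `openChannel` are hypothetical names, not part of rsocket-js, and the only library behavior assumed is that `requestChannel()` returns an object with a `subscribe()` method, as in the reproduction above. Whether the same input Flowable can be reused for the new channel is not confirmed here.

```javascript
// Hypothetical helper, not part of rsocket-js: wraps a factory so that every
// subscribe() call opens a brand-new channel instead of reusing one whose
// stream may already have been cancelled.
function resubscribableChannel(openChannel) {
  return {
    subscribe(subscriber) {
      // openChannel is expected to call socket.requestChannel(...) and
      // return the resulting Flowable.
      openChannel().subscribe(subscriber);
    },
  };
}

// Usage sketch (socket, inputFlowable and subscriber as in the reproduction):
// const channel = resubscribableChannel(() => socket.requestChannel(inputFlowable));
// channel.subscribe(subscriber);   // opens the first channel
// subscription.cancel();           // cancels that channel
// channel.subscribe(subscriber);   // opens a second, independent channel
```

The caller keeps a single object to subscribe to, while each subscription still maps to its own requestChannel() call.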

Possible Solution

Looks like the variables 'initialized' and 'payloadsSubscribed' in the requestChannel()-implementation of RSocketMachine are not reset.

Your Environment

  • RSocket version(s) used: 0.0.27

Happy new year!

viglucci added the "needs triage" label Mar 6, 2022

viglucci (Member) commented May 9, 2022

Hi @dholzenburg ,

If I am interpreting the protocol documentation correctly, I believe that this is working as per protocol spec. The Protocol spec for Request Channel states:

Upon receiving a CANCEL, the stream is terminated on the Responder.

Upon sending a CANCEL, the stream is terminated on the Requester.

Given that, if either the Requester or the Responder of a RequestChannel stream sends CANCEL, you must re-establish the stream using a subsequent call to requestChannel. You cannot simply re-subscribe to the stream returned from requestChannel, as it has already been terminated.
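
As a rough illustration of that lifecycle, here is a toy model. It is not the rsocket-js implementation, and `ToyRequester` and its methods are made-up names. It relies only on the protocol rules that client-initiated stream IDs start at 1 and increase by 2, and that sending CANCEL terminates the stream.

```javascript
// Toy model only: illustrates why a cancelled channel cannot be revived
// and a fresh requestChannel() call is needed.
class ToyRequester {
  constructor() {
    this.nextStreamId = 1;        // client-initiated stream IDs are odd
    this.terminated = new Set();  // IDs of streams that have been cancelled
  }

  requestChannel() {
    const streamId = this.nextStreamId;
    this.nextStreamId += 2;
    return {
      streamId,
      // Models sending CANCEL: the stream is terminated on the Requester.
      cancel: () => this.terminated.add(streamId),
      isTerminated: () => this.terminated.has(streamId),
    };
  }
}

const requester = new ToyRequester();
const first = requester.requestChannel();
first.cancel();
const second = requester.requestChannel();
console.log(first.streamId, first.isTerminated());   // 1 true
console.log(second.streamId, second.isTerminated()); // 3 false
```

In this model the first stream stays terminated after the cancel, and the second call gets a new stream ID.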

I will also note that while the protocol has not changed, these APIs have changed recently with 1.0.0-alpha.1, and the flowable API is no longer available.

viglucci removed the "needs triage" label May 9, 2022