Send a message to an already established request-stream connection from the rsocket-server to all signed rsocket-clients except one #1096

Open
TashaBond opened this issue Sep 10, 2023 · 0 comments

I have two rsocket-clients and one rsocket-server. Both clients initially subscribe to the broadcast route listenCommand to receive commands from the server. One of the clients then sends a command on the route executeCommand, and the server should forward it to every other RSocketRequester, but not to the sender, over the previously established listenCommand connection. In other words, I need to send the message to all RSocketRequesters except my own through the connection that already exists between client and server. Currently I get the error "reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Request-Stream not implemented", and the second client does not receive the command from its listenCommand subscription. I've seen suggestions to add a client-side handler, but is it possible to solve this on the server alone? Maybe I'm missing something important?

Expected Behavior

The client that sends the command will not receive it over the broadcast connection, but the other client will.
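
The expected fan-out can be modelled in plain Java, without any RSocket or Reactor types. This is only a sketch under stated assumptions: `CommandBroadcaster`, `subscribe`, `unsubscribe`, and `broadcastExcept` are hypothetical names that do not appear in the issue, and each `Consumer<String>` stands in for whatever pushes data down one client's already open listenCommand stream (in a Spring controller that might be a per-client `Sinks.Many<String>` in place of the single shared sink).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical model: one outbound channel per subscribed client, keyed by client ID. */
public class CommandBroadcaster {

    // clientId -> callback that pushes a command down that client's open stream.
    private final Map<String, Consumer<String>> subscribers = new ConcurrentHashMap<>();

    /** Registers a client when it subscribes to the listen route. */
    public void subscribe(String clientId, Consumer<String> outbound) {
        subscribers.put(clientId, outbound);
    }

    /** Removes a client when its stream is cancelled or its connection closes. */
    public void unsubscribe(String clientId) {
        subscribers.remove(clientId);
    }

    /** Delivers the command to every subscriber except the sender; returns the delivery count. */
    public int broadcastExcept(String senderId, String command) {
        int delivered = 0;
        for (Map.Entry<String, Consumer<String>> entry : subscribers.entrySet()) {
            if (!entry.getKey().equals(senderId)) {
                entry.getValue().accept(command);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        CommandBroadcaster broadcaster = new CommandBroadcaster();
        List<String> receivedByA = new ArrayList<>();
        List<String> receivedByB = new ArrayList<>();
        broadcaster.subscribe("client-A", receivedByA::add);
        broadcaster.subscribe("client-B", receivedByB::add);

        // client-A executes a command; only client-B should see it.
        broadcaster.broadcastExcept("client-A", "Hello");
        System.out.println("A received: " + receivedByA); // prints "A received: []"
        System.out.println("B received: " + receivedByB); // prints "B received: [Hello]"
    }
}
```

The point of the model is that the server keeps one outbound channel per subscriber and skips the sender's entry, so nothing has to be requested from the clients themselves.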

Actual Behavior

Currently I get the error "reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Request-Stream not implemented", and the second client does not receive the command from its listenCommand subscription.

Rsocket-server controller:

private Sinks.Many<String> executedCommandSink = Sinks.many().multicast()
    .directBestEffort();

@MessageMapping("project.{projectId}.command")
public Flux<String> listenCommand(@DestinationVariable String projectId, RSocketRequester requester) {

    return executedCommandSink.asFlux();
}

@MessageMapping("project.{projectId}.command.execute")
public Mono<String> executeCommand(
        @DestinationVariable String projectId, Mono<String> commandMono, RSocketRequester requester) {

    // Every requester registered for this project except the sender.
    Set<RSocketRequester> requesters = getRequestersForSendGraphCommand(projectId, requester);

    // Forward the command to the other clients, then echo it back to the sender.
    return commandMono.doOnNext(command -> sendCommand(requesters, projectId, command));
}

private void sendCommand(Set<RSocketRequester> requesters, String projectId, String command) {
    // Each call opens a new request-stream from the server to the target client.
    requesters.forEach(requesterToSend -> requesterToSend
        .route("project.{projectId}.command", projectId)
        .data(Flux.just(command))
        .retrieveFlux(String.class)
        .subscribe());
}

public Set<RSocketRequester> getRequestersForSendGraphCommand(String projectId, RSocketRequester requester) {
    Set<RSocketRequester> sessionsByProject = ConcurrentHashMap.newKeySet();
    allRequesters.forEachEntry(1L, entry -> {
        if (entry.getValue().equals(projectId) && entry.getKey() != requester) {
            sessionsByProject.add(entry.getKey());
        }
    });
    return sessionsByProject;
}

Rsocket-client:

@Autowired
public RSocketManagerClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {

    clientUUID = UUID.randomUUID().toString();
    log.info("Connecting using client ID: {}", clientUUID);

    // Connect over WebSocket and block until the requester is available.
    this.rsocketRequester = rsocketRequesterBuilder
            .setupData(clientUUID)
            .rsocketStrategies(strategies)
            .connectWebSocket(URI.create("ws://127.0.0.1:8307/rsocket"))
            .subscribeOn(Schedulers.parallel())
            .block();

    // Log connection lifecycle events.
    this.rsocketRequester.rsocket()
            .onClose()
            .doFirst(() -> log.info("Client: {} CONNECTED.", clientUUID))
            .doOnError(error -> log.error("Connection to client {} CLOSED", clientUUID))
            .doFinally(consumer -> log.info("Client {} DISCONNECTED", clientUUID))
            .subscribe();
}

public void executeCommand() {
    log.info("\nClient with id-{} subscribe on execute command", clientUUID);

    String block = this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command.execute")
            .data("Hello")
            .retrieveMono(String.class)
            .doOnNext(System.out::println)
            .block();
    log.info("Response: {}", block);
}

public void subscribeOnCommand() {
    log.info("\nClient with id-{} subscribe on command", clientUUID);

    this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command")
            .retrieveFlux(String.class)
            .doOnNext(System.out::println)
            .subscribe();
}