Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When failedPublishes list is empty, then receive a dup message , this causes it to be sent to all subscribers #671

Open
Taoaozw opened this issue Apr 5, 2022 · 0 comments

Comments

@Taoaozw
Copy link

Taoaozw commented Apr 5, 2022

Expected behavior

nothing todo

Actual behavior

send to all subscribers

Steps to reproduce

Minimal yet complete reproducer code (or URL to code) or complete log file

Please see comments:

  private CompletableFuture<RoutingResults> publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS publishingQos,
                                                                  Set<String> filterTargetClients) {
        Set<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);

        final BatchingPublishesCollector collector = new BatchingPublishesCollector(eventLoops);

        for (final Subscription sub : topicMatchingSubscriptions) {
           // when  receive a dup message , but filterTargetClients is empty, 
          // it will continue to  send to all subscriber 
            if (filterTargetClients == NO_FILTER || filterTargetClients.contains(sub.getClientId())) {
                collector.add(sub);
            }
        }

        List<CompletableFuture<RouteResult>> publishFutures = collector.routeBatchedPublishes((batch) -> {
            publishToSession(payload, topic, batch, publishingQos);
        });

        final CompletableFuture<Void> publishes = CompletableFuture.allOf(publishFutures.toArray(new CompletableFuture[0]));
        return publishes.handle((result, exception) -> {
            final List<String> failedRoutings = new ArrayList<>();
            final List<String> successedRoutings = new ArrayList<>();
            for (CompletableFuture<RouteResult> cf : publishFutures) {
                RouteResult rr = cf.join();
                Collection<String> subscibersIds = collector.subscriberIdsByEventLoop(rr.clientId);
                if (rr.status == RouteResult.Status.FAIL) {
                    failedRoutings.addAll(subscibersIds);
                } else {
                    successedRoutings.addAll(subscibersIds);
                }
            }
            return new RoutingResults(successedRoutings, failedRoutings);
        });
    }

Moquette MQTT version

0.16

JVM version (e.g. java -version)

1.8

OS version (e.g. uname -a)

Darwin Kernel Version 21.3.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant