Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TopicMatcher as extension trait to iterators (BREAKING) #228

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from

Conversation

Altair-Bueno
Copy link

Solves #226

@Altair-Bueno
Copy link
Author

Thanks God ECA didn't fail this time 😅

@Altair-Bueno
Copy link
Author

Altair-Bueno commented May 10, 2024

I couldn't run the tests myself as there were some compilation errors:

error[E0277]: the trait bound `message::Message: std::convert::From<(&str, &[u8], types::QoS, bool)>` is not satisfied
   --> src/message.rs:402:19
    |
402 |         let msg = Message::from((TOPIC, PAYLOAD, QOS, RETAINED));
    |                   ^^^^^^^ the trait `std::convert::From<(&str, &[u8], types::QoS, bool)>` is not implemented for `message::Message`
    |
    = help: the following other types implement trait `std::convert::From<T>`:
              <message::Message as std::convert::From<token::DeliveryToken>>
              <message::Message as std::convert::From<(&'a str, &'b [u8])>>
              <message::Message as std::convert::From<(&'a str, &'b [u8], i32, bool)>>

error[E0308]: mismatched types
   --> src/message.rs:545:38
    |
545 |             assert_eq!(QOS as c_int, msg.qos());
    |                                      ^^^^^^^^^ expected `i32`, found `QoS`

Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.
error: could not compile `paho-mqtt` (lib test) due to 2 previous errors
warning: build failed, waiting for other jobs to finish...

@Altair-Bueno Altair-Bueno marked this pull request as draft May 10, 2024 07:38
@fpagliughi
Copy link
Contributor

Thanks for the submission and wrestling with the ECA!

The simple matches() function with pattern matching is really nice; efficient for a single filter, and easy to follow. I like it. I think it would definitely be useful for the TopicFilter and as a publicly available function that applications could use directly if they wanted. I would probably want to split it into two functions, putting the main implementation into a function that took an iterator to the fields of the filter so that you didn't have the runtime overhead of splitting the filter string every time you tested for a match. My assumption is that a primary use case for topic matching would have the filters set up once at the beginning of the program, and then used throughout. So you could split once and use over and over.

But the function does miss a corner case: Wildcards in the first field should not match a topic that starts with a dollar sign $. See: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901246

So together, something like this:

pub fn matches(filter: &str, topic: &str) -> bool {
    matches_iter(filter.split('/'), topic)
}

pub fn matches_iter<'a>(mut filter: impl Iterator<Item=&'a str>, topic: &str) -> bool {
    let mut first = true;
    let mut topic = topic.split('/');

    loop {
        match (filter.next(), topic.next()) {
            // Exhausted both filter and topic
            (None, None) => return true,
            // Wildcard on filter
            (Some("#"), Some(field)) => {
                return !first || !field.starts_with('$');
            }
            // Single level wildcard on filter
            (Some("+"), Some(field)) => {
                if first && field.starts_with('$') {
                    return false;
                } // else ()
            }
            // Equal levels
            (Some(filter), Some(topic)) if filter == topic => (),
            // Otherwise, no match
            _ => return false,
        }
        first = false;
    }
}

To verify, I extracted the tests from the Python library and added a few extras:

    // Should match

    assert!(matches("foo/bar", "foo/bar"));
    assert!(matches("foo/+", "foo/bar"));
    assert!(matches("foo/+/baz", "foo/bar/baz"));
    assert!(matches("foo/+/#", "foo/bar/baz"));
    assert!(matches("A/B/+/#", "A/B/B/C"));
    assert!(matches("#", "foo/bar/baz"));
    assert!(matches("#", "/foo/bar"));
    assert!(matches("/#", "/foo/bar"));
    assert!(matches("$SYS/bar", "$SYS/bar"));
    assert!(matches("foo/#", "foo/$bar"));
    assert!(matches("foo/+/baz", "foo/$bar/baz"));

    // Should not match

    assert!(!matches("test/6/#", "test/3"));
    assert!(!matches("foo/bar", "foo"));
    assert!(!matches("foo/+", "foo/bar/baz"));
    assert!(!matches("foo/+/baz", "foo/bar/bar"));
    assert!(!matches("foo/+/#", "fo2/bar/baz"));
    assert!(!matches("/#", "foo/bar"));
    assert!(!matches("#", "$SYS/bar"));
    assert!(!matches("$BOB/bar", "$SYS/bar"));
    assert!(!matches("+/bar", "$SYS/bar"));

The updated version passes all the tests.

@fpagliughi
Copy link
Contributor

But the problem with implementing the matcher as a collection of string filters is that it runs the risk of trading some up-front (probably one-time) allocations with runtime performance. And runtime performance is far more critical.

A trie structure is extremely performant for lookups. Especially one that works correctly 🫤

Say you're writing an RPC server that maps a lot of similar request topics to callback functions, like:

requests/gpiodaemon/v1/gpio/set_bit/#  -> set_bit()
requests/gpiodaemon/v1/gpio/get_bit/#  -> get_bit()
requests/gpiodaemon/v1/gpio/toggle_bit/#  -> toggle_bit()
...

and imagine you had hundreds of callbacks. For each incoming request message you would need to manually split each of those hundres of filters, as you searched through them, and then walk down the fields of each topic. Every time.

With a trie you only need to walk down six nodes to find the matching value in this example... no matter how many topics are in the collection.

So finding a match against a single topic is less efficient. But finding a match in a large collection of topics is much more efficient. At the cost of some up-front allocations.

@Altair-Bueno
Copy link
Author

putting the main implementation into a function that took an iterator to the fields of the filter so that you didn't have the runtime overhead of splitting the filter string every time you tested for a match

Good idea. It is also useful for application that (internally) use topics as arrays of levels.

But the function does miss a corner case: Wildcards in the first field should not match a topic that starts with a dollar sign $. See: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901246

Wow that's a good one. Gotcha. However I would rather have something like this:

pub fn matches_iter<'a>(filter: impl Iterator<Item = &'a str>, topic: impl Iterator<Item = &'a str>) -> bool {
    let mut filter = filter.peekable();
    let mut topic = topic.peekable();
    
    // See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901246
    if matches!(filter.peek(), Some(&"#" | &"+")) && topic.peek().unwrap_or_default().starts_with('$') {
        return false;
    }

    loop {
        let filter_level = filter.next();
        let topic_level = topic.next();
        match (filter_level, topic_level) {
            // Exhausted both filter and topic
            (None, None) => return true,
            // Wildcard on filter
            (Some("#"), _) => return true,
            // Single level wildcard on filter
            (Some("+"), Some(_)) => continue,
            // Equal levels
            (Some(filter), Some(topic)) if filter == topic => continue,
            // Otherwise, no match
            _ => return false,
        }
    }
}

It clearly states the corner case before the main loop. Also, while we are at it, why not make topic an iterator too?

A trie structure is extremely performant for lookups. Especially one that works correctly 🫤

In theory, yes. However, I don't think its worth our time. Modern CPUs really like comparing contiguous strings. A trie structure would only be desirable if you maintain a considerable large collection of subscriptions with lots levels. Considering that the target audience for this crate are mostly MQTT clients for embedded and web services, I think this implementation is sufficient, and more importantly, correct.

@fpagliughi
Copy link
Contributor

Yeah, I was thinking of checking for the corner case at the top. I think either implementation is fine.

Also, while we are at it, why not make topic an iterator too?

No, that's not necessary. You typically specify the filter once when you create the matcher object, so it's optimal to split it once at the time of creation. But you get the topic as a &str for each incoming message, so you do need to split the topic for every match operation.

A trie structure would only be desirable if you maintain a considerable large collection of subscriptions with lots levels.

I, personally, maintain considerably large collection of subscriptions with lots levels! I do a lot of RPC-driven microservices on beefy Linux gateways, so runtime performance is paramount and I'm willing to sacrifice some allocations to get it.

But really the only way to find out the optimal solution is to write some tests to measure performance. TODO.

Considering that the target audience for this crate are mostly MQTT clients for embedded and web services...

This library is for the heftier operating systems: Linux, Windows, and MacOs; not for an RTOS. And considering all the issues that I've answered for problems on Windows, I can't say that this is mostly used in embedded systems! Who knows?

But your point is taken. This library may be used in a number of different systems where one solution is not the best.

So what I'm thinking....

  1. Let's merge your code, but rename the trait something else. (MqttTopicMatcher?, TopicMatcherExt?, something...)
  2. Let's keep the trie as TopicMatcher, but fix it so it works as expected, and optimize it to reduce allocations and maximize runtime performance.
  3. Make the trie TopicMatcher implement the trait.
  4. Any additions to the clients would use trait objects and not the trie, specifically.

That should give us the best of both worlds. User's would have a few choices out-of-the-box, but also be able to create a new implementation to suit their personal needs.

And it will (mostly) eliminate the breaking change. (Just need to add the trait declaration to apps). I have a lot of code I don't want to change.

I can merge your PR as-is and then restore the trie, or if you want to update the PR to just add your code and keep the pre-existing code. Let me know. (I've already started on #2 and #3)


Oh, and BTW... if you can use MQTT v5, then use subscription identifiers whenever possible, and you don't need a topic matcher at all!!!

@Altair-Bueno
Copy link
Author

Altair-Bueno commented May 13, 2024

No, that's not necessary. You typically specify the filter once when you create the matcher object, so it's optimal to split it once at the time of creation. But you get the topic as a &str for each incoming message, so you do need to split the topic for every match operation.

Consider this example:

use paho_mqtt::topic_matcher::matches_iter;

let message = // From a received MQTT message

let levels: Vec<_> = message.topic().split('/').collect();

if matches_iter(["+", "living_room", "+"].iter(), levels.iter()) {
    let device_id: u8 = levels[0].parse()?;
    let command: Command = levels[3].parse()?;
    do_stuff(device_id, command, message);
}

if matches_iter(["+", "bathroom", "+", "+"].iter(), levels.iter()) {
    let device_id: u8 = levels[0].parse()?;
    let command: Command = levels[3].parse()?;
    let arg = levels[4].parse()?;
    // ...
}

If matches_iter took a string, the splitting would be duplicated for every matches_iter called.

Also, consider matches_iter taking IntoIter arguments instead of Iterator (https://doc.rust-lang.org/std/iter/trait.IntoIterator.html#impl-IntoIterator-for-I)

Let's merge your code, but rename the trait something else. (MqttTopicMatcher?, TopicMatcherExt?, something...)

Roger. TopicMatcherExt then. I'll add the missing tests. Waiting for final comments on matches_iter and then we can merge

Make the trie TopicMatcher implement the trait.

Note that if TopicMatcher: IntoIterator this won't be possible, as for the blanked implementation.

I can merge your PR as-is and then restore the trie, or if you want to update the PR to just add your code and keep the pre-existing code. Let me know. (I've already started on #2 and #3)

Better to keep it separate imho

Oh, and BTW... if you can use MQTT v5, then use subscription identifiers whenever possible, and you don't need a topic matcher at all!!!

Hmm didn't really bother with MQTT v5. Ill take a look nonetheless.

@fpagliughi
Copy link
Contributor

You should be using v5. Everyone should. The improvements to the spec are astounding. Better error reporting, server management, subscription options (no echo, identifiers), and message metadata (such as for standardized RPCs).

Sorry, I should have mentioned this from the start because v5 can totally eliminate the burden of matching from the client and shift it to the server. Which is probably what you want in an under-powered client.

As of a few months ago RabbitMQ was the last of the major brokers to add MQTT v5 support, and although it is incomplete, it does cover a lot of the spec, including subscription identifiers. All the other major brokers (Mosquitto, VerneMQ, HiveMQ, etc) have had full v5 support for years. So unless your suck with a really old broker due to your slow-moving IT department, there's no reason not to move to v5.

With subscription identifiers, when you subscribe, you add a property to the subscription message assigning an integer identifier to the topic filter.

Then, when the broker sends you a message, it includes the subscription identifier(s), telling you which filters matched for that message by integer ID. So you don't need a topic matcher at all; the broker does the work for you. And finding a value associated with that specific topic filter is a simple integer lookup table.

Note that if you "subscribe many" with several filters in one SUBSCRIBE message, they all get the same ID, so you would typically subscribe to filters individually to tell them apart. And the broker has the option of sending you a separate message for each match with a single ID, or one message with multiple IDs. So watch for that if you overlap.

The sync_consume_v5.rs example is a full implementation of a client using subscription identifiers with integer table lookup for associating callbacks to incoming messages.

@Altair-Bueno
Copy link
Author

Unfortunately my application is required to work with v3.11 as a baseline. We will look into optimizing the software for v5 tho, it sounds promising!

@Altair-Bueno Altair-Bueno marked this pull request as ready for review May 13, 2024 15:09
@fpagliughi
Copy link
Contributor

fpagliughi commented May 18, 2024

So I fixed, cleaned up, and optimized the TopicMapper trie implementation a little better. It now passes the same unit tests for matching. (I had missed the leading '$' non-wildcard-match, too!). Now the only dynamic allocation for a search is a Vec<> stack, which seems pretty typical for a Rust tree iterator.

I put it into a new branch topic_matcher and merged it with this PR.
https://github.com/eclipse/paho.mqtt.rust/blob/topic_matcher/src/topic_matcher.rs

Now I'm trying to figure how to get to a trait that both techniques can share.

I implemented an iterator that walks the whole collection so you can see all the items. And used it for an IntoIterator. But that's not how it should be used to do brute-force matches through all the items to match a topic. It totally eliminates the optimized search that the trie provides in the first place.

So, any ideas for a trait that allows for an implementation to use its optimal search? Or am I missing how to implement what's here already?

Oh, I also renamed the matches() function to topic_matches(), if that's ok. I kept confusing it with the std matches!() macro.


EDIT: Oh, I see one mistake I made. A HashMap returns k/v pairs as a tuple of references, (&K, &V). For runtime performance I'm caching the key as a string in each node so that I don't need to dynamically rebuild the keys in the iterators. But returning the pair like that gives a ref to the tuple,&(String, V). That seems wrong in two ways, I can change that to a tuple of references, but it seems better to use a &str rather than &String. So I will update the iterators to return (&str, &V), which should bring the TopicMapper one step closer to a generic mapper.

@fpagliughi
Copy link
Contributor

The existing trait also breaks MSRV with v1.63.0:

error[E0562]: `impl Trait` only allowed in function and inherent method return types, not in trait method return
   --> src/topic_matcher.rs:127:10
    |
127 |     ) -> impl Iterator<Item = (Self::Key, Self::Value)> + 'topic
    |          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error[E0562]: `impl Trait` only allowed in function and inherent method return types, not in `impl` method return
   --> src/topic_matcher.rs:143:10
    |
143 |     ) -> impl Iterator<Item = (Self::Key, Self::Value)> + 'topic
    |          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For more information about this error, try `rustc --explain E0562`.
error: could not compile `paho-mqtt` due to 2 previous errors

We may be able to bump that at some point, but I know people are using this crate with Yocto, and the the Rust compiler bundled into the last two versions are:

  • Langdale, Rust v1.63
  • Mickledore, Rust v1.68.2

@Altair-Bueno
Copy link
Author

So I fixed, cleaned up, and optimized the TopicMapper trie implementation a little better

Those are some amazing news. Thank you.

So, any ideas for a trait that allows for an implementation to use its optimal search? Or am I missing how to implement what's here already?

Without https://rust-lang.github.io/rfcs/1210-impl-specialization.html there is little to be done. Either the trait is changed or TopicMatcher:!IntoIterator

Oh, I also renamed the matches() function to topic_matches(), if that's ok. I kept confusing it with the std matches!() macro.

Yea, good one. I'm terrible naming things

The existing trait also breaks MSRV with v1.63.0:

Does it break MSRV if you guard the trait behind a feature flag?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants