
Simplify the operation handling code by replacing fragmented control flow with regular async/await #2881

Closed
wants to merge 6 commits

Conversation

Contributor

@Bravo555 Bravo555 commented May 15, 2024

Proposed changes

This PR should make the operation handling code simpler to reason about and to change, by allowing operation handlers to use .await without blocking. The scope of the PR is to show the idea working on the config_snapshot and config_update operations.

Below I describe the current state, which is subject to change.

  1. In 6b90d51, the main C8y mapper actor is prepared for its messages to be handled concurrently in separate tasks. For now, we use Mutexes to guard the MqttPublisher and the CumulocityConverter:
    • using the wrapper struct MqttPublisher(Arc<Mutex<LoggingSender<MqttMessage>>>) with fn send(&self) is OK: we lock the mutex to use the sender and unlock it immediately after sending, so it shouldn't block (see the sketch after this list). The alternative would be to clone the sender for each worker, and I will explore that later.
    • with Mutex<CumulocityConverter>, only a single worker can use the CumulocityConverter at any given time, which can lead to deadlocks if we're not careful, and currently results in the same behaviour as before, i.e. blocking in the operation handling code blocks the processing of other messages, because the lock is held across await points.
      This is temporary and will have to be fixed, but it is used right now so I could show what the full operation handling function would look like without the current fragmentation. Also, a test demonstrating that the converter doesn't block should be added.
  2. In 72dc9d0, I export the Identity type from the download crate, so that I can use it in the next commit.
  3. In 0e86568, the Downloader is used directly in the operation handler, to show that we can use it without going through the actor: if we only want to download a file via HTTP, and this download shouldn't influence anything else, we can use it directly.
  4. In 892ecbe, we use the UploaderActor via a ClientMessageBox, where we receive the response directly in the same place where we send(), instead of it arriving at the top-level receiver of the top-level actor, which then needs to decide to call an operation handling function again. This way we still use the uploader actor and have messages sent between actors, but we don't need to fragment our control flow. This demonstrates that the proposed new operation handlers can still send and receive messages to and from other actors.
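To make point 1 concrete, here is a minimal sketch of what the publisher wrapper could look like, assuming tokio's async Mutex and the project's LoggingSender/MqttMessage types in scope; the actual code in the PR may differ in details and error handling:

```rust
// A minimal sketch of the wrapper described in point 1; not the exact PR code.
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone)]
struct MqttPublisher(Arc<Mutex<LoggingSender<MqttMessage>>>);

impl MqttPublisher {
    // send(&self): the mutex is held only while one message is being sent,
    // so workers can share the publisher without needing &mut access.
    async fn send(&self, message: MqttMessage) {
        let mut sender = self.0.lock().await;
        let _ = sender.send(message).await;
    }
}
```

Because the guard is dropped as soon as the send completes, concurrent workers only serialize on the send itself, not on the whole operation.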

Next steps

  • rebase onto the newest changes
  • replace lock on mqtt sender with cloned senders (can be done, but will require &mut self)
  • add more tests to see which operations are blocked/executed concurrently
  • make necessary changes to CumulocityConverter so operation handling doesn't block
  • clean up operation handling code

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s)
  • I ran cargo fmt as mentioned in CODING_GUIDELINES
  • I used cargo clippy as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@@ -60,15 +64,36 @@ pub(crate) type IdDownloadRequest = (CmdId, DownloadRequest);
fan_in_message_type!(C8yMapperInput[MqttMessage, FsWatchEvent, SyncComplete, IdUploadResult, IdDownloadResult] : Debug);
type C8yMapperOutput = MqttMessage;

#[derive(Clone)]
struct MqttPublisher(Arc<Mutex<LoggingSender<MqttMessage>>>);
Contributor

Any specific reason you couldn't just freely clone the senders and use them independently? I see that you're already planning on exploring that option, but just wondering why you chose this path first. Faced any issues with cloned senders?

Contributor Author

I did that first because I knew that I'd need Mutexes for synchronization of other parts, and to be honest, I forgot that our senders are Clone, so I thought a Mutex was the only option, but that's not the case.

Contributor Author

to be honest, I forgot that our senders are Clone, so I thought a Mutex was the only option, but that's not the case

That's not entirely correct. Because our LoggingSender<MqttMessage> internally uses futures::channel::mpsc::Sender, whose send takes &mut self, any method that uses the sender needs to be &mut self as well. However, if we put the sender behind a Mutex, we can implement send(&self), allowing the worker's methods to also be &self.
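For contrast, a hypothetical sketch of the cloned-sender alternative mentioned above (the Worker type and publish method are made-up names):

```rust
// Hypothetical sketch: each worker owns its own cloned sender,
// but then every method that sends has to take &mut self.
struct Worker {
    mqtt: LoggingSender<MqttMessage>, // one clone per worker
}

impl Worker {
    async fn publish(&mut self, message: MqttMessage) {
        // send(&mut self) on the underlying channel forces &mut self here as well
        let _ = self.mqtt.send(message).await;
    }
}
```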

while let Some(event) = messages.recv().await {
    let worker = Arc::clone(&worker);

    tokio::spawn(async move {
Contributor

I had a different model in mind where we have independent workers (maybe even actors) for each operation handler, with minimal coordination with the top-level mapper actor/worker, eliminating the need to share and lock the top-level converter. This mapper actor would still coordinate the MQTT messages, convert them to typed operation messages, and dispatch those to the respective operation workers. Once a worker gets an operation message, it should be able to freely exchange messages with the other actors like the upload/download actors. Extending the usage of RequestEnvelope beyond the HTTP actors would be key to enabling this.

Contributor Author

I had a different model in mind where we have independent workers (maybe even actors) for each operation handler, with minimal coordination with the top-level mapper actor/worker, eliminating the need to share and lock the top-level converter.

Yes, the idea would be to decouple the operation handling as much as possible from CumulocityConverter, so it can run separately as a new task, or maybe as a new actor, without locking the rest of the converter. Right now, basically all the functionality is implemented in CumulocityConverter in &mut self methods, so it's impossible to run things concurrently without locking.

Ideally, each operation handler should be as independent as possible, and we should be able to easily reason about each one in isolation, without the control flow jumping between actor.rs, converter.rs, and e.g. log_upload.rs.

Right now they're just tokio tasks, because my current goal is to just move the boundary on which we block, but the handlers could definitely be extracted out of the main actor.

This mapper actor would still coordinate the MQTT messages, convert them to typed operation messages, and dispatch those to the respective operation workers. Once a worker gets an operation message, it should be able to freely exchange messages with the other actors like the upload/download actors. Extending the usage of RequestEnvelope beyond the HTTP actors would be key to enabling this.

That's right, the actor which owns the MQTT message box should route these operation state change messages to the operation handlers, which should then be able to do blocking work, like waiting for an upload, without synchronizing with the main actor. But I'd say that we overuse messages a bit. It's fine to have actors that manage shared mutable state, but for downloading/uploading, what shared state is there that needs to be managed between different downloads/uploads? Why shouldn't we use the downloader/uploader directly instead of going via the actor?

And I believe that sometimes we use an actor only so that we can mock it in tests: instead of having an interface that we can mock, we effectively mock the interface by simulating messages being sent in the message boxes. I think this way of testing is a bit tedious, because you need to handle and ignore unrelated messages, and sometimes, when messages are not being sent by the actor under test, you need timeouts, and so on. I think there's room for improvement there.

But that's a discussion for another time, and I expect we'll discuss it more in the future. For the purposes of this PR, the important part is allowing handlers to await either tasks they spawn themselves or requests sent to other actors via RequestEnvelope.

Contributor

Why shouldn't we use the downloader/uploader directly instead of going via the actor?

One additional reason, other than message visibility and testability, is controlling bandwidth usage. We don't want too many uploads/downloads happening in parallel, to reduce the load on the network and the disk. Routing them via a centralised downloader/uploader lets us better control the maximum parallelism that is allowed.
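To illustrate that rationale, one common way to cap parallel transfers is a shared semaphore; this is only a hedged sketch of the general idea, not a claim about how the existing downloader actor is implemented:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Illustrative only: bound the number of concurrent downloads with a semaphore.
async fn download_with_limit(limit: Arc<Semaphore>, url: String) {
    // With e.g. Semaphore::new(4), at most four downloads run at the same time;
    // the permit is released when it goes out of scope.
    let _permit = limit.acquire().await.expect("semaphore closed");
    // ... perform the HTTP download of `url` here ...
}
```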

url: tedge_file_url.clone(),
file_dir: destination_dir,
let downloader =
    Downloader::new(destination_path.clone(), self.config.identity.clone());
Contributor

I believe we can continue using the downloader actor here by using a ClientMessageBox to it, so that we can await the response inline. The c8y_firmware_manager already does the same.

Contributor Author

That's right, we could use a ClientMessageBox here, but I also wanted to explore whether we could just use the downloader directly, without relying on an outside actor to do it. I'd say it has the advantage of decreasing coupling. The disadvantage is that the messages are no longer visible, but is testing code the only thing that needs them to be visible? If so, then maybe we don't need to go via the actor.

But the main reason I did it like this was to demonstrate that after the refactor it will be possible, whereas currently it would block. Now we can easily swap this for an EnvelopeSender if we want to go via the actor.
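For reference, a rough sketch of what the ClientMessageBox route could look like for downloads, assuming an await_response-style API and the IdDownloadRequest/IdDownloadResult aliases used by the mapper; the exact types and signatures may differ:

```rust
// Rough sketch: awaiting the downloader actor's response inline,
// instead of routing it back through the top-level actor's receiver.
async fn download_via_actor(
    downloader: &mut ClientMessageBox<IdDownloadRequest, IdDownloadResult>,
    cmd_id: CmdId,
    request: DownloadRequest,
) -> Result<IdDownloadResult, ChannelError> {
    // The response is awaited right here, so the handler keeps a linear
    // control flow instead of resuming from the top-level receive loop.
    downloader.await_response((cmd_id, request)).await
}
```

Either way, the handler stays a single async function; only the transport behind it changes.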

Contributor

An additional benefit of going via the downloader actor is described in this comment: #2881 (comment)

@@ -375,7 +309,7 @@ impl C8yMapperBuilder {
mqtt.connect_sink(config.topics.clone(), &box_builder.get_sender());
let http_proxy = C8YHttpProxy::new(http);
let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone());
let upload_sender = uploader.connect_client(box_builder.get_sender().sender_clone());
let uploader = ClientMessageBox::new(uploader);
Contributor

We should be able to do the same for the downloader as well.

Contributor Author

We can; the explanation of why it was not done for now is in this comment: #2881 (comment)

codecov bot commented May 16, 2024

Codecov Report

Attention: Patch coverage is 79.90431%, with 42 lines in your changes missing coverage. Please review.

Project coverage is 77.9%. Comparing base (644cd6f) to head (f5f7511).
Report is 6 commits behind head on main.

Current head f5f7511 differs from pull request most recent head 884b25f

Please upload reports for the commit 884b25f to get more accurate results.

Additional details and impacted files
Files Coverage Δ
crates/common/download/src/download.rs 81.3% <ø> (+0.6%) ⬆️
crates/core/tedge_api/src/lib.rs 100.0% <ø> (ø)
crates/extensions/c8y_mapper_ext/src/tests.rs 92.6% <100.0%> (+<0.1%) ⬆️
...nsions/c8y_mapper_ext/src/operations/log_upload.rs 90.2% <94.7%> (-0.1%) ⬇️
crates/extensions/c8y_mapper_ext/src/config.rs 47.8% <50.0%> (+<0.1%) ⬆️
crates/extensions/c8y_mapper_ext/src/converter.rs 83.4% <84.0%> (-0.3%) ⬇️
...es/extensions/c8y_mapper_ext/src/operations/mod.rs 91.0% <73.3%> (-1.5%) ⬇️
crates/extensions/c8y_mapper_ext/src/actor.rs 80.4% <80.5%> (-2.0%) ⬇️
...s/c8y_mapper_ext/src/operations/config_snapshot.rs 78.4% <76.9%> (-12.2%) ⬇️

... and 1 file with indirect coverage changes

To be more sure about future changes to the c8y mapper and the
converter, we need tests checking that operations happen concurrently.

This change introduces a test that only covers the config_snapshot
operation; as expected, it fails because of the regressions introduced
by the previous commits in this PR.

Signed-off-by: Marcel Guzik <marcel.guzik@inetum.com>
Contributor
Robot Results

✅ Passed: 435  ❌ Failed: 1  ⏭️ Skipped: 3  Total: 436  Pass %: 99.77  ⏱️ Duration: 56m13.273314s

Failed Tests

Name: Entities persisted and restored
Message: ValueError: invalid literal for int() with base 10: ''
⏱️ Duration: 65.819 s
Suite: Registration Lifecycle

@Bravo555
Contributor Author

Closing in favour of #2904, which is a more promising and feasible approach.

@Bravo555 Bravo555 closed this May 25, 2024