Fix hang in wait_all_exit #227

Open
wants to merge 1 commit into
base: master

alex-matei
Contributor

This patch fixes a hang that can occur in wait_all_exit().

self.wait() without await returns a Future, so the notified() object is not created until we await on the returned future. That means notify_waiters() can be called before notified() is. This leads to notified() waiting forever because notify_waiters is called only once, when the last waiter is dropped.

notify_waiters() and notified() form a happens-before relationship. There are two possible scenarios:

  1. If notified() comes before notify_waiters() this means we can safely await on notified().

  2. If notified() comes after notify_waiters(), everything that happened before notify_waiters() is visible to the thread calling notified(). Awaiting notified() at this point would block, but we can check the waiters count instead, which is guaranteed to be 0 because it was set before the notify_waiters() call.

Let's move notified() call before checking that the number of waiters is 0.

Signed-off-by: Alexandru Matei <alexandru.matei@uipath.com>
Ok(())
}
}

async fn wait(&self) {
Collaborator

Suggested change
async fn wait(&self) {
pub async fn wait_all_exit(&self) -> Result<(), Elapsed> {
    while self.waiters() > 0 {
        let wait = self.wait(self.shared.notify_exit.notified());
        ....
    }
}
async fn wait<F>(&self, future: F) -> Result<F::Output, Elapsed>
where
    F: Future,
{
    if let Some(tm) = self.wait_time {
        timeout(tm, future).await
    } else {
        Ok(future.await)
    }
}

wait() should just do the waiting. Can the while self.waiters() > 0 loop live in wait_all_exit() instead?

Contributor Author

This doesn't fix the issue: you need to check the waiters count after the notified() object is created but before awaiting it. Here is an example with two threads running in parallel:

Thread 1  (executing last waiter.drop())        |               Thread 2 (in wait_all_exit() call)
                                                |
self.shared.waiters = 0                         |
                                                |
self.shared.notify_exit.notify_waiters()        |               
                                                |
                                                |               future = self.shared.notify_exit.notified();
                                                |
                                                |
                                                |               future.await;

The notify_waiters() call doesn't store any permit; it only unblocks existing notified() objects: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notify_waiters

If the notified() object is created after the notify_waiters() call, await will block forever. There won't be anyone left to unblock it, because notify_waiters() is called only when the last waiter is dropped (unless a subscribe() call and a drop() happen in the meantime).

The tokio::sync::Notify struct (the type of self.shared.notify_exit) internally uses seq_cst operations on an AtomicUsize state value. These operations establish a happens-before relationship between the notify_waiters() and notified() calls: when notified() runs after notify_waiters(), anything that happened before the notify_waiters() call in thread 1 is visible to thread 2 after its notified() call.

Hence, we can test for waiters == 0 just after the notified() call:

  1. If the value is 0, awaiting at this point would be dangerous because of the scenario described above, but we can just exit.
  2. If the value is not 0, there is still a waiter that has not been dropped yet, so we can expect a notify_waiters() call on another thread later on. That call will unblock our existing notified() object, so we can safely await.

By adding a check for the waiters count right before await happens we can protect against this hang.
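
The ordering argument above can be tried out with a small standard-library model. This is only a sketch under stated assumptions: ToyShared, its generation counter, and the wait limit are hypothetical stand-ins for the shared state and for a notify-all primitive that stores no permit; they are not tokio's Notify or this crate's code. The limit exists only so that the lost-wakeup case returns false instead of hanging.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Toy model of the shared state: a waiter count plus a notify-all primitive
/// that stores no permit. Hypothetical type, not tokio's implementation.
struct ToyShared {
    waiters: AtomicUsize,
    generation: Mutex<u64>,
    cond: Condvar,
}

impl ToyShared {
    fn new(waiters: usize) -> Self {
        ToyShared {
            waiters: AtomicUsize::new(waiters),
            generation: Mutex::new(0),
            cond: Condvar::new(),
        }
    }

    /// Takes a ticket. Only notifications sent after this call wake it.
    fn notified(&self) -> u64 {
        *self.generation.lock().unwrap()
    }

    /// Models the last waiter's drop(): decrement the count first, and if this
    /// was the last waiter, wake every ticket that already exists.
    fn drop_waiter(&self) {
        if self.waiters.fetch_sub(1, Ordering::SeqCst) == 1 {
            *self.generation.lock().unwrap() += 1;
            self.cond.notify_all();
        }
    }

    /// Waits for a notification newer than `ticket`.
    /// Returns false if `limit` elapses first (a real await would hang instead).
    fn wait(&self, ticket: u64, limit: Duration) -> bool {
        let guard = self.generation.lock().unwrap();
        let (_guard, result) = self
            .cond
            .wait_timeout_while(guard, limit, |generation| *generation == ticket)
            .unwrap();
        !result.timed_out()
    }

    /// Fixed order: create the ticket, then check the count, then wait.
    fn wait_all_exit(&self, limit: Duration) -> bool {
        let ticket = self.notified();
        if self.waiters.load(Ordering::SeqCst) == 0 {
            return true; // nobody is left to notify us, so waiting would hang
        }
        self.wait(ticket, limit)
    }
}

fn main() {
    let limit = Duration::from_millis(100);

    // Racy order: the check passes, the last waiter drops, the ticket comes too late.
    let shared = ToyShared::new(1);
    let saw_waiters = shared.waiters.load(Ordering::SeqCst) > 0;
    shared.drop_waiter();
    let late_ticket = shared.notified();
    let woken = shared.wait(late_ticket, limit);
    println!("racy order: saw_waiters={}, woken={}", saw_waiters, woken);

    // Fixed order, last waiter already gone: the count check returns early.
    println!("fixed, waiter already dropped: {}", shared.wait_all_exit(limit));

    // Fixed order, waiter still alive: the ticket normally exists before the
    // notification; if the drop wins the race, the count check catches it.
    let shared = Arc::new(ToyShared::new(1));
    let dropper = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            shared.drop_waiter();
        })
    };
    println!("fixed, waiter dropped later: {}", shared.wait_all_exit(Duration::from_secs(5)));
    dropper.join().unwrap();
}
```

The model leaves out the outer loop and subscribe(); it only illustrates why the count has to be read after the ticket exists.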

Collaborator

@wllenyj wllenyj May 11, 2024

Thanks for the explanation, very detailed. I know how the race happens and have read the documentation.

The Notified future is guaranteed to receive wakeups from notify_waiters() as soon as it has been created, even if it has not yet been polled.

The problem is that the Notified future needs to be created first; only then does it make sense to test waiters. So we can move the notified() call earlier.

My point is that wait is a helper function, not public API; it only wraps the future in a timeout or not. It may be used in unit tests or other internal situations. wait_all_exit is the function provided to users, so it is the one that should ensure concurrency safety.

Contributor Author

There are two reasons for moving the bulk of the code in wait():

  1. Let's inline wait() call in the snippet you suggested:

     pub async fn wait_all_exit(&self) -> Result<(), Elapsed> {
         while self.waiters() > 0 {
             let future = self.shared.notify_exit.notified();
             let wait = if let Some(tm) = self.wait_time {
                 timeout(tm, future).await
             } else {
                 Ok(future.await)
             };
             ....
         }
     }

    Between the notified() call and the await there is no check of the waiters count, so awaiting can still block forever if the count is 0. The solution would be to add the waiters() test at the beginning of the wait() function. That seems fine at first, but it doesn't work because of the second issue.

  2. The timeout, when specified, should apply to the outer while loop as well. Otherwise the code might loop multiple times, beyond the specified timeout, which is incorrect.

Collaborator

@wllenyj wllenyj May 14, 2024

The .... stands for the rest of your code, left as is.

Suggested change
async fn wait(&self) {
pub async fn wait_all_exit(&self) -> Result<(), Elapsed> {
    while self.waiters() > 0 {
        let wait = self.wait(self.shared.notify_exit.notified());
        if self.waiters() == 0 {
            return;
        }
        wait.await;
    }
}

Will this work?

Contributor Author

This won't work: it doesn't resolve the second issue, the one related to the timeout. The timeout should apply to the whole while loop, because the loop can iterate multiple times if someone calls subscribe() and waiters() goes from 0 back to > 0.
For example, if you pass a 5-second timeout, this code can behave like this:

  1. wait.await waits for 4 seconds
  2. waiters() is > 0 after that, because someone called subscribe() after the last waiter was dropped
  3. wait.await waits for 3 seconds
  4. waiters() is again > 0
  5. wait.await waits for another 5 seconds
  6. waiters() is finally 0 and the code can exit

The code waits for twelve seconds in total, although the intent was to wait at most 5 seconds.
The idea is to move the while block inside the future that is passed to the timeout() function. Doing that gets you to the same code I wrote.
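
The arithmetic of that example can be checked with a short standard-library sketch. It is only a model: per_iteration_total and whole_loop_total are hypothetical helper names, and the list of wait durations is illustrative input taken from the steps above, not something measured from the real code.

```rust
use std::time::Duration;

/// Total time spent when every loop iteration gets a fresh `timeout`.
/// `waits` holds how long each iteration blocks before being woken
/// (hypothetical input). Returns None if a single iteration times out.
fn per_iteration_total(waits: &[Duration], timeout: Duration) -> Option<Duration> {
    let mut total = Duration::ZERO;
    for &wait in waits {
        if wait > timeout {
            return None; // this iteration alone exceeds its own timeout
        }
        total += wait;
    }
    Some(total)
}

/// Total time spent when one `timeout` covers the whole loop.
/// Returns None as soon as the accumulated waiting exceeds the shared budget.
fn whole_loop_total(waits: &[Duration], timeout: Duration) -> Option<Duration> {
    let mut total = Duration::ZERO;
    for &wait in waits {
        total += wait;
        if total > timeout {
            return None; // the single deadline fires during this iteration
        }
    }
    Some(total)
}

fn main() {
    // The 4 s, 3 s and 5 s waits from the steps above, with a 5 s timeout.
    let waits = [4, 3, 5].map(Duration::from_secs);
    let timeout = Duration::from_secs(5);

    // Per-iteration timeout: no single wait exceeds 5 s, so the loop runs 12 s.
    println!("per iteration: {:?}", per_iteration_total(&waits, timeout));
    // Whole-loop timeout: the budget is exhausted during the second wait.
    println!("whole loop:    {:?}", whole_loop_total(&waits, timeout));
}
```

With a per-iteration timeout the sequence finishes after twelve seconds without ever timing out, while a single timeout around the loop reports the timeout during the second wait.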

Collaborator

Thanks for your comment.
I'm going to take some time to review this.

@alex-matei
Contributor Author

@wllenyj hi, any update on this?
