New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
QUIC Concurrency API Implementation #24257
base: feature/quic-server
Are you sure you want to change the base?
Changes from 1 commit
59c8791
52da59a
a550fc8
133d14e
a8077c3
e3520f0
940c3f1
80bf4a7
f3d81e5
0f48f9d
7468e2f
1cf3069
8ca024a
ec9c2be
36fad54
aa15ea4
e6e8abf
4e53b28
cf5a935
8555f1f
d7ed50e
01303d0
17c6c70
b25425b
d98dfb2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
#include "internal/quic_reactor.h" | ||
#include "internal/common.h" | ||
#include "internal/thread_arch.h" | ||
#include <assert.h> | ||
|
||
/* | ||
* Core I/O Reactor Framework | ||
|
@@ -32,10 +33,17 @@ int ossl_quic_reactor_init(QUIC_REACTOR *rtor, | |
rtor->tick_cb = tick_cb; | ||
rtor->tick_cb_arg = tick_cb_arg; | ||
|
||
rtor->cur_blocking_waiters = 0; | ||
|
||
if ((flags & QUIC_REACTOR_FLAG_USE_NOTIFIER) != 0) { | ||
if (!ossl_rio_notifier_init(&rtor->notifier)) | ||
return 0; | ||
|
||
if ((rtor->notifier_cv = ossl_crypto_condvar_new()) == NULL) { | ||
ossl_rio_notifier_cleanup(&rtor->notifier); | ||
return 0; | ||
} | ||
|
||
rtor->have_notifier = 1; | ||
} else { | ||
rtor->have_notifier = 0; | ||
|
@@ -52,6 +60,8 @@ void ossl_quic_reactor_cleanup(QUIC_REACTOR *rtor) | |
if (rtor->have_notifier) { | ||
ossl_rio_notifier_cleanup(&rtor->notifier); | ||
rtor->have_notifier = 0; | ||
|
||
ossl_crypto_condvar_free(&rtor->notifier_cv); | ||
} | ||
} | ||
|
||
|
@@ -191,6 +201,7 @@ RIO_NOTIFIER *ossl_quic_reactor_get0_notifier(QUIC_REACTOR *rtor) | |
*/ | ||
static int poll_two_fds(int rfd, int rfd_want_read, | ||
int wfd, int wfd_want_write, | ||
int notify_rfd, | ||
OSSL_TIME deadline, | ||
CRYPTO_MUTEX *mutex) | ||
{ | ||
|
@@ -224,9 +235,17 @@ static int poll_two_fds(int rfd, int rfd_want_read, | |
if (wfd != -1) | ||
openssl_fdset(wfd, &efd_set); | ||
|
||
/* Check for notifier FD readability. */ | ||
if (notify_rfd != -1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. INVALID_SOCKET |
||
openssl_fdset(notify_rfd, &rfd_set); | ||
openssl_fdset(notify_rfd, &efd_set); | ||
} | ||
|
||
maxfd = rfd; | ||
if (wfd > maxfd) | ||
maxfd = wfd; | ||
if (notify_rfd > maxfd) | ||
maxfd = notify_rfd; | ||
|
||
if (!ossl_assert(rfd != -1 || wfd != -1 | ||
|| !ossl_time_is_infinite(deadline))) | ||
|
@@ -269,7 +288,7 @@ static int poll_two_fds(int rfd, int rfd_want_read, | |
#else | ||
int pres, timeout_ms; | ||
OSSL_TIME now, timeout; | ||
struct pollfd pfds[2] = {0}; | ||
struct pollfd pfds[3] = {0}; | ||
size_t npfd = 0; | ||
|
||
if (rfd == wfd) { | ||
|
@@ -290,6 +309,12 @@ static int poll_two_fds(int rfd, int rfd_want_read, | |
++npfd; | ||
} | ||
|
||
if (notify_rfd >= 0) { | ||
pfds[npfd].fd = notify_rfd; | ||
pfds[npfd].events = POLLIN; | ||
++npfd; | ||
} | ||
|
||
if (!ossl_assert(npfd != 0 || !ossl_time_is_infinite(deadline))) | ||
/* Do not block forever; should not happen. */ | ||
return 0; | ||
|
@@ -336,8 +361,8 @@ static int poll_descriptor_to_fd(const BIO_POLL_DESCRIPTOR *d, int *fd) | |
} | ||
|
||
/* | ||
* Poll up to two abstract poll descriptors. Currently we only support | ||
* poll descriptors which represent FDs. | ||
* Poll up to two abstract poll descriptors, as well as an optional notify FD. | ||
* Currently we only support poll descriptors which represent FDs. | ||
* | ||
* If mutex is non-NULL, it is assumed to be a lock currently held for write and is | ||
* unlocked for the duration of any wait. | ||
|
@@ -348,6 +373,7 @@ static int poll_descriptor_to_fd(const BIO_POLL_DESCRIPTOR *d, int *fd) | |
*/ | ||
static int poll_two_descriptors(const BIO_POLL_DESCRIPTOR *r, int r_want_read, | ||
const BIO_POLL_DESCRIPTOR *w, int w_want_write, | ||
int notify_rfd, | ||
OSSL_TIME deadline, | ||
CRYPTO_MUTEX *mutex) | ||
{ | ||
|
@@ -357,7 +383,45 @@ static int poll_two_descriptors(const BIO_POLL_DESCRIPTOR *r, int r_want_read, | |
|| !poll_descriptor_to_fd(w, &wfd)) | ||
return 0; | ||
|
||
return poll_two_fds(rfd, r_want_read, wfd, w_want_write, deadline, mutex); | ||
return poll_two_fds(rfd, r_want_read, wfd, w_want_write, | ||
notify_rfd, deadline, mutex); | ||
} | ||
|
||
void ossl_quic_reactor_notify_other_threads(QUIC_REACTOR *rtor, | ||
CRYPTO_MUTEX *mutex) | ||
{ | ||
if (!rtor->have_notifier) | ||
return; | ||
|
||
/* | ||
* This function is called when we have done anything on this thread which | ||
* might allow a predicate for a block_until_pred call on another thread to | ||
* now be met. | ||
* | ||
* When this happens, we need to wake those threads using the notifier. | ||
* However, we do not want to wake *this* thread (if/when it subsequently | ||
* enters block_until_pred) due to the notifier FD becoming readable. | ||
* Therefore, signal the notifier, and use a CV to detect when all other | ||
* threads have woken. | ||
*/ | ||
|
||
if (rtor->cur_blocking_waiters == 0) | ||
/* Nothing to do in this case. */ | ||
return; | ||
|
||
/* Signal the notifier to wake up all threads. */ | ||
if (!rtor->signalled_notifier) { | ||
ossl_rio_notifier_signal(&rtor->notifier); | ||
rtor->signalled_notifier = 1; | ||
} | ||
|
||
/* | ||
* Wait on the CV until all threads have finished the first phase of the | ||
* wakeup process and the last thread out has taken responsibility for | ||
* unsignalling the notifier. | ||
*/ | ||
while (rtor->signalled_notifier) | ||
ossl_crypto_condvar_wait(rtor->notifier_cv, mutex); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to lock this mutex prior to reading/writing rtor->signaled_notifier, and unlock it again after the thread you are notifying has updated signaled_notifier. I believe if the mutex isn't owned by the calling thread at the time of the underlying pthread_cond_wait call, EINVAL is returned here: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes — this code is always called with the mutex held. Incidentally, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm sorry, I see where poll_two_descriptors unlocks/relocks the mutex several times, but I never see where that mutex is first acquired. Is the reactor mutex assigned in ossl_quic_reactor_init aliased from the qctx mutex that is also used in qctx_lock? That would explain why I can't find where rtor->mutex is explicitly locked. Incidentally, while looking at this I noticed that all your usages of mutexes in QUIC are conditional on OPENSSL_THREADS being defined, which is good, but the above call to ossl_crypto_condvar_wait is not, implying that in a no-threads build, this code will pass a NULL pointer to ossl_crypto_condvar_wait, which I believe will cause -EINVAL to be returned. Not sure if thats a problem in an unthreaded build, but I thought I would point it out. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes. The reactor does not allocate or own its own mutex, but uses the mutex donated from the quic_impl.c code. This is necessary so that it can unlock the mutex during the poll(2) call and then reacquire it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Good point. We need to have better CI coverage here... |
||
} | ||
|
||
/* | ||
|
@@ -376,9 +440,12 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor, | |
uint32_t flags, | ||
CRYPTO_MUTEX *mutex) | ||
{ | ||
int res, net_read_desired, net_write_desired; | ||
int res, net_read_desired, net_write_desired, notifier_fd; | ||
OSSL_TIME tick_deadline; | ||
|
||
notifier_fd | ||
= (rtor->have_notifier ? ossl_rio_notifier_as_fd(&rtor->notifier) : -1); | ||
|
||
for (;;) { | ||
if ((flags & SKIP_FIRST_TICK) != 0) | ||
flags &= ~SKIP_FIRST_TICK; | ||
|
@@ -397,12 +464,77 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor, | |
/* Can't wait if there is nothing to wait for. */ | ||
return 0; | ||
|
||
if (!poll_two_descriptors(ossl_quic_reactor_get_poll_r(rtor), | ||
net_read_desired, | ||
ossl_quic_reactor_get_poll_w(rtor), | ||
net_write_desired, | ||
tick_deadline, | ||
mutex)) | ||
++rtor->cur_blocking_waiters; | ||
|
||
res = poll_two_descriptors(ossl_quic_reactor_get_poll_r(rtor), | ||
net_read_desired, | ||
ossl_quic_reactor_get_poll_w(rtor), | ||
net_write_desired, | ||
notifier_fd, | ||
tick_deadline, | ||
mutex); | ||
|
||
assert(rtor->cur_blocking_waiters > 0); | ||
--rtor->cur_blocking_waiters; | ||
|
||
/* | ||
* We have now exited the OS poller call. We may have | ||
* (rtor->signalled_notifier), and other threads may still be blocking. | ||
* This means that cur_blocking_waiters may still be non-zero. As such, | ||
* we cannot unsignal the notifier until all threads have had an | ||
* opportunity to wake up. | ||
* | ||
* At the same time, we cannot unsignal in the case where | ||
* cur_blocking_waiters is now zero because this condition may not occur | ||
* reliably. Consider the following scenario: | ||
* | ||
* T1 enters block_until_pred, cur_blocking_waiters -> 1 | ||
* T2 enters block_until_pred, cur_blocking_waiters -> 2 | ||
* T3 enters block_until_pred, cur_blocking_waiters -> 3 | ||
* | ||
* T4 enters block_until_pred, does not block, ticks, | ||
* sees that cur_blocking_waiters > 0 and signals the notifier | ||
* | ||
* T3 wakes, cur_blocking_waiters -> 2 | ||
* T3 predicate is not satisfied, cur_blocking_waiters -> 3, block again | ||
* | ||
* Notifier is still signalled, so T3 immediately wakes again | ||
* and is stuck repeating the above steps. | ||
* | ||
* T1, T2 are also woken by the notifier but never see | ||
* cur_blocking_waiters drop to 0, so never unsignal the notifier. | ||
* | ||
* As such, a two-phase approach is chosen when unsignalling the | ||
* notifier: | ||
* | ||
* First, all of the poll_two_descriptor calls on all threads are | ||
* allowed to exit due to the notifier being signalled. | ||
* | ||
* Second, the thread which happened to be the one which decremented | ||
* cur_blocking_waiters to 0 unsignals the notifier and is then | ||
* responsible for broadcasting to a CV to indicate to the other | ||
* threads that the synchronised wakeup has been completed. Other | ||
* threads wait for this CV to be signalled. | ||
* | ||
*/ | ||
if (rtor->have_notifier && rtor->signalled_notifier) { | ||
if (rtor->cur_blocking_waiters == 0) { | ||
ossl_rio_notifier_unsignal(&rtor->notifier); | ||
rtor->signalled_notifier = 0; | ||
|
||
/* | ||
* Release the other threads which have woken up (and possibly | ||
* rtor_notify_other_threads as well). | ||
*/ | ||
ossl_crypto_condvar_broadcast(rtor->notifier_cv); | ||
} else { | ||
/* We are not the last waiter out - so wait for that one. */ | ||
while (rtor->signalled_notifier) | ||
ossl_crypto_condvar_wait(rtor->notifier_cv, mutex); | ||
} | ||
} | ||
|
||
if (!res) | ||
/* | ||
* We don't actually care why the call succeeded (timeout, FD | ||
* readiness), we just call reactor_tick and start trying to do I/O | ||
|
@@ -420,4 +552,6 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor, | |
*/ | ||
return 0; | ||
} | ||
|
||
return res; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: protected by the caller's