Skip to content

Commit

Permalink
Allow async/await on multiple file descriptors
Browse files Browse the repository at this point in the history
Provide two new public APIs: lwan_request_awaitv_any() and
lwan_request_awaitv_all(), which, respectively, will await for
an operation on at least one of the awaited file descriptors,
returning the one that unblocked the coroutine, and for all
the awaited file descriptors.

The APIs are experimental but you can already see how much
it improves the chat implementation of the websockets sample:
now, instead of having to poll both the websocket and the
pub/sub subscription, and wait a few milliseconds, it now
instantaneously wakes up when there's data in either one of
them, and processes only what has data.  The chat now feels
like a proper chat app (well, within reason for that crude
app, but you get the idea).

(As a side effect: we now send websocket pings periodically.)

There's a lot to clean up here, but I'm tired and this will
be done eventually.
  • Loading branch information
lpereira committed May 9, 2024
1 parent 945fd78 commit b530ed6
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 91 deletions.
1 change: 1 addition & 0 deletions src/lib/liblwan.sym
Expand Up @@ -73,6 +73,7 @@ global:
lwan_handler_info_*;

lwan_request_await_*;
lwan_request_awaitv_*;
lwan_request_async_*;

lwan_straitjacket_enforce*;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/lwan-private.h
Expand Up @@ -283,3 +283,5 @@ void lwan_request_foreach_header_for_cgi(struct lwan_request *request,
size_t value_len,
void *user_data),
void *user_data);

bool lwan_send_websocket_ping_for_tq(struct lwan_connection *conn);
32 changes: 0 additions & 32 deletions src/lib/lwan-request.c
Expand Up @@ -2083,38 +2083,6 @@ __attribute__((used)) int fuzz_parse_http_request(const uint8_t *data,
}
#endif

static inline int64_t
make_async_yield_value(int fd, enum lwan_connection_coro_yield event)
{
return (int64_t)(((uint64_t)fd << 32 | event));
}

static inline struct lwan_connection *async_await_fd(
struct coro *coro, int fd, enum lwan_connection_coro_yield events)
{
assert(events >= CONN_CORO_ASYNC_AWAIT_READ &&
events <= CONN_CORO_ASYNC_AWAIT_READ_WRITE);

int64_t from_coro = coro_yield(coro, make_async_yield_value(fd, events));
return (struct lwan_connection *)(intptr_t)from_coro;
}

struct lwan_connection *lwan_request_await_read(struct lwan_request *r, int fd)
{
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ);
}

struct lwan_connection *lwan_request_await_write(struct lwan_request *r, int fd)
{
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_WRITE);
}

struct lwan_connection *lwan_request_await_read_write(struct lwan_request *r,
int fd)
{
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ_WRITE);
}

ssize_t lwan_request_async_read_flags(
struct lwan_request *request, int fd, void *buf, size_t len, int flags)
{
Expand Down
1 change: 1 addition & 0 deletions src/lib/lwan-strbuf.c
Expand Up @@ -27,6 +27,7 @@
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

#include "lwan-private.h"

Expand Down
190 changes: 178 additions & 12 deletions src/lib/lwan-thread.c
Expand Up @@ -559,7 +559,7 @@ conn_flags_to_epoll_events(enum lwan_connection_flags flags)
return EPOLL_EVENTS(flags);
}

static void update_epoll_flags(const struct timeout_queue *tq,
static void update_epoll_flags(const struct lwan *lwan,
struct lwan_connection *conn,
int epoll_fd,
enum lwan_connection_coro_yield yield_result)
Expand Down Expand Up @@ -609,7 +609,7 @@ static void update_epoll_flags(const struct timeout_queue *tq,

struct epoll_event event = {.events = conn_flags_to_epoll_events(conn->flags),
.data.ptr = conn};
int fd = lwan_connection_get_fd(tq->lwan, conn);
int fd = lwan_connection_get_fd(lwan, conn);

if (UNLIKELY(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0))
lwan_status_perror("epoll_ctl");
Expand All @@ -619,7 +619,11 @@ static void unasync_await_conn(void *data1, void *data2)
{
struct lwan_connection *async_fd_conn = data1;

async_fd_conn->flags &= ~(CONN_ASYNC_AWAIT | CONN_HUNG_UP);
async_fd_conn->flags &=
~(CONN_ASYNC_AWAIT | CONN_HUNG_UP | CONN_ASYNC_AWAIT_MULTIPLE);
assert(async_fd_conn->parent);
async_fd_conn->parent->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE;

async_fd_conn->thread = data2;

/* If this file descriptor number is used again in the future as an HTTP
Expand All @@ -635,9 +639,9 @@ static void unasync_await_conn(void *data1, void *data2)
}

static enum lwan_connection_coro_yield
resume_async(const struct timeout_queue *tq,
resume_async(const struct lwan *l,
enum lwan_connection_coro_yield yield_result,
int64_t from_coro,
int await_fd,
struct lwan_connection *conn,
int epoll_fd)
{
Expand All @@ -646,7 +650,6 @@ resume_async(const struct timeout_queue *tq,
[CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_EVENTS_WRITE,
[CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_EVENTS_READ_WRITE,
};
int await_fd = (int)((uint64_t)from_coro >> 32);
enum lwan_connection_flags flags;
int op;

Expand All @@ -656,7 +659,7 @@ resume_async(const struct timeout_queue *tq,

flags = to_connection_flags[yield_result];

struct lwan_connection *await_fd_conn = &tq->lwan->conns[await_fd];
struct lwan_connection *await_fd_conn = &l->conns[await_fd];
if (LIKELY(await_fd_conn->flags & CONN_ASYNC_AWAIT)) {
if (LIKELY((await_fd_conn->flags & CONN_EVENTS_MASK) == flags))
return CONN_CORO_SUSPEND;
Expand Down Expand Up @@ -697,6 +700,168 @@ resume_async(const struct timeout_queue *tq,
return CONN_CORO_ABORT;
}

struct flag_update {
unsigned int num_awaiting;
enum lwan_connection_coro_yield request_conn_yield;
};

static struct flag_update
update_flags_for_async_awaitv(struct lwan_request *r, struct lwan *l, va_list ap)
{
int epoll_fd = r->conn->thread->epoll_fd;
struct flag_update update = {.num_awaiting = 0,
.request_conn_yield = CONN_CORO_YIELD};

while (true) {
int await_fd = va_arg(ap, int);
if (await_fd < 0) {
return update;
}

enum lwan_connection_coro_yield events =
va_arg(ap, enum lwan_connection_coro_yield);
if (UNLIKELY(events < CONN_CORO_ASYNC_AWAIT_READ ||
events > CONN_CORO_ASYNC_AWAIT_READ_WRITE)) {
lwan_status_error("awaitv() called with invalid events");
coro_yield(r->conn->coro, CONN_CORO_ABORT);
__builtin_unreachable();
}

struct lwan_connection *conn = &l->conns[await_fd];

if (UNLIKELY(conn->flags & CONN_ASYNC_AWAIT_MULTIPLE)) {
lwan_status_debug("ignoring second awaitv call on same fd: %d",
await_fd);
continue;
}

conn->flags |= CONN_ASYNC_AWAIT_MULTIPLE;
update.num_awaiting++;

if (await_fd == r->fd) {
static const enum lwan_connection_coro_yield to_request_yield[] = {
[CONN_CORO_ASYNC_AWAIT_READ] = CONN_CORO_WANT_READ,
[CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_CORO_WANT_WRITE,
[CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_CORO_WANT_READ_WRITE,
};

update.request_conn_yield = to_request_yield[events];
continue;
}

events = resume_async(l, events, await_fd, r->conn, epoll_fd);
if (UNLIKELY(events == CONN_CORO_ABORT)) {
lwan_status_error("could not register fd for async operation");
coro_yield(r->conn->coro, CONN_CORO_ABORT);
__builtin_unreachable();
}
}
}

static void reset_conn_async_await_multiple_flag(struct lwan_connection *conns,
va_list ap)
{
while (true) {
int await_fd = va_arg(ap, int);
if (await_fd < 0)
return;

struct lwan_connection *conn = &conns[await_fd];
conn->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE;

LWAN_NO_DISCARD(va_arg(ap, enum lwan_connection_coro_yield));
}
}

int lwan_request_awaitv_any(struct lwan_request *r, ...)
{
struct lwan *l = r->conn->thread->lwan;
va_list ap;

va_start(ap, r);
reset_conn_async_await_multiple_flag(l->conns, ap);
va_end(ap);

va_start(ap, r);
struct flag_update update = update_flags_for_async_awaitv(r, l, ap);
va_end(ap);

while (true) {
int64_t v = coro_yield(r->conn->coro, update.request_conn_yield);
struct lwan_connection *conn = (struct lwan_connection *)(uintptr_t)v;

if (conn->flags & CONN_ASYNC_AWAIT_MULTIPLE) {
va_start(ap, r);
reset_conn_async_await_multiple_flag(l->conns, ap);
va_end(ap);

return lwan_connection_get_fd(l, conn);
}
}
}

void lwan_request_awaitv_all(struct lwan_request *r, ...)
{
struct lwan *l = r->conn->thread->lwan;
va_list ap;

va_start(ap, r);
reset_conn_async_await_multiple_flag(l->conns, ap);
va_end(ap);

va_start(ap, r);
struct flag_update update = update_flags_for_async_awaitv(r, l, ap);
va_end(ap);

while (update.num_awaiting) {
int64_t v = coro_yield(r->conn->coro, update.request_conn_yield);
struct lwan_connection *conn = (struct lwan_connection *)(uintptr_t)v;

if (conn->flags & CONN_ASYNC_AWAIT_MULTIPLE) {
conn->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE;
update.num_awaiting--;
}
}
}

static inline int64_t
make_async_yield_value(int fd, enum lwan_connection_coro_yield event)
{
assert(event >= CONN_CORO_ASYNC_AWAIT_READ &&
event <= CONN_CORO_ASYNC_AWAIT_READ_WRITE);

return (int64_t)(((uint64_t)fd << 32 | event));
}

static inline int async_await_fd(struct lwan_connection *conn,
int fd,
enum lwan_connection_coro_yield events)
{
int64_t yield_value = make_async_yield_value(fd, events);
int64_t from_coro = coro_yield(conn->coro, yield_value);
struct lwan_connection *conn_from_coro =
(struct lwan_connection *)(intptr_t)from_coro;

assert(conn_from_coro->flags & CONN_ASYNC_AWAIT);

return lwan_connection_get_fd(conn->thread->lwan, conn_from_coro);
}

inline int lwan_request_await_read(struct lwan_request *r, int fd)
{
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_READ);
}

inline int lwan_request_await_write(struct lwan_request *r, int fd)
{
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_WRITE);
}

inline int lwan_request_await_read_write(struct lwan_request *r, int fd)
{
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_READ_WRITE);
}

static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq,
struct lwan_connection *conn_to_resume,
struct lwan_connection *conn_to_yield,
Expand All @@ -710,14 +875,15 @@ static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq,
enum lwan_connection_coro_yield yield_result = from_coro & 0xffffffff;

if (UNLIKELY(yield_result >= CONN_CORO_ASYNC)) {
yield_result =
resume_async(tq, yield_result, from_coro, conn_to_resume, epoll_fd);
int await_fd = (int)((uint64_t)from_coro >> 32);
yield_result = resume_async(tq->lwan, yield_result, await_fd,
conn_to_resume, epoll_fd);
}

if (UNLIKELY(yield_result == CONN_CORO_ABORT)) {
timeout_queue_expire(tq, conn_to_resume);
} else {
update_epoll_flags(tq, conn_to_resume, epoll_fd, yield_result);
update_epoll_flags(tq->lwan, conn_to_resume, epoll_fd, yield_result);
timeout_queue_move_to_last(tq, conn_to_resume);
}
}
Expand Down Expand Up @@ -787,7 +953,7 @@ static bool process_pending_timers(struct timeout_queue *tq,
}

request = container_of(timeout, struct lwan_request, timeout);
update_epoll_flags(tq, request->conn, epoll_fd, CONN_CORO_RESUME);
update_epoll_flags(tq->lwan, request->conn, epoll_fd, CONN_CORO_RESUME);
}

if (should_expire_timers) {
Expand Down Expand Up @@ -1452,7 +1618,7 @@ void lwan_thread_init(struct lwan *l)

for (unsigned int i = 0; i < l->thread.count; i++) {
struct lwan_thread *thread;

if (schedtbl) {
/* For SO_ATTACH_REUSEPORT_CBPF to work with the program
* we provide the kernel, sockets have to be added to the
Expand Down
9 changes: 8 additions & 1 deletion src/lib/lwan-tq.c
Expand Up @@ -112,7 +112,14 @@ void timeout_queue_expire_waiting(struct timeout_queue *tq)
if (conn->time_to_expire > tq->current_time)
return;

timeout_queue_expire(tq, conn);
if (LIKELY(!(conn->flags & CONN_IS_WEBSOCKET))) {
timeout_queue_expire(tq, conn);
} else {
if (LIKELY(lwan_send_websocket_ping_for_tq(conn)))
timeout_queue_move_to_last(tq, conn);
else
timeout_queue_expire(tq, conn);
}
}

/* Timeout queue exhausted: reset epoch */
Expand Down

0 comments on commit b530ed6

Please sign in to comment.