Skip to content

Commit

Permalink
add callbacks for read/write blocked, and read/write resume
Browse files Browse the repository at this point in the history
for now, only write blocked/resume is plumbed in.
  • Loading branch information
alandekok committed May 14, 2024
1 parent db7b3ff commit 9f782e9
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 38 deletions.
6 changes: 6 additions & 0 deletions src/lib/bio/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ typedef int (*fr_bio_callback_t)(fr_bio_t *bio); /* activate / shutdown callback
typedef struct {
fr_bio_callback_t activate;
fr_bio_callback_t shutdown;

fr_bio_callback_t read_blocked;
fr_bio_callback_t write_blocked;

fr_bio_callback_t read_resume; //!< "unblocked" is too similar to "blocked"
fr_bio_callback_t write_resume;
} fr_bio_cb_funcs_t;

/** Accept a new connection on a bio
Expand Down
55 changes: 19 additions & 36 deletions src/lib/bio/fd.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ static ssize_t fr_bio_fd_read_stream(fr_bio_t *bio, UNUSED void *packet_ctx, voi
}

#undef flag_blocked
#define flag_blocked info.read_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"

return fr_bio_error(IO);
Expand All @@ -158,7 +158,7 @@ static ssize_t fr_bio_fd_read_connected_datagram(fr_bio_t *bio, UNUSED void *pac
if (rcode >= 0) return rcode;

#undef flag_blocked
#define flag_blocked info.read_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"

return fr_bio_error(IO);
Expand Down Expand Up @@ -193,16 +193,15 @@ static ssize_t fr_bio_fd_recvfrom(fr_bio_t *bio, void *packet_ctx, void *buffer,
return rcode;
}

if (rcode == 0 ) return rcode;
if (rcode == 0) return rcode;

#undef flag_blocked
#define flag_blocked info.read_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"

return fr_bio_error(IO);
}


/** Write to fd.
*
* This function is used for connected sockets, where we ignore the packet_ctx.
Expand All @@ -218,8 +217,6 @@ static ssize_t fr_bio_fd_write(fr_bio_t *bio, UNUSED void *packet_ctx, const voi
*/
if (!buffer) return 0;

my->info.write_blocked = false;

retry:
/*
* We could call send() instead of write()! Posix says:
Expand All @@ -236,11 +233,8 @@ static ssize_t fr_bio_fd_write(fr_bio_t *bio, UNUSED void *packet_ctx, const voi
* here.
*/
rcode = write(my->info.socket.fd, buffer, size);
if (rcode >= 0) return rcode;

#undef flag_blocked
#define flag_blocked info.write_blocked
#include "fd_errno.h"
#include "fd_write.h"

return fr_bio_error(IO);
}
Expand All @@ -262,18 +256,13 @@ static ssize_t fr_bio_fd_sendto(fr_bio_t *bio, void *packet_ctx, const void *buf
*/
if (!buffer) return 0;

my->info.write_blocked = false;

// get destination IP
(void) fr_ipaddr_to_sockaddr(&sockaddr, &salen, &addr->socket.inet.dst_ipaddr, addr->socket.inet.dst_port);

retry:
rcode = sendto(my->info.socket.fd, buffer, size, 0, (struct sockaddr *) &sockaddr, salen);
if (rcode >= 0) return rcode;

#undef flag_blocked
#define flag_blocked info.write_blocked
#include "fd_errno.h"
#include "fd_write.h"

return fr_bio_error(IO);
}
Expand Down Expand Up @@ -325,7 +314,7 @@ static ssize_t fd_fd_recvfromto_common(fr_bio_fd_t *my, void *packet_ctx, void *
if (rcode == 0) return rcode;

#undef flag_blocked
#define flag_blocked info.read_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"

return fr_bio_error(IO);
Expand Down Expand Up @@ -413,8 +402,6 @@ static ssize_t fr_bio_fd_sendfromto4(fr_bio_t *bio, void *packet_ctx, const void
fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t);
fr_bio_fd_packet_ctx_t *addr = fr_bio_fd_packet_ctx(my, packet_ctx);

my->info.write_blocked = false;

memset(&my->cbuf, 0, sizeof(my->cbuf));
memset(&my->msgh, 0, sizeof(struct msghdr));

Expand Down Expand Up @@ -470,9 +457,7 @@ static ssize_t fr_bio_fd_sendfromto4(fr_bio_t *bio, void *packet_ctx, const void
rcode = sendmsg(my->info.socket.fd, &my->msgh, 0);
if (rcode >= 0) return rcode;

#undef flag_blocked
#define flag_blocked info.write_blocked
#include "fd_errno.h"
#include "fd_write.h"

return fr_bio_error(IO);
}
Expand Down Expand Up @@ -566,8 +551,6 @@ static ssize_t fr_bio_fd_sendfromto6(fr_bio_t *bio, void *packet_ctx, const void
fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t);
fr_bio_fd_packet_ctx_t *addr = fr_bio_fd_packet_ctx(my, packet_ctx);

my->info.write_blocked = false;

memset(&my->cbuf, 0, sizeof(my->cbuf));
memset(&my->msgh, 0, sizeof(struct msghdr));

Expand Down Expand Up @@ -607,11 +590,8 @@ static ssize_t fr_bio_fd_sendfromto6(fr_bio_t *bio, void *packet_ctx, const void

retry:
rcode = sendmsg(my->info.socket.fd, &my->msgh, 0);
if (rcode >= 0) return rcode;

#undef flag_blocked
#define flag_blocked info.write_blocked
#include "fd_errno.h"
#include "fd_write.h"

return fr_bio_error(IO);
}
Expand Down Expand Up @@ -730,7 +710,10 @@ static ssize_t fr_bio_fd_try_connect(fr_bio_fd_t *my)
* to call fr_bio_fd_connect() before calling write()
*/
case EINPROGRESS:
if (!my->info.write_blocked && my->cb.write_blocked) my->cb.write_blocked((fr_bio_t *) my);

my->info.write_blocked = true;

return fr_bio_error(IO_WOULD_BLOCK);

default:
Expand Down Expand Up @@ -805,11 +788,8 @@ int fr_bio_fd_init_connected(fr_bio_fd_t *my)

my->info.eof = false;

/*
* The socket shouldn't be selected for read. But it should be selected for write.
*/
my->info.read_blocked = false;
my->info.write_blocked = true;
my->info.write_blocked = false;

#ifdef SO_NOSIGPIPE
/*
Expand All @@ -836,6 +816,9 @@ int fr_bio_fd_init_connected(fr_bio_fd_t *my)

if (rcode != fr_bio_error(IO_WOULD_BLOCK)) return rcode;

/*
* The socket is blocked, and should be selected for writing.
*/
fr_assert(my->info.write_blocked);
fr_assert(my->info.state == FR_BIO_FD_STATE_CONNECTING);

Expand Down Expand Up @@ -1035,8 +1018,8 @@ fr_bio_t *fr_bio_fd_alloc(TALLOC_CTX *ctx, fr_bio_cb_funcs_t *cb, fr_bio_fd_conf
.af = AF_UNSPEC,
},
.type = FR_BIO_FD_UNCONNECTED,
.read_blocked = true,
.write_blocked = true,
.read_blocked = false,
.write_blocked = false,
.eof = false,
.state = FR_BIO_FD_STATE_CLOSED,
};
Expand Down Expand Up @@ -1187,7 +1170,7 @@ static ssize_t fr_bio_fd_read_discard(fr_bio_t *bio, UNUSED void *packet_ctx, vo
if (rcode >= 0) return 0;

#undef flag_blocked
#define flag_blocked info.read_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"

return fr_bio_error(IO);
Expand Down
6 changes: 4 additions & 2 deletions src/lib/bio/fd_errno.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Code snippet to avoid duplication.
* We have an error, so we have common error handling code.
*/
switch (errno) {
case EINTR:
Expand All @@ -17,7 +17,9 @@ case EAGAIN:
/*
* The operation would block, return that.
*/
my->flag_blocked = true;
if (!my->info.flag_blocked && my->cb.flag_blocked) my->cb.flag_blocked((fr_bio_t *) my);

my->info.flag_blocked = true;
return fr_bio_error(IO_WOULD_BLOCK);

default:
Expand Down
47 changes: 47 additions & 0 deletions src/lib/bio/fd_write.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Common finalization code for the write functions.
*
* This is in a header file because of "goto retry" in fd_errno.h.
*
* @todo - do we want the callbacks to notify the _previous_ BIO in the chain? That way the top-level
* BIO can notify the application.
*/
if (rcode > 0) {
/*
* We weren't blocked, but we are now.
*/
if (!my->info.write_blocked) {
if ((size_t) rcode == size) {
return rcode;
}

fr_assert((size_t) rcode < size);

/*
* Set the flag and run the callback.
*/
my->info.write_blocked = true;
if (my->cb.write_blocked) my->cb.write_blocked((fr_bio_t *) my);

return rcode;
}

/*
* We were blocked. We're still blocked if we wrote _less_ than the amount of requested data.
* If we wrote all of the data which was requested, then we're unblocked.
*/
my->info.write_blocked = ((size_t) rcode == size);

/*
* Call the "resume" function if we transitioned to being unblocked.
*/
if (!my->info.write_blocked && my->cb.write_resume) my->cb.write_resume((fr_bio_t *) my);

return rcode;
}

if (rcode == 0) return rcode;

#undef flag_blocked
#define flag_blocked write_blocked
#include "fd_errno.h"

0 comments on commit 9f782e9

Please sign in to comment.