Skip to content

Commit

Permalink
add read blocked / resume callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
alandekok committed May 14, 2024
1 parent 9f782e9 commit 11e596d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 36 deletions.
42 changes: 6 additions & 36 deletions src/lib/bio/fd.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,8 @@ static ssize_t fr_bio_fd_read_stream(fr_bio_t *bio, UNUSED void *packet_ctx, voi
ssize_t rcode;
fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t);

my->info.read_blocked = false;

retry:
rcode = read(my->info.socket.fd, buffer, size);
if (rcode > 0) return rcode;

if (rcode == 0) {
/*
* Stream sockets return 0 at EOF. However, we want to distinguish that from the case of datagram
Expand All @@ -131,9 +127,7 @@ static ssize_t fr_bio_fd_read_stream(fr_bio_t *bio, UNUSED void *packet_ctx, voi
return fr_bio_error(EOF);
}

#undef flag_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"
#include "fd_read.h"

return fr_bio_error(IO);
}
Expand All @@ -151,15 +145,10 @@ static ssize_t fr_bio_fd_read_connected_datagram(fr_bio_t *bio, UNUSED void *pac
ssize_t rcode;
fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t);

my->info.read_blocked = false;

retry:
rcode = read(my->info.socket.fd, buffer, size);
if (rcode >= 0) return rcode;

#undef flag_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"
#include "fd_read.h"

return fr_bio_error(IO);
}
Expand All @@ -174,8 +163,6 @@ static ssize_t fr_bio_fd_recvfrom(fr_bio_t *bio, void *packet_ctx, void *buffer,
socklen_t salen;
struct sockaddr_storage sockaddr;

my->info.read_blocked = false;

retry:
salen = sizeof(sockaddr);

Expand All @@ -190,14 +177,9 @@ static ssize_t fr_bio_fd_recvfrom(fr_bio_t *bio, void *packet_ctx, void *buffer,

(void) fr_ipaddr_from_sockaddr(&addr->socket.inet.src_ipaddr, &addr->socket.inet.src_port,
&sockaddr, salen);
return rcode;
}

if (rcode == 0) return rcode;

#undef flag_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"
#include "fd_read.h"

return fr_bio_error(IO);
}
Expand Down Expand Up @@ -280,8 +262,6 @@ static ssize_t fd_fd_recvfromto_common(fr_bio_fd_t *my, void *packet_ctx, void *
from.ss_family = AF_UNSPEC;
#endif

my->info.read_blocked = false;

memset(&my->cbuf, 0, sizeof(my->cbuf));
memset(&my->msgh, 0, sizeof(struct msghdr));

Expand All @@ -307,15 +287,9 @@ static ssize_t fd_fd_recvfromto_common(fr_bio_fd_t *my, void *packet_ctx, void *

(void) fr_ipaddr_from_sockaddr(&addr->socket.inet.src_ipaddr, &addr->socket.inet.src_port,
&from, my->msgh.msg_namelen);

return rcode;
}

if (rcode == 0) return rcode;

#undef flag_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"
#include "fd_read.h"

return fr_bio_error(IO);
}
Expand Down Expand Up @@ -1163,15 +1137,11 @@ static ssize_t fr_bio_fd_read_discard(fr_bio_t *bio, UNUSED void *packet_ctx, vo
ssize_t rcode;
fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t);

my->info.read_blocked = false;

retry:
rcode = read(my->info.socket.fd, buffer, size);
if (rcode >= 0) return 0;
if (rcode > 0) rcode = 0; /* always return that we read no data */

#undef flag_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"
#include "fd_read.h"

return fr_bio_error(IO);
}
Expand Down
34 changes: 34 additions & 0 deletions src/lib/bio/fd_read.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Common finalization code for the read functions.
*
* This is in a header file because of "goto retry" in fd_errno.h.
*
* @todo - do we want the callbacks to notify the _previous_ BIO in the chain? That way the top-level
* BIO can notify the application.
*/
if (rcode > 0) {
/*
* We weren't blocked, so we're still not blocked.
*/
if (!my->info.read_blocked) {
return rcode;
}

/*
* We were blocked. Since we just read data, we're now unblocked.
*/
my->info.read_blocked = false;

/*
* Call the "resume" function when we transition to being unblocked.
*/
if (my->cb.read_resume) my->cb.read_resume((fr_bio_t *) my);

return rcode;
}

if (rcode == 0) return rcode;

#undef flag_blocked
#define flag_blocked read_blocked
#include "fd_errno.h"

0 comments on commit 11e596d

Please sign in to comment.