Skip to content

Commit

Permalink
add write_blocked flag to packet bio
Browse files Browse the repository at this point in the history
along with a flush API.  So that the individual protocols don't
have to do that.

and have the memory bio return IO_WOULD_BLOCK if it can't flush
the pending data.
  • Loading branch information
alandekok committed May 14, 2024
1 parent 73dd8ae commit d50a131
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 18 deletions.
13 changes: 8 additions & 5 deletions src/lib/bio/mem.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,11 @@ static ssize_t fr_bio_mem_write_next(fr_bio_t *bio, void *packet_ctx, void const
/*
* Some of the data base been written to the next bio, and some to our cache. The caller has to
* ensure that the first subsequent write will send over the rest of the data.
*
* However, we tell the caller that we wrote the entire packet. Because we are now responsible
* for writing the remaining bytes.
*/
return rcode + leftover;
return size;
}

/** Flush the memory buffer.
Expand Down Expand Up @@ -467,19 +470,19 @@ static ssize_t fr_bio_mem_write_flush(fr_bio_mem_t *my, size_t size)
}

/*
* We didn't write anything, return that.
* We didn't write anything, the bio is still blocked.
*/
if ((rcode == 0) || (rcode == fr_bio_error(IO_WOULD_BLOCK))) return rcode;
if ((rcode == 0) || (rcode == fr_bio_error(IO_WOULD_BLOCK))) return fr_bio_error(IO_WOULD_BLOCK);

/*
* Tell the buffer that we've read a certain amount of data from it.
*/
(void) fr_bio_buf_read(&my->write_buffer, NULL, (size_t) rcode);

/*
* We haven't emptied the buffer, return the partial write.
* We haven't emptied the buffer, any further IO is blocked.
*/
if ((size_t) rcode < used) return rcode;
if ((size_t) rcode < used) return fr_bio_error(IO_WOULD_BLOCK);

/*
* We've flushed all of the buffer. Revert back to "pass through" writing.
Expand Down
61 changes: 53 additions & 8 deletions src/lib/bio/packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct fr_bio_packet_s {
fr_bio_packet_write_t write; //!< write to the underlying bio

fr_bio_t *bio; //!< underlying bio for IO

bool write_blocked; //!< are writes to the bio blocked?
};


Expand All @@ -82,18 +84,18 @@ struct fr_bio_packet_s {
* check for that case, and handle blocking errors. Typically by pushing the packet to a queue, and trying
* it again later.
*
* @param bio the packet-based bio
* @param my the packet-based bio
* @param[out] request_ctx_p the larger context for the original request packet
* @param[out] packet_p Where the allocated #fr_packet_t will be stored
* @param[out] out_ctx for the output pairs
* @param[out] packet_p Where the allocated #fr_packet_t will be stored
* @param[out] out_ctx for the output pairs
* @param[out] out decoded output pairs
* @return
* - <0 on error. This is fr_bio_error(FOO)
* - 0 for success
*/
static inline CC_HINT(nonnull) int fr_bio_packet_read(fr_bio_packet_t *bio, void **request_ctx_p, fr_packet_t **packet_p, TALLOC_CTX *out_ctx, fr_pair_list_t *out)
static inline CC_HINT(nonnull) int fr_bio_packet_read(fr_bio_packet_t *my, void **request_ctx_p, fr_packet_t **packet_p, TALLOC_CTX *out_ctx, fr_pair_list_t *out)
{
return bio->read(bio, request_ctx_p, packet_p, out_ctx, out);
return my->read(my, request_ctx_p, packet_p, out_ctx, out);
}

/** Write a packet to a packet BIO
Expand All @@ -102,15 +104,58 @@ static inline CC_HINT(nonnull) int fr_bio_packet_read(fr_bio_packet_t *bio, void
* check for that case, and handle blocking errors. Typically by pushing the packet to a queue, and trying
* it again later.
*
* @param bio the packet-based bio
* @param my the packet-based bio
* @param request_ctx the larger context for the packet
* @param packet the output packet descriptor. Contains raw protocol data (IDs, counts, etc.)
* @param list of pairs to write
* @return
* - <0 on error. This is fr_bio_error(FOO)
* - 0 for success
*/
static inline CC_HINT(nonnull) int fr_bio_packet_write(fr_bio_packet_t *bio, void *request_ctx, fr_packet_t *packet, fr_pair_list_t *list)
static inline CC_HINT(nonnull) int fr_bio_packet_write(fr_bio_packet_t *my, void *request_ctx, fr_packet_t *packet, fr_pair_list_t *list)
{
return bio->write(bio, request_ctx, packet, list);
int rcode;

/*
* We don't allow more writes if the bio is blocked.
*/
if (my->write_blocked) return fr_bio_error(IO_WOULD_BLOCK);

rcode = my->write(my, request_ctx, packet, list);
if (rcode == 0) return 0;

my->write_blocked = (rcode == fr_bio_error(IO_WOULD_BLOCK));
return rcode;
}

/** Flush a bio which is blocked.
*
* Note that the bio MAY return fr_bio_error(IO_WOULD_BLOCK), which is not a fatal error. The caller has to
* check for that case, and handle blocking errors. Typically by pushing the packet to a queue, and trying
* it again later.
*
* @param my the packet-based bio
* @return
* - <0 on error. This is fr_bio_error(FOO)
* - 0 for success
*/
static inline CC_HINT(nonnull) int fr_bio_packet_write_flush(fr_bio_packet_t *my)
{
ssize_t slen;

if (!my->write_blocked) return 0;

/*
* Note that "wrote no data" means that there's no pending data, so we're no longer blocked.
*/
slen = my->bio->write(my->bio, NULL, NULL, SIZE_MAX);
if (slen >= 0) {
my->write_blocked = false;
return 0;
}

/*
* Likely a fatal bio error.
*/
return slen;
}
8 changes: 3 additions & 5 deletions src/protocols/radius/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,10 @@ int fr_radius_client_fd_bio_write(fr_radius_client_fd_bio_t *my, void *request_c
}

/*
* Didn't write anything, that's a blocking error.
* We are using an outgoing memory bio, which takes care of writing partial packets. As a
* result, our call to the bio will always return that a full packet was written.
*/
if (slen == 0) {
fr_radius_code_id_push(my->codes, packet);
return fr_bio_error(IO_WOULD_BLOCK);
}
fr_assert((size_t) slen == packet->data_len);

my->info.outstanding++;

Expand Down

0 comments on commit d50a131

Please sign in to comment.