Skip to content

Commit

Permalink
ch4/ofi: remove limit in pipeline recv chunk progress
Browse files Browse the repository at this point in the history
Remove the limit in posting gpu pipeline recv chunks. The limit can be
controlled by the maximum chunks from
MPIDI_OFI_global.gpu_pipeline_recv_pool or when the libfabric return
EAGAIN.

In progressing the recv_chunk_alloc, we'll issue as many chunks as we
can instead of one at a time.

Refactor the code to have single exit point.
  • Loading branch information
hzhou committed Mar 5, 2024
1 parent 8e5a2c2 commit 55b98ba
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,35 +325,39 @@ static int start_recv_chunk(MPIR_Request * rreq, int n_chunks)

static int recv_chunk_alloc_poll(MPIR_Async_thing * thing)
{
int ret = MPIR_ASYNC_THING_NOPROGRESS;
struct recv_chunk_alloc *p = MPIR_Async_thing_get_state(thing);
MPIR_Request *rreq = p->rreq;

/* arbitrary threshold */
if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) > 1) {
return MPIR_ASYNC_THING_NOPROGRESS;
}
while (true) {
bool rc = issue_recv_alloc(rreq, false /* is_init */);
if (!rc) {
goto fn_exit;
}

bool ret = issue_recv_alloc(rreq, false /* is_init */);
if (ret) {
p->issued_chunks++;
if (p->issued_chunks == p->n_chunks) {
MPL_free(p);
return MPIR_ASYNC_THING_DONE;
ret = MPIR_ASYNC_THING_DONE;
goto fn_exit;
} else {
return MPIR_ASYNC_THING_UPDATED;
ret = MPIR_ASYNC_THING_UPDATED;
}
}

return MPIR_ASYNC_THING_NOPROGRESS;
fn_exit:
return ret;
}

/* ---- */
static bool issue_recv_alloc(MPIR_Request * rreq, bool is_init)
{
bool ret;
void *host_buf;
MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, &host_buf);
if (!host_buf) {
return MPIR_ASYNC_THING_NOPROGRESS;
ret = false;
goto fn_exit;
}

fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr);
Expand All @@ -378,22 +382,25 @@ static bool issue_recv_alloc(MPIR_Request * rreq, bool is_init)
chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE;
}
MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci).lock);
int ret = fi_trecv(MPIDI_OFI_global.ctx[ctx_idx].rx,
host_buf, MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ, NULL, remote_addr,
match_bits, mask_bits, (void *) &chunk_req->context);
int rc = fi_trecv(MPIDI_OFI_global.ctx[ctx_idx].rx,
host_buf, MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ, NULL, remote_addr,
match_bits, mask_bits, (void *) &chunk_req->context);
MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci).lock);
if (ret == 0) {
if (rc == 0) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) += 1;
/* chunk_req and host_buf will be freed in recv_events */
return true;
}
if (ret != -FI_EAGAIN && ret != -FI_ENOMEM) {
/* unexpected error */
MPIR_Assert(0);
ret = true;
goto fn_exit;
}

/* assert unexpected error */
MPIR_Assert(rc != -FI_EAGAIN && rc != -FI_ENOMEM);

MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf);
MPL_free(chunk_req);
return false;

fn_exit:
return ret;
};

/* ------------------------------------
Expand Down

0 comments on commit 55b98ba

Please sign in to comment.