Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ch4/isend-irecv: add parent reques completion in isend/irecv #6640

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/mpid/ch4/ch4_api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ Non Native API:

Native API:
mpi_isend : int
NM*: buf, count, datatype, rank, tag, comm, attr-2, addr, req_p
SHM*: buf, count, datatype, rank, tag, comm, attr-2, addr, req_p
NM*: buf, count, datatype, rank, tag, comm, attr-2, addr, parent_cc_ptr, req_p
SHM*: buf, count, datatype, rank, tag, comm, attr-2, addr, parent_cc_ptr, req_p
mpi_cancel_send : int
NM*: sreq
SHM*: sreq
mpi_irecv : int
NM*: buf-2, count, datatype, rank, tag, comm, attr-2, addr, req_p, partner
SHM*: buf-2, count, datatype, rank, tag, comm, attr-2, req_p
NM*: buf-2, count, datatype, rank, tag, comm, attr-2, addr, parent_cc_ptr, req_p, partner
SHM*: buf-2, count, datatype, rank, tag, comm, attr-2, parent_cc_ptr, req_p
mpi_imrecv : int
NM*: buf-2, count, datatype, message
SHM*: buf-2, count, datatype, message
Expand Down Expand Up @@ -477,6 +477,7 @@ PARAM:
origin_addr-2: void *
origin_count: MPI_Aint
origin_datatype: MPI_Datatype
parent_cc_ptr: MPIR_cc_t*
partner: MPIR_Request *
port_name: const char *
port_name-2: char *
Expand Down
14 changes: 13 additions & 1 deletion src/mpid/ch4/netmod/include/netmod_am_fallback_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
#ifndef NETMOD_AM_FALLBACK_SEND_H_INCLUDED
#define NETMOD_AM_FALLBACK_SEND_H_INCLUDED

/* parent_cc_ptr: allows the created request to decrement the counter of a parent request when completed. use NULL to not use this mechanism */
MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_isend(const void *buf,
MPI_Aint count,
MPI_Datatype datatype,
int rank,
int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr, MPIR_Request ** request)
{
int mpi_errno = MPI_SUCCESS;

Expand All @@ -27,6 +29,16 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_isend(const void *buf,
MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci_src).lock);
mpi_errno = MPIDIG_mpi_isend(buf, count, datatype, rank, tag, comm, context_offset, addr,
vci_src, vci_dst, request, syncflag, errflag);
/* if the parent_cc_ptr exists */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}
Comment on lines +33 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be done by the caller instead? it might be cleaner, assuming we have access to the parent request when each operation is issued. something like:

MPIDI_NM_mpi_isend(..., &childreq);
if (childreq && !MPIR_Request_is_complete(childreq)) {
    childreq->dev.completion_notification = parentreq->cc_ptr;
} else {
    MPID_Request_complete(parentreq);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand your suggestion correctly, we would exit the lock around the child request creation which might not work when async progress is enabled (as you suggested a few months ago :-) )
The reason is that if you are not in the same lock section, you have no guarantee the completion notification will be triggered: you might complete the request between the release of the lock at creation and the assignment of childreq->dev.completion_notification = parentreq->cc_ptr;

MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci_src).lock);

return mpi_errno;
Expand Down
16 changes: 14 additions & 2 deletions src/mpid/ch4/netmod/ofi/ofi_recv.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,9 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_irecv(void *buf,
int rank,
int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request,
MPIR_Request * partner)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr,
MPIR_Request ** request, MPIR_Request * partner)
{
int mpi_errno = MPI_SUCCESS;
MPIR_FUNC_ENTER;
Expand Down Expand Up @@ -354,6 +355,17 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_irecv(void *buf,
MPIDI_OFI_ON_HEAP, 0ULL);
MPIDI_REQUEST_SET_LOCAL(*request, 0, partner);
}

/* if the parent_cc_ptr exists */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}
if (need_cs) {
MPIDI_OFI_THREAD_CS_EXIT_VCI_OPTIONAL(vci_dst);
}
Expand Down
15 changes: 14 additions & 1 deletion src/mpid/ch4/netmod/ofi/ofi_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,12 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send(const void *buf, MPI_Aint count, MPI
} \
} while (0)

/* parent_cc_ptr: allows the created request to decrement the counter of a parent request when completed. use NULL to not use this mechanism */
MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_isend(const void *buf, MPI_Aint count,
MPI_Datatype datatype, int rank, int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr, MPIR_Request ** request)
{
int mpi_errno;
MPIR_FUNC_ENTER;
Expand All @@ -485,6 +487,17 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_isend(const void *buf, MPI_Aint count,
context_offset, addr, vci_src, vci_dst,
request, 0, syncflag, errflag);
}

/* if the parent_cc_ptr exists */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}
MPIDI_OFI_THREAD_CS_EXIT_VCI_OPTIONAL(vci_src);

MPIR_FUNC_EXIT;
Expand Down
16 changes: 14 additions & 2 deletions src/mpid/ch4/netmod/ucx/ucx_recv.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,9 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_irecv(void *buf,
int rank,
int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request,
MPIR_Request * partner)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr,
MPIR_Request ** request, MPIR_Request * partner)
{
int mpi_errno;
MPIR_FUNC_ENTER;
Expand All @@ -238,6 +239,17 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_irecv(void *buf,
MPIDI_UCX_recv(buf, count, datatype, rank, tag, comm, context_offset, addr, vci_dst,
request);
MPIDI_REQUEST_SET_LOCAL(*request, 0, partner);

/* if the parent_cc_ptr exists */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}
if (need_cs) {
MPIDI_UCX_THREAD_CS_EXIT_VCI(vci_dst);
}
Expand Down
13 changes: 12 additions & 1 deletion src/mpid/ch4/netmod/ucx/ucx_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_isend(const void *buf,
int rank,
int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr, MPIR_Request ** request)
{
int mpi_errno;
MPIR_FUNC_ENTER;
Expand All @@ -147,6 +148,16 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_isend(const void *buf,
MPIDI_UCX_THREAD_CS_ENTER_VCI(vci_src);
mpi_errno = MPIDI_UCX_send(buf, count, datatype, rank, tag, comm, context_offset,
addr, request, vci_src, vci_dst, 1, is_sync);
/* if the parent_cc_ptr exists */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}
MPIDI_UCX_THREAD_CS_EXIT_VCI(vci_src);

MPIR_FUNC_EXIT;
Expand Down
19 changes: 16 additions & 3 deletions src/mpid/ch4/shm/ipc/src/ipc_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_IPCI_try_lmt_isend(const void *buf, MPI_Aint
MPI_Datatype datatype, int rank, int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr,
MPIR_Request ** request, bool * done)
{
int mpi_errno = MPI_SUCCESS;
Expand Down Expand Up @@ -150,6 +151,17 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_IPCI_try_lmt_isend(const void *buf, MPI_Aint
/* TODO: add flattening datatype protocol for noncontig send. Different
* threshold may be required to tradeoff the flattening overhead.*/
}

/* if the parent_cc_ptr exists */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}
fn_exit:
MPIDI_POSIX_THREAD_CS_EXIT_VCI(vci_src);
MPIR_FUNC_EXIT;
Expand All @@ -161,19 +173,20 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_IPCI_try_lmt_isend(const void *buf, MPI_Aint
MPL_STATIC_INLINE_PREFIX int MPIDI_IPC_mpi_isend(const void *buf, MPI_Aint count,
MPI_Datatype datatype, int rank, int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr, MPIR_Request ** request)
{
int mpi_errno = MPI_SUCCESS;
MPIR_FUNC_ENTER;

bool done = false;
mpi_errno = MPIDI_IPCI_try_lmt_isend(buf, count, datatype, rank, tag, comm,
attr, addr, request, &done);
attr, addr, parent_cc_ptr, request, &done);
MPIR_ERR_CHECK(mpi_errno);

if (!done) {
mpi_errno = MPIDI_POSIX_mpi_isend(buf, count, datatype, rank, tag, comm,
attr, addr, request);
attr, addr, parent_cc_ptr, request);
MPIR_ERR_CHECK(mpi_errno);
}

Expand Down
13 changes: 13 additions & 0 deletions src/mpid/ch4/shm/posix/posix_recv.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_irecv(void *buf,
int rank,
int tag,
MPIR_Comm * comm, int attr,
MPIR_cc_t * parent_cc_ptr,
MPIR_Request ** request)
{
int context_offset = MPIR_PT2PT_ATTR_CONTEXT_OFFSET(attr);
Expand All @@ -67,6 +68,18 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_irecv(void *buf,
}
int mpi_errno = MPIDIG_mpi_irecv(buf, count, datatype, rank, tag, comm, context_offset,
vci, request, 1, NULL);

/* if the lock is not set, we come from anysource and it's ok to set the counter here */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}

if (need_lock) {
MPIDI_POSIX_THREAD_CS_EXIT_VCI(vci);
}
Expand Down
16 changes: 15 additions & 1 deletion src/mpid/ch4/shm/posix/posix_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_isend(const void *buf, MPI_Aint count,
MPI_Datatype datatype, int rank, int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr,
MPIR_Request ** request)
{
int mpi_errno = MPI_SUCCESS;

Expand Down Expand Up @@ -78,6 +80,18 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_isend(const void *buf, MPI_Aint cou
mpi_errno = MPIDIG_mpi_isend(buf, count, datatype, rank, tag, comm, context_offset, addr,
vci_src, vci_dst, request, syncflag, errflag);
}
/* if the parent_cc_ptr exists */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and
* it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}

MPIDI_POSIX_THREAD_CS_EXIT_VCI(vci_src);

return mpi_errno;
Expand Down
14 changes: 13 additions & 1 deletion src/mpid/ch4/shm/src/shm_am_fallback_recv.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_SHM_mpi_irecv(void *buf,
int rank,
int tag,
MPIR_Comm * comm, int context_offset,
MPIR_Request ** request)
MPIR_cc_t * parent_cc_ptr, MPIR_Request ** request)
{
int mpi_errno = MPI_SUCCESS;

Expand All @@ -45,6 +45,18 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_SHM_mpi_irecv(void *buf,

mpi_errno = MPIDIG_mpi_irecv(buf, count, datatype, rank, tag, comm, context_offset,
vci, request, 1, NULL);

/* if no lock is needed then we come from the anysource code so it's correct to assign the parent_ptr anyway */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}

if (need_lock) {
MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci).lock);
}
Expand Down
15 changes: 14 additions & 1 deletion src/mpid/ch4/shm/src/shm_am_fallback_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_SHM_mpi_isend(const void *buf,
int rank,
int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr, MPIR_Request ** request)
{
int mpi_errno = MPI_SUCCESS;

Expand All @@ -27,6 +28,18 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_SHM_mpi_isend(const void *buf,
MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci_src).lock);
mpi_errno = MPIDIG_mpi_isend(buf, count, datatype, rank, tag, comm, context_offset, addr,
vci_src, vci_dst, request, syncflag, errflag);

/* if the parent_cc_ptr exists */
if (parent_cc_ptr) {
if (MPIR_Request_is_complete(*request)) {
/* if the request is already completed, decrement the parent counter */
MPIR_cc_dec(parent_cc_ptr);
} else {
/* if the request is not done yet, assign the completion pointer to the parent one and it will be decremented later */
(*request)->dev.completion_notification = parent_cc_ptr;
}
}

MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci_src).lock);

return mpi_errno;
Expand Down
16 changes: 10 additions & 6 deletions src/mpid/ch4/shm/src/shm_p2p.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
MPL_STATIC_INLINE_PREFIX int MPIDI_SHM_mpi_isend(const void *buf, MPI_Aint count,
MPI_Datatype datatype, int rank, int tag,
MPIR_Comm * comm, int attr,
MPIDI_av_entry_t * addr, MPIR_Request ** request)
MPIDI_av_entry_t * addr,
MPIR_cc_t * parent_cc_ptr, MPIR_Request ** request)
{
int mpi_errno = MPI_SUCCESS;

Expand All @@ -24,10 +25,11 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_SHM_mpi_isend(const void *buf, MPI_Aint count
* always use MPIR_Localcopy to move the data and support GPU buffers efficiently
* with yaksa */
if (rank == comm->rank || count == 0) {
mpi_errno =
MPIDI_POSIX_mpi_isend(buf, count, datatype, rank, tag, comm, attr, addr, request);
mpi_errno = MPIDI_POSIX_mpi_isend(buf, count, datatype, rank, tag, comm, attr, addr,
parent_cc_ptr, request);
} else {
mpi_errno = MPIDI_IPC_mpi_isend(buf, count, datatype, rank, tag, comm, attr, addr, request);
mpi_errno = MPIDI_IPC_mpi_isend(buf, count, datatype, rank, tag, comm, attr, addr,
parent_cc_ptr request);
}
MPIR_ERR_CHECK(mpi_errno);

Expand All @@ -52,13 +54,15 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_SHM_mpi_cancel_send(MPIR_Request * sreq)

MPL_STATIC_INLINE_PREFIX int MPIDI_SHM_mpi_irecv(void *buf, MPI_Aint count, MPI_Datatype datatype,
int rank, int tag, MPIR_Comm * comm,
int context_offset, MPIR_Request ** request)
int context_offset,
MPIR_cc_t * parent_cc_ptr, MPIR_Request ** request)
{
int mpi_errno = MPI_SUCCESS;
MPIR_FUNC_ENTER;

mpi_errno =
MPIDI_POSIX_mpi_irecv(buf, count, datatype, rank, tag, comm, context_offset, request);
MPIDI_POSIX_mpi_irecv(buf, count, datatype, rank, tag, comm, context_offset, parent_cc_ptr,
request);
MPIR_ERR_CHECK(mpi_errno);

fn_exit:
Expand Down