Skip to content

Commit

Permalink
Merge pull request #6549 from raffenet/4.1.x-fixes
Browse files Browse the repository at this point in the history
[4.1.x] another batch of fixes

Approved-by: Hui Zhou <hzhou321@anl.gov>
  • Loading branch information
raffenet committed Jun 6, 2023
2 parents d2df7a0 + e802e44 commit fba49d6
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 122 deletions.
11 changes: 10 additions & 1 deletion CHANGES
Expand Up @@ -9,17 +9,26 @@

# Fix compiler wrapper scripts to be compatible with CUDA memory hooks

# Fix MPI_WAITALL_ENQUEUE to make a copy of the input request array
# Fix MPIX_WAITALL_ENQUEUE to make a copy of the input request array

# Fix bug in MPI_ALLREDUCE that could result in ranks receiving
different floating point values

# Fix potential deadlock when progressing RMA windows

# Fix potential crash in MPI_REDUCE with non-zero root and MPI_IN_PLACE

# Fix potential crash during probe with libfabric CXI provider

# Fix MPI_PARRIVED when the partitioned request is inactive

# Fix potential bug when an attribute delete callback deletes another
attribute on the same object

# Fix build issue in ROMIO Lustre driver

# Improve Fortran 2008 binding support detection during configure

# Report an error if collective tuning json file fails to open

# Several fixes for testsuite programs and build configuration
Expand Down
1 change: 1 addition & 0 deletions confdb/aclocal_fc.m4
Expand Up @@ -1092,6 +1092,7 @@ int foo_c(CFI_cdesc_t * a_desc, CFI_cdesc_t * b_desc)
void test_assumed_rank_async_impl_c(CFI_cdesc_t * a_desc)
{
CFI_is_contiguous(a_desc);
return;
}
]])],[mv conftest.$OBJEXT conftest1.$OBJEXT],[f08_works=no])
Expand Down
2 changes: 1 addition & 1 deletion modules/libfabric
2 changes: 1 addition & 1 deletion src/include/mpir_coll.h
Expand Up @@ -47,7 +47,7 @@ int MPIC_Issend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int dest
MPIR_Comm * comm_ptr, MPIR_Request ** request, MPIR_Errflag_t * errflag);
int MPIC_Irecv(void *buf, MPI_Aint count, MPI_Datatype datatype, int source,
int tag, MPIR_Comm * comm_ptr, MPIR_Request ** request);
int MPIC_Waitall(int numreq, MPIR_Request * requests[], MPI_Status statuses[],
int MPIC_Waitall(int numreq, MPIR_Request * requests[], MPI_Status * statuses,
MPIR_Errflag_t * errflag);

int MPIR_Reduce_local(const void *inbuf, void *inoutbuf, MPI_Aint count, MPI_Datatype datatype,
Expand Down
10 changes: 10 additions & 0 deletions src/include/mpir_datatype.h
Expand Up @@ -196,6 +196,16 @@ void MPIR_Datatype_get_flattened(MPI_Datatype type, void **flattened, int *flatt
basic_type_ = MPI_DATATYPE_NULL; \
} while (0)

/* Set 'is_float' (a bool lvalue) to true iff the basic type underlying
 * datatype 'a' is MPI_FLOAT or MPI_DOUBLE.  Used to decide when a
 * reduction must run in a fixed order to give reproducible results.
 * NOTE(review): MPI_LONG_DOUBLE and the complex/float16 types are not
 * covered here -- confirm whether they should also count as float. */
#define MPIR_Datatype_is_float(a, is_float) do { \
        MPI_Datatype basic_type_; \
        /* trailing-underscore local (matches the neighboring macros) \
         * avoids capturing a caller variable named 'basic_type' */ \
        MPIR_Datatype_get_basic_type(a, basic_type_); \
        (is_float) = (basic_type_ == MPI_FLOAT || basic_type_ == MPI_DOUBLE); \
    } while (0)

#define MPIR_Datatype_get_ptr(a,ptr) MPIR_Getb_ptr(Datatype,DATATYPE,a,0x000000ff,ptr)

/* Note: Probably there is some clever way to build all of these from a macro.
Expand Down
180 changes: 71 additions & 109 deletions src/mpi/coll/allreduce/allreduce_intra_recexch.c
Expand Up @@ -9,6 +9,9 @@
#include "recexchalgo.h"
#include "algo_common.h"

static int find_myidx(int *nbrs, int k, int rank);
static int do_reduce(void **bufs, void *recvbuf, int n, int idx,
MPI_Aint count, MPI_Datatype datatype, MPI_Op op);

int MPIR_Allreduce_intra_recexch(const void *sendbuf,
void *recvbuf,
Expand All @@ -35,6 +38,9 @@ int MPIR_Allreduce_intra_recexch(const void *sendbuf,
nranks = comm->local_size;
is_commutative = MPIR_Op_is_commutative(op);

bool is_float;
MPIR_Datatype_is_float(datatype, is_float);

/* if there is only 1 rank, copy data from sendbuf
* to recvbuf and exit */
if (nranks == 1) {
Expand Down Expand Up @@ -194,12 +200,6 @@ int MPIR_Allreduce_intra_recexch(const void *sendbuf,


/* step2 */
if (!is_commutative && in_step2 && count > 0) {
/* sort the neighbor list so that receives can be posted in order */
for (phase = 0; phase < step2_nphases; phase++)
qsort(step2_nbrs[phase], k - 1, sizeof(int), MPII_Algo_compare_int);
}

/* step2 sends and reduces */
for (phase = 0; phase < step2_nphases && in_step2; phase++) {
buf = 0;
Expand All @@ -223,7 +223,6 @@ int MPIR_Allreduce_intra_recexch(const void *sendbuf,
}

send_nreq = 0;
myidx = 0;
/* send data to all the neighbors */
for (i = 0; i < k - 1; i++) {
nbr = step2_nbrs[phase][i];
Expand All @@ -238,72 +237,37 @@ int MPIR_Allreduce_intra_recexch(const void *sendbuf,
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
if (rank > nbr) {
myidx = i + 1;
}
}

mpi_errno = MPIC_Waitall(send_nreq, send_reqs, MPI_STATUSES_IGNORE, errflag);
if (mpi_errno && mpi_errno != MPI_ERR_IN_STATUS)
MPIR_ERR_POP(mpi_errno);

buf = myidx - 1;

mpi_errno = MPIC_Waitall((k - 1), recv_reqs, MPI_STATUSES_IGNORE, errflag);
if (mpi_errno && mpi_errno != MPI_ERR_IN_STATUS)
MPIR_ERR_POP(mpi_errno);

for (i = myidx - 1; i >= 0 && count > 0; i--, buf--) {
mpi_errno = MPIR_Reduce_local(nbr_buffer[buf], recvbuf, count, datatype, op);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
if (is_commutative && !is_float) {
myidx = k - 1;
} else {
myidx = find_myidx(step2_nbrs[phase], k, rank);
}

buf = myidx;
for (i = myidx; i < k - 1 && count > 0; i++, buf++) {
if (is_commutative) {
mpi_errno = MPIR_Reduce_local(nbr_buffer[buf], recvbuf, count, datatype, op);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
} else {
mpi_errno = MPIR_Reduce_local(recvbuf, nbr_buffer[buf], count, datatype, op);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}

mpi_errno =
MPIR_Localcopy(nbr_buffer[buf], count, datatype, recvbuf, count, datatype);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
mpi_errno = do_reduce(nbr_buffer, recvbuf, k, myidx, count, datatype, op);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}

if (single_phase_recv == false) { /* post sends and do reduction for the 2nd phase */
phase++;
if (phase < step2_nphases) {
send_nreq = 0;
myidx = 0;
/* send data to all the neighbors */
for (i = 0; i < k - 1; i++) {
nbr = step2_nbrs[phase][i];
Expand All @@ -319,9 +283,6 @@ int MPIR_Allreduce_intra_recexch(const void *sendbuf,
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
if (rank > nbr) {
myidx = i + 1;
}
}

mpi_errno = MPIC_Waitall(send_nreq, send_reqs, MPI_STATUSES_IGNORE, errflag);
Expand All @@ -333,58 +294,19 @@ int MPIR_Allreduce_intra_recexch(const void *sendbuf,
if (mpi_errno && mpi_errno != MPI_ERR_IN_STATUS)
MPIR_ERR_POP(mpi_errno);

buf = (k - 1) + myidx - 1;
for (i = myidx - 1; i >= 0 && count > 0; i--, buf--) {
mpi_errno = MPIR_Reduce_local(nbr_buffer[buf], recvbuf, count, datatype, op);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
if (is_commutative && !is_float) {
myidx = k - 1;
} else {
myidx = find_myidx(step2_nbrs[phase], k, rank);
}

buf = (k - 1) + myidx;
for (i = myidx; i < k - 1 && count > 0; i++, buf++) {
if (is_commutative) {
mpi_errno =
MPIR_Reduce_local(nbr_buffer[buf], recvbuf, count, datatype, op);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED :
MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
} else {
mpi_errno =
MPIR_Reduce_local(recvbuf, nbr_buffer[buf], count, datatype, op);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED :
MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
mpi_errno =
MPIR_Localcopy(nbr_buffer[buf], count, datatype, recvbuf, count,
datatype);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED :
MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
mpi_errno = do_reduce(nbr_buffer + k - 1, recvbuf, k, myidx, count, datatype, op);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
*errflag =
MPIX_ERR_PROC_FAILED ==
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
}
Expand Down Expand Up @@ -458,3 +380,43 @@ int MPIR_Allreduce_intra_recexch(const void *sendbuf,
fn_fail:
goto fn_exit;
}

/* Locate this rank's position within the sorted step-2 neighbor list:
 * the index of the first neighbor whose rank exceeds 'rank', or k - 1
 * when every one of the k - 1 neighbors has a smaller (or equal) rank. */
static int find_myidx(int *nbrs, int k, int rank)
{
    int pos = 0;
    while (pos < k - 1 && nbrs[pos] <= rank) {
        pos++;
    }
    return pos;
}

/* Reduce the k - 1 neighbor buffers together with this rank's own data in
 * recvbuf, folding strictly in neighbor-list order so non-commutative ops
 * (and order-sensitive float reductions) produce the same answer everywhere.
 *
 *   bufs    - the k - 1 received buffers, in neighbor order
 *   recvbuf - in: this rank's contribution; out: the final reduced value
 *   k       - recursive-exchange radix (bufs holds k - 1 entries)
 *   idx     - position of this rank's data among the k contributions
 *
 * Returns MPI_SUCCESS or the error code of the first failing call
 * (MPIR_ERR_CHECK jumps to fn_fail on error). */
static int do_reduce(void **bufs, void *recvbuf, int k, int idx,
                     MPI_Aint count, MPI_Datatype datatype, MPI_Op op)
{
    int mpi_errno = MPI_SUCCESS;

    /* fold the contributions that precede this rank, left to right:
     * bufs[i + 1] = bufs[i] op bufs[i + 1] */
    for (int i = 0; i < idx - 1; i++) {
        mpi_errno = MPIR_Reduce_local(bufs[i], bufs[i + 1], count, datatype, op);
        MPIR_ERR_CHECK(mpi_errno);
    }
    /* then fold their combined result into this rank's own data */
    if (idx > 0) {
        mpi_errno = MPIR_Reduce_local(bufs[idx - 1], recvbuf, count, datatype, op);
        MPIR_ERR_CHECK(mpi_errno);
    }
    /* contributions after this rank: accumulate rightwards through the
     * remaining buffers, then copy the final buffer back into recvbuf */
    if (idx < k - 1) {
        mpi_errno = MPIR_Reduce_local(recvbuf, bufs[idx], count, datatype, op);
        MPIR_ERR_CHECK(mpi_errno);

        for (int i = idx; i < k - 2; i++) {
            mpi_errno = MPIR_Reduce_local(bufs[i], bufs[i + 1], count, datatype, op);
            MPIR_ERR_CHECK(mpi_errno);
        }

        /* the fully-reduced value now lives in the last buffer */
        mpi_errno = MPIR_Localcopy(bufs[k - 2], count, datatype, recvbuf, count, datatype);
        MPIR_ERR_CHECK(mpi_errno);
    }

  fn_fail:
    return mpi_errno;
}
2 changes: 1 addition & 1 deletion src/mpi/coll/helper_fns.c
Expand Up @@ -587,7 +587,7 @@ int MPIC_Irecv(void *buf, MPI_Aint count, MPI_Datatype datatype, int source,
}


int MPIC_Waitall(int numreq, MPIR_Request * requests[], MPI_Status statuses[],
int MPIC_Waitall(int numreq, MPIR_Request * requests[], MPI_Status * statuses,
MPIR_Errflag_t * errflag)
{
int mpi_errno = MPI_SUCCESS;
Expand Down
1 change: 1 addition & 0 deletions src/mpi/errhan/errnames.txt
Expand Up @@ -112,6 +112,7 @@ also the value at index %d
deferred because of resource limits is not implemented
**notgenreq:Attempt to complete a request with MPI_GREQUEST_COMPLETE that \
was not started with MPI_GREQUEST_START
**cancelinactive:Attempt to cancel an inactive persistent request
**cancelunknown:Attempt to cancel an unknown type of request
**permop:Cannot free permanent MPI_Op
**attrsentinal:Internal fields in an attribute have been overwritten; \
Expand Down
4 changes: 2 additions & 2 deletions src/mpi/request/request_impl.c
Expand Up @@ -95,7 +95,7 @@ int MPIR_Cancel_impl(MPIR_Request * request_ptr)
MPIR_ERR_CHECK(mpi_errno);
}
} else {
MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_REQUEST, "**requestpersistactive");
MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_REQUEST, "**cancelinactive");
}
break;
}
Expand All @@ -106,7 +106,7 @@ int MPIR_Cancel_impl(MPIR_Request * request_ptr)
mpi_errno = MPID_Cancel_recv(request_ptr->u.persist.real_request);
MPIR_ERR_CHECK(mpi_errno);
} else {
MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_REQUEST, "**requestpersistactive");
MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_REQUEST, "**cancelinactive");
}
break;
}
Expand Down
18 changes: 18 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_impl.h
Expand Up @@ -101,6 +101,24 @@ int MPIDI_OFI_handle_cq_error(int vni, int nic, ssize_t ret);
} while (_ret == -FI_EAGAIN); \
} while (0)

#define MPIDI_OFI_CALL_RETRY_RETURN(FUNC,vci_,ret) \
do { \
int _retry = MPIR_CVAR_CH4_OFI_MAX_EAGAIN_RETRY; \
while (1) { \
ret = FUNC; \
if (likely(ret != -FI_EAGAIN)) { \
break; \
} \
if (_retry > 0) { \
_retry--; \
MPIR_ERR_CHKANDJUMP(_retry == 0, mpi_errno, MPIX_ERR_EAGAIN, "**eagain"); \
} \
MPIDI_OFI_THREAD_CS_EXIT_VCI_OPTIONAL(vci_); \
mpi_errno = MPIDI_OFI_retry_progress(); \
MPIDI_OFI_THREAD_CS_ENTER_VCI_OPTIONAL(vci_); \
} \
} while (0)

/* per-vci macros - we'll transition into these macros once the locks are
* moved down to ofi-layer */
#define MPIDI_OFI_VCI_PROGRESS(vci_) \
Expand Down
4 changes: 3 additions & 1 deletion src/mpid/ch4/netmod/ofi/ofi_probe.h
Expand Up @@ -67,7 +67,9 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_iprobe(int source,
if (message) {
recv_flags |= FI_CLAIM;
}
MPIDI_OFI_CALL_RETURN(fi_trecvmsg(MPIDI_OFI_global.ctx[ctx_idx].rx, &msg, recv_flags), ofi_err);
ofi_err = 0;
MPIDI_OFI_CALL_RETRY_RETURN(fi_trecvmsg(MPIDI_OFI_global.ctx[ctx_idx].rx, &msg, recv_flags),
vni_dst, ofi_err);
if (ofi_err == -FI_ENOMSG) {
*flag = 0;
if (message)
Expand Down
2 changes: 1 addition & 1 deletion src/mpid/ch4/shm/posix/posix_init.c
Expand Up @@ -277,7 +277,7 @@ int MPIDI_POSIX_coll_init(int rank, int size)
mpi_errno = MPIR_Csel_create_from_buf(MPIDI_POSIX_coll_generic_json,
create_container, &MPIDI_global.shm.posix.csel_root);
} else {
mpi_errno = MPIR_Csel_create_from_file(MPIR_CVAR_CH4_COLL_SELECTION_TUNING_JSON_FILE,
mpi_errno = MPIR_Csel_create_from_file(MPIR_CVAR_CH4_POSIX_COLL_SELECTION_TUNING_JSON_FILE,
create_container, &MPIDI_global.shm.posix.csel_root);
}
MPIR_ERR_CHECK(mpi_errno);
Expand Down

0 comments on commit fba49d6

Please sign in to comment.