Skip to content

Commit

Permalink
Merge pull request #6781 from dycz0fx/inter_coll
Browse files Browse the repository at this point in the history
coll: add a new bcast composition

Approved-by: Hui Zhou
  • Loading branch information
hzhou committed May 2, 2024
2 parents 67066d1 + bcdb21e commit 5ae17c4
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 25 deletions.
9 changes: 9 additions & 0 deletions src/mpid/ch4/src/ch4_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ MPL_STATIC_INLINE_PREFIX int MPID_Bcast(void *buffer, MPI_Aint count, MPI_Dataty
mpi_errno =
MPIDI_Bcast_intra_composition_gamma(buffer, count, datatype, root, comm, errflag);
break;
case 4:
MPII_COLLECTIVE_FALLBACK_CHECK(comm->rank,
(comm->comm_kind == MPIR_COMM_KIND__INTRACOMM) &&
(comm->hierarchy_kind ==
MPIR_COMM_HIERARCHY_KIND__PARENT), mpi_errno,
"Bcast composition delta cannot be applied.\n");
mpi_errno =
MPIDI_Bcast_intra_composition_delta(buffer, count, datatype, root, comm, errflag);
break;
default:
mpi_errno =
MPIDI_Bcast_allcomm_composition_json(buffer, count, datatype, root, comm, errflag);
Expand Down
161 changes: 136 additions & 25 deletions src/mpid/ch4/src/ch4_coll_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,35 +233,39 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_Bcast_intra_composition_alpha(void *buffer, M
MPI_Aint nbytes, recvd_size, type_size;
#endif

if (comm->node_roots_comm == NULL && comm->rank == root) {
coll_ret = MPIC_Send(buffer, count, datatype, 0, MPIR_BCAST_TAG, comm->node_comm, errflag);
MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
}

if (comm->node_roots_comm != NULL && comm->rank != root &&
MPIR_Get_intranode_rank(comm, root) != -1) {
int intra_root = MPIR_Get_intranode_rank(comm, root);
/* if node_comm exists and root is not local leader (node_comm rank 0)*/
if (intra_root != -1 && intra_root != 0) {
/* root sends message to local leader (node_comm rank 0) */
if (comm->rank == root) {
coll_ret = MPIC_Send(buffer, count, datatype, 0, MPIR_BCAST_TAG, comm->node_comm, errflag);
MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
}
/* local leader receives message from root */
if (comm->node_roots_comm != NULL){
#ifndef HAVE_ERROR_CHECKING
coll_ret =
MPIC_Recv(buffer, count, datatype, MPIR_Get_intranode_rank(comm, root), MPIR_BCAST_TAG,
comm->node_comm, MPI_STATUS_IGNORE);
MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
coll_ret =
MPIC_Recv(buffer, count, datatype, intra_root, MPIR_BCAST_TAG, comm->node_comm,
MPI_STATUS_IGNORE);
MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
#else
coll_ret =
MPIC_Recv(buffer, count, datatype, MPIR_Get_intranode_rank(comm, root), MPIR_BCAST_TAG,
comm->node_comm, &status);
MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);

MPIR_Datatype_get_size_macro(datatype, type_size);
nbytes = type_size * count;
/* check that we received as much as we expected */
MPIR_Get_count_impl(&status, MPI_BYTE, &recvd_size);
if (recvd_size != nbytes) {
MPIR_ERR_SET2(coll_ret, MPI_ERR_OTHER,
"**collective_size_mismatch",
"**collective_size_mismatch %d %d", recvd_size, nbytes);
coll_ret =
MPIC_Recv(buffer, count, datatype, intra_root, MPIR_BCAST_TAG, comm->node_comm,
&status);
MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
}

MPIR_Datatype_get_size_macro(datatype, type_size);
nbytes = type_size * count;
/* check that we received as much as we expected */
MPIR_Get_count_impl(&status, MPI_BYTE, &recvd_size);
if (recvd_size != nbytes) {
MPIR_ERR_SET2(coll_ret, MPI_ERR_OTHER,
"**collective_size_mismatch",
"**collective_size_mismatch %d %d", recvd_size, nbytes);
MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
}
#endif
}
}

MPIR_GPU_query_pointer_attr(buffer, &attr);
Expand Down Expand Up @@ -416,6 +420,113 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_Bcast_intra_composition_gamma(void *buffer, M
goto fn_exit;
}

/*
 * Bcast composition "delta".
 *
 * A hierarchical broadcast in which only the inter-node phase is staged through host memory;
 * the intra-node phase always runs on the caller's buffer.  This performs well for GPU bcast
 * because the intra-node phase can use the direct links between the GPUs of a node.
 *
 * Phases, in order:
 *   0. If the root is on this node but is not node_comm rank 0, the root sends the data to
 *      node_comm rank 0 (the node leader), which receives it into its own buffer.
 *   1. A node leader whose buffer is device memory, and whose message fits within
 *      MPIR_CVAR_CH4_GPU_COLL_SWAP_BUFFER_SZ, copies the data into a host cell taken from
 *      MPIDI_global.gpu_coll_pool.
 *   2. Node leaders broadcast over node_roots_comm, from the host cell if one was obtained
 *      and from the caller's buffer otherwise.
 *   3. Every staging leader except the root copies the host data back into its buffer; all
 *      staging leaders return their cell to the pool.
 *   4. Each node broadcasts from node_comm rank 0 on the caller's buffer.
 *
 * The result of every communication step is handed to MPIR_ERR_COLL_CHECKANDCONT; the value
 * accumulated in mpi_errno is returned.
 */
MPL_STATIC_INLINE_PREFIX int MPIDI_Bcast_intra_composition_delta(void *buffer, MPI_Aint count,
                                                                 MPI_Datatype datatype,
                                                                 int root, MPIR_Comm * comm,
                                                                 MPIR_Errflag_t errflag)
{
    int mpi_errno = MPI_SUCCESS;
    int coll_ret = MPI_SUCCESS;
    void *host_cell = NULL;
    /* buffer used by the inter-node bcast; redirected to the host cell when staging */
    void *inter_buf = buffer;
    MPL_pointer_attr_t ptr_attr;
    MPI_Aint data_size, data_shift;

#ifdef HAVE_ERROR_CHECKING
    MPI_Status recv_status;
    MPI_Aint expected_bytes, received_bytes, elem_size;
#endif

    /* rank of the root inside this node's node_comm, or -1 if the root is not on this node */
    const int local_root = MPIR_Get_intranode_rank(comm, root);
    /* processes that belong to node_roots_comm act as node leaders */
    const int is_node_leader = (comm->node_roots_comm != NULL);

    /* Phase 0: hand the data from a non-leader root to its node leader */
    if (local_root != -1 && local_root != 0) {
        if (comm->rank == root) {
            coll_ret = MPIC_Send(buffer, count, datatype, 0, MPIR_BCAST_TAG, comm->node_comm,
                                 errflag);
            MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
        }

        if (is_node_leader) {
#ifdef HAVE_ERROR_CHECKING
            coll_ret = MPIC_Recv(buffer, count, datatype, local_root, MPIR_BCAST_TAG,
                                 comm->node_comm, &recv_status);
            MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);

            /* verify that the number of bytes received matches what was expected */
            MPIR_Datatype_get_size_macro(datatype, elem_size);
            expected_bytes = elem_size * count;
            MPIR_Get_count_impl(&recv_status, MPI_BYTE, &received_bytes);
            if (received_bytes != expected_bytes) {
                MPIR_ERR_SET2(coll_ret, MPI_ERR_OTHER,
                              "**collective_size_mismatch",
                              "**collective_size_mismatch %d %d", received_bytes, expected_bytes);
                MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
            }
#else
            coll_ret = MPIC_Recv(buffer, count, datatype, local_root, MPIR_BCAST_TAG,
                                 comm->node_comm, MPI_STATUS_IGNORE);
            MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
#endif
        }
    }

    /* queried on every process, leader or not */
    MPIR_GPU_query_pointer_attr(buffer, &ptr_attr);
    MPIDI_Coll_calculate_size_shift(count, datatype, &data_size, &data_shift);

    if (is_node_leader) {
        /* Phase 1: only node leaders stage a device buffer through a host cell */
        if (ptr_attr.type == MPL_GPU_POINTER_DEV
            && data_size <= MPIR_CVAR_CH4_GPU_COLL_SWAP_BUFFER_SZ) {
            MPIDU_genq_private_pool_alloc_cell(MPIDI_global.gpu_coll_pool, (void **) &host_cell);
            if (host_cell != NULL) {
                /* the shifted pointer is what the swap helpers and the bcast operate on */
                host_cell = (char *) host_cell - data_shift;
                MPIR_gpu_host_swap_gpu(buffer, count, datatype, ptr_attr, host_cell);
                inter_buf = host_cell;
            }
        }

        /* Phase 2: inter-node bcast among the node leaders */
        coll_ret = MPIDI_NM_mpi_bcast(inter_buf, count, datatype,
                                      MPIR_Get_internode_rank(comm, root),
                                      comm->node_roots_comm, errflag);
        MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);

        /* Phase 3: move the result back to the caller's buffer and release the cell */
        if (host_cell != NULL) {
            /* the root's own buffer already holds the data, so it skips the copy back */
            if (comm->rank != root) {
                MPIR_gpu_swap_back_gpu(host_cell, buffer, count, datatype, ptr_attr);
            }
            /* undo the shift so the pool gets back the address it handed out */
            host_cell = (char *) host_cell + data_shift;
            MPIDU_genq_private_pool_free_cell(MPIDI_global.gpu_coll_pool, host_cell);
        }
    }

    /* Phase 4: intra-node bcast from node_comm rank 0, on the caller's buffer */
    if (comm->node_comm != NULL) {
#ifdef MPIDI_CH4_DIRECT_NETMOD
        coll_ret = MPIDI_NM_mpi_bcast(buffer, count, datatype, 0, comm->node_comm, errflag);
#else
        coll_ret = MPIDI_SHM_mpi_bcast(buffer, count, datatype, 0, comm->node_comm, errflag);
#endif /* MPIDI_CH4_DIRECT_NETMOD */
        MPIR_ERR_COLL_CHECKANDCONT(coll_ret, errflag, mpi_errno);
    }

    return mpi_errno;
}

MPL_STATIC_INLINE_PREFIX int MPIDI_Allreduce_intra_composition_alpha(const void *sendbuf,
void *recvbuf, MPI_Aint count,
MPI_Datatype datatype,
Expand Down

0 comments on commit 5ae17c4

Please sign in to comment.