Skip to content

Commit

Permalink
coll/tuned: Extend the collective tuning file to be topology-aware
Browse files Browse the repository at this point in the history
TUNED collectives selection should account for communicator topology
like HAN. The communicator size and message based algorithm selection logic
is no longer sufficient to achieve optimal performance when HAN is used.
The best algorithm differs between inter-node and intra-node for the same
communicator size and message size based on the tuning results.

This commit introduces a topology dimension in both the TUNED collective
tuning file rules and the algorithm selection logic. The topological level
can be intra-node, inter-node, or default (mixed).

Specify @singlenode (intra-node) or @disjoint (inter-node) after the message
size in the dynamic file rules, for example 32768@singlenode. This is an
optional feature, so it will not break the old file format. See the file
example in coll_tuned_dynamic_file.h.

Signed-off-by: Jessie Yang <jiaxiyan@amazon.com>
  • Loading branch information
jiaxiyan committed Feb 9, 2024
1 parent 92b82f6 commit 4c6b561
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 22 deletions.
28 changes: 14 additions & 14 deletions ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c
Expand Up @@ -80,7 +80,7 @@ ompi_coll_tuned_allreduce_intra_dec_dynamic (const void *sbuf, void *rbuf, int c
dsize *= count;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLREDUCE],
dsize, &faninout, &segsize, &ignoreme);
dsize, comm, &faninout, &segsize, &ignoreme);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -136,7 +136,7 @@ int ompi_coll_tuned_alltoall_intra_dec_dynamic(const void *sbuf, int scount,
dsize *= (ptrdiff_t)comsize * (ptrdiff_t)scount;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLTOALL],
dsize, &faninout, &segsize, &max_requests);
dsize, comm, &faninout, &segsize, &max_requests);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -187,7 +187,7 @@ int ompi_coll_tuned_alltoallv_intra_dec_dynamic(const void *sbuf, const int *sco
int alg, faninout, segsize, max_requests;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLTOALLV],
0, &faninout, &segsize, &max_requests);
0, comm, &faninout, &segsize, &max_requests);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -231,7 +231,7 @@ int ompi_coll_tuned_barrier_intra_dec_dynamic(struct ompi_communicator_t *comm,
int alg, faninout, segsize, ignoreme;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[BARRIER],
0, &faninout, &segsize, &ignoreme);
0, comm, &faninout, &segsize, &ignoreme);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -278,7 +278,7 @@ int ompi_coll_tuned_bcast_intra_dec_dynamic(void *buf, int count,
dsize *= count;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[BCAST],
dsize, &faninout, &segsize, &ignoreme);
dsize, comm, &faninout, &segsize, &ignoreme);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -332,7 +332,7 @@ int ompi_coll_tuned_reduce_intra_dec_dynamic( const void *sbuf, void *rbuf,
dsize *= count;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[REDUCE],
dsize, &faninout, &segsize, &max_requests);
dsize, comm, &faninout, &segsize, &max_requests);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -388,7 +388,7 @@ int ompi_coll_tuned_reduce_scatter_intra_dec_dynamic(const void *sbuf, void *rbu
dsize *= count;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[REDUCESCATTER],
dsize, &faninout,
dsize, comm, &faninout,
&segsize, &ignoreme);
if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -442,7 +442,7 @@ int ompi_coll_tuned_reduce_scatter_block_intra_dec_dynamic(const void *sbuf, voi
dsize *= rcount * size;

alg = ompi_coll_tuned_get_target_method_params(tuned_module->com_rules[REDUCESCATTERBLOCK],
dsize, &faninout,
dsize, comm, &faninout,
&segsize, &ignoreme);
if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -500,7 +500,7 @@ int ompi_coll_tuned_allgather_intra_dec_dynamic(const void *sbuf, int scount,
dsize *= (ptrdiff_t)comsize * (ptrdiff_t)scount;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLGATHER],
dsize, &faninout, &segsize, &ignoreme);
dsize, comm, &faninout, &segsize, &ignoreme);
if (alg) {
/* we have found a valid choice from the file based rules for
this message size */
Expand Down Expand Up @@ -565,7 +565,7 @@ int ompi_coll_tuned_allgatherv_intra_dec_dynamic(const void *sbuf, int scount,
per_rank_size = total_size / comsize;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLGATHERV],
per_rank_size, &faninout, &segsize, &ignoreme);
per_rank_size, comm, &faninout, &segsize, &ignoreme);
if (alg) {
/* we have found a valid choice from the file based rules for
this message size */
Expand Down Expand Up @@ -618,7 +618,7 @@ int ompi_coll_tuned_gather_intra_dec_dynamic(const void *sbuf, int scount,
dsize *= scount * comsize;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[GATHER],
dsize, &faninout, &segsize, &max_requests);
dsize, comm, &faninout, &segsize, &max_requests);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -668,7 +668,7 @@ int ompi_coll_tuned_scatter_intra_dec_dynamic(const void *sbuf, int scount,
dsize *= scount * comsize;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[SCATTER],
dsize, &faninout, &segsize, &max_requests);
dsize, comm, &faninout, &segsize, &max_requests);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -714,7 +714,7 @@ int ompi_coll_tuned_exscan_intra_dec_dynamic(const void *sbuf, void* rbuf, int c
dsize *= comsize;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[EXSCAN],
dsize, &faninout, &segsize, &max_requests);
dsize, comm, &faninout, &segsize, &max_requests);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down Expand Up @@ -758,7 +758,7 @@ int ompi_coll_tuned_scan_intra_dec_dynamic(const void *sbuf, void* rbuf, int cou
dsize *= comsize;

alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[SCAN],
dsize, &faninout, &segsize, &max_requests);
dsize, comm, &faninout, &segsize, &max_requests);

if (alg) {
/* we have found a valid choice from the file based rules for this message size */
Expand Down
40 changes: 38 additions & 2 deletions ompi/mca/coll/tuned/coll_tuned_dynamic_file.c
Expand Up @@ -42,6 +42,7 @@
/* Line counter shared by the rules-file parser in this file: its address is
 * handed to the getnext helpers below and its value is printed in the
 * parser's error messages. */
static int fileline=0; /* used for verbose error messages */

/* Shorthands that forward the shared line counter to the coll base file
 * readers. getnext stores the next token through a long*, getnext_string
 * through a char**; callers in this file treat a negative return as failure. */
#define getnext(fptr, pval) ompi_coll_base_file_getnext_long(fptr, &fileline, pval)
#define getnext_string(fptr, pval) ompi_coll_base_file_getnext_string(fptr, &fileline, pval)

/*
* Reads a rule file called fname
Expand All @@ -59,7 +60,7 @@ int ompi_coll_tuned_read_rules_config_file (char *fname, ompi_coll_alg_rule_t**
{
long CI, NCS, CS, ALG, NMS, FANINOUT, X, MS, SS;
FILE *fptr = (FILE*) NULL;
int x, ncs, nms;
int x, ncs, nms, topo_lvl;

ompi_coll_alg_rule_t *alg_rules = (ompi_coll_alg_rule_t*) NULL; /* complete table of rules */

Expand Down Expand Up @@ -176,11 +177,46 @@ int ompi_coll_tuned_read_rules_config_file (char *fname, ompi_coll_alg_rule_t**

msg_p = &(com_p->msg_rules[nms]);

if( (getnext (fptr, &MS) < 0) || (MS < 0) ) {
char *msg_topo = NULL;
if( getnext_string(fptr, &msg_topo) < 0 ) {
OPAL_OUTPUT((ompi_coll_tuned_stream,"Could not read message size/name of a topo level for collective ID %ld com rule %d msg rule %d at around line %d\n",
CI, ncs, nms, fileline));
goto on_file_error;
}

char *temp_str = strdup(msg_topo);
const char *delimiter = "@";
char *msg_size_str = strtok(temp_str, delimiter);
if (NULL == msg_size_str) {
OPAL_OUTPUT((ompi_coll_tuned_stream,"Could not read message size for collective ID %ld com rule %d msg rule %d at around line %d\n", CI, ncs, nms, fileline));
goto on_file_error;
}

char *endptr;
errno = 0;
MS = strtol(msg_size_str, &endptr, 10);
if (errno != 0 || (endptr == msg_size_str) || ('\0' != *endptr ) || MS < 0) {
OPAL_OUTPUT((ompi_coll_tuned_stream,"Invalid message size for collective ID %ld com rule %d msg rule %d at around line %d\n", CI, ncs, nms, fileline));
goto on_file_error;
}
msg_p->msg_size = (size_t)MS;

char *topo_lvl_name = strtok(NULL, delimiter);
if (NULL == topo_lvl_name) {
msg_p->topologic_level = DEFAULT;
} else {
topo_lvl = mca_coll_tuned_topo_name_to_id(topo_lvl_name);
if (topo_lvl < 0) {
char *endp;
topo_lvl = (int)strtol(topo_lvl_name, &endp, 10);
if (('\0' != *endp ) || (topo_lvl < DEFAULT) || (topo_lvl >= NB_TOPO_LVL)) {
OPAL_OUTPUT((ompi_coll_tuned_stream,"Found an error at line %d: unknown topo level '%s'\n", fileline, topo_lvl_name));
goto on_file_error;
}
}
msg_p->topologic_level = (TOPO_LVL_T)topo_lvl;
}
free (temp_str);

if( (getnext (fptr, &ALG) < 0) || (ALG < 0) ) {
OPAL_OUTPUT((ompi_coll_tuned_stream,"Could not read target algorithm method for collective ID %ld com rule %d msg rule %d at around line %d\n", CI, ncs, nms, fileline));
Expand Down
58 changes: 58 additions & 0 deletions ompi/mca/coll/tuned/coll_tuned_dynamic_file.h
Expand Up @@ -24,6 +24,64 @@
/* also need the dynamic rule structures */
#include "coll_tuned_dynamic_rules.h"

/*
* @file
*
* #######################
* # Dynamic file format #
* #######################
* File-defined rules take precedence over MCA-parameter-defined rules.
* To activate file reader, the MCA parameter use_dynamic_file_rules must
* be set to true. The path to the dynamic file is given by the MCA
* parameter dynamic_rules_filename. If there is any issue reading the file,
* the file is considered as invalid and only MCA parameter defined rules are
* used. If a potential logical issue is identified in the file, a
* warning is printed but the file is not considered as invalid.
*
*
* Here is an example of a dynamic rules file:
* 1 # number of collectives
* 3 # Collective identifier 1 (defined in ompi/mca/coll/base/coll_base_functions.h)
* 2 # number of comm sizes
* 1 # comm size 1
* 1 # number of message size rules
* 0 1 0 0 # for message size 0, choose algorithm 1, topo 0, 0 segmentation
* 8 # comm size 8
* 4 # number of message size rules
* 0 1 0 0 # for message size 0, choose algorithm 1, topo 0, 0 segmentation
* 32768 2 0 0 # for message size 32768, choose algorithm 2, topo 0, 0 segmentation
* 262144 1 0 0 # for message size 262144, choose algorithm 1, topo 0, 0 segmentation
* 524288 2 0 0 # for message size 524288, choose algorithm 2, topo 0, 0 segmentation
*
* Optionally, specify topological level in the message size rules,
* which can be singlenode or disjoint.
* 1 # number of collectives
* 3 # Collective identifier 1 (defined in ompi/mca/coll/base/coll_base_functions.h)
* 2 # number of comm sizes
* 1 # comm size 1
* 2 # number of message size rules
* 0@singlenode 2 0 0 # for message size 0 and single node communication, choose algorithm 2, topo 0, 0 segmentation
* 0@disjoint 1 0 0 # for message size 0 and disjoint communication, choose algorithm 1, topo 0, 0 segmentation
* 8 # comm size 8
* 6 # number of message size rules
* 0@singlenode 2 0 0 # for message size 0 and single node communication, choose algorithm 2, topo 0, 0 segmentation
* 0@disjoint 1 0 0 # for message size 0 and disjoint communication, choose algorithm 1, topo 0, 0 segmentation
* 32768@singlenode 2 0 0 # for message size 32768 and single node communication, choose algorithm 2, topo 0, 0 segmentation
* 32768@disjoint 1 0 0 # for message size 32768 and disjoint communication, choose algorithm 1, topo 0, 0 segmentation
* 262144 1 0 0 # for message size 262144, choose algorithm 1, topo 0, 0 segmentation
* 524288 2 0 0 # for message size 524288, choose algorithm 2, topo 0, 0 segmentation
*
*
* Note that comm size and message size rules define minimum
* values, and each new rule takes precedence over the previous ones. This
* property implies that these types of rules must be sorted by increasing
* value. If they are not, some rules won't be used.
*
* The counts define a stack. If the count is set to x, the reader will
* attempt to read x rules of the corresponding type. If a set of rules
* has an invalid count, this is an error and it might not be detected by
* the reader.
*/

BEGIN_C_DECLS

Expand Down
48 changes: 43 additions & 5 deletions ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c
Expand Up @@ -38,6 +38,30 @@
#include "ompi/mca/coll/base/coll_base_util.h"


/*
* topo level conversions both ways; str <-> id
* An enum is used for conversions.
*/
static mca_base_var_enum_value_t level_enumerator[] = {
{ SINGLE_NODE, "singlenode" },
{ DISJOINT, "disjoint" },
{ 0 }
};

/*
* Stringifier for topological level
*/
int mca_coll_tuned_topo_name_to_id(const char *topo_level_name)
{
for (int i = 0; level_enumerator[i].string != NULL; i++) {
if (0 == strcmp(topo_level_name, level_enumerator[i].string)) {
return i;
}
}
return -1;
}


ompi_coll_alg_rule_t* ompi_coll_tuned_mk_alg_rules (int n_alg)
{
int i;
Expand Down Expand Up @@ -87,6 +111,7 @@ ompi_coll_msg_rule_t* ompi_coll_tuned_mk_msg_rules (int n_msg_rules, int alg_rul
msg_rules[i].com_rule_id = com_rule_id;
msg_rules[i].msg_rule_id = i;
msg_rules[i].msg_size = 0; /* unknown */
msg_rules[i].topologic_level = DEFAULT; /* unknown & default */
msg_rules[i].result_alg = 0; /* unknown */
msg_rules[i].result_topo_faninout = 0; /* unknown */
msg_rules[i].result_segsize = 0; /* unknown */
Expand Down Expand Up @@ -327,8 +352,9 @@ ompi_coll_com_rule_t* ompi_coll_tuned_get_com_rule_ptr (ompi_coll_alg_rule_t* ru

/*
* This function takes a com_rule ptr (from the communicators coll tuned data structure)
* (Which is chosen for a particular MPI collective)
* and a (total_)msg_size and it returns (0) and a algorithm to use and a recommended topo faninout and segment size
* (Which is chosen for a particular MPI collective),
* a (total_)msg_size, and the communicator(comm) to which the process belongs,
* and it returns (0) and a algorithm to use and a recommended topo faninout and segment size
* all based on the user supplied rules
*
* Just like the above functions it uses a less than or equal msg size
Expand All @@ -340,8 +366,9 @@ ompi_coll_com_rule_t* ompi_coll_tuned_get_com_rule_ptr (ompi_coll_alg_rule_t* ru
*
*/

int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, size_t mpi_msgsize, int *result_topo_faninout,
int* result_segsize, int* max_requests)
int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, const size_t mpi_msgsize,
const struct ompi_communicator_t *comm,
int *result_topo_faninout, int* result_segsize, int* max_requests)
{
ompi_coll_msg_rule_t* msg_p = (ompi_coll_msg_rule_t*) NULL;
ompi_coll_msg_rule_t* best_msg_p = (ompi_coll_msg_rule_t*) NULL;
Expand All @@ -357,12 +384,18 @@ int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rul
/* make a copy of the first msg rule */
best_msg_p = msg_p = base_com_rule->msg_rules;
i = 0;
bool found_rules = false;

while (i<base_com_rule->n_msg_sizes) {
/* OPAL_OUTPUT((ompi_coll_tuned_stream,"checking mpi_msgsize %d against com_id %d msg_id %d index %d msg_size %d", */
/* mpi_msgsize, msg_p->com_rule_id, msg_p->msg_rule_id, i, msg_p->msg_size)); */
if (msg_p->msg_size <= mpi_msgsize) {
best_msg_p = msg_p;
if (msg_p->topologic_level == DEFAULT ||
(msg_p->topologic_level == SINGLE_NODE && !ompi_group_have_remote_peers(comm->c_local_group)) ||
(msg_p->topologic_level == DISJOINT && OMPI_COMM_IS_INTRA(comm) && OMPI_COMM_IS_DISJOINT_SET(comm) && OMPI_COMM_IS_DISJOINT(comm))) {
best_msg_p = msg_p;
found_rules = true;
}
/* OPAL_OUTPUT((ompi_coll_tuned_stream(":ok\n")); */
}
else {
Expand All @@ -374,6 +407,11 @@ int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rul
i++;
}

if (!found_rules) {
/* Fall back to fixed rules if there is no corresponding topological rule in the file */
return (0);
}

OPAL_OUTPUT((ompi_coll_tuned_stream,"Selected the following msg rule id %d\n", best_msg_p->msg_rule_id));
ompi_coll_tuned_dump_msg_rule (best_msg_p);

Expand Down
15 changes: 14 additions & 1 deletion ompi/mca/coll/tuned/coll_tuned_dynamic_rules.h
Expand Up @@ -26,6 +26,15 @@
BEGIN_C_DECLS


/*
 * Topological level a message-size rule applies to.
 *
 * Numeric values are part of the rules-file format: the parser also accepts
 * the level as an integer in the range [DEFAULT, NB_TOPO_LVL).
 */
typedef enum TOPO_LVL {
    DEFAULT     = -1, /* no level given: the rule applies to any communicator */
    SINGLE_NODE =  0, /* rule applies when the group has no remote peers */
    DISJOINT    =  1, /* rule applies to intra-communicators flagged as disjoint */
    NB_TOPO_LVL =  2  /* number of real levels; exclusive upper bound for ids */
} TOPO_LVL_T;


typedef struct msg_rule_s {
/* paranoid / debug */
int mpi_comsize; /* which MPI comm size is this for */
Expand All @@ -37,6 +46,7 @@ typedef struct msg_rule_s {

/* RULE */
size_t msg_size; /* message size */
TOPO_LVL_T topologic_level; /* single node or disjoint */

/* RESULT */
int result_alg; /* result algorithm to use */
Expand Down Expand Up @@ -94,10 +104,13 @@ int ompi_coll_tuned_free_all_rules (ompi_coll_alg_rule_t* alg_p, int n_algs);

ompi_coll_com_rule_t* ompi_coll_tuned_get_com_rule_ptr (ompi_coll_alg_rule_t* rules, int alg_id, int mpi_comsize);

int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, size_t mpi_msgsize,
int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, const size_t mpi_msgsize,
const struct ompi_communicator_t *comm,
int* result_topo_faninout, int* result_segsize,
int* max_requests);

/*
 * Miscellaneous function
 *
 * Look up a topological level by name ("singlenode" or "disjoint") and
 * return its id. Returns -1 when the name is not recognized.
 */
int mca_coll_tuned_topo_name_to_id(const char *topo_level_name);

END_C_DECLS
#endif /* MCA_COLL_TUNED_DYNAMIC_RULES_H_HAS_BEEN_INCLUDED */
Expand Down

0 comments on commit 4c6b561

Please sign in to comment.