Skip to content

Commit

Permalink
UCP/WIREUP: Increase UCP_MAX_LANES to 64
Browse files Browse the repository at this point in the history
  • Loading branch information
ivankochin committed May 15, 2024
1 parent ba2da5e commit ffcca4a
Show file tree
Hide file tree
Showing 19 changed files with 102 additions and 63 deletions.
4 changes: 2 additions & 2 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ static inline ucp_lane_index_t ucp_ep_num_lanes(ucp_ep_h ep)

static inline int ucp_ep_is_lane_p2p(ucp_ep_h ep, ucp_lane_index_t lane)
{
return ucp_ep_config(ep)->p2p_lanes & UCS_BIT(lane);
return !!(ucp_ep_config(ep)->p2p_lanes & UCS_BIT(lane));
}

static inline ucp_md_index_t ucp_ep_md_index(ucp_ep_h ep, ucp_lane_index_t lane)
Expand Down Expand Up @@ -239,7 +239,7 @@ static inline ucp_rsc_index_t
ucp_ep_config_get_dst_md_cmpt(const ucp_ep_config_key_t *key,
ucp_md_index_t dst_md_index)
{
unsigned idx = ucs_popcount(key->reachable_md_map & UCS_MASK(dst_md_index));
unsigned idx = ucs_bitmap2idx(key->reachable_md_map, dst_md_index);

return key->dst_md_cmpts[idx];
}
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ UCP_UINT_TYPE(UCP_MD_INDEX_BITS) ucp_md_map_t;


/* Lanes */
#define UCP_MAX_LANES 16
#define UCP_MAX_LANES 64
#define UCP_MAX_FAST_PATH_LANES 5
#define UCP_MAX_SLOW_PATH_LANES (UCP_MAX_LANES - UCP_MAX_FAST_PATH_LANES)

Expand Down
5 changes: 2 additions & 3 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -3470,8 +3470,7 @@ static int ucp_worker_do_ep_keepalive(ucp_worker_h worker, ucs_time_t now)
ucs_trace("ep %p: do keepalive on lane[%d]=%p ep->flags=0x%x", ep, lane,
uct_ep, ep->flags);

if (ucp_ep_is_am_keepalive(ep, rsc_index,
ucp_ep_config(ep)->p2p_lanes & UCS_BIT(lane))) {
if (ucp_ep_is_am_keepalive(ep, rsc_index, ucp_ep_is_lane_p2p(ep, lane))) {
status = ucp_ep_do_uct_ep_am_keepalive(ep, uct_ep, rsc_index);
} else {
status = uct_ep_check(uct_ep, 0, NULL);
Expand Down Expand Up @@ -3773,7 +3772,7 @@ ucp_wiface_process_for_each_lane(ucp_worker_h worker,

ucs_for_each_bit(lane, lane_map) {
ucs_assertv(lane < UCP_MAX_LANES,
"lane=%" PRIu8 ", lane_map=0x%" PRIx16, lane, lane_map);
"lane=%" PRIu8 ", lane_map=0x%" PRIx64, lane, lane_map);
rsc_index = ep_config->key.lanes[lane].rsc_index;
wiface = ucp_worker_iface(worker, rsc_index);
wiface_process(wiface);
Expand Down
8 changes: 7 additions & 1 deletion src/ucp/proto/proto_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,17 @@ void ucp_proto_common_add_ppln_range(ucp_proto_caps_t *caps,
frag_overhead =
ucs_linear_func_apply(ppln_perf[UCP_PROTO_PERF_TYPE_SINGLE], frag_size) -
ucs_linear_func_apply(ppln_perf[UCP_PROTO_PERF_TYPE_MULTI], frag_size);
ucs_assert(frag_overhead >= 0);

ucs_trace("frag-size: %zd" UCP_PROTO_TIME_FMT(frag_overhead), frag_size,
UCP_PROTO_TIME_ARG(frag_overhead));

/* In certain cases the MULTI perf estimate can be bigger than the SINGLE
* one (e.g. FAST_CMPL with a huge send_post_overhead), so the fragment
* overhead computed above can be negative. The simplest estimation is to
* ignore the fragment overhead in that particular case, i.e. clamp it to
* zero.
*/
frag_overhead = ucs_max(frag_overhead, 0);

/* Apply the pipelining effect when sending multiple fragments */
ppln_perf[UCP_PROTO_PERF_TYPE_SINGLE] =
ucs_linear_func_add(ppln_perf[UCP_PROTO_PERF_TYPE_MULTI],
Expand Down
7 changes: 4 additions & 3 deletions src/ucp/proto/proto_multi.inl
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,10 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_multi_lane_map_progress(
ucp_lane_index_t lane;
ucs_status_t status;

ucs_assertv(remaining_lane_map != 0, "req=%p *lane_p=%d lane_map=0x%x", req,
*lane_p, lane_map);
lane = ucs_ffs32(remaining_lane_map);
ucs_assertv(remaining_lane_map != 0,
"req=%p *lane_p=%d lane_map=0x%" PRIx64, req, *lane_p,
lane_map);
lane = ucs_ffs64(remaining_lane_map);

status = send_func(req, lane);
if (ucs_likely(status == UCS_OK)) {
Expand Down
31 changes: 18 additions & 13 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ ucp_ep_flush_request_update_uct_comp(ucp_request_t *req, int diff,
&req->send.state.uct_comp, req->send.state.uct_comp.count,
diff);
ucs_assertv(!(req->send.flush.started_lanes & new_started_lanes),
"req=%p started_lanes=0x%x new_started_lanes=0x%x", req,
req->send.flush.started_lanes, new_started_lanes);
"req=%p started_lanes=0x%" PRIx64
" new_started_lanes=0x%" PRIx64,
req, req->send.flush.started_lanes, new_started_lanes);

ucp_trace_req(req,
"flush update ep %p comp_count %d->%d num_lanes %d->%d "
"started_lanes 0x%x->0x%x",
"started_lanes 0x%" PRIx64 "->0x%" PRIx64,
req->send.ep, req->send.state.uct_comp.count,
req->send.state.uct_comp.count + diff,
req->send.flush.num_lanes, ucp_ep_num_lanes(req->send.ep),
Expand Down Expand Up @@ -99,7 +100,8 @@ static void ucp_ep_flush_progress(ucp_request_t *req)
}

ucp_trace_req(req,
"progress ep=%p flush flags=0x%x started_lanes=0x%x count=%d",
"progress ep=%p flush flags=0x%x started_lanes=0x%" PRIx64
" count=%d",
ep, ep->flags, req->send.flush.started_lanes,
req->send.state.uct_comp.count);

Expand Down Expand Up @@ -229,20 +231,22 @@ static void ucp_ep_flush_request_resched(ucp_ep_h ep, ucp_request_t *req)
(ucp_ep_config(ep)->p2p_lanes &&
ep->worker->context->config.ext.proto_request_reset)) {
ucs_assertv(!req->send.flush.started_lanes,
"req=%p flush started_lanes=0x%x", req,
"req=%p flush started_lanes=0x%" PRIx64, req,
req->send.flush.started_lanes);
} else {
ucs_assertv(!(UCS_BIT(req->send.lane) & req->send.flush.started_lanes),
"req=%p lane=%d started_lanes=0x%x", req, req->send.lane,
req->send.flush.started_lanes);
ucs_assertv(!(UCS_BIT(req->send.lane) &
req->send.flush.started_lanes),
"req=%p lane=%d started_lanes=0x%" PRIx64, req,
req->send.lane, req->send.flush.started_lanes);

/* Only lanes connected to iface can be started/flushed before
* wireup is done because connect2iface does not create wireup_ep
* without cm mode */
ucs_assertv(!(req->send.flush.started_lanes &
ucp_ep_config(ep)->p2p_lanes),
"req=%p flush started_lanes=0x%x p2p_lanes=0x%x", req,
req->send.flush.started_lanes,
"req=%p flush started_lanes=0x%" PRIx64
" p2p_lanes=0x%" PRIx64,
req, req->send.flush.started_lanes,
ucp_ep_config(ep)->p2p_lanes);
}

Expand Down Expand Up @@ -348,9 +352,10 @@ void ucp_ep_flush_request_ff(ucp_request_t *req, ucs_status_t status)
int num_comps = req->send.flush.num_lanes -
ucs_popcount(req->send.flush.started_lanes);

ucp_trace_req(req, "fast-forward flush, comp-=%d num_lanes %d started 0x%x",
num_comps, req->send.flush.num_lanes,
req->send.flush.started_lanes);
ucp_trace_req(
req, "fast-forward flush, comp-=%d num_lanes %d started 0x%" PRIx64,
num_comps, req->send.flush.num_lanes,
req->send.flush.started_lanes);

ucp_ep_flush_request_update_uct_comp(req, -num_comps,
UCS_MASK(req->send.flush.num_lanes) &
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/rndv/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ static void ucp_rndv_zcopy_next_lane(ucp_request_t *rndv_req)
ucp_lane_map_t lane_map;
lane_map = lanes_map_all & ~UCS_MASK(rndv_req->send.multi_lane_idx + 1);

rndv_req->send.multi_lane_idx = ucs_ffs32((lane_map > 0) ? lane_map :
rndv_req->send.multi_lane_idx = ucs_ffs64((lane_map > 0) ? lane_map :
lanes_map_all);
}

Expand Down Expand Up @@ -694,7 +694,7 @@ static void ucp_rndv_req_init_lanes(ucp_request_t *req,
ucp_lane_map_t lanes_map)
{
req->send.rndv.zcopy.lanes_map_all = lanes_map;
req->send.multi_lane_idx = ucs_ffs32(lanes_map);
req->send.multi_lane_idx = ucs_ffs64(lanes_map);
}

static void ucp_rndv_req_init_zcopy_lane_map(ucp_request_t *rndv_req,
Expand Down
8 changes: 6 additions & 2 deletions src/ucp/rndv/rndv_rkey_ptr.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,12 @@ ucp_rndv_rkey_ptr_query_common(const ucp_proto_query_params_t *params,
const ucp_proto_rndv_rkey_ptr_priv_t *rpriv = params->priv;

ucp_proto_default_query(params, attr);
attr->lane_map = UCS_BIT(rpriv->spriv.super.lane) |
UCS_BIT(rpriv->ack.lane);

ucs_assert(rpriv->spriv.super.lane != UCP_NULL_LANE);
attr->lane_map = UCS_BIT(rpriv->spriv.super.lane);
if (rpriv->ack.lane != UCP_NULL_LANE) {
attr->lane_map |= UCS_BIT(rpriv->ack.lane);
}
}

static void
Expand Down
23 changes: 13 additions & 10 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -1570,12 +1570,11 @@ ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params,
ucp_wireup_select_context_t *select_ctx,
unsigned allow_extra_path)
{
ucp_rsc_index_t skip_dev_index = UCP_NULL_RESOURCE;
ucp_ep_h ep = select_params->ep;
ucp_context_h context = ep->worker->context;
ucp_wireup_dev_usage_count dev_count = {};
UCS_ARRAY_DEFINE_ONSTACK(ucp_proto_select_info_array_t, sinfo_array,
UCP_MAX_LANES);
ucp_proto_select_info_array_t sinfo_array = UCS_ARRAY_DYNAMIC_INITIALIZER;
ucp_rsc_index_t skip_dev_index = UCP_NULL_RESOURCE;
ucp_ep_h ep = select_params->ep;
ucp_context_h context = ep->worker->context;
ucp_wireup_dev_usage_count dev_count = {};
const uct_iface_attr_t *iface_attr;
const ucp_address_entry_t *ae;
ucs_status_t status;
Expand All @@ -1585,7 +1584,7 @@ ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params,
ucp_rsc_index_t rsc_index;
unsigned addr_index;
ucp_wireup_select_info_t *sinfo;
unsigned max_lanes;
unsigned max_lanes, num_lanes;
unsigned local_num_paths, remote_num_paths;

local_dev_bitmap = bw_info->local_dev_bitmap;
Expand All @@ -1605,7 +1604,7 @@ ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params,
* memory registration) */
while (ucs_array_length(&sinfo_array) < max_lanes) {
if (excl_lane == UCP_NULL_LANE) {
sinfo = ucs_array_append_fixed(&sinfo_array);
sinfo = ucs_array_append(&sinfo_array, break);
status = ucp_wireup_select_transport(select_ctx, select_params,
&bw_info->criteria, tl_bitmap,
UINT64_MAX, local_dev_bitmap,
Expand Down Expand Up @@ -1665,9 +1664,13 @@ ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params,
}

bw_info->criteria.arg = NULL; /* To suppress compiler warning */
num_lanes = ucp_wireup_add_fast_lanes(ep->worker, select_params,
&sinfo_array,
bw_info->criteria.lane_type,
select_ctx);

return ucp_wireup_add_fast_lanes(ep->worker, select_params, &sinfo_array,
bw_info->criteria.lane_type, select_ctx);
ucs_array_cleanup_dynamic(&sinfo_array);
return num_lanes;
}

static ucs_status_t
Expand Down
5 changes: 2 additions & 3 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ ucp_wireup_connect_lane(ucp_ep_h ep, unsigned ep_init_flags,
* create a wireup endpoint which will start connection establishment
* protocol using an auxiliary transport.
*/
if (ucp_ep_config(ep)->p2p_lanes & UCS_BIT(lane)) {
if (ucp_ep_is_lane_p2p(ep, lane)) {
return ucp_wireup_connect_lane_to_ep(ep, ep_init_flags, lane,
path_index, rsc_index, wiface,
remote_address);
Expand Down Expand Up @@ -1447,8 +1447,7 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
ucp_wireup_ep_set_aux(cm_wireup_ep,
ucp_wireup_ep_extract_next_ep(uct_ep),
old_key->lanes[reuse_lane].rsc_index,
ucp_ep_config(ep)->p2p_lanes &
UCS_BIT(reuse_lane));
ucp_ep_is_lane_p2p(ep, reuse_lane));

/* reset the UCT EP from the previous WIREUP lane and destroy its WIREUP EP,
* since it's not needed anymore in the new configuration, UCT EP will be
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/wireup/wireup_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ static ucs_status_t ucp_cm_ep_init_lanes(ucp_ep_h ep,
ucp_ep_set_lane(ep, lane_idx, uct_ep);

UCS_STATIC_BITMAP_SET(tl_bitmap, rsc_idx);
if (ucp_ep_config(ep)->p2p_lanes & UCS_BIT(lane_idx)) {
if (ucp_ep_is_lane_p2p(ep, lane_idx)) {
path_index = ucp_ep_get_path_index(ep, lane_idx);
status = ucp_wireup_ep_connect(ucp_ep_get_lane(ep, lane_idx), 0,
rsc_idx, path_index, 0, NULL);
Expand Down
2 changes: 1 addition & 1 deletion src/ucs/arch/bitops.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ BEGIN_C_DECLS
((sizeof(_n) <= 4) ? __builtin_clz((uint32_t)(_n)) : __builtin_clzl(_n))

/* Returns the number of bits lower than 'bit_index' that are set in 'mask'
* For example: ucs_idx2bitmap(mask=0xF0, idx=6) returns 2
* For example: ucs_bitmap2idx(mask=0xF0, idx=6) returns 2
*/
static inline unsigned ucs_bitmap2idx(uint64_t mask, unsigned bit_index)
{
Expand Down
2 changes: 1 addition & 1 deletion src/ucs/datastruct/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ ucs_array_old_buffer_set_null(void **old_buffer_p)
* fixed-size or not, the maximum capacity range is reduced by 1 bit.
*/
#define ucs_array_max_capacity(_array) \
UCS_MASK_SAFE((CHAR_BIT * sizeof((_array)->length)) - 1)
UCS_MASK((CHAR_BIT * sizeof((_array)->length)) - 1)


/**
Expand Down
6 changes: 3 additions & 3 deletions src/ucs/datastruct/pgtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,11 +531,11 @@ static void ucs_pgtable_search_recurs(const ucs_pgtable_t *pgtable,
*last_p = region;

/* Assert that the region actually overlaps the address */
ucs_assertv(ucs_max(region->start, address) <=
ucs_min(region->end - 1, address + UCS_MASK_SAFE(order)),
ucs_assertv(ucs_max(region->start, address) <=
ucs_min(region->end - 1, address + UCS_MASK(order)),
UCS_PGT_REGION_FMT " address=0x%lx order=%d mask 0x%lx",
UCS_PGT_REGION_ARG(region), address, order,
(ucs_pgt_addr_t)UCS_MASK_SAFE(order));
(ucs_pgt_addr_t)UCS_MASK(order));

/* Call the callback */
cb(pgtable, region, arg);
Expand Down
2 changes: 1 addition & 1 deletion src/ucs/sys/compiler_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
#define UCS_BIT(i) (1ul << (i))

/* Mask of bits 0..i-1 */
#define UCS_MASK(i) (UCS_BIT(i) - 1)
#define UCS_MASK(_i) (((_i) >= 64) ? ~0 : (UCS_BIT(_i) - 1))

/*
* Enable compiler checks for printf-like formatting.
Expand Down
3 changes: 0 additions & 3 deletions src/ucs/sys/math.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ BEGIN_C_DECLS
#define ucs_signum(_n) \
(((_n) > (ucs_typeof(_n))0) - ((_n) < (ucs_typeof(_n))0))

#define UCS_MASK_SAFE(_i) \
(((_i) >= 64) ? ((uint64_t)(-1)) : UCS_MASK(_i))

#define ucs_div_round_up(_n, _d) \
(((_n) + (_d) - 1) / (_d))

Expand Down
4 changes: 3 additions & 1 deletion test/gtest/common/mem_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define GTEST_MEM_BUFFER_H_

#include <ucs/memory/memory_type.h>
#include <ucs/debug/assert.h>
#include <ucs/sys/math.h>
#include <stdint.h>
#include <limits>
Expand Down Expand Up @@ -129,7 +130,8 @@ class mem_buffer {
size_t length, size_t offset,
const void *buffer, const void *orig_ptr)
{
const uint64_t mask = UCS_MASK_SAFE(length * 8 * sizeof(char));
ucs_assertv(length <= 8, "length=%zu", length);
const uint64_t mask = UCS_MASK(length * 8 * sizeof(char));

if (ucs_unlikely(actual != (expected & mask))) {
pattern_check_failed(expected, actual, length, mask, offset,
Expand Down
41 changes: 32 additions & 9 deletions test/gtest/ucp/test_ucp_sockaddr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1444,27 +1444,50 @@ UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_wireup, compare_cm_and_wireup_configs,
}
}

class test_max_lanes : public test_ucp_sockaddr {
template<size_t NumLanes> class test_max_lanes : public test_ucp_sockaddr {
public:
static void get_test_variants(std::vector<ucp_test_variant> &variants)
{
get_test_variants_cm_mode(variants, UCP_FEATURE_TAG, CONN_REQ_TAG,
"tag");
}

void init() override
{
auto num_lanes_str = ucs::to_string(NumLanes);
modify_config("MAX_RNDV_RAILS", num_lanes_str); // for protov1
modify_config("IB_NUM_PATHS", num_lanes_str, SETENV_IF_NOT_EXIST);
modify_config("TM_SW_RNDV", "y");

test_ucp_sockaddr::init();
}

void test_num_lanes()
{
/* get configuration index for EP created through CM */
listen_and_communicate(false, SEND_DIRECTION_C2S);

ASSERT_LE(NumLanes, (int)ucp_ep_num_lanes(sender().ep()));
ASSERT_LE(NumLanes, (int)ucp_ep_num_lanes(receiver().ep()));
}
};

UCS_TEST_SKIP_COND_P(test_max_lanes, 16_lanes_reconf, !cm_use_all_devices(),
"MAX_RNDV_LANES=16", "MAX_EAGER_LANES=16",
"IB_NUM_PATHS?=16", "TM_SW_RNDV=y")
using test_max_lanes_16 = test_max_lanes<16>;
UCS_TEST_SKIP_COND_P(test_max_lanes_16, lanes_reconf, !cm_use_all_devices())
{
/* get configuration index for EP created through CM */
listen_and_communicate(false, SEND_DIRECTION_C2S);
test_num_lanes();
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_max_lanes_16, ib, "ib")

ASSERT_EQ(16, (int)ucp_ep_num_lanes(sender().ep()));
ASSERT_EQ(16, (int)ucp_ep_num_lanes(receiver().ep()));
using test_max_lanes_64 = test_max_lanes<64>;
UCS_TEST_SKIP_COND_P(test_max_lanes_64, lanes_reconf, !cm_use_all_devices())
{
test_num_lanes();
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_max_lanes, ib, "ib")
/* TODO: Enable this test execution with UD and DC */
UCP_INSTANTIATE_TEST_CASE_TLS(test_max_lanes_64, rc, "rc")

class test_ucp_sockaddr_wireup_fail : public test_ucp_sockaddr_wireup {
protected:
Expand Down

0 comments on commit ffcca4a

Please sign in to comment.