Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/efa: Make num_runt_bytes_in_flight per domain #9967

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions prov/efa/src/efa_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ static int efa_domain_init_rdm(struct efa_domain *efa_domain, struct fi_info *in
efa_domain->rdm_cq_size = MAX(info->rx_attr->size + info->tx_attr->size,
efa_env.cq_size);
efa_domain->num_read_msg_in_flight = 0;
efa_domain->num_runt_bytes_in_flight = 0;
return 0;
}

Expand Down
8 changes: 8 additions & 0 deletions prov/efa/src/efa_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@ struct efa_domain {
size_t rdm_cq_size;
struct dlist_entry list_entry; /* linked to g_efa_domain_list */
struct ofi_genlock srx_lock; /* shared among peer providers */
/**
* @brief number of messages that are using read based protocol
*/
uint64_t num_read_msg_in_flight;
/**
* @brief number of bytes that have been sent as part of runting protocols
* @details this value is capped by efa_env.efa_runt_size
*/
int64_t num_runt_bytes_in_flight;
};

extern struct dlist_entry g_efa_domain_list;
Expand Down
4 changes: 4 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep,

size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface iface);

size_t efa_rdm_ep_get_runt_size(struct efa_rdm_ep *ep, struct efa_rdm_ope *ope);

int efa_rdm_ep_select_readbase_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *ope);

static inline
struct efa_domain *efa_rdm_ep_domain(struct efa_rdm_ep *ep)
{
Expand Down
56 changes: 56 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,3 +682,59 @@ size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface
return memory_alignment;
}

/**
 * @brief Compute how many bytes of an operation may be sent as runt data
 *
 * The budget is the runt_size configured for the memory interface of the
 * operation's first descriptor (FI_HMEM_SYSTEM when there is no descriptor),
 * minus the runt bytes the domain currently has in flight.
 *
 * @param ep	efa rdm ep
 * @param ope	efa rdm ope
 * @return size_t the number of bytes that can be runt; 0 when the budget is
 *	   exhausted or when the aligned amount rounds down to nothing
 */
size_t efa_rdm_ep_get_runt_size(struct efa_rdm_ep *ep, struct efa_rdm_ope *ope)
{
	struct efa_domain *domain = efa_rdm_ep_domain(ep);
	struct efa_hmem_info *hmem_info = domain->hmem_info;
	size_t unaligned_size;
	size_t alignment;
	int iface;

	if (ope->desc[0])
		iface = ((struct efa_mr *) ope->desc[0])->peer.iface;
	else
		iface = FI_HMEM_SYSTEM;

	/* The domain already has more runt bytes in flight than this interface allows. */
	if (hmem_info[iface].runt_size < domain->num_runt_bytes_in_flight)
		return 0;

	unaligned_size = MIN(hmem_info[iface].runt_size - domain->num_runt_bytes_in_flight,
			     ope->total_len);
	alignment = efa_rdm_ep_get_memory_alignment(ep, iface);

	/*
	 * The runt size is masked down to the memory alignment because:
	 * 1. For LL128 protocol, the size to be copied on the receiver side
	 *    must be a multiple of 128, which is the alignment in that case.
	 * 2. For non-LL128 protocol, an aligned runt size gives the best
	 *    data copy performance.
	 * The mask assumes the alignment is a power of two. The result can
	 * be 0, in which case the runting read protocol will not be used.
	 */
	return unaligned_size & ~(alignment - 1);
}

/**
 * @brief Pick the read-based RTM packet type for an operation
 *
 * The runting read variant is chosen only when all of the following hold:
 * the domain has no read message in flight, a non-zero runt size is
 * available for the operation, and the operation does not request
 * FI_DELIVERY_COMPLETE. Otherwise the long read variant is chosen.
 *
 * @param[in]	ep	efa rdm ep
 * @param[in]	ope	efa rdm ope; its op must be ofi_op_tagged or ofi_op_msg
 * @return The read-based protocol (packet type) to use based on inputs.
 */
int efa_rdm_ep_select_readbase_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *ope)
{
	int op = ope->op;
	int use_runt;

	assert(op == ofi_op_tagged || op == ofi_op_msg);

	/* Evaluated left to right with short-circuit, same order as the conditions listed above. */
	use_runt = efa_rdm_ep_domain(ep)->num_read_msg_in_flight == 0 &&
		   efa_rdm_ep_get_runt_size(ep, ope) > 0 &&
		   !(ope->fi_flags & FI_DELIVERY_COMPLETE);

	if (op == ofi_op_tagged)
		return use_runt ? EFA_RDM_RUNTREAD_TAGRTM_PKT
				: EFA_RDM_LONGREAD_TAGRTM_PKT;

	return use_runt ? EFA_RDM_RUNTREAD_MSGRTM_PKT
			: EFA_RDM_LONGREAD_MSGRTM_PKT;
}
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ int efa_rdm_msg_select_rtm(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *tx

eager_rtm_max_data_size = efa_rdm_txe_max_req_data_capacity(efa_rdm_ep, txe, eager_rtm);

readbase_rtm = efa_rdm_peer_select_readbase_rtm(txe->peer, efa_rdm_ep, txe);
readbase_rtm = efa_rdm_ep_select_readbase_rtm(efa_rdm_ep, txe);

if (txe->total_len >= hmem_info[iface].min_read_msg_size &&
efa_rdm_interop_rdma_read(efa_rdm_ep, txe->peer) &&
Expand Down
7 changes: 1 addition & 6 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,17 +324,12 @@ int efa_rdm_txe_prepare_to_be_read(struct efa_rdm_ope *txe, struct fi_rma_iov *r
/**
 * @brief Set the number of bytes a TX entry will send as runt data
 *
 * Does nothing if txe->bytes_runt is already non-zero. Otherwise the value is
 * taken from efa_rdm_ep_get_runt_size(), which accounts runt bytes at the
 * domain level, so no peer lookup is needed here.
 *
 * @param[in]		ep	efa rdm ep
 * @param[in,out]	txe	TX entry whose bytes_runt field is populated;
 *				its type must be EFA_RDM_TXE
 */
static inline
void efa_rdm_txe_set_runt_size(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
{
	assert(txe->type == EFA_RDM_TXE);

	/* Already set by an earlier call; keep the existing value. */
	if (txe->bytes_runt > 0)
		return;

	txe->bytes_runt = efa_rdm_ep_get_runt_size(ep, txe);

	/*
	 * NOTE(review): efa_rdm_ep_get_runt_size() can return 0. This assert
	 * presumably relies on callers only reaching here after a runting
	 * read protocol was selected - confirm against the call sites.
	 */
	assert(txe->bytes_runt);
}
Expand Down
60 changes: 0 additions & 60 deletions prov/efa/src/rdm/efa_rdm_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ void efa_rdm_peer_construct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, st
peer->efa_fiaddr = conn->fi_addr;
peer->is_self = efa_is_same_addr(&ep->base_ep.src_addr, conn->ep_addr);
peer->host_id = peer->is_self ? ep->host_id : 0; /* Peer host id is exchanged via handshake */
peer->num_runt_bytes_in_flight = 0;
ofi_recvwin_buf_alloc(&peer->robuf, efa_env.recvwin_size);
dlist_init(&peer->outstanding_tx_pkts);
dlist_init(&peer->txe_list);
Expand Down Expand Up @@ -223,62 +222,3 @@ void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct
return;
}

/**
 * @brief Compute how many bytes of an operation may be runt to a given peer
 *
 * The budget is the runt_size configured for the memory interface of the
 * operation's first descriptor (FI_HMEM_SYSTEM when there is no descriptor),
 * minus the runt bytes currently in flight to the peer.
 *
 * @param peer	rdm peer
 * @param ep	efa rdm ep
 * @param ope	efa rdm ope
 * @return size_t the number of bytes that can be runt; 0 when the budget is
 *	   exhausted or when the aligned amount rounds down to nothing
 */
size_t efa_rdm_peer_get_runt_size(struct efa_rdm_peer *peer,
				  struct efa_rdm_ep *ep, struct efa_rdm_ope *ope)
{
	struct efa_hmem_info *hmem_info = efa_rdm_ep_domain(ep)->hmem_info;
	size_t budget_left;
	size_t align;
	int iface = FI_HMEM_SYSTEM;

	if (ope->desc[0])
		iface = ((struct efa_mr *) ope->desc[0])->peer.iface;

	/* More runt bytes are already in flight to this peer than the interface allows. */
	if (hmem_info[iface].runt_size < peer->num_runt_bytes_in_flight)
		return 0;

	budget_left = MIN(hmem_info[iface].runt_size - peer->num_runt_bytes_in_flight,
			  ope->total_len);
	align = efa_rdm_ep_get_memory_alignment(ep, iface);

	/*
	 * The runt size is masked down to the memory alignment because:
	 * 1. For LL128 protocol, the size to be copied on the receiver side
	 *    must be a multiple of 128, which is the alignment in that case.
	 * 2. For non-LL128 protocol, an aligned runt size gives the best
	 *    data copy performance.
	 * The mask assumes the alignment is a power of two. The result can
	 * be 0, in which case the runting read protocol will not be used.
	 */
	return budget_left & ~(align - 1);
}

/**
 * @brief Pick the read-based RTM packet type for an operation to a given peer
 *
 * The runting read variant is chosen only when all of the following hold:
 * the domain has no read message in flight, a non-zero runt size is
 * available for the peer, and the operation does not request
 * FI_DELIVERY_COMPLETE. Otherwise the long read variant is chosen.
 *
 * @param[in]	peer	rdm peer
 * @param[in]	ep	efa rdm ep
 * @param[in]	ope	efa rdm ope; its op must be ofi_op_tagged or ofi_op_msg
 * @return The read-based protocol (packet type) to use based on inputs.
 */
int efa_rdm_peer_select_readbase_rtm(struct efa_rdm_peer *peer,
				     struct efa_rdm_ep *ep, struct efa_rdm_ope *ope)
{
	int op = ope->op;
	int runt_ok;

	assert(op == ofi_op_tagged || op == ofi_op_msg);

	/* Evaluated left to right with short-circuit, same order as the conditions listed above. */
	runt_ok = efa_rdm_ep_domain(ep)->num_read_msg_in_flight == 0 &&
		  efa_rdm_peer_get_runt_size(peer, ep, ope) > 0 &&
		  !(ope->fi_flags & FI_DELIVERY_COMPLETE);

	if (runt_ok) {
		if (op == ofi_op_tagged)
			return EFA_RDM_RUNTREAD_TAGRTM_PKT;
		return EFA_RDM_RUNTREAD_MSGRTM_PKT;
	}

	if (op == ofi_op_tagged)
		return EFA_RDM_LONGREAD_TAGRTM_PKT;
	return EFA_RDM_LONGREAD_MSGRTM_PKT;
}
10 changes: 0 additions & 10 deletions prov/efa/src/rdm/efa_rdm_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ struct efa_rdm_peer {
struct dlist_entry rx_unexp_tagged_list; /**< a list of unexpected tagged rxe for this peer */
struct dlist_entry txe_list; /**< a list of txe related to this peer */
struct dlist_entry rxe_list; /**< a list of rxe relased to this peer */

/**
* @brief number of bytes that have been sent as part of runting protocols
* @details this value is capped by efa_env.efa_runt_size
*/
int64_t num_runt_bytes_in_flight;
};

/**
Expand Down Expand Up @@ -248,8 +242,4 @@ int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, s

void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep);

size_t efa_rdm_peer_get_runt_size(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_ope *ope);

int efa_rdm_peer_select_readbase_rtm(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_ope *ope);

#endif /* EFA_RDM_PEER_H */
12 changes: 3 additions & 9 deletions prov/efa/src/rdm/efa_rdm_pke_rtm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1340,16 +1340,13 @@ void efa_rdm_pke_handle_runtread_rtm_sent(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_ep *ep;
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
size_t pkt_data_size = pkt_entry->payload_size;

ep = pkt_entry->ep;
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
assert(peer);

txe = pkt_entry->ope;
txe->bytes_sent += pkt_data_size;
peer->num_runt_bytes_in_flight += pkt_data_size;
efa_rdm_ep_domain(ep)->num_runt_bytes_in_flight += pkt_data_size;

if (efa_rdm_pke_get_runtread_rtm_base_hdr(pkt_entry)->seg_offset == 0 &&
txe->total_len > txe->bytes_runt)
Expand All @@ -1368,18 +1365,15 @@ void efa_rdm_pke_handle_runtread_rtm_send_completion(struct efa_rdm_pke *pkt_ent
{
struct efa_rdm_ep *ep;
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
size_t pkt_data_size;

ep = pkt_entry->ep;
txe = pkt_entry->ope;
pkt_data_size = pkt_entry->payload_size;
txe->bytes_acked += pkt_data_size;

peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
assert(peer);
assert(peer->num_runt_bytes_in_flight >= pkt_data_size);
peer->num_runt_bytes_in_flight -= pkt_data_size;
assert(efa_rdm_ep_domain(ep)->num_runt_bytes_in_flight >= pkt_data_size);
efa_rdm_ep_domain(ep)->num_runt_bytes_in_flight -= pkt_data_size;
if (txe->total_len == txe->bytes_acked)
efa_rdm_ope_handle_send_completed(txe);
}