diff --git a/src/sp/transport/ipc/ipc.c b/src/sp/transport/ipc/ipc.c index 61c25da3b..3fa083bb9 100644 --- a/src/sp/transport/ipc/ipc.c +++ b/src/sp/transport/ipc/ipc.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2019 Devolutions // @@ -25,29 +25,27 @@ typedef struct ipc_ep ipc_ep; // ipc_pipe is one end of an IPC connection. struct ipc_pipe { - nng_stream *conn; - uint16_t peer; - uint16_t proto; - size_t rcv_max; - bool closed; - ipc_ep *ep; - nni_pipe *pipe; - nni_list_node node; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t tx_head[1 + sizeof(uint64_t)]; - uint8_t rx_head[1 + sizeof(uint64_t)]; - size_t got_tx_head; - size_t got_rx_head; - size_t want_tx_head; - size_t want_rx_head; - nni_list recv_q; - nni_list send_q; - nni_aio tx_aio; - nni_aio rx_aio; - nni_aio neg_aio; - nni_msg *rx_msg; - nni_mtx mtx; + nng_stream *conn; + uint16_t peer; + uint16_t proto; + size_t rcv_max; + bool closed; + ipc_ep *ep; + nni_pipe *pipe; + nni_list_node node; + uint8_t tx_head[1 + sizeof(uint64_t)]; + uint8_t rx_head[1 + sizeof(uint64_t)]; + size_t got_tx_head; + size_t got_rx_head; + size_t want_tx_head; + size_t want_rx_head; + nni_list recv_q; + nni_list send_q; + nni_aio tx_aio; + nni_aio rx_aio; + nni_aio neg_aio; + nni_msg *rx_msg; + nni_mtx mtx; }; struct ipc_ep { @@ -65,7 +63,7 @@ struct ipc_ep { nni_aio *time_aio; nni_list busy_pipes; // busy pipes -- ones passed to socket nni_list wait_pipes; // pipes waiting to match to socket - nni_list neg_pipes; // pipes busy negotiating + nni_list nego_pipes; // pipes busy negotiating nni_reap_node reap; #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; @@ -76,7 +74,7 @@ static void ipc_pipe_send_start(ipc_pipe *p); static void ipc_pipe_recv_start(ipc_pipe *p); static void ipc_pipe_send_cb(void *); static void ipc_pipe_recv_cb(void *); -static void ipc_pipe_neg_cb(void *); +static void ipc_pipe_nego_cb(void *); static void ipc_pipe_fini(void *); static void ipc_ep_fini(void *); @@ -85,11 +83,6 @@ static nni_reap_list ipc_ep_reap_list = { .rl_func = ipc_ep_fini, }; -static nni_reap_list ipc_pipe_reap_list = { - .rl_offset = offsetof(ipc_pipe, reap), - .rl_func = ipc_pipe_fini, -}; - static void ipc_tran_init(void) { @@ -161,17 +154,6 @@ ipc_pipe_fini(void *arg) NNI_FREE_STRUCT(p); } -static void -ipc_pipe_reap(ipc_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->conn != NULL) { - nng_stream_close(p->conn); - } - nni_reap(&ipc_pipe_reap_list, p); - } -} - static int ipc_pipe_alloc(ipc_pipe **pipe_p) { @@ -183,10 +165,9 @@ ipc_pipe_alloc(ipc_pipe **pipe_p) nni_mtx_init(&p->mtx); nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p); nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p); - nni_aio_init(&p->neg_aio, ipc_pipe_neg_cb, p); + nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p); nni_aio_list_init(&p->send_q); nni_aio_list_init(&p->recv_q); - nni_atomic_flag_reset(&p->reaped); *pipe_p = p; return (0); } @@ -210,7 +191,7 @@ ipc_ep_match(ipc_ep *ep) } static void -ipc_pipe_neg_cb(void *arg) +ipc_pipe_nego_cb(void *arg) { ipc_pipe *p = arg; ipc_ep *ep = p->ep; @@ -261,7 +242,7 @@ ipc_pipe_neg_cb(void *arg) // We are ready now. We put this in the wait list, and // then try to run the matcher. - nni_list_remove(&ep->neg_pipes, p); + nni_list_remove(&ep->nego_pipes, p); nni_list_append(&ep->wait_pipes, p); ipc_ep_match(ep); @@ -276,6 +257,7 @@ ipc_pipe_neg_cb(void *arg) if (rv == NNG_ECLOSED) { rv = NNG_ECONNSHUT; } + nni_list_remove(&ep->nego_pipes, p); nng_stream_close(p->conn); // If we are waiting to negotiate on a client side, then a failure // here has to be passed to the user app. @@ -284,7 +266,7 @@ ipc_pipe_neg_cb(void *arg) nni_aio_finish_error(user_aio, rv); } nni_mtx_unlock(&ep->mtx); - ipc_pipe_reap(p); + ipc_pipe_fini(p); } static void @@ -658,7 +640,7 @@ ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep) iov.iov_len = 8; iov.iov_buf = &p->tx_head[0]; nni_aio_set_iov(&p->neg_aio, 1, &iov); - nni_list_append(&ep->neg_pipes, p); + nni_list_append(&ep->nego_pipes, p); nni_aio_set_timeout(&p->neg_aio, 10000); // 10 sec timeout to negotiate nng_stream_send(p->conn, &p->neg_aio); @@ -679,7 +661,7 @@ ipc_ep_close(void *arg) if (ep->listener != NULL) { nng_stream_listener_close(ep->listener); } - NNI_LIST_FOREACH (&ep->neg_pipes, p) { + NNI_LIST_FOREACH (&ep->nego_pipes, p) { ipc_pipe_close(p); } NNI_LIST_FOREACH (&ep->wait_pipes, p) { @@ -835,7 +817,7 @@ ipc_ep_init(ipc_ep **epp, nni_sock *sock) nni_mtx_init(&ep->mtx); NNI_LIST_INIT(&ep->busy_pipes, ipc_pipe, node); NNI_LIST_INIT(&ep->wait_pipes, ipc_pipe, node); - NNI_LIST_INIT(&ep->neg_pipes, ipc_pipe, node); + NNI_LIST_INIT(&ep->nego_pipes, ipc_pipe, node); ep->proto = nni_sock_proto_id(sock); diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index 5aead15de..7c51d52af 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -23,29 +23,27 @@ typedef struct tcptran_ep tcptran_ep; // tcp_pipe is one end of a TCP connection. struct tcptran_pipe { - nng_stream *conn; - nni_pipe *npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool closed; - nni_list_node node; - tcptran_ep *ep; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t txlen[sizeof(uint64_t)]; - uint8_t rxlen[sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio *txaio; - nni_aio *rxaio; - nni_aio *negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + tcptran_ep *ep; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio *txaio; + nni_aio *rxaio; + nni_aio *negoaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct tcptran_ep { @@ -86,11 +84,6 @@ static nni_reap_list tcptran_ep_reap_list = { .rl_func = tcptran_ep_fini, }; -static nni_reap_list tcptran_pipe_reap_list = { - .rl_offset = offsetof(tcptran_pipe, reap), - .rl_func = tcptran_pipe_fini, -}; - static void tcptran_init(void) { @@ -162,17 +155,6 @@ tcptran_pipe_fini(void *arg) NNI_FREE_STRUCT(p); } -static void -tcptran_pipe_reap(tcptran_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->conn != NULL) { - nng_stream_close(p->conn); - } - nni_reap(&tcptran_pipe_reap_list, p); - } -} - static int tcptran_pipe_alloc(tcptran_pipe **pipep) { @@ -192,7 +174,6 @@ tcptran_pipe_alloc(tcptran_pipe **pipep) } nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); - nni_atomic_flag_reset(&p->reaped); *pipep = p; @@ -293,8 +274,10 @@ tcptran_pipe_nego_cb(void *arg) ep->useraio = NULL; nni_aio_finish_error(uaio, rv); } + nni_list_remove(&ep->negopipes, p); nni_mtx_unlock(&ep->mtx); - tcptran_pipe_reap(p); + + tcptran_pipe_fini(p); } static void diff --git a/src/sp/transport/tls/tls.c b/src/sp/transport/tls/tls.c index 30a957250..1321c8894 100644 --- a/src/sp/transport/tls/tls.c +++ b/src/sp/transport/tls/tls.c @@ -28,29 +28,27 @@ typedef struct tlstran_pipe tlstran_pipe; // tlstran_pipe is one end of a TLS connection. struct tlstran_pipe { - nng_stream *tls; - nni_pipe *npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool closed; - nni_list_node node; - nni_list sendq; - nni_list recvq; - tlstran_ep *ep; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t txlen[sizeof(uint64_t)]; - uint8_t rxlen[sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_aio *txaio; - nni_aio *rxaio; - nni_aio *negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nng_stream *tls; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + nni_list sendq; + nni_list recvq; + tlstran_ep *ep; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_aio *txaio; + nni_aio *rxaio; + nni_aio *negoaio; + nni_msg *rxmsg; + nni_mtx mtx; }; // Stuff that is common to both dialers and listeners. @@ -92,11 +90,6 @@ static nni_reap_list tlstran_ep_reap_list = { .rl_func = tlstran_ep_fini, }; -static nni_reap_list tlstran_pipe_reap_list = { - .rl_offset = offsetof(tlstran_pipe, reap), - .rl_func = tlstran_pipe_fini, -}; - static void tlstran_init(void) { @@ -181,23 +174,11 @@ tlstran_pipe_alloc(tlstran_pipe **pipep) } nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); - nni_atomic_flag_reset(&p->reaped); *pipep = p; return (0); } -static void -tlstran_pipe_reap(tlstran_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->tls != NULL) { - nng_stream_close(p->tls); - } - nni_reap(&tlstran_pipe_reap_list, p); - } -} - static void tlstran_ep_match(tlstran_ep *ep) { @@ -285,6 +266,7 @@ tlstran_pipe_nego_cb(void *arg) if (rv == NNG_ECLOSED) { rv = NNG_ECONNSHUT; } + nni_list_remove(&ep->negopipes, p); nng_stream_close(p->tls); if ((uaio = ep->useraio) != NULL) { @@ -292,7 +274,7 @@ tlstran_pipe_nego_cb(void *arg) nni_aio_finish_error(uaio, rv); } nni_mtx_unlock(&ep->mtx); - tlstran_pipe_reap(p); + tlstran_pipe_fini(p); } static void