Skip to content

Commit

Permalink
Another attempt at the close deadlock, fix use-after-free.
Browse files Browse the repository at this point in the history
When closing pipes, we defer them to be reaped, but also leave
them in the match list where they might be picked up by ep_match,
or leak.  It's best to reap these proactively and ensure that they
are not allowed to life longer once they have errored during the
negotiation phase.
  • Loading branch information
gdamore committed Apr 27, 2024
1 parent 92fd162 commit 993a237
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 132 deletions.
82 changes: 32 additions & 50 deletions src/sp/transport/ipc/ipc.c
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
Expand All @@ -25,29 +25,27 @@ typedef struct ipc_ep ipc_ep;

// ipc_pipe is one end of an IPC connection.
struct ipc_pipe {
nng_stream *conn;
uint16_t peer;
uint16_t proto;
size_t rcv_max;
bool closed;
ipc_ep *ep;
nni_pipe *pipe;
nni_list_node node;
nni_atomic_flag reaped;
nni_reap_node reap;
uint8_t tx_head[1 + sizeof(uint64_t)];
uint8_t rx_head[1 + sizeof(uint64_t)];
size_t got_tx_head;
size_t got_rx_head;
size_t want_tx_head;
size_t want_rx_head;
nni_list recv_q;
nni_list send_q;
nni_aio tx_aio;
nni_aio rx_aio;
nni_aio neg_aio;
nni_msg *rx_msg;
nni_mtx mtx;
nng_stream *conn;
uint16_t peer;
uint16_t proto;
size_t rcv_max;
bool closed;
ipc_ep *ep;
nni_pipe *pipe;
nni_list_node node;
uint8_t tx_head[1 + sizeof(uint64_t)];
uint8_t rx_head[1 + sizeof(uint64_t)];
size_t got_tx_head;
size_t got_rx_head;
size_t want_tx_head;
size_t want_rx_head;
nni_list recv_q;
nni_list send_q;
nni_aio tx_aio;
nni_aio rx_aio;
nni_aio neg_aio;
nni_msg *rx_msg;
nni_mtx mtx;
};

struct ipc_ep {
Expand All @@ -65,7 +63,7 @@ struct ipc_ep {
nni_aio *time_aio;
nni_list busy_pipes; // busy pipes -- ones passed to socket
nni_list wait_pipes; // pipes waiting to match to socket
nni_list neg_pipes; // pipes busy negotiating
nni_list nego_pipes; // pipes busy negotiating
nni_reap_node reap;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_rcv_max;
Expand All @@ -76,7 +74,7 @@ static void ipc_pipe_send_start(ipc_pipe *p);
static void ipc_pipe_recv_start(ipc_pipe *p);
static void ipc_pipe_send_cb(void *);
static void ipc_pipe_recv_cb(void *);
static void ipc_pipe_neg_cb(void *);
static void ipc_pipe_nego_cb(void *);
static void ipc_pipe_fini(void *);
static void ipc_ep_fini(void *);

Expand All @@ -85,11 +83,6 @@ static nni_reap_list ipc_ep_reap_list = {
.rl_func = ipc_ep_fini,
};

static nni_reap_list ipc_pipe_reap_list = {
.rl_offset = offsetof(ipc_pipe, reap),
.rl_func = ipc_pipe_fini,
};

static void
ipc_tran_init(void)
{
Expand Down Expand Up @@ -161,17 +154,6 @@ ipc_pipe_fini(void *arg)
NNI_FREE_STRUCT(p);
}

static void
ipc_pipe_reap(ipc_pipe *p)
{
if (!nni_atomic_flag_test_and_set(&p->reaped)) {
if (p->conn != NULL) {
nng_stream_close(p->conn);
}
nni_reap(&ipc_pipe_reap_list, p);
}
}

static int
ipc_pipe_alloc(ipc_pipe **pipe_p)
{
Expand All @@ -183,10 +165,9 @@ ipc_pipe_alloc(ipc_pipe **pipe_p)
nni_mtx_init(&p->mtx);
nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p);
nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p);
nni_aio_init(&p->neg_aio, ipc_pipe_neg_cb, p);
nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p);
nni_aio_list_init(&p->send_q);
nni_aio_list_init(&p->recv_q);
nni_atomic_flag_reset(&p->reaped);
*pipe_p = p;
return (0);
}
Expand All @@ -210,7 +191,7 @@ ipc_ep_match(ipc_ep *ep)
}

static void
ipc_pipe_neg_cb(void *arg)
ipc_pipe_nego_cb(void *arg)
{
ipc_pipe *p = arg;
ipc_ep *ep = p->ep;
Expand Down Expand Up @@ -261,7 +242,7 @@ ipc_pipe_neg_cb(void *arg)

// We are ready now. We put this in the wait list, and
// then try to run the matcher.
nni_list_remove(&ep->neg_pipes, p);
nni_list_remove(&ep->nego_pipes, p);
nni_list_append(&ep->wait_pipes, p);

ipc_ep_match(ep);
Expand All @@ -276,6 +257,7 @@ ipc_pipe_neg_cb(void *arg)
if (rv == NNG_ECLOSED) {
rv = NNG_ECONNSHUT;
}
nni_list_remove(&ep->nego_pipes, p);
nng_stream_close(p->conn);
// If we are waiting to negotiate on a client side, then a failure
// here has to be passed to the user app.
Expand All @@ -284,7 +266,7 @@ ipc_pipe_neg_cb(void *arg)
nni_aio_finish_error(user_aio, rv);
}
nni_mtx_unlock(&ep->mtx);
ipc_pipe_reap(p);
ipc_pipe_fini(p);
}

static void
Expand Down Expand Up @@ -658,7 +640,7 @@ ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep)
iov.iov_len = 8;
iov.iov_buf = &p->tx_head[0];
nni_aio_set_iov(&p->neg_aio, 1, &iov);
nni_list_append(&ep->neg_pipes, p);
nni_list_append(&ep->nego_pipes, p);

nni_aio_set_timeout(&p->neg_aio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, &p->neg_aio);
Expand All @@ -679,7 +661,7 @@ ipc_ep_close(void *arg)
if (ep->listener != NULL) {
nng_stream_listener_close(ep->listener);
}
NNI_LIST_FOREACH (&ep->neg_pipes, p) {
NNI_LIST_FOREACH (&ep->nego_pipes, p) {
ipc_pipe_close(p);
}
NNI_LIST_FOREACH (&ep->wait_pipes, p) {
Expand Down Expand Up @@ -835,7 +817,7 @@ ipc_ep_init(ipc_ep **epp, nni_sock *sock)
nni_mtx_init(&ep->mtx);
NNI_LIST_INIT(&ep->busy_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->wait_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->neg_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->nego_pipes, ipc_pipe, node);

ep->proto = nni_sock_proto_id(sock);

Expand Down
65 changes: 24 additions & 41 deletions src/sp/transport/tcp/tcp.c
Expand Up @@ -23,29 +23,27 @@ typedef struct tcptran_ep tcptran_ep;

// tcp_pipe is one end of a TCP connection.
struct tcptran_pipe {
nng_stream *conn;
nni_pipe *npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
bool closed;
nni_list_node node;
tcptran_ep *ep;
nni_atomic_flag reaped;
nni_reap_node reap;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
nng_stream *conn;
nni_pipe *npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
bool closed;
nni_list_node node;
tcptran_ep *ep;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
};

struct tcptran_ep {
Expand Down Expand Up @@ -86,11 +84,6 @@ static nni_reap_list tcptran_ep_reap_list = {
.rl_func = tcptran_ep_fini,
};

static nni_reap_list tcptran_pipe_reap_list = {
.rl_offset = offsetof(tcptran_pipe, reap),
.rl_func = tcptran_pipe_fini,
};

static void
tcptran_init(void)
{
Expand Down Expand Up @@ -162,17 +155,6 @@ tcptran_pipe_fini(void *arg)
NNI_FREE_STRUCT(p);
}

static void
tcptran_pipe_reap(tcptran_pipe *p)
{
if (!nni_atomic_flag_test_and_set(&p->reaped)) {
if (p->conn != NULL) {
nng_stream_close(p->conn);
}
nni_reap(&tcptran_pipe_reap_list, p);
}
}

static int
tcptran_pipe_alloc(tcptran_pipe **pipep)
{
Expand All @@ -192,7 +174,6 @@ tcptran_pipe_alloc(tcptran_pipe **pipep)
}
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);

*pipep = p;

Expand Down Expand Up @@ -293,8 +274,10 @@ tcptran_pipe_nego_cb(void *arg)
ep->useraio = NULL;
nni_aio_finish_error(uaio, rv);
}
nni_list_remove(&ep->negopipes, p);
nni_mtx_unlock(&ep->mtx);
tcptran_pipe_reap(p);

tcptran_pipe_fini(p);
}

static void
Expand Down
64 changes: 23 additions & 41 deletions src/sp/transport/tls/tls.c
Expand Up @@ -28,29 +28,27 @@ typedef struct tlstran_pipe tlstran_pipe;

// tlstran_pipe is one end of a TLS connection.
struct tlstran_pipe {
nng_stream *tls;
nni_pipe *npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
bool closed;
nni_list_node node;
nni_list sendq;
nni_list recvq;
tlstran_ep *ep;
nni_atomic_flag reaped;
nni_reap_node reap;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
nng_stream *tls;
nni_pipe *npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
bool closed;
nni_list_node node;
nni_list sendq;
nni_list recvq;
tlstran_ep *ep;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
};

// Stuff that is common to both dialers and listeners.
Expand Down Expand Up @@ -92,11 +90,6 @@ static nni_reap_list tlstran_ep_reap_list = {
.rl_func = tlstran_ep_fini,
};

static nni_reap_list tlstran_pipe_reap_list = {
.rl_offset = offsetof(tlstran_pipe, reap),
.rl_func = tlstran_pipe_fini,
};

static void
tlstran_init(void)
{
Expand Down Expand Up @@ -181,23 +174,11 @@ tlstran_pipe_alloc(tlstran_pipe **pipep)
}
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);

*pipep = p;
return (0);
}

static void
tlstran_pipe_reap(tlstran_pipe *p)
{
if (!nni_atomic_flag_test_and_set(&p->reaped)) {
if (p->tls != NULL) {
nng_stream_close(p->tls);
}
nni_reap(&tlstran_pipe_reap_list, p);
}
}

static void
tlstran_ep_match(tlstran_ep *ep)
{
Expand Down Expand Up @@ -285,14 +266,15 @@ tlstran_pipe_nego_cb(void *arg)
if (rv == NNG_ECLOSED) {
rv = NNG_ECONNSHUT;
}
nni_list_remove(&ep->negopipes, p);
nng_stream_close(p->tls);

if ((uaio = ep->useraio) != NULL) {
ep->useraio = NULL;
nni_aio_finish_error(uaio, rv);
}
nni_mtx_unlock(&ep->mtx);
tlstran_pipe_reap(p);
tlstran_pipe_fini(p);
}

static void
Expand Down

0 comments on commit 993a237

Please sign in to comment.