Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes surprising possible ordering of nng_pipe_notify events. #961

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/core/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,17 @@ nni_pipe_sys_fini(void)
static void
pipe_destroy(nni_pipe *p)
{
nni_sock *s;
nni_mtx *pipe_cbs_mtx;
if (p == NULL) {
return;
}
s = p->p_sock;
pipe_cbs_mtx = nni_sock_pipe_cbs_mtx(s);

nni_mtx_lock(pipe_cbs_mtx);
nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
nni_mtx_unlock(pipe_cbs_mtx);

// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
Expand Down Expand Up @@ -372,4 +378,4 @@ nni_pipe_bump_error(nni_pipe *p, int err)
} else {
nni_listener_bump_error(p->p_listener, err);
}
}
}
21 changes: 16 additions & 5 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1425,27 +1425,32 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
nni_stat_inc_atomic(&d->d_stats.s_npipes, 1);

// grab the pipe_cbs_mtx for the entire time between calling pre-connect
// callbacks and post-connect callbacks. This ensures that the post-remove
// callback cannot fire before the post-connect callback.
nni_mtx_lock(&s->s_pipe_cbs_mtx);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);

nni_mtx_lock(&s->s_mx);
if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
nni_stat_inc_atomic(&d->d_stats.s_reject, 1);
nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
nni_pipe_rele(p);
return;
}
if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
nni_mtx_unlock(&s->s_mx);
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
nni_stat_inc_atomic(&d->d_stats.s_reject, 1);
nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
}
nni_mtx_unlock(&s->s_mx);

nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
nni_pipe_rele(p);
}

Expand Down Expand Up @@ -1534,18 +1539,21 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_stat_inc_atomic(&l->l_stats.s_npipes, 1);
nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);

nni_mtx_lock(&s->s_pipe_cbs_mtx);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);

nni_mtx_lock(&s->s_mx);
if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
nni_stat_inc_atomic(&l->l_stats.s_reject, 1);
nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
nni_pipe_rele(p);
return;
}
if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
nni_mtx_unlock(&s->s_mx);
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
nni_stat_inc_atomic(&l->l_stats.s_reject, 1);
nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
nni_pipe_close(p);
Expand All @@ -1555,6 +1563,7 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_mtx_unlock(&s->s_mx);

nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
nni_pipe_rele(p);
}

Expand Down Expand Up @@ -1632,19 +1641,16 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
nng_pipe_cb cb;
void * arg;

nni_mtx_lock(&s->s_pipe_cbs_mtx);
if (!p->p_cbs) {
if (ev == NNG_PIPE_EV_ADD_PRE) {
// First event, after this we want all other events.
p->p_cbs = true;
} else {
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
return;
}
}
cb = s->s_pipe_cbs[ev].cb_fn;
arg = s->s_pipe_cbs[ev].cb_arg;
nni_mtx_unlock(&s->s_pipe_cbs_mtx);

if (cb != NULL) {
nng_pipe pid;
Expand All @@ -1653,6 +1659,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
}
}

extern
nni_mtx *nni_sock_pipe_cbs_mtx(nni_sock *s) {
return &s->s_pipe_cbs_mtx;
}

void
nni_pipe_remove(nni_pipe *p)
{
Expand Down
2 changes: 2 additions & 0 deletions src/core/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ extern uint32_t nni_sock_flags(nni_sock *);
// types.) The second argument is a mask of events for which the callback
// should be executed.
extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *);
// the pipe_cbs_mtx must be held whenever pipe callback functions are called.
extern nni_mtx *nni_sock_pipe_cbs_mtx(nni_sock *);

// nni_ctx_open is used to open/create a new context structure.
// Contexts are not supported by most protocols, but for those that do,
Expand Down