
Fixes surprising possible ordering of nng_pipe_notify events. #961

Open
wants to merge 1 commit into master
Conversation

codypiersall
Contributor

Previously, nng_pipe_notify events could fire in an unexpected order:
pre-connect, post-remove, and finally post-connect. This can cause
errors in the wild if a resource is acquired in pre-connect and
released in post-remove, because the resource is no longer available
in the post-connect event when the race is exercised.

Now, events will fire strictly in the order of pre-connect,
post-connect, and post-remove. If the pipe is closed in
pre-connect, post-connect and post-remove will not be called.

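To make the failure mode concrete, here is a minimal sketch (not part of this PR; the resource handling and the one-slot table are hypothetical) of a callback that acquires a per-pipe resource in pre-connect, uses it in post-connect, and releases it in post-remove. Under the old ordering, the post-connect step could observe an already-released resource.

pipe_resource_demo.c (hypothetical): acquire/use/release per-pipe resource

#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>

#define CHECK(x) (assert((x) == 0))
#define ADDR "inproc://pipe-order-demo"

/* A one-slot table mapping a pipe id to its resource; a real application
 * would use a proper map keyed by nng_pipe_id(). */
static struct {
    int   pipe_id;
    char *resource;
} slot;

void cb(nng_pipe p, nng_pipe_ev ev, void *arg) {
    (void) arg;
    switch (ev) {
    case NNG_PIPE_EV_ADD_PRE:
        // acquire the resource before the pipe is added
        slot.pipe_id  = nng_pipe_id(p);
        slot.resource = strdup("per-pipe state");
        break;
    case NNG_PIPE_EV_ADD_POST:
        // use the resource once the pipe is attached; under the old
        // ordering, post-remove could already have freed it
        assert(slot.pipe_id == nng_pipe_id(p) && slot.resource != NULL);
        break;
    case NNG_PIPE_EV_REM_POST:
        // release the resource when the pipe goes away
        free(slot.resource);
        slot.resource = NULL;
        break;
    default:
        break;
    }
}

int main(void) {
    nng_socket listener, dialer;
    CHECK(nng_pair0_open(&listener));
    CHECK(nng_pair0_open(&dialer));
    CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_PRE, cb, NULL));
    CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_POST, cb, NULL));
    CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_REM_POST, cb, NULL));
    CHECK(nng_listen(listener, ADDR, NULL, 0));
    CHECK(nng_dial(dialer, ADDR, NULL, 0));
    usleep(100000);  // give the callbacks time to run
    CHECK(nng_close(dialer));
    CHECK(nng_close(listener));
    return 0;
}

Under the ordering this PR guarantees, the assert in the ADD_POST branch can never observe a freed resource.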
@codypiersall
Contributor Author

There is a semantic difference: now the callback is called with the s_pipe_cbs_mtx held. I suppose this means that users cannot change callbacks within a callback on pain of a deadlock, and there may be other issues I haven't thought of as well.

This fixes codypiersall/pynng#40. For a little background: in pynng, we register callback functions for each pipe event, and inside our callback we call any callbacks specified by the user. To keep track of the Python representation of pipes, we maintain some internal state when the callbacks are called. Whenever the callbacks are not called in the expected order, we run into problems.

This PR doesn't fix any "bugs", exactly, but it does make it easier to reason about invariants. The pipe callbacks can now only be called strictly in the order pre_connect, post_connect, post_remove, or a subsequence of them. It can still happen that only pre_connect is called, due to closing the associated socket (not just the pipe!), and it is also possible that only pre_connect and post_remove are called, if there is a race between closing the socket and a pipe getting attached. It is easier to think about than before though, IMO.


A different implementation that would accomplish the same thing would be to add a flags argument to nni_pipe_run_cb indicating whether s_pipe_cbs_mtx is already held. Then the function for grabbing the pipe callback mutex wouldn't need to be added to src/core/socket.h, and the code in src/core/pipe.c would just pass a flag saying it doesn't hold the mutex. I'm not sure whether that's cleaner or not.
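For what it's worth, that flags variant is just the common "caller says whether it already holds the lock" pattern. A minimal generic sketch, using plain pthreads and invented names rather than nng's internal API:

run_cb_flags.c (generic sketch, not nng source)

#include <pthread.h>
#include <stdio.h>

#define CB_HAVE_LOCK 0x1

static pthread_mutex_t cb_mtx = PTHREAD_MUTEX_INITIALIZER;

/* Runs the callback for `ev`; `flags` says whether the caller already holds
 * cb_mtx.  All names here are invented for illustration. */
void run_cb(int ev, int flags) {
    if ((flags & CB_HAVE_LOCK) == 0) {
        pthread_mutex_lock(&cb_mtx);
    }
    printf("running callback for event %d\n", ev);  // stand-in for the real callback
    if ((flags & CB_HAVE_LOCK) == 0) {
        pthread_mutex_unlock(&cb_mtx);
    }
}

int main(void) {
    run_cb(0, 0);             // caller does not hold the mutex
    pthread_mutex_lock(&cb_mtx);
    run_cb(1, CB_HAVE_LOCK);  // caller already holds it
    pthread_mutex_unlock(&cb_mtx);
    return 0;
}

Whether this is cleaner than exporting a lock helper from src/core/socket.h is mostly a matter of taste, as the comment above says.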

@bb010g

bb010g commented Jun 19, 2019

Is a mutex strictly necessary here? I'm not familiar with nng code, but browsing around some, it seems you're mixing an ordering / state transition problem with an exclusivity problem. If you only want one callback running at once, in a set order, then you could use a condition variable, locking on a new mutex for callback activity, that guards on the last nng_pipe_ev being a valid input state for the transition. I.e., keep waiting if it's too early, and break out if you missed. This avoids your mentioned deadlock on hot-swapping callbacks. No idea if the cost of all that being on every pipe is worth it, though, or if the mutex would protect against other (potential?) problems I don't know about.
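A rough sketch of that condition-variable idea, using plain pthreads and invented names rather than nng's internal wrappers, and ignoring events that are skipped entirely (e.g. when a pipe is closed in pre-connect):

pipe_cb_condvar.c (generic sketch, not nng source)

#include <pthread.h>
#include <stdbool.h>

enum pipe_ev { EV_NONE = -1, EV_ADD_PRE, EV_ADD_POST, EV_REM_POST };

struct pipe_cb_state {
    pthread_mutex_t mtx;
    pthread_cond_t  cv;
    int             last_ev;  // last event whose callback has completed
};

/* Returns true if the callback for `ev` should run now, false if the pipe
 * has already moved past `ev` (the "missed it" case). */
bool wait_for_turn(struct pipe_cb_state *st, int ev) {
    bool run;
    pthread_mutex_lock(&st->mtx);
    while (st->last_ev < ev - 1) {
        // too early: the preceding event's callback has not finished yet
        pthread_cond_wait(&st->cv, &st->mtx);
    }
    run = (st->last_ev == ev - 1);
    pthread_mutex_unlock(&st->mtx);
    return run;
}

void mark_done(struct pipe_cb_state *st, int ev) {
    pthread_mutex_lock(&st->mtx);
    if (ev > st->last_ev) {
        st->last_ev = ev;  // never move the state backwards
    }
    pthread_cond_broadcast(&st->cv);
    pthread_mutex_unlock(&st->mtx);
}

int main(void) {
    struct pipe_cb_state st;
    pthread_mutex_init(&st.mtx, NULL);
    pthread_cond_init(&st.cv, NULL);
    st.last_ev = EV_NONE;
    // Each notifier would do this from its own thread; shown inline here.
    for (int ev = EV_ADD_PRE; ev <= EV_REM_POST; ev++) {
        if (wait_for_turn(&st, ev)) {
            /* run the user callback for `ev` */
        }
        mark_done(&st, ev);
    }
    return 0;
}

The user callback runs outside the lock in this scheme, which is what avoids the deadlock on hot-swapping callbacks mentioned above.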

@codypiersall
Contributor Author

Is a mutex strictly necessary here?

I see your point, @bb010g. This patch is using the mutex for more than what mutexes are traditionally used for, namely protecting data structures from being used in inconsistent states. Technically, it just enlarges the critical section the mutex protects: I didn't create the mutex for this patch, I just hold it longer.

I like the idea of using condition variables. When I have more time (such a precious commodity!) I'll create an issue with a fuller discussion, and point to this PR as a potential solution. It probably won't be for a couple weeks though; I've been fighting with a lawn mower, and many battles are yet to come.

@gdamore
Contributor

gdamore commented Jun 28, 2019

I need to look at this in more detail. I have some specific concerns about holding the lock across the callbacks -- I am worried that activity done in a callback might cause a deadlock (such as if the callback rejects the connection.)

The other thing is that if the misordering is caused by the events being run on different threads, we can actually create a separate thread just for running these events, providing serialization that way. There may be concerns with that too, as I would need to synchronize back against the socket when the preconnect and postconnect routines return.

Let me think about this.
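For illustration, the "separate thread for events" idea is essentially a single-consumer queue: events are pushed in order and one worker thread runs the callbacks, which serializes them by construction. A generic sketch with plain pthreads and invented names (not nng source, and without the synchronization back to the socket that gdamore mentions):

event_thread.c (generic sketch, not nng source)

#include <pthread.h>
#include <stdio.h>

#define QLEN 64

struct ev_queue {
    pthread_mutex_t mtx;
    pthread_cond_t  cv;
    int             items[QLEN];
    int             head, tail, done;
};

void push(struct ev_queue *q, int ev) {
    pthread_mutex_lock(&q->mtx);
    q->items[q->tail++ % QLEN] = ev;  // bounded queue; real code must handle overflow
    pthread_cond_signal(&q->cv);
    pthread_mutex_unlock(&q->mtx);
}

void *worker(void *arg) {
    struct ev_queue *q = arg;
    pthread_mutex_lock(&q->mtx);
    for (;;) {
        while (q->head == q->tail && !q->done) {
            pthread_cond_wait(&q->cv, &q->mtx);
        }
        if (q->head == q->tail && q->done) {
            break;
        }
        int ev = q->items[q->head++ % QLEN];
        pthread_mutex_unlock(&q->mtx);
        printf("callback for event %d\n", ev);  // stand-in for the user callback
        pthread_mutex_lock(&q->mtx);
    }
    pthread_mutex_unlock(&q->mtx);
    return NULL;
}

int main(void) {
    struct ev_queue q = { .head = 0, .tail = 0, .done = 0 };
    pthread_mutex_init(&q.mtx, NULL);
    pthread_cond_init(&q.cv, NULL);
    pthread_t tid;
    pthread_create(&tid, NULL, worker, &q);
    for (int ev = 0; ev < 3; ev++) {
        push(&q, ev);  // pre-connect, post-connect, post-remove, in order
    }
    pthread_mutex_lock(&q.mtx);
    q.done = 1;
    pthread_cond_signal(&q.cv);
    pthread_mutex_unlock(&q.mtx);
    pthread_join(tid, NULL);
    return 0;
}

The open question gdamore raises, synchronizing back against the socket when the pre-connect and post-connect routines return, is exactly what this sketch leaves out.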

@codypiersall
Contributor Author

I may have been wrong about this being an issue in nng. I expected the following C code to reproduce the problem without using the Python bindings, but it does not demonstrate the race as I thought it would:
issuex.c

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>

#define CHECK(x) (assert((x) == 0))
#define ADDR "tcp://127.0.0.1:14224"

#define NUM_ITER 1000

typedef struct cb_arg {
    int sock_num;
    int events_idx;
    int events[3];
} cb_arg;

void cb(nng_pipe pipe, nng_pipe_ev event, void *data) {
    struct cb_arg *info = (struct cb_arg *) data;
    // Guard against more than three events firing for one socket.
    if (info->events_idx < 3) {
        info->events[info->events_idx++] = event;
    }
}

int main(int argc, char *argv[]) {
    nng_socket listener, dialer;
    int num_iter = NUM_ITER;
    char *addr = ADDR;
    if (argc >= 3) {
        num_iter = strtol(argv[2], NULL, 0);
    }
    if (argc >= 2) {
        addr = argv[1];
    }
    CHECK(nng_pair0_open(&listener));
    CHECK(nng_listen(listener, addr, NULL, 0));

    struct cb_arg *cb_args = malloc(num_iter * sizeof *cb_args);
    for (int i = 0; i < num_iter; i++) {
        struct cb_arg *cb_arg = &cb_args[i];
        cb_arg->sock_num = i;
        cb_arg->events_idx = 0;
        // -1 marks an event slot that never fired.
        cb_arg->events[0] = cb_arg->events[1] = cb_arg->events[2] = -1;
        CHECK(nng_pair0_open(&dialer));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_PRE, cb, cb_arg));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_POST, cb, cb_arg));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_REM_POST, cb, cb_arg));
        CHECK(nng_dial(dialer, addr, NULL, 0));
        CHECK(nng_close(dialer));
    }

    for (int i = 0; i < num_iter; i++) {
        int *events = cb_args[i].events;
        fprintf(stdout, "%d %d %d\n", events[0], events[1], events[2]);
    }
    free(cb_args);
}

Assuming that code is called issuex.c, it can be built like so:

gcc -O2 -Wall issuex.c -lnng -lpthread -o issuex

And run like so, to output the unique orderings of pipe events:

./issuex inproc:///tmp/whatever 10000 | sort | uniq

So this "issue" should be treated as a non-issue until I can actually reproduce it in C without using pynng.

@codypiersall
Contributor Author

Ugh, I realized that in my "reproducer" above, I was potentially watching pipe events from the wrong socket, if the short-lived sockets cause a race in the longer-lived ones. I'll update the code and repost when I'm able to.

@codypiersall
Contributor Author

Okay, it took forever for me to get back to this, but I have finally made a plain C reproducer that demonstrates the bad pipe event ordering, and it is in the "details" tag below. My confusion before was that:

  1. The out-of-order events are only seen on a dialer socket, and
  2. Only when dialing with the flag NNG_FLAG_NONBLOCK.

I also added some sleeps depending on the event type in the callback to try to observe the out-of-order events.

The C program below just prints the pipe id and event for each callback, and the data can be post-processed with the Python script further down. The C file, a Makefile, and the Python script are also in a branch: https://github.com/codypiersall/nng/tree/issueX.

issuex.c: reproduces pipe event ordering
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>

#define CHECK(x) (assert((x) == 0))
#define ADDR "tcp://127.0.0.1:14224"

#define NUM_ITER 1000

struct seen_events {
    int pipe_id;
    int event;
};

struct cb_data {
    int iter;
    pthread_mutex_t mut;
    struct seen_events *events;
};

/* This callback just adds the current pipe id and event to the passed in events. */
void cb(nng_pipe pipe, nng_pipe_ev event, void *data) {
    // add delays to simulate doing real work.
    // These are the delays that demonstrated the race on my computer.
    if (event == NNG_PIPE_EV_ADD_PRE) {
        usleep(40);
    }
    else if (event == NNG_PIPE_EV_ADD_POST) {
        usleep(50);
    }
    else if (event == NNG_PIPE_EV_REM_POST) {
        usleep(20);
    }
    struct cb_data *d = (struct cb_data *) data;
    pthread_mutex_lock(&d->mut);
    // Record the event first, then advance the counter; incrementing first
    // would skip index 0 and write one element past the end of the array.
    d->events[d->iter].pipe_id = nng_pipe_id(pipe);
    d->events[d->iter].event = event;
    d->iter += 1;
    pthread_mutex_unlock(&d->mut);
}


void init_cb_data(struct cb_data *data, int num_iter) {
    data->iter = 0;
    pthread_mutex_init(&data->mut, NULL);
    // Use calloc so unused entries read as pipe id 0, which the
    // post-processing script discards.
    data->events = calloc(num_iter * 3, sizeof (*data->events));
}

int main(int argc, char *argv[]) {
    nng_socket listener, dialer;
    struct cb_data listener_cb_data, dialer_cb_data;
    init_cb_data(&listener_cb_data, NUM_ITER);
    init_cb_data(&dialer_cb_data, NUM_ITER);

    CHECK(nng_pair0_open(&listener));
    CHECK(nng_listen(listener, ADDR, NULL, 0));

    for (int j = 0; j < NUM_ITER; j++) {
        CHECK(nng_pair0_open(&dialer));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_PRE, cb, &dialer_cb_data));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_POST, cb, &dialer_cb_data));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_REM_POST, cb, &dialer_cb_data));
        CHECK(nng_dial(dialer, ADDR, NULL, NNG_FLAG_NONBLOCK));
        usleep(100);
        CHECK(nng_close(dialer));
    }

    for (int j = 0; j < NUM_ITER * 3; j++) {
        struct seen_events *e = &dialer_cb_data.events[j];
        fprintf(stdout, "%d %d\n", e->pipe_id, e->event);
    }
    usleep(100000);
}

The Python script outputs the pipe event order and number of occurrences:

Python script for postprocessing: parse_output.py
from collections import defaultdict, Counter
import fileinput

# appends pipe events in the order they are viewed.
d = defaultdict(list)
for line in fileinput.input():
    pipe_id, event = line.split()
    d[pipe_id].append(event)

d.pop('0', None)

vals = [tuple(x) for x in d.values()]
c = Counter(vals)
for event_order, n_occurred in c.items():
    orders = ','.join(event_order)
    print('{:7s} {:>3d}'.format(orders, n_occurred))

Build the C program with something like this:

gcc -O0 -Wall issuex.c -Iinclude -lnng -lpthread -Lbuild -o issuex

And run it like this:

./issuex | python3 parse_output.py

On my machine, it outputs something like this:

0,2     270
0,2,1   304
0,1,2   415
2,0       1

Here 0 is NNG_PIPE_EV_ADD_PRE, 1 is NNG_PIPE_EV_ADD_POST, and 2 is NNG_PIPE_EV_REM_POST. This indicates that 304 times, the pipe events fired (well, finished running) in the order pre_connect, post_remove, and post_connect, and 1 time in the order post_remove, pre_connect.

I intentionally tried to make a callback that would exacerbate the issues I saw happening in pynng at codypiersall/pynng#40.

@gdamore
Contributor

gdamore commented Dec 27, 2019

I think I want to take a somewhat different approach with this.
It seems like we need a state machine per pipe, meaning the pipe should keep track of which callbacks have run, and threads that are trying to run should back off. The removal callback should not run until after the others have run.

So basically we would have a lock (and possibly a condvar, though I'm not sure that's strictly necessary yet) and a couple of ints on the pipe that track what notifications have run and whether a notifier is running.

This would avoid the global locks (hopefully minimizing contention as well). I'd like to also take a close look at the preattach hook and other synchronization with the rest of the socket.
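A rough sketch of what such a per-pipe state machine might look like, with invented names, plain pthreads, and ignoring the case where the pipe is closed before post-connect (so not the eventual implementation):

pipe_cb_sm.c (generic sketch, not nng source)

#include <pthread.h>
#include <stdbool.h>

struct pipe_cb_sm {
    pthread_mutex_t mtx;
    bool            running;  // a notifier callback is currently running
    unsigned        ran;      // bitmask of events whose callbacks completed
};

/* Try to claim the right to run the callback for `ev` (0 = ADD_PRE,
 * 1 = ADD_POST, 2 = REM_POST).  Returns false if the caller should back off
 * and retry later, because another callback is running or because REM_POST
 * would overtake an earlier event. */
bool sm_try_start(struct pipe_cb_sm *sm, int ev) {
    bool ok;
    pthread_mutex_lock(&sm->mtx);
    ok = !sm->running &&
         (ev != 2 || (sm->ran & 0x3) == 0x3);  // REM_POST waits for the others
    if (ok) {
        sm->running = true;
    }
    pthread_mutex_unlock(&sm->mtx);
    return ok;
}

void sm_finish(struct pipe_cb_sm *sm, int ev) {
    pthread_mutex_lock(&sm->mtx);
    sm->running = false;
    sm->ran |= 1u << ev;
    pthread_mutex_unlock(&sm->mtx);
}

int main(void) {
    struct pipe_cb_sm sm = { .running = false, .ran = 0 };
    pthread_mutex_init(&sm.mtx, NULL);
    for (int ev = 0; ev < 3; ev++) {
        while (!sm_try_start(&sm, ev)) {
            // in real code the notifier would back off and be retried later
        }
        /* run the user callback for `ev` here */
        sm_finish(&sm, ev);
    }
    return 0;
}

A condition variable could replace the busy retry if backing off turns out to be awkward, which matches the "possibly a condvar" caveat above.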
