Desynchronization downstream of combine #309

Open · CarlLeth opened this issue Jul 30, 2020 · 16 comments

CarlLeth commented Jul 30, 2020

I have an issue where I combine two streams, then make a new object. I then branch from that stream into two others, and it's important that the values in the two streams originate from the same object. This has to do with the other libraries I'm using, but I also think it's the expected and intended behavior. Instead, the two streams end up looking at different objects because they've received different signals.

A simple illustration of the problem is below. Note that Math.random() is a stand-in for creating a new object that the signals in streams x and y both must originate from. As an example of why this is important, consider if the object is a new data entity that comes with a randomly-generated UUID; clearly x and y must agree on what that UUID is.

Expected behavior:

  • Both x and y fire every time ab fires.
  • xy emits the value "true"

Actual behavior:

  • ab fires, then only x fires, then ab fires, then only y fires.
  • xy emits the value "false" since x and y received different signals.
    import xs from 'xstream';

    const a = xs.of('a');
    const b = xs.of('b');

    const ab = xs
        .combine(a, b)
        .map(([a, b]) => Math.random())
        .debug('ab');

    const x = ab.map(o => o).debug('x');
    const y = ab.map(o => o).debug('y');

    const xy = xs
        .combine(x, y)
        .map(([x, y]) => x === y)
        .debug('xy');

    xy.addListener({
        next: e => console.log(e)
    });
@CarlLeth (Author)

Here's something unexpected. I wanted to see if it had to do with the complete signals from xs.of(). So I changed each xs.of('') to xs.never().startWith(''). In that situation, ab fires, and then x fires, and then nothing else happens. I can't quite figure out why that would be.

    const a = xs.never().startWith('a');
    const b = xs.never().startWith('b');

    const ab = ... // Same from here

This alternative works as expected: xy settles on 'true' after every tick of either a or b (apart from the mildly annoying intermediate 'false' value it emits, but that's a separate issue and can be solved with debounce(0)).

    const a = xs.periodic(2000);
    const b = xs.periodic(3000);

    const ab = ... // Same from here

CarlLeth (Author) commented Jul 30, 2020

I've found that adding .compose(delay(0)) to every xs.of() and .startWith() call (and presumably anything else that causes an immediately emitted value) seems to restore the expected behavior. This may be an acceptable workaround for now.
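
Concretely, for the example above, the workaround looks something like this (a sketch; delay here is the operator from xstream/extra/delay):

    import xs from 'xstream';
    import delay from 'xstream/extra/delay';

    // Pushing each source one tick into the future means that, by the time
    // ab emits, both x and y have already subscribed, so they receive the
    // same value.
    const a = xs.of('a').compose(delay(0));
    const b = xs.of('b').compose(delay(0));

    const ab = ... // Same from here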

staltz (Owner) commented Jul 30, 2020

Hey @CarlLeth! I didn't read everything, because I'm quite confident this is a "glitch". It looks like the typical "diamond shaped" case of two synchronous streams (or in general, two streams that emit at the "same" time) being combined.

I wrote two blog posts about this (about RxJS but applicable to xstream too):

Read those carefully, and if you still have comments, let me know.

@CarlLeth (Author)

@staltz This is not a case of a transient, intermediate value being replaced by the correct value. The streams in the example never emit the correct value.

@CarlLeth (Author)

@staltz those were good articles and I agree with the majority of what you said. However, I don't think they apply here. I think this is a bigger problem than you're thinking it is. I'm glad I was able to find the delay(0) workaround, because even though the preconditions are perhaps unusual, this was a show-stopping bug for my project. Some important points about this functionality:

1. It violates basic invariants in reactive programming

Let's say I had this function:

function doubleStream(numbers: Stream<number>) {
    return numbers.map(x => x * 2);
}

I hope you agree it's reasonable to expect that (assuming subscriptions are running) whenever numbers emits a number, the stream returned by doubleStream emits twice that number. This is false. You have no guarantee that numbers.map(...) ever receives the message from numbers, because numbers might have been created by combining two or more xs.of or .startWith calls. If the behavior of this function is completely different depending on how numbers was constructed, I've lost the ability to predict what my program will do (and to test my code, since xs.of is very common in testing).
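
To make that concrete, here is a sketch in the shape of the example at the top of this issue (Math.random() again stands in for any non-deterministic work; the logged result reflects the behavior described above):

import xs, { Stream } from 'xstream';

function doubleStream(numbers: Stream<number>) {
    return numbers.map(x => x * 2);
}

// `numbers` is built by combining two synchronously-emitting streams and
// produces a fresh value per emission.
const numbers = xs
    .combine(xs.of(1), xs.of(2))
    .map(([a, b]) => a + b + Math.random())
    .debug('numbers');

const doubled = doubleStream(numbers).debug('doubled');

// The first input subscribes, sees one value of `numbers`, and `numbers`
// completes; subscribing the second input restarts `numbers`, so `doubled`
// doubles a *different* value and the invariant d === n * 2 fails.
xs.combine(numbers, doubled)
    .map(([n, d]) => d === n * 2)
    .addListener({ next: ok => console.log(ok) }); // logs: false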

2. There are legit reasons to create new information and objects within a map function

I have lots of types like this (over-simplified for the example):

class Button {
    clicks: Stream<MouseEvent>;
    html: Stream<string>;

    constructor() { /*... */ }
}

class TextBox {
    updates: Stream<string>;
    html: Stream<string>;

    constructor() { /*... */ }
}

const addNewRecordButton = new Button();
const newRecords = addNewRecordButton.clicks.map(o => (records: Array<any>) => records.concat([{
    id: createGuid(),
    name: new TextBox()
}]));

Later on I merge all the TextBox.updates streams and separately combine all the TextBox.html streams, etc. You can see how much of a problem it would be if downstream streams disagreed on which value they got out of createGuid(), or on which TextBox instance (and therefore which updates and html streams) new TextBox() produced. This is exactly the desynchronization I'm seeing.

Again: this is not about glitches. I really don't care how many times createGuid() is called, as long as every time it's called, that value gets propagated to all downstream listeners. Right now, under the right conditions, it gets propagated to some downstream listeners, but not others. I hope you agree with me that's a clear error.

wclr (Contributor) commented Aug 2, 2020

I believe everything works as expected.

You can add this:

import concat from 'xstream/extra/concat'

const ab = xs
  .combine(a, b)
  .map(([a, b]) => Math.random())
  .debug('ab')
  .compose(s => concat(s, xs.never()))
  .remember()

With this you make the ab stream never complete and keep its latest value for the late subscriber y.
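
With the rest of the original pipeline unchanged, the downstream part should then behave as expected (per the above, both x and y receive the remembered value):

const x = ab.map(o => o).debug('x')
const y = ab.map(o => o).debug('y')

xs
  .combine(x, y)
  .map(([x, y]) => x === y)
  .addListener({ next: e => console.log(e) }) // now logs: true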

CarlLeth (Author) commented Aug 3, 2020

The promise of reactive programming is that your program becomes much more predictable and less susceptible to incorrect behavior caused by unexpected sequences of events. The promise of functional programming is that you can think mathematically, writing nearly "self-proving" and "obviously correct" programs. In neither of those worlds does it seem reasonable to me to say that this "works as expected".

I understand that if you pull off the covers and trace how the subscription actually happens, it's doing what you expect. But I wrote a simple statement that x depends on ab, and in xstream.js, it apparently doesn't. Given @staltz's long history of writing deeply theoretical articles on reactive programming, I'm surprised to see you guys taking a stance of "the idiosyncrasies of our engine are more important than the underlying math". At least that's how it reads to me right now -- maybe there's some theoretical reason why this behavior must exist? It's certainly not explained or justified by the two articles @staltz linked.

But it's obviously your library and so far it's been the best reactive library I've found so it's hard for me to be too critical. Thanks for considering it.

staltz (Owner) commented Aug 3, 2020

@CarlLeth I read the examples, and this still looks to me like a glitch. I've seen many reactive programming libraries throughout the years (rx, rxjs, Bacon.js, Most.js, Elm Signals (yes Elm used to have reactive streams), etc), and seen many issues opened just like this one, and the usual example is something along the lines of a combine with some synchronously emitting streams (or streams emitting at "the same time"), and the symptoms are the same. This issue looks exactly like one of those.

Glitches as such are, as you said, not mathematically pleasing, and break our ability to reason about code. There are two ways of fixing them:

(1) make the library free of glitches. This usually involves heavyweight techniques such as keeping a directed acyclic graph (DAG) structure that tracks all streams that "want" to emit, and then using a custom scheduler that does a breadth-first search through that graph and guarantees that glitches don't occur. I've seen this kind of implementation in menrva, Derivables, MobX, and others. It fits a specific use case of reactive programming, but the tracking of emissions with the DAG often has non-negligible performance costs.

(2) use the reactive programming library in a different way, often by modelling the variables in your application differently: making sure you combine only streams that are unrelated to each other, with no time-related dependencies, and using "MemoryStreams" for "values over time" and "Streams" for events. If you follow this code style, you will rarely, if ever, find a glitch. I've been programming with reactive streams in 2 projects for the past 2~3 years and have still not found a glitch in those, due to this code style. The upside is that we get to keep some performance benefits, since emissions can propagate naively (in depth-first style) throughout the stream graph.

Solution (1) is a huge undertaking, would require rethinking xstream entirely, and in the end would be not as performant as today.

Solution (2) requires changing your code style to adhere to a reactive programming code style, but once learned you can forget about this category of problems, and keep the performance.

Note that solution (2) is what all these libraries decided to commit to: RxJS, most.js, xstream.
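
For the toy example in this issue, one concrete reading of (2) is to avoid the diamond altogether: instead of branching ab into x and y and recombining them, derive everything that must agree from a single place. A sketch:

// Both "branches" are computed inside one map, so there is nothing to
// desynchronize and no combine of time-dependent streams.
const xy = ab
  .map(o => ({ x: o, y: o }))
  .map(({ x, y }) => x === y)
  .debug('xy');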

CarlLeth (Author) commented Aug 3, 2020

@staltz Thank you for the thoughtful response. I agree that a glitch-free library is based on a very different philosophy and I certainly don't expect a rethink of xstream. I still feel like this case is somehow different from a "normal" glitch, in that it's more about what happens during a subscribe ("backward") than how an emission is handled ("forward"). It seems like the values that propagate down the chain during the initial subscribe event are somehow cheating, or at least playing by different rules than values that propagate because they were emitted. Under the normal rules, ab is not allowed to emit a new value to some but not all of its active subscribers, but it is allowed to do that during a subscription. I wonder if subscribe is the right time to take a moment to analyze the graph.

I'll consider your advice in point (2). I'm not convinced I can achieve the level of abstraction I want if every module needs to know whether it's allowed to combine two of its input streams or not, but I'll keep trying. Thanks for the discussion.

wclr (Contributor) commented Aug 4, 2020

The promise of reactive programming is that your program becomes much more predictable and less susceptible to incorrect behavior caused by unexpected sequences of events.

I believe there is no canonical definition of reactive programming, but it is mostly described as programming with asynchronous data streams.

Your example is synchronous (it happens all in one JS-engine tick) and you should consider the specifics of the platform:

  • All subscriptions start when you call addListener at the end of the chain, but they actually happen sequentially, like everything else in a JS engine: first x subscribes, then y, in the order they are passed to combine.
  • ab emits its value to x when x subscribes, and completes immediately (in the same tick). Why? Because there are no other subscribers to receive the value at that moment and no more values to emit (a and b have completed too, since they each emitted the single value they were supposed to).
  • Then y subscribes, which starts the ab stream all over again and produces a new random value. So in the xy combine you end up with two different values.

So you just need to understand how things actually work.
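
The restart is easy to see in isolation (a sketch of the mechanics described above):

import xs from 'xstream';

const s = xs.of(1).map(() => Math.random()).debug('s');

// First listener: s starts, emits one random value, then completes.
s.addListener({ next: v => console.log('first', v) });

// Second listener: s has already completed, so adding a listener starts the
// producer over and yields a different random value.
s.addListener({ next: v => console.log('second', v) });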

CarlLeth (Author) commented Aug 4, 2020

So you just need to understand how things actually work.

I can rephrase that statement: "So every module of code you write should understand exactly how every stream that gets passed in is constructed, since that will affect its behavior". That breaks the entire point of abstraction. Say I wrote a function:

function addSeven(x: number) {
  return x + 7;
}

I could reasonably complain if this function sometimes works and sometimes doesn't, depending on whether x was computed from a multiplication operation instead of addition. And you could come back to me and say: well, of course it works that way, if you just understood how our library works! Multiplication changes the bit-parity of the mantissa in the underlying pseudo-binary representation, and addition doesn't! Haha, see, you'd understand if you weren't so stupid!

And I would say: okay, well, this function has no way to check the bit-parity of the mantissa of the pseudo-binary representation, and besides, that's way outside of the scope of what number is supposed to represent. That's the definition of a leaky abstraction.

So when I write

function addSeven(numbers: Stream<number>) {
  return numbers.map(x => x + 7);
}

and ask: why doesn't this always work? Why can numbers emit a value that the stream returned by this function never sees? And you come back and say: well of course it works that way, when numbers is downstream of a combine of two streams that are not temporally independent! You'd understand that if you weren't so stupid!

And I say: okay, well, this function has no way to check if the numbers stream is downstream of a combine of two streams that are not temporally independent, and besides, that's outside the abstraction of what I thought a Stream was supposed to be.

I understand the technical reasons why it's occurring in this case. But it is a flaw. It may be a theoretically necessary flaw, one that's not feasible to fix without causing other problems, and/or one you can work around if you completely change your coding style, but it's still a flaw. Don't act like this is some sort of stupid question.

CarlLeth (Author) commented Aug 4, 2020

Actually this problem would go away if a MemoryStream simply gave the last value it emitted to any new subscribers on subscription, right? Why doesn't it? Why does concat(ab, xs.never()).remember() work but ab.remember() doesn't? Does the completion signal cause .remember() to forget?

wclr (Contributor) commented Aug 5, 2020

I could reasonably complain if this function sometimes works and sometimes doesn't,

If you know/understand how things work, they will always work that way; if you don't, they may sometimes work in an unknown/unpredictable fashion. :-)

Why does concat(ab, xs.never()).remember() work but ab.remember() doesn't?

Because ab completes immediately after emitting the single value it has to x, as I explained above, and y subscribes after that and restarts the ab stream. Concatenating xs.never() makes the stream never complete; you could also concat, for example, a stream delayed with delay(0), and it would work in this case too.
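
Side by side, the difference looks like this (a sketch using concat from xstream/extra/concat; the comments describe the behavior explained in this thread):

import xs from 'xstream';
import concat from 'xstream/extra/concat';

const make = () => xs.of(1).map(() => Math.random());

// Completes right after its single value: a late subscriber restarts the
// producer and can observe a different value, despite remember().
const forgotten = make().remember();
forgotten.addListener({ next: v => console.log('A', v) });
forgotten.addListener({ next: v => console.log('B', v) }); // differs from A

// Never completes: remember() keeps the last value alive for late subscribers.
const kept = concat(make(), xs.never()).remember();
kept.addListener({ next: v => console.log('C', v) });
kept.addListener({ next: v => console.log('D', v) }); // same value as C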

CarlLeth (Author) commented Aug 5, 2020

So the question boils down to: why does a MemoryStream forget its value when it receives a complete signal? That actually seems to be the fundamental point of confusion in a lot of these issues.

wclr (Contributor) commented Aug 5, 2020

When a hot stream completes, that means it's dead, it's over, it ain't going any further; nothing can "resume" a completed stream, you can only restart it all over again.

CarlLeth (Author) commented Aug 5, 2020

But isn't restarting a stream and replaying past events firmly in "cold" territory? If I subscribe to a MemoryStream, why is it "colder" to say "7, and don't expect any more values" instead of "oh hold on let me go run all the numbers again"? Besides, isn't MemoryStream forced to sit right on the boundary between hot and cold anyway? What's the actual logical reason why you wouldn't simply regurgitate the last value and complete?
