Out of order events with imitate #239

Open
lolgesten opened this issue Feb 11, 2018 · 10 comments

@lolgesten

I'm observing out-of-order events. First, some pseudocode to explain what I have.

function main(sources) {
  // updates to merged app state on the form `{key: value}`
  const update$ = xs.create();

  // folded app state
  const state$ = update$
    .fold((state, update) => ({...state, ...update}), {})
    .debug('state');

  // various inputs that produce updates for update$.
  // one of them also derives updates from state$. cyclic, but should be ok.
  ...

  // proxy that follows state$, but sees a different order!!!
  const proxy$ = xs.from(state$).debug('proxy');

  const realUpdate$ = xs.merge(
    ...
  ).debug('update');

  update$.imitate(realUpdate$); // cyclic!

  return { ... };
}

My real scenario is more complicated with lots of state evaluation, so it's entirely possible that the problem is introduced somewhere else.

I observe the correct order on state$, but the proxy sees a different one.

[Screenshot of the debug log, 2018-02-11 20:50:49; the relevant rows are described below]

  • 4th row: update with config to be merged into state.
  • 5th row: new state with config merged in.
  • 6th row: evaluating state$, we spot state IDLE plus config and produce the update INITED.
  • 7th row: new state with the new appState merged in.
  • 8th-9th rows: the proxy sees the events out of order. :(
@lolgesten
Author

This is run with cycle.js, if that makes any difference.

@staltz
Owner

staltz commented Feb 11, 2018

Hi! Thanks for reporting. Just to be sure, which version of xstream and cycle/run?

@lolgesten
Author

    "@cycle/run": "^4.1.0",
    "xstream": "^11.2.0"

Also, I just found a workaround: using .compose(delay(1)) on the update stream that evaluates the app state.

@lolgesten
Author

const xs = require('xstream').default;

const START_STATE = {
  ver: 0,   // counts up to see out of order
  a: false, // we toggle this on a timer
  b: false, // we toggle this on a different timer
  c: false, // we derive this from a and b
};

function loop() {

  // imitate cycle
  const update$ = xs.create();

  // the folded state, with ever increasing version number
  const state$ = update$
        .fold((p, c) => ({...p, ver: p.ver + 1, ...c}), START_STATE)
        .debug('state');

  // the timer updates
  const aUpdate$ = xs.periodic(500).map(n => ({a: n % 2 === 0}));
  const bUpdate$ = xs.periodic(100).map(n => ({b: n % 3 === 0}));

  // derived update from a/b
  const derivedUpdate$ = state$
        .filter(state => state.c !== state.a && state.b)
        .map(state => ({c: state.a && state.b}));

  // the proxy that sees the error
  const proxy$ = xs.from(state$).debug('proxy');

  // merge the update streams
  const realUpdate$ = xs.merge(
    aUpdate$,
    bUpdate$,
    derivedUpdate$
  );

  // and cycle back to top
  update$.imitate(realUpdate$);

  return {
    state$,
    proxy$,
  };

}

const {state$, proxy$} = loop();
// no-op listeners, only there to start the streams
state$.addListener({next: () => {}});
proxy$.addListener({next: () => {}});

@staltz here's a test case that reproduces the problem every time for me. It might be a bit contrived, i.e. there may be a more succinct way to construct a test case.

I see the problem in the proxy at ver: 6

state: { ver: 5, a: true, b: true, c: false }
state: { ver: 6, a: true, b: true, c: true }
proxy: { ver: 6, a: true, b: true, c: true }
proxy: { ver: 5, a: true, b: true, c: false }
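Since ver only ever counts up, a regression like this can be detected mechanically instead of by reading the log. The following is a minimal sketch: createOrderChecker is a hypothetical helper, not part of xstream, and the states fed to it are simply the four log lines above replayed by hand.

```javascript
// Hypothetical helper (not part of xstream): records every state that
// arrives with a lower `ver` than one that was already seen.
function createOrderChecker(label) {
  let highestVer = -Infinity;
  const violations = [];
  return {
    violations,
    next(state) {
      if (state.ver < highestVer) {
        violations.push(`${label}: ver ${state.ver} arrived after ver ${highestVer}`);
      }
      highestVer = Math.max(highestVer, state.ver);
    },
  };
}

// Replay the four log lines in the order each observer saw them.
const stateChecker = createOrderChecker('state');
const proxyChecker = createOrderChecker('proxy');

stateChecker.next({ver: 5, a: true, b: true, c: false});
stateChecker.next({ver: 6, a: true, b: true, c: true});
proxyChecker.next({ver: 6, a: true, b: true, c: true});
proxyChecker.next({ver: 5, a: true, b: true, c: false});

console.log(stateChecker.violations); // []
console.log(proxyChecker.violations); // [ 'proxy: ver 5 arrived after ver 6' ]
```

Each checker exposes a next method, so it has the same shape as the next handler of a listener and could be wired into the repro in place of the no-op listeners.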

@lolgesten
Author

Wrote it as a test case too. https://gist.github.com/lolgesten/318dd558786d6da3d56338c123bdf3c7

@lolgesten
Author

Looking at the xstream code, I think I was wrong that this has to do with observables.

xs.from() short-circuits if it detects a Stream and returns the original stream instead.

I think that means we get two InternalListeners on the same stream, and if the first listener causes an update to the stream itself, it will also see that update before the second listener does.
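The effect described here can be modelled without xstream at all. The sketch below uses a toy stream (createToyStream is a made-up name, and this is not xstream's implementation) whose only property is that each emitted value is delivered synchronously to every listener in registration order. The first listener emits back into the stream, the second only observes.

```javascript
// Toy stand-in for a stream. NOT xstream's implementation, only a model of
// "each value is delivered synchronously to all listeners, in order".
function createToyStream() {
  const listeners = [];
  return {
    addListener(fn) {
      listeners.push(fn);
    },
    emit(value) {
      // Walks the whole listener list before returning to the caller.
      for (const fn of listeners) fn(value);
    },
  };
}

const log = [];
const toy$ = createToyStream();

// First listener: reacts to small values by emitting a derived value back
// into the same stream, before the second listener has been called.
toy$.addListener((n) => {
  log.push(`first ${n}`);
  if (n < 100) toy$.emit(n + 100);
});

// Second listener: only observes.
toy$.addListener((n) => log.push(`second ${n}`));

toy$.emit(1);
console.log(log.join('\n'));
// first 1
// first 101
// second 101
// second 1
```

The nested emit(101) runs to completion inside the first listener's handling of 1, so the second listener is handed 101 before the outer loop gets around to giving it 1.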

@lolgesten
Author

Maybe this isn't a bug. I just need to be aware of that shortcut. With that in mind, the fix is:

  // needs, at the top of the file:
  // const delay = require('xstream/extra/delay').default;

  // merge the update streams
  const realUpdate$ = xs.merge(
    aUpdate$,
    bUpdate$,
    derivedUpdate$
  ).compose(delay(1)); // delay update to allow all observers of state$ to see value

@staltz
Owner

staltz commented Mar 20, 2018

@lolgesten A delay will fix it, but the issue is still real. I think imitate should do a buffer for synchronous emissions, like we have inside Cycle run. Hence, reopening this issue.

@staltz reopened this on Mar 20, 2018

@lolgesten
Author

Ah ok! I agree that it'd be nicer if I didn't have to lose the synchronicity (is that a word? :) )

@lolgesten
Author

Here's the smallest possible repro I've come up with.

import xs from 'xstream';

const imitNumber$ = xs.create<number>();

const numberAdd$ = imitNumber$.filter((n) => n < 100).map((n) => n + 100);

const source$ = xs.create<number>();

const realNumber$ = xs.merge(source$, numberAdd$);

imitNumber$.imitate(realNumber$);

realNumber$.subscribe({next: (n) => console.log('real', n)});
imitNumber$.subscribe({next: (n) => console.log('imit', n)});

source$.shamefullySendNext(1);

The output is:

real 1
real 101
imit 101
imit 1

So what we see is that the imit subscriber is observing 101 before 1, even though 1 clearly has been evaluated since it's the cause of 101.

The problem occurs because numberAdd$ is synchronously evaluated.

Our fix to this problem is to patch imitate. Arguably the patch should be done for error as well, but we don't use errors really, so this is enough for us.

The important bit is that even if target$.shamefullySendNext(next!) causes a recursive invocation of next, we must not update the reactive tree with that value until later.

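import {Stream} from 'xstream';

// Replaces Stream.prototype.imitate with the queueing version below.
export const patchImitate = () => {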
    Stream.prototype.imitate = function (source$: any) {
        return imitate(source$, this);
    };
};

const imitate = <T>(source$: Stream<T>, target$: Stream<T>) => {
    const queue: T[] = [];
    let running = false;

    source$.subscribe({
        next: (v) => {
            queue.push(v);

            if (running) {
                return;
            }

            running = true;

            while (queue.length) {
                const next = queue.shift();
                target$.shamefullySendNext(next!);
            }

            running = false;
        },
        error: (e) => {
            target$.shamefullySendError(e);
        },
        complete: () => {
            target$.shamefullySendComplete();
        },
    });
};
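The queue-and-drain idea in the patch can be shown in isolation. This is a minimal sketch on a toy stream rather than on xstream (createQueuedToyStream is a made-up name): a re-entrant emit only enqueues its value, and the outermost call delivers queued values one at a time, so every listener sees a value before anyone sees the next one.

```javascript
// Toy stream with a drain queue. NOT xstream's implementation; it only
// illustrates the "enqueue while running, drain in the outer call" pattern.
function createQueuedToyStream() {
  const listeners = [];
  const queue = [];
  let draining = false;
  return {
    addListener(fn) {
      listeners.push(fn);
    },
    emit(value) {
      queue.push(value);
      if (draining) return; // re-entrant call: the outer loop will deliver it
      draining = true;
      try {
        while (queue.length > 0) {
          const next = queue.shift();
          for (const fn of listeners) fn(next);
        }
      } finally {
        draining = false; // reset even if a listener throws
      }
    },
  };
}

const queuedLog = [];
const queued$ = createQueuedToyStream();

// First listener feeds a derived value back into the stream.
queued$.addListener((n) => {
  queuedLog.push(`first ${n}`);
  if (n < 100) queued$.emit(n + 100);
});

// Second listener only observes.
queued$.addListener((n) => queuedLog.push(`second ${n}`));

queued$.emit(1);
console.log(queuedLog.join('\n'));
// first 1
// second 1
// first 101
// second 101
```

The try/finally is a choice made for this sketch, not something the patch above does: without it, a listener that throws would leave the flag set and every later value would be queued but never delivered.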

@lolgesten changed the title from "Out of order events with immitate" to "Out of order events with imitate" on Aug 13, 2020