rxHCResponse.asObservable() doesn't call onNext when used with a Vert.x timer #42

Open
zhangpp520 opened this issue Aug 14, 2015 · 6 comments


@zhangpp520

First I defined an Observable:

Observable<JsonObject> participants() {
    return new RxHttpClient(..).request("GET", path, requestBuilder -> {
        requestBuilder.end();
    }).flatMap(rxHCResponse -> {
        return rxHCResponse.asObservable().reduce(new Buffer(), RxSupport.mergeBuffers).map(buffer -> {
            // never called!
            return new JsonObject(buffer.toString());
        });
    });
}

Then I subscribe to it from a timer:

vertx.setTimer(1000, timerId -> {
    participants().subscribe(result -> {
        System.out.println(result);
    });
});

The subscriber never receives anything from the Observable.

@petermd
Contributor

petermd commented Aug 24, 2015

Hi,

Is the problem specific to calling subscribe() from a timer?

It would be really helpful if you had a minimal test that reproduces the issue.

Thanks,
Peter

@zhangpp520
Author

import io.vertx.rxcore.RxSupport;
import io.vertx.rxcore.java.http.RxHttpClient;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.platform.Verticle;
import rx.Observable;

public class Test extends Verticle {
    public void start() {
        RxHttpClient httpClient = new RxHttpClient(vertx.createHttpClient().setHost("www.baidu.com"));
        Observable<String> ob= httpClient.request("GET", "/", requestBuilder -> {
            requestBuilder.end();
        }).flatMap(rxHCResponse -> {
            return rxHCResponse.asObservable().reduce(new Buffer(), RxSupport.mergeBuffers).map(buffer -> {
                return buffer.toString();
            });
        });

        ob.subscribe(c -> {
            // subscribed immediately: this prints
            System.out.println(c);
        });

        vertx.setTimer(1000, timerId -> {
            ob.subscribe(c -> {
                // subscribed from the timer: this never prints
                System.out.println(c);
            });
        });
    }
}

The command is 'vertx run Test.java -includes io.vertx~mod-rxvertx~1.0.0-beta4'.
If I subscribe to ob inside the timer, I get nothing.

@zhangpp520
Author

I also found a strange problem when posting data.
If I call requestBuilder.end(data), it works.
But when I call requestBuilder.write(data).end(), the server occasionally doesn't receive the data.

@zhangpp520
Author

I think this is the same as #5. If I wrap the Observable in a Func, it works well.
So how can I make rxHttpClient execute lazily?
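
The effect of wrapping the Observable in a Func can be pictured without rxvertx. The following is a minimal JDK-only sketch, not the library's implementation: `EagerRequest` is a hypothetical class invented here, assumed to start its work on construction and to notify only the subscribers registered when the response arrives. A `Supplier` plays the role of the Func wrapper; in RxJava 1.x the analogous operator is `Observable.defer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for an eagerly executed request; not part of rxvertx.
class EagerRequest {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> onNext) {
        subscribers.add(onNext);
    }

    // Simulates the response arriving: only current subscribers are notified.
    void complete(String body) {
        for (Consumer<String> s : subscribers) {
            s.accept(body);
        }
        subscribers.clear();
    }
}

class LazyWrapDemo {
    // Eager: the response arrives before the late subscriber registers.
    static List<String> lateSubscribeEager() {
        List<String> received = new ArrayList<>();
        EagerRequest request = new EagerRequest(); // request is already "in flight"
        request.complete("response");              // response arrives first
        request.subscribe(received::add);          // the "timer" subscribes too late
        return received;
    }

    // Lazy: the Supplier creates the request only when the subscriber asks for it.
    static List<String> lateSubscribeLazy() {
        List<String> received = new ArrayList<>();
        Supplier<EagerRequest> lazy = EagerRequest::new; // nothing created yet
        EagerRequest request = lazy.get();               // the "timer" fires: request starts now
        request.subscribe(received::add);
        request.complete("response");
        return received;
    }

    public static void main(String[] args) {
        System.out.println("eager: " + lateSubscribeEager()); // prints "eager: []"
        System.out.println("lazy:  " + lateSubscribeLazy());  // prints "lazy:  [response]"
    }
}
```

The only difference between the two methods is when the request object is created relative to the subscription, which is the point of making execution lazy.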

@petermd
Contributor

petermd commented Sep 11, 2015

Hi @zhangpp520,

Right, it is the same issue as #5, so there are two options:

  1. add cache() immediately to the generated Observable so the request results are stored and replayed to all subscribers
  2. we change the behaviour to execute-on-subscribe
    1. by modifying the existing methods
    2. by adding a new set of observeMETHOD requests

Since it's a legacy library I'd suggest that 2(ii) is safer and hopefully the correct semantics can be implemented from the start in Vert.x 3

-Peter
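
Option 1 in the list above (adding cache()) can be pictured with a small JDK-only sketch. `ReplayingRequest` is a hypothetical class invented for illustration, not rxvertx or RxJava code. It assumes the source stores its single result and replays it to anyone who subscribes later, which is roughly what cache() provides for an Observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical eager request with cache()-like replay; not rxvertx code.
class ReplayingRequest {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private String cached;        // the stored response, once it has arrived
    private boolean done = false;

    void subscribe(Consumer<String> onNext) {
        if (done) {
            onNext.accept(cached); // late subscriber: replay the stored result
        } else {
            subscribers.add(onNext);
        }
    }

    // Simulates the response arriving: store it, then notify current subscribers.
    void complete(String body) {
        cached = body;
        done = true;
        for (Consumer<String> s : subscribers) {
            s.accept(body);
        }
        subscribers.clear();
    }
}

class ReplayDemo {
    static List<String> earlyAndLateSubscribers() {
        List<String> received = new ArrayList<>();
        ReplayingRequest request = new ReplayingRequest();
        request.subscribe(body -> received.add("early:" + body));
        request.complete("response");                            // response arrives
        request.subscribe(body -> received.add("late:" + body)); // the "timer" subscribes afterwards
        return received;
    }

    public static void main(String[] args) {
        // prints "[early:response, late:response]"
        System.out.println(earlyAndLateSubscribers());
    }
}
```

Replay keeps the request eager and sends it only once, whereas execute-on-subscribe (option 2) would issue a new request for each subscriber.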

@zhangpp520
Author

Thank you so much, Peter!
You have been a great help while I was learning RxJava and rxvertx.
I found another problem caused by the same semantics.

rxHttpClient.request("GET", path, requestBuilder -> {
    requestBuilder.setTimeout(1000).end();
}).flatMap(rxHCResponse -> {
    return rxHCResponse.asObservable().reduce(new Buffer(), RxSupport.mergeBuffers).map(buffer -> {
        return new JsonObject(buffer.toString());
    });
}).onErrorResumeNext(throwable -> {
    return Observable.just(new JsonObject());
}).flatMap(rs -> {
    return Observable.just("never return").map(s -> {
        return s;
    });
}).subscribe(r -> {
    // never called
});

If a flatMap follows an HTTP Observable and an error occurs, so that the stream is resumed by onErrorResumeNext, the subscribe callback is never called. I can use cache() to work around that.

Maybe I can add an observeRequest method and open a pull request. I will try that.

Thank you!
