Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Attestation pool take 1: interop #176

Draft
wants to merge 64 commits into
base: interop
Choose a base branch
from

Conversation

mkalinin
Copy link
Contributor

No description provided.

@Nashatyrev
Copy link
Contributor

TimeProcessors and similar classes:
what's the rationale behind merging all events into single stream and then sorting them out by instanceof?
May be it make sense to subscribe all feed* methods directly to the corresponding streams? (to make this thread safe you may do publishOn(singleThreadScheduler) for all inbound streams)

* <p>Implementor MAY throw an {@link AssertionError} if it's been called before inner state has
* been initialised.
*/
public interface StatefulProcessor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the purpose of this interface.
You either assert that a component is inited or just skip an entry if not inited. Couldn't this be the internal logic of this component?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For instance, we have checkers which returns value of boolean type saying whether check passed or not. These checkers may have several inputs and state initialized from either one or all of those inputs. Since, check MUST returns true/false there is no room for merely discarding the value fed to a checker. We might use Optional<Boolean> for that but I am not sure that it would be a better choice.

@Nashatyrev
Copy link
Contributor

Nashatyrev commented Aug 29, 2019

Regarding a component which is statefull and has several inbound streams - I'm still not sure what should be the proper pattern for it
Ideally all this RX stuff should be stateless but often it is pretty tricky and we need to go back to imperative world

@mkalinin
Copy link
Contributor Author

Ideally all this RX stuff should be stateless but often it is pretty tricky and we need to go back to imperative world

I agree with that. It looks like Reactor is not meant to handle stateful processors and was created for another purposes. This way it looks more like a misuse. Let's keep it for now as it is and replace Reactor with a proper thing later.

public IdentificationProcessor(
UnknownAttestationPool pool,
Schedulers schedulers,
Flux<ReceivedAttestation> source,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would consider using more general Publisher as an input. I believe Flux.from(Publisher) shortcuts the call when the parameter is Flux so this should be for free

}
}

public DirectProcessor<ReceivedAttestation> getUnknownAttestations() {
return unknownAttestations;
public OutsourcePublisher<ReceivedAttestation> getIdentified() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't expose OutsourcePublisher implementation until the caller is supposed to write items to this stream. Wouldn't returning Publisher suits?

}
}

public Flux<ReceivedAttestation> getValid() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same here, m.b. just Publisher ?

public Flux<ReceivedAttestation> getInvalid() {
return invalid;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why didn't you finally end up with the laconic form as we discussed?

validatedAtt = att.map(att -> Pair.of(att, checker.check(att))).publish()
validAtt = validatedAtt.filter(a -> a.right).map(a -> a.left)
invalidAtt = validatedAtt.filter(a -> !a.right).map(a -> a.left)

@Override
public void subscribe(CoreSubscriber<? super ReceivedAttestation> actual) {
out.subscribe(actual);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not expose the Publisher<ReceivedAttestation> here to make it uniform across the whole lib?

@Nashatyrev
Copy link
Contributor

The general feeling is that using RX shouldn't require such amount of boilerplate code

@mkalinin mkalinin changed the base branch from develop to interop September 7, 2019 14:10
@mkalinin mkalinin changed the title [WIP] Attestation pool [WIP] Attestation pool: take 1, interop Sep 7, 2019
@mkalinin mkalinin changed the title [WIP] Attestation pool: take 1, interop [WIP] Attestation pool take 1: interop Sep 7, 2019
@mkalinin mkalinin mentioned this pull request Sep 8, 2019
11 tasks
mkalinin and others added 22 commits September 8, 2019 23:58
…con-chain-java into feature/attestation-pool
(i.e. lexicographically, from left to right, unsigned bytes).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants