Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh): Gossip client #2258

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open

feat(iroh): Gossip client #2258

wants to merge 12 commits into from

Conversation

rklaehn
Copy link
Contributor

@rklaehn rklaehn commented May 1, 2024

Description

This makes gossip available in iroh. Using iroh-gossip directly, while not horrible, is a bit verbose.

Breaking Changes

Not sure. It exports a few more things, and adds things. But in theory it should not modify existing things.

Notes & open questions

  • There can be some scenarios where this can cause trouble. E.g. when subscribing and then unsubscribing to a topic that is also used for doc sync.
  • Gossip messages are deduped it seems. So if you send the exact same message twice it will be dropped. If that's how it work we must document it.
  • I see lots of errors like the following:
2024-05-01T13:16:20.430596Z ERROR sync{me=kkpbi4eb5h6jcdjz}:sync{me=kkpbi4eb5h6jcdjz}: iroh::sync_engine::gossip: Failed to process gossip event namespace=atqlwoptbmnd724j err=Serde Deserialization Error

It seems that the sync engine thinks that it "owns" gossip, so it uses subscribe_all and then will deserialize everything, even messages that are not for it. How can we avoid that? Using an entirely different ALPN for normal gossip?

  • What should be the behaviour when a subscriber lags? Close it or just send Lagged and continue?

I think that relying on guaranteed delivery for gossip is not a good idea in any case. so maybe just sending a Lagged but then continuing is best? Forcing the client to resubscribe could be a bit annoying.

Change checklist

  • Self-review.
  • Documentation updates if relevant.
  • Tests if relevant.
  • All breaking changes documented.

and make a fuew things public
@rklaehn rklaehn requested a review from Frando May 1, 2024 13:23
@rklaehn
Copy link
Contributor Author

rklaehn commented May 1, 2024

I think it is safe to just ignore any messages that are not for us. See b88398e

Obviously you can mess with things by subscribing to a namespace that is currently being synced, but doing so accidentally is highly unlikely. And you can also mess with other stuff by using the low level blobs api, so 🤷 .

@rklaehn
Copy link
Contributor Author

rklaehn commented May 1, 2024

It seems that due to the fact that the rpc protocol is public, we can not ever do an additional rpc request without breaking compat.

--- failure constructible_struct_adds_field: externally-constructible struct adds field ---

Description:
A pub struct constructible with a struct literal has a new pub field. Existing struct literals must be updated to include the new field.
        ref: https://doc.rust-lang.org/reference/expressions/struct-expr.html
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.31.0/src/lints/constructible_struct_adds_field.ron

Failed in:
  field Iroh.gossip in /home/runner/work/iroh/iroh/iroh/src/client.rs:44

--- failure enum_variant_added: enum variant added on exhaustive enum ---

Description:
A publicly-visible enum without #[non_exhaustive] has a new variant.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#enum-variant-new
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.31.0/src/lints/enum_variant_added.ron

@dignifiedquire
Copy link
Contributor

we need to mark the rpc enums as non exhaustive, that should solve this

@dignifiedquire
Copy link
Contributor

longterm I wish the rpc types where all private anyway

@dignifiedquire
Copy link
Contributor

It seems that the sync engine thinks that it "owns" gossip, so it uses subscribe_all

@Frando can we change this? seems unfortunate to subscribe to everything by default

@Frando
Copy link
Member

Frando commented May 3, 2024

It seems that the sync engine thinks that it "owns" gossip, so it uses subscribe_all

@Frando can we change this? seems unfortunate to subscribe to everything by default

I think we can, yes. What is our primitive of choice these days to merge a changing list of streams?

with futures and thus its merge combinators phased out in iroh - maybe https://docs.rs/futures-concurrency/latest/futures_concurrency/stream/trait.Merge.html ?

@dignifiedquire
Copy link
Contributor

@Frando
Copy link
Member

Frando commented May 3, 2024

I did not yet give the new gossip dispatcher module a real review, but my first thought is: What about we move the gossip_dispatcher from iroh to iroh-gossip? And make it the main interface. Because then we don't need broadcast channels at all anymore. And save one channel with its buffers. The sync engine would then use the gossip dispatcher, as would the RPC handlers. And if we get things right, less code - because the sync engine also dances around the different state (but your dispatcher has the nicer impl with the state enum).

@Frando
Copy link
Member

Frando commented May 6, 2024

#2265 changes the sync engine usage of gossip to not use subscribe_all (based on this branch).
However as outlined in the previous comment I think I'd prefer to move the new gossip_dispatcher module to iroh_gossip, and make both the node's gossip client and the node's sync engine use that dispatcher.

@ramfox ramfox added this to the v0.18.0 milestone Jun 3, 2024
@rklaehn
Copy link
Contributor Author

rklaehn commented Jun 5, 2024

@Frando so the basic idea of the gossip dispatcher is just to have a thing that wraps the gossip and does some management so that it is less fragile to use. It does the join/unjoin etc. for you and deals with laggy subscribers.

If you agree with / can live with the public interface (which is just the subscribe function), we could move something with that signature into iroh-gossip as a higher level interface and then I would use it here, and you could update docs to use it (or not...).

The interface is basically this:

pub fn subscribe(
    &self,
    msg: GossipSubscribeRequest,
    updates: UpdateStream,
) -> impl Stream<Item = RpcResult<GossipSubscribeResponse>> {

where UpdateStream is just a stream of either broadcast or broadcast_neighbours:

/// Send a gossip message
#[derive(Serialize, Deserialize, Debug)]
pub enum GossipSubscribeUpdate {
    /// Broadcast a message to all nodes in the swarm
    Broadcast(Bytes),
    /// Broadcast a message to all direct neighbors
    BroadcastNeighbors(Bytes),
}

There is a hardcoded constant somewhere about how much you are allowed to lag before you are thrown out. We could make that configurable as well.

@Frando
Copy link
Member

Frando commented Jun 5, 2024

I think the API is fine. And still very much in favor of having only one and not 2 high level dispatch interfaces to gossip.

@rklaehn
Copy link
Contributor Author

rklaehn commented Jun 5, 2024

Ok, I will clean up the dispatcher a bit and make sure it is independent of the rpc protocol, in preparation for moving it into iroh-gossip.

@rklaehn
Copy link
Contributor Author

rklaehn commented Jun 5, 2024

Moved the dispatcher to iroh-gossip and moved the types into the dispatcher file.

I reexported some of the dispatcher types instead of making newtypes for them in the rpc protocol. Not 100% sure about this, but making so many single element newtypes is a drag...

One question is if the dispatcher should present a simplified interface for creation. Currently it takes a Gossip. But I don't really see how this can be made really nice. Take an endpoint and an options object?

# Conflicts:
#	Cargo.lock
#	iroh-docs/src/engine/gossip.rs
#	iroh-gossip/Cargo.toml
#	iroh/src/client.rs
#	iroh/src/node.rs
#	iroh/src/node/builder.rs
#	iroh/src/rpc_protocol.rs
@rklaehn rklaehn marked this pull request as ready for review June 6, 2024 09:11
@rklaehn
Copy link
Contributor Author

rklaehn commented Jun 6, 2024

This is missing adapting the docs stuff to the new gossip dispatcher. Other than that I think it is good to go.

///
/// Basically just flattens the two stages of joining into one.
async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec<NodeId>) -> anyhow::Result<()> {
tracing::error!("Joining gossip topic {:?}", topic);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove / downgrade to trace (plus a few more in here)

/// This should not fail unless the gossip instance is faulty.
async fn dispatch_loop(self) -> anyhow::Result<()> {
use futures_lite::stream::StreamExt;
let stream = self.gossip.clone().subscribe_all();
Copy link
Member

@Frando Frando Jun 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So subscribe_all sends a message into net.rs, which has an actor loop task of itself, and in there then subscribes to a broadcast channel on which all events are emitted. If we say that the dispatcher is the main interface we use to access gossip, we could remove all the channeling from net.rs alltogether and just expose a single, non-broadcast stream, which is then processed here directly. This is what I meant originally by "moving the dispatcher into gossip": Have only one place where multiple channels for subscriptions are supported.

However I'm also fine with merging this as-is and then potentially revisit in followups, as this has no consequence on the client api for iroh-gossip. We'd have to check in with current users maybe, especially deltachat, which API they prefer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did not look into the details of gossip that much so far. Just treated it as a black box. I guess once the dispatcher is moved to gossip we can optimize things. The important thing for me (and the reason for this PR in the first place) is to have a more high level way to use gossip.

verbose,
} => {
let bootstrap = bootstrap.into_iter().collect();
let topic = blake3::hash(topic.as_ref()).into();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document this somewhere (CLI help maybe) that the passed-in topic will be blake3 hashed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. Maybe we should just allow people to specify the exact gossip topic instead of hiding it. I was just too lazy to add a new command line arg.

Change the RPC to contain an actual topic-id, and have alternative CLI options topic: String (gets hashed on the client side) and --topic_raw: TopicId ?

/// Set of bootstrap nodes we are using.
bootstrap: BTreeSet<NodeId>,
/// The task that is driving the join future.
#[allow(dead_code)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why allow(dead_code)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an AbortingJoinHandle, so it only exists for Drop. But somehow rust does not consider that during dead code analysis. Any way to make it aware?

@@ -184,9 +184,6 @@ mod tests {
// for testing a mapping is simply an ip, port pair
type M = (Ipv4Addr, NonZeroU16);

const TEST_PORT: NonZeroU16 = // SAFETY: it's clearly non zero
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow I had dead code warnings without it 🤷

where
C: ServiceConnection<RpcService>,
{
/// Subscribe to a gossip topic.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add some more docs on the return value (sink and stream).

@Frando
Copy link
Member

Frando commented Jun 7, 2024

Also at least a smoke test for the gossip client API would be good so that we don't break things.

- remove some error level loggings
- more docs for subscribe
- add subscribe opts
- allow passing in raw topic
github-merge-queue bot pushed a commit that referenced this pull request Jun 7, 2024
…and use ref-cast to eliminate them entirely (#2350)

## Description

With 4 different clients, the current approach might be OK. But we are
going to have more. E.g. gossip, see
#2258 .

And in any case it feels weird to store the same thing multiple times.

So this replaces public fields in the iroh client with accessors and use
ref-cast to eliminate them entirely. There is now only one rpc field,
and you can switch from that to the different subsystem client views
without runtime overhead, not even an arc clone.

## Breaking Changes

Everything that uses the iroh client will have to switch from field
accesses to accessor fns.

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
- [ ] All breaking changes documented.
@dignifiedquire dignifiedquire modified the milestones: v0.18.0, v0.19.0 Jun 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: 🏗 In progress
Development

Successfully merging this pull request may close these issues.

None yet

4 participants