Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use broadcast channel for event listeners #8193

Merged
merged 44 commits into from May 22, 2024

Conversation

fgimenez
Copy link
Member

@fgimenez fgimenez commented May 10, 2024

EventListeners implements a multi producer multi consumer queue where each sent value is seen by all consumers.
To achieve this EventListeners allocates a std::Vec to be filled with tokio::sync::UnboundedSender every time EventListeners::new_listener is called.

As every value sent via EventListeners is cloned to each UnboundedReceiver and the channels are unbounded this is prone to unlimited memory growth and eventual OOM attacks.

To prevent this, in this PR tokio's tokio::sync::broadcast multi producer multi consumer queue is used instead.

For now the size of all the broadcast channels is set to 1000, would be good to measure how much is needed for each. Pending adding metrics as suggested in this comment #8193 (comment) will be done in a follow up.

@fgimenez fgimenez force-pushed the fgimenez/event-listeners-broadcast-channel branch from 80f3deb to 9d5d81d Compare May 10, 2024 15:44
@emhane emhane added the C-security Issue or pull request related to security. label May 10, 2024
@emhane
Copy link
Member

emhane commented May 10, 2024

check out these types we have

/// A wrapper type around [UnboundedSender](mpsc::UnboundedSender) that updates metrics on send.
#[derive(Debug)]
pub struct UnboundedMeteredSender<T> {
/// The [UnboundedSender](mpsc::UnboundedSender) that this wraps around
sender: mpsc::UnboundedSender<T>,
/// Holds metrics for this type
metrics: MeteredSenderMetrics,
}

/// A wrapper type around [Receiver](mpsc::UnboundedReceiver) that updates metrics on receive.
#[derive(Debug)]
pub struct UnboundedMeteredReceiver<T> {
/// The [Sender](mpsc::Sender) that this wraps around
receiver: mpsc::UnboundedReceiver<T>,
/// Holds metrics for this type
metrics: MeteredReceiverMetrics,
}

we use them so far only for the channel NetworkManager->TransactionsManager for incoming transaction gossip, but I'd like to see more of them in the codebase.

we have a panel for observing this channel
https://reth.paradigm.xyz/d/d47d679c-c3b8-40b6-852d-cbfaa2dcdb37/reth---transaction-pool?orgId=1&refresh=30s&viewPanel=95

@emhane emhane added the A-networking Related to networking in general label May 10, 2024
@fgimenez
Copy link
Member Author

fgimenez commented May 10, 2024

@emhane awesome thx! will check how to include something similar for the broadcast channels, the metrics can be very useful to assign the proper size to each

@fgimenez fgimenez force-pushed the fgimenez/event-listeners-broadcast-channel branch from a6f2112 to 16b61b9 Compare May 13, 2024 09:13
@fgimenez fgimenez changed the title WIP feat: use broadcast channel for event listeners feat: use broadcast channel for event listeners May 13, 2024
@fgimenez fgimenez marked this pull request as ready for review May 13, 2024 16:11
@fgimenez fgimenez force-pushed the fgimenez/event-listeners-broadcast-channel branch from a55828b to 06ff479 Compare May 13, 2024 16:58
Copy link
Member

@emhane emhane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, you don't need a new BeaconEngineMessage to subscribe to the broadcast stream because you clone the sender and pass it to the handle, neat

Comment on lines 674 to 688
/// Transforms a stream of `Result<T, BroadcastStreamRecvError>` into a stream of `NodeEvent`,
/// applying a uniform error handling and conversion strategy.
pub fn handle_broadcast_stream<T>(
stream: impl Stream<Item = Result<T, BroadcastStreamRecvError>> + Unpin,
) -> impl Stream<Item = NodeEvent> + Unpin
where
T: Into<NodeEvent>,
{
stream.map(|result_event| {
result_event
.map(Into::into)
.unwrap_or_else(|err| NodeEvent::Other(format!("Stream error: {:?}", err)))
})
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about implementing FromIterator here, that will work I think

Copy link
Member

@Rjected Rjected May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about implementing FromIterator here, that will work I think

the map is provided by streamext, and streams are not (sync) iterators, so I'm not sure Fromiterator is the right fit here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rip, would be nice but it's sealed, hopefully soon ™️ in stable

crates/rpc/rpc-builder/tests/it/utils.rs Outdated Show resolved Hide resolved
crates/tokio-util/src/event_listeners.rs Outdated Show resolved Hide resolved
crates/tokio-util/src/event_listeners.rs Outdated Show resolved Hide resolved
crates/tokio-util/src/event_listeners.rs Outdated Show resolved Hide resolved
crates/net/network/src/transactions/mod.rs Outdated Show resolved Hide resolved
crates/net/network/src/transactions/mod.rs Outdated Show resolved Hide resolved
crates/net/network/src/network.rs Outdated Show resolved Hide resolved
crates/consensus/beacon/src/engine/handle.rs Outdated Show resolved Hide resolved
crates/tokio-util/src/event_listeners.rs Outdated Show resolved Hide resolved
@fgimenez fgimenez requested a review from rakita as a code owner May 20, 2024 17:17
@fgimenez fgimenez force-pushed the fgimenez/event-listeners-broadcast-channel branch from 5da034a to 77df31b Compare May 20, 2024 17:36
@fgimenez fgimenez requested a review from emhane May 21, 2024 08:26
Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall I think this great.
I don't think we'll have any issues with this for engine/pipeline events because those basically just for reporting and it's fine to drop some.

My main concern is the network transaction task which is more likely to drop messages but it relies on networkevents for peer tracking for example.
although 1k messages should be fine, I'd feel more comfortable if we could bump the default capacity to 2k and add a metric for when we lag in the tx task. maybe we should emit peer added/removed separately, but we should still proceed with this.

I'd also like a new function/stream variant that does not return results but rather skips the lag error, this would make the API easier in some places, ref

/// A Stream of [CanonStateNotification].
#[derive(Debug)]
#[pin_project::pin_project]
pub struct CanonStateNotificationStream {
#[pin]
st: BroadcastStream<CanonStateNotification>,
}

we could move this stream type to our tokio util crate

we also need this for the txpool channels which is mostlikely the most critical part because exposed over RPC.

@@ -197,7 +199,7 @@ pub struct TransactionsManager<Pool> {
/// Subscriptions to all network related events.
///
/// From which we get all new incoming transaction related messages.
network_events: UnboundedReceiverStream<NetworkEvent>,
network_events: BroadcastStream<NetworkEvent>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm slightly concerned about this, because now we're no longer guaranteed delivery of all network events which can result in wrong peer tracking, for example session closed, although 1000 messages should be sufficient

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think dropping NetworkEvent::SessionEstablished and NetworkEvent::PeerAdded is recoverable, but not sure if dropping NetworkEvent::SessionClosed and NetworkEvent::PeerRemoved can lead to memory leak. depends on if all data structures that are updated accordingly are bounded.

crates/tokio-util/src/event_listeners.rs Outdated Show resolved Hide resolved
@fgimenez fgimenez force-pushed the fgimenez/event-listeners-broadcast-channel branch from 4f7d39d to c400887 Compare May 22, 2024 07:16
crates/tokio-util/src/event_listeners.rs Outdated Show resolved Hide resolved
crates/net/network/src/network.rs Outdated Show resolved Hide resolved
crates/net/network/src/network.rs Outdated Show resolved Hide resolved
@fgimenez
Copy link
Member Author

I'd also like a new function/stream variant that does not return results but rather skips the lag error,

makes total sense, done ptal

@mattsse
Copy link
Collaborator

mattsse commented May 22, 2024

this is great!
broadcast is def better for this

@fgimenez fgimenez added this pull request to the merge queue May 22, 2024
Merged via the queue into main with commit d0386b8 May 22, 2024
30 checks passed
@fgimenez fgimenez deleted the fgimenez/event-listeners-broadcast-channel branch May 22, 2024 17:50
@fgimenez fgimenez restored the fgimenez/event-listeners-broadcast-channel branch May 22, 2024 18:42
@fgimenez fgimenez deleted the fgimenez/event-listeners-broadcast-channel branch May 22, 2024 18:44
Rjected pushed a commit that referenced this pull request May 23, 2024
Co-authored-by: Emilia Hane <elsaemiliaevahane@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-networking Related to networking in general C-security Issue or pull request related to security.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants