FollowStreamDriverSubscription's will lead to events being collected in unbounded VecDeque until polled. #1443

Open
jsdw opened this issue Feb 23, 2024 · 1 comment
Labels
good first issue Small, well scoped, simple; good for newcomers

Comments

@jsdw
Collaborator

jsdw commented Feb 23, 2024

This is an issue raised by the auditors.

Simply put, if a user is using the UnstableBackend, then there are Backend calls which create FollowStreamDriverSubscriptions. Each subscription contains a queue of all of the un-consumed events received from the chain. When the FollowStreamDriver is polled (which would often happen in the background), it continues to receive events from the backend and adds them to the queue of every active subscription. So, if a subscription isn't polled, its queue of events waiting to be consumed grows without bound.
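To make the failure mode concrete, here is a minimal toy model of the behaviour described above. It is only a sketch: `ToyDriver` and `ToySubscription` are hypothetical stand-ins, not the real FollowStreamDriver or FollowStreamDriverSubscription, and events are plain integers.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a subscription: just a queue of un-consumed events.
#[derive(Default)]
struct ToySubscription {
    queue: VecDeque<u32>,
}

/// Hypothetical stand-in for the driver. Every event it receives is copied
/// into the queue of each active subscription, whether or not anybody reads it.
#[derive(Default)]
struct ToyDriver {
    subscriptions: Vec<ToySubscription>,
}

impl ToyDriver {
    /// Register a new subscription and return its index.
    fn subscribe(&mut self) -> usize {
        self.subscriptions.push(ToySubscription::default());
        self.subscriptions.len() - 1
    }

    /// Model one poll of the driver that yields `event` from the backend.
    fn poll_once(&mut self, event: u32) {
        for sub in &mut self.subscriptions {
            sub.queue.push_back(event);
        }
    }

    /// Model polling a subscription: hand back the oldest queued event, if any.
    fn poll_subscription(&mut self, id: usize) -> Option<u32> {
        self.subscriptions[id].queue.pop_front()
    }

    /// Number of events currently waiting on a subscription.
    fn queued(&self, id: usize) -> usize {
        self.subscriptions[id].queue.len()
    }
}
```

In this model, a subscription that is polled after every driver poll keeps an empty queue, while one that is never polled holds every event the driver has ever seen.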

The user is expected to poll the FollowStreamDriver (in practice the UnstableBackendDriver, which is what they get back when creating an UnstableBackend and which wraps the FollowStreamDriver) more slowly if they are struggling to keep up. Doing so enforces backpressure and slows down the rate at which events are obtained from the chain.

To help bound memory usage a little better, we could also consider adding a configuration option to UnstableBackendBuilder like fn max_event_buffer_per_subscription(self, size: usize) -> Self to bound the number of events that can be queued up on any given subscription before it's shut down and cleaned up. We could also consider setting an arbitrary default, like 1024 events, to give breathing room but prevent unlimited growth when nothing is being polled except the UnstableBackendDriver.
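A rough sketch of what such a builder option could look like follows. `BuilderSketch` and its `max_buffered_events` field are hypothetical stand-ins for illustration; only the method signature and the 1024 default come from the suggestion above, and the real UnstableBackendBuilder has other fields that are not shown.

```rust
/// Hypothetical stand-in for `UnstableBackendBuilder`, showing only the
/// proposed option.
#[derive(Debug, Clone)]
struct BuilderSketch {
    /// Assumed field name: max events queued per subscription before it is shut down.
    max_buffered_events: usize,
}

impl Default for BuilderSketch {
    fn default() -> Self {
        // 1024 is the arbitrary default floated in this issue.
        Self { max_buffered_events: 1024 }
    }
}

impl BuilderSketch {
    fn new() -> Self {
        Self::default()
    }

    /// Bound the number of events that can be queued on any one subscription.
    fn max_event_buffer_per_subscription(mut self, size: usize) -> Self {
        self.max_buffered_events = size;
        self
    }
}
```

Taking `self` by value and returning `Self` matches the proposed signature and keeps the option chainable with other builder calls.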

@jsdw
Collaborator Author

jsdw commented Apr 16, 2024

More detail on how I'd look to handle this offhand (all in follow_stream_driver.rs):

  • Add something like max_queued_messages: usize to SharedState
  • Make SubscriberDetails.items be, e.g., enum QueuedMessages { Overflowed, Messages(VecDeque) }. If we try to push more messages than max_queued_messages, then set it to Overflowed.
  • Make impl<Hash: BlockHash> Stream for FollowStreamDriverSubscription<Hash>'s Item be a Result so we can return an error.
    • Now, return a suitable error if we try reading from shared state and see QueuedMessages::Overflowed and mark as done.
    • Also read the max_queued_messages prop at the same time as asking for items, and if local_messages.len() + new_items.len() > max_queued_messages then mark as done and return the same error as above.

Because we return a Result now, we'll need to modify the UnstableBackend impl to accommodate this (hopefully should be straightforward!).
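The queue handling in the steps above could be sketched roughly as follows, using only the standard library. The `QueuedMessages` enum and the `max_queued_messages` limit come from the list; the error type `QueueOverflowed`, the method names `push` and `take`, and the helper `extend_local` are assumptions made for illustration. The `Stream` impl itself is not shown.

```rust
use std::collections::VecDeque;
use std::mem;

/// Per-subscriber queue state, as proposed in the list above.
#[derive(Debug, PartialEq)]
enum QueuedMessages<T> {
    Overflowed,
    Messages(VecDeque<T>),
}

/// Hypothetical error a subscription would yield once its queue has overflowed.
#[derive(Debug, PartialEq)]
struct QueueOverflowed;

impl<T> QueuedMessages<T> {
    fn new() -> Self {
        QueuedMessages::Messages(VecDeque::new())
    }

    /// Driver side: queue a message, or switch to `Overflowed` (dropping
    /// everything queued) if the queue already holds `max_queued_messages`.
    fn push(&mut self, msg: T, max_queued_messages: usize) {
        let is_full = match &*self {
            QueuedMessages::Messages(queue) => queue.len() >= max_queued_messages,
            QueuedMessages::Overflowed => return,
        };
        if is_full {
            *self = QueuedMessages::Overflowed;
        } else if let QueuedMessages::Messages(queue) = self {
            queue.push_back(msg);
        }
    }

    /// Subscriber side: take everything queued so far, or report the overflow.
    fn take(&mut self) -> Result<VecDeque<T>, QueueOverflowed> {
        match self {
            QueuedMessages::Overflowed => Err(QueueOverflowed),
            QueuedMessages::Messages(queue) => Ok(mem::take(queue)),
        }
    }
}

/// Subscriber side: merge newly taken items into the local buffer, failing
/// if local_messages.len() + new_items.len() would exceed the limit.
fn extend_local<T>(
    local_messages: &mut VecDeque<T>,
    new_items: VecDeque<T>,
    max_queued_messages: usize,
) -> Result<(), QueueOverflowed> {
    if local_messages.len() + new_items.len() > max_queued_messages {
        return Err(QueueOverflowed);
    }
    local_messages.extend(new_items);
    Ok(())
}
```

In the real change, the subscription's `Stream` impl would presumably surface this error as its `Item` and then mark itself as done; that part depends on the existing driver code and is left out here.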

jsdw added the good first issue label Apr 16, 2024