rpc: add full support reconnecting rpc client #1505

Merged: niklasad1 merged 78 commits into master from na-reconnecting-rpc-client-v2 on May 8, 2024

Member

@niklasad1 niklasad1 commented Mar 27, 2024

This PR improves the reconnecting RPC client so that it works with the unstable RPC backend, which requires restarting chainhead_follow and refreshing any other state that must be updated when a new connection is established.

Close #1423

Some notes about the PR:

  • RPC calls that are not part of the "Backend API", such as "LegacyRpcMethods", are not retried; it's up to the user to handle them. subxt now exposes two utility methods, retry and retry_stream, for retrying calls and subscriptions respectively if the connection was lost.
  • subxt is responsible for retrying RPC calls, and the rpc-reconnecting-client "only reconnects".
  • Resubscribing is handled by subxt on reconnection, but subscriptions with side effects, such as author_submitAndWatch, are not retried; it's up to the caller to handle them.
  • When resubscribing to blocks during a reconnection, it's possible that a few blocks may be missed. In such scenarios, subxt will emit an error on the stream itself.
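
To make the retry behaviour described in the notes above concrete, here is a minimal, synchronous sketch of the idea. It is not subxt's actual `retry` helper (which is async and whose signature is not shown in this PR); `RpcError`, `retry_call`, and the `max_attempts` parameter are hypothetical names introduced only for illustration.

```rust
/// Hypothetical error type for this sketch; not subxt's real error enum.
#[derive(Debug, PartialEq)]
pub enum RpcError {
    /// The connection dropped, but the client is reconnecting.
    DisconnectedWillReconnect,
    /// Any other failure, which should not be retried.
    Other(String),
}

/// Calls `op` until it succeeds or fails with a non-reconnect error.
/// `op` always runs at least once, and at most `max_attempts` times
/// when `max_attempts` is greater than zero.
pub fn retry_call<T>(
    max_attempts: usize,
    mut op: impl FnMut() -> Result<T, RpcError>,
) -> Result<T, RpcError> {
    let mut attempt = 0;
    loop {
        attempt += 1;
        match op() {
            // Only the "will reconnect" error is worth retrying.
            Err(RpcError::DisconnectedWillReconnect) if attempt < max_attempts => continue,
            // Success, a non-retryable error, or attempts exhausted.
            other => return other,
        }
    }
}
```

A caller would wrap only idempotent calls this way; a call with side effects (the notes mention author_submitAndWatch) should not be blindly retried, because the first attempt may already have taken effect.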

subxt/src/backend/utils.rs (outdated, resolved)
subxt/src/backend/mod.rs (outdated, resolved)
}
}

impl<Hash: BlockHash> Stream for FollowStreamFinalizedHeads<Hash> {
Collaborator

This looks good to me! As a nit, I'd be tempted to put it in a different file, like the stream in storage_items.rs is (or put some of this stuff into a utils file or something), since it's not really a part of the follow stream driver code.

Member Author

Yep, the thing was that FollowStreamSubscription wasn't exported outside this module, and I placed it here to avoid exposing it as pub(crate).

I can move it if it's OK to make it pub(crate) :)

Comment on lines 80 to 81
// Track the last block to determine how many blocks that were missed when reconnecting.
let mut last_seen_block = None;
Collaborator

@jsdw jsdw May 3, 2024

Since this is just an example, I wonder if it's worth the extra complexity of logging missing-block info vs. just printing block details and continuing on a disconnected_will_reconnect error :)

I don't mind much either way though!
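
For context, the bookkeeping being discussed amounts to something like the following sketch. It is not the code from the PR's example; `missed_blocks` is a hypothetical helper, and block numbers are assumed to be plain `u64` values.

```rust
/// Returns how many blocks were skipped between the previously seen block
/// and `current`, then records `current` as the last seen block.
/// Returns 0 for the first block, and for blocks that do not advance.
pub fn missed_blocks(last_seen: &mut Option<u64>, current: u64) -> u64 {
    let missed = match *last_seen {
        // A gap exists when `current` is more than one ahead of `prev`.
        Some(prev) if current > prev => current - prev - 1,
        _ => 0,
    };
    *last_seen = Some(current);
    missed
}
```

The simpler alternative suggested in the comment would drop `last_seen` entirely and just continue the loop when the stream yields the reconnect error.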

Comment on lines 56 to 59
let backend: LegacyBackend<PolkadotConfig> =
    LegacyBackend::builder().build(RpcClient::new(rpc.clone()));

let api: OnlineClient<PolkadotConfig> = OnlineClient::from_backend(Arc::new(backend)).await?;
Collaborator

Just as a note: in the "default" case, one can also just do something like OnlineClient::from_rpc(rpc_client), which might be an easier thing to point them at in an example.

(And perhaps they can then figure out how to use custom backends in a separate example)

F: FnMut() -> ResubscribeFuture<R> + Send + 'static + Clone,
R: Send + 'static,
{
loop {
Collaborator

@jsdw jsdw May 7, 2024

It's a shame that we are sort of duplicating the retry logic in an outer and then an inner loop here, I think just so the initial error can be returned in the Result before we are inside a stream, where errors come through the stream itself.

I wonder if it makes sense to change the signature of this fn to return StreamOfResults<R> (i.e. no outer Result), so that the body can be simpler. This sort of makes sense to me anyway, because it's a bit odd that we would care about catching the initial future failing but then be happy with any other errors coming through the stream instead.

eg:

// future that will resubscribe to the stream, internally retrying 
// if any DisconnectedWillReconnect (I wonder whether `retry` could be 
// used to make this bit even shorter too..)
let resubscribe = Box::new(move || {...});

// The extra Box is to encapsulate the retry subscription type
return Ok(StreamOf::new(Box::pin(RetrySubscription {
    state: PendingOrStream::Stream(v),
    resubscribe,
})));
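
To illustrate the "no outer Result" shape in isolation, here is a synchronous analogue that uses `Iterator` in place of an async `Stream`. It is only a sketch under stated assumptions: `SubError`, `Items`, `State`, and this `RetrySubscription` are hypothetical stand-ins rather than the types in the PR, and unlike subxt (which emits an error on the stream when blocks may have been missed) this version resubscribes silently.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum SubError {
    DisconnectedWillReconnect,
    Other(String),
}

/// A boxed "subscription": a sequence of results.
pub type Items<T> = Box<dyn Iterator<Item = Result<T, SubError>>>;

/// Mirrors the pending-or-stream idea: either we still need to
/// (re)subscribe, or we hold a live subscription.
enum State<T> {
    Pending,
    Active(Items<T>),
}

pub struct RetrySubscription<T, F> {
    state: State<T>,
    resubscribe: F,
}

impl<T, F> RetrySubscription<T, F>
where
    F: FnMut() -> Result<Items<T>, SubError>,
{
    /// No outer `Result`: a failed initial subscribe shows up as the first item.
    pub fn new(resubscribe: F) -> Self {
        Self { state: State::Pending, resubscribe }
    }
}

impl<T, F> Iterator for RetrySubscription<T, F>
where
    F: FnMut() -> Result<Items<T>, SubError>,
{
    type Item = Result<T, SubError>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match &mut self.state {
                State::Pending => match (self.resubscribe)() {
                    Ok(items) => self.state = State::Active(items),
                    // Still reconnecting: try again (unbounded in this sketch).
                    Err(SubError::DisconnectedWillReconnect) => continue,
                    // Any other subscribe failure is surfaced as an item.
                    Err(e) => return Some(Err(e)),
                },
                State::Active(items) => match items.next() {
                    // Connection lost mid-stream: go back to resubscribing.
                    Some(Err(SubError::DisconnectedWillReconnect)) => {
                        self.state = State::Pending;
                    }
                    // Items, other errors, and end-of-stream pass through.
                    other => return other,
                },
            }
        }
    }
}
```

With this shape there is a single loop, and the caller handles every error, including the first one, in the same place.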

Collaborator

@jsdw jsdw May 7, 2024

Maybe best to leave as is for this PR though, because changing it above might have knock-on effects on the signatures of backend fns or whatever. Just something to ponder!

Member Author

I think you will be happier now after I removed the !DRY stuff

@@ -431,6 +506,11 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
return Poll::Ready(Some(Ok(keys)));
}
Err(e) => {
if e.is_disconnected_will_reconnect() {
Collaborator

Perhaps worth a comment, because this is doing something a bit weird: if a future returns an Err(DisconnectedWillReconnect), we expect to keep polling it and it will internally retry itself :)

Member Author

true, changed it to continue the loop instead

/// A stream that subscribes to finalized blocks
/// and indicates whether a block was missed if it was restarted.
#[derive(Debug)]
pub struct FollowStreamFinalizedHeads<Hash: BlockHash, F> {
Collaborator

Where's the type that this is using that stops it from being able to move out? I think I'm missing it :D

/// and indicates whether a block was missed if it was restarted.
#[derive(Debug)]
pub struct FollowStreamFinalizedHeads<Hash: BlockHash, F> {
stream: FollowStreamDriverSubscription<Hash>,
Member Author

@niklasad1 niklasad1 May 7, 2024

This type @jsdw ☝️

I could make it generic but I prefer not to :)

Collaborator

that struct is pub in this file though, so we could import it from a separate file OK I think? :D

Member Author

it's a pita because the test helpers are not accessible outside this module :(

Collaborator

Ah I see! either way it's all good and we can always shuffle stuff about in the future or whatever anyways!

subxt/src/backend/utils.rs (outdated, resolved)
Collaborator

@jsdw jsdw left a comment

LGTM, nice job @niklasad1!

I left some nits and suggestions but nothing that would block it merging :)

subxt/src/backend/utils.rs (outdated, resolved)
@niklasad1 niklasad1 merged commit bec896d into master May 8, 2024
13 checks passed
@niklasad1 niklasad1 deleted the na-reconnecting-rpc-client-v2 branch May 8, 2024 13:12
Successfully merging this pull request may close these issues.

reconnecting rpc client
4 participants