Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add own abstraction layer over tokio::time::Interval #2318

Open
wants to merge 8 commits into
base: styppo/spawn
Choose a base branch
from

Conversation

sisou
Copy link
Member

@sisou sisou commented Mar 16, 2024

Using gloo-timers for WASM. This change gets rid of the unmaintained wasm-timer crate, of which we were already using a fork with a fix from me anyway.

The resulting abstraction layer is very thin and since it exposes the exact same interface for both native and WASM targets, chunks of code that concerned themselves with the differentiation can be removed.

Pull request checklist

  • All tests pass. The project builds and runs.
  • I have resolved any merge conflicts.
  • I have resolved all clippy and rustfmt warnings.

@sisou sisou added refactor Refactoring of some component WASM labels Mar 16, 2024
@sisou sisou requested review from styppo and hrxi March 16, 2024 16:48
@sisou sisou self-assigned this Mar 16, 2024
@sisou
Copy link
Member Author

sisou commented Mar 17, 2024

I looked at the failing test it_can_limit_requests_rate: Before my force-push, which fixed most intervals to the previous behavior of having their first tick only after the first duration, it failed at the first limit reset already with a ConnectionClosed error. After, it only fails at the third limit reset. However, it's still not failing with limit-reset related problem, but because of a disconnecting peer.

I did not yet investigate the other failing test it_can_reset_requests_rate_with_reconnections.

@styppo styppo force-pushed the sisou/nimiq-time branch 3 times, most recently from ea2ef23 to c658661 Compare April 10, 2024 20:04
@sisou
Copy link
Member Author

sisou commented Apr 11, 2024

@styppo you are ignoring two request-response tests in your commits here. Is there no way to enable them again? Or will you create a tracking issue for them?

Copy link
Contributor

@hrxi hrxi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found a small bug in test code. Plus, as @sisou mentions, tests seem to be ignored.

@@ -300,7 +296,7 @@ async fn it_can_aggregate() {
}

loop {
match tokio::time::timeout_at(deadline, aggregation.next()).await {
match nimiq_time::timeout(timeout, aggregation.next()).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the timeout logic from 300 ms * contributors / 7 for the entire thing to 300 ms * contributors / 7 per loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the variable was copied wrong (deadline => timeout), or the new timeout method behaves differently than timeout_at did?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I mean neither.

Previously, the timeout_at function made sure that the entire loop did not take longer than timeout_ms. Now, the usage of the timeout function only makes sure that each loop iteration takes no longer than timeout.

I.e. previously the whole loop was limited in time, now each iteration can take as long as the whole loop previously.

Not sure if that made it clearer…

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand now.

OK, so since we don't have a timeout_at anymore, we need to make the timeout duration dynamic. Calculate it from the difference of deadline - <now>. Would that be right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, check out a3fb606

match result {
Err(_) => Err(RequestError::OutboundRequest(
OutboundRequestError::SenderFutureDropped,
)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a warn! here so the condition could be more easily noticed in logs?

use gloo_timers::future::{IntervalStream, TimeoutFuture};
use send_wrapper::SendWrapper;

pub type Interval = SendWrapper<IntervalStream>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the send_wrapper docs:

This Rust library implements a wrapper type called SendWrapper which allows you to move around non-Send types between threads, as long as you access the contained value only from within the original thread.

This means we're locking our wasm implementation to one thread. I guess that's okay.

assert!(
period.as_millis() <= u32::MAX as u128,
"Period as millis must fit in u32"
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment why this is done would be good. I assume it's due to the restriction on gloo's sleep function:

This function will panic if the specified Duration cannot be casted into a u32 in milliseconds.

@@ -66,7 +66,9 @@ pub async fn it_can_initialize_with_mock_network() {
}),
);

spawn_local(consensus);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the NodeJS test runner was not exiting after the test completed successfully, because of pending timeouts or intervals. We tried to work around it by not spawning the consensus, but only polling it once (the added lines after this removed line), but that didn't work really either.

I have now changed the WASM tests to run in headless Chrome and there the test runner exits correctly, so I have added this back in.

styppo and others added 7 commits May 7, 2024 23:25
- Add a utility function `spawn` which calls `wasm_bindgen_futures::spawn_local`
  for WASM targets and `tokio::spawn` otherwise.
- Remove `TaskExecutor`.
* Remove most references to `tokio::time`
* Remove `wasm_timer`
As the default NodeJS runtime fails to exit with pending timeouts or intervals, even after the tests all succeeded.
@sisou sisou changed the base branch from albatross to styppo/spawn May 8, 2024 22:22
@@ -17,6 +17,7 @@ workspace = true
async-trait = "0.1"
futures = { package = "futures-util", version = "0.3", features = ["sink"] }
log = { package = "tracing", version = "0.1", features = ["log"] }
instant = { version = "0.1", features = [ "wasm-bindgen" ] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add the wasm-bindgen feature as a feature of this crate:

[features]
wasm-bindgen = ["instant/wasm-bindgen"]

Comment on lines +1655 to +1657
// let timeout = Req::TIME_WINDOW.mul_f32(1.5f32);
// let future = response_rx.map_err(std::io::Error::other);
//match wasm_timer::TryFutureExt::timeout(future, timeout).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// let timeout = Req::TIME_WINDOW.mul_f32(1.5f32);
// let future = response_rx.map_err(std::io::Error::other);
//match wasm_timer::TryFutureExt::timeout(future, timeout).await {


nimiq-bls = { workspace = true }
nimiq-macros = { workspace = true }
nimiq-network-interface = { workspace = true }
nimiq-primitives = { workspace = true, features = ["policy"] }
nimiq-hash = { workspace = true }
nimiq-serde = { workspace = true }
nimiq-time = { workspace = true }
nimiq-utils = { workspace = true, features = [
"tagged-signing",
"libp2p",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we could remove the tokio-time feature.

@@ -605,6 +604,7 @@ async fn send_n_request_to_fail(net1: &Arc<Network>, net2: &Arc<Network>, n: u32

#[cfg(feature = "tokio-time")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed?

@@ -727,6 +727,7 @@ async fn it_can_limit_requests_rate_after_reconnection() {

#[cfg(feature = "tokio-time")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed?


#[cfg(target_family = "wasm")]
pub use gloo::*;
#[cfg(not(target_family = "wasm"))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd do it differently by putting this into the Cargo.toml such that these dependencies get included only depending on the target. For this to work, the Cargo.toml should look like this:

[dependencies]
[target.'cfg(target_arch = "wasm32")'.dependencies]
gloo-timers = { version = "0.2", features = ["futures"], optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.36", features = ["time"], optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio-stream = { version = "0.1", features = ["time"], optional = true }

Then the code here could just use the features tokio, tokio-stream and gloo for enabling code here.

@@ -21,6 +21,7 @@ workspace = true
async-trait = "0.1"
byteorder = "1.5"
futures = { package = "futures-util", version = "0.3" }
instant = { version = "0.1", features = [ "wasm-bindgen" ] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add a feature wasm-bindgen that selects this:

[features]
wasm-bindgen = ["instant/wasm-bindgen"]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
refactor Refactoring of some component WASM
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants