Skip to content

Commit

Permalink
Add own abstraction layer over tokio::time::Interval, using gloo-time…
Browse files Browse the repository at this point in the history
…rs for WASM
  • Loading branch information
sisou committed Mar 16, 2024
1 parent 26a5042 commit 634fb25
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 175 deletions.
146 changes: 66 additions & 80 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ members = [
"test-log",
"test-log/proc-macro",
"test-utils",
"time",
"tools",
"transaction-builder",
"utils",
Expand Down Expand Up @@ -148,7 +149,6 @@ opt-level = 2
ark-ec = { git = "https://github.com/paberr/algebra", branch = "pb/0.4" }
ark-ff = { git = "https://github.com/paberr/algebra", branch = "pb/0.4" }
ark-r1cs-std = { git = "https://github.com/paberr/r1cs-std", branch = "pb/fix-pedersen" }
wasm-timer = { git = "https://github.com/sisou/wasm-timer.git" }

[workspace.package]
version = "0.1.0"
Expand Down Expand Up @@ -208,6 +208,7 @@ nimiq-tendermint = { path = "tendermint", default-features = false }
nimiq-test-log = { path = "test-log", default-features = false }
nimiq-test-log-proc-macro = { path = "test-log/proc-macro", default-features = false }
nimiq-test-utils = { path = "test-utils", default-features = false }
nimiq-time = { path = "time", default-features = false }
nimiq-transaction = { path = "primitives/transaction", default-features = false }
nimiq-transaction-builder = { path = "transaction-builder", default-features = false }
nimiq-trie = { path = "primitives/trie", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ tokio = { version = "1.36", features = ["macros", "rt", "tracing"] }
tokio-stream = "0.1"
unsigned-varint = "0.8"
void = "1.0"
wasm-timer = "0.2"

nimiq-bls = { workspace = true }
nimiq-macros = { workspace = true }
nimiq-network-interface = { workspace = true }
nimiq-primitives = { workspace = true, features = ["policy"] }
nimiq-hash = { workspace = true }
nimiq-serde = { workspace = true }
nimiq-time = { workspace = true }
nimiq-utils = { workspace = true, features = [
"tagged-signing",
"libp2p",
Expand Down
4 changes: 2 additions & 2 deletions network-libp2p/src/connection_pool/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use libp2p::{
};
use nimiq_macros::store_waker;
use nimiq_network_interface::{network::CloseReason, peer_info::Services};
use nimiq_time::{interval, Interval};
use parking_lot::RwLock;
use rand::{seq::IteratorRandom, thread_rng};
use void::Void;
use wasm_timer::Interval;

use super::Error;
use crate::discovery::peer_contacts::PeerContactBook;
Expand Down Expand Up @@ -325,7 +325,7 @@ impl Behaviour {
desired_peer_count,
..Default::default()
};
let housekeeping_timer = wasm_timer::Interval::new(config.housekeeping_interval);
let housekeeping_timer = interval(config.housekeeping_interval);

Self {
contacts,
Expand Down
4 changes: 2 additions & 2 deletions network-libp2p/src/discovery/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use libp2p::{
};
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::peer_info::Services;
use nimiq_time::{interval, Interval};
use parking_lot::RwLock;
use wasm_timer::Interval;

use super::{
handler::{Handler, HandlerInEvent, HandlerOutEvent},
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Behaviour {
keypair: Keypair,
peer_contact_book: Arc<RwLock<PeerContactBook>>,
) -> Self {
let house_keeping_timer = Interval::new(config.house_keeping_interval);
let house_keeping_timer = interval(config.house_keeping_interval);
peer_contact_book.write().update_own_contact(&keypair);

// Report our own known addresses as candidates to the swarm
Expand Down
7 changes: 3 additions & 4 deletions network-libp2p/src/discovery/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use libp2p::{
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::peer_info::Services;
use nimiq_serde::DeserializeError;
use nimiq_time::{interval, Interval};
use nimiq_utils::tagged_signing::TaggedKeyPair;
use parking_lot::RwLock;
use rand::{seq::IteratorRandom, thread_rng};
use thiserror::Error;
use wasm_timer::Interval;

use super::{
behaviour::Config,
Expand Down Expand Up @@ -547,9 +547,8 @@ impl ConnectionHandler for Handler {
if update_interval < min_secs {
update_interval = min_secs;
}
self.periodic_update_interval = Some(Interval::new(
Duration::from_secs(update_interval),
));
self.periodic_update_interval =
Some(interval(Duration::from_secs(update_interval)));
}

// Switch to established state
Expand Down
85 changes: 3 additions & 82 deletions network-libp2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,18 @@ use nimiq_network_interface::{
};
use nimiq_primitives::task_executor::TaskExecutor;
use nimiq_serde::{Deserialize, DeserializeError, Serialize};
use nimiq_time::{interval, Interval};
use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned};
use nimiq_validator_network::validator_record::ValidatorRecord;
use parking_lot::{Mutex, RwLock};
use thiserror::Error;
#[cfg(feature = "tokio-time")]
use tokio::time::{Instant, Interval};
use tokio::time::Instant;
use tokio::{
sync::{broadcast, mpsc, oneshot},
time,
};
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
#[cfg(not(feature = "tokio-time"))]
use wasm_timer::Interval;

#[cfg(feature = "metrics")]
use crate::network_metrics::NetworkMetrics;
Expand Down Expand Up @@ -383,10 +382,7 @@ impl Network {
let peer_request_limits = Arc::new(Mutex::new(HashMap::new()));
let rate_limits_pending_deletion = Arc::new(Mutex::new(PendingDeletion::default()));

#[cfg(not(feature = "tokio-time"))]
let update_scores = wasm_timer::Interval::new(params.decay_interval);
#[cfg(feature = "tokio-time")]
let update_scores = tokio::time::interval(params.decay_interval);
let update_scores = interval(params.decay_interval);

#[cfg(feature = "metrics")]
let metrics = Arc::new(NetworkMetrics::default());
Expand Down Expand Up @@ -539,81 +535,6 @@ impl Network {
&self.local_peer_id
}

#[cfg(feature = "tokio-time")]
async fn swarm_task(
mut swarm: NimiqSwarm,
events_tx: broadcast::Sender<NetworkEvent<PeerId>>,
mut action_rx: mpsc::Receiver<NetworkAction>,
mut validate_rx: mpsc::UnboundedReceiver<ValidateMessage<PeerId>>,
connected_peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
peer_request_limits: Arc<Mutex<HashMap<PeerId, HashMap<u16, RateLimit>>>>,
rate_limits_pending_deletion: Arc<Mutex<PendingDeletion>>,
mut update_scores: Interval,
contacts: Arc<RwLock<PeerContactBook>>,
force_dht_server_mode: bool,
dht_quorum: NonZeroU8,
#[cfg(feature = "metrics")] metrics: Arc<NetworkMetrics>,
) {
let mut task_state = TaskState {
dht_server_mode: force_dht_server_mode,
dht_quorum: dht_quorum.into(),
..Default::default()
};

let peer_id = Swarm::local_peer_id(&swarm);
let task_span = trace_span!("swarm task", peer_id=?peer_id);

async move {
loop {
tokio::select! {
validate_msg = validate_rx.recv() => {
if let Some(validate_msg) = validate_msg {
let topic = validate_msg.topic;
let result: Result<bool, gossipsub::PublishError> = swarm
.behaviour_mut()
.gossipsub
.report_message_validation_result(
&validate_msg.pubsub_id.message_id,
&validate_msg.pubsub_id.propagation_source,
validate_msg.acceptance,
);

match result {
Ok(true) => {}, // success
Ok(false) => debug!(topic, "Validation took too long: message is no longer in the message cache"),
Err(e) => error!(topic, error = %e, "Network error while relaying message"),
}
}
},
event = swarm.next() => {
if let Some(event) = event {
Self::handle_event(event, &events_tx, &mut swarm, &mut task_state, &connected_peers, Arc::clone(&peer_request_limits), Arc::clone(&rate_limits_pending_deletion), #[cfg( feature = "metrics")] &metrics);
}
},
action = action_rx.recv() => {
if let Some(action) = action {
Self::perform_action(action, &mut swarm, &mut task_state);
}
else {
// `action_rx.next()` will return `None` if all senders (i.e. the `Network` object) are dropped.
break;
}
},
_ = update_scores.tick() => {
swarm.behaviour().update_scores(Arc::clone(&contacts));
},
};
}
}
.instrument(task_span)
.await
}

// This is a duplicate of the previous function.
// This is because these two functions use different implementation for the handling of intervals,
// And the tokio version (needed for some test) could not be reconciled with the non tokio version
// Essentially the tokio select macro is not compatible with the condition compilation flag
#[cfg(not(feature = "tokio-time"))]
async fn swarm_task(
mut swarm: NimiqSwarm,
events_tx: broadcast::Sender<NetworkEvent<PeerId>>,
Expand Down
20 changes: 20 additions & 0 deletions time/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "nimiq-time"
version.workspace = true
authors.workspace = true
license.workspace = true
edition.workspace = true
description = "tokio::time for Rust and WASM"
homepage.workspace = true
repository.workspace = true
categories.workspace = true
keywords.workspace = true

[lints]
workspace = true

[dependencies]
gloo-timers = { version = "0.3.0", features = ["futures"] }
send_wrapper = { version = "0.6.0", features = ["futures"] }
tokio = { version = "1.36", features = ["time"] }
tokio-stream = { version = "0.1", features = ["sync"] }
12 changes: 12 additions & 0 deletions time/src/gloo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use std::convert::TryInto;

use gloo_timers::future::IntervalStream;
use send_wrapper::SendWrapper;

pub type Interval = SendWrapper<IntervalStream>;

pub fn interval(duration: std::time::Duration) -> Interval {
SendWrapper::new(IntervalStream::new(
duration.as_millis().try_into().unwrap(),
))
}
9 changes: 9 additions & 0 deletions time/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#[cfg(target_family = "wasm")]
mod gloo;
#[cfg(not(target_family = "wasm"))]
mod tokio;

#[cfg(target_family = "wasm")]
pub use gloo::*;
#[cfg(not(target_family = "wasm"))]
pub use tokio::*;
6 changes: 6 additions & 0 deletions time/src/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use tokio::time::interval as tokio_interval;
pub use tokio_stream::wrappers::IntervalStream as Interval;

pub fn interval(duration: std::time::Duration) -> Interval {
Interval::new(tokio_interval(duration))
}
2 changes: 1 addition & 1 deletion web-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ crate-type = ["cdylib"]
[dependencies]
futures = "0.3"
futures-util = "0.3"
gloo-timers = { version = "0.3.0", features = ["futures"] }
hex = "0.4"
js-sys = "0.3"
log = { package = "tracing", version = "0.1", features = ["log"] }
Expand All @@ -33,7 +34,6 @@ tsify = { git = "https://github.com/sisou/tsify", branch = "sisou/comments", def
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
wasm-bindgen-derive = { version = "0.2", optional = true }
wasm-timer = "0.2"
web-sys = { version = "0.3.69", features = ["MessageEvent"]}

nimiq-account = { workspace = true, default-features = false }
Expand Down
3 changes: 1 addition & 2 deletions web-client/src/client/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
},
rc::Rc,
str::FromStr,
time::Duration,
};

use futures::{
Expand Down Expand Up @@ -554,7 +553,7 @@ impl Client {
// Actually send the transaction
consensus.send_transaction(tx.native()).await?;

let timeout = wasm_timer::Delay::new(Duration::from_secs(10));
let timeout = gloo_timers::future::TimeoutFuture::new(10_000);

// Wait for the transaction (will be None if the timeout is reached first)
let res = select(receiver, timeout).await;
Expand Down

0 comments on commit 634fb25

Please sign in to comment.