Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add own abstraction layer over tokio::time::Interval #2318

Merged
merged 9 commits into from
Jun 3, 2024
156 changes: 75 additions & 81 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ members = [
"test-log",
"test-log/proc-macro",
"test-utils",
"time",
"tools",
"transaction-builder",
"utils",
Expand Down Expand Up @@ -148,7 +149,6 @@ opt-level = 2
ark-ec = { git = "https://github.com/paberr/algebra", branch = "pb/0.4" }
ark-ff = { git = "https://github.com/paberr/algebra", branch = "pb/0.4" }
ark-r1cs-std = { git = "https://github.com/paberr/r1cs-std", branch = "pb/fix-pedersen" }
wasm-timer = { git = "https://github.com/sisou/wasm-timer.git" }

[workspace.package]
version = "0.1.0"
Expand Down Expand Up @@ -214,6 +214,7 @@ nimiq-tendermint = { path = "tendermint", default-features = false }
nimiq-test-log = { path = "test-log", default-features = false }
nimiq-test-log-proc-macro = { path = "test-log/proc-macro", default-features = false }
nimiq-test-utils = { path = "test-utils", default-features = false }
nimiq-time = { path = "time", default-features = false }
nimiq-transaction = { path = "primitives/transaction", default-features = false }
nimiq-transaction-builder = { path = "transaction-builder", default-features = false }
nimiq-trie = { path = "primitives/trie", default-features = false }
Expand Down
3 changes: 1 addition & 2 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use std::time::Duration;
use log::info;
use nimiq::prover::prover_main;
pub use nimiq::{
client::{Client, Consensus},
client::Client,
config::{command_line::CommandLine, config::ClientConfig, config_file::ConfigFile},
error::Error,
extras::{
deadlock::initialize_deadlock_detection,
logging::{initialize_logging, log_error_cause_chain},
metrics_server::NimiqTaskMonitor,
panic::initialize_panic_reporting,
Expand Down
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ serde = "1.0"
thiserror = "1.0"
tokio = { version = "1.37", features = ["rt", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["sync"] }
wasm-timer = "0.2"

nimiq-account = { workspace = true, default-features = false }
nimiq-block = { workspace = true }
Expand All @@ -47,6 +46,7 @@ nimiq-mmr = { workspace = true }
nimiq-network-interface = { workspace = true }
nimiq-primitives = { workspace = true, features = ["policy", "trie"] }
nimiq-serde = { workspace = true }
nimiq-time = { workspace = true }
nimiq-transaction = { workspace = true }
nimiq-utils = { workspace = true, features = [
"math",
Expand Down
39 changes: 4 additions & 35 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ use tokio::sync::{
},
oneshot::{error::RecvError, Sender as OneshotSender},
};
#[cfg(not(target_family = "wasm"))]
use tokio::time::{sleep, Sleep};
use tokio_stream::wrappers::BroadcastStream;

use self::consensus_proxy::ConsensusProxy;
Expand Down Expand Up @@ -128,10 +126,6 @@ pub struct Consensus<N: Network> {

pub sync: SyncerProxy<N>,

/// A Delay which exists purely for the waker on its poll to reactivate the task running Consensus::poll
/// FIXME Remove this
#[cfg(not(target_family = "wasm"))]
next_execution_timer: Option<Pin<Box<Sleep>>>,
events: BroadcastSender<ConsensusEvent>,
established_flag: Arc<AtomicBool>,
head_requests: Option<HeadRequests<N>>,
Expand Down Expand Up @@ -168,13 +162,6 @@ impl<N: Network> Consensus<N> {
/// established state and to advance the chain.
const HEAD_REQUESTS_TIMEOUT: Duration = Duration::from_secs(5);

/// Timeout after which the consensus is polled after it ran last
///
/// TODO: Set appropriate duration
/// FIXME Remove this
#[cfg(not(target_family = "wasm"))]
const CONSENSUS_POLL_TIMER: Duration = Duration::from_secs(1);

pub fn from_network(
blockchain: BlockchainProxy,
network: Arc<N>,
Expand Down Expand Up @@ -210,16 +197,11 @@ impl<N: Network> Consensus<N> {

let established_flag = Arc::new(AtomicBool::new(false));

#[cfg(not(target_family = "wasm"))]
let timer = Box::pin(sleep(Self::CONSENSUS_POLL_TIMER));

Consensus {
blockchain,
network,
sync: syncer,
events: tx,
#[cfg(not(target_family = "wasm"))]
next_execution_timer: Some(timer),
established_flag,
head_requests: None,
head_requests_time: None,
Expand Down Expand Up @@ -428,7 +410,7 @@ impl<N: Network> Future for Consensus<N> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 1. Poll and advance block queue
// Poll and advance block queue
while let Poll::Ready(Some(event)) = self.sync.poll_next_unpin(cx) {
match event {
LiveSyncPushEvent::AcceptedAnnouncedBlock(_) => {
Expand Down Expand Up @@ -473,7 +455,7 @@ impl<N: Network> Future for Consensus<N> {
self.events.send(event).ok();
}

// 2. Poll any head requests if active.
// Poll any head requests if active.
if let Some(ref mut head_requests) = self.head_requests {
if let Poll::Ready(mut result) = head_requests.poll_unpin(cx) {
// Reset head requests.
Expand All @@ -491,27 +473,14 @@ impl<N: Network> Future for Consensus<N> {
}
}

// 3. Check if a ConsensusRequest was received
// Check if a ConsensusRequest was received
while let Poll::Ready(Some(request)) = self.requests.1.poll_recv(cx) {
match request {
ConsensusRequest::ResolveBlock(request) => self.resolve_block(request),
}
}

// 4. Update timer and poll it so the task gets woken when the timer runs out (at the latest)
// The timer itself running out (producing an Instant) is of no interest to the execution. This poll method
// was potentially awoken by the delays waker, but even then all there is to do is set up a new timer such
// that it will wake this task again after another time frame has elapsed. No interval was used as that
// would periodically wake the task even though it might have just executed
#[cfg(not(target_family = "wasm"))]
{
let mut timer = Box::pin(sleep(Self::CONSENSUS_POLL_TIMER));
// If the sleep wasn't pending anymore, it didn't register us with the waker, but we need that.
assert!(timer.poll_unpin(cx) == Poll::Pending);
self.next_execution_timer = Some(timer);
}

// 5. Advance consensus and catch-up through head requests.
// Advance consensus and catch-up through head requests.
self.request_heads();

Poll::Pending
Expand Down
7 changes: 4 additions & 3 deletions consensus/src/sync/live/diff_queue/diff_request_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::{sync::Arc, time::Duration};
use futures::future::BoxFuture;
use nimiq_network_interface::network::{Network, PubsubId};
use nimiq_primitives::{trie::trie_diff::TrieDiff, TreeProof};
use nimiq_time::sleep;
use parking_lot::RwLock;
use tokio::{sync::Semaphore, time};
use tokio::sync::Semaphore;

use super::{RequestTrieDiff, ResponseTrieDiff};
use crate::sync::{
Expand Down Expand Up @@ -72,7 +73,7 @@ impl<N: Network> DiffRequestComponent<N> {
Some(peer_id) => peer_id,
None => {
error!("couldn't fetch diff: no peers");
time::sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
continue;
}
};
Expand Down Expand Up @@ -112,7 +113,7 @@ impl<N: Network> DiffRequestComponent<N> {
if num_tries >= max_tries {
error!(%num_tries, %max_tries, ?backoff_delay, "couldn't fetch diff: maximum tries reached");

time::sleep(backoff_delay).await;
sleep(backoff_delay).await;
backoff_delay = Duration::min(backoff_delay.mul_f32(2_f32), max_backoff);
num_tries = 0;
}
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/sync/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use nimiq_blockchain_interface::AbstractBlockchain;
use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::network::{CloseReason, Network};
use wasm_timer::Interval;
use nimiq_time::{interval, Interval};

use crate::{consensus::ResolveBlockRequest, messages::RequestHead};

Expand Down Expand Up @@ -158,7 +158,7 @@ impl<N: Network, M: MacroSync<N::PeerId>, L: LiveSync<N>> Syncer<N, M, L> {
network,
outdated_peers: Default::default(),
incompatible_peers: Default::default(),
check_interval: Interval::new(Self::CHECK_INTERVAL),
check_interval: interval(Self::CHECK_INTERVAL),
pending_checks: Default::default(),
accepted_announcements: 0,
}
Expand Down
7 changes: 4 additions & 3 deletions consensus/tests/history_sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
mod sync_utils;

use std::{sync::Arc, time::Duration};

use futures::StreamExt;
Expand Down Expand Up @@ -29,12 +27,15 @@ use nimiq_test_utils::{
node::TESTING_BLS_CACHE_MAX_CAPACITY,
test_network::TestNetwork,
};
use nimiq_time::sleep;
use nimiq_utils::time::OffsetTime;
use nimiq_zkp_component::ZKPComponent;
use parking_lot::{Mutex, RwLock};

use crate::sync_utils::{sync_two_peers, SyncMode};

mod sync_utils;

#[test(tokio::test)]
async fn two_peers_can_sync_empty_chain() {
sync_two_peers(0, 0, SyncMode::History).await
Expand Down Expand Up @@ -272,7 +273,7 @@ async fn sync_ingredients() {
Network::connect_networks(&networks, 3u64).await;
// Then wait for connection to be established.
let _ = stream.next().await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await; // FIXME, Prof. Berrang told me to do this
sleep(Duration::from_secs(1)).await; // FIXME, Prof. Berrang told me to do this

// Test ingredients:
// Request macro chain, first request must return all epochs and one checkpoint.
Expand Down
8 changes: 5 additions & 3 deletions consensus/tests/request_component.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use nimiq_blockchain::BlockProducer;
use nimiq_blockchain_interface::AbstractBlockchain;
use nimiq_bls::KeyPair as BLSKeyPair;
Expand All @@ -14,6 +15,7 @@ use nimiq_test_utils::{
node::Node,
validator::seeded_rng,
};
use nimiq_time::{interval, sleep};

#[test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
#[ignore]
Expand Down Expand Up @@ -56,13 +58,13 @@ async fn test_request_component() {
tokio::spawn(async move {
loop {
produce_macro_blocks(&producer1, &prod_blockchain, 1);
tokio::time::sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
}
});
}

let mut connected = false;
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut interval = interval(Duration::from_secs(1));
loop {
if node1.blockchain.read().block_number() > 200 + Policy::genesis_block_number()
&& !connected
Expand All @@ -83,6 +85,6 @@ async fn test_request_component() {
node2.blockchain.read().head_hash()
);

interval.tick().await;
interval.next().await;
}
}
7 changes: 4 additions & 3 deletions handel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ workspace = true
async-trait = "0.1"
futures = { workspace = true }
log = { workspace = true }
instant = { version = "0.1", features = ["wasm-bindgen"] }
parking_lot = "0.12"
rand = "0.8"
serde = "1.0"
Expand All @@ -28,13 +29,13 @@ nimiq-bls = { workspace = true }
nimiq-collections = { workspace = true }
nimiq-hash = { workspace = true }
nimiq-serde = { workspace = true }
nimiq-utils = { workspace = true, features = [
"math",
] }
nimiq-time = { workspace = true }
nimiq-utils = { workspace = true, features = ["math"] }

[dev-dependencies]
nimiq-network-interface = { workspace = true }
nimiq-network-mock = { workspace = true }
nimiq-test-log = { workspace = true }

tokio = { version = "1.37", features = ["rt", "time", "macros"] }
nimiq-utils = { workspace = true, features = ["math"] }
15 changes: 5 additions & 10 deletions handel/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use futures::{
ready, select,
stream::{BoxStream, Stream, StreamExt},
};
use tokio::time::{interval_at, Instant};
use tokio_stream::wrappers::IntervalStream;
use nimiq_time::{interval, Interval};

use crate::{
config::Config,
Expand Down Expand Up @@ -57,10 +56,10 @@ struct NextAggregation<
sender: LevelUpdateSender<N>,

/// Interval for starting the next level regardless of previous levels completion
start_level_interval: IntervalStream,
start_level_interval: Interval,

/// Interval for sending level updates to the corresponding peers regardless of progression
periodic_update_interval: IntervalStream,
periodic_update_interval: Interval,

/// the level which needs activation next
next_level_timeout: usize,
Expand Down Expand Up @@ -90,15 +89,11 @@ impl<

// Regardless of level completion consecutive levels need to be activated at some point. Activate Levels every time this interval ticks,
// if the level has not already been activated due to level completion
let start_level_interval =
IntervalStream::new(interval_at(Instant::now() + config.timeout, config.timeout));
let start_level_interval = interval(config.timeout);

// Every `config.update_interval` send Level updates to corresponding peers no matter the aggregations progression
// (makes sure other peers can catch up).
let periodic_update_interval = IntervalStream::new(interval_at(
Instant::now() + config.update_interval,
config.update_interval,
));
let periodic_update_interval = interval(config.update_interval);

// Create the NextAggregation struct
Self {
Expand Down
9 changes: 4 additions & 5 deletions handel/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,13 @@ impl<TNetwork: Network + Unpin> Stream for LevelUpdateSender<TNetwork> {

#[cfg(test)]
mod test {
use std::{sync::Arc, task::Context};
use std::{sync::Arc, task::Context, time::Duration};

use futures::{FutureExt, StreamExt};
use nimiq_collections::BitSet;
use nimiq_test_log::test;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use tokio::time;

use crate::{
contribution::{AggregatableContribution, ContributionError},
Expand Down Expand Up @@ -199,7 +198,7 @@ mod test {
) -> futures::future::BoxFuture<'static, ()> {
self.0.lock().push(msg);

async move { time::sleep(time::Duration::from_millis(100)).await }.boxed()
async move { nimiq_time::sleep(Duration::from_millis(100)).await }.boxed()
}
}

Expand Down Expand Up @@ -247,7 +246,7 @@ mod test {
// Clear the buffer so test starts from scratch
t.lock().clear();
// Needed because the send also sleeps
time::sleep(time::Duration::from_millis(110)).await;
nimiq_time::sleep(Duration::from_millis(110)).await;

assert_eq!(0, t.lock().len());
send(&mut sender, 0);
Expand All @@ -271,7 +270,7 @@ mod test {
assert_eq!(10, t.lock().len());

// Wait for the futures to resolve, imitating a delay
time::sleep(time::Duration::from_millis(150)).await;
nimiq_time::sleep(Duration::from_millis(150)).await;
// Send some more
send(&mut sender, 9); // Not a Duplicate, this should be accepted
send(&mut sender, 8); // Not a Duplicate, this should be accepted
Expand Down