Skip to content

Commit

Permalink
Consensus: remove Consensus poll timer
Browse files Browse the repository at this point in the history
  • Loading branch information
styppo committed Apr 10, 2024
1 parent 1d4a1ee commit ea2ef23
Showing 1 changed file with 4 additions and 35 deletions.
39 changes: 4 additions & 35 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ use tokio::sync::{
},
oneshot::{error::RecvError, Sender as OneshotSender},
};
#[cfg(not(target_family = "wasm"))]
use tokio::time::{sleep, Sleep};
use tokio_stream::wrappers::BroadcastStream;

use self::consensus_proxy::ConsensusProxy;
Expand Down Expand Up @@ -128,10 +126,6 @@ pub struct Consensus<N: Network> {

pub sync: SyncerProxy<N>,

/// A Delay which exists purely for the waker on its poll to reactivate the task running Consensus::poll
/// FIXME Remove this
#[cfg(not(target_family = "wasm"))]
next_execution_timer: Option<Pin<Box<Sleep>>>,
events: BroadcastSender<ConsensusEvent>,
established_flag: Arc<AtomicBool>,
head_requests: Option<HeadRequests<N>>,
Expand Down Expand Up @@ -168,13 +162,6 @@ impl<N: Network> Consensus<N> {
/// established state and to advance the chain.
const HEAD_REQUESTS_TIMEOUT: Duration = Duration::from_secs(5);

/// Timeout after which the consensus is polled after it ran last
///
/// TODO: Set appropriate duration
/// FIXME Remove this
#[cfg(not(target_family = "wasm"))]
const CONSENSUS_POLL_TIMER: Duration = Duration::from_secs(1);

pub fn from_network(
blockchain: BlockchainProxy,
network: Arc<N>,
Expand Down Expand Up @@ -210,16 +197,11 @@ impl<N: Network> Consensus<N> {

let established_flag = Arc::new(AtomicBool::new(false));

#[cfg(not(target_family = "wasm"))]
let timer = Box::pin(sleep(Self::CONSENSUS_POLL_TIMER));

Consensus {
blockchain,
network,
sync: syncer,
events: tx,
#[cfg(not(target_family = "wasm"))]
next_execution_timer: Some(timer),
established_flag,
head_requests: None,
head_requests_time: None,
Expand Down Expand Up @@ -428,7 +410,7 @@ impl<N: Network> Future for Consensus<N> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 1. Poll and advance block queue
// Poll and advance block queue
while let Poll::Ready(Some(event)) = self.sync.poll_next_unpin(cx) {
match event {
LiveSyncPushEvent::AcceptedAnnouncedBlock(_) => {
Expand Down Expand Up @@ -473,7 +455,7 @@ impl<N: Network> Future for Consensus<N> {
self.events.send(event).ok();
}

// 2. Poll any head requests if active.
// Poll any head requests if active.
if let Some(ref mut head_requests) = self.head_requests {
if let Poll::Ready(mut result) = head_requests.poll_unpin(cx) {
// Reset head requests.
Expand All @@ -491,27 +473,14 @@ impl<N: Network> Future for Consensus<N> {
}
}

// 3. Check if a ConsensusRequest was received
// Check if a ConsensusRequest was received
while let Poll::Ready(Some(request)) = self.requests.1.poll_recv(cx) {
match request {
ConsensusRequest::ResolveBlock(request) => self.resolve_block(request),
}
}

// 4. Update timer and poll it so the task gets woken when the timer runs out (at the latest)
// The timer itself running out (producing an Instant) is of no interest to the execution. This poll method
// was potentially awoken by the delays waker, but even then all there is to do is set up a new timer such
// that it will wake this task again after another time frame has elapsed. No interval was used as that
// would periodically wake the task even though it might have just executed
#[cfg(not(target_family = "wasm"))]
{
let mut timer = Box::pin(sleep(Self::CONSENSUS_POLL_TIMER));
// If the sleep wasn't pending anymore, it didn't register us with the waker, but we need that.
assert!(timer.poll_unpin(cx) == Poll::Pending);
self.next_execution_timer = Some(timer);
}

// 5. Advance consensus and catch-up through head requests.
// Advance consensus and catch-up through head requests.
self.request_heads();

Poll::Pending
Expand Down

0 comments on commit ea2ef23

Please sign in to comment.