Skip to content

Commit

Permalink
monitor alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
ii-cruz committed Nov 9, 2023
1 parent be44684 commit f9417b4
Showing 1 changed file with 147 additions and 28 deletions.
175 changes: 147 additions & 28 deletions src/bin/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use nimiq_keys::PublicKey;
use snark_setup_operator::{
data_structs::{Ceremony, Response},
data_structs::{Ceremony, Response, UniqueChunkId},
error::MonitorError,
};

use anyhow::Result;
use chrono::Duration;
use chrono::{DateTime, Duration, Utc};
use gumdrop::Options;
use std::collections::HashSet;
use tracing::info;
use tracing::{error, info, warn};
use url::Url;

#[derive(Debug, Options, Clone)]
Expand All @@ -20,41 +21,85 @@ pub struct MonitorOpts {
pub coordinator_url: String,
#[options(help = "polling interval in minutes", default = "1")]
pub polling_interval: u64,
#[options(help = "timeout in minutes", default = "1")]
pub timeout: i64,
#[options(help = "chunk lock timeout in minutes", default = "1")]
pub chunk_timeout: i64,
#[options(help = "ceremony timeout in minutes", default = "5")]
pub ceremony_timeout: i64,
}

pub struct Monitor {
// Settings
pub server_url: Url,
pub timeout: Duration,
pub ceremony_timeout: Duration,

// Last changed values in the ceremony
pub last_ceremony_version: u64,
pub last_ceremony_update: DateTime<Utc>,

pub last_setups_all_done: Vec<bool>,

pub last_timed_out_participant_ids: Vec<PublicKey>,

pub last_chunks_complete: Vec<UniqueChunkId>,
pub last_chunks_incomplete: Vec<UniqueChunkId>,
pub participant_ids_incomplete: Vec<PublicKey>,
}

impl Monitor {
pub fn new(opts: &MonitorOpts) -> Result<Self> {
let monitor = Self {
server_url: Url::parse(&opts.coordinator_url)?.join("ceremony")?,
timeout: Duration::minutes(opts.timeout),
timeout: Duration::minutes(opts.chunk_timeout),
ceremony_timeout: Duration::minutes(opts.ceremony_timeout),
last_ceremony_version: 0,
last_ceremony_update: chrono::Utc::now(),
last_setups_all_done: vec![],
last_timed_out_participant_ids: vec![],
last_chunks_complete: vec![],
last_chunks_incomplete: vec![],
participant_ids_incomplete: vec![],
};
Ok(monitor)
}

async fn run(&self) -> Result<()> {
async fn run(&mut self) -> Result<()> {
let response = reqwest::get(self.server_url.as_str())
.await?
.error_for_status()?;
let data = response.text().await?;
let ceremony: Ceremony = serde_json::from_str::<Response<Ceremony>>(&data)?.result;

self.check_timeout(&ceremony)?;
self.check_progress(&ceremony)?;
self.check_all_done(&ceremony)?;
self.check_timeout(&ceremony)?;
self.show_finished_chunks(&ceremony)?;

Ok(())
}

fn check_timeout(&self, ceremony: &Ceremony) -> Result<()> {
fn check_progress(&mut self, ceremony: &Ceremony) -> Result<()> {
let current_time = chrono::Utc::now();
let mut timed_out_participant_ids = vec![];
let elapsed = current_time - self.last_ceremony_update;
if ceremony.version == self.last_ceremony_version {
if self.ceremony_timeout > elapsed {
warn!(
"Ceremony progress is stuck at version {:?} for {:?} minutes",
ceremony.version,
elapsed.num_minutes()
);
}
} else {
self.last_ceremony_update = current_time;
self.last_ceremony_version = ceremony.version;
}

Ok(())
}

fn check_timeout(&mut self, ceremony: &Ceremony) -> Result<()> {
let current_time = chrono::Utc::now();
let mut timed_out_participant_ids = HashSet::new();

for setup in ceremony.setups.iter() {
for chunk in setup.chunks.iter() {
Expand All @@ -71,20 +116,44 @@ impl Monitor {
.ok_or(MonitorError::LockTimeIsNoneError)?;
let elapsed = current_time - lock_time;
if elapsed > self.timeout {
timed_out_participant_ids.push(participant_id);
timed_out_participant_ids.insert(participant_id);
}
}
}
info!("timed out participants: {:?}", timed_out_participant_ids);
let timed_out_participant_ids: Vec<_> = timed_out_participant_ids
.iter()
.map(|pk| pk.clone())
.collect();
if !self
.last_timed_out_participant_ids
.eq(&timed_out_participant_ids)
{
if !timed_out_participant_ids.is_empty() {
warn!("timed out participants: {:?}", timed_out_participant_ids);
}
self.last_timed_out_participant_ids = timed_out_participant_ids;
}

Ok(())
}

fn check_all_done(&self, ceremony: &Ceremony) -> Result<()> {
let participant_ids: HashSet<_> = ceremony.contributor_ids.iter().clone().collect();
fn check_all_done(&mut self, ceremony: &Ceremony) -> Result<()> {
let participant_ids: HashSet<_> = ceremony
.contributor_ids
.iter()
.map(|pk| pk.clone())
.collect();
let mut last_setups_all_done = vec![];

if self.last_setups_all_done.len() <= ceremony.setups.len() {
let missing_setups = ceremony.setups.len() - self.last_setups_all_done.len();
for _ in 0..missing_setups {
self.last_setups_all_done.push(false);
}
}

for setup in ceremony.setups.iter() {
if setup.chunks.iter().all(|chunk| {
let done = setup.chunks.iter().all(|chunk| {
let verified_participant_ids_in_chunk: HashSet<_> = chunk
.contributions
.iter()
Expand All @@ -94,17 +163,33 @@ impl Monitor {
.collect();
participant_ids
.iter()
.all(|p| verified_participant_ids_in_chunk.contains(*p))
}) {
info!("setup {:?} all done", setup.setup_id);
} else {
info!("setup {:?} not finished", setup.setup_id);
.all(|p| verified_participant_ids_in_chunk.contains(p))
});
last_setups_all_done.push(done);
let index = setup.setup_id.len() - 1;

if self.last_setups_all_done[index] != last_setups_all_done[index] {
self.last_setups_all_done[index] = last_setups_all_done[index];
let all_done = last_setups_all_done.iter().all(|done| *done);

if all_done {
warn!("setups are all done!");
if self.last_setups_all_done.iter().all(|done| *done) {
self.reset();
break;
}
} else {
warn!(
"setup {:?} done: {}",
setup.setup_id, last_setups_all_done[index]
);
}
}
}
Ok(())
}

fn show_finished_chunks(&self, ceremony: &Ceremony) -> Result<()> {
fn show_finished_chunks(&mut self, ceremony: &Ceremony) -> Result<()> {
let participant_ids: HashSet<_> = ceremony.contributor_ids.iter().clone().collect();

let mut chunks_complete = vec![];
Expand All @@ -117,7 +202,7 @@ impl Monitor {
.contributions
.iter()
.filter(|c| c.verified)
.map(|c| c.contributor_id.as_ref())
.map(|c| c.contributor_id)
.filter_map(|e| e)
.collect();
if participant_ids
Expand All @@ -130,19 +215,53 @@ impl Monitor {
.iter()
.filter(|x| !verified_participant_ids_in_chunk.contains(*x))
.for_each(|p| {
participant_ids_incomplete.insert(p);
participant_ids_incomplete.insert((*p).clone());
});
chunks_incomplete.push(chunk.unique_chunk_id.clone())
}
}
}

info!("complete chunks: {:?}", chunks_complete);
info!("incomplete chunks: {:?}", chunks_incomplete);
info!("incomplete participants: {:?}", participant_ids_incomplete);
if !self.last_chunks_complete.eq(&chunks_complete) {
if !chunks_complete.is_empty() {
info!("complete chunks: {:?}", chunks_complete);
}
self.last_chunks_complete = chunks_complete;
}
if !self.last_chunks_incomplete.eq(&chunks_incomplete) {
if !chunks_incomplete.is_empty() {
info!("incomplete chunks: {:?}", chunks_incomplete);
}
self.last_chunks_incomplete = chunks_incomplete;
}
let participant_ids_incomplete_vec: Vec<PublicKey> = participant_ids_incomplete
.iter()
.map(|pk| pk.clone())
.collect();
if !self
.participant_ids_incomplete
.eq(&participant_ids_incomplete_vec)
{
if !participant_ids_incomplete_vec.is_empty() {
info!(
"incomplete participants: {:?}",
participant_ids_incomplete_vec
);
}
self.participant_ids_incomplete = participant_ids_incomplete_vec;
}

Ok(())
}

fn reset(&mut self) {
warn!("setups are all done!");
self.last_setups_all_done = vec![];
self.last_timed_out_participant_ids = vec![];
self.last_chunks_complete = vec![];
self.last_chunks_incomplete = vec![];
self.participant_ids_incomplete = vec![];
}
}

#[tokio::main]
Expand All @@ -151,14 +270,14 @@ async fn main() {

let opts: MonitorOpts = MonitorOpts::parse_args_default_or_exit();

let monitor = Monitor::new(&opts).expect("Should have been able to create a monitor.");
let mut monitor = Monitor::new(&opts).expect("Should have been able to create a monitor.");
let mut monitor_interval =
tokio::time::interval(std::time::Duration::from_secs(60 * opts.polling_interval));
loop {
monitor_interval.tick().await;

match monitor.run().await {
Err(e) => info!("Got error from monitor: {}", e.to_string()),
Err(e) => error!("Got error from monitor: {}", e.to_string()),
_ => {}
}
}
Expand Down

0 comments on commit f9417b4

Please sign in to comment.