From 3b74071fbea34410c5c69414d41265ebf62818d8 Mon Sep 17 00:00:00 2001 From: Sebastien Chapuis Date: Tue, 26 Mar 2024 11:22:24 +0100 Subject: [PATCH] Integrate the pool in the state machine --- ledger/src/transaction_pool.rs | 87 +++++++++++++++++++++++++++----- node/src/lib.rs | 1 + node/src/transaction_pool/mod.rs | 63 +++++++++++++++++++++++ 3 files changed, 137 insertions(+), 14 deletions(-) create mode 100644 node/src/transaction_pool/mod.rs diff --git a/ledger/src/transaction_pool.rs b/ledger/src/transaction_pool.rs index 6baa0ea2c..52d015bdc 100644 --- a/ledger/src/transaction_pool.rs +++ b/ledger/src/transaction_pool.rs @@ -130,7 +130,7 @@ const REPLACE_FEE: Fee = Fee::of_nanomina_int_exn(1); type ValidCommandWithHash = WithHash; -mod diff { +pub mod diff { use super::*; #[derive(Debug, Clone)] @@ -441,7 +441,7 @@ struct SenderState { state: Option<(VecDeque, Amount)>, } -enum RevalidateKind<'a> { +pub enum RevalidateKind<'a> { EntirePool, Subset(&'a HashSet), } @@ -1260,7 +1260,7 @@ impl Envelope { } #[derive(Debug)] -enum ApplyDecision { +pub enum ApplyDecision { Accept, Reject, } @@ -1281,6 +1281,40 @@ pub struct TransactionPool { } impl TransactionPool { + pub fn on_new_best_tip(&mut self, new_best_tip: Mask) { + let validation_ledger = new_best_tip; + self.best_tip_ledger.replace(validation_ledger.clone()); + + let dropped = + self.pool.revalidate( + RevalidateKind::EntirePool, + |sender_id| match validation_ledger.location_of_account(sender_id) { + None => Account::empty(), + Some(addr) => *validation_ledger + .get(addr) + .expect("Location without account"), + }, + ); + + let dropped_locally_generated = dropped + .iter() + .filter(|cmd| { + let dropped_commited = self.locally_generated_committed.remove(cmd).is_some(); + let dropped_uncommited = self.locally_generated_uncommitted.remove(cmd).is_some(); + // Nothing should be in both tables. + assert!(!(dropped_commited && dropped_uncommited)); + dropped_commited || dropped_uncommited + }) + .collect::>(); + + if !dropped_locally_generated.is_empty() { + eprintln!( + "Dropped locally generated commands $cmds from pool when transition frontier was recreated. {:?}", + dropped_locally_generated + ) + } + } + fn has_sufficient_fee(&self, pool_max_size: usize, cmd: &valid::UserCommand) -> bool { match self.pool.min_fee() { None => true, @@ -1306,7 +1340,11 @@ impl TransactionPool { list } - fn handle_transition_frontier_diff(&mut self, diff: diff::BestTipDiff, best_tip_ledger: Mask) { + pub fn handle_transition_frontier_diff( + &mut self, + diff: &diff::BestTipDiff, + best_tip_ledger: Mask, + ) { let diff::BestTipDiff { new_commands, removed_commands, @@ -1318,12 +1356,11 @@ impl TransactionPool { let pool_max_size = self.config.pool_max_size; - self.verification_key_table.increment_list(&new_commands); - self.verification_key_table - .decrement_list(&removed_commands); + self.verification_key_table.increment_list(new_commands); + self.verification_key_table.decrement_list(removed_commands); let mut dropped_backtrack = Vec::with_capacity(256); - for cmd in &removed_commands { + for cmd in removed_commands { let cmd = transaction_hash::hash_command(cmd.data.clone()); if let Some(time_added) = self.locally_generated_committed.remove(&cmd) { @@ -1349,7 +1386,7 @@ impl TransactionPool { let dropped_commands = { let accounts_to_check = new_commands .iter() - .chain(&removed_commands) + .chain(removed_commands) .flat_map(|cmd| cmd.data.forget_check().accounts_referenced()) .collect::>(); @@ -1454,7 +1491,8 @@ impl TransactionPool { fn apply( &mut self, - diff: Envelope, + diff: &diff::DiffVerified, + is_sender_local: bool, ) -> Result< ( ApplyDecision, @@ -1463,8 +1501,6 @@ impl TransactionPool { ), String, > { - let is_sender_local = diff.is_sender_local(); - let ledger = self.best_tip_ledger.as_ref().ok_or_else(|| { "Got transaction pool diff when transitin frontier is unavailable, ignoring." .to_string() @@ -1472,7 +1508,7 @@ impl TransactionPool { let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer(); - let fee_payer_account_ids: HashSet<_> = diff.data().list.iter().map(fee_payer).collect(); + let fee_payer_account_ids: HashSet<_> = diff.list.iter().map(fee_payer).collect(); let fee_payer_accounts = preload_accounts(ledger, &fee_payer_account_ids); let check_command = |pool: &IndexedPool, cmd: &ValidCommandWithHash| { @@ -1495,7 +1531,6 @@ impl TransactionPool { }; let add_results = diff - .data() .list .iter() .map(|cmd| { @@ -1611,6 +1646,30 @@ impl TransactionPool { Ok((decision, accepted, rejected)) } + pub fn unsafe_apply( + &mut self, + diff: &diff::DiffVerified, + is_sender_local: bool, + ) -> Result< + ( + ApplyDecision, + Vec, + Vec<(UserCommand, diff::Error)>, + ), + String, + > { + let (decision, accepted, rejected) = self.apply(diff, is_sender_local)?; + let accepted = accepted + .into_iter() + .map(|cmd| cmd.data.forget_check()) + .collect::>(); + let rejected = rejected + .into_iter() + .map(|(cmd, e)| (cmd.data.forget_check(), e)) + .collect::>(); + Ok((decision, accepted, rejected)) + } + fn register_locally_generated(&mut self, cmd: &ValidCommandWithHash) { match self.locally_generated_uncommitted.entry(cmd.clone()) { Entry::Occupied(mut entry) => { diff --git a/node/src/lib.rs b/node/src/lib.rs index ad9098457..7f9781018 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -36,6 +36,7 @@ pub mod p2p; pub mod rpc; pub mod snark; pub mod snark_pool; +pub mod transaction_pool; pub mod transition_frontier; pub mod watched_accounts; diff --git a/node/src/transaction_pool/mod.rs b/node/src/transaction_pool/mod.rs new file mode 100644 index 000000000..96bb1573c --- /dev/null +++ b/node/src/transaction_pool/mod.rs @@ -0,0 +1,63 @@ +use ledger::{ + transaction_pool::{ + diff::{BestTipDiff, DiffVerified}, + ApplyDecision, + RevalidateKind::EntirePool, + }, + Account, BaseLedger, Mask, +}; + +enum TransactionPoolAction { + BestTipChanged { + best_tip: Mask, + }, + ApplyVerifiedDiff { + diff: DiffVerified, + is_sender_local: bool, + }, + ApplyTransitionFrontierDiff { + best_tip: Mask, + diff: BestTipDiff, + }, + // Rebroadcast locally generated pool items every 10 minutes. Do so for 50 + // minutes - at most 5 rebroadcasts - before giving up. + Rebroadcast, +} + +impl redux::EnablingCondition for TransactionPoolAction { + fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool { + true + } +} + +pub struct TransactionPoolState { + pool: ledger::transaction_pool::TransactionPool, +} + +type TransactionPoolActionWithMetaRef<'a> = redux::ActionWithMeta<&'a TransactionPoolAction>; + +impl TransactionPoolState { + pub fn reducer(&mut self, action: TransactionPoolActionWithMetaRef<'_>) { + use TransactionPoolAction::*; + + let (action, meta) = action.split(); + match action { + BestTipChanged { best_tip } => { + self.pool.on_new_best_tip(best_tip.clone()); + } + ApplyVerifiedDiff { + diff, + is_sender_local, + } => match self.pool.unsafe_apply(diff, *is_sender_local) { + Ok((ApplyDecision::Accept, accepted, rejected)) => todo!(), + Ok((ApplyDecision::Reject, accepted, rejected)) => todo!(), + Err(_) => todo!(), + }, + ApplyTransitionFrontierDiff { best_tip, diff } => { + self.pool + .handle_transition_frontier_diff(diff, best_tip.clone()); + } + Rebroadcast => todo!(), + } + } +}