Skip to content

Commit

Permalink
Integrate the pool in the state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiencs committed Mar 26, 2024
1 parent 5058aba commit 3b74071
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 14 deletions.
87 changes: 73 additions & 14 deletions ledger/src/transaction_pool.rs
Expand Up @@ -130,7 +130,7 @@ const REPLACE_FEE: Fee = Fee::of_nanomina_int_exn(1);

type ValidCommandWithHash = WithHash<valid::UserCommand, BlakeHash>;

mod diff {
pub mod diff {
use super::*;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -441,7 +441,7 @@ struct SenderState {
state: Option<(VecDeque<ValidCommandWithHash>, Amount)>,
}

enum RevalidateKind<'a> {
pub enum RevalidateKind<'a> {
EntirePool,
Subset(&'a HashSet<AccountId>),
}
Expand Down Expand Up @@ -1260,7 +1260,7 @@ impl<T> Envelope<T> {
}

#[derive(Debug)]
enum ApplyDecision {
pub enum ApplyDecision {
Accept,
Reject,
}
Expand All @@ -1281,6 +1281,40 @@ pub struct TransactionPool {
}

impl TransactionPool {
pub fn on_new_best_tip(&mut self, new_best_tip: Mask) {
let validation_ledger = new_best_tip;
self.best_tip_ledger.replace(validation_ledger.clone());

let dropped =
self.pool.revalidate(
RevalidateKind::EntirePool,
|sender_id| match validation_ledger.location_of_account(sender_id) {
None => Account::empty(),
Some(addr) => *validation_ledger
.get(addr)
.expect("Location without account"),
},
);

let dropped_locally_generated = dropped
.iter()
.filter(|cmd| {
let dropped_commited = self.locally_generated_committed.remove(cmd).is_some();
let dropped_uncommited = self.locally_generated_uncommitted.remove(cmd).is_some();
// Nothing should be in both tables.
assert!(!(dropped_commited && dropped_uncommited));
dropped_commited || dropped_uncommited
})
.collect::<Vec<_>>();

if !dropped_locally_generated.is_empty() {
eprintln!(
"Dropped locally generated commands $cmds from pool when transition frontier was recreated. {:?}",
dropped_locally_generated
)
}
}

fn has_sufficient_fee(&self, pool_max_size: usize, cmd: &valid::UserCommand) -> bool {
match self.pool.min_fee() {
None => true,
Expand All @@ -1306,7 +1340,11 @@ impl TransactionPool {
list
}

fn handle_transition_frontier_diff(&mut self, diff: diff::BestTipDiff, best_tip_ledger: Mask) {
pub fn handle_transition_frontier_diff(
&mut self,
diff: &diff::BestTipDiff,
best_tip_ledger: Mask,
) {
let diff::BestTipDiff {
new_commands,
removed_commands,
Expand All @@ -1318,12 +1356,11 @@ impl TransactionPool {

let pool_max_size = self.config.pool_max_size;

self.verification_key_table.increment_list(&new_commands);
self.verification_key_table
.decrement_list(&removed_commands);
self.verification_key_table.increment_list(new_commands);
self.verification_key_table.decrement_list(removed_commands);

let mut dropped_backtrack = Vec::with_capacity(256);
for cmd in &removed_commands {
for cmd in removed_commands {
let cmd = transaction_hash::hash_command(cmd.data.clone());

if let Some(time_added) = self.locally_generated_committed.remove(&cmd) {
Expand All @@ -1349,7 +1386,7 @@ impl TransactionPool {
let dropped_commands = {
let accounts_to_check = new_commands
.iter()
.chain(&removed_commands)
.chain(removed_commands)
.flat_map(|cmd| cmd.data.forget_check().accounts_referenced())
.collect::<HashSet<_>>();

Expand Down Expand Up @@ -1454,7 +1491,8 @@ impl TransactionPool {

fn apply(
&mut self,
diff: Envelope<diff::DiffVerified>,
diff: &diff::DiffVerified,
is_sender_local: bool,
) -> Result<
(
ApplyDecision,
Expand All @@ -1463,16 +1501,14 @@ impl TransactionPool {
),
String,
> {
let is_sender_local = diff.is_sender_local();

let ledger = self.best_tip_ledger.as_ref().ok_or_else(|| {
"Got transaction pool diff when transitin frontier is unavailable, ignoring."
.to_string()
})?;

let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();

let fee_payer_account_ids: HashSet<_> = diff.data().list.iter().map(fee_payer).collect();
let fee_payer_account_ids: HashSet<_> = diff.list.iter().map(fee_payer).collect();
let fee_payer_accounts = preload_accounts(ledger, &fee_payer_account_ids);

let check_command = |pool: &IndexedPool, cmd: &ValidCommandWithHash| {
Expand All @@ -1495,7 +1531,6 @@ impl TransactionPool {
};

let add_results = diff
.data()
.list
.iter()
.map(|cmd| {
Expand Down Expand Up @@ -1611,6 +1646,30 @@ impl TransactionPool {
Ok((decision, accepted, rejected))
}

pub fn unsafe_apply(
&mut self,
diff: &diff::DiffVerified,
is_sender_local: bool,
) -> Result<
(
ApplyDecision,
Vec<UserCommand>,
Vec<(UserCommand, diff::Error)>,
),
String,
> {
let (decision, accepted, rejected) = self.apply(diff, is_sender_local)?;
let accepted = accepted
.into_iter()
.map(|cmd| cmd.data.forget_check())
.collect::<Vec<_>>();
let rejected = rejected
.into_iter()
.map(|(cmd, e)| (cmd.data.forget_check(), e))
.collect::<Vec<_>>();
Ok((decision, accepted, rejected))
}

fn register_locally_generated(&mut self, cmd: &ValidCommandWithHash) {
match self.locally_generated_uncommitted.entry(cmd.clone()) {
Entry::Occupied(mut entry) => {
Expand Down
1 change: 1 addition & 0 deletions node/src/lib.rs
Expand Up @@ -36,6 +36,7 @@ pub mod p2p;
pub mod rpc;
pub mod snark;
pub mod snark_pool;
pub mod transaction_pool;
pub mod transition_frontier;
pub mod watched_accounts;

Expand Down
63 changes: 63 additions & 0 deletions node/src/transaction_pool/mod.rs
@@ -0,0 +1,63 @@
use ledger::{
transaction_pool::{
diff::{BestTipDiff, DiffVerified},
ApplyDecision,
RevalidateKind::EntirePool,
},
Account, BaseLedger, Mask,
};

enum TransactionPoolAction {
BestTipChanged {
best_tip: Mask,
},
ApplyVerifiedDiff {
diff: DiffVerified,
is_sender_local: bool,
},
ApplyTransitionFrontierDiff {
best_tip: Mask,
diff: BestTipDiff,
},
// Rebroadcast locally generated pool items every 10 minutes. Do so for 50
// minutes - at most 5 rebroadcasts - before giving up.
Rebroadcast,
}

impl redux::EnablingCondition<crate::State> for TransactionPoolAction {
fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool {
true
}
}

pub struct TransactionPoolState {
pool: ledger::transaction_pool::TransactionPool,
}

type TransactionPoolActionWithMetaRef<'a> = redux::ActionWithMeta<&'a TransactionPoolAction>;

impl TransactionPoolState {
pub fn reducer(&mut self, action: TransactionPoolActionWithMetaRef<'_>) {
use TransactionPoolAction::*;

let (action, meta) = action.split();
match action {
BestTipChanged { best_tip } => {
self.pool.on_new_best_tip(best_tip.clone());
}
ApplyVerifiedDiff {
diff,
is_sender_local,
} => match self.pool.unsafe_apply(diff, *is_sender_local) {
Ok((ApplyDecision::Accept, accepted, rejected)) => todo!(),
Ok((ApplyDecision::Reject, accepted, rejected)) => todo!(),
Err(_) => todo!(),
},
ApplyTransitionFrontierDiff { best_tip, diff } => {
self.pool
.handle_transition_frontier_diff(diff, best_tip.clone());
}
Rebroadcast => todo!(),
}
}
}

0 comments on commit 3b74071

Please sign in to comment.