Skip to content

Commit

Permalink
Implement IndexedPool::revalidate
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiencs committed Mar 9, 2024
1 parent 9e9a893 commit e6abf86
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 16 deletions.
12 changes: 12 additions & 0 deletions ledger/src/scan_state/transaction_logic.rs
Expand Up @@ -1297,6 +1297,18 @@ pub mod zkapp_command {
pub hash: H,
}

impl<T, H: Ord> Ord for WithHash<T, H> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.hash.cmp(&other.hash)
}
}

impl<T, H: PartialOrd> PartialOrd for WithHash<T, H> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.hash.partial_cmp(&other.hash)
}
}

impl<T, H: Eq> Eq for WithHash<T, H> {}

impl<T, H: PartialEq> PartialEq for WithHash<T, H> {
Expand Down
194 changes: 178 additions & 16 deletions ledger/src/transaction_pool.rs
@@ -1,6 +1,6 @@
use std::{
borrow::Borrow,
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
};

use mina_hasher::Fp;
Expand Down Expand Up @@ -644,10 +644,10 @@ impl IndexedPool {

/// Remove a command from the all_by_fee and all_by_hash fields, and decrement
/// size. This may break an invariant.
fn update_remove_all_by_fee_and_hash_and_expiration(
&mut self,
cmds: VecDeque<ValidCommandWithHash>,
) {
fn update_remove_all_by_fee_and_hash_and_expiration<I>(&mut self, cmds: I)
where
I: IntoIterator<Item = ValidCommandWithHash>,
{
for cmd in cmds {
let fee_per_wu = cmd.data.forget_check().fee_per_wu();
let cmd_hash = cmd.hash.clone();
Expand All @@ -667,6 +667,25 @@ impl IndexedPool {
}

fn remove_with_dependents_exn(
&mut self,
cmd: &ValidCommandWithHash,
) -> VecDeque<ValidCommandWithHash> {
let sender = cmd.data.fee_payer();
let mut by_sender = SenderState {
state: self.all_by_sender.get(&sender).cloned(),
sender,
};

let mut updates = Vec::<Update>::with_capacity(128);
let result = self.remove_with_dependents_exn_impl(cmd, &mut by_sender, &mut updates);

self.set_sender(by_sender);
self.apply_updates(updates);

result
}

fn remove_with_dependents_exn_impl(
&self,
cmd: &ValidCommandWithHash,
by_sender: &mut SenderState,
Expand Down Expand Up @@ -907,7 +926,7 @@ impl IndexedPool {
}
}

let dropped = self.remove_with_dependents_exn(
let dropped = self.remove_with_dependents_exn_impl(
drop_queue.front().unwrap(),
by_sender,
updates,
Expand Down Expand Up @@ -994,19 +1013,153 @@ impl IndexedPool {
}
}

fn expired_by_global_slot(&self) -> Vec<ValidCommandWithHash> {
let global_slot_since_genesis = self.global_slot_since_genesis();

self.transactions_with_expiration
.iter()
.filter(|(slot, _cmd)| **slot < global_slot_since_genesis)
.flat_map(|(_slot, cmd)| cmd.iter().cloned())
.collect()
}

fn expired(&self) -> Vec<ValidCommandWithHash> {
self.expired_by_global_slot()
}

fn remove_expired(&mut self) -> Vec<ValidCommandWithHash> {
todo!()
let mut dropped = Vec::with_capacity(128);
for cmd in self.expired() {
if self.member(&cmd) {
let removed = self.remove_with_dependents_exn(&cmd);
dropped.extend(removed);
}
}
dropped
}

fn drop_until_below_max_size(&mut self, pool_max_size: usize) -> Vec<ValidCommandWithHash> {
todo!()
fn remove_lowest_fee(&mut self) -> VecDeque<ValidCommandWithHash> {
let Some(set) = self.min_fee().and_then(|fee| self.all_by_fee.get(&fee)) else {
return VecDeque::new();
};

// TODO: Should `self.all_by_fee` be a `BTreeSet` instead ?
let bset: BTreeSet<_> = set.iter().collect();
// TODO: Not sure if OCaml compare the same way than we do
let min = bset.first().map(|min| (*min).clone()).unwrap();

self.remove_with_dependents_exn(&min)
}

/// Drop commands from the end of the queue until the total currency consumed is
/// <= the current balance.
fn drop_until_sufficient_balance(
mut queue: VecDeque<ValidCommandWithHash>,
mut currency_reserved: Amount,
current_balance: Amount,
) -> (
VecDeque<ValidCommandWithHash>,
Amount,
VecDeque<ValidCommandWithHash>,
) {
let mut dropped_so_far = VecDeque::with_capacity(queue.len());

while currency_reserved > current_balance {
let last = queue.pop_back().unwrap();
let consumed = currency_consumed(&last.data.forget_check()).unwrap();
dropped_so_far.push_back(last);
currency_reserved = currency_reserved.checked_sub(&consumed).unwrap();
}

(queue, currency_reserved, dropped_so_far)
}

fn revalidate<F>(&mut self, kind: RevalidateKind, get_account: F) -> Vec<ValidCommandWithHash>
where
F: Fn(&AccountId) -> Account,
{
todo!()
let requires_revalidation = |account_id: &AccountId| match kind {
RevalidateKind::EntirePool => true,
RevalidateKind::Subset(set) => set.contains(account_id),
};

let mut dropped = Vec::new();

for (sender, (mut queue, mut currency_reserved)) in self.all_by_sender.clone() {
if !requires_revalidation(&sender) {
continue;
}
let account: Account = get_account(&sender);
let current_balance = {
let global_slot = self.global_slot_since_genesis();
account.liquid_balance_at_slot(global_slot).to_amount()
};
let first_cmd = queue.front().unwrap();
let first_nonce = first_cmd.data.forget_check().applicable_at_nonce();

if !(account.has_permission_to_send() && account.has_permission_to_increment_nonce()) {
let this_dropped = self.remove_with_dependents_exn(first_cmd);
dropped.extend(this_dropped);
} else if account.nonce < first_nonce {
let this_dropped = self.remove_with_dependents_exn(first_cmd);
dropped.extend(this_dropped);
} else {
// current_nonce >= first_nonce
let first_applicable_nonce_index = queue.iter().position(|cmd| {
let nonce = cmd.data.forget_check().applicable_at_nonce();
nonce == account.nonce
});

let keep_queue = match first_applicable_nonce_index {
Some(index) => queue.split_off(index),
None => Default::default(),
};
let drop_queue = queue;

for cmd in &drop_queue {
currency_reserved = currency_reserved
.checked_sub(&currency_consumed(&cmd.data.forget_check()).unwrap())
.unwrap();
}

let (keep_queue, currency_reserved, dropped_for_balance) =
Self::drop_until_sufficient_balance(
keep_queue,
currency_reserved,
current_balance,
);

let to_drop: Vec<_> = drop_queue.into_iter().chain(dropped_for_balance).collect();

let (head, _tail) = match to_drop.split_first() {
Some((head, tail)) => (head, tail),
None => continue,
};

self.remove_applicable_exn(head);
self.update_remove_all_by_fee_and_hash_and_expiration(to_drop.clone());

match keep_queue.front().cloned() {
None => {
self.all_by_sender.remove(&sender);
}
Some(first_kept) => {
let first_kept_unchecked = first_kept.data.forget_check();
self.all_by_sender
.insert(sender, (keep_queue, currency_reserved));
Self::map_set_insert(
&mut self.applicable_by_fee,
first_kept_unchecked.fee_per_wu(),
first_kept,
);
}
}

dropped.extend(to_drop);
}
}

dropped
}
}

Expand Down Expand Up @@ -1133,11 +1286,23 @@ impl TransactionPool {
}
}

fn drop_until_below_max_size(&mut self, pool_max_size: usize) -> Vec<ValidCommandWithHash> {
let mut list = Vec::new();

while self.pool.size() > pool_max_size {
let dropped = self.pool.remove_lowest_fee();
assert!(!dropped.is_empty());
list.extend(dropped)
}

list
}

fn handle_transition_frontier_diff(&mut self, diff: diff::BestTipDiff, best_tip_ledger: Mask) {
let diff::BestTipDiff {
new_commands,
removed_commands,
reorg_best_tip,
reorg_best_tip: _,
} = diff;

let global_slot = self.pool.global_slot_since_genesis();
Expand All @@ -1159,7 +1324,7 @@ impl TransactionPool {
}

let dropped_seq = match self.pool.add_from_backtrack(cmd) {
Ok(_) => self.pool.drop_until_below_max_size(pool_max_size),
Ok(_) => self.drop_until_below_max_size(pool_max_size),
Err(_) => todo!(), // TODO: print error
};
dropped_backtrack.extend(dropped_seq);
Expand Down Expand Up @@ -1370,10 +1535,7 @@ impl TransactionPool {
.flatten()
.collect::<Vec<_>>();

let dropped_for_size = {
self.pool
.drop_until_below_max_size(self.config.pool_max_size)
};
let dropped_for_size = { self.drop_until_below_max_size(self.config.pool_max_size) };

let all_dropped_cmds = dropped_for_add
.iter()
Expand Down

0 comments on commit e6abf86

Please sign in to comment.