Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bound parallel verification of transactions #2387

Merged
merged 11 commits into from Mar 27, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions ledger/src/check_transaction_basic.rs
Expand Up @@ -24,4 +24,13 @@ impl<N: Network, C: ConsensusStorage<N>> Ledger<N, C> {
) -> Result<()> {
self.vm().check_transaction(transaction, rejected_id, rng)
}

/// Checks that the given list of transactions are well-formed and unique.
pub fn check_transactions_basic<R: CryptoRng + Rng>(
vicsn marked this conversation as resolved.
Show resolved Hide resolved
&self,
transactions: &[(&Transaction<N>, Option<Field<N>>)],
rng: &mut R,
) -> Result<()> {
self.vm().check_transactions(transactions, rng)
}
}
5 changes: 5 additions & 0 deletions synthesizer/Cargo.toml
Expand Up @@ -130,6 +130,11 @@ path = "./snark"
version = "=0.16.19"
optional = true

[dependencies.utilities]
package = "snarkvm-utilities"
path = "../utilities"
version = "=0.16.19"

[dependencies.aleo-std]
version = "0.1.24"
default-features = false
Expand Down
32 changes: 13 additions & 19 deletions synthesizer/process/src/finalize.rs
Expand Up @@ -15,7 +15,7 @@
use super::*;
use console::program::{FinalizeType, Future, Register};
use synthesizer_program::{Await, FinalizeRegistersState, Operand};
use utilities::handle_halting;
use utilities::try_vm_runtime;

use std::collections::HashSet;

Expand Down Expand Up @@ -227,9 +227,7 @@ fn finalize_transition<N: Network, P: FinalizeStorage<N>>(
// Finalize the command.
match &command {
Command::BranchEq(branch_eq) => {
let result = handle_halting!(panic::AssertUnwindSafe(|| {
branch_to(counter, branch_eq, finalize, stack, &registers)
}));
let result = try_vm_runtime!(|| branch_to(counter, branch_eq, finalize, stack, &registers));
match result {
Ok(Ok(new_counter)) => {
counter = new_counter;
Expand All @@ -241,9 +239,7 @@ fn finalize_transition<N: Network, P: FinalizeStorage<N>>(
}
}
Command::BranchNeq(branch_neq) => {
let result = handle_halting!(panic::AssertUnwindSafe(|| {
branch_to(counter, branch_neq, finalize, stack, &registers)
}));
let result = try_vm_runtime!(|| branch_to(counter, branch_neq, finalize, stack, &registers));
match result {
Ok(Ok(new_counter)) => {
counter = new_counter;
Expand Down Expand Up @@ -277,16 +273,15 @@ fn finalize_transition<N: Network, P: FinalizeStorage<N>>(
None => bail!("Transition ID '{transition_id}' not found in call graph"),
};

let callee_state = match handle_halting!(panic::AssertUnwindSafe(|| {
// Set up the finalize state for the await.
setup_await(state, await_, stack, &registers, child_transition_id)
})) {
Ok(Ok(callee_state)) => callee_state,
// If the evaluation fails, bail and return the error.
Ok(Err(error)) => bail!("'finalize' failed to evaluate command ({command}): {error}"),
// If the evaluation fails, bail and return the error.
Err(_) => bail!("'finalize' failed to evaluate command ({command})"),
};
// Set up the finalize state for the await.
let callee_state =
match try_vm_runtime!(|| setup_await(state, await_, stack, &registers, child_transition_id)) {
Ok(Ok(callee_state)) => callee_state,
// If the evaluation fails, bail and return the error.
Ok(Err(error)) => bail!("'finalize' failed to evaluate command ({command}): {error}"),
// If the evaluation fails, bail and return the error.
Err(_) => bail!("'finalize' failed to evaluate command ({command})"),
};

// Increment the call counter.
call_counter += 1;
Expand All @@ -306,8 +301,7 @@ fn finalize_transition<N: Network, P: FinalizeStorage<N>>(
continue 'outer;
}
_ => {
let result =
handle_halting!(panic::AssertUnwindSafe(|| { command.finalize(stack, store, &mut registers) }));
let result = try_vm_runtime!(|| command.finalize(stack, store, &mut registers));
match result {
// If the evaluation succeeds with an operation, add it to the list.
Ok(Ok(Some(finalize_operation))) => finalize_operations.push(finalize_operation),
Expand Down
96 changes: 64 additions & 32 deletions synthesizer/src/vm/finalize.rs
Expand Up @@ -16,8 +16,6 @@ use super::*;

use ledger_committee::{MAX_DELEGATORS, MIN_DELEGATOR_STAKE, MIN_VALIDATOR_STAKE};

use rand::{rngs::StdRng, SeedableRng};

impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
/// Speculates on the given list of transactions in the VM.
/// This function aborts all transactions that are not are well-formed or unique.
Expand Down Expand Up @@ -47,32 +45,16 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
let candidate_transactions: Vec<_> = candidate_transactions.collect::<Vec<_>>();
let candidate_transaction_ids: Vec<_> = candidate_transactions.iter().map(|tx| tx.id()).collect();

// Determine if the vm is currently processing the genesis block.
let is_genesis =
self.block_store().find_block_height_from_state_root(self.block_store().current_state_root())?.is_none();
// If the transactions are not part of the genesis block, ensure each transaction is well-formed and unique. Abort any transactions that are not.
let (verified_transactions, verification_aborted_transactions) =
match self.block_store().find_block_height_from_state_root(self.block_store().current_state_root())? {
// If the current state root does not exist in the block store, then the genesis block has not been introduced yet.
None => (candidate_transactions, vec![]),
// Verify transactions for all non-genesis cases.
_ => {
let rngs =
(0..candidate_transactions.len()).map(|_| StdRng::from_seed(rng.gen())).collect::<Vec<_>>();
// Verify the transactions and collect the error message if there is one.
cfg_into_iter!(candidate_transactions).zip(rngs).partition_map(|(transaction, mut rng)| {
// Abort the transaction if it is a fee transaction.
if transaction.is_fee() {
return Either::Right((
transaction,
"Fee transactions are not allowed in speculate".to_string(),
));
}
// Verify the transaction.
match self.check_transaction(transaction, None, &mut rng) {
Ok(_) => Either::Left(transaction),
Err(e) => Either::Right((transaction, e.to_string())),
}
})
}
};
let (verified_transactions, verification_aborted_transactions) = match is_genesis {
// If the current state root does not exist in the block store, then the genesis block has not been introduced yet.
true => (candidate_transactions, vec![]),
// Verify transactions for all non-genesis cases.
false => self.prepare_for_speculate(&candidate_transactions, rng)?,
};

// Performs a **dry-run** over the list of ratifications, solutions, and transactions.
let (ratifications, confirmed_transactions, speculation_aborted_transactions, ratified_finalize_operations) =
Expand Down Expand Up @@ -127,15 +109,15 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
) -> Result<Vec<FinalizeOperation<N>>> {
let timer = timer!("VM::check_speculate");

// Retrieve the transactions and their rejected IDs.
let transactions_and_rejected_ids = cfg_iter!(transactions)
.map(|transaction| transaction.to_rejected_id().map(|rejected_id| (transaction.deref(), rejected_id)))
.collect::<Result<Vec<_>>>()?;
// Ensure each transaction is well-formed and unique.
// NOTE: We perform the transaction checks here prior to `atomic_speculate` because we must
// ensure that the `Fee` transactions are valid. We can't unify the transaction checks in `atomic_speculate`
// because we run speculation on the unconfirmed variant of the transactions.
let rngs = (0..transactions.len()).map(|_| StdRng::from_seed(rng.gen())).collect::<Vec<_>>();
cfg_iter!(transactions).zip(rngs).try_for_each(|(transaction, mut rng)| {
self.check_transaction(transaction, transaction.to_rejected_id()?, &mut rng)
.map_err(|e| anyhow!("Invalid transaction found in the transactions list: {e}"))
})?;
self.check_transactions(&transactions_and_rejected_ids, rng)?;
vicsn marked this conversation as resolved.
Show resolved Hide resolved

// Reconstruct the candidate ratifications to verify the speculation.
let candidate_ratifications = ratifications.iter().cloned().collect::<Vec<_>>();
Expand Down Expand Up @@ -806,6 +788,56 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
})
}

/// Performs precondition checks on the transactions prior to speculation.
///
/// This method is used to check the following conditions:
/// - If a transaction is a fee transaction or if it is invalid,
/// then the transaction will be aborted.
pub(crate) fn prepare_for_speculate<'a, R: CryptoRng + Rng>(
&self,
transactions: &[&'a Transaction<N>],
rng: &mut R,
) -> Result<(Vec<&'a Transaction<N>>, Vec<(&'a Transaction<N>, String)>)> {
// Construct the list of valid and invalid transactions.
let mut valid_transactions = Vec::with_capacity(transactions.len());
let mut aborted_transactions = Vec::with_capacity(transactions.len());

// Separate the transactions into deploys and executions.
let (deployments, executions): (Vec<&Transaction<N>>, Vec<&Transaction<N>>) =
transactions.iter().partition(|tx| tx.is_deploy());
// Chunk the deploys and executions into groups for parallel verification.
let deployments_for_verification = deployments.chunks(Self::MAX_PARALLEL_DEPLOY_VERIFICATIONS);
let executions_for_verification = executions.chunks(Self::MAX_PARALLEL_EXECUTE_VERIFICATIONS);

// Verify the transactions in batches and separate the valid and invalid transactions.
for transactions in deployments_for_verification.chain(executions_for_verification) {
let rngs = (0..transactions.len()).map(|_| StdRng::from_seed(rng.gen())).collect::<Vec<_>>();
// Verify the transactions and collect the error message if there is one.
let (valid, invalid): (Vec<_>, Vec<_>) =
cfg_into_iter!(transactions).zip(rngs).partition_map(|(transaction, mut rng)| {
// Abort the transaction if it is a fee transaction.
if transaction.is_fee() {
return Either::Right((
*transaction,
"Fee transactions are not allowed in speculate".to_string(),
));
}
// Verify the transaction.
match self.check_transaction(transaction, None, &mut rng) {
Ok(_) => Either::Left(*transaction),
Err(e) => Either::Right((*transaction, e.to_string())),
}
});

// Collect the valid and aborted transactions.
valid_transactions.extend(valid);
aborted_transactions.extend(invalid);
}

// Return the valid and invalid transactions.
Ok((valid_transactions, aborted_transactions))
}

/// Performs precondition checks on the transaction prior to execution.
///
/// This method is used to check the following conditions:
Expand Down
31 changes: 27 additions & 4 deletions synthesizer/src/vm/mod.rs
Expand Up @@ -59,12 +59,14 @@ use ledger_store::{
};
use synthesizer_process::{deployment_cost, execution_cost, Authorization, Process, Trace};
use synthesizer_program::{FinalizeGlobalState, FinalizeOperation, FinalizeStoreTrait, Program};
use utilities::try_vm_runtime;

use aleo_std::prelude::{finish, lap, timer};
use indexmap::{IndexMap, IndexSet};
use itertools::Either;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use rand::{rngs::StdRng, SeedableRng};
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc};

#[cfg(not(feature = "serial"))]
Expand Down Expand Up @@ -733,7 +735,6 @@ function compute:
// Construct the new block header.
let (ratifications, transactions, aborted_transaction_ids, ratified_finalize_operations) =
vm.speculate(sample_finalize_state(1), None, vec![], &None.into(), transactions.iter(), rng)?;
assert!(aborted_transaction_ids.is_empty());

// Construct the metadata associated with the block.
let metadata = Metadata::new(
Expand Down Expand Up @@ -1383,12 +1384,12 @@ function do:
}

#[test]
#[should_panic]
fn test_deployment_synthesis_underreport() {
let rng = &mut TestRng::default();

// Initialize a private key.
let private_key = sample_genesis_private_key(rng);
let address = Address::try_from(&private_key).unwrap();

// Initialize the genesis block.
let genesis = sample_genesis_block(rng);
Expand Down Expand Up @@ -1432,8 +1433,30 @@ function do:
Deployment::new(deployment.edition(), deployment.program().clone(), vks_with_underreport).unwrap();
let adjusted_transaction = Transaction::Deploy(txid, program_owner, Box::new(adjusted_deployment), fee);

// Verify the deployment transaction. It should panic when enforcing the first constraint over the vk limit.
let _ = vm.check_transaction(&adjusted_transaction, None, rng);
// Verify the deployment transaction. It should error when enforcing the first constraint over the vk limit.
let result = vm.check_transaction(&adjusted_transaction, None, rng);
assert!(result.is_err());

// Create a standard transaction
// Prepare the inputs.
let inputs = [
Value::<CurrentNetwork>::from_str(&address.to_string()).unwrap(),
Value::<CurrentNetwork>::from_str("1u64").unwrap(),
]
.into_iter();

// Execute.
let transaction =
vm.execute(&private_key, ("credits.aleo", "transfer_public"), inputs, None, 0, None, rng).unwrap();

// Check that the deployment transaction will be aborted if injected into a block.
let block = sample_next_block(&vm, &private_key, &[transaction, adjusted_transaction.clone()], rng).unwrap();

// Check that the block aborts the deployment transaction.
assert_eq!(block.aborted_transaction_ids(), &vec![adjusted_transaction.id()]);

// Update the VM.
vm.add_next_block(&block).unwrap();
}

#[test]
Expand Down
43 changes: 41 additions & 2 deletions synthesizer/src/vm/verify.rs
Expand Up @@ -31,6 +31,38 @@ macro_rules! ensure_is_unique {
};
}

impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
/// The maximum number of deployments to verify in parallel.
pub(crate) const MAX_PARALLEL_DEPLOY_VERIFICATIONS: usize = 5;
/// The maximum number of executions to verify in parallel.
pub(crate) const MAX_PARALLEL_EXECUTE_VERIFICATIONS: usize = 1000;

/// Verifies the list of transactions in the VM. On failure, returns an error.
pub fn check_transactions<R: CryptoRng + Rng>(
&self,
transactions: &[(&Transaction<N>, Option<Field<N>>)],
rng: &mut R,
) -> Result<()> {
// Separate the transactions into deploys and executions.
let (deployments, executions): (Vec<_>, Vec<_>) = transactions.iter().partition(|(tx, _)| tx.is_deploy());
// Chunk the deploys and executions into groups for parallel verification.
let deployments_for_verification = deployments.chunks(Self::MAX_PARALLEL_DEPLOY_VERIFICATIONS);
let executions_for_verification = executions.chunks(Self::MAX_PARALLEL_EXECUTE_VERIFICATIONS);

// Verify the transactions in batches.
for transactions in deployments_for_verification.chain(executions_for_verification) {
// Ensure each transaction is well-formed and unique.
let rngs = (0..transactions.len()).map(|_| StdRng::from_seed(rng.gen())).collect::<Vec<_>>();
cfg_iter!(transactions).zip(rngs).try_for_each(|((transaction, rejected_id), mut rng)| {
self.check_transaction(transaction, *rejected_id, &mut rng)
.map_err(|e| anyhow!("Invalid transaction found in the transactions list: {e}"))
})?;
}

Ok(())
}
}

impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
/// Verifies the transaction in the VM. On failure, returns an error.
#[inline]
Expand Down Expand Up @@ -121,7 +153,11 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
}
// Verify the deployment if it has not been verified before.
if !is_partially_verified {
self.check_deployment_internal(deployment, rng)?;
// Verify the deployment.
match try_vm_runtime!(|| self.check_deployment_internal(deployment, rng)) {
Ok(result) => result?,
Err(_) => bail!("VM safely halted transaction '{id}' during verification"),
}
}
}
Transaction::Execute(id, execution, _) => {
Expand All @@ -134,7 +170,10 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
bail!("Transaction '{id}' contains a previously rejected execution")
}
// Verify the execution.
self.check_execution_internal(execution, is_partially_verified)?;
match try_vm_runtime!(|| self.check_execution_internal(execution, is_partially_verified)) {
Ok(result) => result?,
Err(_) => bail!("VM safely halted transaction '{id}' during verification"),
}
}
Transaction::Fee(..) => { /* no-op */ }
}
Expand Down
12 changes: 6 additions & 6 deletions utilities/src/error.rs
Expand Up @@ -38,11 +38,11 @@ impl Error for crate::String {}
#[cfg(not(feature = "std"))]
impl Error for crate::io::Error {}

/// This purpose of this macro is to catch the instances of halting
/// without producing logs looking like unexpected panics. It prints
/// to stderr using the format: "Halted at <location>: <halt message>".
/// This macro provides a VM runtime environment which will safely halt
/// without producing logs that look like unexpected behavior.
/// It prints to stderr using the format: "VM safely halted at <location>: <halt message>".
#[macro_export]
macro_rules! handle_halting {
macro_rules! try_vm_runtime {
($e:expr) => {{
use std::panic;

Expand All @@ -52,12 +52,12 @@ macro_rules! handle_halting {
let msg = e.to_string();
let msg = msg.split_ascii_whitespace().skip_while(|&word| word != "panicked").collect::<Vec<&str>>();
let mut msg = msg.join(" ");
msg = msg.replacen("panicked", "Halted", 1);
msg = msg.replacen("panicked", "VM safely halted", 1);
eprintln!("{msg}");
}));

// Perform the operation that may panic.
let result = panic::catch_unwind($e);
let result = panic::catch_unwind(panic::AssertUnwindSafe($e));

// Restore the standard panic hook.
let _ = panic::take_hook();
Expand Down