Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): reject transactions that DOWN substates pledged as input refs #954

Closed
2 changes: 2 additions & 0 deletions applications/tari_dan_wallet_cli/src/command/transaction.rs
Expand Up @@ -268,6 +268,7 @@ pub async fn handle_submit(args: SubmitArgs, client: &mut WalletDaemonClient) ->
fee_instructions,
instructions,
inputs: common.inputs,
input_refs: vec![],
override_inputs: common.override_inputs.unwrap_or_default(),
is_dry_run: common.dry_run,
proof_ids: vec![],
Expand Down Expand Up @@ -307,6 +308,7 @@ async fn handle_submit_manifest(
.collect(),
instructions: instructions.instructions,
inputs: common.inputs,
input_refs: vec![],
override_inputs: common.override_inputs.unwrap_or_default(),
is_dry_run: common.dry_run,
proof_ids: vec![],
Expand Down
Expand Up @@ -81,6 +81,7 @@ pub async fn handle_submit_instruction(
fee_instructions: vec![],
instructions: vec![],
inputs: req.inputs,
input_refs: req.input_refs,
override_inputs: req.override_inputs.unwrap_or_default(),
is_dry_run: req.is_dry_run,
proof_ids: vec![],
Expand Down Expand Up @@ -142,6 +143,7 @@ pub async fn handle_submit(
.with_fee_instructions(req.fee_instructions)
.with_min_epoch(req.min_epoch)
.with_max_epoch(req.max_epoch)
.with_input_refs(req.input_refs)
.sign(&key.key)
.build()
};
Expand Down
3 changes: 3 additions & 0 deletions clients/wallet_daemon_client/src/types.rs
Expand Up @@ -71,6 +71,8 @@ pub struct CallInstructionRequest {
#[serde(default)]
pub inputs: Vec<SubstateRequirement>,
#[serde(default)]
pub input_refs: Vec<SubstateRequirement>,
#[serde(default)]
pub override_inputs: Option<bool>,
#[serde(default)]
pub new_outputs: Option<u8>,
Expand Down Expand Up @@ -99,6 +101,7 @@ pub struct TransactionSubmitRequest {
#[cfg_attr(feature = "ts", ts(type = "number | null"))]
pub signing_key_index: Option<u64>,
pub inputs: Vec<SubstateRequirement>,
pub input_refs: Vec<SubstateRequirement>,
pub override_inputs: bool,
pub is_dry_run: bool,
#[cfg_attr(feature = "ts", ts(type = "Array<number>"))]
Expand Down
68 changes: 46 additions & 22 deletions dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs
@@ -1,7 +1,11 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{collections::HashSet, num::NonZeroU64, ops::DerefMut};
use std::{
collections::{HashMap, HashSet},
num::NonZeroU64,
ops::DerefMut,
};

use log::*;
use tari_common::configuration::Network;
Expand Down Expand Up @@ -385,7 +389,7 @@ where TConsensusSpec: ConsensusSpec
local_committee_shard: &CommitteeShard,
) -> Result<Option<QuorumDecision>, HotStuffError> {
let mut total_leader_fee = 0;
let mut locked_inputs = HashSet::new();
let mut locked_inputs = HashMap::new();
let mut locked_outputs = HashSet::new();

// Executor used for transactions that have inputs without specific versions.
Expand Down Expand Up @@ -447,11 +451,10 @@ where TConsensusSpec: ConsensusSpec
if tx_rec.current_decision() == t.decision {
if tx_rec.current_decision().is_commit() {
let executed = self.get_executed_transaction(tx, &t.id, &mut executor)?;
let transaction = executed.transaction();

// Lock all inputs for the transaction as part of Prepare
let is_inputs_locked =
self.check_lock_inputs(tx, transaction, local_committee_shard, &mut locked_inputs)?;
self.check_lock_inputs(tx, &executed, local_committee_shard, &mut locked_inputs)?;
let is_outputs_locked =
is_inputs_locked && self.check_lock_outputs(tx, &executed, &mut locked_outputs)?;

Expand All @@ -461,7 +464,7 @@ where TConsensusSpec: ConsensusSpec
target: LOG_TARGET,
"❌ Unable to lock all inputs for transaction {} in block {}.",
block.id(),
transaction.id(),
executed.id(),
);
// We change our decision to ABORT so that the next time we propose/receive a
// proposal we will check for ABORT. It may
Expand Down Expand Up @@ -490,7 +493,7 @@ where TConsensusSpec: ConsensusSpec
// We need to update the database (transaction result and inputs/outpus)
// in case the transaction was re-executed because it has inputs without versions
let has_involved_shards = executed.num_involved_shards() > 0;
if transaction.has_inputs_without_version() && has_involved_shards {
if executed.transaction().has_inputs_without_version() && has_involved_shards {
executed.update(tx)?;
}
}
Expand Down Expand Up @@ -804,21 +807,27 @@ where TConsensusSpec: ConsensusSpec
fn check_lock_inputs(
&self,
tx: &mut <TConsensusSpec::StateStore as StateStore>::ReadTransaction<'_>,
transaction: &Transaction,
transaction: &ExecutedTransaction,
local_committee_shard: &CommitteeShard,
locked_inputs: &mut HashSet<SubstateAddress>,
locked_inputs: &mut HashMap<SubstateAddress, SubstateLockFlag>,
) -> Result<bool, HotStuffError> {
let Some(diff) = transaction.result().finalize.accept() else {
// No locking for aborted transactions. SIDENOTE: this isn't ever hit because we check for accept before
// calling.
return Ok(true);
};

let inputs_that_should_be_refs = transaction
.transaction()
.inputs()
.iter()
.filter(|input| diff.down_iter().all(|(s, _)| input.substate_id != *s))
.collect::<Vec<_>>();

// TODO: for inputs without version, investigate if we need to use the results of re-execution
let inputs = local_committee_shard
.filter(
transaction
.inputs()
.iter()
.chain(transaction.filled_inputs())
.filter(|i| i.version().is_some())
.map(|i| i.to_substate_address()),
)
.collect::<HashSet<_>>();
.filter(diff.down_iter().map(|(id, v)| SubstateAddress::from_address(id, *v)))
.collect::<Vec<_>>();
let state = SubstateRecord::check_lock_all(tx, inputs.iter(), SubstateLockFlag::Write)?;
if !state.is_acquired() {
warn!(
Expand All @@ -829,27 +838,28 @@ where TConsensusSpec: ConsensusSpec
);
return Ok(false);
}
if inputs.iter().any(|i| locked_inputs.contains(i)) {
if inputs.iter().any(|i| locked_inputs.contains_key(i)) {
warn!(
target: LOG_TARGET,
"❌ Locks for transaction {} conflict with other transactions in the block",
transaction.id(),
);
return Ok(false);
}
locked_inputs.extend(inputs);
locked_inputs.extend(inputs.into_iter().map(|i| (i, SubstateLockFlag::Write)));
// TODO: Same as before, for inputs without version, investigate if we need to use the results of re-execution
let inputs = local_committee_shard
let input_refs = local_committee_shard
.filter(
transaction
.transaction()
.input_refs()
.iter()
.chain(inputs_that_should_be_refs)
.filter(|i| i.version().is_some())
.map(|i| i.to_substate_address()),
)
.collect::<HashSet<_>>();
let state = SubstateRecord::check_lock_all(tx, inputs.iter(), SubstateLockFlag::Read)?;

let state = SubstateRecord::check_lock_all(tx, input_refs.iter(), SubstateLockFlag::Read)?;
if !state.is_acquired() {
warn!(
target: LOG_TARGET,
Expand All @@ -860,6 +870,20 @@ where TConsensusSpec: ConsensusSpec
return Ok(false);
}

if input_refs
.iter()
.any(|i| matches!(locked_inputs.get(i), Some(SubstateLockFlag::Write)))
{
warn!(
target: LOG_TARGET,
"❌ Once or more read locks for transaction {} conflict with other transactions in the block",
transaction.id(),
);
return Ok(false);
}

locked_inputs.extend(input_refs.into_iter().map(|i| (i, SubstateLockFlag::Read)));

debug!(
target: LOG_TARGET,
"🔒️ Locked inputs for transaction {}",
Expand Down
1 change: 0 additions & 1 deletion dan_layer/consensus_tests/src/consensus.rs
Expand Up @@ -298,7 +298,6 @@ async fn leader_failure_output_conflict() {
setup_logger();
let mut test = Test::builder()
.with_test_timeout(Duration::from_secs(60))
.debug_sql("/tmp/test{}.db")
.add_committee(0, vec!["1", "2"])
// .add_committee(1, vec!["3", "4"])
.start()
Expand Down
10 changes: 5 additions & 5 deletions dan_layer/indexer_lib/src/transaction_autofiller.rs
Expand Up @@ -73,14 +73,14 @@ where
}
for handle in handles {
let res = handle.await??;
if let Some((address, substate)) = res {
let shard = SubstateRequirement::new(address.clone(), Some(substate.version()));
if let Some((id, substate)) = res {
let shard = SubstateRequirement::new(id.clone(), Some(substate.version()));
if autofilled_transaction.input_refs().contains(&shard) {
// Shard is already an input as a ref
continue;
}
input_shards.push(shard);
found_substates.insert(address, substate);
found_substates.insert(id, substate);
}
}
info!(target: LOG_TARGET, "✏️️ Found {} input substates", found_substates.len());
Expand Down Expand Up @@ -113,7 +113,7 @@ where
for address in related_addresses {
info!(target: LOG_TARGET, "✏️️️ Found {} related substates", address);
let handle = tokio::spawn(get_substate(substate_scanner_ref.clone(), address.clone(), None));
handles.insert(address.clone(), handle);
handles.insert(address, handle);
}
for (address, handle) in handles {
let scan_res = handle.await??;
Expand All @@ -133,7 +133,7 @@ where
// Shard is already an input (TODO: what a waste)
continue;
}
autofilled_inputs.push(SubstateRequirement::new(id, Some(substate.version())));
autofilled_inputs.push(substate_requirement);
found_substates.insert(address, substate);
// found_this_round += 1;
} else {
Expand Down
5 changes: 5 additions & 0 deletions integration_tests/src/wallet_daemon_cli.rs
Expand Up @@ -220,6 +220,7 @@ pub async fn transfer_confidential(
override_inputs: false,
instructions: vec![],
inputs: vec![source_account_addr, dest_account_addr],
input_refs: vec![],
min_epoch,
max_epoch,
};
Expand Down Expand Up @@ -450,6 +451,7 @@ pub async fn submit_manifest_with_signing_keys(
is_dry_run: false,
proof_ids: vec![],
inputs,
input_refs: vec![],
min_epoch,
max_epoch,
};
Expand Down Expand Up @@ -523,6 +525,7 @@ pub async fn submit_manifest(
is_dry_run: false,
proof_ids: vec![],
inputs,
input_refs: vec![],
min_epoch,
max_epoch,
};
Expand Down Expand Up @@ -570,6 +573,7 @@ pub async fn submit_transaction(
override_inputs: false,
is_dry_run: false,
inputs,
input_refs: vec![],
proof_ids: vec![],
min_epoch,
max_epoch,
Expand Down Expand Up @@ -623,6 +627,7 @@ pub async fn create_component(
is_dry_run: false,
proof_ids: vec![],
inputs: vec![],
input_refs: vec![],
min_epoch,
max_epoch,
};
Expand Down