From 238c0e1714d2f102af608ef1b04486780efd0949 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Fri, 17 Jan 2025 09:05:07 +0400 Subject: [PATCH] fix flaky test --- .../src/base_layer_scanner.rs | 8 +- .../src/command/transaction.rs | 4 +- dan_layer/common_types/src/option.rs | 75 +++++++++++++------ .../src/hotstuff/block_change_set.rs | 2 +- .../hotstuff/foreign_proposal_processor.rs | 2 +- .../src/hotstuff/on_next_sync_view.rs | 2 +- .../consensus/src/hotstuff/on_propose.rs | 2 +- .../on_ready_to_vote_on_local_block.rs | 2 +- .../hotstuff/on_receive_foreign_proposal.rs | 8 +- ...on_receive_request_missing_transactions.rs | 2 +- .../hotstuff/substate_store/pending_store.rs | 24 +++++- dan_layer/consensus_tests/src/consensus.rs | 33 ++++---- .../consensus_tests/src/support/harness.rs | 9 --- .../src/support/validator/instance.rs | 8 +- dan_layer/state_tree/src/staged_store.rs | 2 +- .../storage/src/consensus_models/evidence.rs | 2 +- .../src/consensus_models/transaction.rs | 2 +- .../src/consensus_models/transaction_pool.rs | 2 +- 18 files changed, 110 insertions(+), 79 deletions(-) diff --git a/applications/tari_dan_app_utilities/src/base_layer_scanner.rs b/applications/tari_dan_app_utilities/src/base_layer_scanner.rs index f28ccc60f..1e87575e2 100644 --- a/applications/tari_dan_app_utilities/src/base_layer_scanner.rs +++ b/applications/tari_dan_app_utilities/src/base_layer_scanner.rs @@ -48,13 +48,7 @@ use tari_crypto::{ ristretto::RistrettoPublicKey, tari_utilities::{ByteArray, ByteArrayError}, }; -use tari_dan_common_types::{ - option::DisplayContainer, - optional::Optional, - Epoch, - NodeAddressable, - VersionedSubstateId, -}; +use tari_dan_common_types::{option::Displayable, optional::Optional, Epoch, NodeAddressable, VersionedSubstateId}; use tari_dan_storage::{ consensus_models::{BurntUtxo, SubstateRecord}, global::{GlobalDb, MetadataKey}, diff --git a/applications/tari_validator_node_cli/src/command/transaction.rs b/applications/tari_validator_node_cli/src/command/transaction.rs index 1d641e1c9..7d9064e8e 100644 --- a/applications/tari_validator_node_cli/src/command/transaction.rs +++ b/applications/tari_validator_node_cli/src/command/transaction.rs @@ -30,7 +30,7 @@ use std::{ use anyhow::anyhow; use clap::{Args, Subcommand}; use tari_dan_common_types::{ - option::{DisplayCont, DisplayContainer}, + option::{DisplayContainer, Displayable}, optional::Optional, SubstateAddress, SubstateRequirement, @@ -498,7 +498,7 @@ fn summarize_finalize_result(finalize: &FinalizeResult) { } fn display_vec(writer: &mut W, ty: &Type, result: &InstructionResult) -> fmt::Result { - fn display_slice(slice: &[T]) -> DisplayCont<&[T]> { + fn display_slice(slice: &[T]) -> DisplayContainer<&[T]> { slice.display() } diff --git a/dan_layer/common_types/src/option.rs b/dan_layer/common_types/src/option.rs index 12cbfaaf5..3b8c7d3e9 100644 --- a/dan_layer/common_types/src/option.rs +++ b/dan_layer/common_types/src/option.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use std::{ - collections::{BTreeSet, HashSet}, + collections::{BTreeSet, HashMap, HashSet}, fmt, fmt::{Debug, Display}, }; @@ -12,7 +12,7 @@ use std::{ /// /// # Example /// ```rust -/// use tari_dan_common_types::option::DisplayContainer; +/// use tari_dan_common_types::option::Displayable; /// /// let some_value = Some(42); /// let none_value: Option = None; @@ -34,17 +34,17 @@ use std::{ /// "list: 1.01, 2.00, 3.00" /// ); /// ``` -pub trait DisplayContainer { +pub trait Displayable { type Item: ?Sized; - fn display(&self) -> DisplayCont<&'_ Self::Item>; + fn display(&self) -> DisplayContainer<&'_ Self::Item>; } #[derive(Debug, Clone, Copy)] -pub struct DisplayCont { +pub struct DisplayContainer { value: T, } -impl Display for DisplayCont<&'_ Option> { +impl Display for DisplayContainer<&'_ Option> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.value { Some(value) => Display::fmt(value, f), @@ -53,81 +53,110 @@ impl Display for DisplayCont<&'_ Option> { } } -impl Display for DisplayCont<&'_ [T]> { +impl Display for DisplayContainer<&'_ [T]> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let len = self.value.len(); + write!(f, "[")?; for (i, item) in self.value.iter().enumerate() { Display::fmt(item, f)?; if i < len - 1 { write!(f, ", ")?; } } + write!(f, "]")?; Ok(()) } } -impl Display for DisplayCont<&'_ HashSet> { +impl Display for DisplayContainer<&'_ HashSet> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let len = self.value.len(); + write!(f, "{{")?; for (i, item) in self.value.iter().enumerate() { Display::fmt(item, f)?; if i < len - 1 { write!(f, ", ")?; } } + write!(f, "}}")?; Ok(()) } } -impl Display for DisplayCont<&'_ BTreeSet> { +impl Display for DisplayContainer<&'_ BTreeSet> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let len = self.value.len(); + write!(f, "{{")?; for (i, item) in self.value.iter().enumerate() { Display::fmt(item, f)?; if i < len - 1 { write!(f, ", ")?; } } + write!(f, "}}")?; Ok(()) } } -impl DisplayContainer for Option { +impl Display for DisplayContainer<&'_ HashMap> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let len = self.value.len(); + write!(f, "{{")?; + for (i, (k, v)) in self.value.iter().enumerate() { + write!(f, "{k}: {v}")?; + if i < len - 1 { + write!(f, ", ")?; + } + } + write!(f, "}}")?; + Ok(()) + } +} + +impl Displayable for Option { type Item = Self; - fn display(&self) -> DisplayCont<&'_ Self> { - DisplayCont { value: self } + fn display(&self) -> DisplayContainer<&'_ Self> { + DisplayContainer { value: self } } } -impl DisplayContainer for [T] { +impl Displayable for [T] { type Item = Self; - fn display(&self) -> DisplayCont<&'_ Self> { - DisplayCont { value: self } + fn display(&self) -> DisplayContainer<&'_ Self> { + DisplayContainer { value: self } } } -impl DisplayContainer for Vec { +impl Displayable for Vec { type Item = [T]; - fn display(&self) -> DisplayCont<&'_ [T]> { + fn display(&self) -> DisplayContainer<&'_ [T]> { (*self.as_slice()).display() } } -impl DisplayContainer for HashSet { +impl Displayable for HashSet { + type Item = Self; + + fn display(&self) -> DisplayContainer<&'_ Self> { + DisplayContainer { value: self } + } +} + +impl Displayable for BTreeSet { type Item = Self; - fn display(&self) -> DisplayCont<&'_ Self> { - DisplayCont { value: self } + fn display(&self) -> DisplayContainer<&'_ Self> { + DisplayContainer { value: self } } } -impl DisplayContainer for BTreeSet { +impl Displayable for HashMap { type Item = Self; - fn display(&self) -> DisplayCont<&'_ Self> { - DisplayCont { value: self } + fn display(&self) -> DisplayContainer<&'_ Self> { + DisplayContainer { value: self } } } diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index d7d7084bb..abf859e78 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -10,7 +10,7 @@ use std::{ use indexmap::IndexMap; use log::*; use tari_common_types::types::PublicKey; -use tari_dan_common_types::{option::DisplayContainer, optional::Optional, shard::Shard, Epoch, ShardGroup}; +use tari_dan_common_types::{option::Displayable, optional::Optional, shard::Shard, Epoch, ShardGroup}; use tari_dan_storage::{ consensus_models::{ Block, diff --git a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs index 29e347b56..a0158aee9 100644 --- a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs +++ b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use log::*; -use tari_dan_common_types::{committee::CommitteeInfo, option::DisplayContainer, ShardGroup, SubstateAddress}; +use tari_dan_common_types::{committee::CommitteeInfo, option::Displayable, ShardGroup, SubstateAddress}; use tari_dan_storage::{ consensus_models::{ BlockPledge, diff --git a/dan_layer/consensus/src/hotstuff/on_next_sync_view.rs b/dan_layer/consensus/src/hotstuff/on_next_sync_view.rs index d2ccc9000..d94c8bc17 100644 --- a/dan_layer/consensus/src/hotstuff/on_next_sync_view.rs +++ b/dan_layer/consensus/src/hotstuff/on_next_sync_view.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use log::*; -use tari_dan_common_types::{committee::Committee, option::DisplayContainer, optional::Optional, Epoch, NodeHeight}; +use tari_dan_common_types::{committee::Committee, option::Displayable, optional::Optional, Epoch, NodeHeight}; use tari_dan_storage::{ consensus_models::{HighQc, LastSentVote, LeafBlock}, StateStore, diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index d64ebacf0..b3b10c019 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -12,7 +12,7 @@ use tari_common_types::types::{FixedHash, PublicKey}; use tari_crypto::tari_utilities::epoch_time::EpochTime; use tari_dan_common_types::{ committee::{Committee, CommitteeInfo}, - option::DisplayContainer, + option::Displayable, optional::Optional, shard::Shard, Epoch, diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 06b833b88..2b3966c1d 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -7,7 +7,7 @@ use log::*; use tari_crypto::ristretto::RistrettoPublicKey; use tari_dan_common_types::{ committee::CommitteeInfo, - option::DisplayContainer, + option::Displayable, optional::Optional, Epoch, ShardGroup, diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index daf470483..c6d12f0b7 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -4,13 +4,7 @@ use std::collections::HashSet; use log::*; -use tari_dan_common_types::{ - committee::CommitteeInfo, - option::DisplayContainer, - optional::Optional, - Epoch, - ShardGroup, -}; +use tari_dan_common_types::{committee::CommitteeInfo, option::Displayable, optional::Optional, Epoch, ShardGroup}; use tari_dan_storage::{ consensus_models::{ Block, diff --git a/dan_layer/consensus/src/hotstuff/on_receive_request_missing_transactions.rs b/dan_layer/consensus/src/hotstuff/on_receive_request_missing_transactions.rs index d49d6ee71..5acaa7c3c 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_request_missing_transactions.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_request_missing_transactions.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use log::*; -use tari_dan_common_types::option::DisplayContainer; +use tari_dan_common_types::option::Displayable; use tari_dan_storage::{consensus_models::TransactionRecord, StateStore}; use crate::{ diff --git a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs index 65be6158a..df6998c5e 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs @@ -6,6 +6,7 @@ use std::{borrow::Cow, collections::HashMap, fmt::Display}; use indexmap::IndexMap; use log::*; use tari_dan_common_types::{ + option::Displayable, optional::Optional, LockIntent, NumPreshards, @@ -531,7 +532,7 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor fn get_pending(&self, addr: &SubstateAddress) -> Option<&SubstateChange> { self.pending .get(addr) - .map(|&pos| self.diff.get(pos).expect("Index map and diff are out of sync")) + .map(|&pos| self.diff.get(pos).expect("pending map and diff are out of sync")) } fn insert(&mut self, change: SubstateChange) { @@ -565,6 +566,13 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor } fn assert_is_up(&self, id: &VersionedSubstateId) -> Result<(), SubstateStoreError> { + debug!( + target: LOG_TARGET, + "assert_is_up: id: {}, pending: {}, head: {}", + id, + self.pending.display(), + self.head.display() + ); if let Some(change) = self.get_pending(&id.to_substate_address()) { if change.is_down() { return Err(SubstateStoreError::SubstateIsDown { id: id.clone() }); @@ -572,8 +580,14 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor return Ok(()); } + debug!( + target: LOG_TARGET, + "assert_is_up id: {} not found in pending", + id, + ); + if let Some(change) = - BlockDiff::get_for_substate(self.read_transaction(), &self.parent_block, &id.substate_id).optional()? + BlockDiff::get_for_substate(self.read_transaction(), &self.parent_block, id.substate_id()).optional()? { if change.is_up() { return Ok(()); @@ -581,6 +595,12 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor return Err(SubstateStoreError::SubstateIsDown { id: id.clone() }); } + debug!( + target: LOG_TARGET, + "assert_is_up: id: {} not found in block diff", + id, + ); + match SubstateRecord::substate_is_up(self.read_transaction(), &id.to_substate_address()).optional()? { Some(true) => Ok(()), Some(false) => Err(SubstateStoreError::SubstateIsDown { id: id.clone() }), diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index 43b7bef1b..10e5d10af 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -748,11 +748,11 @@ async fn multishard_output_conflict_abort() { test.assert_all_validators_at_same_height().await; // Currently not deterministic (test harness) which transaction will arrive first so we check that one transaction // is committed and the other is aborted. TODO: It is also possible that both are aborted. - let tx1_vn1 = test.get_transaction_record(&TestAddress::new("1"), tx_ids[0]); - let tx2_vn1 = test.get_transaction_record(&TestAddress::new("1"), tx_ids[1]); + let tx1_vn1 = test.get_validator(&TestAddress::new("1")).get_transaction(tx_ids[0]); + let tx2_vn1 = test.get_validator(&TestAddress::new("1")).get_transaction(tx_ids[1]); - let tx1_vn3 = test.get_transaction_record(&TestAddress::new("3"), tx_ids[0]); - let tx2_vn3 = test.get_transaction_record(&TestAddress::new("3"), tx_ids[1]); + let tx1_vn3 = test.get_validator(&TestAddress::new("3")).get_transaction(tx_ids[0]); + let tx2_vn3 = test.get_validator(&TestAddress::new("3")).get_transaction(tx_ids[1]); assert_eq!(tx1_vn1.final_decision().unwrap(), tx1_vn3.final_decision().unwrap()); assert_eq!(tx2_vn1.final_decision().unwrap(), tx2_vn3.final_decision().unwrap()); @@ -813,22 +813,19 @@ async fn single_shard_inputs_from_previous_outputs() { } test.assert_all_validators_at_same_height().await; - // We do not work out input dependencies when we sequence transactions in blocks. Currently ordering within a block - // is lexicographical by transaction id, therefore both will only be committed if tx1 happens to be sequenced - // first. - // if tx1.id() < tx2.id() { + // Assert that the decision matches for all validators. If tx2 is sequenced first, then it will be aborted due to + // the input not existing test.assert_all_validators_have_decision(tx1.id(), Decision::Commit) .await; - test.assert_all_validators_have_decision(tx2.id(), Decision::Commit) - .await; - // TODO: this is no longer true - it always commits both. Need to confirm correctness because it may be that the - // implementation is more intelligent (correct sequencing or downgrading to a read lock) but not certain. - // } else { - // test.assert_all_validators_have_decision(tx1.id(), Decision::Commit) - // .await; - // test.assert_all_validators_have_decision(tx2.id(), Decision::Abort(AbortReason::OneOrMoreInputsNotFound)) - // .await; - // } + let decision_tx2 = test + .get_validator(&TestAddress::new("1")) + .get_transaction(tx2.id()) + .final_decision() + .expect("tx2 final decision not reached"); + test.assert_all_validators_have_decision(tx2.id(), decision_tx2).await; + if let Some(reason) = decision_tx2.abort_reason() { + assert_eq!(reason, AbortReason::OneOrMoreInputsNotFound); + } test.assert_clean_shutdown().await; log::info!("total messages sent: {}", test.network().total_messages_sent()); diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index 4acbc0c47..c01ba738a 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -214,15 +214,6 @@ impl Test { &self.validators } - pub fn get_transaction_record(&self, address: &TestAddress, transaction_id: &TransactionId) -> TransactionRecord { - self.validators - .get(address) - .unwrap() - .state_store - .with_read_tx(|tx| TransactionRecord::get(tx, transaction_id)) - .unwrap() - } - pub async fn on_hotstuff_event(&mut self) -> (TestAddress, HotstuffEvent) { if self.network.task_handle().is_finished() { panic!("Network task exited while waiting for Hotstuff event"); diff --git a/dan_layer/consensus_tests/src/support/validator/instance.rs b/dan_layer/consensus_tests/src/support/validator/instance.rs index d8ea856d2..b175767ce 100644 --- a/dan_layer/consensus_tests/src/support/validator/instance.rs +++ b/dan_layer/consensus_tests/src/support/validator/instance.rs @@ -7,7 +7,7 @@ use tari_consensus::{ }; use tari_dan_common_types::{optional::Optional, NodeHeight, ShardGroup, SubstateAddress}; use tari_dan_storage::{ - consensus_models::{BlockId, LeafBlock}, + consensus_models::{BlockId, LeafBlock, TransactionRecord}, StateStore, StateStoreReadTransaction, }; @@ -92,4 +92,10 @@ impl Validator { let tx = self.state_store().create_read_tx().unwrap(); tx.substates_exists_for_transaction(tx_id).unwrap() } + + pub fn get_transaction(&self, transaction_id: &TransactionId) -> TransactionRecord { + self.state_store + .with_read_tx(|tx| TransactionRecord::get(tx, transaction_id)) + .unwrap() + } } diff --git a/dan_layer/state_tree/src/staged_store.rs b/dan_layer/state_tree/src/staged_store.rs index 1342150e7..e9b30513b 100644 --- a/dan_layer/state_tree/src/staged_store.rs +++ b/dan_layer/state_tree/src/staged_store.rs @@ -4,7 +4,7 @@ use std::collections::{HashMap, VecDeque}; use log::debug; -use tari_dan_common_types::option::DisplayContainer; +use tari_dan_common_types::option::Displayable; use tari_jellyfish::{JmtStorageError, Node, NodeKey, StaleTreeNode, TreeStoreReader, TreeStoreWriter}; use crate::StateHashTreeDiff; diff --git a/dan_layer/storage/src/consensus_models/evidence.rs b/dan_layer/storage/src/consensus_models/evidence.rs index db021c17d..197876c0d 100644 --- a/dan_layer/storage/src/consensus_models/evidence.rs +++ b/dan_layer/storage/src/consensus_models/evidence.rs @@ -9,7 +9,7 @@ use log::*; use serde::{Deserialize, Serialize}; use tari_dan_common_types::{ borsh::indexmap as indexmap_borsh, - option::DisplayContainer, + option::Displayable, LockIntent, NumPreshards, ShardGroup, diff --git a/dan_layer/storage/src/consensus_models/transaction.rs b/dan_layer/storage/src/consensus_models/transaction.rs index 3bec5f0f6..0d0ddc0bb 100644 --- a/dan_layer/storage/src/consensus_models/transaction.rs +++ b/dan_layer/storage/src/consensus_models/transaction.rs @@ -11,7 +11,7 @@ use log::*; use serde::Deserialize; use tari_dan_common_types::{ committee::CommitteeInfo, - option::DisplayContainer, + option::Displayable, NumPreshards, SubstateLockType, VersionedSubstateId, diff --git a/dan_layer/storage/src/consensus_models/transaction_pool.rs b/dan_layer/storage/src/consensus_models/transaction_pool.rs index be248c942..155b21e65 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool.rs @@ -13,7 +13,7 @@ use log::*; use serde::Serialize; use tari_dan_common_types::{ committee::CommitteeInfo, - option::DisplayContainer, + option::Displayable, optional::{IsNotFoundError, Optional}, NumPreshards, SubstateAddress,