Skip to content

Commit

Permalink
[refactor] simplify the consensus submit response.
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed Oct 31, 2024
1 parent 13eec9e commit 3618464
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 68 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ mod test_dag_parser;

/// Exported consensus API.
pub use authority_node::ConsensusAuthority;
pub use block::{BlockAPI, Round, TransactionIndex};
pub use block::{BlockRef, BlockAPI, Round, TransactionIndex};
/// Exported API for testing.
pub use block::{TestBlock, Transaction, VerifiedBlock};
pub use commit::{CommitDigest, CommitIndex, CommitRef, CommittedSubDag};
Expand Down
49 changes: 12 additions & 37 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::consensus_handler::{classify, SequencedConsensusTransactionKey};
use crate::consensus_throughput_calculator::{ConsensusThroughputProfiler, Level};
use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
use crate::metrics::LatencyObserver;
use consensus_core::ConnectionStatus;
use consensus_core::{BlockStatus, ConnectionStatus};
use mysten_metrics::{spawn_monitored_task, GaugeGuard, GaugeGuardFutureExt};
use sui_protocol_config::ProtocolConfig;
use sui_simulator::anemo::PeerId;
Expand Down Expand Up @@ -192,32 +192,7 @@ impl ConsensusAdapterMetrics {
}
}

pub enum BlockStatus {
Sequenced,
GarbageCollected,
}

pub enum SubmitResponse {
NoStatusWaiter(BlockStatus),
WithStatusWaiter(oneshot::Receiver<consensus_core::BlockStatus>),
}

impl SubmitResponse {
pub async fn wait_for_status(self) -> SuiResult<BlockStatus> {
match self {
SubmitResponse::NoStatusWaiter(status) => Ok(status),
SubmitResponse::WithStatusWaiter(receiver) => match receiver.await {
Ok(status) => match status {
consensus_core::BlockStatus::Sequenced(_) => Ok(BlockStatus::Sequenced),
consensus_core::BlockStatus::GarbageCollected(_) => {
Ok(BlockStatus::GarbageCollected)
}
},
Err(e) => Err(SuiError::ConsensusConnectionBroken(format!("{:?}", e))),
},
}
}
}
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;

#[mockall::automock]
#[async_trait::async_trait]
Expand All @@ -236,7 +211,7 @@ pub trait ConsensusClient: Sync + Send + 'static {
&self,
transactions: &[ConsensusTransaction],
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<SubmitResponse>;
) -> SuiResult<BlockStatusReceiver>;
}

/// Submit Sui certificates to the consensus.
Expand Down Expand Up @@ -787,7 +762,7 @@ impl ConsensusAdapter {

loop {
// Submit the transaction to consensus and return the submit result with a status waiter
let submit_result = self
let status_waiter = self
.submit_inner(
&transactions,
epoch_store,
Expand All @@ -797,8 +772,8 @@ impl ConsensusAdapter {
)
.await;

match submit_result.wait_for_status().await {
Ok(BlockStatus::Sequenced) => {
match status_waiter.await {
Ok(BlockStatus::Sequenced(_)) => {
self.metrics
.sequencing_certificate_status
.with_label_values(&[tx_type, "sequenced"])
Expand All @@ -809,7 +784,7 @@ impl ConsensusAdapter {
);
break;
}
Ok(BlockStatus::GarbageCollected) => {
Ok(BlockStatus::GarbageCollected(_)) => {
self.metrics
.sequencing_certificate_status
.with_label_values(&[tx_type, "garbage_collected"])
Expand All @@ -823,9 +798,9 @@ impl ConsensusAdapter {
time::sleep(RETRY_DELAY_STEP).await;
continue;
}
Err(_err) => {
Err(err) => {
warn!(
"Error while waiting for status from consensus for transactions {transaction_keys:?}. Will be retried."
"Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.", err
);
time::sleep(RETRY_DELAY_STEP).await;
continue;
Expand Down Expand Up @@ -899,11 +874,11 @@ impl ConsensusAdapter {
transaction_keys: &[SequencedConsensusTransactionKey],
tx_type: &str,
is_soft_bundle: bool,
) -> SubmitResponse {
) -> BlockStatusReceiver {
let ack_start = Instant::now();
let mut retries: u32 = 0;

let submit_response = loop {
let status_waiter = loop {
match self
.consensus_client
.submit(transactions, epoch_store)
Expand Down Expand Up @@ -955,7 +930,7 @@ impl ConsensusAdapter {
.with_label_values(&[&bucket, tx_type])
.observe(ack_start.elapsed().as_secs_f64());

submit_response
status_waiter
}

/// Waits for transactions to appear either to consensus output or been executed via a checkpoint (state sync).
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/consensus_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::consensus_adapter::{ConsensusClient, SubmitResponse};
use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
use crate::consensus_handler::ConsensusHandlerInitializer;
use crate::consensus_manager::mysticeti_manager::MysticetiManager;
use crate::consensus_validator::SuiTxValidator;
Expand Down Expand Up @@ -213,7 +213,7 @@ impl ConsensusClient for ConsensusClientWrapper {
&self,
transactions: &[ConsensusTransaction],
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<SubmitResponse> {
) -> SuiResult<BlockStatusReceiver> {
let client = self.get().await;
client.submit(transactions, epoch_store).await
}
Expand Down
14 changes: 10 additions & 4 deletions crates/sui-core/src/epoch/randomness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,11 +804,13 @@ mod tests {
use crate::{
authority::test_authority_builder::TestAuthorityBuilder,
consensus_adapter::{
BlockStatus, ConnectionMonitorStatusForTests, ConsensusAdapter,
ConsensusAdapterMetrics, MockConsensusClient, SubmitResponse,
ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
MockConsensusClient,
},
epoch::randomness::*,
mock_consensus::with_block_status,
};
use consensus_core::{BlockRef, BlockStatus};
use std::num::NonZeroUsize;
use sui_protocol_config::ProtocolConfig;
use sui_protocol_config::{Chain, ProtocolVersion};
Expand Down Expand Up @@ -847,7 +849,7 @@ mod tests {
tx_consensus.try_send(transactions.to_vec()).unwrap();
true
})
.returning(|_, _| Ok(SubmitResponse::NoStatusWaiter(BlockStatus::Sequenced)));
.returning(|_, _| Ok(with_block_status(BlockStatus::Sequenced(BlockRef::MIN))));

let state = TestAuthorityBuilder::new()
.with_protocol_config(protocol_config.clone())
Expand Down Expand Up @@ -979,7 +981,11 @@ mod tests {
tx_consensus.try_send(transactions.to_vec()).unwrap();
true
})
.returning(|_, _| Ok(SubmitResponse::NoStatusWaiter(BlockStatus::Sequenced)));
.returning(|_, _| {
Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
BlockRef::MIN,
)))
});

let state = TestAuthorityBuilder::new()
.with_protocol_config(protocol_config.clone())
Expand Down
17 changes: 13 additions & 4 deletions crates/sui-core/src/mock_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::{AuthorityMetrics, AuthorityState};
use crate::checkpoints::CheckpointServiceNoop;
use crate::consensus_adapter::{BlockStatus, ConsensusClient, SubmitResponse, SubmitToConsensus};
use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus};
use crate::consensus_handler::SequencedConsensusTransaction;
use consensus_core::BlockRef;
use prometheus::Registry;
use std::sync::{Arc, Weak};
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
use sui_types::transaction::VerifiedCertificate;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::debug;

Expand Down Expand Up @@ -104,14 +105,22 @@ impl ConsensusClient for MockConsensusClient {
&self,
transactions: &[ConsensusTransaction],
_epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<SubmitResponse> {
) -> SuiResult<BlockStatusReceiver> {
// TODO: maybe support multi-transactions and remove this check
assert!(transactions.len() == 1);
let transaction = &transactions[0];
self.tx_sender
.send(transaction.clone())
.await
.map_err(|e| SuiError::Unknown(e.to_string()))?;
Ok(SubmitResponse::NoStatusWaiter(BlockStatus::Sequenced))
Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
BlockRef::MIN,
)))
}
}

pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
let (tx, rx) = oneshot::channel();
tx.send(status).ok();
rx
}
6 changes: 3 additions & 3 deletions crates/sui-core/src/mysticeti_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{error, info, warn};

use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
consensus_adapter::{ConsensusClient, SubmitResponse},
consensus_adapter::{BlockStatusReceiver, ConsensusClient},
consensus_handler::SequencedConsensusTransactionKey,
};

Expand Down Expand Up @@ -80,7 +80,7 @@ impl ConsensusClient for LazyMysticetiClient {
&self,
transactions: &[ConsensusTransaction],
_epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<SubmitResponse> {
) -> SuiResult<BlockStatusReceiver> {
// TODO(mysticeti): confirm comment is still true
// The retrieved TransactionClient can be from the past epoch. Submit would fail after
// Mysticeti shuts down, so there should be no correctness issue.
Expand Down Expand Up @@ -127,6 +127,6 @@ impl ConsensusClient for LazyMysticetiClient {
let transaction_key = SequencedConsensusTransactionKey::External(transactions[0].key());
tracing::info!("Transaction {transaction_key:?} was included in {block_ref}",)
};
Ok(SubmitResponse::WithStatusWaiter(status_waiter))
Ok(status_waiter)
}
}
36 changes: 21 additions & 15 deletions crates/sui-core/src/unit_tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::collections::HashSet;
use super::*;
use crate::authority::{authority_tests::init_state_with_objects, AuthorityState};
use crate::checkpoints::CheckpointServiceNoop;
use crate::consensus_adapter::BlockStatus;
use crate::consensus_handler::SequencedConsensusTransaction;
use crate::mock_consensus::with_block_status;
use consensus_core::{BlockRef, BlockStatus};
use fastcrypto::traits::KeyPair;
use move_core_types::{account_address::AccountAddress, ident_str};
use narwhal_types::Transactions;
Expand Down Expand Up @@ -191,7 +192,7 @@ pub fn make_consensus_adapter_for_test(
state: Arc<AuthorityState>,
process_via_checkpoint: HashSet<TransactionDigest>,
execute: bool,
mock_submit_responses: Vec<SubmitResponse>,
mock_block_status_receivers: Vec<BlockStatusReceiver>,
) -> Arc<ConsensusAdapter> {
let metrics = ConsensusAdapterMetrics::new_test();

Expand All @@ -200,7 +201,7 @@ pub fn make_consensus_adapter_for_test(
state: Arc<AuthorityState>,
process_via_checkpoint: HashSet<TransactionDigest>,
execute: bool,
mock_submit_responses: Arc<Mutex<Vec<SubmitResponse>>>,
mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
}

#[async_trait::async_trait]
Expand All @@ -209,7 +210,7 @@ pub fn make_consensus_adapter_for_test(
&self,
transactions: &[ConsensusTransaction],
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<SubmitResponse> {
) -> SuiResult<BlockStatusReceiver> {
let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
.iter()
.map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
Expand Down Expand Up @@ -272,10 +273,10 @@ pub fn make_consensus_adapter_for_test(
}

assert!(
!self.mock_submit_responses.lock().is_empty(),
!self.mock_block_status_receivers.lock().is_empty(),
"No mock submit responses left"
);
Ok(self.mock_submit_responses.lock().remove(0))
Ok(self.mock_block_status_receivers.lock().remove(0))
}
}
let epoch_store = state.epoch_store_for_testing();
Expand All @@ -285,7 +286,7 @@ pub fn make_consensus_adapter_for_test(
state: state.clone(),
process_via_checkpoint,
execute,
mock_submit_responses: Arc::new(Mutex::new(mock_submit_responses)),
mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
}),
state.name,
Arc::new(ConnectionMonitorStatusForTests {}),
Expand Down Expand Up @@ -315,13 +316,18 @@ async fn submit_transaction_to_consensus_adapter() {
let epoch_store = state.epoch_store_for_testing();

// Make a new consensus adapter instance.
let submit_responses = vec![
SubmitResponse::NoStatusWaiter(BlockStatus::GarbageCollected),
SubmitResponse::NoStatusWaiter(BlockStatus::GarbageCollected),
SubmitResponse::NoStatusWaiter(BlockStatus::Sequenced),
let block_status_receivers = vec![
with_block_status(BlockStatus::GarbageCollected(BlockRef::MIN)),
with_block_status(BlockStatus::GarbageCollected(BlockRef::MIN)),
with_block_status(BlockStatus::GarbageCollected(BlockRef::MIN)),
with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
];
let adapter =
make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, submit_responses);
let adapter = make_consensus_adapter_for_test(
state.clone(),
HashSet::new(),
false,
block_status_receivers,
);

// Submit the transaction and ensure the adapter reports success to the caller. Note
// that consensus may drop some transactions (so we may need to resubmit them).
Expand Down Expand Up @@ -361,7 +367,7 @@ async fn submit_multiple_transactions_to_consensus_adapter() {
state.clone(),
process_via_checkpoint,
false,
vec![SubmitResponse::NoStatusWaiter(BlockStatus::Sequenced)],
vec![with_block_status(BlockStatus::Sequenced(BlockRef::MIN))],
);

// Submit the transaction and ensure the adapter reports success to the caller. Note
Expand Down Expand Up @@ -397,7 +403,7 @@ async fn submit_checkpoint_signature_to_consensus_adapter() {
state,
HashSet::new(),
false,
vec![SubmitResponse::NoStatusWaiter(BlockStatus::Sequenced)],
vec![with_block_status(BlockStatus::Sequenced(BlockRef::MIN))],
);

let checkpoint_summary = CheckpointSummary::new(
Expand Down
5 changes: 3 additions & 2 deletions crates/sui-core/src/unit_tests/transaction_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::authority::test_authority_builder::TestAuthorityBuilder;
use crate::consensus_adapter::{BlockStatus, SubmitResponse};
use crate::mock_consensus::with_block_status;
use consensus_core::{BlockRef, BlockStatus};
use fastcrypto::{ed25519::Ed25519KeyPair, traits::KeyPair};
use fastcrypto_zkp::bn254::zk_login::{parse_jwks, OIDCProvider, ZkLoginInputs};
use move_core_types::ident_str;
Expand Down Expand Up @@ -1682,7 +1683,7 @@ async fn test_handle_soft_bundle_certificates() {
authority.clone(),
HashSet::new(),
true,
vec![SubmitResponse::NoStatusWaiter(BlockStatus::Sequenced)],
vec![with_block_status(BlockStatus::Sequenced(BlockRef::MIN))],
);
let server = AuthorityServer::new_for_test_with_consensus_adapter(authority.clone(), adapter);
let _metrics = server.metrics.clone();
Expand Down

0 comments on commit 3618464

Please sign in to comment.