CheckpointService refactoring
mystenmark committed Jan 9, 2025
1 parent 560668e commit 06ff953
Showing 4 changed files with 133 additions and 35 deletions.
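At a high level, this commit splits CheckpointService startup into two phases: CheckpointService::spawn(), which previously constructed the service and spawned its builder/aggregator tasks in one call, becomes build(), which returns the service in an un-started state, plus a separate async spawn() that launches the tasks and blocks until the CheckpointBuilder has re-built every checkpoint that existed before the restart. A minimal sketch of the new calling convention (the argument list is elided here; the diffs below show the real parameters):

    // Phase 1: construct the service without starting any tasks.
    let checkpoint_service = CheckpointService::build(/* state, stores, outputs, limits */);
    // Phase 2: spawn the builder and aggregator tasks; resolves once all
    // checkpoints built before the restart have been re-built (a WAL-replay-like step).
    let tasks = checkpoint_service.spawn().await;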
3 changes: 2 additions & 1 deletion crates/sui-core/src/authority/authority_per_epoch_store.rs
@@ -21,6 +21,7 @@ use move_bytecode_utils::module_cache::SyncModuleCache;
 use mysten_common::sync::notify_once::NotifyOnce;
 use mysten_common::sync::notify_read::NotifyRead;
 use mysten_metrics::monitored_scope;
+use nonempty::NonEmpty;
 use parking_lot::RwLock;
 use parking_lot::{Mutex, RwLockReadGuard, RwLockWriteGuard};
 use prometheus::IntCounter;
@@ -4031,7 +4032,7 @@ impl AuthorityPerEpochStore
     pub fn process_pending_checkpoint(
         &self,
         commit_height: CheckpointHeight,
-        content_info: Vec<(CheckpointSummary, CheckpointContents)>,
+        content_info: NonEmpty<(CheckpointSummary, CheckpointContents)>,
     ) -> SuiResult<()> {
         let tables = self.tables()?;
         // All created checkpoints are inserted in builder_checkpoint_summary in a single batch.
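The Vec-to-NonEmpty change above (mirrored in checkpoints/mod.rs below) moves the "at least one checkpoint per commit" invariant into the type system instead of leaving it as a runtime assumption. A standalone sketch of the nonempty crate API relied on here, using illustrative values:

    use nonempty::NonEmpty;

    fn main() {
        // from_vec returns None for an empty Vec, so non-emptiness is proven
        // once at the boundary; first() and last() can then never fail.
        let ne = NonEmpty::from_vec(vec![1, 2, 3]).expect("at least one element");
        assert_eq!(ne.first(), &1);
        assert_eq!(*ne.last(), 3);
    }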
153 changes: 123 additions & 30 deletions crates/sui-core/src/checkpoints/mod.rs
@@ -19,7 +19,9 @@ use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
 use crate::state_accumulator::StateAccumulator;
 use diffy::create_patch;
 use itertools::Itertools;
+use mysten_common::{debug_fatal, fatal};
 use mysten_metrics::{monitored_future, monitored_scope, MonitoredFutureExt};
+use nonempty::NonEmpty;
 use parking_lot::Mutex;
 use serde::{Deserialize, Serialize};
 use sui_macros::fail_point;
@@ -28,6 +30,7 @@ use sui_types::base_types::ConciseableName;
 use sui_types::executable_transaction::VerifiedExecutableTransaction;
 use sui_types::messages_checkpoint::CheckpointCommitment;
 use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
+use tokio::sync::watch;
 
 use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
 use crate::consensus_handler::SequencedConsensusTransactionKey;
@@ -475,7 +478,7 @@ impl CheckpointStore {
                 ?local_contents,
                 "Local checkpoint fork detected!",
             );
-            panic!(
+            fatal!(
                 "Local checkpoint fork detected for sequence number: {}",
                 local_checkpoint.sequence_number()
             );
@@ -860,6 +863,7 @@ pub struct CheckpointBuilder {
     epoch_store: Arc<AuthorityPerEpochStore>,
     notify: Arc<Notify>,
     notify_aggregator: Arc<Notify>,
+    last_built: watch::Sender<CheckpointSequenceNumber>,
     effects_store: Arc<dyn TransactionCacheRead>,
     accumulator: Weak<StateAccumulator>,
     output: Box<dyn CheckpointOutput>,
@@ -900,6 +904,7 @@ impl CheckpointBuilder {
         accumulator: Weak<StateAccumulator>,
         output: Box<dyn CheckpointOutput>,
         notify_aggregator: Arc<Notify>,
+        last_built: watch::Sender<CheckpointSequenceNumber>,
         metrics: Arc<CheckpointMetrics>,
         max_transactions_per_checkpoint: usize,
         max_checkpoint_size_bytes: usize,
@@ -913,6 +918,7 @@
             accumulator,
             output,
             notify_aggregator,
+            last_built,
             metrics,
             max_transactions_per_checkpoint,
             max_checkpoint_size_bytes,
@@ -985,14 +991,24 @@ impl CheckpointBuilder {
                 checkpoint_commit_height = height,
                 "Making checkpoint at commit height"
             );
-            if let Err(e) = self
+
+            match self
                 .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                 .await
             {
-                error!("Error while making checkpoint, will retry in 1s: {:?}", e);
-                tokio::time::sleep(Duration::from_secs(1)).await;
-                self.metrics.checkpoint_errors.inc();
-                return;
+                Ok(seq) => {
+                    self.last_built.send_if_modified(|cur| {
+                        assert!(seq > *cur);
+                        *cur = seq;
+                        true
+                    });
+                }
+                Err(e) => {
+                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                    self.metrics.checkpoint_errors.inc();
+                    return;
+                }
             }
             // ensure that the task can be cancelled at end of epoch, even if no other await yields
             // execution.
@@ -1005,7 +1021,10 @@
     }
 
     #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
-    async fn make_checkpoint(&self, pendings: Vec<PendingCheckpointV2>) -> anyhow::Result<()> {
+    async fn make_checkpoint(
+        &self,
+        pendings: Vec<PendingCheckpointV2>,
+    ) -> anyhow::Result<CheckpointSequenceNumber> {
         let last_details = pendings.last().unwrap().details().clone();
 
         // Keeps track of the effects that are already included in the current checkpoint.
@@ -1027,9 +1046,10 @@
         let new_checkpoint = self
             .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
             .await?;
+        let highest_sequence = *new_checkpoint.last().0.sequence_number();
         self.write_checkpoints(last_details.checkpoint_height, new_checkpoint)
             .await?;
-        Ok(())
+        Ok(highest_sequence)
     }
 
     // Given the root transactions of a pending checkpoint, resolve the transactions should be included in
@@ -1153,7 +1173,7 @@
     async fn write_checkpoints(
         &self,
         height: CheckpointHeight,
-        new_checkpoints: Vec<(CheckpointSummary, CheckpointContents)>,
+        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
     ) -> SuiResult {
         let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
         let mut batch = self.tables.checkpoint_content.batch();
@@ -1283,7 +1303,7 @@
         &self,
         all_effects: Vec<TransactionEffects>,
         details: &PendingCheckpointInfo,
-    ) -> anyhow::Result<Vec<(CheckpointSummary, CheckpointContents)>> {
+    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
         let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
         let total = all_effects.len();
         let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
@@ -1519,7 +1539,7 @@
             checkpoints.push((summary, contents));
         }
 
-        Ok(checkpoints)
+        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
     }
 
     fn get_epoch_total_gas_cost(
@@ -2187,17 +2207,40 @@ pub trait CheckpointServiceNotify {
     fn notify_checkpoint(&self) -> SuiResult;
 }
 
-/// This is a service used to communicate with other pieces of sui(for ex. authority)
+enum CheckpointServiceState {
+    Unstarted((CheckpointBuilder, CheckpointAggregator)),
+    Started,
+}
+
+impl CheckpointServiceState {
+    fn take(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
+        let mut state = CheckpointServiceState::Started;
+        std::mem::swap(self, &mut state);
+
+        match state {
+            CheckpointServiceState::Unstarted((builder, aggregator)) => (builder, aggregator),
+            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
+        }
+    }
+}
+
 pub struct CheckpointService {
     tables: Arc<CheckpointStore>,
     notify_builder: Arc<Notify>,
     notify_aggregator: Arc<Notify>,
     last_signature_index: Mutex<u64>,
+    // The highest sequence number that had already been built at the time CheckpointService
+    // was constructed
+    last_built_seq: CheckpointSequenceNumber,
+    // A notification for each time a new checkpoint is built.
+    cur_built_tx: watch::Sender<CheckpointSequenceNumber>,
     metrics: Arc<CheckpointMetrics>,
+    state: Mutex<CheckpointServiceState>,
 }
 
 impl CheckpointService {
-    pub fn spawn(
+    /// Constructs a new CheckpointService in an un-started state.
+    pub fn build(
         state: Arc<AuthorityState>,
         checkpoint_store: Arc<CheckpointStore>,
         epoch_store: Arc<AuthorityPerEpochStore>,
@@ -2208,14 +2251,29 @@
         metrics: Arc<CheckpointMetrics>,
         max_transactions_per_checkpoint: usize,
         max_checkpoint_size_bytes: usize,
-    ) -> (Arc<Self>, JoinSet<()> /* Handle to tasks */) {
+    ) -> Arc<Self> {
         info!(
             "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
         );
         let notify_builder = Arc::new(Notify::new());
         let notify_aggregator = Arc::new(Notify::new());
 
-        let mut tasks = JoinSet::new();
+        let last_built_seq = epoch_store
+            .last_built_checkpoint_builder_summary()
+            .expect("epoch should not have ended")
+            .map(|s| s.summary.sequence_number)
+            .unwrap_or(0);
+
+        let (cur_built_tx, _) = watch::channel(last_built_seq);
+
+        let aggregator = CheckpointAggregator::new(
+            checkpoint_store.clone(),
+            epoch_store.clone(),
+            notify_aggregator.clone(),
+            certified_checkpoint_output,
+            state.clone(),
+            metrics.clone(),
+        );
 
         let builder = CheckpointBuilder::new(
             state.clone(),
@@ -2226,36 +2284,70 @@
             accumulator,
             checkpoint_output,
             notify_aggregator.clone(),
+            cur_built_tx.clone(),
             metrics.clone(),
             max_transactions_per_checkpoint,
             max_checkpoint_size_bytes,
         );
-        tasks.spawn(monitored_future!(builder.run()));
-
-        let aggregator = CheckpointAggregator::new(
-            checkpoint_store.clone(),
-            epoch_store.clone(),
-            notify_aggregator.clone(),
-            certified_checkpoint_output,
-            state.clone(),
-            metrics.clone(),
-        );
-        tasks.spawn(monitored_future!(aggregator.run()));
 
         let last_signature_index = epoch_store
             .get_last_checkpoint_signature_index()
             .expect("should not cross end of epoch");
         let last_signature_index = Mutex::new(last_signature_index);
 
-        let service = Arc::new(Self {
+        Arc::new(Self {
             tables: checkpoint_store,
             notify_builder,
             notify_aggregator,
             last_signature_index,
+            last_built_seq,
+            cur_built_tx,
             metrics,
-        });
+            state: Mutex::new(CheckpointServiceState::Unstarted((builder, aggregator))),
+        })
+    }
+
+    /// Starts the CheckpointService.
+    ///
+    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
+    /// been built before the most recent restart. You can think of this as a WAL replay
+    /// operation. Upon startup, we may have a number of consensus commits and resulting
+    /// checkpoints that were built but not committed to disk. We want to reprocess the
+    /// commits and rebuild the checkpoints before starting normal operation.
+    pub async fn spawn(&self) -> JoinSet<()> {
+        let mut tasks = JoinSet::new();
+
+        let (builder, aggregator) = self.state.lock().take();
+        tasks.spawn(monitored_future!(builder.run()));
+        tasks.spawn(monitored_future!(aggregator.run()));
 
-        (service, tasks)
+        loop {
+            if tokio::time::timeout(Duration::from_secs(10), self.wait_for_rebuilt_checkpoints())
+                .await
+                .is_ok()
+            {
+                break;
+            } else {
+                debug_fatal!("Still waiting for checkpoints to be rebuilt");
+            }
+        }
+
+        tasks
     }
 }
 
+impl CheckpointService {
+    /// Waits until the last_built_seq available in last_built_rx is >= last_built_seq
+    pub async fn wait_for_rebuilt_checkpoints(&self) {
+        let last_built_seq = self.last_built_seq;
+        let mut rx = self.cur_built_tx.subscribe();
+        loop {
+            let cur_last_built_seq = *rx.borrow_and_update();
+            if cur_last_built_seq >= last_built_seq {
+                break;
+            }
+            rx.changed().await.unwrap();
+        }
+    }
+
+    #[cfg(test)]
@@ -2500,7 +2592,7 @@ mod tests {
             &epoch_store,
         ));
 
-        let (checkpoint_service, _tasks) = CheckpointService::spawn(
+        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
@@ -2512,6 +2604,7 @@
             3,
             100_000,
         );
+        checkpoint_service.spawn().await;
 
         checkpoint_service
             .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
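The rebuild rendezvous introduced above is a standard tokio watch-channel pattern: CheckpointBuilder publishes the highest built sequence number through send_if_modified, and wait_for_rebuilt_checkpoints blocks until the observed value reaches the sequence number recorded when the service was constructed. A self-contained sketch of the same pattern, with hypothetical names (wait_for is not part of this commit):

    use tokio::sync::watch;

    // Wait until the watched value reaches `target`.
    async fn wait_for(mut rx: watch::Receiver<u64>, target: u64) {
        loop {
            // borrow_and_update marks the current value as seen, so the
            // changed() call below only wakes on a genuinely newer value.
            if *rx.borrow_and_update() >= target {
                return;
            }
            rx.changed().await.expect("sender dropped");
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = watch::channel(0u64);
        tokio::spawn(async move {
            for seq in 1..=3u64 {
                tx.send(seq).unwrap();
            }
        });
        wait_for(rx, 3).await;
    }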
4 changes: 3 additions & 1 deletion crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
@@ -4,6 +4,7 @@
 use std::{sync::Arc, time::Duration};
 
 use fastcrypto::traits::KeyPair;
+use futures::FutureExt;
 use mysten_metrics::RegistryService;
 use prometheus::Registry;
 use sui_swarm_config::network_config_builder::ConfigBuilder;
@@ -34,7 +35,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
     ));
     let (certified_output, _certified_result) = mpsc::channel::<CertifiedCheckpointSummary>(10);
 
-    let (checkpoint_service, _) = CheckpointService::spawn(
+    let checkpoint_service = CheckpointService::build(
         state.clone(),
         state.get_checkpoint_store().clone(),
         epoch_store.clone(),
@@ -46,6 +47,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
         3,
         100_000,
     );
+    checkpoint_service.spawn().now_or_never().unwrap();
     checkpoint_service
 }
 
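The now_or_never().unwrap() call above polls the spawn() future exactly once and panics if it is still pending. That appears safe in this fresh test setup because no checkpoints were built before construction, so wait_for_rebuilt_checkpoints should return on its first poll. A minimal illustration of futures::FutureExt::now_or_never:

    use futures::FutureExt;

    fn main() {
        // An already-ready future completes on its first poll...
        assert_eq!(async { 42 }.now_or_never(), Some(42));
        // ...while a still-pending future yields None instead of blocking.
        assert_eq!(futures::future::pending::<u32>().now_or_never(), None);
    }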
8 changes: 5 additions & 3 deletions crates/sui-node/src/lib.rs
@@ -1302,7 +1302,7 @@ impl SuiNode {
         sui_node_metrics: Arc<SuiNodeMetrics>,
         sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
     ) -> Result<ValidatorComponents> {
-        let (checkpoint_service, checkpoint_service_tasks) = Self::start_checkpoint_service(
+        let checkpoint_service = Self::start_checkpoint_service(
             config,
             consensus_adapter.clone(),
             checkpoint_store,
@@ -1374,6 +1374,8 @@
         )
         .await;
 
+        let checkpoint_service_tasks = checkpoint_service.spawn().await;
+
         if epoch_store.authenticator_state_enabled() {
             Self::start_jwk_updater(
                 config,
@@ -1405,7 +1407,7 @@
         state_sync_handle: state_sync::Handle,
         accumulator: Weak<StateAccumulator>,
         checkpoint_metrics: Arc<CheckpointMetrics>,
-    ) -> (Arc<CheckpointService>, JoinSet<()>) {
+    ) -> Arc<CheckpointService> {
         let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
         let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
 
@@ -1430,7 +1432,7 @@
         let max_checkpoint_size_bytes =
             epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
 
-        CheckpointService::spawn(
+        CheckpointService::build(
             state.clone(),
             checkpoint_store,
             epoch_store,
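Note that spawn() still hands the JoinSet of builder/aggregator tasks back to the caller, so SuiNode keeps ownership of their lifetimes. A minimal sketch of that ownership pattern (the task body is illustrative):

    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        // A JoinSet owns the tasks spawned onto it; dropping the set aborts
        // any tasks still running, which gives the owner a simple teardown
        // path (e.g. at the end of an epoch).
        let mut tasks: JoinSet<()> = JoinSet::new();
        tasks.spawn(async { /* background work */ });
        while let Some(result) = tasks.join_next().await {
            result.unwrap();
        }
    }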
