
Commit

test
eichhorl committed Jun 30, 2024
1 parent 842953b commit 9063093
Showing 6 changed files with 88 additions and 48 deletions.
77 changes: 47 additions & 30 deletions rs/artifact_pool/src/backup.rs
@@ -24,8 +24,8 @@ use ic_metrics::MetricsRegistry;
use ic_protobuf::types::v1 as pb;
use ic_types::{
consensus::{
BlockProposal, CatchUpPackage, ConsensusMessage, Finalization, HasHeight, Notarization,
RandomBeacon, RandomTape,
BlockProposal, ConsensusMessage, Finalization, HasHeight, Notarization, RandomBeacon,
RandomTape,
},
time::{Time, UNIX_EPOCH},
Height,
@@ -50,7 +50,33 @@ pub enum BackupArtifact {
BlockProposal(Box<BlockProposal>),
RandomBeacon(Box<RandomBeacon>),
RandomTape(Box<RandomTape>),
CatchUpPackage(Box<CatchUpPackage>),
CatchUpPackage(Box<pb::CatchUpPackage>),
}

impl TryFrom<ConsensusMessage> for BackupArtifact {
type Error = ();
fn try_from(artifact: ConsensusMessage) -> Result<Self, Self::Error> {
use ConsensusMessage::*;
match artifact {
Finalization(artifact) => Ok(BackupArtifact::Finalization(Box::new(artifact))),
Notarization(artifact) => Ok(BackupArtifact::Notarization(Box::new(artifact))),
BlockProposal(artifact) => Ok(BackupArtifact::BlockProposal(Box::new(artifact))),
RandomTape(artifact) => Ok(BackupArtifact::RandomTape(Box::new(artifact))),
RandomBeacon(artifact) => Ok(BackupArtifact::RandomBeacon(Box::new(artifact))),
// CUPs need to be backed up using the original protobuf bytes
CatchUpPackage(artifact) => Ok(BackupArtifact::CatchUpPackage(Box::new(
pb::CatchUpPackage::from(&artifact),
))),
// Do not replace by a `_` so that we evaluate at this place if we want to
// backup a new artifact!
RandomBeaconShare(_)
| NotarizationShare(_)
| FinalizationShare(_)
| RandomTapeShare(_)
| CatchUpPackageShare(_)
| EquivocationProof(_) => Err(()),
}
}
}

#[derive(Clone, Debug)]
@@ -111,6 +137,10 @@ impl BackupThread {
loop {
match rx.recv() {
Ok(BackupRequest::Backup(artifacts)) => {
let artifacts = artifacts
.into_iter()
.flat_map(BackupArtifact::try_from)
.collect();
if let Err(err) = store_artifacts(artifacts, &self.version_path) {
error!(self.log, "Backup storing failed: {:?}", err);
self.metrics.io_errors.inc();
@@ -385,26 +415,9 @@ impl Backup {

// Write all backup files to the disk. For the sake of simplicity, we write all
// artifacts sequentially.
fn store_artifacts(artifacts: Vec<ConsensusMessage>, path: &Path) -> Result<(), io::Error> {
use ConsensusMessage::*;
fn store_artifacts(artifacts: Vec<BackupArtifact>, path: &Path) -> Result<(), io::Error> {
artifacts
.into_iter()
.filter_map(|artifact| match artifact {
Finalization(artifact) => Some(BackupArtifact::Finalization(Box::new(artifact))),
Notarization(artifact) => Some(BackupArtifact::Notarization(Box::new(artifact))),
BlockProposal(artifact) => Some(BackupArtifact::BlockProposal(Box::new(artifact))),
RandomTape(artifact) => Some(BackupArtifact::RandomTape(Box::new(artifact))),
RandomBeacon(artifact) => Some(BackupArtifact::RandomBeacon(Box::new(artifact))),
CatchUpPackage(artifact) => Some(BackupArtifact::CatchUpPackage(Box::new(artifact))),
// Do not replace by a `_` so that we evaluate at this place if we want to
// backup a new artifact!
RandomBeaconShare(_)
| NotarizationShare(_)
| FinalizationShare(_)
| RandomTapeShare(_)
| CatchUpPackageShare(_)
| EquivocationProof(_) => None,
})
.try_for_each(|artifact| artifact.write_to_disk(path))
}

@@ -472,7 +485,7 @@ fn get_leaves(dir: &Path, leaves: &mut Vec<PathBuf>) -> std::io::Result<()> {
}

// Returns all artifacts starting from the latest catch-up package height.
fn get_all_persisted_artifacts(pool: &dyn ConsensusPool) -> Vec<ConsensusMessage> {
fn get_all_persisted_artifacts(pool: &dyn ConsensusPool) -> Vec<BackupArtifact> {
let cup_height = pool.as_cache().catch_up_package().height();
let notarization_pool = pool.validated().notarization();
let notarization_range = HeightRange::new(
@@ -525,11 +538,6 @@ fn get_all_persisted_artifacts(pool: &dyn ConsensusPool) -> Vec<ConsensusMessage
.get_by_height_range(notarization_range)
.map(ConsensusMessage::Notarization),
)
.chain(
catch_up_package_pool
.get_by_height_range(catch_up_package_range)
.map(ConsensusMessage::CatchUpPackage),
)
.chain(
random_tape_pool
.get_by_height_range(random_tape_range)
@@ -545,6 +553,17 @@ fn get_all_persisted_artifacts(pool: &dyn ConsensusPool) -> Vec<ConsensusMessage
.get_by_height_range(block_proposal_range)
.map(ConsensusMessage::BlockProposal),
)
.flat_map(BackupArtifact::try_from)
.chain(
catch_up_package_pool
.get_by_height_range(catch_up_package_range)
.map(|cup| cup.height())
.flat_map(|height| {
pool.validated()
.get_catch_up_package_proto_at_height(height)
})
.map(|cup| BackupArtifact::CatchUpPackage(Box::new(cup))),
)
.collect()
}

@@ -588,9 +607,7 @@ impl BackupArtifact {
BlockProposal(artifact) => pb::BlockProposal::from(artifact.as_ref()).encode(&mut buf),
RandomTape(artifact) => pb::RandomTape::from(artifact.as_ref()).encode(&mut buf),
RandomBeacon(artifact) => pb::RandomBeacon::from(artifact.as_ref()).encode(&mut buf),
CatchUpPackage(artifact) => {
pb::CatchUpPackage::from(artifact.as_ref()).encode(&mut buf)
}
CatchUpPackage(artifact) => artifact.encode(&mut buf),
}
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
Ok(buf)
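Note: below is a minimal, self-contained sketch of the `flat_map(TryFrom::try_from)` pattern that `BackupThread` now uses to turn `ConsensusMessage`s into `BackupArtifact`s while dropping everything that has no backup representation. The enum variants are hypothetical stand-ins, not the real `ic_types` definitions.

```rust
// Hypothetical simplified types; the real code converts ConsensusMessage -> BackupArtifact.
#[derive(Debug)]
enum Message {
    Finalization(u64),
    FinalizationShare(u64),
}

#[derive(Debug)]
enum Backup {
    Finalization(u64),
}

impl TryFrom<Message> for Backup {
    type Error = ();
    fn try_from(msg: Message) -> Result<Self, Self::Error> {
        match msg {
            Message::Finalization(height) => Ok(Backup::Finalization(height)),
            // Shares have no backup representation.
            Message::FinalizationShare(_) => Err(()),
        }
    }
}

fn main() {
    let messages = vec![Message::Finalization(7), Message::FinalizationShare(7)];
    // `Result` implements `IntoIterator`, so `flat_map` keeps the `Ok` values
    // and silently drops everything that converts to `Err(())`.
    let backups: Vec<Backup> = messages.into_iter().flat_map(Backup::try_from).collect();
    assert_eq!(backups.len(), 1);
    println!("{backups:?}");
}
```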
28 changes: 16 additions & 12 deletions rs/artifact_pool/src/lmdb_pool.rs
@@ -1327,12 +1327,8 @@ impl PoolSection<ValidatedConsensusArtifact> for PersistentHeightIndexedPool<Con
self
}

fn highest_catch_up_package_proto(&self) -> pb::CatchUpPackage {
let h = self
.catch_up_package()
.max_height()
.expect("There should always be a CUP in the pool.");
let key = HeightKey::from(h);
fn get_catch_up_package_proto_at_height(&self, height: Height) -> Option<pb::CatchUpPackage> {
let key = HeightKey::from(height);
let index_db = self.get_index_db(&CatchUpPackage::type_key());
let log = self.log.clone();
let artifacts = self.artifacts;
@@ -1360,12 +1356,20 @@ impl PoolSection<ValidatedConsensusArtifact> for PersistentHeightIndexedPool<Con
self.log.clone(),
)
.next()
.unwrap_or_else(|| {
panic!(
"This should be impossible since we found a max height at {:?}",
h
)
})
}

fn highest_catch_up_package_proto(&self) -> pb::CatchUpPackage {
let h = self
.catch_up_package()
.max_height()
.expect("There should always be a CUP in the pool.");
self.get_catch_up_package_proto_at_height(h)
.unwrap_or_else(|| {
panic!(
"This should be impossible since we found a max height at {:?}",
h
)
})
}

/// Number of artifacts in the DB.
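Note: the refactoring above (and the analogous one in the RocksDB pool below) splits the lookup into a total, height-indexed accessor and a "highest" accessor that only chooses the height. A rough sketch of that shape, with a `BTreeMap` standing in for the LMDB-backed index (the real code goes through `HeightKey` and per-type index DBs):

```rust
use std::collections::BTreeMap;

struct Pool {
    // height -> serialized catch-up package bytes
    cups: BTreeMap<u64, Vec<u8>>,
}

impl Pool {
    // Total accessor: may legitimately find nothing at this height.
    fn get_catch_up_package_at_height(&self, height: u64) -> Option<Vec<u8>> {
        self.cups.get(&height).cloned()
    }

    // Only decides *which* height to read; panics only if the entry it just
    // indexed has vanished, which would indicate an inconsistent pool.
    fn highest_catch_up_package(&self) -> Vec<u8> {
        let h = *self
            .cups
            .keys()
            .max()
            .expect("There should always be a CUP in the pool.");
        self.get_catch_up_package_at_height(h)
            .unwrap_or_else(|| panic!("This should be impossible since we found a max height at {h:?}"))
    }
}

fn main() {
    let mut cups = BTreeMap::new();
    cups.insert(100_u64, vec![1, 2, 3]);
    cups.insert(200_u64, vec![4, 5, 6]);
    let pool = Pool { cups };
    assert_eq!(pool.get_catch_up_package_at_height(100), Some(vec![1, 2, 3]));
    assert_eq!(pool.highest_catch_up_package(), vec![4, 5, 6]);
}
```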
15 changes: 9 additions & 6 deletions rs/artifact_pool/src/rocksdb_pool.rs
@@ -649,20 +649,23 @@ impl PoolSection<ValidatedConsensusArtifact> for PersistentHeightIndexedPool<Con
self
}

fn highest_catch_up_package_proto(&self) -> pb::CatchUpPackage {
let height_opt = self.max_height::<CatchUpPackage>().unwrap();
let min_height_key = make_min_key(height_opt.get());
let max_height_key = make_max_key(height_opt.get());
fn get_catch_up_package_proto_at_height(&self, height: Height) -> Option<pb::CatchUpPackage> {
let min_height_key = make_min_key(height.get());
let max_height_key = make_max_key(height.get());
let mut iter = check_ok_uw!(StandaloneIterator::new(
self.db.clone(),
CatchUpPackage::info().name,
&min_height_key,
&max_height_key,
deserialize_catch_up_package_fn
));
iter.next()
iter.next().map(|artifact| artifact.msg)
}

fn highest_catch_up_package_proto(&self) -> pb::CatchUpPackage {
let height_opt = self.max_height::<CatchUpPackage>().unwrap();
self.get_catch_up_package_proto_at_height(height_opt)
.expect("There must be a catch up package in the pool")
.msg
}

// TODO(CON-308): Implement size()
1 change: 1 addition & 0 deletions rs/backup/src/backup_helper.rs
@@ -520,6 +520,7 @@ impl BackupHelper {
);
let file_name = self.logs_dir().join(log_file_name);
debug!(self.log, "Write replay log to: {:?}", file_name);
warn!(self.log, "{stdout}");
let mut file =
File::create(file_name).map_err(|err| format!("Error creating log file: {:?}", err))?;
file.write_all(stdout.as_bytes())
7 changes: 7 additions & 0 deletions rs/interfaces/src/consensus_pool.rs
@@ -257,6 +257,13 @@ pub trait PoolSection<T> {
)
}

fn get_catch_up_package_proto_at_height(&self, height: Height) -> Option<pb::CatchUpPackage> {
self.catch_up_package()
.get_only_by_height(height)
.map(|cup| pb::CatchUpPackage::from(&cup))
.ok()
}

fn size(&self) -> u64;
}

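Note: `get_catch_up_package_proto_at_height` is a provided (default) trait method that re-encodes the deserialized CUP, while the LMDB/RocksDB pool sections above override it to return the proto as originally persisted, avoiding a decode/re-encode round trip. A sketch of that trait shape with hypothetical types:

```rust
// Hypothetical stand-ins for pb::CatchUpPackage and the pool section trait.
#[derive(Clone, Debug, PartialEq)]
struct CupProto(Vec<u8>);

trait Section {
    /// Required: fetch the deserialized CUP (reduced to its height here, for brevity).
    fn cup_height_at(&self, height: u64) -> Option<u64>;

    /// Provided default: rebuild the proto from the deserialized value.
    fn cup_proto_at(&self, height: u64) -> Option<CupProto> {
        self.cup_height_at(height)
            .map(|h| CupProto(h.to_be_bytes().to_vec()))
    }
}

struct PersistentSection {
    height: u64,
    original_bytes: Vec<u8>,
}

impl Section for PersistentSection {
    fn cup_height_at(&self, height: u64) -> Option<u64> {
        (height == self.height).then_some(self.height)
    }

    // Override: hand back the bytes exactly as persisted, without re-encoding.
    fn cup_proto_at(&self, height: u64) -> Option<CupProto> {
        (height == self.height).then(|| CupProto(self.original_bytes.clone()))
    }
}

fn main() {
    let s = PersistentSection { height: 5, original_bytes: vec![9, 9, 9] };
    assert_eq!(s.cup_proto_at(5), Some(CupProto(vec![9, 9, 9])));
    assert_eq!(s.cup_proto_at(6), None);
}
```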
8 changes: 8 additions & 0 deletions rs/types/types/src/consensus.rs
@@ -252,6 +252,14 @@ impl HasHeight for BlockMetadata {
}
}

impl HasHeight for pb::CatchUpPackage {
fn height(&self) -> Height {
CatchUpPackage::try_from(self)
.map(|cup| cup.height())
.unwrap_or_default()
}
}

/// Rank is used to indicate the priority of a block maker, where 0 indicates
/// the highest priority.
#[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
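Note: a sketch of the fallback used in the `HasHeight` impl above — convert the wire type to the domain type and default the height on failure, so a malformed proto reports height 0. The types are hypothetical stand-ins for `pb::CatchUpPackage` / `CatchUpPackage`:

```rust
#[derive(Default, Clone, Copy, Debug, PartialEq)]
struct Height(u64);

trait HasHeight {
    fn height(&self) -> Height;
}

struct Proto {
    raw_height: i64, // wire representation; negative values are invalid
}

struct Domain {
    height: Height,
}

impl TryFrom<&Proto> for Domain {
    type Error = ();
    fn try_from(p: &Proto) -> Result<Self, Self::Error> {
        u64::try_from(p.raw_height)
            .map(|h| Domain { height: Height(h) })
            .map_err(|_| ())
    }
}

impl HasHeight for Proto {
    fn height(&self) -> Height {
        // Fall back to the default height (0) if the proto cannot be converted.
        Domain::try_from(self)
            .map(|d| d.height)
            .unwrap_or_default()
    }
}

fn main() {
    assert_eq!(Proto { raw_height: 42 }.height(), Height(42));
    assert_eq!(Proto { raw_height: -1 }.height(), Height(0)); // fallback path
}
```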
