Skip to content

Commit

Permalink
Leo's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kpop-dfinity committed Jan 21, 2025
1 parent 1588a65 commit eef9fa0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ impl ArtifactAssembler<ConsensusMessage, MaybeStrippedConsensusMessage>
.start_timer();
let mut assembler = BlockProposalAssembler::new(stripped_block_proposal);

let missing_ingress_ids = assembler.missing_ingress_messages();
let stripped_ingress_ids = assembler.missing_ingress_messages();
// For each stripped object in the message, try to fetch it either from the local pools
// or from a random peer who is advertising it.
for missing_ingress_id in missing_ingress_ids {
for stripped_ingress_id in stripped_ingress_ids {
join_set.spawn(get_or_fetch(
missing_ingress_id,
stripped_ingress_id,
self.ingress_pool.clone(),
self.transport.clone(),
id.as_ref().clone(),
Expand Down Expand Up @@ -265,6 +265,8 @@ async fn get_or_fetch<P: Peers>(
.unwrap()
.get(&signed_ingress_id.ingress_message_id)
{
// Make sure that this is the correct ingress message. [`IngressMessageId`] does _not_
// uniquely identify ingress messages, we thus need to perform an extra check.
if SignedIngressId::from(&ingress_message) == signed_ingress_id {
return (ingress_message, node_id);
}
Expand Down Expand Up @@ -309,7 +311,7 @@ impl BlockProposalAssembler {
.stripped_ingress_payload
.ingress_messages
.iter()
.map(|ingress_message| (ingress_message.clone(), None))
.map(|signed_ingress_id| (signed_ingress_id.clone(), None))
.collect(),
stripped_block_proposal,
}
Expand All @@ -319,9 +321,9 @@ impl BlockProposalAssembler {
pub(crate) fn missing_ingress_messages(&self) -> Vec<SignedIngressId> {
self.ingress_messages
.iter()
.filter_map(|(ingress_message_id, maybe_ingress)| {
.filter_map(|(signed_ingress_id, maybe_ingress)| {
if maybe_ingress.is_none() {
Some(ingress_message_id)
Some(signed_ingress_id)
} else {
None
}
Expand Down
17 changes: 11 additions & 6 deletions rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ impl Pools {

// First check if the requested ingress message exists in the Ingress Pool.
if let Some(ingress_message) = self.ingress_pool.read().unwrap().get(ingress_message_id) {
// Make sure that this is the correct ingress message. [`IngressMessageId`] does _not_
// uniquely identify ingress messages, we thus need to perform an extra check.
if SignedIngressId::from(&ingress_message) == *signed_ingress_id {
self.metrics.ingress_messages_in_ingress_pool.inc();
return Ok(ingress_message.into());
Expand Down Expand Up @@ -96,6 +98,9 @@ impl Pools {
.get_serialized_by_id(ingress_message_id)
{
Some(bytes)
// Make sure that this is the correct ingress message. [`IngressMessageId`]
// does _not_ uniquely identify ingress messages, we thus need to perform
// an extra check.
if SignedIngressId::new(ingress_message_id.clone(), bytes)
== *signed_ingress_id =>
{
Expand Down Expand Up @@ -226,7 +231,7 @@ fn parse_response(
};

let Ok(ingress) = SignedIngress::try_from(response.serialized_ingress_message) else {
metrics.report_download_error("ingress_deserializedion_failed");
metrics.report_download_error("ingress_deserialization_failed");
return None;
};

Expand Down Expand Up @@ -321,7 +326,7 @@ mod tests {
let pools = mock_pools(
Some(ingress_message.clone()),
None,
/*expect_consensus_pool_acces=*/ false,
/*expect_consensus_pool_access=*/ false,
);
let router = build_axum_router(pools);

Expand All @@ -348,7 +353,7 @@ mod tests {
let pools = mock_pools(
None,
Some(block.clone()),
/*expect_consensus_pool_acces=*/ true,
/*expect_consensus_pool_access=*/ true,
);
let router = build_axum_router(pools);

Expand All @@ -372,7 +377,7 @@ mod tests {
async fn rpc_get_not_found_test() {
let ingress_message = SignedIngressBuilder::new().nonce(1).build();
let block = fake_block_proposal(vec![]);
let pools = mock_pools(None, None, /*expect_consensus_pool_acces=*/ true);
let pools = mock_pools(None, None, /*expect_consensus_pool_access=*/ true);
let router = build_axum_router(pools);

let response = send_request(
Expand Down Expand Up @@ -417,7 +422,7 @@ mod tests {
let pools = mock_pools(
Some(ingress_message_1.clone()),
Some(block.clone()),
/*expect_consensus_pool_acces=*/ true,
/*expect_consensus_pool_access=*/ true,
);
let router = build_axum_router(pools);

Expand All @@ -440,7 +445,7 @@ mod tests {
let pools = mock_pools(
None,
Some(block.clone()),
/*expect_consensus_pool_acces=*/ true,
/*expect_consensus_pool_access=*/ true,
);
let router = build_axum_router(pools);

Expand Down

0 comments on commit eef9fa0

Please sign in to comment.