Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kpop-dfinity committed Jan 20, 2025
1 parent 85af5fc commit 1588a65
Show file tree
Hide file tree
Showing 14 changed files with 423 additions and 114 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/p2p/artifact_downloader/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ DEPENDENCIES = [
]

DEV_DEPENDENCIES = [
"//rs/canister_client/sender",
"//rs/p2p/test_utils",
"//rs/test_utilities/consensus",
"//rs/test_utilities/types",
Expand Down
1 change: 1 addition & 0 deletions rs/p2p/artifact_downloader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ axum = { workspace = true }
backoff = { workspace = true }
bytes = { workspace = true }
ic-base-types = { path = "../../types/base_types" }
ic-canister-client-sender = { path = "../../canister_client/sender" }
ic-consensus-manager = { path = "../consensus_manager" }
ic-interfaces = { path = "../../interfaces" }
ic-logger = { path = "../../monitoring/logger" }
Expand Down
139 changes: 125 additions & 14 deletions rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ use super::{
download::download_ingress,
metrics::{FetchStrippedConsensusArtifactMetrics, IngressMessageSource, IngressSenderMetrics},
stripper::Strippable,
types::stripped::{
MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedConsensusMessageId,
types::{
stripped::{
MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedConsensusMessageId,
},
SignedIngressId,
},
};

Expand Down Expand Up @@ -246,7 +249,7 @@ impl ArtifactAssembler<ConsensusMessage, MaybeStrippedConsensusMessage>
/// Tries to get the missing object either from the pool(s) or from the peers who are advertising
/// it.
async fn get_or_fetch<P: Peers>(
ingress_message_id: IngressMessageId,
signed_ingress_id: SignedIngressId,
ingress_pool: ValidatedPoolReaderRef<SignedIngress>,
transport: Arc<dyn Transport>,
// Id of the *full* artifact which should contain the missing data
Expand All @@ -257,13 +260,19 @@ async fn get_or_fetch<P: Peers>(
peer_rx: P,
) -> (SignedIngress, NodeId) {
// First check if the ingress message exists in the Ingress Pool.
if let Some(ingress_message) = ingress_pool.read().unwrap().get(&ingress_message_id) {
return (ingress_message, node_id);
if let Some(ingress_message) = ingress_pool
.read()
.unwrap()
.get(&signed_ingress_id.ingress_message_id)
{
if SignedIngressId::from(&ingress_message) == signed_ingress_id {
return (ingress_message, node_id);
}
}

download_ingress(
transport,
ingress_message_id,
signed_ingress_id,
full_consensus_message_id,
&log,
&metrics,
Expand All @@ -290,7 +299,7 @@ pub(crate) enum AssemblyError {

struct BlockProposalAssembler {
stripped_block_proposal: StrippedBlockProposal,
ingress_messages: Vec<(IngressMessageId, Option<SignedIngress>)>,
ingress_messages: Vec<(SignedIngressId, Option<SignedIngress>)>,
}

impl BlockProposalAssembler {
Expand All @@ -300,14 +309,14 @@ impl BlockProposalAssembler {
.stripped_ingress_payload
.ingress_messages
.iter()
.map(|ingress_message_id| (ingress_message_id.clone(), None))
.map(|ingress_message| (ingress_message.clone(), None))
.collect(),
stripped_block_proposal,
}
}

/// Returns the list of [`IngressMessageId`]s which have been stripped from the block.
pub(crate) fn missing_ingress_messages(&self) -> Vec<IngressMessageId> {
/// Returns the list of ingress messages which have been stripped from the block.
pub(crate) fn missing_ingress_messages(&self) -> Vec<SignedIngressId> {
self.ingress_messages
.iter()
.filter_map(|(ingress_message_id, maybe_ingress)| {
Expand All @@ -326,14 +335,14 @@ impl BlockProposalAssembler {
&mut self,
ingress_message: SignedIngress,
) -> Result<(), InsertionError> {
let ingress_message_id = IngressMessageId::from(&ingress_message);
let signed_ingress_id = SignedIngressId::from(&ingress_message);

// We can have at most 1000 elements in the vector, so it should be reasonably fast to do a
// linear scan here.
let (_, ingress) = self
.ingress_messages
.iter_mut()
.find(|(id, _maybe_ingress)| *id == ingress_message_id)
.find(|(id, _maybe_ingress)| *id == signed_ingress_id)
.ok_or(InsertionError::NotNeeded)?;

if ingress.is_some() {
Expand All @@ -356,7 +365,9 @@ impl BlockProposalAssembler {
let ingresses = self
.ingress_messages
.into_iter()
.map(|(id, message)| message.ok_or_else(|| AssemblyError::Missing(id)))
.map(|(id, message)| {
message.ok_or_else(|| AssemblyError::Missing(id.ingress_message_id))
})
.collect::<Result<Vec<_>, _>>()?;
let reconstructed_ingress_payload = IngressPayload::from(ingresses);

Expand All @@ -377,8 +388,18 @@ impl BlockProposalAssembler {
mod tests {
use crate::fetch_stripped_artifact::test_utils::{
fake_block_proposal_with_ingresses, fake_ingress_message,
fake_ingress_message_with_arg_size, fake_stripped_block_proposal_with_ingresses,
fake_ingress_message_with_arg_size, fake_ingress_message_with_sig,
fake_stripped_block_proposal_with_ingresses,
};
use crate::fetch_stripped_artifact::types::rpc::GetIngressMessageInBlockResponse;
use bytes::Bytes;
use ic_interfaces::p2p::consensus::BouncerValue;
use ic_logger::no_op_logger;
use ic_p2p_test_utils::mocks::MockBouncerFactory;
use ic_p2p_test_utils::mocks::MockTransport;
use ic_p2p_test_utils::mocks::MockValidatedPoolReader;
use ic_protobuf::proxy::ProtoProxy;
use ic_types_test_utils::ids::NODE_1;

use super::*;

Expand Down Expand Up @@ -502,4 +523,94 @@ mod tests {
Err(InsertionError::NotNeeded)
);
}

#[derive(Clone)]
struct MockPeers(NodeId);

impl Peers for MockPeers {
fn peers(&self) -> Vec<NodeId> {
vec![self.0]
}
}

fn set_up_assembler_with_fake_dependencies(
ingress_pool_message: Option<SignedIngress>,
peers_message: Option<SignedIngress>,
) -> FetchStrippedConsensusArtifact {
let mut mock_transport = MockTransport::new();
let mut ingress_pool = MockValidatedPoolReader::<SignedIngress>::default();

if let Some(ingress_message) = ingress_pool_message {
ingress_pool.expect_get().return_const(ingress_message);
}

if let Some(ingress_message) = peers_message {
let fake_response = axum::response::Response::builder()
.body(Bytes::from(
pb::GetIngressMessageInBlockResponse::proxy_encode(
GetIngressMessageInBlockResponse {
serialized_ingress_message: ingress_message.binary().clone(),
},
),
))
.unwrap();

mock_transport
.expect_rpc()
.returning(move |_, _| (Ok(fake_response.clone())));
}

let consensus_pool = MockValidatedPoolReader::<ConsensusMessage>::default();
let mut mock_bouncer_factory = MockBouncerFactory::default();
mock_bouncer_factory
.expect_new_bouncer()
.returning(|_| Box::new(|_| BouncerValue::Wants));

let f = FetchStrippedConsensusArtifact::new(
no_op_logger(),
tokio::runtime::Handle::current(),
Arc::new(RwLock::new(consensus_pool)),
Arc::new(RwLock::new(ingress_pool)),
Arc::new(mock_bouncer_factory),
MetricsRegistry::new(),
NODE_1,
)
.0;

(f)(Arc::new(mock_transport))
}

/// Tests whether the assembler uses the ingress message with the correct signature in the case
/// when the local ingress pool contains an ingress message with the same content as the one in
/// the stripped block proposal but with a different signature.
#[tokio::test]
async fn roundtrip_test_with_two_identical_ingress_messages_different_signatures() {
let (ingress_1, _ingress_1_id) = fake_ingress_message_with_sig("fake_1", vec![1, 2, 3]);
let (ingress_2, _ingress_2_id) = fake_ingress_message_with_sig("fake_1", vec![2, 3, 4]);
assert_eq!(
IngressMessageId::from(&ingress_1),
IngressMessageId::from(&ingress_2)
);
let block_proposal = fake_block_proposal_with_ingresses(vec![ingress_2.clone()]);

let assembler = set_up_assembler_with_fake_dependencies(
/*ingress_pool_message=*/ Some(ingress_1.clone()),
/*consensus_pool_message=*/ Some(ingress_2.clone()),
);
let stripped_block_proposal =
assembler.disassemble_message(ConsensusMessage::BlockProposal(block_proposal.clone()));
let reassembled_block_proposal = assembler
.assemble_message(
stripped_block_proposal.id(),
Some((stripped_block_proposal, NODE_1)),
MockPeers(NODE_1),
)
.await
.expect("should reassemble the message given the dependencies");

assert_eq!(
reassembled_block_proposal,
(ConsensusMessage::BlockProposal(block_proposal), NODE_1)
);
}
}
Loading

0 comments on commit 1588a65

Please sign in to comment.