diff --git a/rs/replicated_state/src/canister_state/queues/tests.rs b/rs/replicated_state/src/canister_state/queues/tests.rs index ca8f4f5857e..db572865a24 100644 --- a/rs/replicated_state/src/canister_state/queues/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/tests.rs @@ -49,24 +49,32 @@ impl CanisterQueuesFixture { } } - fn push_input_request(&mut self) -> Result { + fn push_input_request( + &mut self, + deadline: CoarseTime, + ) -> Result { self.queues.push_input( RequestBuilder::default() .sender(self.other) .receiver(self.this) + .deadline(deadline) .build() .into(), LocalSubnet, ) } - fn push_input_response(&mut self) -> Result { + fn push_input_response( + &mut self, + deadline: CoarseTime, + ) -> Result { self.last_callback_id += 1; self.queues.push_input( ResponseBuilder::default() .originator(self.this) .respondent(self.other) .originator_reply_callback(CallbackId::from(self.last_callback_id)) + .deadline(deadline) .build() .into(), LocalSubnet, @@ -91,7 +99,10 @@ impl CanisterQueuesFixture { self.queues.pop_input() } - fn push_output_request(&mut self) -> Result<(), (StateError, Arc)> { + fn push_output_request( + &mut self, + deadline: CoarseTime, + ) -> Result<(), (StateError, Arc)> { self.last_callback_id += 1; self.queues.push_output_request( Arc::new( @@ -99,17 +110,19 @@ impl CanisterQueuesFixture { .sender(self.this) .receiver(self.other) .sender_reply_callback(CallbackId::from(self.last_callback_id)) + .deadline(deadline) .build(), ), UNIX_EPOCH, ) } - fn push_output_response(&mut self) { + fn push_output_response(&mut self, deadline: CoarseTime) { self.queues.push_output_response(Arc::new( ResponseBuilder::default() .originator(self.other) .respondent(self.this) + .deadline(deadline) .build(), )); } @@ -198,7 +211,7 @@ pub fn input_queue_type_from_local_canisters( #[test] fn can_push_output_request() { let mut fixture = CanisterQueuesFixture::new(); - fixture.push_output_request().unwrap(); + fixture.push_output_request(NO_DEADLINE).unwrap(); } /// Cannot push guaranteed response to output queues without having pushed an @@ -207,7 +220,7 @@ fn can_push_output_request() { #[should_panic(expected = "assertion failed: self.guaranteed_response_memory_reservations > 0")] fn cannot_push_output_response_guaranteed_without_input_request() { let mut fixture = CanisterQueuesFixture::new(); - fixture.push_output_response(); + fixture.push_output_response(NO_DEADLINE); } /// Cannot push best-effort response to output queues without having pushed an @@ -229,26 +242,26 @@ fn cannot_push_output_response_best_effort_without_input_request() { fn enqueuing_unexpected_response_does_not_panic() { let mut fixture = CanisterQueuesFixture::new(); // Enqueue a request to create a queue for `other`. - assert!(fixture.push_input_request().unwrap()); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); // Now `other` sends an unexpected `Response`. We should return an error, not // panic. - fixture.push_input_response().unwrap_err(); + fixture.push_input_response(NO_DEADLINE).unwrap_err(); } /// Can push response to output queues after pushing input request. #[test] fn can_push_output_response_after_input_request() { let mut fixture = CanisterQueuesFixture::new(); - assert!(fixture.push_input_request().unwrap()); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); fixture.pop_input().unwrap(); - fixture.push_output_response(); + fixture.push_output_response(NO_DEADLINE); } /// Can push one request to the induction pool. #[test] fn can_push_input_request() { let mut fixture = CanisterQueuesFixture::new(); - assert!(fixture.push_input_request().unwrap()); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); } /// Cannot push response to the induction pool without pushing output @@ -256,7 +269,7 @@ fn can_push_input_request() { #[test] fn cannot_push_input_response_without_output_request() { let mut fixture = CanisterQueuesFixture::new(); - fixture.push_input_response().unwrap_err(); + fixture.push_input_response(NO_DEADLINE).unwrap_err(); } /// Can push response to input queues after pushing request to output @@ -264,9 +277,9 @@ fn cannot_push_input_response_without_output_request() { #[test] fn can_push_input_response_after_output_request() { let mut fixture = CanisterQueuesFixture::new(); - fixture.push_output_request().unwrap(); + fixture.push_output_request(NO_DEADLINE).unwrap(); fixture.pop_output().unwrap(); - assert!(fixture.push_input_response().unwrap()); + assert!(fixture.push_input_response(NO_DEADLINE).unwrap()); } #[test] @@ -346,14 +359,14 @@ fn push_input_response_duplicate_best_effort_response() { #[test] fn test_available_output_request_slots_dont_counts() { let mut fixture = CanisterQueuesFixture::new(); - assert!(fixture.push_input_request().unwrap()); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); assert_eq!( DEFAULT_QUEUE_CAPACITY, fixture.available_output_request_slots() ); fixture.pop_input().unwrap(); - fixture.push_output_response(); + fixture.push_output_response(NO_DEADLINE); assert_eq!( DEFAULT_QUEUE_CAPACITY, fixture.available_output_request_slots() @@ -367,7 +380,7 @@ fn test_available_output_request_slots_counts() { let mut fixture = CanisterQueuesFixture::new(); // Check that output request counts. - fixture.push_output_request().unwrap(); + fixture.push_output_request(NO_DEADLINE).unwrap(); assert_eq!( DEFAULT_QUEUE_CAPACITY - 1, fixture.available_output_request_slots() @@ -381,7 +394,7 @@ fn test_available_output_request_slots_counts() { ); // Check that input response counts. - assert!(fixture.push_input_response().unwrap()); + assert!(fixture.push_input_response(NO_DEADLINE).unwrap()); assert_eq!( DEFAULT_QUEUE_CAPACITY - 1, fixture.available_output_request_slots() @@ -395,9 +408,9 @@ fn test_available_output_request_slots_counts_timed_out_output_requests() { let mut fixture = CanisterQueuesFixture::new(); // Need output response to pin timed out request behind. - assert!(fixture.push_input_request().unwrap()); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); fixture.pop_input().unwrap(); - fixture.push_output_response(); + fixture.push_output_response(NO_DEADLINE); // All output request slots are still available. assert_eq!( @@ -406,7 +419,7 @@ fn test_available_output_request_slots_counts_timed_out_output_requests() { ); // Push output request, then time it out. - fixture.push_output_request().unwrap(); + fixture.push_output_request(NO_DEADLINE).unwrap(); fixture.time_out_all_messages_with_deadlines(); // Pop the reject response, to isolate the timed out request. @@ -424,18 +437,18 @@ fn test_backpressure_with_timed_out_requests() { let mut fixture = CanisterQueuesFixture::new(); // Need output response to pin timed out requests behind. - assert!(fixture.push_input_request().unwrap()); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); fixture.pop_input(); - fixture.push_output_response(); + fixture.push_output_response(NO_DEADLINE); // Push `DEFAULT_QUEUE_CAPACITY` output requests and time them all out. for _ in 0..DEFAULT_QUEUE_CAPACITY { - fixture.push_output_request().unwrap(); + fixture.push_output_request(NO_DEADLINE).unwrap(); } fixture.time_out_all_messages_with_deadlines(); // Check that no new request can be pushed. - assert!(fixture.push_output_request().is_err()); + assert!(fixture.push_output_request(NO_DEADLINE).is_err()); } /// Checks that `available_output_request_slots` counts timed out output @@ -446,7 +459,7 @@ fn test_available_output_request_slots() { // Fill the output queue with requests. for _ in 0..DEFAULT_QUEUE_CAPACITY { - fixture.push_output_request().unwrap(); + fixture.push_output_request(NO_DEADLINE).unwrap(); } // No output request slots are available. assert_eq!(0, fixture.available_output_request_slots()); @@ -475,7 +488,7 @@ fn test_deadline_expired_input() { let mut fixture = CanisterQueuesFixture::new(); // Enqueue a "deadline expired" compact reject response. - fixture.push_output_request().unwrap(); + fixture.push_output_request(NO_DEADLINE).unwrap(); fixture.pop_output().unwrap(); assert_eq!(Ok(true), fixture.try_push_deadline_expired_input()); @@ -524,7 +537,7 @@ fn test_try_push_deadline_expired_input_no_reserved_slot() { let mut fixture = CanisterQueuesFixture::new(); // Enqueue an input request, to create the input queue. - assert!(fixture.push_input_request().unwrap()); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); // Pushing a deadline expired input without a reserved slot signals a bug. assert_eq!( @@ -538,9 +551,9 @@ fn test_try_push_deadline_expired_input_with_same_callback_id() { let mut fixture = CanisterQueuesFixture::new(); // Push an input response. - fixture.push_output_request().unwrap(); + fixture.push_output_request(NO_DEADLINE).unwrap(); fixture.pop_output().unwrap(); - assert!(fixture.push_input_response().unwrap()); + assert!(fixture.push_input_response(NO_DEADLINE).unwrap()); // Sanity check. assert_eq!(1, fixture.queues.input_queues_message_count()); @@ -700,7 +713,7 @@ fn test_message_picking_round_robin_on_one_queue() { let mut fixture = CanisterQueuesFixture::new(); assert!(fixture.pop_input().is_none()); for _ in 0..3 { - assert!(fixture.push_input_request().unwrap()); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); } for _ in 0..3 { @@ -3341,10 +3354,74 @@ mod mainnet_compatibility_tests { fn serialize() { let mut fixture = CanisterQueuesFixture::new_with_ids(CANISTER_ID, OTHER_CANISTER_ID); - assert!(fixture.push_input_request().unwrap()); - fixture.push_output_request().unwrap(); - assert!(fixture.push_input_response().unwrap()); - fixture.push_output_response(); + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); + fixture.push_output_request(NO_DEADLINE).unwrap(); + assert!(fixture.push_input_response(NO_DEADLINE).unwrap()); + fixture.push_output_response(NO_DEADLINE); + + let pb_queues: pb_queues::CanisterQueues = (&fixture.queues).into(); + let serialized = pb_queues.encode_to_vec(); + + let output_path = std::path::Path::new(OUTPUT_NAME); + File::create(output_path) + .unwrap() + .write_all(&serialized) + .unwrap(); + } + + #[test] + #[ignore] + fn deserialize() { + let serialized = std::fs::read(OUTPUT_NAME).expect("Could not read file"); + let pb_queues = pb_queues::CanisterQueues::decode(&serialized as &[u8]) + .expect("Failed to deserialize the protobuf"); + let queues = CanisterQueues::try_from(( + pb_queues, + &StrictMetrics as &dyn CheckpointLoadingMetrics, + )) + .expect("Failed to convert the protobuf to CanisterQueues"); + let mut fixture = CanisterQueuesFixture { + queues, + this: CANISTER_ID, + other: OTHER_CANISTER_ID, + last_callback_id: 0, + }; + assert_matches!(fixture.pop_input(), Some(CanisterInput::Request(req)) if req.deadline == NO_DEADLINE); + assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(rep)) if rep.deadline == NO_DEADLINE); + assert_eq!(fixture.pop_input(), None); + assert!(!fixture.queues.has_input()); + + assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Request(req)) if req.deadline == NO_DEADLINE); + assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Response(rep)) if rep.deadline == NO_DEADLINE); + assert_eq!(fixture.pop_input(), None); + assert!(!fixture.queues.has_output()); + } + } + + #[cfg(test)] + mod best_effort_test { + + use super::super::*; + use super::*; + + const OUTPUT_NAME: &str = "queues.pbuf"; + const CANISTER_ID: CanisterId = CanisterId::from_u64(42); + const OTHER_CANISTER_ID: CanisterId = CanisterId::from_u64(13); + + #[test] + #[ignore] + fn serialize() { + let mut fixture = CanisterQueuesFixture::new_with_ids(CANISTER_ID, OTHER_CANISTER_ID); + + assert!(fixture.push_input_request(NO_DEADLINE).unwrap()); + fixture.push_output_request(NO_DEADLINE).unwrap(); + assert!(fixture.push_input_response(NO_DEADLINE).unwrap()); + fixture.push_output_response(NO_DEADLINE); + + assert!(fixture.push_input_request(SOME_DEADLINE).unwrap()); + fixture.push_output_request(SOME_DEADLINE).unwrap(); + assert!(fixture.push_input_response(SOME_DEADLINE).unwrap()); + fixture.push_output_response(SOME_DEADLINE); let pb_queues: pb_queues::CanisterQueues = (&fixture.queues).into(); let serialized = pb_queues.encode_to_vec(); @@ -3373,13 +3450,17 @@ mod mainnet_compatibility_tests { other: OTHER_CANISTER_ID, last_callback_id: 0, }; - assert_matches!(fixture.pop_input(), Some(CanisterInput::Request(_))); - assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(_))); + assert_matches!(fixture.pop_input(), Some(CanisterInput::Request(req)) if req.deadline == NO_DEADLINE); + assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(rep)) if rep.deadline == NO_DEADLINE); + assert_matches!(fixture.pop_input(), Some(CanisterInput::Request(req)) if req.deadline == SOME_DEADLINE); + assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(rep)) if rep.deadline == SOME_DEADLINE); assert_eq!(fixture.pop_input(), None); assert!(!fixture.queues.has_input()); - assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Request(_))); - assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Response(_))); + assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Request(req)) if req.deadline == NO_DEADLINE); + assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Response(rep)) if rep.deadline == NO_DEADLINE); + assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Request(req)) if req.deadline == SOME_DEADLINE); + assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Response(rep)) if rep.deadline == SOME_DEADLINE); assert_eq!(fixture.pop_input(), None); assert!(!fixture.queues.has_output()); } diff --git a/rs/tests/message_routing/queues_compatibility_test.rs b/rs/tests/message_routing/queues_compatibility_test.rs index 5078a963acf..1112dc4092b 100644 --- a/rs/tests/message_routing/queues_compatibility_test.rs +++ b/rs/tests/message_routing/queues_compatibility_test.rs @@ -262,6 +262,17 @@ fn test(env: TestEnv) { "_main/rs/replicated_state/replicated_state_test_binary/replicated_state_test_binary", "canister_state::queues::tests::mainnet_compatibility_tests::basic_test", ), + // TODO(MR-638): Enabke this test case once a version including + // `best_effort_test` was deployed to mainnet. + // + // TestCase::new( + // TestType::Bidirectional { + // published_binary: "replicated-state-test".to_string(), + // mainnet_version: v.clone(), + // }, + // "_main/rs/replicated_state/replicated_state_test_binary/replicated_state_test_binary", + // "canister_state::queues::tests::mainnet_compatibility_tests::best_effort_test", + // ), TestCase::new( TestType::Bidirectional { published_binary: "replicated-state-test".to_string(),