Skip to content

Commit

Permalink
test(MR): [MR-638] Include best-effort messages in `queues_compatibil…
Browse files Browse the repository at this point in the history
…ity_test`, stage 1

Have `queues_compatibility_test` also persist and restore best-effort requests and responses.

Unfortunately, the mainnet version that we test against must also have the test module encoding and decoding the same messages (in this case `best_effort_test`) before the test can actually work. So we will require a second stage to actually enable the test once the mainnet version includes this newly added test module.
  • Loading branch information
alin-at-dfinity committed Jan 22, 2025
1 parent 0126ae6 commit f67ab23
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 39 deletions.
159 changes: 120 additions & 39 deletions rs/replicated_state/src/canister_state/queues/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,32 @@ impl CanisterQueuesFixture {
}
}

fn push_input_request(&mut self) -> Result<bool, (StateError, RequestOrResponse)> {
fn push_input_request(
&mut self,
deadline: CoarseTime,
) -> Result<bool, (StateError, RequestOrResponse)> {
self.queues.push_input(
RequestBuilder::default()
.sender(self.other)
.receiver(self.this)
.deadline(deadline)
.build()
.into(),
LocalSubnet,
)
}

fn push_input_response(&mut self) -> Result<bool, (StateError, RequestOrResponse)> {
fn push_input_response(
&mut self,
deadline: CoarseTime,
) -> Result<bool, (StateError, RequestOrResponse)> {
self.last_callback_id += 1;
self.queues.push_input(
ResponseBuilder::default()
.originator(self.this)
.respondent(self.other)
.originator_reply_callback(CallbackId::from(self.last_callback_id))
.deadline(deadline)
.build()
.into(),
LocalSubnet,
Expand All @@ -91,25 +99,30 @@ impl CanisterQueuesFixture {
self.queues.pop_input()
}

fn push_output_request(&mut self) -> Result<(), (StateError, Arc<Request>)> {
fn push_output_request(
&mut self,
deadline: CoarseTime,
) -> Result<(), (StateError, Arc<Request>)> {
self.last_callback_id += 1;
self.queues.push_output_request(
Arc::new(
RequestBuilder::default()
.sender(self.this)
.receiver(self.other)
.sender_reply_callback(CallbackId::from(self.last_callback_id))
.deadline(deadline)
.build(),
),
UNIX_EPOCH,
)
}

fn push_output_response(&mut self) {
fn push_output_response(&mut self, deadline: CoarseTime) {
self.queues.push_output_response(Arc::new(
ResponseBuilder::default()
.originator(self.other)
.respondent(self.this)
.deadline(deadline)
.build(),
));
}
Expand Down Expand Up @@ -198,7 +211,7 @@ pub fn input_queue_type_from_local_canisters(
#[test]
fn can_push_output_request() {
let mut fixture = CanisterQueuesFixture::new();
fixture.push_output_request().unwrap();
fixture.push_output_request(NO_DEADLINE).unwrap();
}

/// Cannot push guaranteed response to output queues without having pushed an
Expand All @@ -207,7 +220,7 @@ fn can_push_output_request() {
#[should_panic(expected = "assertion failed: self.guaranteed_response_memory_reservations > 0")]
fn cannot_push_output_response_guaranteed_without_input_request() {
let mut fixture = CanisterQueuesFixture::new();
fixture.push_output_response();
fixture.push_output_response(NO_DEADLINE);
}

/// Cannot push best-effort response to output queues without having pushed an
Expand All @@ -229,44 +242,44 @@ fn cannot_push_output_response_best_effort_without_input_request() {
fn enqueuing_unexpected_response_does_not_panic() {
let mut fixture = CanisterQueuesFixture::new();
// Enqueue a request to create a queue for `other`.
assert!(fixture.push_input_request().unwrap());
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
// Now `other` sends an unexpected `Response`. We should return an error, not
// panic.
fixture.push_input_response().unwrap_err();
fixture.push_input_response(NO_DEADLINE).unwrap_err();
}

/// Can push response to output queues after pushing input request.
#[test]
fn can_push_output_response_after_input_request() {
let mut fixture = CanisterQueuesFixture::new();
assert!(fixture.push_input_request().unwrap());
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
fixture.pop_input().unwrap();
fixture.push_output_response();
fixture.push_output_response(NO_DEADLINE);
}

/// Can push one request to the induction pool.
#[test]
fn can_push_input_request() {
let mut fixture = CanisterQueuesFixture::new();
assert!(fixture.push_input_request().unwrap());
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
}

/// Cannot push response to the induction pool without pushing output
/// request first.
#[test]
fn cannot_push_input_response_without_output_request() {
let mut fixture = CanisterQueuesFixture::new();
fixture.push_input_response().unwrap_err();
fixture.push_input_response(NO_DEADLINE).unwrap_err();
}

/// Can push response to input queues after pushing request to output
/// queues.
#[test]
fn can_push_input_response_after_output_request() {
let mut fixture = CanisterQueuesFixture::new();
fixture.push_output_request().unwrap();
fixture.push_output_request(NO_DEADLINE).unwrap();
fixture.pop_output().unwrap();
assert!(fixture.push_input_response().unwrap());
assert!(fixture.push_input_response(NO_DEADLINE).unwrap());
}

#[test]
Expand Down Expand Up @@ -346,14 +359,14 @@ fn push_input_response_duplicate_best_effort_response() {
#[test]
fn test_available_output_request_slots_dont_counts() {
let mut fixture = CanisterQueuesFixture::new();
assert!(fixture.push_input_request().unwrap());
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
assert_eq!(
DEFAULT_QUEUE_CAPACITY,
fixture.available_output_request_slots()
);
fixture.pop_input().unwrap();

fixture.push_output_response();
fixture.push_output_response(NO_DEADLINE);
assert_eq!(
DEFAULT_QUEUE_CAPACITY,
fixture.available_output_request_slots()
Expand All @@ -367,7 +380,7 @@ fn test_available_output_request_slots_counts() {
let mut fixture = CanisterQueuesFixture::new();

// Check that output request counts.
fixture.push_output_request().unwrap();
fixture.push_output_request(NO_DEADLINE).unwrap();
assert_eq!(
DEFAULT_QUEUE_CAPACITY - 1,
fixture.available_output_request_slots()
Expand All @@ -381,7 +394,7 @@ fn test_available_output_request_slots_counts() {
);

// Check that input response counts.
assert!(fixture.push_input_response().unwrap());
assert!(fixture.push_input_response(NO_DEADLINE).unwrap());
assert_eq!(
DEFAULT_QUEUE_CAPACITY - 1,
fixture.available_output_request_slots()
Expand All @@ -395,9 +408,9 @@ fn test_available_output_request_slots_counts_timed_out_output_requests() {
let mut fixture = CanisterQueuesFixture::new();

// Need output response to pin timed out request behind.
assert!(fixture.push_input_request().unwrap());
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
fixture.pop_input().unwrap();
fixture.push_output_response();
fixture.push_output_response(NO_DEADLINE);

// All output request slots are still available.
assert_eq!(
Expand All @@ -406,7 +419,7 @@ fn test_available_output_request_slots_counts_timed_out_output_requests() {
);

// Push output request, then time it out.
fixture.push_output_request().unwrap();
fixture.push_output_request(NO_DEADLINE).unwrap();
fixture.time_out_all_messages_with_deadlines();

// Pop the reject response, to isolate the timed out request.
Expand All @@ -424,18 +437,18 @@ fn test_backpressure_with_timed_out_requests() {
let mut fixture = CanisterQueuesFixture::new();

// Need output response to pin timed out requests behind.
assert!(fixture.push_input_request().unwrap());
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
fixture.pop_input();
fixture.push_output_response();
fixture.push_output_response(NO_DEADLINE);

// Push `DEFAULT_QUEUE_CAPACITY` output requests and time them all out.
for _ in 0..DEFAULT_QUEUE_CAPACITY {
fixture.push_output_request().unwrap();
fixture.push_output_request(NO_DEADLINE).unwrap();
}
fixture.time_out_all_messages_with_deadlines();

// Check that no new request can be pushed.
assert!(fixture.push_output_request().is_err());
assert!(fixture.push_output_request(NO_DEADLINE).is_err());
}

/// Checks that `available_output_request_slots` counts timed out output
Expand All @@ -446,7 +459,7 @@ fn test_available_output_request_slots() {

// Fill the output queue with requests.
for _ in 0..DEFAULT_QUEUE_CAPACITY {
fixture.push_output_request().unwrap();
fixture.push_output_request(NO_DEADLINE).unwrap();
}
// No output request slots are available.
assert_eq!(0, fixture.available_output_request_slots());
Expand Down Expand Up @@ -475,7 +488,7 @@ fn test_deadline_expired_input() {
let mut fixture = CanisterQueuesFixture::new();

// Enqueue a "deadline expired" compact reject response.
fixture.push_output_request().unwrap();
fixture.push_output_request(NO_DEADLINE).unwrap();
fixture.pop_output().unwrap();
assert_eq!(Ok(true), fixture.try_push_deadline_expired_input());

Expand Down Expand Up @@ -524,7 +537,7 @@ fn test_try_push_deadline_expired_input_no_reserved_slot() {
let mut fixture = CanisterQueuesFixture::new();

// Enqueue an input request, to create the input queue.
assert!(fixture.push_input_request().unwrap());
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());

// Pushing a deadline expired input without a reserved slot signals a bug.
assert_eq!(
Expand All @@ -538,9 +551,9 @@ fn test_try_push_deadline_expired_input_with_same_callback_id() {
let mut fixture = CanisterQueuesFixture::new();

// Push an input response.
fixture.push_output_request().unwrap();
fixture.push_output_request(NO_DEADLINE).unwrap();
fixture.pop_output().unwrap();
assert!(fixture.push_input_response().unwrap());
assert!(fixture.push_input_response(NO_DEADLINE).unwrap());

// Sanity check.
assert_eq!(1, fixture.queues.input_queues_message_count());
Expand Down Expand Up @@ -700,7 +713,7 @@ fn test_message_picking_round_robin_on_one_queue() {
let mut fixture = CanisterQueuesFixture::new();
assert!(fixture.pop_input().is_none());
for _ in 0..3 {
assert!(fixture.push_input_request().unwrap());
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
}

for _ in 0..3 {
Expand Down Expand Up @@ -3341,10 +3354,74 @@ mod mainnet_compatibility_tests {
fn serialize() {
let mut fixture = CanisterQueuesFixture::new_with_ids(CANISTER_ID, OTHER_CANISTER_ID);

assert!(fixture.push_input_request().unwrap());
fixture.push_output_request().unwrap();
assert!(fixture.push_input_response().unwrap());
fixture.push_output_response();
assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
fixture.push_output_request(NO_DEADLINE).unwrap();
assert!(fixture.push_input_response(NO_DEADLINE).unwrap());
fixture.push_output_response(NO_DEADLINE);

let pb_queues: pb_queues::CanisterQueues = (&fixture.queues).into();
let serialized = pb_queues.encode_to_vec();

let output_path = std::path::Path::new(OUTPUT_NAME);
File::create(output_path)
.unwrap()
.write_all(&serialized)
.unwrap();
}

#[test]
#[ignore]
fn deserialize() {
let serialized = std::fs::read(OUTPUT_NAME).expect("Could not read file");
let pb_queues = pb_queues::CanisterQueues::decode(&serialized as &[u8])
.expect("Failed to deserialize the protobuf");
let queues = CanisterQueues::try_from((
pb_queues,
&StrictMetrics as &dyn CheckpointLoadingMetrics,
))
.expect("Failed to convert the protobuf to CanisterQueues");
let mut fixture = CanisterQueuesFixture {
queues,
this: CANISTER_ID,
other: OTHER_CANISTER_ID,
last_callback_id: 0,
};
assert_matches!(fixture.pop_input(), Some(CanisterInput::Request(req)) if req.deadline == NO_DEADLINE);
assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(rep)) if rep.deadline == NO_DEADLINE);
assert_eq!(fixture.pop_input(), None);
assert!(!fixture.queues.has_input());

assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Request(req)) if req.deadline == NO_DEADLINE);
assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Response(rep)) if rep.deadline == NO_DEADLINE);
assert_eq!(fixture.pop_input(), None);
assert!(!fixture.queues.has_output());
}
}

#[cfg(test)]
mod best_effort_test {

use super::super::*;
use super::*;

const OUTPUT_NAME: &str = "queues.pbuf";
const CANISTER_ID: CanisterId = CanisterId::from_u64(42);
const OTHER_CANISTER_ID: CanisterId = CanisterId::from_u64(13);

#[test]
#[ignore]
fn serialize() {
let mut fixture = CanisterQueuesFixture::new_with_ids(CANISTER_ID, OTHER_CANISTER_ID);

assert!(fixture.push_input_request(NO_DEADLINE).unwrap());
fixture.push_output_request(NO_DEADLINE).unwrap();
assert!(fixture.push_input_response(NO_DEADLINE).unwrap());
fixture.push_output_response(NO_DEADLINE);

assert!(fixture.push_input_request(SOME_DEADLINE).unwrap());
fixture.push_output_request(SOME_DEADLINE).unwrap();
assert!(fixture.push_input_response(SOME_DEADLINE).unwrap());
fixture.push_output_response(SOME_DEADLINE);

let pb_queues: pb_queues::CanisterQueues = (&fixture.queues).into();
let serialized = pb_queues.encode_to_vec();
Expand Down Expand Up @@ -3373,13 +3450,17 @@ mod mainnet_compatibility_tests {
other: OTHER_CANISTER_ID,
last_callback_id: 0,
};
assert_matches!(fixture.pop_input(), Some(CanisterInput::Request(_)));
assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(_)));
assert_matches!(fixture.pop_input(), Some(CanisterInput::Request(req)) if req.deadline == NO_DEADLINE);
assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(rep)) if rep.deadline == NO_DEADLINE);
assert_matches!(fixture.pop_input(), Some(CanisterInput::Request(req)) if req.deadline == SOME_DEADLINE);
assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(rep)) if rep.deadline == SOME_DEADLINE);
assert_eq!(fixture.pop_input(), None);
assert!(!fixture.queues.has_input());

assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Request(_)));
assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Response(_)));
assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Request(req)) if req.deadline == NO_DEADLINE);
assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Response(rep)) if rep.deadline == NO_DEADLINE);
assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Request(req)) if req.deadline == SOME_DEADLINE);
assert_matches!(fixture.pop_output(), Some(RequestOrResponse::Response(rep)) if rep.deadline == SOME_DEADLINE);
assert_eq!(fixture.pop_input(), None);
assert!(!fixture.queues.has_output());
}
Expand Down
11 changes: 11 additions & 0 deletions rs/tests/message_routing/queues_compatibility_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,17 @@ fn test(env: TestEnv) {
"_main/rs/replicated_state/replicated_state_test_binary/replicated_state_test_binary",
"canister_state::queues::tests::mainnet_compatibility_tests::basic_test",
),
// TODO(MR-638): Enabke this test case once a version including
// `best_effort_test` was deployed to mainnet.
//
// TestCase::new(
// TestType::Bidirectional {
// published_binary: "replicated-state-test".to_string(),
// mainnet_version: v.clone(),
// },
// "_main/rs/replicated_state/replicated_state_test_binary/replicated_state_test_binary",
// "canister_state::queues::tests::mainnet_compatibility_tests::best_effort_test",
// ),
TestCase::new(
TestType::Bidirectional {
published_binary: "replicated-state-test".to_string(),
Expand Down

0 comments on commit f67ab23

Please sign in to comment.