diff --git a/rs/messaging/src/routing/stream_handler.rs b/rs/messaging/src/routing/stream_handler.rs index 8c98175f456..321f4708995 100644 --- a/rs/messaging/src/routing/stream_handler.rs +++ b/rs/messaging/src/routing/stream_handler.rs @@ -615,14 +615,14 @@ impl StreamHandlerImpl { state, available_guaranteed_response_memory, ) { - Ok(()) => { + None => { // Reject response successfully inducted or dropped. } - Err((StateError::CanisterMigrating { .. }, reject_response)) => { + Some((RejectReason::CanisterMigrating, reject_response)) => { // Canister is being migrated, reroute reject response. reroute_response(reject_response, state, streams, &self.log); } - Err(_) => { + Some(_) => { unreachable!( "Errors other than `CanisterMigrating` shouldn't be possible." ); @@ -740,35 +740,27 @@ impl StreamHandlerImpl { state, available_guaranteed_response_memory, ) { - Ok(()) => { + None => { // Message successfully inducted or dropped. stream.push_accept_signal(); } - Err((StateError::CanisterMigrating { .. }, RequestOrResponse::Response(_))) => { - // Unable to deliver a response due to migrating canister, push reject signal. - stream.push_reject_signal(RejectReason::CanisterMigrating); - } - Err((err, RequestOrResponse::Request(request))) => { - // TODO(MR-249): Replace generating reject responses with pushing reject signals. + Some((reason, RequestOrResponse::Request(request))) + if state.metadata.certification_version < CertificationVersion::V19 => + { // Unable to induct a request, generate reject response and push it into `stream`. - let code = match err { - StateError::CanisterNotFound(_) => RejectCode::DestinationInvalid, - StateError::CanisterStopped(_) => RejectCode::CanisterError, - StateError::CanisterStopping(_) => RejectCode::CanisterError, - StateError::CanisterMigrating { .. } => RejectCode::SysTransient, - StateError::QueueFull { .. } => RejectCode::SysTransient, - StateError::OutOfMemory { .. 
} => RejectCode::CanisterError, - StateError::NonMatchingResponse { .. } - | StateError::BitcoinNonMatchingResponse { .. } => { - unreachable!("Not a user error: {}", err); - } - }; *available_guaranteed_response_memory -= - stream.push(generate_reject_response(&request, code, err.to_string())) - as i64; - stream.push_accept_signal() + stream.push(generate_reject_response_for(reason, &request)) as i64; + stream.push_accept_signal(); } - Err((_, RequestOrResponse::Response(_))) => { + Some((reason, RequestOrResponse::Request(_))) => { + // Unable to induct a request, push a reject signal. + stream.push_reject_signal(reason); + } + Some((RejectReason::CanisterMigrating, RequestOrResponse::Response(_))) => { + // Unable to deliver a response due to migrating canister, push reject signal. + stream.push_reject_signal(RejectReason::CanisterMigrating); + } + Some((_, RequestOrResponse::Response(_))) => { unreachable!("No signals are generated for response induction failures except for CanisterMigrating"); } } @@ -792,13 +784,18 @@ impl StreamHandlerImpl { } } + /// Inducts a message into `state`. There are 3 possible outcomes: + /// - `msg` successfully inducted, returns `None`. + /// - `msg` silently dropped (e.g. late best-effort response), returns `None`. + /// - Failed to induct `msg` (error or canister migrating), returns `Some((RejectReason, RequestOrResponse))`. + /// Caller is expected to produce a reject response or a reject signal. fn induct_message_impl( &self, msg: RequestOrResponse, msg_type: &str, state: &mut ReplicatedState, available_guaranteed_response_memory: &mut i64, - ) -> Result<(), (StateError, RequestOrResponse)> { + ) -> Option<(RejectReason, RequestOrResponse)> { // Subnet that should have received the message according to the routing table. 
let receiver_host_subnet = state .metadata @@ -823,13 +820,27 @@ impl StreamHandlerImpl { match msg { RequestOrResponse::Request(ref request) => { + let reason = match err { + StateError::CanisterNotFound(_) => { + RejectReason::CanisterNotFound + } + StateError::CanisterStopped(_) => RejectReason::CanisterStopped, + StateError::CanisterStopping(_) => { + RejectReason::CanisterStopping + } + StateError::QueueFull { .. } => RejectReason::QueueFull, + StateError::OutOfMemory { .. } => RejectReason::OutOfMemory, + // Unreachable. + StateError::NonMatchingResponse { .. } + | StateError::BitcoinNonMatchingResponse { .. } => { + RejectReason::Unknown + } + }; debug!( self.log, - "Induction failed with error '{}', generating reject Response for {:?}", - &err, - &request + "Inducting request failed: {}\n{:?}", &err, &request ); - return Err((err, msg)); + return Some((reason, msg)); } RequestOrResponse::Response(response) => { // Critical error, responses should always be inducted successfully. @@ -850,30 +861,26 @@ impl StreamHandlerImpl { // Receiver canister is migrating to/from this subnet. 
Some(host_subnet) if self.should_reroute_message_to(&msg, host_subnet, state) => { self.observe_inducted_message_status(msg_type, LABEL_VALUE_CANISTER_MIGRATED); - let err = StateError::CanisterMigrating { - canister_id: msg.receiver(), - host_subnet, - }; match &msg { RequestOrResponse::Request(request) => { debug!( self.log, - "Canister {} is being migrated, generating reject response for {:?}", + "Inducting request failed: Canister {} is migrating\n{:?}", request.receiver, request ); - return Err((err, msg)); + return Some((RejectReason::CanisterMigrating, msg)); } RequestOrResponse::Response(response) => { if state.metadata.certification_version >= CertificationVersion::V9 { debug!( self.log, - "Canister {} is being migrated, generating reject signal for {:?}", + "Inducting response failed: Canister {} is migrating\n{:?}", response.originator, response ); - return Err((err, msg)); + return Some((RejectReason::CanisterMigrating, msg)); } else { fatal!( self.log, @@ -906,7 +913,7 @@ impl StreamHandlerImpl { } // Any reject signals generated were returned before this point. 
- Ok(()) + None } /// Checks whether `actual_subnet_id` is a valid host subnet for `msg.sender()` diff --git a/rs/messaging/src/routing/stream_handler/tests.rs b/rs/messaging/src/routing/stream_handler/tests.rs index 934eb7e5618..83d31812e04 100644 --- a/rs/messaging/src/routing/stream_handler/tests.rs +++ b/rs/messaging/src/routing/stream_handler/tests.rs @@ -1,7 +1,8 @@ use super::*; use crate::message_routing::{LABEL_REMOTE, METRIC_TIME_IN_BACKLOG, METRIC_TIME_IN_STREAM}; use assert_matches::assert_matches; -use ic_base_types::NumSeconds; +use ic_base_types::{NumSeconds, PrincipalId}; +use ic_certification_version::{CertificationVersion, CURRENT_CERTIFICATION_VERSION}; use ic_config::execution_environment::Config as HypervisorConfig; use ic_interfaces::messaging::LABEL_VALUE_CANISTER_NOT_FOUND; use ic_metrics::MetricsRegistry; @@ -10,7 +11,7 @@ use ic_registry_subnet_type::SubnetType; use ic_replicated_state::metadata_state::StreamMap; use ic_replicated_state::replicated_state::LABEL_VALUE_OUT_OF_MEMORY; use ic_replicated_state::testing::{ReplicatedStateTesting, SystemStateTesting}; -use ic_replicated_state::{CanisterStatus, ReplicatedState, Stream}; +use ic_replicated_state::{CanisterStatus, ReplicatedState, Stream, SystemState}; use ic_test_utilities_logger::with_test_replica_logger; use ic_test_utilities_metrics::{ fetch_histogram_stats, fetch_histogram_vec_count, fetch_int_counter, fetch_int_counter_vec, @@ -153,8 +154,12 @@ fn induct_loopback_stream_empty_loopback_stream() { ); } -/// Tests that inducting a loopback stream containing a request to a non-existant canister produces a -/// corresponding reject response in the loopback stream. +/// Tests that inducting a loopback stream containing a request to a non-existant canister results +/// in a reject response addressed to `LOCAL_CANISTER` inducted into the state. 
+/// +/// Note that `induct_loopback_stream()` first inducts the loopback stream as a stream slice, which +/// produces a reject signal for this request. In a second step the loopback stream is gc'ed, +/// which collects the signal and triggers generating and then inducting a corresponding reject response. #[test] fn induct_loopback_stream_reject_response() { // A loopback stream with 1 message addressed to an unknown canister. @@ -166,10 +171,64 @@ fn induct_loopback_stream_reject_response() { ..StreamConfig::default() }], |stream_handler, state, metrics| { - let reject_response = generate_reject_response( + let mut expected_state = state.clone(); + // Expecting a state with reject response inducted for the request @21. + push_input( + &mut expected_state, + generate_reject_response_for( + RejectReason::CanisterNotFound, + request_in_stream(state.get_stream(&LOCAL_SUBNET), 21), + ), + ); + + // Expecting an empty loopback stream with begin advanced. + let loopback_stream = stream_from_config(StreamConfig { + begin: 22, + signals_end: 22, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![LOCAL_SUBNET => loopback_stream]); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + + let inducted_state = stream_handler + .induct_loopback_stream(state, &mut available_guaranteed_response_memory); + + assert_eq!(expected_state, inducted_state); + assert_eq!( + stream_handler.available_guaranteed_response_memory(&inducted_state), + available_guaranteed_response_memory + ); + + metrics.assert_inducted_xnet_messages_eq(&[ + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_CANISTER_NOT_FOUND, 1), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_SUCCESS, 1), + ]); + assert_eq!(1, metrics.fetch_inducted_payload_sizes_stats().count); + // No critical errors raised. 
+ metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Tests that inducting a loopback stream containing a request to a non-existant canister produces a +/// corresponding reject response in the loopback stream. +#[test] +fn legacy_induct_loopback_stream_reject_response() { + // A loopback stream with 1 message addressed to an unknown canister. + with_legacy_local_test_setup( + btreemap![LOCAL_SUBNET => StreamConfig { + begin: 21, + messages: vec![Request(*LOCAL_CANISTER, *OTHER_LOCAL_CANISTER)], + signals_end: 21, + ..StreamConfig::default() + }], + |stream_handler, state, metrics| { + let reject_response = generate_reject_response_for( + RejectReason::CanisterNotFound, request_in_stream(state.get_stream(&LOCAL_SUBNET), 21), - RejectCode::DestinationInvalid, - StateError::CanisterNotFound(*OTHER_LOCAL_CANISTER).to_string(), ); let reject_response_count_bytes = reject_response.count_bytes(); @@ -221,12 +280,106 @@ fn induct_loopback_stream_reject_response() { /// migrated to `CANISTER_MIGRATION_SUBNET` /// - are inducted successfully when addressed to the non-migrating canister `LOCAL_CANISTER`. /// - requests trigger a reject response when addressed the migrating canister -/// `OTHER_LOCAL_CANISTER`. +/// `OTHER_LOCAL_CANISTER` that is inducted into the state. /// - responses are rerouted into the stream to `CANISTER_MIGRATION_SUBNET` when addressed to the /// migrating canister `OTHER_LOCAL_CANISTER`. 
#[test] fn induct_loopback_stream_reroute_response() { with_local_test_setup( + btreemap![LOCAL_SUBNET => StreamConfig { + begin: 21, + messages: vec![ + Request(*LOCAL_CANISTER, *LOCAL_CANISTER), + Response(*LOCAL_CANISTER, *LOCAL_CANISTER), + Request(*LOCAL_CANISTER, *OTHER_LOCAL_CANISTER), + Response(*LOCAL_CANISTER, *OTHER_LOCAL_CANISTER), + ], + signals_end: 21, + ..StreamConfig::default() + }], + |stream_handler, state, metrics| { + // `OTHER_LOCAL_CANISTER` was hosted by the `LOCAL_SUBNET` but then migrated. + let state = simulate_canister_migration( + state, + *OTHER_LOCAL_CANISTER, + LOCAL_SUBNET, + CANISTER_MIGRATION_SUBNET, + ); + + let mut expected_state = state.clone(); + // The request @21 and the response @22 are expected to be inducted successfully; + // the request @23 is expected to trigger a reject response which is inducted + // successfully. + let inducted_response = message_in_stream(state.get_stream(&LOCAL_SUBNET), 22); + let inducted_response_count_bytes = inducted_response.count_bytes(); + push_inputs( + &mut expected_state, + [ + message_in_stream(state.get_stream(&LOCAL_SUBNET), 21), + inducted_response, + &generate_reject_response_for( + RejectReason::CanisterMigrating, + request_in_stream(state.get_stream(&LOCAL_SUBNET), 23), + ), + ], + ); + + // The loopback stream is expected to be empty, with signals advanced. + let loopback_stream = stream_from_config(StreamConfig { + begin: 25, + signals_end: 25, + ..StreamConfig::default() + }); + + // A new outgoing stream is generated with the response @24 rerouted into it. 
+ let migration_stream = stream_from_config(StreamConfig { + messages: vec![message_in_stream(state.get_stream(&LOCAL_SUBNET), 24).clone()], + ..StreamConfig::default() + }); + + expected_state.with_streams(btreemap![ + LOCAL_SUBNET => loopback_stream, + CANISTER_MIGRATION_SUBNET => migration_stream, + ]); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + + let inducted_state = stream_handler + .induct_loopback_stream(state, &mut available_guaranteed_response_memory); + + assert_eq!(expected_state, inducted_state); + // `available_guaranteed_response_memory` does not keep track of gc'ing + // the response @22 in the loopback stream after inducting it. + assert_eq!( + available_guaranteed_response_memory + inducted_response_count_bytes as i64, + stream_handler.available_guaranteed_response_memory(&inducted_state), + ); + + metrics.assert_inducted_xnet_messages_eq(&[ + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_SUCCESS, 1), + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_CANISTER_MIGRATED, 1), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_SUCCESS, 2), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_CANISTER_MIGRATED, 1), + ]); + assert_eq!(3, metrics.fetch_inducted_payload_sizes_stats().count); + // No critical errors raised. + metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Tests that messages in the loopback stream on a subnet where `OTHER_LOCAL_CANISTER` has been +/// migrated to `CANISTER_MIGRATION_SUBNET` +/// - are inducted successfully when addressed to the non-migrating canister `LOCAL_CANISTER`. +/// - requests trigger a reject response when addressed the migrating canister +/// `OTHER_LOCAL_CANISTER`. +/// - responses are rerouted into the stream to `CANISTER_MIGRATION_SUBNET` when addressed to the +/// migrating canister `OTHER_LOCAL_CANISTER`. 
+#[test] +fn legacy_induct_loopback_stream_reroute_response() { + with_legacy_local_test_setup( btreemap![LOCAL_SUBNET => StreamConfig { begin: 21, messages: vec![ @@ -259,13 +412,9 @@ fn induct_loopback_stream_reroute_response() { push_input(&mut expected_state, inducted_response); // The request @23 is expected to trigger a reject response in the loopback stream. - let reject_response = generate_reject_response( + let reject_response = generate_reject_response_for( + RejectReason::CanisterMigrating, request_in_stream(state.get_stream(&LOCAL_SUBNET), 23), - RejectCode::SysTransient, - format!( - "Canister {} is being migrated to/from {}", - *OTHER_LOCAL_CANISTER, CANISTER_MIGRATION_SUBNET - ), ); let reject_response_count_bytes = reject_response.count_bytes(); let loopback_stream = stream_from_config(StreamConfig { @@ -398,6 +547,18 @@ fn induct_loopback_stream_with_subnet_message_memory_limit() { }); } +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Tests that subnet message memory limit is enforced by +/// `StreamHandlerImpl::induct_loopback_stream()`. +#[test] +fn legacy_induct_loopback_stream_with_subnet_message_memory_limit() { + // A stream handler with a subnet message memory limit that only allows up to 3 reservations. + legacy_induct_loopback_stream_with_memory_limit_impl(HypervisorConfig { + subnet_message_memory_capacity: NumBytes::new(MAX_RESPONSE_COUNT_BYTES as u64 * 7 / 2), + ..Default::default() + }); +} + /// Tests that wasm custom sections memory capacity does not affect /// `StreamHandlerImpl::induct_loopback_stream()`. #[test] @@ -411,6 +572,20 @@ fn induct_loopback_stream_with_zero_subnet_wasm_custom_sections_limit() { }); } +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Tests that wasm custom sections memory capacity does not affect +/// `StreamHandlerImpl::induct_loopback_stream()`. 
+#[test] +fn legacy_induct_loopback_stream_with_zero_subnet_wasm_custom_sections_limit() { + // A stream handler with a subnet message memory limit that only allows up to 3 reservations + // and no allowance for wasm custom sections. + legacy_induct_loopback_stream_with_memory_limit_impl(HypervisorConfig { + subnet_message_memory_capacity: NumBytes::new(MAX_RESPONSE_COUNT_BYTES as u64 * 7 / 2), + subnet_wasm_custom_sections_memory_capacity: NumBytes::new(0), + ..Default::default() + }); +} + /// Tests that canister memory limit is ignored by /// `StreamHandlerImpl::induct_loopback_stream()` for system subnets. #[test] @@ -461,11 +636,13 @@ fn system_subnet_induct_loopback_stream_ignores_subnet_wasm_custom_sections_memo fn with_induct_loopback_stream_setup( config: HypervisorConfig, subnet_type: SubnetType, + certification_version: CertificationVersion, test_impl: impl FnOnce(StreamHandlerImpl, ReplicatedState, MetricsFixture), ) { with_local_test_setup_and_config( config, subnet_type, + certification_version, btreemap![LOCAL_SUBNET => StreamConfig { begin: 21, messages: vec![ @@ -493,6 +670,66 @@ fn induct_loopback_stream_with_memory_limit_impl(config: HypervisorConfig) { with_induct_loopback_stream_setup( config, SubnetType::Application, + CURRENT_CERTIFICATION_VERSION, + |stream_handler, state, metrics| { + let mut expected_state = state.clone(); + // Expecting a canister state with... the first message and a reject response for the + // request @22 inducted... + push_inputs( + &mut expected_state, + [ + message_in_stream(state.get_stream(&LOCAL_SUBNET), 21), + &generate_reject_response_for( + RejectReason::OutOfMemory, + request_in_stream(state.get_stream(&LOCAL_SUBNET), 22), + ), + ], + ); + + // ...and an empty loopback stream with indices advanced. 
+ let loopback_stream = stream_from_config(StreamConfig { + begin: 23, + signals_end: 23, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![LOCAL_SUBNET => loopback_stream]); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler + .induct_loopback_stream(state, &mut available_guaranteed_response_memory); + + assert_eq!(expected_state, inducted_state); + assert_eq!( + stream_handler.available_guaranteed_response_memory(&inducted_state), + available_guaranteed_response_memory + ); + metrics.assert_inducted_xnet_messages_eq(&[ + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_SUCCESS, 1), + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_OUT_OF_MEMORY, 1), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_SUCCESS, 1), + ]); + assert_eq!(2, metrics.fetch_inducted_payload_sizes_stats().count); + }, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Common implementation for `StreamHandlerImpl::induct_loopback_stream()` +/// memory limit tests. Expects a `StreamHandlerImpl` with canister; subnet; or +/// subnet message; memory limits only large enough for 3 in-flight requests +/// plus epsilon at a time. Ensures that the limits are enforced when inducting +/// the loopback stream. +/// +/// Sets up a canister with two input queue reservations for two in-flight +/// loopback requests and a loopback stream containing said requests. Tries to +/// induct the loopback stream and expects the first request to be inducted; and +/// the second request to fail to be inducted due to lack of memory. +fn legacy_induct_loopback_stream_with_memory_limit_impl(config: HypervisorConfig) { + with_induct_loopback_stream_setup( + config, + SubnetType::Application, + CertificationVersion::V18, |stream_handler, state, metrics| { let mut expected_state = state.clone(); // Expecting a canister state with the first message inducted... 
@@ -503,14 +740,9 @@ fn induct_loopback_stream_with_memory_limit_impl(config: HypervisorConfig) { // ...and a loopback stream with indices advanced and a reject response for the request @22. let loopback_stream = stream_from_config(StreamConfig { begin: 23, - messages: vec![generate_reject_response( + messages: vec![generate_reject_response_for( + RejectReason::OutOfMemory, request_in_stream(state.get_stream(&LOCAL_SUBNET), 22), - RejectCode::CanisterError, - StateError::OutOfMemory { - requested: NumBytes::new(MAX_RESPONSE_COUNT_BYTES as u64), - available: MAX_RESPONSE_COUNT_BYTES as i64 / 2, - } - .to_string(), )], signals_end: 23, ..StreamConfig::default() @@ -550,6 +782,7 @@ fn induct_loopback_stream_ignores_memory_limit_impl(config: HypervisorConfig) { with_induct_loopback_stream_setup( config, SubnetType::System, + CURRENT_CERTIFICATION_VERSION, |stream_handler, state, metrics| { let mut expected_state = state.clone(); // Expecting a canister state with the 2 requests inducted... @@ -1654,19 +1887,16 @@ fn check_stream_handler_locally_generated_reject_response_unknown() { ); } -// TODO: [MR-579] Remove this fixture and the tests that depend on it, since reject responses of -// this type are no longer produced. /// Common implementation for tests checking reject responses generated by the `StreamHandler` /// directly. -fn check_stream_handler_generated_reject_response_impl( +fn check_stream_handler_generated_reject_signal_impl( mut available_guaranteed_response_memory: i64, // This function will be fed with a replicated state that has one `LOCAL_CANISTER` installed. // It's purpose is to set the stage as required such that inducting the `loopback_stream` // induces the type of reject response that will be be compared against a reference given // by `expected_reject_code` and `expected_state_error`. 
setup_state: &dyn Fn(&mut ReplicatedState), - expected_reject_code: RejectCode, - expected_state_error: StateError, + reason: RejectReason, ) { with_local_test_setup( // A loopback stream with one request in it. @@ -1678,46 +1908,40 @@ fn check_stream_handler_generated_reject_response_impl( // Call the state setup function. setup_state(&mut state); - // Generate the expected reject response for the request in `loopback_stream`. - let request = request_in_stream(state.get_stream(&LOCAL_SUBNET), 0); - let reject_response = ic_types::messages::Response { - originator: request.sender, - respondent: request.receiver, - originator_reply_callback: request.sender_reply_callback, - refund: request.payment, - response_payload: Payload::Reject(RejectContext::new( - expected_reject_code, - expected_state_error, - )), - deadline: request.deadline, - }; + // A reject signal gets pushed into the loopback stream. + let mut expected_state = state.clone(); + let mut expected_stream = state.get_stream(&LOCAL_SUBNET).unwrap().clone(); + expected_stream.push_reject_signal(reason); + expected_state.with_streams(btreemap![LOCAL_SUBNET => expected_stream]); - // Induct the loopback stream and check the expected reject response is present. - let inducted_state = stream_handler - .induct_loopback_stream(state, &mut available_guaranteed_response_memory); - assert_eq!( - response_in_stream(inducted_state.get_stream(&LOCAL_SUBNET), 1), - &reject_response, + // Induct the loopback stream as a stream slice. 
+ let loopback_stream_slice: StreamSlice = + state.get_stream(&LOCAL_SUBNET).unwrap().clone().into(); + let inducted_state = stream_handler.induct_stream_slices( + state, + btreemap![LOCAL_SUBNET => loopback_stream_slice], + &mut available_guaranteed_response_memory, ); + + assert_eq!(expected_state, inducted_state); }, ); } #[test] -fn check_stream_handler_generated_reject_response_canister_not_found() { - check_stream_handler_generated_reject_response_impl( +fn check_stream_handler_generated_reject_signal_canister_not_found() { + check_stream_handler_generated_reject_signal_impl( i64::MAX / 2, // `available_guaranteed_response_memory` &|state| { state.canister_states.remove(&LOCAL_CANISTER).unwrap(); }, - RejectCode::DestinationInvalid, - StateError::CanisterNotFound(*LOCAL_CANISTER), + RejectReason::CanisterNotFound, ); } #[test] -fn check_stream_handler_generated_reject_response_canister_stopped() { - check_stream_handler_generated_reject_response_impl( +fn check_stream_handler_generated_reject_signal_canister_stopped() { + check_stream_handler_generated_reject_signal_impl( i64::MAX / 2, // `available_guaranteed_response_memory` &|state| { state @@ -1727,14 +1951,13 @@ fn check_stream_handler_generated_reject_response_canister_stopped() { .system_state .set_status(CanisterStatus::Stopped); }, - RejectCode::CanisterError, - StateError::CanisterStopped(*LOCAL_CANISTER), + RejectReason::CanisterStopped, ); } #[test] -fn check_stream_handler_generated_reject_response_canister_stopping() { - check_stream_handler_generated_reject_response_impl( +fn check_stream_handler_generated_reject_signal_canister_stopping() { + check_stream_handler_generated_reject_signal_impl( i64::MAX / 2, // `available_guaranteed_response_memory` &|state| { state @@ -1747,14 +1970,13 @@ fn check_stream_handler_generated_reject_response_canister_stopping() { stop_contexts: Default::default(), }); }, - RejectCode::CanisterError, - StateError::CanisterStopping(*LOCAL_CANISTER), + 
RejectReason::CanisterStopping, ); } #[test] -fn check_stream_handler_generated_reject_response_queue_full() { - check_stream_handler_generated_reject_response_impl( +fn check_stream_handler_generated_reject_signal_queue_full() { + check_stream_handler_generated_reject_signal_impl( i64::MAX / 2, // `available_guaranteed_response_memory` &|state| { let mut callback_id = 2; @@ -1766,27 +1988,22 @@ fn check_stream_handler_generated_reject_response_queue_full() { callback_id += 1; } }, - RejectCode::SysTransient, - StateError::QueueFull { capacity: 500 }, + RejectReason::QueueFull, ); } #[test] -fn check_stream_handler_generated_reject_response_out_of_memory() { - check_stream_handler_generated_reject_response_impl( +fn check_stream_handler_generated_reject_signal_out_of_memory() { + check_stream_handler_generated_reject_signal_impl( 0, // `available_guaranteed_response_memory` &|_| {}, - RejectCode::CanisterError, - StateError::OutOfMemory { - requested: (MAX_RESPONSE_COUNT_BYTES as u64).into(), - available: 0, - }, + RejectReason::OutOfMemory, ); } #[test] -fn check_stream_handler_generated_reject_response_canister_migrating() { - check_stream_handler_generated_reject_response_impl( +fn check_stream_handler_generated_reject_signal_canister_migrating() { + check_stream_handler_generated_reject_signal_impl( i64::MAX / 2, // `available_guaranteed_response_memory` &|state| { *state = simulate_canister_migration( @@ -1796,30 +2013,289 @@ fn check_stream_handler_generated_reject_response_canister_migrating() { CANISTER_MIGRATION_SUBNET, ); }, - RejectCode::SysTransient, - StateError::CanisterMigrating { - canister_id: *LOCAL_CANISTER, - host_subnet: CANISTER_MIGRATION_SUBNET, - }, + RejectReason::CanisterMigrating, ); } -/// Tests that inducting stream slices results in signals appended to `StreamHeaders`; -/// and messages included into canister `InputQueues` or reject `Responses` on output streams -/// as appropriate. 
-#[test] -fn induct_stream_slices_partial_success() { - with_test_setup( - // An outgoing stream with one request and one response in it. - btreemap![REMOTE_SUBNET => StreamConfig { - begin: 31, - messages: vec![ - Request(*LOCAL_CANISTER, *REMOTE_CANISTER), - Response(*LOCAL_CANISTER, *REMOTE_CANISTER), - ], - signals_end: 43, - ..StreamConfig::default() - }], +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Common implementation for tests checking reject responses generated by the `StreamHandler` +/// directly. +fn legacy_check_stream_handler_generated_reject_response_impl( + mut available_guaranteed_response_memory: i64, + // This function will be fed with a replicated state that has one `LOCAL_CANISTER` installed. + // Its purpose is to set the stage as required such that inducting the `loopback_stream` + // induces the type of reject response that will be compared against a reference generated + // from `expected_reject_reason`. + setup_state: &dyn Fn(&mut ReplicatedState), + expected_reject_reason: RejectReason, +) { + with_legacy_local_test_setup( + // A loopback stream with one request in it. + btreemap![LOCAL_SUBNET => StreamConfig { + messages: vec![Request(*LOCAL_CANISTER, *LOCAL_CANISTER)], + ..StreamConfig::default() + }], + |stream_handler, mut state, _| { + // Call the state setup function. + setup_state(&mut state); + + // Generate the expected reject response for the request in `loopback_stream`. + let request = request_in_stream(state.get_stream(&LOCAL_SUBNET), 0); + let reject_response = generate_reject_response_for(expected_reject_reason, request); + + // Induct the loopback stream and check the expected reject response is present. 
+ let inducted_state = stream_handler + .induct_loopback_stream(state, &mut available_guaranteed_response_memory); + assert_eq!( + message_in_stream(inducted_state.get_stream(&LOCAL_SUBNET), 1), + &reject_response, + ); + }, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +#[test] +fn legacy_check_stream_handler_generated_reject_response_canister_not_found() { + legacy_check_stream_handler_generated_reject_response_impl( + i64::MAX / 2, // `available_guaranteed_response_memory` + &|state| { + state.canister_states.remove(&LOCAL_CANISTER).unwrap(); + }, + RejectReason::CanisterNotFound, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +#[test] +fn legacy_check_stream_handler_generated_reject_response_canister_stopped() { + legacy_check_stream_handler_generated_reject_response_impl( + i64::MAX / 2, // `available_guaranteed_response_memory` + &|state| { + state + .canister_states + .get_mut(&LOCAL_CANISTER) + .unwrap() + .system_state = SystemState::new_stopped_for_testing( + *LOCAL_CANISTER, + PrincipalId::default(), + Cycles::new(u128::MAX / 2), + NumSeconds::from(0), + ); + }, + RejectReason::CanisterStopped, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +#[test] +fn legacy_check_stream_handler_generated_reject_response_canister_stopping() { + legacy_check_stream_handler_generated_reject_response_impl( + i64::MAX / 2, // `available_guaranteed_response_memory` + &|state| { + state + .canister_states + .get_mut(&LOCAL_CANISTER) + .unwrap() + .system_state = SystemState::new_stopping_for_testing( + *LOCAL_CANISTER, + PrincipalId::default(), + Cycles::new(u128::MAX / 2), + NumSeconds::from(0), + ); + }, + RejectReason::CanisterStopping, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. 
+#[test] +fn legacy_check_stream_handler_generated_reject_response_queue_full() { + legacy_check_stream_handler_generated_reject_response_impl( + i64::MAX / 2, // `available_guaranteed_response_memory` + &|state| { + let mut callback_id = 2; + while let Ok(()) = state.push_input( + Request(*LOCAL_CANISTER, *LOCAL_CANISTER) + .build_with(CallbackId::new(callback_id), 0), + &mut (i64::MAX / 2), + ) { + callback_id += 1; + } + }, + RejectReason::QueueFull, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +#[test] +fn lgeacy_check_stream_handler_generated_reject_response_out_of_memory() { + legacy_check_stream_handler_generated_reject_response_impl( + 0, // `available_guaranteed_response_memory` + &|_| {}, + RejectReason::OutOfMemory, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +#[test] +fn legacy_check_stream_handler_generated_reject_response_canister_migrating() { + legacy_check_stream_handler_generated_reject_response_impl( + i64::MAX / 2, // `available_guaranteed_response_memory` + &|state| { + *state = simulate_canister_migration( + state.clone(), + *LOCAL_CANISTER, + LOCAL_SUBNET, + CANISTER_MIGRATION_SUBNET, + ); + }, + RejectReason::CanisterMigrating, + ); +} + +/// Tests that inducting stream slices results in signals appended to `StreamHeaders`; +/// and messages included into canister `InputQueues` or reject `Responses` on output streams +/// as appropriate. +#[test] +fn induct_stream_slices_partial_success() { + with_test_setup( + // An outgoing stream with one request and one response in it. + btreemap![REMOTE_SUBNET => StreamConfig { + begin: 31, + messages: vec![ + Request(*LOCAL_CANISTER, *REMOTE_CANISTER), + Response(*LOCAL_CANISTER, *REMOTE_CANISTER), + ], + signals_end: 43, + ..StreamConfig::default() + }], + // An incoming stream slice with... 
+ btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages_begin: 43, + messages: vec![ + // ...two incoming requests @43 and @44... + Request(*REMOTE_CANISTER, *LOCAL_CANISTER), + Request(*REMOTE_CANISTER, *LOCAL_CANISTER), + // ...an incoming response @45... + Response(*REMOTE_CANISTER, *LOCAL_CANISTER), + // ...a request to a missing canister @46 (on this subnet according to the + // routing table); this is expected to trigger a reject signal... + Request(*REMOTE_CANISTER, *OTHER_LOCAL_CANISTER), + // ...a request not from `REMOTE_SUBNET` @47... + Request(*LOCAL_CANISTER, *LOCAL_CANISTER), + // ...a request from a missing canister @48 (not anywhere according to the routing + // table); this is expected to be accepted but dropped... + Request(*UNKNOWN_CANISTER, *LOCAL_CANISTER), + // ...and a response to a missing canister @49 (on this subnet according to the + // routing table); this is expected to be accepted but dropped. + Response(*REMOTE_CANISTER, *OTHER_LOCAL_CANISTER), + ], + // ...and two accept signals. + signals_end: 33, + ..StreamSliceConfig::default() + }], + |stream_handler, state, slices, metrics| { + let mut expected_state = state.clone(); + // The expected state has the first 3 messages inducted. + push_inputs( + &mut expected_state, + messages_in_slice(slices.get(&REMOTE_SUBNET), 43..=45), + ); + let response_count_bytes = + response_in_slice(slices.get(&REMOTE_SUBNET), 45).count_bytes(); + + // The expected stream has... + let expected_stream = stream_from_config(StreamConfig { + begin: 31, + // ...the two initial messages only, as `induct_stream_slices` does not gc + // (the request to a missing canister @46 gets a reject signal, not a reject response)... + messages: vec![ + message_in_stream(state.get_stream(&REMOTE_SUBNET), 31).clone(), + message_in_stream(state.get_stream(&REMOTE_SUBNET), 32).clone(), + ], + // ...6 accept signals for the messages in the stream slice...
+ signals_end: 50, + reject_signals: vec![ + // ...and a reject signal for the request @46 due to a missing canister. + RejectSignal::new(RejectReason::CanisterNotFound, 46.into()), + ], + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + + let initial_available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let mut available_guaranteed_response_memory = + initial_available_guaranteed_response_memory; + + // Act + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + // 2 requests and one response inducted (consuming 2 - 1 reservations). + assert_eq!( + initial_available_guaranteed_response_memory + - MAX_RESPONSE_COUNT_BYTES as i64 + - response_count_bytes as i64, + available_guaranteed_response_memory + ); + // Not equal, because the computed available memory does not account for the + // reject response (since it's from a nonexistent canister). + assert!( + stream_handler.available_guaranteed_response_memory(&inducted_state) + >= available_guaranteed_response_memory + ); + + metrics.assert_inducted_xnet_messages_eq(&[ + // Requests @43 and @44 successfully inducted. + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_SUCCESS, 2), + // Request @46 not inducted because of missing canister. + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_CANISTER_NOT_FOUND, 1), + // Request @47 not inducted because of canister not on `REMOTE_SUBNET`. + // Request @48 not inducted because of unknown canister sender. + ( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_SENDER_SUBNET_MISMATCH, + 2, + ), + // Response @45 successfully inducted. + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_SUCCESS, 1), + // Response @49 not inducted because of missing canister.
+ (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_CANISTER_NOT_FOUND, 1), + ]); + assert_eq!(3, metrics.fetch_inducted_payload_sizes_stats().count); + // Three critical errors raised. + metrics.assert_eq_critical_errors(CriticalErrorCounts { + induct_response_failed: 1, + sender_subnet_mismatch: 2, + ..CriticalErrorCounts::default() + }); + }, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Tests that inducting stream slices results in signals appended to `StreamHeaders`; +/// and messages included into canister `InputQueues` or reject `Responses` on output streams +/// as appropriate. +#[test] +fn legacy_induct_stream_slices_partial_success() { + with_legacy_test_setup( + // An outgoing stream with one request and one response in it. + btreemap![REMOTE_SUBNET => StreamConfig { + begin: 31, + messages: vec![ + Request(*LOCAL_CANISTER, *REMOTE_CANISTER), + Response(*LOCAL_CANISTER, *REMOTE_CANISTER), + ], + signals_end: 43, + ..StreamConfig::default() + }], // An incoming stream slice with... btreemap![REMOTE_SUBNET => StreamSliceConfig { messages_begin: 43, @@ -1855,10 +2331,9 @@ fn induct_stream_slices_partial_success() { let response_count_bytes = response_in_slice(slices.get(&REMOTE_SUBNET), 45).count_bytes(); - let reject_response = generate_reject_response( + let reject_response = generate_reject_response_for( + RejectReason::CanisterNotFound, request_in_slice(slices.get(&REMOTE_SUBNET), 46), - RejectCode::DestinationInvalid, - StateError::CanisterNotFound(*OTHER_LOCAL_CANISTER).to_string(), ); let reject_response_count_bytes = reject_response.count_bytes(); @@ -2069,13 +2544,98 @@ fn induct_stream_slices_with_messages_to_migrating_canister() { LOCAL_SUBNET, ); - let reject_response = generate_reject_response( + let mut expected_state = state.clone(); + // Expecting a stream with... 
+ let outgoing_stream = state.get_stream(&REMOTE_SUBNET); + let expected_stream = stream_from_config(StreamConfig { + begin: 21, + messages: vec![ + // ...the initial messages still in it... + message_in_stream(outgoing_stream, 21).clone(), + message_in_stream(outgoing_stream, 22).clone(), + ], + // ... a `signals_end` advanced by 2... + signals_end: 45, + // ...and reject signals for the request @43 and the response @44. + reject_signals: vec![ + RejectSignal::new(RejectReason::CanisterMigrating, 43.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 44.into()), + ], + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + + // Act. + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + assert_eq!( + stream_handler.available_guaranteed_response_memory(&inducted_state), + available_guaranteed_response_memory + ); + + metrics.assert_inducted_xnet_messages_eq(&[ + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_CANISTER_MIGRATED, 1), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_CANISTER_MIGRATED, 1), + ]); + assert_eq!(0, metrics.fetch_inducted_payload_sizes_stats().count); + // No critical errors raised. + metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Tests that inducting stream slices containing messages to a canister that is +/// known to be in the process of migration but has not yet been migrated to +/// this subnet results in reject signals for responses and reject `Responses` +/// for requests on output streams. 
+#[test] +fn legacy_induct_stream_slices_with_messages_to_migrating_canister() { + with_legacy_test_setup( + // An outgoing stream with one request and one response in it. + btreemap![REMOTE_SUBNET => StreamConfig { + begin: 21, + messages: vec![ + Request(*LOCAL_CANISTER, *REMOTE_CANISTER), + Response(*LOCAL_CANISTER, *REMOTE_CANISTER), + ], + signals_end: 43, + ..StreamConfig::default() + }], + // An incoming stream slice with a request addressed to `REMOTE_CANISTER` @43 and a response + // with the same recipients @44. + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages_begin: 43, + messages: vec![ + Request(*OTHER_REMOTE_CANISTER, *REMOTE_CANISTER), + Response(*OTHER_REMOTE_CANISTER, *REMOTE_CANISTER), + ], + signals_end: 21, + ..StreamSliceConfig::default() + }], + |stream_handler, state, slices, metrics| { + // `REMOTE_CANISTER` is hosted by `CANISTER_MIGRATION_SUBNET` but in the process + // of being migrated to `LOCAL_SUBNET`. + let state = + complete_canister_migration(state, *REMOTE_CANISTER, CANISTER_MIGRATION_SUBNET); + let state = prepare_canister_migration( + state, + *REMOTE_CANISTER, + CANISTER_MIGRATION_SUBNET, + LOCAL_SUBNET, + ); + + let reject_response = generate_reject_response_for( + RejectReason::CanisterMigrating, request_in_slice(slices.get(&REMOTE_SUBNET), 43), - RejectCode::SysTransient, - format!( - "Canister {} is being migrated to/from {}", - *REMOTE_CANISTER, CANISTER_MIGRATION_SUBNET - ), ); let reject_response_count_bytes = reject_response.count_bytes(); @@ -2173,13 +2733,93 @@ fn induct_stream_slices_with_messages_to_migrated_canister() { CANISTER_MIGRATION_SUBNET, ); - let reject_response = generate_reject_response( + let mut expected_state = state.clone(); + // Expecting a stream with... + let outgoing_stream = state.get_stream(&REMOTE_SUBNET); + let expected_stream = stream_from_config(StreamConfig { + begin: 21, + messages: vec![ + // ...the initial messages still in it...
+ message_in_stream(outgoing_stream, 21).clone(), + message_in_stream(outgoing_stream, 22).clone(), + ], + // ... a `signals_end` advanced by 2... + signals_end: 45, + // ...and reject signals for the request @43 and the response @44. + reject_signals: vec![ + RejectSignal::new(RejectReason::CanisterMigrating, 43.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 44.into()), + ], + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + + // Act + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + assert_eq!( + stream_handler.available_guaranteed_response_memory(&inducted_state), + available_guaranteed_response_memory + ); + + metrics.assert_inducted_xnet_messages_eq(&[ + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_CANISTER_MIGRATED, 1), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_CANISTER_MIGRATED, 1), + ]); + assert_eq!(0, metrics.fetch_inducted_payload_sizes_stats().count); + // No critical errors raised. + metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Tests that inducting stream slices containing messages to a migrated +/// canister results in reject signals for responses and reject `Responses` for +/// requests on output streams. +#[test] +fn legacy_induct_stream_slices_with_messages_to_migrated_canister() { + with_legacy_test_setup( + // An outgoing stream with one request and one response in it. 
+ btreemap![REMOTE_SUBNET => StreamConfig { + begin: 21, + messages: vec![ + Request(*LOCAL_CANISTER, *REMOTE_CANISTER), + Response(*LOCAL_CANISTER, *REMOTE_CANISTER), + ], + signals_end: 43, + ..StreamConfig::default() + }], + // An incoming stream slice with a request @43 and a response @44. + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages_begin: 43, + messages: vec![ + Request(*REMOTE_CANISTER, *LOCAL_CANISTER), + Response(*REMOTE_CANISTER, *LOCAL_CANISTER), + ], + signals_end: 21, + ..StreamSliceConfig::default() + }], + |stream_handler, state, slices, metrics| { + // `LOCAL_CANISTER` was hosted by the `LOCAL_SUBNET` but then migrated. + let state = simulate_canister_migration( + state, + *LOCAL_CANISTER, + LOCAL_SUBNET, + CANISTER_MIGRATION_SUBNET, + ); + + let reject_response = generate_reject_response_for( + RejectReason::CanisterMigrating, request_in_slice(slices.get(&REMOTE_SUBNET), 43), - RejectCode::SysTransient, - format!( - "Canister {} is being migrated to/from {}", - *LOCAL_CANISTER, CANISTER_MIGRATION_SUBNET - ), ); let reject_response_count_bytes = reject_response.count_bytes(); @@ -2283,21 +2923,104 @@ fn induct_stream_slices_with_messages_from_migrating_canister() { &mut expected_state, messages_in_slice(slices.get(&CANISTER_MIGRATION_SUBNET), 43..=44), ); - // ...and a stream with... - let migration_stream = state.get_stream(&CANISTER_MIGRATION_SUBNET); + // ...and a stream with... + let migration_stream = state.get_stream(&CANISTER_MIGRATION_SUBNET); + let expected_stream = stream_from_config(StreamConfig { + begin: 21, + messages: vec![ + // ...the initial messages still in it... + message_in_stream(migration_stream, 21).clone(), + message_in_stream(migration_stream, 22).clone(), + ], + // ...and a `signals_end` incremented by 2. 
+ signals_end: 45, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![CANISTER_MIGRATION_SUBNET => expected_stream]); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + assert_eq!( + stream_handler.available_guaranteed_response_memory(&inducted_state), + available_guaranteed_response_memory + ); + + metrics.assert_inducted_xnet_messages_eq(&[ + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_SUCCESS, 1), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_SUCCESS, 1), + ]); + assert_eq!(2, metrics.fetch_inducted_payload_sizes_stats().count); + // No critical errors raised. + metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +/// Common implementation for memory limit tests setup such the subnet has +/// only enough message memory for one reservation (plus epsilon). +/// +/// Ensures that the limits are enforced when inducting stream slices. +/// +/// Tries to induct a slice consisting of `[request1, response, request2]`: +/// * `request1` will fail to be inducted due to lack of memory; +/// * `response` will be inducted and consume the existing reservation; +/// * `request2` will be inducted successfully, as there is now available +/// guaranteed response memory for one request. +fn induct_stream_slices_with_memory_limit_impl(subnet_type: SubnetType) { + with_test_setup_and_config( + // A config with only enough subnet message memory for one request + epsilon. + HypervisorConfig { + subnet_message_memory_capacity: NumBytes::new( + MAX_RESPONSE_COUNT_BYTES as u64 * 15 / 10, + ), + ..Default::default() + }, + subnet_type, + CURRENT_CERTIFICATION_VERSION, + // An empty outgoing stream. 
+ btreemap![REMOTE_SUBNET => StreamConfig { + begin: 31, + signals_end: 43, + ..StreamConfig::default() + }], + // An incoming stream slice with [request1 @43, response @44, request2 @45] in it. + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages_begin: 43, + messages: vec![ + Request(*REMOTE_CANISTER, *LOCAL_CANISTER), + Response(*REMOTE_CANISTER, *LOCAL_CANISTER), + Request(*REMOTE_CANISTER, *LOCAL_CANISTER), + ], + signals_end: 31, + ..StreamSliceConfig::default() + }], + |stream_handler, state, slices, metrics| { + let mut expected_state = state.clone(); + // The expected state must have `response` and `request2` inducted. + push_inputs( + &mut expected_state, + messages_in_slice(slices.get(&REMOTE_SUBNET), 44..=45), + ); + // The expected stream is empty with advanced... let expected_stream = stream_from_config(StreamConfig { - begin: 21, - messages: vec![ - // ...the initial messages still in it... - message_in_stream(migration_stream, 21).clone(), - message_in_stream(migration_stream, 22).clone(), + begin: 31, + signals_end: 46, + reject_signals: vec![ + // ...and a reject signal for request1 @43 was appended. + RejectSignal::new(RejectReason::OutOfMemory, 43.into()), ], - // ...and a `signals_end` incremented by 2. 
- signals_end: 45, ..StreamConfig::default() }); - expected_state.with_streams(btreemap![CANISTER_MIGRATION_SUBNET => expected_stream]); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + // Act let mut available_guaranteed_response_memory = stream_handler.available_guaranteed_response_memory(&state); let inducted_state = stream_handler.induct_stream_slices( @@ -2306,14 +3029,15 @@ fn induct_stream_slices_with_messages_from_migrating_canister() { &mut available_guaranteed_response_memory, ); + // Assert assert_eq!(expected_state, inducted_state); assert_eq!( stream_handler.available_guaranteed_response_memory(&inducted_state), available_guaranteed_response_memory ); - metrics.assert_inducted_xnet_messages_eq(&[ (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_SUCCESS, 1), + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_OUT_OF_MEMORY, 1), (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_SUCCESS, 1), ]); assert_eq!(2, metrics.fetch_inducted_payload_sizes_stats().count); @@ -2323,6 +3047,21 @@ fn induct_stream_slices_with_messages_from_migrating_canister() { ); } +/// Tests that subnet message memory limit is enforced by +/// `StreamHandlerImpl::induct_stream_slices()`. +#[test] +fn induct_stream_slices_with_subnet_message_memory_limit() { + induct_stream_slices_with_memory_limit_impl(SubnetType::Application); +} + +/// Tests that subnet message memory limit is enforced by +/// `StreamHandlerImpl::induct_stream_slices()` on system subnets. +#[test] +fn system_subnet_induct_stream_slices_with_subnet_message_memory_limit() { + induct_stream_slices_with_memory_limit_impl(SubnetType::System); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. /// Common implementation for memory limit tests setup such the subnet has /// only enough message memory for one reservation (plus epsilon). 
/// @@ -2333,7 +3072,7 @@ fn induct_stream_slices_with_messages_from_migrating_canister() { /// * `response` will be inducted and consume the existing reservation; /// * `request2` will be inducted successfully, as there is now available /// guaranteed response memory for one request. -fn induct_stream_slices_with_memory_limit_impl(subnet_type: SubnetType) { +fn legacy_induct_stream_slices_with_memory_limit_impl(subnet_type: SubnetType) { with_test_setup_and_config( // A config with only enough subnet message memory for one request + epsilon. HypervisorConfig { @@ -2343,6 +3082,7 @@ fn induct_stream_slices_with_memory_limit_impl(subnet_type: SubnetType) { ..Default::default() }, subnet_type, + CertificationVersion::V18, // An empty outgoing stream. btreemap![REMOTE_SUBNET => StreamConfig { begin: 31, @@ -2372,14 +3112,9 @@ fn induct_stream_slices_with_memory_limit_impl(subnet_type: SubnetType) { begin: 31, messages: vec![ // ...a reject response for request1 @43... - generate_reject_response( + generate_reject_response_for( + RejectReason::OutOfMemory, request_in_slice(slices.get(&REMOTE_SUBNET), 43), - RejectCode::CanisterError, - StateError::OutOfMemory { - requested: NumBytes::new(MAX_RESPONSE_COUNT_BYTES as u64), - available: MAX_RESPONSE_COUNT_BYTES as i64 / 2, - } - .to_string(), ), ], // ...and the `signal_end` incremented by 3. @@ -2415,18 +3150,20 @@ fn induct_stream_slices_with_memory_limit_impl(subnet_type: SubnetType) { ); } +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. /// Tests that subnet message memory limit is enforced by /// `StreamHandlerImpl::induct_stream_slices()`. 
#[test] -fn induct_stream_slices_with_subnet_message_memory_limit() { - induct_stream_slices_with_memory_limit_impl(SubnetType::Application); +fn legacy_induct_stream_slices_with_subnet_message_memory_limit() { + legacy_induct_stream_slices_with_memory_limit_impl(SubnetType::Application); } +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. /// Tests that subnet message memory limit is enforced by /// `StreamHandlerImpl::induct_stream_slices()` on system subnets. #[test] -fn system_subnet_induct_stream_slices_with_subnet_message_memory_limit() { - induct_stream_slices_with_memory_limit_impl(SubnetType::System); +fn legacy_system_subnet_induct_stream_slices_with_subnet_message_memory_limit() { + legacy_induct_stream_slices_with_memory_limit_impl(SubnetType::System); } /// Tests that messages in the loopback stream and incoming slices are inducted @@ -2688,6 +3425,214 @@ fn process_stream_slices_canister_migration_in_both_subnets_success() { CANISTER_MIGRATION_SUBNET, ); + let mut expected_state = state.clone(); + // The expected state has the first 5 loopback messages @21..=25 inducted... + push_inputs( + &mut expected_state, + messages_in_stream(state.get_stream(&LOCAL_SUBNET), 21..=25), + ); + // ...and a reject response for the request @26 and the first incoming message @153. + push_inputs( + &mut expected_state, + [ + &generate_reject_response_for( + RejectReason::CanisterMigrating, + request_in_stream(state.get_stream(&LOCAL_SUBNET), 26), + ), + message_in_slice(slices.get(&REMOTE_SUBNET), 153), + ], + ); + + // The expected loopback stream has all initial messages gc'ed. + let expected_loopback_stream = stream_from_config(StreamConfig { + begin: 28, + signals_end: 28, + ..StreamConfig::default() + }); + + // The expected outgoing stream is pruned and has reject signals for the messages + // @154..=157. 
+ let pruned_outgoing_stream = stream_from_config(StreamConfig { + begin: 34, + messages: vec![ + // ...one message @34 not gc'ed... + message_in_stream(state.get_stream(&REMOTE_SUBNET), 34).clone(), + ], + signals_end: 158, + reject_signals: vec![ + RejectSignal::new(RejectReason::CanisterMigrating, 142.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 145.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 154.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 155.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 156.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 157.into()), + ], + ..StreamConfig::default() + }); + + // The expected stream to `CANISTER_MIGRATION_SUBNET` has... + let rerouted_stream = stream_from_config(StreamConfig { + messages: vec![ + // ...the response @27 rerouted... + message_in_stream(state.get_stream(&LOCAL_SUBNET), 27).clone(), + // ...and the response @33 rerouted. + message_in_stream(state.get_stream(&REMOTE_SUBNET), 33).clone(), + ], + ..StreamConfig::default() + }); + + expected_state.with_streams(btreemap![ + LOCAL_SUBNET => expected_loopback_stream, + REMOTE_SUBNET => pruned_outgoing_stream, + CANISTER_MIGRATION_SUBNET => rerouted_stream, + ]); + + // Act + let inducted_state = stream_handler.process_stream_slices(state, slices); + + assert_eq!(expected_state, inducted_state); + + // 2 incoming messages discarded and 3 loopback +1 incoming inducted. + metrics.assert_inducted_xnet_messages_eq(&[ + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_CANISTER_MIGRATED, 3), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_CANISTER_MIGRATED, 3), + (LABEL_VALUE_TYPE_REQUEST, LABEL_VALUE_SUCCESS, 5), + (LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_SUCCESS, 2), + ]); + // 7 messages inducted, compare above. + assert_eq!(7, metrics.fetch_inducted_payload_sizes_stats().count); + // 7 messages GC-ed from loopback stream, 3 from outgoing stream. 
+ assert_eq!( + Some(10), + metrics.fetch_int_counter(METRIC_GCED_XNET_MESSAGES), + ); + // 3 reject signals from outgoing stream (138, 139, 142) were gc-ed; + // and 1 reject signal for the rejected request in the loopback stream. + assert_eq!( + Some(4), + metrics.fetch_int_counter(METRIC_GCED_XNET_REJECT_SIGNALS), + ); + assert_eq!( + metric_vec(&[(&[(LABEL_REMOTE, &REMOTE_SUBNET.to_string())], 0)]), + metrics.fetch_int_gauge_vec(METRIC_XNET_MESSAGE_BACKLOG), + ); + // Check the number of GC-ed messages in the stream for the remote subnet. + assert_eq!( + metric_vec(&[(&[(&LABEL_REMOTE, &REMOTE_SUBNET.to_string().as_str())], 3)]), + metrics.fetch_histogram_vec_count(METRIC_TIME_IN_STREAM), + ); + // Check the number of inducted messages in the slice from the remote subnet. + assert_eq!( + metric_vec(&[(&[(&LABEL_REMOTE, &REMOTE_SUBNET.to_string().as_str())], 5)]), + metrics.fetch_histogram_vec_count(METRIC_TIME_IN_BACKLOG), + ); + }, + ); +} + +// TODO: Remove legacy tests once certification versions < V19 can be phased out safely. +/// Tests that when canister migration happens in both sending and receiving subnets, +/// messages in the loopback stream and incoming slices are inducted +/// (with signals added appropriately); and messages present in the initial +/// state are garbage collected or rerouted as appropriate. +#[test] +fn legacy_process_stream_slices_canister_migration_in_both_subnets_success() { + with_legacy_test_setup( + btreemap![ + // A loopback stream with... + LOCAL_SUBNET => StreamConfig { + begin: 21, + messages: vec![ + // ...3 messages to and from `LOCAL_CANISTER` @21..=23... + Request(*LOCAL_CANISTER, *LOCAL_CANISTER), + Request(*LOCAL_CANISTER, *LOCAL_CANISTER), + Request(*LOCAL_CANISTER, *LOCAL_CANISTER), + // ...a request @24 and a response @25 to `LOCAL_CANISTER` from `OTHER_LOCAL_CANISTER`... 
+ Request(*OTHER_LOCAL_CANISTER, *LOCAL_CANISTER), + Response(*OTHER_LOCAL_CANISTER, *LOCAL_CANISTER), + // ...a request @26 from `LOCAL_CANISTER` to `OTHER_LOCAL_CANISTER (a reject response + // should be generated for it)... + Request(*LOCAL_CANISTER, *OTHER_LOCAL_CANISTER), + // ...and a response @27 from `LOCAL_CANISTER` to `OTHER_LOCAL_CANISTER` (a reject + // signal will be generated during induction; the response will be rerouted; and then + // the reject signal is gc'ed; i.e. the signal is never visible). + Response(*LOCAL_CANISTER, *OTHER_LOCAL_CANISTER), + ], + signals_end: 21, + ..StreamConfig::default() + }, + // An outgoing stream with... + REMOTE_SUBNET => StreamConfig { + begin: 31, + messages: vec![ + // ...4 messages from `LOCAL_CANISTER` to `REMOTE_CANISTER`... @31..=34, with a + // response @33... + Request(*LOCAL_CANISTER, *REMOTE_CANISTER), + Request(*LOCAL_CANISTER, *REMOTE_CANISTER), + Response(*LOCAL_CANISTER, *REMOTE_CANISTER), + Request(*LOCAL_CANISTER, *REMOTE_CANISTER), + ], + signals_end: 153, + // ...and 4 reject signals. + reject_signals: vec![ + RejectSignal::new(RejectReason::CanisterMigrating, 138.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 139.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 142.into()), + RejectSignal::new(RejectReason::CanisterMigrating, 145.into()), + ], + ..StreamConfig::default() + } + ], + // An incoming stream slice with... + btreemap![REMOTE_SUBNET => StreamSliceConfig { + header_begin: Some(142), + messages_begin: 153, + messages: vec![ + // ...a request @153 from `REMOTE_CANISTER` to `LOCAL_CANISTER`... + Request(*REMOTE_CANISTER, *LOCAL_CANISTER), + // ...one request @154 to the migrated canister... + Request(*OTHER_REMOTE_CANISTER, *OTHER_LOCAL_CANISTER), + // ...one response @155 to the migrated canister... + Response(*OTHER_REMOTE_CANISTER, *OTHER_LOCAL_CANISTER), + // ...one request @156 between the two migrated canisters... 
+ Request(*REMOTE_CANISTER, *OTHER_LOCAL_CANISTER), + // ...one response @157 between the two migrated canisters... + Response(*REMOTE_CANISTER, *OTHER_LOCAL_CANISTER), + ], + signals_end: 34, + // ..and a reject signal for the response @33. + reject_signals: vec![RejectSignal::new( + RejectReason::CanisterMigrating, + 33.into(), + )], + ..StreamSliceConfig::default() + }], + |stream_handler, state, slices, metrics| { + stream_handler + .time_in_stream_metrics + .lock() + .unwrap() + .record_header( + REMOTE_SUBNET, + &state.get_stream(&REMOTE_SUBNET).unwrap().header(), + ); + + // `OTHER_LOCAL_CANISTER` is marked as migrating from `LOCAL_SUBNET` to `CANISTER_MIGRATION_SUBNET`. + let state = simulate_canister_migration( + state, + *OTHER_LOCAL_CANISTER, + LOCAL_SUBNET, + CANISTER_MIGRATION_SUBNET, + ); + + // `REMOTE_CANISTER` is marked as migrating from `REMOTE_SUBNET` to `CANISTER_MIGRATION_SUBNET`. + let state = simulate_canister_migration( + state, + *REMOTE_CANISTER, + REMOTE_SUBNET, + CANISTER_MIGRATION_SUBNET, + ); + let mut expected_state = state.clone(); // The expected state has the first 5 loopback messages @21..=25 inducted... push_inputs( @@ -2705,13 +3650,9 @@ fn process_stream_slices_canister_migration_in_both_subnets_success() { begin: 28, messages: vec![ // ...and a reject response for the request @26. - generate_reject_response( + generate_reject_response_for( + RejectReason::CanisterMigrating, request_in_stream(state.get_stream(&LOCAL_SUBNET), 26), - RejectCode::SysTransient, - format!( - "Canister {} is being migrated to/from {}", - *OTHER_LOCAL_CANISTER, CANISTER_MIGRATION_SUBNET - ), ), ], signals_end: 28, @@ -2725,22 +3666,14 @@ fn process_stream_slices_canister_migration_in_both_subnets_success() { // ...one message @34 not gc'ed... message_in_stream(state.get_stream(&REMOTE_SUBNET), 34).clone(), // ...a reject response for the request @154... 
- generate_reject_response( + generate_reject_response_for( + RejectReason::CanisterMigrating, request_in_slice(slices.get(&REMOTE_SUBNET), 154), - RejectCode::SysTransient, - format!( - "Canister {} is being migrated to/from {}", - *OTHER_LOCAL_CANISTER, CANISTER_MIGRATION_SUBNET - ), ), // ...a reject response for the request @156... - generate_reject_response( + generate_reject_response_for( + RejectReason::CanisterMigrating, request_in_slice(slices.get(&REMOTE_SUBNET), 156), - RejectCode::SysTransient, - format!( - "Canister {} is being migrated to/from {}", - *OTHER_LOCAL_CANISTER, CANISTER_MIGRATION_SUBNET - ), ), ], signals_end: 158, @@ -2861,6 +3794,28 @@ fn with_test_setup( with_test_setup_and_config( HypervisorConfig::default(), SubnetType::Application, + CURRENT_CERTIFICATION_VERSION, + stream_configs, + slice_configs, + test_impl, + ) +} + +/// Generates a legacy test setup. For details see `with_test_setup_and_config()`. +fn with_legacy_test_setup( + stream_configs: BTreeMap>>, + slice_configs: BTreeMap>>, + test_impl: impl FnOnce( + StreamHandlerImpl, + ReplicatedState, + BTreeMap, + MetricsFixture, + ), +) { + with_test_setup_and_config( + HypervisorConfig::default(), + SubnetType::Application, + CertificationVersion::V18, stream_configs, slice_configs, test_impl, @@ -2876,6 +3831,7 @@ fn with_test_setup( fn with_test_setup_and_config( hypervisor_config: HypervisorConfig, subnet_type: SubnetType, + certification_version: CertificationVersion, stream_configs: BTreeMap>>, slice_configs: BTreeMap>>, test_impl: impl FnOnce( @@ -2888,8 +3844,7 @@ fn with_test_setup_and_config( with_test_replica_logger(|log| { // Generate an empty `ReplicatedState` for `LOCAL_SUBNET`. 
let mut state = ReplicatedState::new(LOCAL_SUBNET, subnet_type); - state.metadata.certification_version = - ic_certification_version::CURRENT_CERTIFICATION_VERSION; + state.metadata.certification_version = certification_version; let metrics_registry = MetricsRegistry::new(); let stream_handler = StreamHandlerImpl::new( LOCAL_SUBNET, @@ -3045,17 +4000,32 @@ fn with_local_test_setup( ); } +/// Generates a local test setup, i.e. without incoming stream slices. +/// For details see `with_test_setup_and_config()`. +fn with_legacy_local_test_setup( + stream_configs: BTreeMap>>, + test_impl: impl FnOnce(StreamHandlerImpl, ReplicatedState, MetricsFixture), +) { + with_legacy_test_setup( + stream_configs, + btreemap![], + |stream_handler, state, _, metrics| test_impl(stream_handler, state, metrics), + ); +} + /// Generates a local test setup, i.e. without incoming stream slices. /// For details see `with_test_setup_and_config()`. fn with_local_test_setup_and_config( hypervisor_config: HypervisorConfig, subnet_type: SubnetType, + certification_version: CertificationVersion, stream_configs: BTreeMap>>, test_impl: impl FnOnce(StreamHandlerImpl, ReplicatedState, MetricsFixture), ) { with_test_setup_and_config( hypervisor_config, subnet_type, + certification_version, stream_configs, btreemap![], |stream_handler, state, _, metrics| test_impl(stream_handler, state, metrics), @@ -3260,7 +4230,10 @@ fn messages_in_slice( } /// Pushes the messages yielded by `iter` into the `state`. 
-fn push_inputs<'a>(state: &mut ReplicatedState, iter: impl Iterator) { +fn push_inputs<'a>( + state: &mut ReplicatedState, + iter: impl IntoIterator, +) { for msg in iter { state.push_input(msg.clone(), &mut (i64::MAX / 2)).unwrap(); } diff --git a/rs/messaging/tests/memory_tests.rs b/rs/messaging/tests/memory_tests.rs index 856f004080c..202a7392032 100644 --- a/rs/messaging/tests/memory_tests.rs +++ b/rs/messaging/tests/memory_tests.rs @@ -8,14 +8,14 @@ use ic_config::{ use ic_registry_routing_table::{routing_table_insert_subnet, RoutingTable}; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::ReplicatedState; -use ic_state_machine_tests::{StateMachine, StateMachineBuilder, StateMachineConfig}; +use ic_state_machine_tests::{ + MessageId, StateMachine, StateMachineBuilder, StateMachineConfig, UserError, +}; use ic_test_utilities_types::ids::{SUBNET_0, SUBNET_1}; use ic_types::{messages::MAX_INTER_CANISTER_PAYLOAD_IN_BYTES_U64, Cycles}; use proptest::prelude::*; -use random_traffic_test::{ - extract_metrics, Config as CanisterConfig, Metrics as CanisterMetrics, Record, -}; -use std::collections::BTreeMap; +use random_traffic_test::{extract_metrics, Config as CanisterConfig, Record as CanisterRecord}; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; const LOCAL_SUBNET_ID: SubnetId = SUBNET_0; @@ -53,8 +53,11 @@ proptest! { /// In the first phase a number of rounds are executed on both subnets, including XNet traffic with /// 'chatter' enabled, i.e. the installed canisters are making random calls (including downstream calls). /// -/// For the second phase, the 'chatter' is disabled and additional rounds are executed until all -/// calls have received a reply. +/// For the second phase, the 'chatter' is disabled by putting a canister into `Stopping` state +/// every 10 rounds. 
In addition to shutting down traffic altogether from that canister (including +/// downstream calls) this will also induce a lot of asynchronous rejections for requests. If any +/// canister fails to reach `Stopped` state (i.e. no hanging calls), something went wrong in +/// message routing, most likely a bug connected to reject signals for requests. /// /// Checks that the guaranteed response message memory never exceeds the limit; that all calls eventually /// receive a reply (or were rejected synchronously when issued); and that the message memory goes @@ -116,8 +119,25 @@ fn check_guaranteed_response_message_memory_limits_are_respected_impl( )?; } - // Stop chatter on all canisters. - fixture.stop_chatter().unwrap(); + // Shut down chatter by putting a canister into `Stopping` state every 10 ticks until they are + // all `Stopping` or `Stopped`. + for canister in fixture.canisters().into_iter() { + // The max calls per heartbeat are set to 0 here, because the canister has to be started + // to query its records. This is to make sure the canister doesn't start making calls + // immediately before we can get its records. + fixture.set_max_calls_per_heartbeat(canister, 0).unwrap(); + fixture.stop_canister_non_blocking(canister); + for _ in 0..10 { + fixture.tick(); + + // Check message memory limits are respected. + fixture.expect_guaranteed_response_message_memory_taken_at_most( + "Shutdown", + LOCAL_MESSAGE_MEMORY_CAPACITY, + REMOTE_MESSAGE_MEMORY_CAPACITY, + )?; + } + } // Keep ticking until all calls are answered. for counter in 0.. 
{ @@ -130,11 +150,7 @@ fn check_guaranteed_response_message_memory_limits_are_respected_impl( REMOTE_MESSAGE_MEMORY_CAPACITY, )?; - if fixture - .collect_metrics() - .into_iter() - .all(|(_canister, metrics)| metrics.hanging_calls == 0) - { + if fixture.open_call_contexts_count().values().sum::() == 0 { break; } @@ -146,15 +162,102 @@ fn check_guaranteed_response_message_memory_limits_are_respected_impl( // One extra tick to make sure everything is gc'ed. fixture.tick(); - // Check the system agrees on 'no hanging calls'. - if fixture.open_call_contexts_count().values().sum::() != 0 { - return fixture.failed_with_reason("found call contexts after shutdown phase"); + // Check the records agree on 'no hanging calls'. + if fixture + .canisters() + .into_iter() + .map(|canister| extract_metrics(&fixture.force_query_records(canister))) + .any(|metrics| metrics.hanging_calls != 0) + { + return fixture.failed_with_reason("found hanging calls in the records"); } // After the fact, all memory is freed and back to 0. fixture.expect_guaranteed_response_message_memory_taken_at_most("Final check", 0, 0) } +/// Runs a state machine test with two subnets, a local subnet with 2 canisters installed and a +/// remote subnet with 5 canisters installed. All canisters, except one local canister referred to +/// as `migrating_canister`, are stopped. +/// +/// In the first phase a number of rounds are executed on both subnets, including XNet traffic with +/// the `migrating_canister` making random calls to all installed canisters (since all calls are +/// rejected except those to self, downstream calls are disabled). +/// +/// For the second phase, `migrating_canister` stops making calls and is then migrated to the +/// remote subnet. Since all other canisters are stopped, there are bound to be a number of reject +/// signals for requests in the stream to the local_subnet. 
But since we migrated the `migrating_canister` +/// to the remote subnet, the locally generated reject responses fail to induct and are rerouted into the +/// stream to the remote subnet. The remote subnet eventually picks them up and inducts them into +/// `migrating_canister` leaving no hanging calls after some more rounds. +/// +/// If there are hanging calls after a threshold number of rounds, there is most likely a bug +/// connected to reject signals for requests, specifically with the corresponding exceptions due to +/// canister migration. +#[test] +fn check_calls_conclude_with_migrating_canister() { + // The number of rounds to execute while the migrating canister is making calls. + const BUILDUP_PHASE_ROUND_COUNT: u64 = 10; + // The maximum number of rounds to execute after chatter is turned off. If it takes more than + // this number of rounds until there are no more hanging calls, the test fails. + const SHUTDOWN_PHASE_MAX_ROUNDS: u64 = 300; + + let mut fixture = Fixture::new(FixtureConfig { + local_canisters_count: 2, + remote_canisters_count: 5, + ..FixtureConfig::default() + }); + + let migrating_canister = *fixture.local_canisters.first().unwrap(); + let config = CanisterConfig::try_new( + fixture.canisters(), // receivers + 0..=0, // call_bytes + 0..=0, // reply_bytes + 0..=0, // instructions_count + ) + .unwrap(); + fixture.set_config(migrating_canister, config).unwrap(); + + fixture.seed_rng(migrating_canister, 73); + fixture.set_reply_weight(migrating_canister, 1).unwrap(); + fixture.set_call_weight(migrating_canister, 0).unwrap(); + fixture + .set_max_calls_per_heartbeat(migrating_canister, 10) + .unwrap(); + + // Stop all canisters except `migrating_canister`. + for canister in fixture.canisters() { + if canister != migrating_canister { + fixture.stop_canister_non_blocking(canister); + } + } + // Make calls on `migrating_canister`. 
+ for _ in 0..BUILDUP_PHASE_ROUND_COUNT { + fixture.tick(); + } + + // Stop making calls and migrate `migrating_canister`. + fixture + .set_max_calls_per_heartbeat(migrating_canister, 0) + .unwrap(); + fixture.migrate_canister(migrating_canister); + + // Tick until all calls have concluded. + for counter in 0.. { + fixture.tick(); + if fixture.open_call_contexts_count().values().sum::() == 0 { + break; + } + assert!(counter < SHUTDOWN_PHASE_MAX_ROUNDS); + } + + // Check that the records agree on 'no hanging calls'. + assert_eq!( + 0, + extract_metrics(&fixture.force_query_records(migrating_canister)).hanging_calls + ); +} + #[derive(Debug)] struct FixtureConfig { local_canisters_count: u64, @@ -223,9 +326,9 @@ impl FixtureConfig { #[derive(Debug, Clone)] struct Fixture { pub local_env: Arc, - pub local_canisters: Vec, + pub local_canisters: BTreeSet, pub remote_env: Arc, - pub remote_canisters: Vec, + pub remote_canisters: BTreeSet, } impl Fixture { @@ -357,35 +460,43 @@ impl Fixture { Ok(()) } - /// Sets `call_per_round` on all canisters to 0; sets all call weights to 0. + /// Starts `canister`. /// - /// This sets the total new calls made on `Self` to 0 whether they are made from the heartbeat - /// or recursively as a downstream call. - pub fn stop_chatter(&self) -> Result<(), ()> { - self.start_chatter(0)?; - for canister in self.canisters() { - self.set_call_weight(canister, 0)?; - } - Ok(()) + /// Panics if `canister` is not installed in `Self`. + pub fn start_canister(&self, canister: CanisterId) { + self.get_env(&canister).start_canister(canister).unwrap(); + } + + /// Puts `canister` into `Stopping` state. + /// + /// This function is asynchronous. It returns the ID of the ingress message + /// that can be awaited later with [await_ingress]. + /// + /// Panics if `canister` is not installed in `Self`. 
+ pub fn stop_canister_non_blocking(&self, canister: CanisterId) -> MessageId { + self.get_env(&canister).stop_canister_non_blocking(canister) } /// Queries the records from `canister`. /// /// Panics if `canister` is not installed in `Self`. - pub fn query_records(&self, canister: CanisterId) -> Vec { - let reply = self - .get_env(&canister) - .query(canister, "records", vec![]) - .unwrap(); - candid::Decode!(&reply.bytes(), Vec).unwrap() + pub fn query_records(&self, canister: CanisterId) -> Result, UserError> { + let reply = self.get_env(&canister).query(canister, "records", vec![])?; + Ok(candid::Decode!(&reply.bytes(), Vec).unwrap()) } - /// Collects the metrics for all installed canisters on the fixture. - pub fn collect_metrics(&self) -> BTreeMap { - self.canisters() - .into_iter() - .map(|canister| (canister, extract_metrics(&self.query_records(canister)))) - .collect() + /// Force queries the records from `canister` by first attempting to query them; if it fails, start + /// the canister and try querying them again. + /// + /// Panics if `canister` is not installed in `Self`. + pub fn force_query_records(&self, canister: CanisterId) -> Vec { + match self.query_records(canister) { + Err(_) => { + self.start_canister(canister); + self.query_records(canister).unwrap() + } + Ok(records) => records, + } } /// Return the number of bytes taken by guaranteed response memory (`local_env`, `remote_env`). @@ -469,6 +580,46 @@ impl Fixture { } } + /// Migrates `canister` between `local_env` and `remote_env` (either direction). + /// + /// Panics if no such canister exists. 
+ pub fn migrate_canister(&mut self, canister: CanisterId) { + fn move_canister( + canister: CanisterId, + from_env: &StateMachine, + from_subnet: SubnetId, + to_env: &StateMachine, + to_subnet: SubnetId, + ) { + for env in [from_env, to_env] { + env.prepare_canister_migrations(canister..=canister, from_subnet, to_subnet); + env.reroute_canister_range(canister..=canister, to_subnet); + } + from_env.move_canister_state_to(to_env, canister).unwrap(); + } + + if self.local_canisters.remove(&canister) { + move_canister( + canister, + &self.local_env, + LOCAL_SUBNET_ID, + &self.remote_env, + REMOTE_SUBNET_ID, + ); + self.remote_canisters.insert(canister); + } else { + move_canister( + canister, + &self.remote_env, + REMOTE_SUBNET_ID, + &self.local_env, + LOCAL_SUBNET_ID, + ); + assert!(self.remote_canisters.remove(&canister)); + self.local_canisters.insert(canister); + } + } + /// Returns the canister records, the latest local state and the latest remote state. pub fn failed_with_reason(&self, reason: impl Into) -> Result<(), (String, DebugInfo)> { Err(( @@ -477,7 +628,7 @@ impl Fixture { records: self .canisters() .into_iter() - .map(|canister| (canister, self.query_records(canister))) + .map(|canister| (canister, self.force_query_records(canister))) .collect(), latest_local_state: self.local_env.get_latest_state(), latest_remote_state: self.remote_env.get_latest_state(), @@ -489,7 +640,7 @@ impl Fixture { /// Returned by `Fixture::failed_with_reason()`. #[allow(dead_code)] struct DebugInfo { - pub records: BTreeMap>, + pub records: BTreeMap>, pub latest_local_state: Arc, pub latest_remote_state: Arc, } diff --git a/rs/replicated_state/src/replicated_state.rs b/rs/replicated_state/src/replicated_state.rs index a9a03804533..2a616196cfb 100644 --- a/rs/replicated_state/src/replicated_state.rs +++ b/rs/replicated_state/src/replicated_state.rs @@ -98,12 +98,6 @@ pub enum StateError { /// Canister is stopping, only accepting responses. 
CanisterStopping(CanisterId), - /// Message enqueuing failed due to canister migration. - CanisterMigrating { - canister_id: CanisterId, - host_subnet: SubnetId, - }, - /// Message enqueuing failed due to full in/out queue. QueueFull { capacity: usize }, @@ -259,7 +253,6 @@ impl PeekableOutputIterator for OutputIterator<'_> { } } -pub const LABEL_VALUE_CANISTER_MIGRATING: &str = "CanisterMigrating"; pub const LABEL_VALUE_QUEUE_FULL: &str = "QueueFull"; pub const LABEL_VALUE_OUT_OF_MEMORY: &str = "OutOfMemory"; pub const LABEL_VALUE_INVALID_RESPONSE: &str = "InvalidResponse"; @@ -273,7 +266,6 @@ impl StateError { StateError::CanisterNotFound(_) => LABEL_VALUE_CANISTER_NOT_FOUND, StateError::CanisterStopped(_) => LABEL_VALUE_CANISTER_STOPPED, StateError::CanisterStopping(_) => LABEL_VALUE_CANISTER_STOPPING, - StateError::CanisterMigrating { .. } => LABEL_VALUE_CANISTER_MIGRATING, StateError::QueueFull { .. } => LABEL_VALUE_QUEUE_FULL, StateError::OutOfMemory { .. } => LABEL_VALUE_OUT_OF_MEMORY, StateError::NonMatchingResponse { .. } => LABEL_VALUE_INVALID_RESPONSE, @@ -310,9 +302,6 @@ impl std::fmt::Display for StateError { StateError::CanisterStopping(canister_id) => { write!(f, "Canister {} is stopping", canister_id) } - StateError::CanisterMigrating { canister_id, host_subnet } => { - write!(f, "Canister {} is being migrated to/from {}", canister_id, host_subnet) - } StateError::QueueFull { capacity } => { write!(f, "Maximum queue capacity {} reached", capacity) }