Skip to content

Commit

Permalink
feat: [MR-579] Enable reject signals for requests (#1051)
Browse files Browse the repository at this point in the history
Enables reject signals for requests for `CertificationVersion::V19` and
above; keeps generating reject responses for versions below that.

Adds new tests as modification of the old tests; old tests are kept
around with a `legacy_` prefix. Adapts one state machine test such that
canisters are stopped rather than just make them stop making new calls,
this provokes reject signals for requests thereby giving us a high level
check on this functionality while leaving the test intact otherwise.
Adds one new state machine test for a canister migration to check all
calls are concluded for this scenario.
  • Loading branch information
stiegerc authored Oct 21, 2024
1 parent 1eae28c commit f55a756
Show file tree
Hide file tree
Showing 4 changed files with 1,363 additions and 243 deletions.
87 changes: 47 additions & 40 deletions rs/messaging/src/routing/stream_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,14 +615,14 @@ impl StreamHandlerImpl {
state,
available_guaranteed_response_memory,
) {
Ok(()) => {
None => {
// Reject response successfully inducted or dropped.
}
Err((StateError::CanisterMigrating { .. }, reject_response)) => {
Some((RejectReason::CanisterMigrating, reject_response)) => {
// Canister is being migrated, reroute reject response.
reroute_response(reject_response, state, streams, &self.log);
}
Err(_) => {
Some(_) => {
unreachable!(
"Errors other than `CanisterMigrating` shouldn't be possible."
);
Expand Down Expand Up @@ -740,35 +740,27 @@ impl StreamHandlerImpl {
state,
available_guaranteed_response_memory,
) {
Ok(()) => {
None => {
// Message successfully inducted or dropped.
stream.push_accept_signal();
}
Err((StateError::CanisterMigrating { .. }, RequestOrResponse::Response(_))) => {
// Unable to deliver a response due to migrating canister, push reject signal.
stream.push_reject_signal(RejectReason::CanisterMigrating);
}
Err((err, RequestOrResponse::Request(request))) => {
// TODO(MR-249): Replace generating reject responses with pushing reject signals.
Some((reason, RequestOrResponse::Request(request)))
if state.metadata.certification_version < CertificationVersion::V19 =>
{
// Unable to induct a request, generate reject response and push it into `stream`.
let code = match err {
StateError::CanisterNotFound(_) => RejectCode::DestinationInvalid,
StateError::CanisterStopped(_) => RejectCode::CanisterError,
StateError::CanisterStopping(_) => RejectCode::CanisterError,
StateError::CanisterMigrating { .. } => RejectCode::SysTransient,
StateError::QueueFull { .. } => RejectCode::SysTransient,
StateError::OutOfMemory { .. } => RejectCode::CanisterError,
StateError::NonMatchingResponse { .. }
| StateError::BitcoinNonMatchingResponse { .. } => {
unreachable!("Not a user error: {}", err);
}
};
*available_guaranteed_response_memory -=
stream.push(generate_reject_response(&request, code, err.to_string()))
as i64;
stream.push_accept_signal()
stream.push(generate_reject_response_for(reason, &request)) as i64;
stream.push_accept_signal();
}
Err((_, RequestOrResponse::Response(_))) => {
Some((reason, RequestOrResponse::Request(_))) => {
// Unable to induct a request, push a reject signal.
stream.push_reject_signal(reason);
}
Some((RejectReason::CanisterMigrating, RequestOrResponse::Response(_))) => {
// Unable to deliver a response due to migrating canister, push reject signal.
stream.push_reject_signal(RejectReason::CanisterMigrating);
}
Some((_, RequestOrResponse::Response(_))) => {
unreachable!("No signals are generated for response induction failures except for CanisterMigrating");
}
}
Expand All @@ -792,13 +784,18 @@ impl StreamHandlerImpl {
}
}

/// Inducts a message into `state`. There are 3 possible outcomes:
/// - `msg` successfully inducted, returns `None`.
/// - `msg` silently dropped (e.g. late best-effort response), `returns None`.
/// - Failed to induct `msg` (error or canister migrating), returns `(RejectReason, RequestOrResponse)`.
/// Caller is expected to produce a reject response or a reject signal.
fn induct_message_impl(
&self,
msg: RequestOrResponse,
msg_type: &str,
state: &mut ReplicatedState,
available_guaranteed_response_memory: &mut i64,
) -> Result<(), (StateError, RequestOrResponse)> {
) -> Option<(RejectReason, RequestOrResponse)> {
// Subnet that should have received the message according to the routing table.
let receiver_host_subnet = state
.metadata
Expand All @@ -823,13 +820,27 @@ impl StreamHandlerImpl {

match msg {
RequestOrResponse::Request(ref request) => {
let reason = match err {
StateError::CanisterNotFound(_) => {
RejectReason::CanisterNotFound
}
StateError::CanisterStopped(_) => RejectReason::CanisterStopped,
StateError::CanisterStopping(_) => {
RejectReason::CanisterStopping
}
StateError::QueueFull { .. } => RejectReason::QueueFull,
StateError::OutOfMemory { .. } => RejectReason::OutOfMemory,
// Unreachable.
StateError::NonMatchingResponse { .. }
| StateError::BitcoinNonMatchingResponse { .. } => {
RejectReason::Unknown
}
};
debug!(
self.log,
"Induction failed with error '{}', generating reject Response for {:?}",
&err,
&request
"Inducting request failed: {}\n{:?}", &err, &request
);
return Err((err, msg));
return Some((reason, msg));
}
RequestOrResponse::Response(response) => {
// Critical error, responses should always be inducted successfully.
Expand All @@ -850,30 +861,26 @@ impl StreamHandlerImpl {
// Receiver canister is migrating to/from this subnet.
Some(host_subnet) if self.should_reroute_message_to(&msg, host_subnet, state) => {
self.observe_inducted_message_status(msg_type, LABEL_VALUE_CANISTER_MIGRATED);
let err = StateError::CanisterMigrating {
canister_id: msg.receiver(),
host_subnet,
};

match &msg {
RequestOrResponse::Request(request) => {
debug!(
self.log,
"Canister {} is being migrated, generating reject response for {:?}",
"Inducting request failed: Canister {} is migrating\n{:?}",
request.receiver,
request
);
return Err((err, msg));
return Some((RejectReason::CanisterMigrating, msg));
}
RequestOrResponse::Response(response) => {
if state.metadata.certification_version >= CertificationVersion::V9 {
debug!(
self.log,
"Canister {} is being migrated, generating reject signal for {:?}",
"Inducting response failed: Canister {} is migrating\n{:?}",
response.originator,
response
);
return Err((err, msg));
return Some((RejectReason::CanisterMigrating, msg));
} else {
fatal!(
self.log,
Expand Down Expand Up @@ -906,7 +913,7 @@ impl StreamHandlerImpl {
}

// Any reject signals generated were returned before this point.
Ok(())
None
}

/// Checks whether `actual_subnet_id` is a valid host subnet for `msg.sender()`
Expand Down
Loading

0 comments on commit f55a756

Please sign in to comment.