diff --git a/rs/execution_environment/benches/scheduler.rs b/rs/execution_environment/benches/scheduler.rs
index 872cfa3a40b..0def9d106db 100644
--- a/rs/execution_environment/benches/scheduler.rs
+++ b/rs/execution_environment/benches/scheduler.rs
@@ -37,6 +37,7 @@ fn main() {
     let round_schedule = RoundSchedule::new(
         scheduler_cores,
         long_execution_cores,
+        0,
         ordered_new_execution_canister_ids,
         ordered_long_execution_canister_ids,
     );
diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs
index 0289bda4097..d4aff4789ec 100644
--- a/rs/execution_environment/src/scheduler.rs
+++ b/rs/execution_environment/src/scheduler.rs
@@ -271,8 +271,8 @@ impl SchedulerImpl {
         // `(compute_capacity - total_compute_allocation) * multiplier / number_of_canisters`
         // can be simplified to just
         // `(compute_capacity - total_compute_allocation) * scheduler_cores`
-        let free_capacity_per_canister = (compute_capacity_percent
-            .saturating_sub(total_compute_allocation_percent))
+        let free_capacity_per_canister = compute_capacity_percent
+            .saturating_sub(total_compute_allocation_percent)
             * scheduler_cores as i64;

         // Fully divide the free allocation across all canisters.
@@ -344,6 +344,7 @@ impl SchedulerImpl {
         let round_schedule = RoundSchedule::new(
             scheduler_cores,
             long_execution_cores,
+            total_compute_allocation_percent,
             round_states
                 .iter()
                 .skip(number_of_long_executions)
@@ -643,7 +644,7 @@ impl SchedulerImpl {
         scheduler_round_limits: &mut SchedulerRoundLimits,
         registry_settings: &RegistryExecutionSettings,
         idkg_subnet_public_keys: &BTreeMap,
-    ) -> (ReplicatedState, BTreeSet<CanisterId>) {
+    ) -> (ReplicatedState, BTreeSet<CanisterId>, BTreeSet<CanisterId>) {
         let measurement_scope =
             MeasurementScope::nested(&self.metrics.round_inner, root_measurement_scope);
         let mut ingress_execution_results = Vec::new();
@@ -654,6 +655,9 @@ impl SchedulerImpl {

         let mut heartbeat_and_timer_canister_ids = BTreeSet::new();
         let mut round_executed_canister_ids = BTreeSet::new();
+        // The set of canisters marked as fully executed: have no messages to execute
+        // or were scheduled first on a core.
+        let mut round_fully_executed_canister_ids = BTreeSet::new();

         // Start iteration loop:
         //      - Execute subnet messages.
@@ -725,6 +729,7 @@ impl SchedulerImpl {
             let (
                 active_canisters,
                 executed_canister_ids,
+                fully_executed_canister_ids,
                 mut loop_ingress_execution_results,
                 heap_delta,
             ) = self.execute_canisters_in_inner_round(
@@ -739,9 +744,10 @@ impl SchedulerImpl {
             );
             let instructions_consumed = instructions_before - round_limits.instructions;
             drop(execution_timer);
-            round_executed_canister_ids.extend(executed_canister_ids);

             let finalization_timer = self.metrics.round_inner_iteration_fin.start_timer();
+            round_executed_canister_ids.extend(executed_canister_ids);
+            round_fully_executed_canister_ids.extend(fully_executed_canister_ids);
             total_heap_delta += heap_delta;
             state.metadata.heap_delta_estimate += heap_delta;
@@ -847,7 +853,11 @@ impl SchedulerImpl {
             .heap_delta_rate_limited_canisters_per_round
             .observe(round_filtered_canisters.rate_limited_canister_ids.len() as f64);

-        (state, round_filtered_canisters.active_canister_ids)
+        (
+            state,
+            round_filtered_canisters.active_canister_ids,
+            round_fully_executed_canister_ids,
+        )
     }

     /// Executes canisters in parallel using the thread pool.
@@ -874,6 +884,7 @@ impl SchedulerImpl {
     ) -> (
         Vec<CanisterState>,
         BTreeSet<CanisterId>,
+        Vec<CanisterId>,
         Vec<(MessageId, IngressStatus)>,
         NumBytes,
     ) {
@@ -887,6 +898,7 @@ impl SchedulerImpl {
                 canisters_by_thread.into_iter().flatten().collect(),
                 BTreeSet::new(),
                 vec![],
+                vec![],
                 NumBytes::from(0),
             );
         }
@@ -950,6 +962,7 @@ impl SchedulerImpl {
         // Aggregate `results_by_thread` to get the result of this function.
         let mut canisters = Vec::new();
         let mut executed_canister_ids = BTreeSet::new();
+        let mut fully_executed_canister_ids = vec![];
         let mut ingress_results = Vec::new();
         let mut total_instructions_executed = NumInstructions::from(0);
         let mut max_instructions_executed_per_thread = NumInstructions::from(0);
@@ -957,6 +970,7 @@ impl SchedulerImpl {
         for mut result in results_by_thread.into_iter() {
             canisters.append(&mut result.canisters);
             executed_canister_ids.extend(result.executed_canister_ids);
+            fully_executed_canister_ids.extend(result.fully_executed_canister_ids);
             ingress_results.append(&mut result.ingress_results);
             let instructions_executed = as_num_instructions(
                 round_limits_per_thread.instructions - result.round_limits.instructions,
             );
@@ -990,6 +1004,7 @@ impl SchedulerImpl {
         (
             canisters,
             executed_canister_ids,
+            fully_executed_canister_ids,
             ingress_results,
             heap_delta,
         )
@@ -1651,7 +1666,7 @@ impl Scheduler for SchedulerImpl {
         };

         // Inner round.
-        let (mut state, active_canister_ids) = self.inner_round(
+        let (mut state, active_canister_ids, fully_executed_canister_ids) = self.inner_round(
            state,
            &mut csprng,
            &round_schedule,
@@ -1812,6 +1827,10 @@ impl Scheduler for SchedulerImpl {
                 .num_canister_snapshots
                 .set(final_state.canister_snapshots.count() as i64);
         }
+        round_schedule.finish_round(
+            &mut final_state.canister_states,
+            fully_executed_canister_ids,
+        );
         self.finish_round(&mut final_state, current_round_type);
         final_state
             .metadata
@@ -1887,6 +1906,7 @@ fn observe_instructions_consumed_per_message(
 struct ExecutionThreadResult {
     canisters: Vec<CanisterState>,
     executed_canister_ids: BTreeSet<CanisterId>,
+    fully_executed_canister_ids: Vec<CanisterId>,
     ingress_results: Vec<(MessageId, IngressStatus)>,
     slices_executed: NumSlices,
     messages_executed: NumMessages,
@@ -1924,6 +1944,7 @@ fn execute_canisters_on_thread(
     // These variables accumulate the results and will be returned at the end.
     let mut canisters = vec![];
     let mut executed_canister_ids = BTreeSet::new();
+    let mut fully_executed_canister_ids = vec![];
     let mut ingress_results = vec![];
     let mut total_slices_executed = NumSlices::from(0);
     let mut total_messages_executed = NumMessages::from(0);
@@ -2036,18 +2057,13 @@ fn execute_canisters_on_thread(
         if let Some(es) = &mut canister.execution_state {
             es.last_executed_round = round_id;
         }
-        let full_message_execution = match canister.next_execution() {
-            NextExecution::None => true,
-            NextExecution::StartNew => false,
-            // We just finished a full slice of executions.
-            NextExecution::ContinueLong | NextExecution::ContinueInstallCode => true,
-        };
-        let scheduled_first = is_first_iteration && rank == 0;
-        if full_message_execution || scheduled_first {
-            // The very first canister is considered to have a full execution round for
-            // scheduling purposes even if it did not complete within the round.
-            canister.scheduler_state.last_full_execution_round = round_id;
-        }
+        RoundSchedule::finish_canister_execution(
+            &mut canister,
+            &mut fully_executed_canister_ids,
+            round_id,
+            is_first_iteration,
+            rank,
+        );
         canister.system_state.canister_metrics.executed += 1;
         canisters.push(canister);
         round_limits.instructions -=
@@ -2057,6 +2073,7 @@ fn execute_canisters_on_thread(
     ExecutionThreadResult {
         canisters,
         executed_canister_ids,
+        fully_executed_canister_ids,
         ingress_results,
         slices_executed: total_slices_executed,
         messages_executed: total_messages_executed,
diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs
index 13e21c48848..f8b30150a0d 100644
--- a/rs/execution_environment/src/scheduler/round_schedule.rs
+++ b/rs/execution_environment/src/scheduler/round_schedule.rs
@@ -1,9 +1,9 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap};

 use ic_base_types::{CanisterId, NumBytes};
 use ic_config::flag_status::FlagStatus;
 use ic_replicated_state::{canister_state::NextExecution, CanisterState};
-use ic_types::{AccumulatedPriority, ComputeAllocation, LongExecutionMode};
+use ic_types::{AccumulatedPriority, ComputeAllocation, ExecutionRound, LongExecutionMode};

 /// Round metrics required to prioritize a canister.
 #[derive(Clone, Debug)]
@@ -42,6 +42,8 @@ pub struct RoundSchedule {
     pub scheduler_cores: usize,
     /// Number of cores dedicated for long executions.
     pub long_execution_cores: usize,
+    // Sum of all canisters compute allocation in percent.
+    pub total_compute_allocation_percent: i64,
     /// Ordered Canister IDs with new executions.
     pub ordered_new_execution_canister_ids: Vec<CanisterId>,
     /// Ordered Canister IDs with long executions.
     pub ordered_long_execution_canister_ids: Vec<CanisterId>,
 }

 impl RoundSchedule {
     pub fn new(
         scheduler_cores: usize,
         long_execution_cores: usize,
+        total_compute_allocation_percent: i64,
         ordered_new_execution_canister_ids: Vec<CanisterId>,
         ordered_long_execution_canister_ids: Vec<CanisterId>,
     ) -> Self {
@@ -59,6 +62,7 @@ impl RoundSchedule {
             scheduler_cores,
             long_execution_cores: long_execution_cores
                 .min(ordered_long_execution_canister_ids.len()),
+            total_compute_allocation_percent,
             ordered_new_execution_canister_ids,
             ordered_long_execution_canister_ids,
         }
     }
@@ -174,6 +178,7 @@ impl RoundSchedule {
             RoundSchedule::new(
                 self.scheduler_cores,
                 self.long_execution_cores,
+                self.total_compute_allocation_percent,
                 ordered_new_execution_canister_ids,
                 ordered_long_execution_canister_ids,
             ),
@@ -229,4 +234,64 @@ impl RoundSchedule {

         (canisters_partitioned_by_cores, canisters)
     }
+
+    pub fn finish_canister_execution(
+        canister: &mut CanisterState,
+        fully_executed_canister_ids: &mut Vec<CanisterId>,
+        round_id: ExecutionRound,
+        is_first_iteration: bool,
+        rank: usize,
+    ) {
+        let full_message_execution = match canister.next_execution() {
+            NextExecution::None => true,
+            NextExecution::StartNew => false,
+            // We just finished a full slice of executions.
+            NextExecution::ContinueLong => true,
+            NextExecution::ContinueInstallCode => false,
+        };
+        let scheduled_first = is_first_iteration && rank == 0;
+
+        // The very first canister is considered to have a full execution round for
+        // scheduling purposes even if it did not complete within the round.
+        if full_message_execution || scheduled_first {
+            canister.scheduler_state.last_full_execution_round = round_id;
+
+            // We schedule canisters (as opposed to individual messages),
+            // and we charge for every full execution round.
+            fully_executed_canister_ids.push(canister.canister_id());
+        }
+    }
+
+    pub(crate) fn finish_round(
+        &self,
+        canister_states: &mut BTreeMap<CanisterId, CanisterState>,
+        fully_executed_canister_ids: BTreeSet<CanisterId>,
+    ) {
+        let scheduler_cores = self.scheduler_cores;
+        let number_of_canisters = canister_states.len();
+        let multiplier = (scheduler_cores * number_of_canisters).max(1) as i64;
+
+        // Charge canisters for full executions in this round.
+        let mut total_charged_priority = 0;
+        for canister_id in fully_executed_canister_ids {
+            if let Some(canister) = canister_states.get_mut(&canister_id) {
+                total_charged_priority += 100 * multiplier;
+                canister.scheduler_state.priority_credit += (100 * multiplier).into();
+            }
+        }
+
+        let total_allocated = self.total_compute_allocation_percent * multiplier;
+        // Free capacity per canister in multiplied percent.
+        let free_capacity_per_canister = total_charged_priority.saturating_sub(total_allocated)
+            / number_of_canisters.max(1) as i64;
+        // Fully divide the free allocation across all canisters.
+        for canister in canister_states.values_mut() {
+            // De-facto compute allocation includes bonus allocation.
+            let factual = canister.scheduler_state.compute_allocation.as_percent() as i64
+                * multiplier
+                + free_capacity_per_canister;
+            // Increase accumulated priority by de-facto compute allocation.
+            canister.scheduler_state.accumulated_priority += factual.into();
+        }
+    }
 }
diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs
index eafe856374c..9377b347341 100644
--- a/rs/execution_environment/src/scheduler/tests.rs
+++ b/rs/execution_environment/src/scheduler/tests.rs
@@ -5921,3 +5921,96 @@ fn inner_round_long_execution_is_a_full_execution() {
     // The accumulated priority invariant should be respected.
     assert_eq!(total_accumulated_priority - total_priority_credit, 0);
 }
+
+#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
+fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: usize) {
+    let instructions = 20;
+    let messages_per_round = 2;
+    let mut test = SchedulerTestBuilder::new()
+        .with_scheduler_config(SchedulerConfig {
+            scheduler_cores,
+            max_instructions_per_round: (instructions * messages_per_round).into(),
+            max_instructions_per_message: instructions.into(),
+            max_instructions_per_message_without_dts: instructions.into(),
+            max_instructions_per_slice: instructions.into(),
+            instruction_overhead_per_execution: 0.into(),
+            instruction_overhead_per_canister: 0.into(),
+            instruction_overhead_per_canister_for_finalization: 0.into(),
+            ..SchedulerConfig::application_subnet()
+        })
+        .build();
+
+    // Bump up the round number.
+    test.execute_round(ExecutionRoundType::OrdinaryRound);
+
+    // Create `messages_per_round * 2` canisters for each scheduler core.
+    let num_canisters = scheduler_cores as u64 * messages_per_round * 2;
+    let mut canister_ids = vec![];
+    for _ in 0..num_canisters {
+        let canister_id = test.create_canister();
+        // Send one message per canister. Having `max_messages_per_round * 2` canisters,
+        // only half of them will finish in one round.
+        test.send_ingress(canister_id, ingress(instructions));
+        canister_ids.push(canister_id);
+    }
+
+    test.execute_round(ExecutionRoundType::OrdinaryRound);
+
+    let mut total_accumulated_priority = 0;
+    let mut total_priority_credit = 0;
+    for (i, canister) in test.state().canisters_iter().enumerate() {
+        if i < num_canisters as usize / 2 {
+            // The first half of the canisters should finish their messages.
+            prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 0);
+            prop_assert_eq!(canister.system_state.canister_metrics.executed, 1);
+            prop_assert_eq!(
+                canister.scheduler_state.last_full_execution_round,
+                test.last_round()
+            );
+        } else {
+            // The second half of the canisters should still have their messages.
+            prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 1);
+            prop_assert_eq!(canister.system_state.canister_metrics.executed, 0);
+            prop_assert_eq!(canister.scheduler_state.last_full_execution_round, 0.into());
+        }
+        total_accumulated_priority += canister.scheduler_state.accumulated_priority.get();
+        total_priority_credit += canister.scheduler_state.priority_credit.get();
+    }
+    prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);
+
+    // Send one more message to the first half of the canisters.
+    for (i, canister) in canister_ids.iter().enumerate() {
+        if i < num_canisters as usize / 2 {
+            test.send_ingress(*canister, ingress(instructions));
+        }
+    }
+
+    test.execute_round(ExecutionRoundType::OrdinaryRound);
+
+    let mut total_accumulated_priority = 0;
+    let mut total_priority_credit = 0;
+    for (i, canister) in test.state().canisters_iter().enumerate() {
+        // Now all the canisters should have been executed once.
+        prop_assert_eq!(canister.system_state.canister_metrics.executed, 1);
+        if i < num_canisters as usize / 2 {
+            // The first half of the canisters should have messages.
+            prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 1);
+            // The first half of the canisters should have been executed two rounds ago.
+            prop_assert_eq!(
+                canister.scheduler_state.last_full_execution_round.get(),
+                test.last_round().get() - 1
+            );
+        } else {
+            // The second half of the canisters should finish their messages.
+            prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 0);
+            // The second half of the canisters should have been executed in the last round.
+            prop_assert_eq!(
+                canister.scheduler_state.last_full_execution_round,
+                test.last_round()
+            );
+        }
+        total_accumulated_priority += canister.scheduler_state.accumulated_priority.get();
+        total_priority_credit += canister.scheduler_state.priority_credit.get();
+    }
+    prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);
+}
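
Note for reviewers (not part of the diff): the arithmetic in `RoundSchedule::finish_round` is what keeps the accumulated-priority invariant asserted by the tests at zero. Each fully executed canister is charged a full round, `100 * multiplier`, as priority credit, and the charged total is paid back out as accumulated priority: each canister gets its explicit compute allocation times the multiplier plus an even share of whatever was charged above the total allocation. Below is a minimal, standalone sketch of that bookkeeping; all names are local to the sketch and only the formulas mirror the diff. It assumes the case exercised by the new proptest, where no canister has an explicit compute allocation, so the integer division is exact.

```rust
// Standalone model of the round-end charging arithmetic added in
// `RoundSchedule::finish_round`. Names here are hypothetical and local
// to this sketch; only the formulas mirror the diff.

#[derive(Default, Clone)]
struct Canister {
    compute_allocation_percent: i64,
    accumulated_priority: i64,
    priority_credit: i64,
    fully_executed: bool,
}

fn finish_round_model(canisters: &mut [Canister], scheduler_cores: usize) {
    let number_of_canisters = canisters.len();
    let multiplier = (scheduler_cores * number_of_canisters).max(1) as i64;

    // Charge each fully executed canister a full round (100%) times the multiplier.
    let mut total_charged_priority = 0;
    for canister in canisters.iter_mut().filter(|c| c.fully_executed) {
        total_charged_priority += 100 * multiplier;
        canister.priority_credit += 100 * multiplier;
    }

    let total_allocated: i64 = canisters
        .iter()
        .map(|c| c.compute_allocation_percent * multiplier)
        .sum();
    // Distribute whatever was charged above the explicit allocations evenly.
    let free_capacity_per_canister =
        total_charged_priority.saturating_sub(total_allocated) / number_of_canisters.max(1) as i64;

    for canister in canisters.iter_mut() {
        canister.accumulated_priority +=
            canister.compute_allocation_percent * multiplier + free_capacity_per_canister;
    }
}

fn main() {
    // 4 canisters with no explicit compute allocation, 2 cores, half fully executed.
    let mut canisters = vec![Canister::default(); 4];
    canisters[0].fully_executed = true;
    canisters[1].fully_executed = true;
    finish_round_model(&mut canisters, 2);

    let accumulated: i64 = canisters.iter().map(|c| c.accumulated_priority).sum();
    let credited: i64 = canisters.iter().map(|c| c.priority_credit).sum();
    // With zero allocations the division is exact, so credits and accumulated
    // priority balance out: 2 * 100 * (2 * 4) = 1600 on both sides.
    assert_eq!(accumulated, credited);
    assert_eq!(credited, 2 * 100 * (2 * 4) as i64);
}
```

With non-zero allocations or an inexact division the two per-round sums can differ by integer-rounding remainders; the sketch only covers the exact case that the new test asserts.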