diff --git a/Cargo.lock b/Cargo.lock
index 1c213c6efce..6d0d4d2af2a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10845,6 +10845,7 @@ dependencies = [
  "prost",
  "rand 0.8.5",
  "rand_chacha 0.3.1",
+ "rayon",
  "scoped_threadpool",
  "serde",
  "serde_cbor",
diff --git a/rs/canister_sandbox/src/lib.rs b/rs/canister_sandbox/src/lib.rs
index a0894202273..dd98cd2f4dc 100644
--- a/rs/canister_sandbox/src/lib.rs
+++ b/rs/canister_sandbox/src/lib.rs
@@ -172,11 +172,11 @@ pub fn canister_sandbox_main() {
         .expect("Error from the sandbox process due to unknown embedder config.");

     // Currently Wasmtime uses the default rayon thread-pool with a thread per core.
-    // In production this results in 64 threads. This MR reduces the default
-    // thread pool size to 10 in the sandbox process because
-    // benchmarks show that 10 is the sweet spot.
+    // In production this results in 64 threads. The thread pool size is now set to 8;
+    // these threads are used for parallel page copying in the page allocator.
+    // The rayon compilation threads are now only used in the compiler sandbox.
     rayon::ThreadPoolBuilder::new()
-        .num_threads(EmbeddersConfig::default().num_rayon_compilation_threads)
+        .num_threads(EmbeddersConfig::default().num_rayon_page_allocator_threads)
         .build_global()
         .unwrap();

diff --git a/rs/config/src/embedders.rs b/rs/config/src/embedders.rs
index 0643c9303d1..c75fbb48a65 100644
--- a/rs/config/src/embedders.rs
+++ b/rs/config/src/embedders.rs
@@ -39,6 +39,9 @@ pub(crate) const DEFAULT_COST_TO_COMPILE_WASM_INSTRUCTION: NumInstructions =
 /// The number of rayon threads used by wasmtime to compile wasm binaries
 const DEFAULT_WASMTIME_RAYON_COMPILATION_THREADS: usize = 10;

+/// The number of rayon threads used for the parallel page copying optimization.
+const DEFAULT_PAGE_ALLOCATOR_THREADS: usize = 8;
+
 /// Sandbox process eviction does not activate if the number of sandbox
 /// processes is below this threshold.
 pub(crate) const DEFAULT_MIN_SANDBOX_COUNT: usize = 500;
@@ -174,6 +177,9 @@ pub struct Config {
     /// The number of rayon threads used by wasmtime to compile wasm binaries
     pub num_rayon_compilation_threads: usize,

+    /// The number of rayon threads used for the parallel page copying optimization.
+    pub num_rayon_page_allocator_threads: usize,
+
     /// Flags to enable or disable features that are still experimental.
     pub feature_flags: FeatureFlags,

@@ -237,6 +243,7 @@ impl Config {
             max_sum_exported_function_name_lengths: MAX_SUM_EXPORTED_FUNCTION_NAME_LENGTHS,
             cost_to_compile_wasm_instruction: DEFAULT_COST_TO_COMPILE_WASM_INSTRUCTION,
             num_rayon_compilation_threads: DEFAULT_WASMTIME_RAYON_COMPILATION_THREADS,
+            num_rayon_page_allocator_threads: DEFAULT_PAGE_ALLOCATOR_THREADS,
             feature_flags: FeatureFlags::const_default(),
             metering_type: MeteringType::New,
             stable_memory_dirty_page_limit: StableMemoryPageLimit {
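Not part of the diff: a minimal standalone sketch of what the `build_global` call above does in the sandbox process. The literal `8` stands in for `EmbeddersConfig::default().num_rayon_page_allocator_threads`; `EmbeddersConfig` itself is not used here.

```rust
// Sketch only: pin the global rayon pool to a fixed size, as the sandbox process
// now does at startup. The literal 8 stands in for the configured
// num_rayon_page_allocator_threads value.
fn main() {
    rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build_global()
        .expect("the global rayon pool can be initialized only once per process");

    // All subsequent rayon parallel iterators in this process run on at most 8 threads.
    assert_eq!(rayon::current_num_threads(), 8);
}
```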
diff --git a/rs/replicated_state/BUILD.bazel b/rs/replicated_state/BUILD.bazel
index 354bfe36f63..c2713a68f46 100644
--- a/rs/replicated_state/BUILD.bazel
+++ b/rs/replicated_state/BUILD.bazel
@@ -36,6 +36,7 @@ DEPENDENCIES = [
     "@crate_index//:prometheus",
     "@crate_index//:rand",
     "@crate_index//:rand_chacha",
+    "@crate_index//:rayon",
     "@crate_index//:serde",
     "@crate_index//:slog",
     "@crate_index//:strum",
diff --git a/rs/replicated_state/Cargo.toml b/rs/replicated_state/Cargo.toml
index 4cd6fb50971..464b02968fb 100644
--- a/rs/replicated_state/Cargo.toml
+++ b/rs/replicated_state/Cargo.toml
@@ -36,6 +36,7 @@ maplit = "1.0.2"
 nix = { workspace = true }
 rand = { workspace = true }
 rand_chacha = { workspace = true }
+rayon = { workspace = true }
 phantom_newtype = { path = "../phantom_newtype" }
 prometheus = { workspace = true }
 serde = { workspace = true }
diff --git a/rs/replicated_state/src/page_map/page_allocator.rs b/rs/replicated_state/src/page_map/page_allocator.rs
index 911ec7c1b6d..ab447bfb866 100644
--- a/rs/replicated_state/src/page_map/page_allocator.rs
+++ b/rs/replicated_state/src/page_map/page_allocator.rs
@@ -15,9 +15,14 @@ use mmap::{PageAllocatorId, PageAllocatorInner, PageInner};
 pub use self::page_allocator_registry::PageAllocatorRegistry;

 use super::{FileDescriptor, FileOffset, PageAllocatorFileDescriptor};
+use ic_sys::PAGE_SIZE;

 static ALLOCATED_PAGES: PageCounter = PageCounter::new();

+/// Any allocation that is larger than this threshold will be copied in parallel
+/// instead of using the sequential code path used for smaller allocations.
+const MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY: usize = 64 * 1024 * 1024;
+
 /// A clonable wrapper around a 4KiB memory page implementation.
 /// It is mostly immutable after creation with the only exception of `Buffer`
 /// modifying privately owned pages. The only way to create a page is via a
@@ -84,6 +89,11 @@ impl PageAllocator {
     /// iterator. Knowing the page count beforehand allows the page allocator
     /// to optimize allocation.
     pub fn allocate(&self, pages: &[(PageIndex, &PageBytes)]) -> Vec<(PageIndex, Page)> {
+        // If the total size of the pages to allocate and copy reaches
+        // MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY, use the parallel-copy fastpath allocator.
+        if pages.len() * PAGE_SIZE >= MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY {
+            return PageAllocatorInner::allocate_fastpath(&self.0, pages);
+        }
         PageAllocatorInner::allocate(&self.0, pages)
     }

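For intuition (illustrative arithmetic only, not code from the diff): with 4 KiB pages, the 64 MiB threshold above means the fastpath is taken once an allocation spans 16384 pages. The constants below mirror the ones introduced in `page_allocator.rs`.

```rust
// Illustrative only: the page-count cutoff implied by the byte threshold.
const PAGE_SIZE: usize = 4096; // 4 KiB, as in ic_sys::PAGE_SIZE
const MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY: usize = 64 * 1024 * 1024; // 64 MiB

fn uses_fastpath(num_pages: usize) -> bool {
    num_pages * PAGE_SIZE >= MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY
}

fn main() {
    assert!(!uses_fastpath(16_383)); // just under 64 MiB: sequential copy
    assert!(uses_fastpath(16_384)); // 64 MiB and above: parallel fastpath copy
}
```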
diff --git a/rs/replicated_state/src/page_map/page_allocator/mmap.rs b/rs/replicated_state/src/page_map/page_allocator/mmap.rs
index 02b6ed3495c..f3974abe5f3 100644
--- a/rs/replicated_state/src/page_map/page_allocator/mmap.rs
+++ b/rs/replicated_state/src/page_map/page_allocator/mmap.rs
@@ -12,6 +12,7 @@ use ic_sys::{page_bytes_from_ptr, PageBytes, PageIndex, PAGE_SIZE};
 use ic_utils::deterministic_operations::deterministic_copy_from_slice;
 use libc::{c_void, close};
 use nix::sys::mman::{madvise, mmap, munmap, MapFlags, MmapAdvise, ProtFlags};
+use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
 use std::os::raw::c_int;
 use std::os::unix::io::RawFd;
@@ -187,6 +188,47 @@
         .collect()
     }

+    // This is the same functionality as `allocate`, but it uses rayon to parallelize
+    // the copying of pages. This is useful when the number of pages is large.
+    pub fn allocate_fastpath(
+        page_allocator: &Arc<Self>,
+        pages: &[(PageIndex, &PageBytes)],
+    ) -> Vec<(PageIndex, Page)> {
+        let mut guard = page_allocator.core_allocator.lock().unwrap();
+        let allocator_creator = || {
+            MmapBasedPageAllocatorCore::new(Arc::clone(page_allocator.fd_factory.as_ref().unwrap()))
+        };
+        let core = guard.get_or_insert_with(allocator_creator);
+        // It would also be correct to increment the counters after all the
+        // allocations, but doing it before gives better performance because
+        // the core allocator can memory-map larger chunks.
+        ALLOCATED_PAGES.inc_by(pages.len());
+        core.allocated_pages += pages.len();
+
+        // Collect the allocated pages first to avoid locking and copying for every
+        // page. This is what enables copying the page contents in parallel below.
+        let mut allocated_pages_vec: Vec<_> = pages
+            .iter()
+            .map(|(_, _)| core.allocate_page(page_allocator))
+            .collect();
+
+        // Copy the contents of the pages in parallel using rayon parallel iterators.
+        // NB: the number of threads used is the same as the number allocated when starting
+        // the sandbox, controlled by `embedders_config.num_rayon_page_allocator_threads`.
+        allocated_pages_vec
+            .par_iter_mut()
+            .zip(pages.par_iter())
+            .for_each(|(allocated_page, delta_page)| {
+                allocated_page.copy_from_slice(0, delta_page.1);
+            });
+
+        allocated_pages_vec
+            .into_iter()
+            .zip(pages)
+            .map(|(allocated_page, delta_page)| (delta_page.0, Page(Arc::new(allocated_page))))
+            .collect()
+    }
+
     // See the comments of the corresponding method in `PageAllocator`.
     pub fn serialize(&self) -> PageAllocatorSerialization {
         let mut guard = self.core_allocator.lock().unwrap();
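The core copy pattern used by `allocate_fastpath`, shown as a self-contained sketch on plain heap buffers instead of mmap-backed pages (not code from the diff). The locally built 8-thread pool stands in for the sandbox's global pool; in the sandbox the copy simply runs on whatever global pool was configured at startup.

```rust
use rayon::prelude::*;

const PAGE_SIZE: usize = 4096;

fn main() {
    // Stand-ins for the mmap-backed pages: one plain heap buffer per 4 KiB page.
    let sources = vec![[0xABu8; PAGE_SIZE]; 1024];
    let mut destinations = vec![[0u8; PAGE_SIZE]; 1024];

    // A small dedicated pool so the example does not touch the global one;
    // 8 mirrors the default num_rayon_page_allocator_threads.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();

    // Destinations are "allocated" up front, then their contents are copied in
    // parallel, pairing each destination page with its source page.
    pool.install(|| {
        destinations
            .par_iter_mut()
            .zip(sources.par_iter())
            .for_each(|(dst, src)| dst.copy_from_slice(src));
    });

    assert!(destinations.iter().all(|page| page == &[0xABu8; PAGE_SIZE]));
}
```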
diff --git a/rs/replicated_state/src/page_map/page_allocator/mmap/tests.rs b/rs/replicated_state/src/page_map/page_allocator/mmap/tests.rs
index 6c3faf9fa2a..475ac1ecd26 100644
--- a/rs/replicated_state/src/page_map/page_allocator/mmap/tests.rs
+++ b/rs/replicated_state/src/page_map/page_allocator/mmap/tests.rs
@@ -57,3 +57,73 @@ fn test_page_validation_non_zero_middle_byte() {
     );
     assert_eq!(pages[0].1 .0.validation.non_zero_word_value, 42 * 256);
 }
+
+#[test]
+fn test_page_allocator_allocate_fastpath() {
+    // Create an allocator and allocate 4 pages using the slow path.
+    const N_PAGES: usize = 100;
+    let page_allocator = Arc::new(PageAllocatorInner::new_for_testing());
+    let contents_slow = [13u8; PAGE_SIZE * N_PAGES];
+    let pages_slow = PageAllocatorInner::allocate(
+        &page_allocator,
+        &[
+            (
+                PageIndex::new(0),
+                &contents_slow[0..PAGE_SIZE].try_into().unwrap(),
+            ),
+            (
+                PageIndex::new(1),
+                &contents_slow[PAGE_SIZE..2 * PAGE_SIZE].try_into().unwrap(),
+            ),
+            (
+                PageIndex::new(2),
+                &contents_slow[2 * PAGE_SIZE..3 * PAGE_SIZE]
+                    .try_into()
+                    .unwrap(),
+            ),
+            (
+                PageIndex::new(3),
+                &contents_slow[3 * PAGE_SIZE..4 * PAGE_SIZE]
+                    .try_into()
+                    .unwrap(),
+            ),
+        ],
+    );
+
+    // Create another allocator and allocate 4 pages using the fast path.
+    let page_allocator = Arc::new(PageAllocatorInner::new_for_testing());
+    let contents_fast = [13u8; PAGE_SIZE * N_PAGES];
+    let pages_fast = PageAllocatorInner::allocate_fastpath(
+        &page_allocator,
+        &[
+            (
+                PageIndex::new(0),
+                &contents_fast[0..PAGE_SIZE].try_into().unwrap(),
+            ),
+            (
+                PageIndex::new(1),
+                &contents_fast[PAGE_SIZE..2 * PAGE_SIZE].try_into().unwrap(),
+            ),
+            (
+                PageIndex::new(2),
+                &contents_fast[2 * PAGE_SIZE..3 * PAGE_SIZE]
+                    .try_into()
+                    .unwrap(),
+            ),
+            (
+                PageIndex::new(3),
+                &contents_fast[3 * PAGE_SIZE..4 * PAGE_SIZE]
+                    .try_into()
+                    .unwrap(),
+            ),
+        ],
+    );
+
+    // Check that the pages are the same.
+    assert_eq!(pages_slow.len(), pages_fast.len());
+    for (page_slow, page_fast) in pages_slow.iter().zip(pages_fast.iter()) {
+        let contents_slow = page_slow.1 .0.contents();
+        let contents_fast = page_fast.1 .0.contents();
+        assert_eq!(contents_slow, contents_fast);
+    }
+}
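A rough way to eyeball the benefit of the parallel copy (a hedged micro-benchmark sketch, not part of the diff; absolute numbers depend on hardware, pool size, and whether the memory is already faulted in):

```rust
use rayon::prelude::*;
use std::time::Instant;

const PAGE_SIZE: usize = 4096;
const N_PAGES: usize = 16_384; // 64 MiB, the fastpath threshold

fn main() {
    let src = vec![[7u8; PAGE_SIZE]; N_PAGES];
    let mut dst_seq = vec![[0u8; PAGE_SIZE]; N_PAGES];
    let mut dst_par = vec![[0u8; PAGE_SIZE]; N_PAGES];

    // Sequential baseline (copy only; the real `allocate` also locks and mmaps).
    let t = Instant::now();
    for (d, s) in dst_seq.iter_mut().zip(src.iter()) {
        d.copy_from_slice(s);
    }
    println!("sequential copy: {:?}", t.elapsed());

    // Parallel copy using the same rayon pattern as `allocate_fastpath` (copy only).
    let t = Instant::now();
    dst_par
        .par_iter_mut()
        .zip(src.par_iter())
        .for_each(|(d, s)| d.copy_from_slice(s));
    println!("parallel copy:   {:?}", t.elapsed());

    assert_eq!(dst_seq, dst_par);
}
```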