feat(RUN-1032): Parallel dirty page copying for page allocator (#733)
For messages that dirty a large number of pages, a speedup can be achieved
by copying the dirty pages in parallel at the end of message execution.
This MR parallelizes dirty page copying for messages that dirty at least
64 MiB.
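In concrete terms, the change dispatches on the total dirty size. A minimal sketch of the threshold check, assuming 4 KiB pages as in `ic_sys` (the constant names match those added in this diff):

```rust
const PAGE_SIZE: usize = 4096; // 4 KiB, as in ic_sys::PAGE_SIZE
const MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY: usize = 64 * 1024 * 1024; // 64 MiB

/// Returns true when the dirty pages are large enough to justify parallel copying.
fn use_parallel_copy(dirty_page_count: usize) -> bool {
    dirty_page_count * PAGE_SIZE >= MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY
}

fn main() {
    assert!(!use_parallel_copy(16383)); // just under 64 MiB: sequential path
    assert!(use_parallel_copy(16384)); // 64 MiB (16384 * 4 KiB): parallel fast path
}
```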

---------

Co-authored-by: IDX GitHub Automation <[email protected]>
alexandru-uta and IDX GitHub Automation authored Aug 7, 2024
1 parent 12e89bb commit c0373c6
Showing 8 changed files with 136 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


8 changes: 4 additions & 4 deletions rs/canister_sandbox/src/lib.rs
@@ -172,11 +172,11 @@ pub fn canister_sandbox_main() {
         .expect("Error from the sandbox process due to unknown embedder config.");

     // Currently Wasmtime uses the default rayon thread-pool with a thread per core.
-    // In production this results in 64 threads. This MR reduces the default
-    // thread pool size to 10 in the sandbox process because
-    // benchmarks show that 10 is the sweet spot.
+    // In production this results in 64 threads. Here the pool size is set to 8;
+    // these threads are used for parallel page copying in the page allocator.
+    // The compilation rayon threads are now only used in the compiler sandbox.
     rayon::ThreadPoolBuilder::new()
-        .num_threads(EmbeddersConfig::default().num_rayon_compilation_threads)
+        .num_threads(EmbeddersConfig::default().num_rayon_page_allocator_threads)
         .build_global()
         .unwrap();

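Note that rayon's global pool can be initialized only once per process, so this must run before any other rayon use. A standalone sketch of the pattern (the pool size is illustrative):

```rust
fn main() {
    // Configure the global rayon pool; build_global() returns an error if the
    // pool has already been initialized (e.g. by an earlier rayon call).
    rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build_global()
        .expect("global rayon pool was already initialized");

    assert_eq!(rayon::current_num_threads(), 8);
}
```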
7 changes: 7 additions & 0 deletions rs/config/src/embedders.rs
@@ -39,6 +39,9 @@ pub(crate) const DEFAULT_COST_TO_COMPILE_WASM_INSTRUCTION: NumInstructions =
/// The number of rayon threads used by wasmtime to compile wasm binaries
const DEFAULT_WASMTIME_RAYON_COMPILATION_THREADS: usize = 10;

/// The number of rayon threads used for the parallel page copying optimization.
const DEFAULT_PAGE_ALLOCATOR_THREADS: usize = 8;

/// Sandbox process eviction does not activate if the number of sandbox
/// processes is below this threshold.
pub(crate) const DEFAULT_MIN_SANDBOX_COUNT: usize = 500;
@@ -174,6 +177,9 @@ pub struct Config {
/// The number of rayon threads used by wasmtime to compile wasm binaries
pub num_rayon_compilation_threads: usize,

/// The number of rayon threads used for the parallel page copying optimization.
pub num_rayon_page_allocator_threads: usize,

/// Flags to enable or disable features that are still experimental.
pub feature_flags: FeatureFlags,

@@ -237,6 +243,7 @@ impl Config {
max_sum_exported_function_name_lengths: MAX_SUM_EXPORTED_FUNCTION_NAME_LENGTHS,
cost_to_compile_wasm_instruction: DEFAULT_COST_TO_COMPILE_WASM_INSTRUCTION,
num_rayon_compilation_threads: DEFAULT_WASMTIME_RAYON_COMPILATION_THREADS,
num_rayon_page_allocator_threads: DEFAULT_PAGE_ALLOCATOR_THREADS,
feature_flags: FeatureFlags::const_default(),
metering_type: MeteringType::New,
stable_memory_dirty_page_limit: StableMemoryPageLimit {
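Since `EmbeddersConfig::default()` is already used in the sandbox (see lib.rs above), overriding the new field is straightforward. A sketch; the import path is an assumption:

```rust
// Assumed import path for the embedders Config shown in this diff.
use ic_config::embedders::Config as EmbeddersConfig;

fn main() {
    let mut config = EmbeddersConfig::default();
    // Override the page-allocator pool size, e.g. for a smaller test machine.
    config.num_rayon_page_allocator_threads = 4;
    assert_eq!(config.num_rayon_page_allocator_threads, 4);
}
```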
1 change: 1 addition & 0 deletions rs/replicated_state/BUILD.bazel
@@ -36,6 +36,7 @@ DEPENDENCIES = [
"@crate_index//:prometheus",
"@crate_index//:rand",
"@crate_index//:rand_chacha",
"@crate_index//:rayon",
"@crate_index//:serde",
"@crate_index//:slog",
"@crate_index//:strum",
1 change: 1 addition & 0 deletions rs/replicated_state/Cargo.toml
@@ -36,6 +36,7 @@ maplit = "1.0.2"
nix = { workspace = true }
rand = { workspace = true }
rand_chacha = { workspace = true }
rayon = { workspace = true }
phantom_newtype = { path = "../phantom_newtype" }
prometheus = { workspace = true }
serde = { workspace = true }
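`workspace = true` defers the dependency's version to the workspace root, so every crate in the repository resolves the same rayon. The corresponding root entry looks roughly like this (the version number is illustrative):

```toml
# Workspace root Cargo.toml
[workspace.dependencies]
rayon = "1"
```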
10 changes: 10 additions & 0 deletions rs/replicated_state/src/page_map/page_allocator.rs
@@ -15,9 +15,14 @@ use mmap::{PageAllocatorId, PageAllocatorInner, PageInner};

pub use self::page_allocator_registry::PageAllocatorRegistry;
use super::{FileDescriptor, FileOffset, PageAllocatorFileDescriptor};
use ic_sys::PAGE_SIZE;

static ALLOCATED_PAGES: PageCounter = PageCounter::new();

/// Allocations at least this large (in bytes) are copied in parallel; smaller
/// allocations use the sequential path. With 4 KiB pages, 64 MiB is 16384 pages.
const MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY: usize = 64 * 1024 * 1024;

/// A clonable wrapper around a 4KiB memory page implementation.
/// It is mostly immutable after creation with the only exception of `Buffer`
/// modifying privately owned pages. The only way to create a page is via a
@@ -84,6 +89,11 @@ impl PageAllocator {
/// iterator. Knowing the page count beforehand allows the page allocator
/// to optimize allocation.
pub fn allocate(&self, pages: &[(PageIndex, &PageBytes)]) -> Vec<(PageIndex, Page)> {
// If the total size of the pages to be allocated and copied reaches
// MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY, use the fast-path allocator,
// which copies the pages in parallel.
if pages.len() * PAGE_SIZE >= MIN_MEMORY_ALLOCATION_FOR_PARALLEL_COPY {
return PageAllocatorInner::allocate_fastpath(&self.0, pages);
}
PageAllocatorInner::allocate(&self.0, pages)
}

42 changes: 42 additions & 0 deletions rs/replicated_state/src/page_map/page_allocator/mmap.rs
@@ -12,6 +12,7 @@ use ic_sys::{page_bytes_from_ptr, PageBytes, PageIndex, PAGE_SIZE};
use ic_utils::deterministic_operations::deterministic_copy_from_slice;
use libc::{c_void, close};
use nix::sys::mman::{madvise, mmap, munmap, MapFlags, MmapAdvise, ProtFlags};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::os::raw::c_int;
use std::os::unix::io::RawFd;
@@ -187,6 +188,47 @@ impl PageAllocatorInner {
.collect()
}

// This is the same functionality as `allocate`, but it uses rayon to parallelize
// the copying of pages. This is useful when the number of pages is large.
pub fn allocate_fastpath(
page_allocator: &Arc<Self>,
pages: &[(PageIndex, &PageBytes)],
) -> Vec<(PageIndex, Page)> {
let mut guard = page_allocator.core_allocator.lock().unwrap();
let allocator_creator = || {
MmapBasedPageAllocatorCore::new(Arc::clone(page_allocator.fd_factory.as_ref().unwrap()))
};
let core = guard.get_or_insert_with(allocator_creator);
// It would also be correct to increment the counters after all the
// allocations, but doing it before gives better performance because
// the core allocator can memory-map larger chunks.
ALLOCATED_PAGES.inc_by(pages.len());
core.allocated_pages += pages.len();

// Allocate all pages up front, while holding the allocator lock, so that
// their contents can then be copied in parallel without per-page locking.
let mut allocated_pages_vec: Vec<_> = pages
.iter()
.map(|(_, _)| core.allocate_page(page_allocator))
.collect();

// Copy the contents of the pages in parallel using rayon parallel iterators.
// NB: the number of threads used is the number configured when starting the
// sandbox, via embedders_config.num_rayon_page_allocator_threads.
allocated_pages_vec
.par_iter_mut()
.zip(pages.par_iter())
.for_each(|(allocated_page, delta_page)| {
allocated_page.copy_from_slice(0, delta_page.1);
});

allocated_pages_vec
.into_iter()
.zip(pages)
.map(|(allocated_page, delta_page)| (delta_page.0, Page(Arc::new(allocated_page))))
.collect()
}

// See the comments of the corresponding method in `PageAllocator`.
pub fn serialize(&self) -> PageAllocatorSerialization {
let mut guard = self.core_allocator.lock().unwrap();
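Isolated from the allocator, the copy pattern above is: zip a parallel iterator over destination pages with one over source pages and copy element-wise. A self-contained sketch using a local pool instead of the global one (the pool size and buffer shapes are illustrative):

```rust
use rayon::prelude::*;

fn main() {
    const PAGE_SIZE: usize = 4096;
    let sources: Vec<[u8; PAGE_SIZE]> = (0..16).map(|i| [i as u8; PAGE_SIZE]).collect();
    let mut destinations = vec![[0u8; PAGE_SIZE]; sources.len()];

    // A dedicated local pool mimics the sandbox's fixed-size pool without
    // touching rayon's global pool.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();
    pool.install(|| {
        destinations
            .par_iter_mut()
            .zip(sources.par_iter())
            .for_each(|(dst, src)| dst.copy_from_slice(src));
    });

    assert!(destinations.iter().zip(&sources).all(|(d, s)| d == s));
}
```

Because `zip` pairs items by index, each destination page is written by exactly one task, so no synchronization is needed during the copy itself.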
70 changes: 70 additions & 0 deletions rs/replicated_state/src/page_map/page_allocator/mmap/tests.rs
@@ -57,3 +57,73 @@ fn test_page_validation_non_zero_middle_byte() {
);
assert_eq!(pages[0].1 .0.validation.non_zero_word_value, 42 * 256);
}

#[test]
fn test_page_allocator_allocate_fastpath() {
// Create an allocator and allocate 4 pages using the slow path.
// (The backing buffer is N_PAGES pages; only the first 4 are used.)
const N_PAGES: usize = 100;
let page_allocator = Arc::new(PageAllocatorInner::new_for_testing());
let contents_slow = [13u8; PAGE_SIZE * N_PAGES];
let pages_slow = PageAllocatorInner::allocate(
&page_allocator,
&[
(
PageIndex::new(0),
&contents_slow[0..PAGE_SIZE].try_into().unwrap(),
),
(
PageIndex::new(1),
&contents_slow[PAGE_SIZE..2 * PAGE_SIZE].try_into().unwrap(),
),
(
PageIndex::new(2),
&contents_slow[2 * PAGE_SIZE..3 * PAGE_SIZE]
.try_into()
.unwrap(),
),
(
PageIndex::new(3),
&contents_slow[3 * PAGE_SIZE..4 * PAGE_SIZE]
.try_into()
.unwrap(),
),
],
);

// Create another allocator and allocate 4 pages using the fast path.
let page_allocator = Arc::new(PageAllocatorInner::new_for_testing());
let contents_fast = [13u8; PAGE_SIZE * N_PAGES];
let pages_fast = PageAllocatorInner::allocate_fastpath(
&page_allocator,
&[
(
PageIndex::new(0),
&contents_fast[0..PAGE_SIZE].try_into().unwrap(),
),
(
PageIndex::new(1),
&contents_fast[PAGE_SIZE..2 * PAGE_SIZE].try_into().unwrap(),
),
(
PageIndex::new(2),
&contents_fast[2 * PAGE_SIZE..3 * PAGE_SIZE]
.try_into()
.unwrap(),
),
(
PageIndex::new(3),
&contents_fast[3 * PAGE_SIZE..4 * PAGE_SIZE]
.try_into()
.unwrap(),
),
],
);

// Check that the pages are the same.
assert_eq!(pages_slow.len(), pages_fast.len());
for (page_slow, page_fast) in pages_slow.iter().zip(pages_fast.iter()) {
let contents_slow = page_slow.1 .0.contents();
let contents_fast = page_fast.1 .0.contents();
assert_eq!(contents_slow, contents_fast);
}
}
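The four hand-written `try_into` blocks in each call could also be generated from the flat buffer. A test-only helper sketch (the function name is invented, and the real code would wrap the index in `PageIndex::new`):

```rust
const PAGE_SIZE: usize = 4096;

/// Split a flat buffer into (page index, page contents) pairs.
fn pages_from(buf: &[u8]) -> Vec<(u64, &[u8; PAGE_SIZE])> {
    buf.chunks_exact(PAGE_SIZE)
        .enumerate()
        .map(|(i, chunk)| (i as u64, chunk.try_into().unwrap()))
        .collect()
}

fn main() {
    let contents = [13u8; PAGE_SIZE * 4];
    let pages = pages_from(&contents);
    assert_eq!(pages.len(), 4);
    assert_eq!(pages[3].0, 3);
    assert!(pages.iter().all(|(_, p)| p.iter().all(|&b| b == 13)));
}
```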
