diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index bf8df5fcec7a..6f9e5c0261ff 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -40,6 +40,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{RegionId, TableId}; use table::predicate::Predicate; +use task::MAX_PARALLEL_COMPACTION; use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; @@ -85,6 +86,7 @@ pub struct CompactionRequest { pub(crate) manifest_ctx: ManifestContextRef, pub(crate) listener: WorkerListener, pub(crate) schema_metadata_manager: SchemaMetadataManagerRef, + pub(crate) max_parallelism: usize, } impl CompactionRequest { @@ -145,6 +147,7 @@ impl CompactionScheduler { waiter: OptionOutputTx, manifest_ctx: &ManifestContextRef, schema_metadata_manager: SchemaMetadataManagerRef, + max_parallelism: usize, ) -> Result<()> { if let Some(status) = self.region_status.get_mut(®ion_id) { // Region is compacting. Add the waiter to pending list. @@ -163,6 +166,7 @@ impl CompactionScheduler { manifest_ctx, self.listener.clone(), schema_metadata_manager, + max_parallelism, ); self.region_status.insert(region_id, status); let result = self @@ -193,6 +197,7 @@ impl CompactionScheduler { manifest_ctx, self.listener.clone(), schema_metadata_manager, + MAX_PARALLEL_COMPACTION, ); // Try to schedule next compaction task for this region. if let Err(e) = self @@ -264,6 +269,7 @@ impl CompactionScheduler { manifest_ctx, listener, schema_metadata_manager, + max_parallelism, } = request; let ttl = find_ttl( @@ -294,6 +300,7 @@ impl CompactionScheduler { manifest_ctx: manifest_ctx.clone(), file_purger: None, ttl: Some(ttl), + max_parallelism, }; let picker_output = { @@ -521,6 +528,7 @@ impl CompactionStatus { manifest_ctx: &ManifestContextRef, listener: WorkerListener, schema_metadata_manager: SchemaMetadataManagerRef, + max_parallelism: usize, ) -> CompactionRequest { let current_version = CompactionVersion::from(self.version_control.current().version); let start_time = Instant::now(); @@ -535,6 +543,7 @@ impl CompactionStatus { manifest_ctx: manifest_ctx.clone(), listener, schema_metadata_manager, + max_parallelism, }; if let Some(pending) = self.pending_compaction.take() { @@ -722,6 +731,7 @@ mod tests { waiter, &manifest_ctx, schema_metadata_manager.clone(), + 1, ) .await .unwrap(); @@ -742,6 +752,7 @@ mod tests { waiter, &manifest_ctx, schema_metadata_manager, + 1, ) .await .unwrap(); @@ -795,6 +806,7 @@ mod tests { OptionOutputTx::none(), &manifest_ctx, schema_metadata_manager.clone(), + 1, ) .await .unwrap(); @@ -825,6 +837,7 @@ mod tests { OptionOutputTx::none(), &manifest_ctx, schema_metadata_manager.clone(), + 1, ) .await .unwrap(); @@ -860,6 +873,7 @@ mod tests { OptionOutputTx::none(), &manifest_ctx, schema_metadata_manager, + 1, ) .await .unwrap(); diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ceeb509bc17e..ae3aeea45b65 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -91,6 +91,12 @@ pub struct CompactionRegion { pub(crate) current_version: CompactionVersion, pub(crate) file_purger: Option>, pub(crate) ttl: Option, + + /// Controls the parallelism of this compaction task. Default is 1. + /// + /// The parallel is inside this compaction task, not across different compaction tasks. + /// It can be different windows of the same compaction task or something like this. + pub max_parallelism: usize, } /// OpenCompactionRegionRequest represents the request to open a compaction region. @@ -99,6 +105,7 @@ pub struct OpenCompactionRegionRequest { pub region_id: RegionId, pub region_dir: String, pub region_options: RegionOptions, + pub max_parallelism: usize, } /// Open a compaction region from a compaction request. @@ -205,6 +212,7 @@ pub async fn open_compaction_region( current_version, file_purger: Some(file_purger), ttl: Some(ttl), + max_parallelism: req.max_parallelism, }) } @@ -266,6 +274,7 @@ impl Compactor for DefaultCompactor { let mut futs = Vec::with_capacity(picker_output.outputs.len()); let mut compacted_inputs = Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); + let internal_parallelism = compaction_region.max_parallelism.max(1); for output in picker_output.outputs.drain(..) { compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); @@ -358,9 +367,8 @@ impl Compactor for DefaultCompactor { } let mut output_files = Vec::with_capacity(futs.len()); while !futs.is_empty() { - let mut task_chunk = - Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION); - for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION { + let mut task_chunk = Vec::with_capacity(internal_parallelism); + for _ in 0..internal_parallelism { if let Some(task) = futs.pop() { task_chunk.push(common_runtime::spawn_compact(task)); } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index c76595097753..f083e09587fe 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -32,7 +32,7 @@ use crate::request::{ use crate::worker::WorkerListener; /// Maximum number of compaction tasks in parallel. -pub const MAX_PARALLEL_COMPACTION: usize = 8; +pub const MAX_PARALLEL_COMPACTION: usize = 1; pub(crate) struct CompactionTaskImpl { pub compaction_region: CompactionRegion, diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 55bae04633f0..32b963b5067b 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -464,6 +464,7 @@ async fn test_open_compaction_region() { region_id, region_dir: region_dir.clone(), region_options: RegionOptions::default(), + max_parallelism: 1, }; let compaction_region = open_compaction_region( diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 292eb237357b..6fb9f640f7c0 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -45,6 +45,8 @@ impl RegionWorkerLoop { sender, ®ion.manifest_ctx, self.schema_metadata_manager.clone(), + // TODO(yingwen): expose this to frontend + 1, ) .await { @@ -113,6 +115,7 @@ impl RegionWorkerLoop { OptionOutputTx::none(), ®ion.manifest_ctx, self.schema_metadata_manager.clone(), + 1, ) .await {