Skip to content

Commit

Permalink
feat: set default compaction parallelism (GreptimeTeam#5371)
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored and evenyag committed Jan 23, 2025
1 parent a78908a commit 84f18ce
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 4 deletions.
14 changes: 14 additions & 0 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use table::predicate::Predicate;
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
Expand Down Expand Up @@ -85,6 +86,7 @@ pub struct CompactionRequest {
pub(crate) manifest_ctx: ManifestContextRef,
pub(crate) listener: WorkerListener,
pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
Expand Down Expand Up @@ -145,6 +147,7 @@ impl CompactionScheduler {
waiter: OptionOutputTx,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
Expand All @@ -163,6 +166,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
max_parallelism,
);
self.region_status.insert(region_id, status);
let result = self
Expand Down Expand Up @@ -193,6 +197,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
MAX_PARALLEL_COMPACTION,
);
// Try to schedule next compaction task for this region.
if let Err(e) = self
Expand Down Expand Up @@ -264,6 +269,7 @@ impl CompactionScheduler {
manifest_ctx,
listener,
schema_metadata_manager,
max_parallelism,
} = request;

let ttl = find_ttl(
Expand Down Expand Up @@ -294,6 +300,7 @@ impl CompactionScheduler {
manifest_ctx: manifest_ctx.clone(),
file_purger: None,
ttl: Some(ttl),
max_parallelism,
};

let picker_output = {
Expand Down Expand Up @@ -521,6 +528,7 @@ impl CompactionStatus {
manifest_ctx: &ManifestContextRef,
listener: WorkerListener,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> CompactionRequest {
let current_version = CompactionVersion::from(self.version_control.current().version);
let start_time = Instant::now();
Expand All @@ -535,6 +543,7 @@ impl CompactionStatus {
manifest_ctx: manifest_ctx.clone(),
listener,
schema_metadata_manager,
max_parallelism,
};

if let Some(pending) = self.pending_compaction.take() {
Expand Down Expand Up @@ -722,6 +731,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
Expand All @@ -742,6 +752,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
Expand Down Expand Up @@ -795,6 +806,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
Expand Down Expand Up @@ -825,6 +837,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
Expand Down Expand Up @@ -860,6 +873,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
Expand Down
14 changes: 11 additions & 3 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ pub struct CompactionRegion {
pub(crate) current_version: CompactionVersion,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
pub(crate) ttl: Option<TimeToLive>,

/// Controls the parallelism of this compaction task. Default is 1.
///
/// The parallel is inside this compaction task, not across different compaction tasks.
/// It can be different windows of the same compaction task or something like this.
pub max_parallelism: usize,
}

/// OpenCompactionRegionRequest represents the request to open a compaction region.
Expand All @@ -99,6 +105,7 @@ pub struct OpenCompactionRegionRequest {
pub region_id: RegionId,
pub region_dir: String,
pub region_options: RegionOptions,
pub max_parallelism: usize,
}

/// Open a compaction region from a compaction request.
Expand Down Expand Up @@ -205,6 +212,7 @@ pub async fn open_compaction_region(
current_version,
file_purger: Some(file_purger),
ttl: Some(ttl),
max_parallelism: req.max_parallelism,
})
}

Expand Down Expand Up @@ -266,6 +274,7 @@ impl Compactor for DefaultCompactor {
let mut futs = Vec::with_capacity(picker_output.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
let internal_parallelism = compaction_region.max_parallelism.max(1);

for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
Expand Down Expand Up @@ -358,9 +367,8 @@ impl Compactor for DefaultCompactor {
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk =
Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION);
for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION {
let mut task_chunk = Vec::with_capacity(internal_parallelism);
for _ in 0..internal_parallelism {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_compact(task));
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/compaction/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::request::{
use crate::worker::WorkerListener;

/// Maximum number of compaction tasks in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 8;
pub const MAX_PARALLEL_COMPACTION: usize = 1;

pub(crate) struct CompactionTaskImpl {
pub compaction_region: CompactionRegion,
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ async fn test_open_compaction_region() {
region_id,
region_dir: region_dir.clone(),
region_options: RegionOptions::default(),
max_parallelism: 1,
};

let compaction_region = open_compaction_region(
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ impl<S> RegionWorkerLoop<S> {
sender,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
// TODO(yingwen): expose this to frontend
1,
)
.await
{
Expand Down Expand Up @@ -113,6 +115,7 @@ impl<S> RegionWorkerLoop<S> {
OptionOutputTx::none(),
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
1,
)
.await
{
Expand Down

0 comments on commit 84f18ce

Please sign in to comment.