feat: write manifest notifications to remote WAL #4678

Draft · wants to merge 5 commits into main
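
In short, every operation that advances the region manifest (flush, compaction, alter, region edit) now also appends a notification entry to the remote WAL, so log consumers can observe manifest changes in write order. Below is a minimal sketch of the notification shape using simplified stand-in types; the real `Mutation` and `OpType` live in greptime-proto (hence the `rev` bump in Cargo.toml), and the exact field and variant layout here is an assumption drawn from this diff, not the actual proto definition.

```rust
// Simplified stand-ins for the greptime-proto types this PR touches.
// Only `OpType::Notify` and `manifest_notification` mirror the diff;
// everything else is illustrative.
type ManifestVersion = u64;

#[derive(Debug, PartialEq)]
enum OpType {
    Put,
    Delete,
    /// Carries no row data; only announces a manifest change.
    Notify,
}

#[derive(Debug)]
struct ManifestNotification {
    /// Manifest version produced by the update being announced.
    version: ManifestVersion,
}

#[derive(Debug)]
struct Mutation {
    op_type: OpType,
    /// `Some` only for notification mutations; `None` for ordinary writes.
    manifest_notification: Option<ManifestNotification>,
}

/// Build the WAL mutation announcing that the manifest moved to `version`.
fn notification_mutation(version: ManifestVersion) -> Mutation {
    Mutation {
        op_type: OpType::Notify,
        manifest_notification: Some(ManifestNotification { version }),
    }
}

fn main() {
    let m = notification_mutation(42);
    assert_eq!(m.op_type, OpType::Notify);
    println!("{m:?}");
}
```
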
Cargo.lock (2 changes: 1 addition & 1 deletion)

Some generated files are not rendered by default.

Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -120,7 +120,7 @@ etcd-client = { version = "0.13" }
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c437b55725b7f5224fe9d46db21072b4a682ee4b" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b27d5d0f6b88b0416ee622623f25b99b81a742ac" }
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"

src/mito2/benches/memtable_bench.rs (1 change: 1 addition & 0 deletions)
@@ -266,6 +266,7 @@ impl CpuDataGenerator {
                 schema: self.column_schemas.clone(),
                 rows,
             }),
+            manifest_notification: None,
         };

         KeyValues::new(&self.metadata, mutation).unwrap()

src/mito2/src/compaction/compactor.rs (9 changes: 5 additions & 4 deletions)
@@ -21,6 +21,7 @@ use object_store::manager::ObjectStoreManagerRef;
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
 use snafu::{OptionExt, ResultExt};
+use store_api::manifest::ManifestVersion;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;

@@ -217,7 +218,7 @@ pub trait Compactor: Send + Sync + 'static {
         &self,
         compaction_region: &CompactionRegion,
         merge_output: MergeOutput,
-    ) -> Result<RegionEdit>;
+    ) -> Result<(ManifestVersion, RegionEdit)>;

     /// Execute compaction for a region.
     async fn compact(
@@ -364,7 +365,7 @@ impl Compactor for DefaultCompactor {
         &self,
         compaction_region: &CompactionRegion,
         merge_output: MergeOutput,
-    ) -> Result<RegionEdit> {
+    ) -> Result<(ManifestVersion, RegionEdit)> {
         // Write region edit to manifest.
         let edit = RegionEdit {
             files_to_add: merge_output.files_to_add,
@@ -378,12 +379,12 @@

         let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
         // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
-        compaction_region
+        let version = compaction_region
             .manifest_ctx
             .update_manifest(Writable, action_list)
             .await?;

-        Ok(edit)
+        Ok((version, edit))
     }

     // The default implementation of compact combines the merge_ssts and update_manifest functions.
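
The shape of the change above recurs throughout the PR: a manifest update now reports the `ManifestVersion` it produced, and `Compactor::update_manifest` hands that version back together with the `RegionEdit` so the caller can emit the WAL notification afterwards. A compilable sketch of that contract, with hypothetical simplified types standing in for the mito2 internals:

```rust
// Hypothetical, simplified mirror of the new contract: a manifest update
// yields the version it produced, and the caller threads it out with the edit.
type ManifestVersion = u64;

#[derive(Debug, Clone, Default)]
struct RegionEdit {
    files_to_add: Vec<String>,
}

struct ManifestCtx {
    version: ManifestVersion,
}

impl ManifestCtx {
    /// Applying an edit bumps the manifest version and returns the new value.
    fn update_manifest(&mut self, _edit: &RegionEdit) -> ManifestVersion {
        self.version += 1;
        self.version
    }
}

/// Analogue of `DefaultCompactor::update_manifest`: the version is returned
/// together with the edit instead of being dropped inside this function.
fn apply_edit(ctx: &mut ManifestCtx, edit: RegionEdit) -> (ManifestVersion, RegionEdit) {
    let version = ctx.update_manifest(&edit);
    (version, edit)
}

fn main() {
    let mut ctx = ManifestCtx { version: 0 };
    let (version, edit) = apply_edit(&mut ctx, RegionEdit::default());
    assert_eq!(version, 1);
    println!("manifest v{version}, edit: {edit:?}");
}
```
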
src/mito2/src/compaction/task.rs (18 changes: 11 additions & 7 deletions)
@@ -18,6 +18,7 @@ use std::time::Instant;

 use common_telemetry::{error, info};
 use snafu::ResultExt;
+use store_api::manifest::ManifestVersion;
 use tokio::sync::mpsc;

 use crate::compaction::compactor::{CompactionRegion, Compactor};
@@ -77,7 +78,7 @@ impl CompactionTaskImpl {
             .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
     }

-    async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
+    async fn handle_compaction(&mut self) -> error::Result<(ManifestVersion, RegionEdit)> {
         self.mark_files_compacting(true);

         let merge_timer = COMPACTION_STAGE_ELAPSED
@@ -146,12 +147,15 @@
 impl CompactionTask for CompactionTaskImpl {
     async fn run(&mut self) {
         let notify = match self.handle_compaction().await {
-            Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
-                region_id: self.compaction_region.region_id,
-                senders: std::mem::take(&mut self.waiters),
-                start_time: self.start_time,
-                edit,
-            }),
+            Ok((manifest_version, edit)) => {
+                BackgroundNotify::CompactionFinished(CompactionFinished {
+                    region_id: self.compaction_region.region_id,
+                    senders: std::mem::take(&mut self.waiters),
+                    start_time: self.start_time,
+                    edit,
+                    manifest_version,
+                })
+            }
             Err(e) => {
                 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
                 let err = Arc::new(e);

src/mito2/src/engine.rs (7 changes: 6 additions & 1 deletion)
@@ -74,6 +74,7 @@ use object_store::manager::ObjectStoreManagerRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
+use store_api::manifest::ManifestVersion;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{
     BatchResponses, RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse,
@@ -184,7 +185,11 @@ impl MitoEngine {
     /// Now we only allow adding files to region (the [RegionEdit] struct can only contain a non-empty "files_to_add" field).
     /// Other region editing intention will result in an "invalid request" error.
     /// Also note that if a region is to be edited directly, we MUST not write data to it thereafter.
-    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
+    pub async fn edit_region(
+        &self,
+        region_id: RegionId,
+        edit: RegionEdit,
+    ) -> Result<ManifestVersion> {
         let _timer = HANDLE_REQUEST_ELAPSED
             .with_label_values(&["edit_region"])
             .start_timer();
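
With the new signature, a caller of `edit_region` receives the manifest version the edit produced, which is presumably what gets announced in the remote WAL. A self-contained sketch of the new call shape; the stub below is synchronous and local, unlike the real async `MitoEngine::edit_region`, and the error type is invented for illustration:

```rust
// Hypothetical stand-in showing the new call pattern; the real `edit_region`
// is async and lives on `MitoEngine`.
type ManifestVersion = u64;
type RegionId = u64;

#[derive(Debug, Default)]
struct RegionEdit {
    files_to_add: Vec<String>,
}

#[derive(Debug)]
enum Error {
    InvalidRequest(&'static str),
}

/// Mirrors the changed signature: success now yields the manifest version.
fn edit_region(_region_id: RegionId, edit: RegionEdit) -> Result<ManifestVersion, Error> {
    // Only "add files" edits are accepted, per the doc comment in the diff.
    if edit.files_to_add.is_empty() {
        return Err(Error::InvalidRequest("files_to_add must be non-empty"));
    }
    Ok(1) // the version written to the manifest
}

fn main() -> Result<(), Error> {
    let edit = RegionEdit {
        files_to_add: vec!["fake-file-id.parquet".to_string()],
    };
    // The returned version is what a notification entry would carry.
    let version = edit_region(1, edit)?;
    println!("manifest advanced to v{version}");
    Ok(())
}
```
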
src/mito2/src/engine/alter_test.rs (59 changes: 57 additions & 2 deletions)
@@ -23,6 +23,12 @@ use common_error::status_code::StatusCode;
 use common_recordbatch::RecordBatches;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::metadata::ColumnMetadata;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{
@@ -34,9 +40,12 @@ use crate::config::MitoConfig;
 use crate::engine::listener::AlterFlushListener;
 use crate::engine::MitoEngine;
 use crate::test_util::{
-    build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
-    TestEnv,
+    build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
+    prepare_test_for_kafka_log_store, put_rows, rows_schema, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
 };
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;

 async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
     let request = ScanRequest::default();
@@ -68,6 +77,52 @@
 }
 }

+#[apply(single_kafka_log_store_factory)]
+async fn test_alter_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env =
+        TestEnv::with_prefix("alter-notification").with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let region_id = RegionId::new(1, 1);
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows_for_key("a", 0, 3, 0),
+    };
+    put_rows(&engine, region_id, rows).await;
+    let request = add_tag1();
+    engine
+        .handle_request(region_id, RegionRequest::Alter(request))
+        .await
+        .unwrap();
+
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+    // Flush sst notification
+    assert_eq!(entries[1].1.mutations[0].op_type(), api::v1::OpType::Notify);
+    // Modify table metadata notification
+    assert_eq!(entries[2].1.mutations[0].op_type(), api::v1::OpType::Notify);
+}
+
 #[tokio::test]
 async fn test_alter_region() {
     common_telemetry::init_default_ut_logging();

src/mito2/src/engine/compaction_test.rs (72 changes: 71 additions & 1 deletion)
@@ -19,6 +19,12 @@ use api::v1::{ColumnSchema, Rows};
 use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
 use datatypes::prelude::ScalarVector;
 use datatypes::vectors::TimestampMillisecondVector;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{
     RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
@@ -30,8 +36,12 @@ use crate::config::MitoConfig;
 use crate::engine::listener::CompactionListener;
 use crate::engine::MitoEngine;
 use crate::test_util::{
-    build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
+    build_rows_for_key, column_metadata_to_column_schema, kafka_log_store_factory,
+    prepare_test_for_kafka_log_store, put_rows, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
 };
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;

 async fn put_and_flush(
     engine: &MitoEngine,
@@ -105,6 +115,66 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
     res
 }

+#[apply(single_kafka_log_store_factory)]
+async fn test_compaction_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env =
+        TestEnv::with_prefix("compaction_notification").with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .insert_option("compaction.type", "twcs")
+        .insert_option("compaction.twcs.max_active_window_runs", "1")
+        .insert_option("compaction.twcs.max_inactive_window_runs", "1")
+        .build();
+
+    let column_schemas = request
+        .column_metadatas
+        .iter()
+        .map(column_metadata_to_column_schema)
+        .collect::<Vec<_>>();
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    // Flush 5 SSTs for compaction.
+    put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
+    put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
+    put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
+    delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
+    put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
+
+    let result = engine
+        .handle_request(
+            region_id,
+            RegionRequest::Compact(RegionCompactRequest::default()),
+        )
+        .await
+        .unwrap();
+    assert_eq!(result.affected_rows, 0);
+
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+    let notifications = entries
+        .into_iter()
+        .filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
+        .count();
+    assert_eq!(notifications, 6);
+}
+
 #[tokio::test]
 async fn test_compaction_region() {
     common_telemetry::init_default_ut_logging();

src/mito2/src/engine/edit_region_test.rs (70 changes: 69 additions & 1 deletion)
@@ -16,7 +16,13 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;

 use common_time::util::current_time_millis;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
 use object_store::ObjectStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::RegionId;
@@ -29,7 +35,69 @@ use crate::engine::MitoEngine;
 use crate::manifest::action::RegionEdit;
 use crate::region::MitoRegionRef;
 use crate::sst::file::{FileId, FileMeta};
-use crate::test_util::{CreateRequestBuilder, TestEnv};
+use crate::test_util::{
+    kafka_log_store_factory, prepare_test_for_kafka_log_store, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
+};
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;

+#[apply(single_kafka_log_store_factory)]
+async fn test_edit_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env = TestEnv::with_prefix("edit-notification").with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let region = engine.get_region(region_id).unwrap();
+    let file_id = FileId::random();
+    // Simulating the ingestion of an SST file.
+    env.get_object_store()
+        .unwrap()
+        .write(
+            &format!("{}/{}.parquet", region.region_dir(), file_id),
+            b"x".as_slice(),
+        )
+        .await
+        .unwrap();
+    let edit = RegionEdit {
+        files_to_add: vec![FileMeta {
+            region_id: region.region_id,
+            file_id,
+            level: 0,
+            ..Default::default()
+        }],
+        files_to_remove: vec![],
+        compaction_time_window: None,
+        flushed_entry_id: None,
+        flushed_sequence: None,
+    };
+    engine.edit_region(region.region_id, edit).await.unwrap();
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+    let notifications = entries
+        .into_iter()
+        .filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
+        .count();
+    assert_eq!(notifications, 1);
+}
+
 #[tokio::test]
 async fn test_edit_region_schedule_compaction() {