diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs
index d02b13ddb47a..8f02fc688725 100644
--- a/src/mito2/src/memtable/partition_tree/tree.rs
+++ b/src/mito2/src/memtable/partition_tree/tree.rs
@@ -96,6 +96,19 @@ impl PartitionTree {
         }
     }
 
+    fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
+        if let Some(expected_num_fields) = self.row_codec.num_fields() {
+            ensure!(
+                expected_num_fields == kv.num_primary_keys(),
+                PrimaryKeyLengthMismatchSnafu {
+                    expect: expected_num_fields,
+                    actual: kv.num_primary_keys(),
+                }
+            );
+        }
+        Ok(())
+    }
+
     // TODO(yingwen): The size computed from values is inaccurate.
     /// Write key-values into the tree.
     ///
@@ -110,13 +123,7 @@ impl PartitionTree {
         let has_pk = !self.metadata.primary_key.is_empty();
 
         for kv in kvs.iter() {
-            ensure!(
-                kv.num_primary_keys() == self.row_codec.num_fields(),
-                PrimaryKeyLengthMismatchSnafu {
-                    expect: self.row_codec.num_fields(),
-                    actual: kv.num_primary_keys(),
-                }
-            );
+            self.verify_primary_key_length(&kv)?;
             // Safety: timestamp of kv must be both present and a valid timestamp value.
             let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
             metrics.min_ts = metrics.min_ts.min(ts);
@@ -161,13 +168,7 @@ impl PartitionTree {
     ) -> Result<()> {
         let has_pk = !self.metadata.primary_key.is_empty();
 
-        ensure!(
-            kv.num_primary_keys() == self.row_codec.num_fields(),
-            PrimaryKeyLengthMismatchSnafu {
-                expect: self.row_codec.num_fields(),
-                actual: kv.num_primary_keys(),
-            }
-        );
+        self.verify_primary_key_length(&kv)?;
         // Safety: timestamp of kv must be both present and a valid timestamp value.
         let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
         metrics.min_ts = metrics.min_ts.min(ts);
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index c7e341db2e6e..c1617e683090 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -145,13 +145,16 @@ impl TimeSeriesMemtable {
     }
 
     fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
-        ensure!(
-            kv.num_primary_keys() == self.row_codec.num_fields(),
-            PrimaryKeyLengthMismatchSnafu {
-                expect: self.row_codec.num_fields(),
-                actual: kv.num_primary_keys()
-            }
-        );
+        if let Some(expected_num_fields) = self.row_codec.num_fields() {
+            ensure!(
+                expected_num_fields == kv.num_primary_keys(),
+                PrimaryKeyLengthMismatchSnafu {
+                    expect: expected_num_fields,
+                    actual: kv.num_primary_keys(),
+                }
+            );
+        }
+
         let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
         let fields = kv.fields().collect::<Vec<_>>();
 
diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index 7402623b781f..4d0635d3cc45 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -111,7 +111,7 @@ pub trait PrimaryKeyCodec: Send + Sync + Debug {
     ) -> Result<()>;
 
     /// Returns the number of fields in the primary key.
-    fn num_fields(&self) -> usize;
+    fn num_fields(&self) -> Option<usize>;
 
     /// Returns a primary key filter factory.
     fn primary_key_filter(
diff --git a/src/mito2/src/row_converter/dense.rs b/src/mito2/src/row_converter/dense.rs
index 35a618718079..28304dd78263 100644
--- a/src/mito2/src/row_converter/dense.rs
+++ b/src/mito2/src/row_converter/dense.rs
@@ -324,21 +324,17 @@ pub struct DensePrimaryKeyCodec {
 
 impl DensePrimaryKeyCodec {
     pub fn new(metadata: &RegionMetadata) -> Self {
-        let ordered_primary_key_columns = Arc::new(
-            metadata
-                .primary_key_columns()
-                .map(|c| {
-                    (
-                        c.column_id,
-                        SortField::new(c.column_schema.data_type.clone()),
-                    )
-                })
-                .collect::<Vec<_>>(),
-        );
-
-        Self {
-            ordered_primary_key_columns,
-        }
+        let ordered_primary_key_columns = metadata
+            .primary_key_columns()
+            .map(|c| {
+                (
+                    c.column_id,
+                    SortField::new(c.column_schema.data_type.clone()),
+                )
+            })
+            .collect::<Vec<_>>();
+
+        Self::with_fields(ordered_primary_key_columns)
     }
 
     pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
@@ -468,8 +464,8 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
         Some(self.estimated_size())
     }
 
-    fn num_fields(&self) -> usize {
-        self.ordered_primary_key_columns.len()
+    fn num_fields(&self) -> Option<usize> {
+        Some(self.ordered_primary_key_columns.len())
     }
 
     fn encoding(&self) -> PrimaryKeyEncoding {
diff --git a/src/mito2/src/row_converter/sparse.rs b/src/mito2/src/row_converter/sparse.rs
index 93ee2a0a18e7..18b4da99fafb 100644
--- a/src/mito2/src/row_converter/sparse.rs
+++ b/src/mito2/src/row_converter/sparse.rs
@@ -26,7 +26,7 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::ReservedColumnId;
 use store_api::storage::ColumnId;
 
-use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu};
+use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
 use crate::row_converter::dense::SortField;
@@ -226,7 +226,10 @@ impl SparsePrimaryKeyCodec {
 
 impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
     fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
-        todo!()
+        UnsupportedOperationSnafu {
+            err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
+        }
+        .fail()
     }
 
     fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
@@ -245,8 +248,8 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
         None
     }
 
-    fn num_fields(&self) -> usize {
-        todo!()
+    fn num_fields(&self) -> Option<usize> {
+        None
     }
 
     fn encoding(&self) -> PrimaryKeyEncoding {
diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs
index d9d50b2a16d9..cdc21538cb9c 100644
--- a/src/mito2/src/sst/index/bloom_filter/creator.rs
+++ b/src/mito2/src/sst/index/bloom_filter/creator.rs
@@ -108,7 +108,10 @@ impl BloomFilterIndexer {
             return Ok(None);
         }
 
-        let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
+        let codec = IndexValuesCodec::from_tag_columns(
+            metadata.primary_key_encoding,
+            metadata.primary_key_columns(),
+        );
         let indexer = Self {
             creators,
             temp_file_provider,
diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs
index 8577309c2431..5d08cc7b2934 100644
--- a/src/mito2/src/sst/index/codec.rs
+++ b/src/mito2/src/sst/index/codec.rs
@@ -13,16 +13,20 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 
 use datatypes::data_type::ConcreteDataType;
 use datatypes::value::ValueRef;
 use memcomparable::Serializer;
 use snafu::{ensure, OptionExt, ResultExt};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::ColumnMetadata;
 use store_api::storage::ColumnId;
 
 use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
-use crate::row_converter::{CompositeValues, DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
+use crate::row_converter::{
+    build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec, SortField,
+};
 
 /// Encodes index values according to their data types for sorting and storage use.
 pub struct IndexValueCodec;
@@ -68,12 +72,15 @@ pub struct IndexValuesCodec {
     /// The data types of tag columns.
     fields: Vec<(ColumnId, SortField)>,
     /// The decoder for the primary key.
-    decoder: DensePrimaryKeyCodec,
+    decoder: Arc<dyn PrimaryKeyCodec>,
 }
 
 impl IndexValuesCodec {
     /// Creates a new `IndexValuesCodec` from a list of `ColumnMetadata` of tag columns.
-    pub fn from_tag_columns<'a>(tag_columns: impl Iterator<Item = &'a ColumnMetadata>) -> Self {
+    pub fn from_tag_columns<'a>(
+        primary_key_encoding: PrimaryKeyEncoding,
+        tag_columns: impl Iterator<Item = &'a ColumnMetadata>,
+    ) -> Self {
         let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns
             .map(|column| {
                 (
@@ -87,8 +94,9 @@ impl IndexValuesCodec {
             .unzip();
 
         let column_ids = column_ids.into_iter().collect();
+        let decoder =
+            build_primary_key_codec_with_fields(primary_key_encoding, fields.clone().into_iter());
 
-        let decoder = DensePrimaryKeyCodec::with_fields(fields.clone());
         Self {
             column_ids,
             fields,
@@ -115,10 +123,13 @@
 #[cfg(test)]
 mod tests {
     use datatypes::data_type::ConcreteDataType;
+    use datatypes::schema::ColumnSchema;
+    use datatypes::value::Value;
+    use store_api::metadata::ColumnMetadata;
 
     use super::*;
     use crate::error::Error;
-    use crate::row_converter::SortField;
+    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
 
     #[test]
     fn test_encode_value_basic() {
@@ -152,42 +163,32 @@
 
     #[test]
     fn test_decode_primary_key_basic() {
-        // TODO(weny, zhenchi): rewrite the test.
-        // let tag_columns = vec![
-        //     ColumnMetadata {
-        //         column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
-        //         semantic_type: api::v1::SemanticType::Tag,
-        //         column_id: 1,
-        //     },
-        //     ColumnMetadata {
-        //         column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false),
-        //         semantic_type: api::v1::SemanticType::Tag,
-        //         column_id: 2,
-        //     },
-        // ];
-
-        // let primary_key = DensePrimaryKeyCodec::with_fields(vec![
-        //     (0, SortField::new(ConcreteDataType::string_datatype())),
-        //     (1, SortField::new(ConcreteDataType::int64_datatype())),
-        // ])
-        // .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
-        // .unwrap();
-
-        // let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter());
-        // let mut iter = codec.decode(&primary_key).unwrap();
-
-        // let ((column_id, col_id_str), field, value) = iter.next().unwrap();
-        // assert_eq!(*column_id, 1);
-        // assert_eq!(col_id_str, "1");
-        // assert_eq!(field, &SortField::new(ConcreteDataType::string_datatype()));
-        // assert_eq!(value, None);
-
-        // let ((column_id, col_id_str), field, value) = iter.next().unwrap();
-        // assert_eq!(*column_id, 2);
-        // assert_eq!(col_id_str, "2");
-        // assert_eq!(field, &SortField::new(ConcreteDataType::int64_datatype()));
-        // assert_eq!(value, Some(Value::Int64(10)));
-
-        // assert!(iter.next().is_none());
+        let tag_columns = vec![
+            ColumnMetadata {
+                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
+                semantic_type: api::v1::SemanticType::Tag,
+                column_id: 1,
+            },
+            ColumnMetadata {
+                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false),
+                semantic_type: api::v1::SemanticType::Tag,
+                column_id: 2,
+            },
+        ];
+
+        let primary_key = DensePrimaryKeyCodec::with_fields(vec![
+            (0, SortField::new(ConcreteDataType::string_datatype())),
+            (1, SortField::new(ConcreteDataType::int64_datatype())),
+        ])
+        .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
+        .unwrap();
+
+        let codec =
+            IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());
+        let values = codec.decode(&primary_key).unwrap().into_dense();
+
+        assert_eq!(values.len(), 2);
+        assert_eq!(values[0], Value::Null);
+        assert_eq!(values[1], Value::Int64(10));
     }
 }
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index d603b7203624..7903f2a496d9 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -101,7 +101,10 @@ impl InvertedIndexer {
         );
         let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
 
-        let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
+        let codec = IndexValuesCodec::from_tag_columns(
+            metadata.primary_key_encoding,
+            metadata.primary_key_columns(),
+        );
         Self {
             codec,
             index_creator,
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 9de3f411ec73..dafb93a084ae 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -41,7 +41,6 @@ use datatypes::vectors::{Helper, Vector};
 use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use parquet::file::statistics::Statistics;
 use snafu::{ensure, OptionExt, ResultExt};
-use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
 use store_api::storage::{ColumnId, SequenceNumber};
 
@@ -308,9 +307,8 @@ impl ReadFormat {
         column_id: ColumnId,
     ) -> Option<ArrayRef> {
         let column = self.metadata.column_by_id(column_id)?;
-        let primary_key_encoding = self.metadata.primary_key_encoding;
         match column.semantic_type {
-            SemanticType::Tag => self.tag_values(row_groups, column, true, primary_key_encoding),
+            SemanticType::Tag => self.tag_values(row_groups, column, true),
             SemanticType::Field => {
                 let index = self.field_id_to_index.get(&column_id)?;
                 Self::column_values(row_groups, column, *index, true)
@@ -329,9 +327,8 @@ impl ReadFormat {
         column_id: ColumnId,
     ) -> Option<ArrayRef> {
         let column = self.metadata.column_by_id(column_id)?;
-        let primary_key_encoding = self.metadata.primary_key_encoding;
         match column.semantic_type {
-            SemanticType::Tag => self.tag_values(row_groups, column, false, primary_key_encoding),
+            SemanticType::Tag => self.tag_values(row_groups, column, false),
             SemanticType::Field => {
                 let index = self.field_id_to_index.get(&column_id)?;
                 Self::column_values(row_groups, column, *index, false)
@@ -393,8 +390,8 @@ impl ReadFormat {
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column: &ColumnMetadata,
         is_min: bool,
-        primary_key_encoding: PrimaryKeyEncoding,
     ) -> Option<ArrayRef> {
+        let primary_key_encoding = self.metadata.primary_key_encoding;
         let is_first_tag = self
             .metadata
             .primary_key
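The behavioral core of this diff is that `PrimaryKeyCodec::num_fields` now returns `Option<usize>`, so the sparse codec can report an unknown field count and writers skip the primary-key length check instead of hitting a `todo!()`. A minimal, self-contained sketch of that pattern, using hypothetical stand-in types (`DenseCodec`, `SparseCodec`, `MockKeyValue`, `PrimaryKeyLengthMismatch`) rather than the real mito2 types:

```rust
// Sketch only: stand-in types, not the real mito2 implementations.

/// Stand-in error carrying the expected and actual primary key lengths.
#[derive(Debug)]
struct PrimaryKeyLengthMismatch {
    expect: usize,
    actual: usize,
}

/// Trimmed-down codec trait: only the method changed by this diff.
trait PrimaryKeyCodec {
    /// Number of primary key fields, or `None` when the codec cannot know it
    /// up front (the sparse case).
    fn num_fields(&self) -> Option<usize>;
}

struct DenseCodec {
    fields: Vec<&'static str>,
}

struct SparseCodec;

impl PrimaryKeyCodec for DenseCodec {
    fn num_fields(&self) -> Option<usize> {
        Some(self.fields.len())
    }
}

impl PrimaryKeyCodec for SparseCodec {
    fn num_fields(&self) -> Option<usize> {
        None
    }
}

/// Stand-in for a row carrying its primary key values.
struct MockKeyValue {
    primary_keys: Vec<i64>,
}

/// Mirrors the `verify_primary_key_length` helper added in tree.rs:
/// enforce the length only when the codec reports a fixed field count.
fn verify_primary_key_length(
    codec: &dyn PrimaryKeyCodec,
    kv: &MockKeyValue,
) -> Result<(), PrimaryKeyLengthMismatch> {
    if let Some(expect) = codec.num_fields() {
        if expect != kv.primary_keys.len() {
            return Err(PrimaryKeyLengthMismatch {
                expect,
                actual: kv.primary_keys.len(),
            });
        }
    }
    Ok(())
}

fn main() {
    let dense = DenseCodec {
        fields: vec!["host", "region"],
    };
    let sparse = SparseCodec;
    let kv = MockKeyValue {
        primary_keys: vec![1],
    };

    // Dense codec: one value against two declared fields -> mismatch error.
    assert!(verify_primary_key_length(&dense, &kv).is_err());
    // Sparse codec: unknown field count -> the check is skipped.
    assert!(verify_primary_key_length(&sparse, &kv).is_ok());
}
```

The same `if let Some(expected_num_fields) = self.row_codec.num_fields()` guard is what `TimeSeriesMemtable::write_key_value` now applies inline.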