Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 22, 2025
1 parent b81b92a commit b19fae7
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 93 deletions.
29 changes: 15 additions & 14 deletions src/mito2/src/memtable/partition_tree/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ impl PartitionTree {
}
}

/// Checks that the number of primary-key columns in `kv` matches what the
/// row codec expects.
///
/// Returns `PrimaryKeyLengthMismatch` when the counts differ. Codecs that
/// report no fixed field count (`num_fields()` is `None`, e.g. a sparse
/// encoding) are exempt from this validation.
fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
    // No expected count available: nothing to validate.
    let Some(expected_num_fields) = self.row_codec.num_fields() else {
        return Ok(());
    };
    let actual = kv.num_primary_keys();
    ensure!(
        expected_num_fields == actual,
        PrimaryKeyLengthMismatchSnafu {
            expect: expected_num_fields,
            actual,
        }
    );
    Ok(())
}

// TODO(yingwen): The size computed from values is inaccurate.
/// Write key-values into the tree.
///
Expand All @@ -110,13 +123,7 @@ impl PartitionTree {
let has_pk = !self.metadata.primary_key.is_empty();

for kv in kvs.iter() {
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys(),
}
);
self.verify_primary_key_length(&kv)?;
// Safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
metrics.min_ts = metrics.min_ts.min(ts);
Expand Down Expand Up @@ -161,13 +168,7 @@ impl PartitionTree {
) -> Result<()> {
let has_pk = !self.metadata.primary_key.is_empty();

ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys(),
}
);
self.verify_primary_key_length(&kv)?;
// Safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
metrics.min_ts = metrics.min_ts.min(ts);
Expand Down
17 changes: 10 additions & 7 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,16 @@ impl TimeSeriesMemtable {
}

fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys()
}
);
if let Some(expected_num_fields) = self.row_codec.num_fields() {
ensure!(
expected_num_fields == kv.num_primary_keys(),
PrimaryKeyLengthMismatchSnafu {
expect: expected_num_fields,
actual: kv.num_primary_keys(),
}
);
}

let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/row_converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub trait PrimaryKeyCodec: Send + Sync + Debug {
) -> Result<()>;

/// Returns the number of fields in the primary key.
fn num_fields(&self) -> usize;
fn num_fields(&self) -> Option<usize>;

/// Returns a primary key filter factory.
fn primary_key_filter(
Expand Down
30 changes: 13 additions & 17 deletions src/mito2/src/row_converter/dense.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,21 +324,17 @@ pub struct DensePrimaryKeyCodec {

impl DensePrimaryKeyCodec {
pub fn new(metadata: &RegionMetadata) -> Self {
let ordered_primary_key_columns = Arc::new(
metadata
.primary_key_columns()
.map(|c| {
(
c.column_id,
SortField::new(c.column_schema.data_type.clone()),
)
})
.collect::<Vec<_>>(),
);

Self {
ordered_primary_key_columns,
}
let ordered_primary_key_columns = metadata
.primary_key_columns()
.map(|c| {
(
c.column_id,
SortField::new(c.column_schema.data_type.clone()),
)
})
.collect::<Vec<_>>();

Self::with_fields(ordered_primary_key_columns)
}

pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
Expand Down Expand Up @@ -468,8 +464,8 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
Some(self.estimated_size())
}

fn num_fields(&self) -> usize {
self.ordered_primary_key_columns.len()
fn num_fields(&self) -> Option<usize> {
Some(self.ordered_primary_key_columns.len())
}

fn encoding(&self) -> PrimaryKeyEncoding {
Expand Down
11 changes: 7 additions & 4 deletions src/mito2/src/row_converter/sparse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::ColumnId;

use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu};
use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
use crate::row_converter::dense::SortField;
Expand Down Expand Up @@ -226,7 +226,10 @@ impl SparsePrimaryKeyCodec {

impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
todo!()
UnsupportedOperationSnafu {
err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
}
.fail()
}

fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
Expand All @@ -245,8 +248,8 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
None
}

fn num_fields(&self) -> usize {
todo!()
fn num_fields(&self) -> Option<usize> {
None
}

fn encoding(&self) -> PrimaryKeyEncoding {
Expand Down
5 changes: 4 additions & 1 deletion src/mito2/src/sst/index/bloom_filter/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ impl BloomFilterIndexer {
return Ok(None);
}

let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
let codec = IndexValuesCodec::from_tag_columns(
metadata.primary_key_encoding,
metadata.primary_key_columns(),
);
let indexer = Self {
creators,
temp_file_provider,
Expand Down
85 changes: 43 additions & 42 deletions src/mito2/src/sst/index/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use datatypes::data_type::ConcreteDataType;
use datatypes::value::ValueRef;
use memcomparable::Serializer;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::ColumnMetadata;
use store_api::storage::ColumnId;

use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
use crate::row_converter::{CompositeValues, DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
use crate::row_converter::{
build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec, SortField,
};

/// Encodes index values according to their data types for sorting and storage use.
pub struct IndexValueCodec;
Expand Down Expand Up @@ -68,12 +72,15 @@ pub struct IndexValuesCodec {
/// The data types of tag columns.
fields: Vec<(ColumnId, SortField)>,
/// The decoder for the primary key.
decoder: DensePrimaryKeyCodec,
decoder: Arc<dyn PrimaryKeyCodec>,
}

impl IndexValuesCodec {
/// Creates a new `IndexValuesCodec` from a list of `ColumnMetadata` of tag columns.
pub fn from_tag_columns<'a>(tag_columns: impl Iterator<Item = &'a ColumnMetadata>) -> Self {
pub fn from_tag_columns<'a>(
primary_key_encoding: PrimaryKeyEncoding,
tag_columns: impl Iterator<Item = &'a ColumnMetadata>,
) -> Self {
let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns
.map(|column| {
(
Expand All @@ -87,8 +94,9 @@ impl IndexValuesCodec {
.unzip();

let column_ids = column_ids.into_iter().collect();
let decoder =
build_primary_key_codec_with_fields(primary_key_encoding, fields.clone().into_iter());

let decoder = DensePrimaryKeyCodec::with_fields(fields.clone());
Self {
column_ids,
fields,
Expand All @@ -115,10 +123,13 @@ impl IndexValuesCodec {
#[cfg(test)]
mod tests {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use store_api::metadata::ColumnMetadata;

use super::*;
use crate::error::Error;
use crate::row_converter::SortField;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};

#[test]
fn test_encode_value_basic() {
Expand Down Expand Up @@ -152,42 +163,32 @@ mod tests {

#[test]
fn test_decode_primary_key_basic() {
// TODO(weny, zhenchi): rewrite the test.
// let tag_columns = vec![
// ColumnMetadata {
// column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
// semantic_type: api::v1::SemanticType::Tag,
// column_id: 1,
// },
// ColumnMetadata {
// column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false),
// semantic_type: api::v1::SemanticType::Tag,
// column_id: 2,
// },
// ];

// let primary_key = DensePrimaryKeyCodec::with_fields(vec![
// (0, SortField::new(ConcreteDataType::string_datatype())),
// (1, SortField::new(ConcreteDataType::int64_datatype())),
// ])
// .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
// .unwrap();

// let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter());
// let mut iter = codec.decode(&primary_key).unwrap();

// let ((column_id, col_id_str), field, value) = iter.next().unwrap();
// assert_eq!(*column_id, 1);
// assert_eq!(col_id_str, "1");
// assert_eq!(field, &SortField::new(ConcreteDataType::string_datatype()));
// assert_eq!(value, None);

// let ((column_id, col_id_str), field, value) = iter.next().unwrap();
// assert_eq!(*column_id, 2);
// assert_eq!(col_id_str, "2");
// assert_eq!(field, &SortField::new(ConcreteDataType::int64_datatype()));
// assert_eq!(value, Some(Value::Int64(10)));

// assert!(iter.next().is_none());
let tag_columns = vec![
ColumnMetadata {
column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
semantic_type: api::v1::SemanticType::Tag,
column_id: 1,
},
ColumnMetadata {
column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false),
semantic_type: api::v1::SemanticType::Tag,
column_id: 2,
},
];

let primary_key = DensePrimaryKeyCodec::with_fields(vec![
(0, SortField::new(ConcreteDataType::string_datatype())),
(1, SortField::new(ConcreteDataType::int64_datatype())),
])
.encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
.unwrap();

let codec =
IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());
let values = codec.decode(&primary_key).unwrap().into_dense();

assert_eq!(values.len(), 2);
assert_eq!(values[0], Value::Null);
assert_eq!(values[1], Value::Int64(10));
}
}
5 changes: 4 additions & 1 deletion src/mito2/src/sst/index/inverted_index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ impl InvertedIndexer {
);
let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));

let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
let codec = IndexValuesCodec::from_tag_columns(
metadata.primary_key_encoding,
metadata.primary_key_columns(),
);
Self {
codec,
index_creator,
Expand Down
9 changes: 3 additions & 6 deletions src/mito2/src/sst/parquet/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use datatypes::vectors::{Helper, Vector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

Expand Down Expand Up @@ -308,9 +307,8 @@ impl ReadFormat {
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
let primary_key_encoding = self.metadata.primary_key_encoding;
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, true, primary_key_encoding),
SemanticType::Tag => self.tag_values(row_groups, column, true),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, true)
Expand All @@ -329,9 +327,8 @@ impl ReadFormat {
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
let primary_key_encoding = self.metadata.primary_key_encoding;
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, false, primary_key_encoding),
SemanticType::Tag => self.tag_values(row_groups, column, false),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, false)
Expand Down Expand Up @@ -393,8 +390,8 @@ impl ReadFormat {
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
is_min: bool,
primary_key_encoding: PrimaryKeyEncoding,
) -> Option<ArrayRef> {
let primary_key_encoding = self.metadata.primary_key_encoding;
let is_first_tag = self
.metadata
.primary_key
Expand Down

0 comments on commit b19fae7

Please sign in to comment.