From ad4c63df6a083f2f42154e8cf44325d78f39f119 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 1 Feb 2024 15:49:21 +0800 Subject: [PATCH] Build join schema --- Cargo.lock | 12 ++++++ bustubx/Cargo.toml | 3 +- bustubx/src/catalog/column.rs | 11 +++-- bustubx/src/common/table_ref.rs | 38 ++++++++++++++++- bustubx/src/planner/logical_plan_v2/join.rs | 2 + bustubx/src/planner/logical_plan_v2/mod.rs | 21 ++++++++++ .../src/planner/logical_plan_v2/table_scan.rs | 3 +- bustubx/src/planner/logical_plan_v2/util.rs | 42 +++++++++++++++++++ .../planner/logical_planner/plan_set_expr.rs | 24 +++++++++-- 9 files changed, 146 insertions(+), 10 deletions(-) create mode 100644 bustubx/src/planner/logical_plan_v2/util.rs diff --git a/Cargo.lock b/Cargo.lock index 3f1d100..6d91964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,7 @@ version = "0.1.0-alpha.1" dependencies = [ "comfy-table", "derive-new", + "derive-with", "itertools 0.11.0", "lazy_static", "petgraph", @@ -253,6 +254,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive-with" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841ea25b31404c50f2ddc92e028984a42d0fc818c10afee0b1fbda27c995f028" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "digest" version = "0.10.7" diff --git a/bustubx/Cargo.toml b/bustubx/Cargo.toml index 7b4a1f8..e53556c 100644 --- a/bustubx/Cargo.toml +++ b/bustubx/Cargo.toml @@ -22,4 +22,5 @@ tracing = "0.1" tracing-subscriber = "0.3" tracing-chrome = "0.7.1" thiserror = "1.0.56" -tempfile = "3" \ No newline at end of file +tempfile = "3" +derive-with = "0.5.0" \ No newline at end of file diff --git a/bustubx/src/catalog/column.rs b/bustubx/src/catalog/column.rs index 93812c7..74f3532 100644 --- a/bustubx/src/catalog/column.rs +++ b/bustubx/src/catalog/column.rs @@ -1,3 +1,4 @@ +use derive_with::With; use sqlparser::ast::ColumnDef; use std::sync::Arc; @@ -5,11 +6,11 @@ use crate::catalog::DataType; pub type ColumnRef = Arc; -// 列定义 -#[derive(Debug, Clone)] +#[derive(Debug, Clone, With)] pub struct Column { pub name: String, pub data_type: DataType, + pub nullable: bool, } impl PartialEq for Column { @@ -20,7 +21,11 @@ impl PartialEq for Column { impl Column { pub fn new(name: String, data_type: DataType) -> Self { - Self { name, data_type } + Self { + name, + data_type, + nullable: false, + } } pub fn from_sqlparser_column(column_def: &ColumnDef) -> Self { diff --git a/bustubx/src/common/table_ref.rs b/bustubx/src/common/table_ref.rs index 7b9cc08..af082da 100644 --- a/bustubx/src/common/table_ref.rs +++ b/bustubx/src/common/table_ref.rs @@ -1,5 +1,3 @@ -use std::borrow::Cow; - #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum TableReference { /// An unqualified table reference, e.g. "table" @@ -41,4 +39,40 @@ impl TableReference { table, } } + + pub fn table(&self) -> &str { + match self { + Self::Full { table, .. } | Self::Partial { table, .. } | Self::Bare { table } => table, + } + } + + pub fn schema(&self) -> Option<&str> { + match self { + Self::Full { schema, .. } | Self::Partial { schema, .. } => Some(schema), + _ => None, + } + } + + pub fn catalog(&self) -> Option<&str> { + match self { + Self::Full { catalog, .. } => Some(catalog), + _ => None, + } + } +} + +impl std::fmt::Display for TableReference { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TableReference::Bare { table } => write!(f, "{table}"), + TableReference::Partial { schema, table } => { + write!(f, "{schema}.{table}") + } + TableReference::Full { + catalog, + schema, + table, + } => write!(f, "{catalog}.{schema}.{table}"), + } + } } diff --git a/bustubx/src/planner/logical_plan_v2/join.rs b/bustubx/src/planner/logical_plan_v2/join.rs index 31ee121..8a25e24 100644 --- a/bustubx/src/planner/logical_plan_v2/join.rs +++ b/bustubx/src/planner/logical_plan_v2/join.rs @@ -1,3 +1,4 @@ +use crate::catalog::SchemaRef; use crate::expression::Expr; use crate::planner::logical_plan_v2::LogicalPlanV2; use crate::planner::table_ref::join::JoinType; @@ -11,4 +12,5 @@ pub struct Join { pub right: Arc, pub join_type: JoinType, pub condition: Option, + pub schema: SchemaRef, } diff --git a/bustubx/src/planner/logical_plan_v2/mod.rs b/bustubx/src/planner/logical_plan_v2/mod.rs index ebfeffe..0e9d1f7 100644 --- a/bustubx/src/planner/logical_plan_v2/mod.rs +++ b/bustubx/src/planner/logical_plan_v2/mod.rs @@ -8,8 +8,10 @@ mod limit; mod project; mod sort; mod table_scan; +mod util; mod values; +use crate::catalog::SchemaRef; pub use create_index::CreateIndex; pub use create_table::CreateTable; pub use empty_relation::EmptyRelation; @@ -20,6 +22,7 @@ pub use limit::Limit; pub use project::Project; pub use sort::{OrderByExpr, Sort}; pub use table_scan::TableScan; +pub use util::*; pub use values::Values; #[derive(Debug, Clone)] @@ -36,3 +39,21 @@ pub enum LogicalPlanV2 { Values(Values), EmptyRelation(EmptyRelation), } + +impl LogicalPlanV2 { + pub fn schema(&self) -> &SchemaRef { + match self { + LogicalPlanV2::CreateTable(_) => todo!(), + LogicalPlanV2::CreateIndex(_) => todo!(), + LogicalPlanV2::Filter(Filter { input, .. }) => input.schema(), + LogicalPlanV2::Insert(_) => todo!(), + LogicalPlanV2::Join(Join { schema, .. }) => schema, + LogicalPlanV2::Limit(Limit { input, .. }) => input.schema(), + LogicalPlanV2::Project(_) => todo!(), + LogicalPlanV2::TableScan(TableScan { schema, .. }) => schema, + LogicalPlanV2::Sort(Sort { input, .. }) => input.schema(), + LogicalPlanV2::Values(Values { schema, .. }) => schema, + LogicalPlanV2::EmptyRelation(EmptyRelation { schema, .. }) => schema, + } + } +} diff --git a/bustubx/src/planner/logical_plan_v2/table_scan.rs b/bustubx/src/planner/logical_plan_v2/table_scan.rs index 571b639..6111e3c 100644 --- a/bustubx/src/planner/logical_plan_v2/table_scan.rs +++ b/bustubx/src/planner/logical_plan_v2/table_scan.rs @@ -1,10 +1,11 @@ -use crate::catalog::ColumnRef; +use crate::catalog::{ColumnRef, SchemaRef}; use crate::common::table_ref::TableReference; use crate::expression::Expr; #[derive(derive_new::new, Debug, Clone)] pub struct TableScan { pub table_ref: TableReference, + pub schema: SchemaRef, pub filters: Vec, pub limit: Option, } diff --git a/bustubx/src/planner/logical_plan_v2/util.rs b/bustubx/src/planner/logical_plan_v2/util.rs new file mode 100644 index 0000000..e8ae524 --- /dev/null +++ b/bustubx/src/planner/logical_plan_v2/util.rs @@ -0,0 +1,42 @@ +use crate::catalog::{ColumnRef, Schema}; +use crate::planner::table_ref::join::JoinType; +use crate::BustubxResult; +use std::sync::Arc; + +pub fn build_join_schema( + left: &Schema, + right: &Schema, + join_type: JoinType, +) -> BustubxResult { + fn nullify_columns(columns: &[ColumnRef]) -> Vec { + columns + .iter() + .map(|f| Arc::new(f.as_ref().clone().with_nullable(true))) + .collect() + } + + let left_cols = &left.columns; + let right_cols = &right.columns; + + let columns: Vec = match join_type { + JoinType::Inner | JoinType::CrossJoin => { + left_cols.iter().chain(right_cols.iter()).cloned().collect() + } + JoinType::LeftOuter => left_cols + .iter() + .chain(&nullify_columns(right_cols)) + .cloned() + .collect(), + JoinType::RightOuter => nullify_columns(left_cols) + .iter() + .chain(right_cols.iter()) + .cloned() + .collect(), + JoinType::FullOuter => nullify_columns(left_cols) + .iter() + .chain(&nullify_columns(right_cols)) + .cloned() + .collect(), + }; + Ok(Schema { columns }) +} diff --git a/bustubx/src/planner/logical_planner/plan_set_expr.rs b/bustubx/src/planner/logical_planner/plan_set_expr.rs index 1ba6766..753d5b4 100644 --- a/bustubx/src/planner/logical_planner/plan_set_expr.rs +++ b/bustubx/src/planner/logical_planner/plan_set_expr.rs @@ -1,7 +1,7 @@ use crate::catalog::{Column, Schema}; use crate::expression::{Alias, Expr, ExprTrait}; use crate::planner::logical_plan_v2::{ - EmptyRelation, Filter, Join, LogicalPlanV2, Project, TableScan, Values, + build_join_schema, EmptyRelation, Filter, Join, LogicalPlanV2, Project, TableScan, Values, }; use crate::planner::table_ref::join::JoinType; use crate::planner::LogicalPlanner; @@ -149,11 +149,13 @@ impl LogicalPlanner<'_> { match constraint { sqlparser::ast::JoinConstraint::On(expr) => { let expr = self.plan_expr(expr)?; + let schema = Arc::new(build_join_schema(left.schema(), right.schema(), join_type)?); Ok(LogicalPlanV2::Join(Join { - left: Arc::new((left)), - right: Arc::new((right)), + left: Arc::new(left), + right: Arc::new(right), join_type, condition: Some(expr), + schema, })) } _ => Err(BustubxError::Plan(format!( @@ -168,11 +170,17 @@ impl LogicalPlanner<'_> { left: LogicalPlanV2, right: LogicalPlanV2, ) -> BustubxResult { + let schema = Arc::new(build_join_schema( + left.schema(), + right.schema(), + JoinType::CrossJoin, + )?); Ok(LogicalPlanV2::Join(Join { left: Arc::new(left), right: Arc::new(right), join_type: JoinType::CrossJoin, condition: None, + schema, })) } @@ -184,8 +192,18 @@ impl LogicalPlanner<'_> { sqlparser::ast::TableFactor::Table { name, alias, .. } => { // TODO handle alias let table_ref = self.plan_table_name(name)?; + // TODO get schema by full table name + let schema = self + .context + .catalog + .get_table_by_name(table_ref.table()) + .map_or( + Err(BustubxError::Plan(format!("table {} not found", table_ref))), + |info| Ok(info.schema.clone()), + )?; Ok(LogicalPlanV2::TableScan(TableScan { table_ref, + schema, filters: vec![], limit: None, }))