diff --git a/bustubx/src/execution/mod.rs b/bustubx/src/execution/mod.rs index 729fcc9..6a3eccd 100644 --- a/bustubx/src/execution/mod.rs +++ b/bustubx/src/execution/mod.rs @@ -2,15 +2,13 @@ use std::sync::Arc; use tracing::span; -use crate::{ - catalog::{Catalog, Schema}, - planner::physical_plan::PhysicalPlan, - storage::Tuple, -}; +use crate::catalog::SchemaRef; +use crate::{catalog::Catalog, planner::physical_plan::PhysicalPlan, storage::Tuple}; pub trait VolcanoExecutor { fn init(&self, context: &mut ExecutionContext); fn next(&self, context: &mut ExecutionContext) -> Option; + fn output_schema(&self) -> SchemaRef; } #[derive(derive_new::new)] diff --git a/bustubx/src/planner/physical_plan/create_index.rs b/bustubx/src/planner/physical_plan/create_index.rs index 0748c39..3999ec7 100644 --- a/bustubx/src/planner/physical_plan/create_index.rs +++ b/bustubx/src/planner/physical_plan/create_index.rs @@ -6,34 +6,14 @@ use crate::{ }; use std::sync::Arc; -#[derive(Debug)] +#[derive(Debug, derive_new::new)] pub struct PhysicalCreateIndex { pub index_name: String, pub table_name: String, pub table_schema: SchemaRef, pub key_attrs: Vec, } -impl PhysicalCreateIndex { - pub fn new( - index_name: String, - table_name: String, - table_schema: SchemaRef, - key_attrs: Vec, - ) -> Self { - Self { - index_name, - table_name, - table_schema, - key_attrs, - } - } - pub fn output_schema(&self) -> SchemaRef { - Arc::new(Schema::copy_schema( - self.table_schema.clone(), - &self.key_attrs, - )) - } -} + impl VolcanoExecutor for PhysicalCreateIndex { fn init(&self, context: &mut ExecutionContext) { println!("init create index executor"); @@ -46,4 +26,10 @@ impl VolcanoExecutor for PhysicalCreateIndex { ); None } + fn output_schema(&self) -> SchemaRef { + Arc::new(Schema::copy_schema( + self.table_schema.clone(), + &self.key_attrs, + )) + } } diff --git a/bustubx/src/planner/physical_plan/create_table.rs b/bustubx/src/planner/physical_plan/create_table.rs index 984e3cd..8ae600c 100644 --- a/bustubx/src/planner/physical_plan/create_table.rs +++ b/bustubx/src/planner/physical_plan/create_table.rs @@ -11,11 +11,7 @@ pub struct PhysicalCreateTable { pub table_name: String, pub schema: Schema, } -impl PhysicalCreateTable { - pub fn output_schema(&self) -> SchemaRef { - Arc::new(self.schema.clone()) - } -} + impl VolcanoExecutor for PhysicalCreateTable { fn init(&self, context: &mut ExecutionContext) { println!("init create table executor"); @@ -26,4 +22,7 @@ impl VolcanoExecutor for PhysicalCreateTable { .create_table(self.table_name.clone(), Arc::new(self.schema.clone())); None } + fn output_schema(&self) -> SchemaRef { + Arc::new(self.schema.clone()) + } } diff --git a/bustubx/src/planner/physical_plan/filter.rs b/bustubx/src/planner/physical_plan/filter.rs index edc607d..11076cc 100644 --- a/bustubx/src/planner/physical_plan/filter.rs +++ b/bustubx/src/planner/physical_plan/filter.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use crate::catalog::SchemaRef; use crate::{ - catalog::Schema, common::ScalarValue, execution::{ExecutionContext, VolcanoExecutor}, planner::expr::Expr, @@ -16,11 +15,7 @@ pub struct PhysicalFilter { pub predicate: Expr, pub input: Arc, } -impl PhysicalFilter { - pub fn output_schema(&self) -> SchemaRef { - self.input.output_schema() - } -} + impl VolcanoExecutor for PhysicalFilter { fn init(&self, context: &mut ExecutionContext) { println!("init filter executor"); @@ -44,4 +39,8 @@ impl VolcanoExecutor for PhysicalFilter { } } } + + fn output_schema(&self) -> SchemaRef { + self.input.output_schema() + } } diff --git a/bustubx/src/planner/physical_plan/insert.rs b/bustubx/src/planner/physical_plan/insert.rs index a2d53e7..97b9041 100644 --- a/bustubx/src/planner/physical_plan/insert.rs +++ b/bustubx/src/planner/physical_plan/insert.rs @@ -27,12 +27,6 @@ impl PhysicalInsert { insert_rows: AtomicU32::new(0), } } - pub fn output_schema(&self) -> SchemaRef { - Arc::new(Schema::new(vec![Column::new( - "insert_rows".to_string(), - DataType::Int32, - )])) - } } impl VolcanoExecutor for PhysicalInsert { fn init(&self, context: &mut ExecutionContext) { @@ -77,4 +71,11 @@ impl VolcanoExecutor for PhysicalInsert { .fetch_add(1, std::sync::atomic::Ordering::SeqCst); } } + + fn output_schema(&self) -> SchemaRef { + Arc::new(Schema::new(vec![Column::new( + "insert_rows".to_string(), + DataType::Int32, + )])) + } } diff --git a/bustubx/src/planner/physical_plan/limit.rs b/bustubx/src/planner/physical_plan/limit.rs index 5d62012..e592e8f 100644 --- a/bustubx/src/planner/physical_plan/limit.rs +++ b/bustubx/src/planner/physical_plan/limit.rs @@ -26,9 +26,6 @@ impl PhysicalLimit { cursor: AtomicU32::new(0), } } - pub fn output_schema(&self) -> SchemaRef { - self.input.output_schema() - } } impl VolcanoExecutor for PhysicalLimit { fn init(&self, context: &mut ExecutionContext) { @@ -61,4 +58,8 @@ impl VolcanoExecutor for PhysicalLimit { } } } + + fn output_schema(&self) -> SchemaRef { + self.input.output_schema() + } } diff --git a/bustubx/src/planner/physical_plan/mod.rs b/bustubx/src/planner/physical_plan/mod.rs index 089c77c..6f76199 100644 --- a/bustubx/src/planner/physical_plan/mod.rs +++ b/bustubx/src/planner/physical_plan/mod.rs @@ -1,31 +1,33 @@ use std::sync::Arc; use crate::catalog::SchemaRef; -use crate::planner::logical_plan::LogicalPlan; -use crate::planner::operator::LogicalOperator; use crate::{ catalog::Schema, execution::{ExecutionContext, VolcanoExecutor}, storage::Tuple, }; -use self::{ - create_index::PhysicalCreateIndex, create_table::PhysicalCreateTable, filter::PhysicalFilter, - insert::PhysicalInsert, limit::PhysicalLimit, nested_loop_join::PhysicalNestedLoopJoin, - project::PhysicalProject, seq_scan::PhysicalSeqScan, sort::PhysicalSort, - values::PhysicalValues, -}; +mod create_index; +mod create_table; +mod filter; +mod insert; +mod limit; +mod nested_loop_join; +mod project; +mod seq_scan; +mod sort; +mod values; -pub mod create_index; -pub mod create_table; -pub mod filter; -pub mod insert; -pub mod limit; -pub mod nested_loop_join; -pub mod project; -pub mod seq_scan; -pub mod sort; -pub mod values; +pub use create_index::PhysicalCreateIndex; +pub use create_table::PhysicalCreateTable; +pub use filter::PhysicalFilter; +pub use insert::PhysicalInsert; +pub use limit::PhysicalLimit; +pub use nested_loop_join::PhysicalNestedLoopJoin; +pub use project::PhysicalProject; +pub use seq_scan::PhysicalSeqScan; +pub use sort::PhysicalSort; +pub use values::PhysicalValues; #[derive(Debug)] pub enum PhysicalPlan { @@ -41,110 +43,6 @@ pub enum PhysicalPlan { NestedLoopJoin(PhysicalNestedLoopJoin), Sort(PhysicalSort), } -impl PhysicalPlan { - pub fn output_schema(&self) -> SchemaRef { - match self { - Self::Dummy => Arc::new(Schema::new(vec![])), - Self::CreateTable(op) => op.output_schema(), - Self::CreateIndex(op) => op.output_schema(), - Self::Insert(op) => op.output_schema(), - Self::Values(op) => op.output_schema(), - Self::Project(op) => op.output_schema(), - Self::Filter(op) => op.output_schema(), - Self::TableScan(op) => op.output_schema(), - Self::Limit(op) => op.output_schema(), - Self::NestedLoopJoin(op) => op.output_schema(), - Self::Sort(op) => op.output_schema(), - } - } -} - -pub fn build_plan(logical_plan: Arc) -> PhysicalPlan { - let plan = match logical_plan.operator { - LogicalOperator::Dummy => PhysicalPlan::Dummy, - LogicalOperator::CreateTable(ref logic_create_table) => { - PhysicalPlan::CreateTable(PhysicalCreateTable::new( - logic_create_table.table_name.clone(), - logic_create_table.schema.clone(), - )) - } - LogicalOperator::CreateIndex(ref logic_create_index) => { - PhysicalPlan::CreateIndex(PhysicalCreateIndex::new( - logic_create_index.index_name.clone(), - logic_create_index.table_name.clone(), - logic_create_index.table_schema.clone(), - logic_create_index.key_attrs.clone(), - )) - } - LogicalOperator::Insert(ref logic_insert) => { - let child_logical_node = logical_plan.children[0].clone(); - let child_physical_node = build_plan(child_logical_node.clone()); - PhysicalPlan::Insert(PhysicalInsert::new( - logic_insert.table_name.clone(), - logic_insert.columns.clone(), - Arc::new(child_physical_node), - )) - } - LogicalOperator::Values(ref logical_values) => PhysicalPlan::Values(PhysicalValues::new( - logical_values.columns.clone(), - logical_values.tuples.clone(), - )), - LogicalOperator::Project(ref logical_project) => { - let child_logical_node = logical_plan.children[0].clone(); - let child_physical_node = build_plan(child_logical_node.clone()); - PhysicalPlan::Project(PhysicalProject::new( - logical_project.expressions.clone(), - Arc::new(child_physical_node), - )) - } - LogicalOperator::Filter(ref logical_filter) => { - // filter下只有一个子节点 - let child_logical_node = logical_plan.children[0].clone(); - let child_physical_node = build_plan(child_logical_node.clone()); - PhysicalPlan::Filter(PhysicalFilter::new( - logical_filter.predicate.clone(), - Arc::new(child_physical_node), - )) - } - LogicalOperator::Scan(ref logical_table_scan) => { - PhysicalPlan::TableScan(PhysicalSeqScan::new( - logical_table_scan.table_oid.clone(), - logical_table_scan.columns.clone(), - )) - } - LogicalOperator::Limit(ref logical_limit) => { - let child_logical_node = logical_plan.children[0].clone(); - let child_physical_node = build_plan(child_logical_node.clone()); - PhysicalPlan::Limit(PhysicalLimit::new( - logical_limit.limit, - logical_limit.offset, - Arc::new(child_physical_node), - )) - } - LogicalOperator::Join(ref logical_join) => { - let left_logical_node = logical_plan.children[0].clone(); - let left_physical_node = build_plan(left_logical_node.clone()); - let right_logical_node = logical_plan.children[1].clone(); - let right_physical_node = build_plan(right_logical_node.clone()); - PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new( - logical_join.join_type.clone(), - logical_join.condition.clone(), - Arc::new(left_physical_node), - Arc::new(right_physical_node), - )) - } - LogicalOperator::Sort(ref logical_sort) => { - let child_logical_node = logical_plan.children[0].clone(); - let child_physical_node = build_plan(child_logical_node.clone()); - PhysicalPlan::Sort(PhysicalSort::new( - logical_sort.order_bys.clone(), - Arc::new(child_physical_node), - )) - } - _ => unimplemented!(), - }; - plan -} impl VolcanoExecutor for PhysicalPlan { fn init(&self, context: &mut ExecutionContext) { @@ -178,4 +76,20 @@ impl VolcanoExecutor for PhysicalPlan { PhysicalPlan::Sort(op) => op.next(context), } } + + fn output_schema(&self) -> SchemaRef { + match self { + Self::Dummy => Arc::new(Schema::new(vec![])), + Self::CreateTable(op) => op.output_schema(), + Self::CreateIndex(op) => op.output_schema(), + Self::Insert(op) => op.output_schema(), + Self::Values(op) => op.output_schema(), + Self::Project(op) => op.output_schema(), + Self::Filter(op) => op.output_schema(), + Self::TableScan(op) => op.output_schema(), + Self::Limit(op) => op.output_schema(), + Self::NestedLoopJoin(op) => op.output_schema(), + Self::Sort(op) => op.output_schema(), + } + } } diff --git a/bustubx/src/planner/physical_plan/nested_loop_join.rs b/bustubx/src/planner/physical_plan/nested_loop_join.rs index dba931b..b218ce9 100644 --- a/bustubx/src/planner/physical_plan/nested_loop_join.rs +++ b/bustubx/src/planner/physical_plan/nested_loop_join.rs @@ -35,15 +35,6 @@ impl PhysicalNestedLoopJoin { left_tuple: Mutex::new(None), } } - pub fn output_schema(&self) -> SchemaRef { - Arc::new( - Schema::try_merge(vec![ - self.left_input.output_schema().as_ref().clone(), - self.right_input.output_schema().as_ref().clone(), - ]) - .unwrap(), - ) - } } impl VolcanoExecutor for PhysicalNestedLoopJoin { fn init(&self, context: &mut ExecutionContext) { @@ -100,4 +91,14 @@ impl VolcanoExecutor for PhysicalNestedLoopJoin { } return None; } + + fn output_schema(&self) -> SchemaRef { + Arc::new( + Schema::try_merge(vec![ + self.left_input.output_schema().as_ref().clone(), + self.right_input.output_schema().as_ref().clone(), + ]) + .unwrap(), + ) + } } diff --git a/bustubx/src/planner/physical_plan/project.rs b/bustubx/src/planner/physical_plan/project.rs index f07c886..c9d0906 100644 --- a/bustubx/src/planner/physical_plan/project.rs +++ b/bustubx/src/planner/physical_plan/project.rs @@ -15,12 +15,6 @@ pub struct PhysicalProject { pub expressions: Vec, pub input: Arc, } -impl PhysicalProject { - pub fn output_schema(&self) -> SchemaRef { - // TODO consider aggr/alias - self.input.output_schema() - } -} impl VolcanoExecutor for PhysicalProject { fn init(&self, context: &mut ExecutionContext) { println!("init project executor"); @@ -37,4 +31,9 @@ impl VolcanoExecutor for PhysicalProject { } return Some(Tuple::new(self.output_schema(), new_values)); } + + fn output_schema(&self) -> SchemaRef { + // TODO consider aggr/alias + self.input.output_schema() + } } diff --git a/bustubx/src/planner/physical_plan/seq_scan.rs b/bustubx/src/planner/physical_plan/seq_scan.rs index 9b0a081..2817bf2 100644 --- a/bustubx/src/planner/physical_plan/seq_scan.rs +++ b/bustubx/src/planner/physical_plan/seq_scan.rs @@ -22,11 +22,6 @@ impl PhysicalSeqScan { iterator: Mutex::new(TableIterator::new(None, None)), } } - pub fn output_schema(&self) -> SchemaRef { - Arc::new(Schema { - columns: self.columns.clone(), - }) - } } impl VolcanoExecutor for PhysicalSeqScan { fn init(&self, context: &mut ExecutionContext) { @@ -48,4 +43,10 @@ impl VolcanoExecutor for PhysicalSeqScan { let full_tuple = iterator.next(&mut table_info.table); return full_tuple.map(|t| t.1); } + + fn output_schema(&self) -> SchemaRef { + Arc::new(Schema { + columns: self.columns.clone(), + }) + } } diff --git a/bustubx/src/planner/physical_plan/sort.rs b/bustubx/src/planner/physical_plan/sort.rs index e860c74..3cc3dde 100644 --- a/bustubx/src/planner/physical_plan/sort.rs +++ b/bustubx/src/planner/physical_plan/sort.rs @@ -2,7 +2,6 @@ use std::sync::{atomic::AtomicU32, Arc, Mutex}; use crate::catalog::SchemaRef; use crate::{ - catalog::Schema, execution::{ExecutionContext, VolcanoExecutor}, planner::order_by::BoundOrderBy, storage::Tuple, @@ -27,9 +26,6 @@ impl PhysicalSort { cursor: AtomicU32::new(0), } } - pub fn output_schema(&self) -> SchemaRef { - self.input.output_schema() - } } impl VolcanoExecutor for PhysicalSort { fn init(&self, context: &mut ExecutionContext) { @@ -83,4 +79,8 @@ impl VolcanoExecutor for PhysicalSort { .get(cursor) .map(|t| t.clone()); } + + fn output_schema(&self) -> SchemaRef { + self.input.output_schema() + } } diff --git a/bustubx/src/planner/physical_plan/values.rs b/bustubx/src/planner/physical_plan/values.rs index 06e1e0a..ad5628f 100644 --- a/bustubx/src/planner/physical_plan/values.rs +++ b/bustubx/src/planner/physical_plan/values.rs @@ -24,11 +24,6 @@ impl PhysicalValues { cursor: AtomicU32::new(0), } } - pub fn output_schema(&self) -> SchemaRef { - Arc::new(Schema { - columns: self.columns.clone(), - }) - } } impl VolcanoExecutor for PhysicalValues { fn init(&self, context: &mut ExecutionContext) { @@ -46,4 +41,10 @@ impl VolcanoExecutor for PhysicalValues { return None; } } + + fn output_schema(&self) -> SchemaRef { + Arc::new(Schema { + columns: self.columns.clone(), + }) + } } diff --git a/bustubx/src/planner/physical_planner/physical_planner.rs b/bustubx/src/planner/physical_planner/physical_planner.rs index 2e221d1..3da2cb1 100644 --- a/bustubx/src/planner/physical_planner/physical_planner.rs +++ b/bustubx/src/planner/physical_planner/physical_planner.rs @@ -1,8 +1,19 @@ use std::sync::Arc; use crate::planner::logical_plan::LogicalPlan; +use crate::planner::operator::LogicalOperator; -use crate::planner::physical_plan::{build_plan, PhysicalPlan}; +use crate::planner::physical_plan::PhysicalCreateIndex; +use crate::planner::physical_plan::PhysicalCreateTable; +use crate::planner::physical_plan::PhysicalFilter; +use crate::planner::physical_plan::PhysicalInsert; +use crate::planner::physical_plan::PhysicalLimit; +use crate::planner::physical_plan::PhysicalNestedLoopJoin; +use crate::planner::physical_plan::PhysicalPlan; +use crate::planner::physical_plan::PhysicalProject; +use crate::planner::physical_plan::PhysicalSeqScan; +use crate::planner::physical_plan::PhysicalSort; +use crate::planner::physical_plan::PhysicalValues; pub struct PhysicalPlanner; @@ -16,3 +27,90 @@ impl PhysicalPlanner { build_plan(logical_plan.clone()) } } + +pub fn build_plan(logical_plan: Arc) -> PhysicalPlan { + let plan = match logical_plan.operator { + LogicalOperator::Dummy => PhysicalPlan::Dummy, + LogicalOperator::CreateTable(ref logic_create_table) => { + PhysicalPlan::CreateTable(PhysicalCreateTable::new( + logic_create_table.table_name.clone(), + logic_create_table.schema.clone(), + )) + } + LogicalOperator::CreateIndex(ref logic_create_index) => { + PhysicalPlan::CreateIndex(PhysicalCreateIndex::new( + logic_create_index.index_name.clone(), + logic_create_index.table_name.clone(), + logic_create_index.table_schema.clone(), + logic_create_index.key_attrs.clone(), + )) + } + LogicalOperator::Insert(ref logic_insert) => { + let child_logical_node = logical_plan.children[0].clone(); + let child_physical_node = build_plan(child_logical_node.clone()); + PhysicalPlan::Insert(PhysicalInsert::new( + logic_insert.table_name.clone(), + logic_insert.columns.clone(), + Arc::new(child_physical_node), + )) + } + LogicalOperator::Values(ref logical_values) => PhysicalPlan::Values(PhysicalValues::new( + logical_values.columns.clone(), + logical_values.tuples.clone(), + )), + LogicalOperator::Project(ref logical_project) => { + let child_logical_node = logical_plan.children[0].clone(); + let child_physical_node = build_plan(child_logical_node.clone()); + PhysicalPlan::Project(PhysicalProject::new( + logical_project.expressions.clone(), + Arc::new(child_physical_node), + )) + } + LogicalOperator::Filter(ref logical_filter) => { + // filter下只有一个子节点 + let child_logical_node = logical_plan.children[0].clone(); + let child_physical_node = build_plan(child_logical_node.clone()); + PhysicalPlan::Filter(PhysicalFilter::new( + logical_filter.predicate.clone(), + Arc::new(child_physical_node), + )) + } + LogicalOperator::Scan(ref logical_table_scan) => { + PhysicalPlan::TableScan(PhysicalSeqScan::new( + logical_table_scan.table_oid.clone(), + logical_table_scan.columns.clone(), + )) + } + LogicalOperator::Limit(ref logical_limit) => { + let child_logical_node = logical_plan.children[0].clone(); + let child_physical_node = build_plan(child_logical_node.clone()); + PhysicalPlan::Limit(PhysicalLimit::new( + logical_limit.limit, + logical_limit.offset, + Arc::new(child_physical_node), + )) + } + LogicalOperator::Join(ref logical_join) => { + let left_logical_node = logical_plan.children[0].clone(); + let left_physical_node = build_plan(left_logical_node.clone()); + let right_logical_node = logical_plan.children[1].clone(); + let right_physical_node = build_plan(right_logical_node.clone()); + PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new( + logical_join.join_type.clone(), + logical_join.condition.clone(), + Arc::new(left_physical_node), + Arc::new(right_physical_node), + )) + } + LogicalOperator::Sort(ref logical_sort) => { + let child_logical_node = logical_plan.children[0].clone(); + let child_physical_node = build_plan(child_logical_node.clone()); + PhysicalPlan::Sort(PhysicalSort::new( + logical_sort.order_bys.clone(), + Arc::new(child_physical_node), + )) + } + _ => unimplemented!(), + }; + plan +}