Add output_schema to VolcanoExecutor
lewiszlw committed Jan 30, 2024
1 parent 0cc4d07 commit 83d329c
Showing 13 changed files with 197 additions and 199 deletions.
8 changes: 3 additions & 5 deletions bustubx/src/execution/mod.rs
@@ -2,15 +2,13 @@ use std::sync::Arc;
 
 use tracing::span;
 
-use crate::{
-    catalog::{Catalog, Schema},
-    planner::physical_plan::PhysicalPlan,
-    storage::Tuple,
-};
+use crate::catalog::SchemaRef;
+use crate::{catalog::Catalog, planner::physical_plan::PhysicalPlan, storage::Tuple};
 
 pub trait VolcanoExecutor {
     fn init(&self, context: &mut ExecutionContext);
     fn next(&self, context: &mut ExecutionContext) -> Option<Tuple>;
+    fn output_schema(&self) -> SchemaRef;
 }
 
 #[derive(derive_new::new)]

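With output_schema now part of the VolcanoExecutor trait, every operator reports its schema through the same interface instead of through per-struct helper methods. A minimal sketch of an implementor under the new trait (DummyScan and its field are hypothetical, shown only to illustrate the shape):

// Hypothetical executor, not part of this commit.
struct DummyScan {
    schema: SchemaRef,
}

impl VolcanoExecutor for DummyScan {
    fn init(&self, _context: &mut ExecutionContext) {
        // nothing to prepare in this sketch
    }

    fn next(&self, _context: &mut ExecutionContext) -> Option<Tuple> {
        // a real executor would produce tuples here
        None
    }

    fn output_schema(&self) -> SchemaRef {
        // callers ask the executor for its schema rather than reaching into the struct
        self.schema.clone()
    }
}
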
30 changes: 8 additions & 22 deletions bustubx/src/planner/physical_plan/create_index.rs
@@ -6,34 +6,14 @@ use crate::{
 };
 use std::sync::Arc;
 
-#[derive(Debug)]
+#[derive(Debug, derive_new::new)]
 pub struct PhysicalCreateIndex {
     pub index_name: String,
     pub table_name: String,
     pub table_schema: SchemaRef,
     pub key_attrs: Vec<u32>,
 }
-impl PhysicalCreateIndex {
-    pub fn new(
-        index_name: String,
-        table_name: String,
-        table_schema: SchemaRef,
-        key_attrs: Vec<u32>,
-    ) -> Self {
-        Self {
-            index_name,
-            table_name,
-            table_schema,
-            key_attrs,
-        }
-    }
-    pub fn output_schema(&self) -> SchemaRef {
-        Arc::new(Schema::copy_schema(
-            self.table_schema.clone(),
-            &self.key_attrs,
-        ))
-    }
-}
+
 impl VolcanoExecutor for PhysicalCreateIndex {
     fn init(&self, context: &mut ExecutionContext) {
         println!("init create index executor");
@@ -46,4 +26,10 @@
         );
         None
     }
+    fn output_schema(&self) -> SchemaRef {
+        Arc::new(Schema::copy_schema(
+            self.table_schema.clone(),
+            &self.key_attrs,
+        ))
+    }
 }

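The switch from the hand-written constructor to #[derive(derive_new::new)] keeps call sites unchanged: derive_new generates a new() that takes one argument per field, in declaration order. Roughly what the derive expands to here (a sketch, not the macro's literal output):

impl PhysicalCreateIndex {
    // Equivalent of the removed hand-written constructor.
    pub fn new(
        index_name: String,
        table_name: String,
        table_schema: SchemaRef,
        key_attrs: Vec<u32>,
    ) -> Self {
        Self {
            index_name,
            table_name,
            table_schema,
            key_attrs,
        }
    }
}
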
9 changes: 4 additions & 5 deletions bustubx/src/planner/physical_plan/create_table.rs
@@ -11,11 +11,7 @@ pub struct PhysicalCreateTable {
     pub table_name: String,
     pub schema: Schema,
 }
-impl PhysicalCreateTable {
-    pub fn output_schema(&self) -> SchemaRef {
-        Arc::new(self.schema.clone())
-    }
-}
+
 impl VolcanoExecutor for PhysicalCreateTable {
     fn init(&self, context: &mut ExecutionContext) {
         println!("init create table executor");
@@ -26,4 +22,7 @@
             .create_table(self.table_name.clone(), Arc::new(self.schema.clone()));
         None
     }
+    fn output_schema(&self) -> SchemaRef {
+        Arc::new(self.schema.clone())
+    }
 }

11 changes: 5 additions & 6 deletions bustubx/src/planner/physical_plan/filter.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
 
 use crate::catalog::SchemaRef;
 use crate::{
-    catalog::Schema,
     common::ScalarValue,
     execution::{ExecutionContext, VolcanoExecutor},
     planner::expr::Expr,
@@ -16,11 +15,7 @@ pub struct PhysicalFilter {
     pub predicate: Expr,
     pub input: Arc<PhysicalPlan>,
 }
-impl PhysicalFilter {
-    pub fn output_schema(&self) -> SchemaRef {
-        self.input.output_schema()
-    }
-}
+
 impl VolcanoExecutor for PhysicalFilter {
     fn init(&self, context: &mut ExecutionContext) {
         println!("init filter executor");
@@ -44,4 +39,8 @@
             }
         }
     }
+
+    fn output_schema(&self) -> SchemaRef {
+        self.input.output_schema()
+    }
 }

13 changes: 7 additions & 6 deletions bustubx/src/planner/physical_plan/insert.rs
@@ -27,12 +27,6 @@ impl PhysicalInsert {
             insert_rows: AtomicU32::new(0),
         }
     }
-    pub fn output_schema(&self) -> SchemaRef {
-        Arc::new(Schema::new(vec![Column::new(
-            "insert_rows".to_string(),
-            DataType::Int32,
-        )]))
-    }
 }
 impl VolcanoExecutor for PhysicalInsert {
     fn init(&self, context: &mut ExecutionContext) {
@@ -77,4 +71,11 @@
                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
         }
     }
+
+    fn output_schema(&self) -> SchemaRef {
+        Arc::new(Schema::new(vec![Column::new(
+            "insert_rows".to_string(),
+            DataType::Int32,
+        )]))
+    }
 }

7 changes: 4 additions & 3 deletions bustubx/src/planner/physical_plan/limit.rs
@@ -26,9 +26,6 @@ impl PhysicalLimit {
             cursor: AtomicU32::new(0),
         }
     }
-    pub fn output_schema(&self) -> SchemaRef {
-        self.input.output_schema()
-    }
 }
 impl VolcanoExecutor for PhysicalLimit {
     fn init(&self, context: &mut ExecutionContext) {
@@ -61,4 +58,8 @@
             }
         }
     }
+
+    fn output_schema(&self) -> SchemaRef {
+        self.input.output_schema()
+    }
 }

158 changes: 36 additions & 122 deletions bustubx/src/planner/physical_plan/mod.rs
@@ -1,31 +1,33 @@
 use std::sync::Arc;
 
 use crate::catalog::SchemaRef;
-use crate::planner::logical_plan::LogicalPlan;
-use crate::planner::operator::LogicalOperator;
 use crate::{
     catalog::Schema,
     execution::{ExecutionContext, VolcanoExecutor},
     storage::Tuple,
 };
 
-use self::{
-    create_index::PhysicalCreateIndex, create_table::PhysicalCreateTable, filter::PhysicalFilter,
-    insert::PhysicalInsert, limit::PhysicalLimit, nested_loop_join::PhysicalNestedLoopJoin,
-    project::PhysicalProject, seq_scan::PhysicalSeqScan, sort::PhysicalSort,
-    values::PhysicalValues,
-};
-mod create_index;
-mod create_table;
-mod filter;
-mod insert;
-mod limit;
-mod nested_loop_join;
-mod project;
-mod seq_scan;
-mod sort;
-mod values;
 
+pub mod create_index;
+pub mod create_table;
+pub mod filter;
+pub mod insert;
+pub mod limit;
+pub mod nested_loop_join;
+pub mod project;
+pub mod seq_scan;
+pub mod sort;
+pub mod values;
+pub use create_index::PhysicalCreateIndex;
+pub use create_table::PhysicalCreateTable;
+pub use filter::PhysicalFilter;
+pub use insert::PhysicalInsert;
+pub use limit::PhysicalLimit;
+pub use nested_loop_join::PhysicalNestedLoopJoin;
+pub use project::PhysicalProject;
+pub use seq_scan::PhysicalSeqScan;
+pub use sort::PhysicalSort;
+pub use values::PhysicalValues;
 
 #[derive(Debug)]
 pub enum PhysicalPlan {
@@ -41,110 +43,6 @@
     NestedLoopJoin(PhysicalNestedLoopJoin),
     Sort(PhysicalSort),
 }
-impl PhysicalPlan {
-    pub fn output_schema(&self) -> SchemaRef {
-        match self {
-            Self::Dummy => Arc::new(Schema::new(vec![])),
-            Self::CreateTable(op) => op.output_schema(),
-            Self::CreateIndex(op) => op.output_schema(),
-            Self::Insert(op) => op.output_schema(),
-            Self::Values(op) => op.output_schema(),
-            Self::Project(op) => op.output_schema(),
-            Self::Filter(op) => op.output_schema(),
-            Self::TableScan(op) => op.output_schema(),
-            Self::Limit(op) => op.output_schema(),
-            Self::NestedLoopJoin(op) => op.output_schema(),
-            Self::Sort(op) => op.output_schema(),
-        }
-    }
-}
-
-pub fn build_plan(logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
-    let plan = match logical_plan.operator {
-        LogicalOperator::Dummy => PhysicalPlan::Dummy,
-        LogicalOperator::CreateTable(ref logic_create_table) => {
-            PhysicalPlan::CreateTable(PhysicalCreateTable::new(
-                logic_create_table.table_name.clone(),
-                logic_create_table.schema.clone(),
-            ))
-        }
-        LogicalOperator::CreateIndex(ref logic_create_index) => {
-            PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
-                logic_create_index.index_name.clone(),
-                logic_create_index.table_name.clone(),
-                logic_create_index.table_schema.clone(),
-                logic_create_index.key_attrs.clone(),
-            ))
-        }
-        LogicalOperator::Insert(ref logic_insert) => {
-            let child_logical_node = logical_plan.children[0].clone();
-            let child_physical_node = build_plan(child_logical_node.clone());
-            PhysicalPlan::Insert(PhysicalInsert::new(
-                logic_insert.table_name.clone(),
-                logic_insert.columns.clone(),
-                Arc::new(child_physical_node),
-            ))
-        }
-        LogicalOperator::Values(ref logical_values) => PhysicalPlan::Values(PhysicalValues::new(
-            logical_values.columns.clone(),
-            logical_values.tuples.clone(),
-        )),
-        LogicalOperator::Project(ref logical_project) => {
-            let child_logical_node = logical_plan.children[0].clone();
-            let child_physical_node = build_plan(child_logical_node.clone());
-            PhysicalPlan::Project(PhysicalProject::new(
-                logical_project.expressions.clone(),
-                Arc::new(child_physical_node),
-            ))
-        }
-        LogicalOperator::Filter(ref logical_filter) => {
-            // the filter node has exactly one child
-            let child_logical_node = logical_plan.children[0].clone();
-            let child_physical_node = build_plan(child_logical_node.clone());
-            PhysicalPlan::Filter(PhysicalFilter::new(
-                logical_filter.predicate.clone(),
-                Arc::new(child_physical_node),
-            ))
-        }
-        LogicalOperator::Scan(ref logical_table_scan) => {
-            PhysicalPlan::TableScan(PhysicalSeqScan::new(
-                logical_table_scan.table_oid.clone(),
-                logical_table_scan.columns.clone(),
-            ))
-        }
-        LogicalOperator::Limit(ref logical_limit) => {
-            let child_logical_node = logical_plan.children[0].clone();
-            let child_physical_node = build_plan(child_logical_node.clone());
-            PhysicalPlan::Limit(PhysicalLimit::new(
-                logical_limit.limit,
-                logical_limit.offset,
-                Arc::new(child_physical_node),
-            ))
-        }
-        LogicalOperator::Join(ref logical_join) => {
-            let left_logical_node = logical_plan.children[0].clone();
-            let left_physical_node = build_plan(left_logical_node.clone());
-            let right_logical_node = logical_plan.children[1].clone();
-            let right_physical_node = build_plan(right_logical_node.clone());
-            PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
-                logical_join.join_type.clone(),
-                logical_join.condition.clone(),
-                Arc::new(left_physical_node),
-                Arc::new(right_physical_node),
-            ))
-        }
-        LogicalOperator::Sort(ref logical_sort) => {
-            let child_logical_node = logical_plan.children[0].clone();
-            let child_physical_node = build_plan(child_logical_node.clone());
-            PhysicalPlan::Sort(PhysicalSort::new(
-                logical_sort.order_bys.clone(),
-                Arc::new(child_physical_node),
-            ))
-        }
-        _ => unimplemented!(),
-    };
-    plan
-}
 
 impl VolcanoExecutor for PhysicalPlan {
     fn init(&self, context: &mut ExecutionContext) {
@@ -178,4 +76,20 @@
             PhysicalPlan::Sort(op) => op.next(context),
         }
     }
+
+    fn output_schema(&self) -> SchemaRef {
+        match self {
+            Self::Dummy => Arc::new(Schema::new(vec![])),
+            Self::CreateTable(op) => op.output_schema(),
+            Self::CreateIndex(op) => op.output_schema(),
+            Self::Insert(op) => op.output_schema(),
+            Self::Values(op) => op.output_schema(),
+            Self::Project(op) => op.output_schema(),
+            Self::Filter(op) => op.output_schema(),
+            Self::TableScan(op) => op.output_schema(),
+            Self::Limit(op) => op.output_schema(),
+            Self::NestedLoopJoin(op) => op.output_schema(),
+            Self::Sort(op) => op.output_schema(),
+        }
+    }
 }

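With output_schema on the VolcanoExecutor impl, a caller can drive any PhysicalPlan variant through one interface: ask for the schema up front, then pull tuples. A rough usage sketch, assuming an ExecutionContext has already been constructed elsewhere (the helper below is illustrative, not part of the commit):

// Sketch: execute a plan volcano-style and expose its schema through the trait.
fn run(plan: &PhysicalPlan, ctx: &mut ExecutionContext) -> (SchemaRef, Vec<Tuple>) {
    // The same call works for every variant, including PhysicalPlan::Dummy.
    let schema = plan.output_schema();
    plan.init(ctx);
    let mut rows = Vec::new();
    while let Some(tuple) = plan.next(ctx) {
        rows.push(tuple);
    }
    (schema, rows)
}
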
19 changes: 10 additions & 9 deletions bustubx/src/planner/physical_plan/nested_loop_join.rs
@@ -35,15 +35,6 @@ impl PhysicalNestedLoopJoin {
             left_tuple: Mutex::new(None),
         }
     }
-    pub fn output_schema(&self) -> SchemaRef {
-        Arc::new(
-            Schema::try_merge(vec![
-                self.left_input.output_schema().as_ref().clone(),
-                self.right_input.output_schema().as_ref().clone(),
-            ])
-            .unwrap(),
-        )
-    }
 }
 impl VolcanoExecutor for PhysicalNestedLoopJoin {
     fn init(&self, context: &mut ExecutionContext) {
@@ -100,4 +91,14 @@
         }
         return None;
     }
+
+    fn output_schema(&self) -> SchemaRef {
+        Arc::new(
+            Schema::try_merge(vec![
+                self.left_input.output_schema().as_ref().clone(),
+                self.right_input.output_schema().as_ref().clone(),
+            ])
+            .unwrap(),
+        )
+    }
 }

11 changes: 5 additions & 6 deletions bustubx/src/planner/physical_plan/project.rs
@@ -15,12 +15,6 @@ pub struct PhysicalProject {
     pub expressions: Vec<Expr>,
     pub input: Arc<PhysicalPlan>,
 }
-impl PhysicalProject {
-    pub fn output_schema(&self) -> SchemaRef {
-        // TODO consider aggr/alias
-        self.input.output_schema()
-    }
-}
 impl VolcanoExecutor for PhysicalProject {
     fn init(&self, context: &mut ExecutionContext) {
         println!("init project executor");
@@ -37,4 +31,9 @@
         }
         return Some(Tuple::new(self.output_schema(), new_values));
     }
+
+    fn output_schema(&self) -> SchemaRef {
+        // TODO consider aggr/alias
+        self.input.output_schema()
+    }
 }