Skip to content

Commit

Permalink
feat: Plan with types
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed May 14, 2024
1 parent f16ce3c commit cb46f27
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 121 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tonic.workspace = true
[dev-dependencies]
catalog.workspace = true
common-catalog.workspace = true
pretty_assertions = "1.4.0"
prost.workspace = true
query.workspace = true
serde_json = "1.0"
Expand Down
10 changes: 5 additions & 5 deletions src/flow/src/compute/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::{
self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr,
};
use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan};
use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};

Expand Down Expand Up @@ -101,8 +101,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
/// Interpret and execute plan
///
/// return the output of this plan
pub fn render_plan(&mut self, plan: Plan) -> Result<CollectionBundle, Error> {
match plan {
pub fn render_plan(&mut self, plan: TypedPlan) -> Result<CollectionBundle, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant(rows)),
Plan::Get { id } => self.get_by_id(id),
Plan::Let { id, value, body } => self.eval_let(id, value, body),
Expand Down Expand Up @@ -193,8 +193,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
pub fn eval_let(
&mut self,
id: LocalId,
value: Box<Plan>,
body: Box<Plan>,
value: Box<TypedPlan>,
body: Box<TypedPlan>,
) -> Result<CollectionBundle, Error> {
let value = self.render_plan(*value)?;

Expand Down
19 changes: 15 additions & 4 deletions src/flow/src/compute/render/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::compute::render::Context;
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::Plan;
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::ArrangeHandler;

Expand All @@ -38,7 +38,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
#[allow(clippy::mutable_key_type)]
pub fn render_mfp(
&mut self,
input: Box<Plan>,
input: Box<TypedPlan>,
mfp: MapFilterProject,
) -> Result<CollectionBundle, Error> {
let input = self.render_plan(*input)?;
Expand Down Expand Up @@ -184,6 +184,7 @@ mod test {
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
use crate::compute::state::DataflowState;
use crate::expr::{self, BinaryFunc, GlobalId};
use crate::repr::{ColumnType, RelationType};

/// test if temporal filter works properly
/// namely: if mfp operator can schedule a delete at the correct time
Expand All @@ -203,6 +204,9 @@ mod test {
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
// temporal filter: now <= col(0) < now + 4
let mfp = MapFilterProject::new(1)
.filter(vec![
Expand All @@ -225,7 +229,9 @@ mod test {
])
.unwrap();

let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap();
let bundle = ctx
.render_mfp(Box::new(input_plan.with_types(typ)), mfp)
.unwrap();
let output = get_output_handle(&mut ctx, bundle);
// drop ctx here to simulate actual process of compile first, run later scenario
drop(ctx);
Expand Down Expand Up @@ -273,14 +279,19 @@ mod test {
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
// filter: col(0)>1
let mfp = MapFilterProject::new(1)
.filter(vec![ScalarExpr::Column(0).call_binary(
ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
BinaryFunc::Gt,
)])
.unwrap();
let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap();
let bundle = ctx
.render_mfp(Box::new(input_plan.with_types(typ)), mfp)
.unwrap();

let output = get_output_handle(&mut ctx, bundle);
drop(ctx);
Expand Down
50 changes: 43 additions & 7 deletions src/flow/src/compute/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter};

Expand All @@ -39,7 +39,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
#[allow(clippy::mutable_key_type)]
pub fn render_reduce(
&mut self,
input: Box<Plan>,
input: Box<TypedPlan>,
key_val_plan: KeyValPlan,
reduce_plan: ReducePlan,
) -> Result<CollectionBundle, Error> {
Expand Down Expand Up @@ -736,6 +736,7 @@ mod test {
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
use crate::compute::state::DataflowState;
use crate::expr::{self, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject};
use crate::repr::{ColumnType, RelationType};

/// SELECT DISTINCT col FROM table
///
Expand All @@ -762,13 +763,20 @@ mod test {
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
let key_val_plan = KeyValPlan {
key_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
val_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
};
let reduce_plan = ReducePlan::Distinct;
let bundle = ctx
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
.render_reduce(
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
)
.unwrap();

let output = get_output_handle(&mut ctx, bundle);
Expand Down Expand Up @@ -809,6 +817,9 @@ mod test {
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
let key_val_plan = KeyValPlan {
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
Expand All @@ -835,7 +846,11 @@ mod test {

let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
.render_reduce(
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
)
.unwrap();

let output = get_output_handle(&mut ctx, bundle);
Expand Down Expand Up @@ -882,6 +897,9 @@ mod test {
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
let key_val_plan = KeyValPlan {
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
Expand All @@ -908,7 +926,11 @@ mod test {

let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
.render_reduce(
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
)
.unwrap();

let output = get_output_handle(&mut ctx, bundle);
Expand Down Expand Up @@ -951,6 +973,9 @@ mod test {
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
let key_val_plan = KeyValPlan {
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
Expand All @@ -977,7 +1002,11 @@ mod test {

let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
.render_reduce(
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
)
.unwrap();

let output = get_output_handle(&mut ctx, bundle);
Expand Down Expand Up @@ -1020,6 +1049,9 @@ mod test {
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
let key_val_plan = KeyValPlan {
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
Expand Down Expand Up @@ -1061,7 +1093,11 @@ mod test {

let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
.render_reduce(
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
)
.unwrap();

let output = get_output_handle(&mut ctx, bundle);
Expand Down
36 changes: 35 additions & 1 deletion src/flow/src/expr/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use snafu::ensure;

use crate::adapter::error::{Error, InvalidQuerySnafu, UnsupportedTemporalFilterSnafu};
use crate::adapter::error::{
Error, InvalidQuerySnafu, UnexpectedSnafu, UnsupportedTemporalFilterSnafu,
};
use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
use crate::repr::ColumnType;
Expand Down Expand Up @@ -80,6 +82,38 @@ pub enum ScalarExpr {
},
}

impl ScalarExpr {
/// try to determine the type of the expression
pub fn typ(&self, input_types: &[ColumnType]) -> Result<ColumnType, Error> {
match self {
ScalarExpr::Column(i) => input_types.get(*i).cloned().ok_or_else(|| {
UnexpectedSnafu {
reason: format!(
"column index {} out of range of len={}",
i,
input_types.len()
),
}
.build()
}),
ScalarExpr::Literal(_, typ) => Ok(ColumnType::new_nullable(typ.clone())),
ScalarExpr::CallUnmaterializable(func) => {
Ok(ColumnType::new_nullable(func.signature().output))
}
ScalarExpr::CallUnary { func, .. } => {
Ok(ColumnType::new_nullable(func.signature().output))
}
ScalarExpr::CallBinary { func, .. } => {
Ok(ColumnType::new_nullable(func.signature().output))
}
ScalarExpr::CallVariadic { func, .. } => {
Ok(ColumnType::new_nullable(func.signature().output))
}
ScalarExpr::If { then, .. } => then.typ(input_types),
}
}
}

impl ScalarExpr {
/// apply optimization to the expression, like flatten variadic function
pub fn optimize(&mut self) {
Expand Down
Loading

0 comments on commit cb46f27

Please sign in to comment.