Skip to content

Commit

Permalink
feat(hydroflow_plus): add APIs for running optimization passes
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Mar 5, 2024
1 parent c1d1b51 commit a5bb45b
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ stageleft = { path = "../stageleft", version = "^0.1.0" }

[build-dependencies]
stageleft_tool = { path = "../stageleft_tool", version = "^0.1.0" }

[dev-dependencies]
insta = "1.7.1"
52 changes: 35 additions & 17 deletions hydroflow_plus/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::cell::{Ref, RefCell};
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;

Expand Down Expand Up @@ -29,12 +29,6 @@ pub struct FlowBuilder<'a, D: LocalDeploy<'a> + ?Sized> {
_phantom: PhantomData<&'a mut &'a ()>,
}

impl<'a, D: LocalDeploy<'a> + ?Sized> FlowBuilder<'a, D> {
pub fn ir(&self) -> Ref<'_, Vec<HfPlusLeaf>> {
self.ir_leaves.borrow()
}
}

impl<'a, D: LocalDeploy<'a>> QuotedContext for FlowBuilder<'a, D> {
fn create() -> Self {
FlowBuilder::new()
Expand All @@ -54,6 +48,13 @@ impl<'a, D: LocalDeploy<'a>> FlowBuilder<'a, D> {
}
}

pub fn build(&self) -> BuiltFlow<'a, D> {
BuiltFlow {
ir: self.ir_leaves.borrow().clone(),
_phantom: PhantomData,
}
}

pub fn ir_leaves(&self) -> &RefCell<Vec<HfPlusLeaf>> {
&self.ir_leaves
}
Expand Down Expand Up @@ -102,15 +103,34 @@ impl<'a, D: LocalDeploy<'a>> FlowBuilder<'a, D> {
}
}

#[derive(Clone)]
pub struct BuiltFlow<'a, D: LocalDeploy<'a>> {
pub ir: Vec<HfPlusLeaf>,

_phantom: PhantomData<&'a mut &'a D>,
}

impl<'a, D: LocalDeploy<'a>> BuiltFlow<'a, D> {
pub fn optimize_with(
self,
f: impl FnOnce(Vec<HfPlusLeaf>) -> Vec<HfPlusLeaf>,
) -> BuiltFlow<'a, D> {
BuiltFlow {
ir: f(self.ir),
_phantom: PhantomData,
}
}
}

fn build_inner<'a, D: LocalDeploy<'a>>(
me: &FlowBuilder<'a, D>,
me: BuiltFlow<'a, D>,
id: syn::Expr,
is_single: bool,
) -> HfBuilt<'a> {
let mut builders = BTreeMap::new();
let mut built_tees = HashMap::new();
let mut next_stmt_id = 0;
for leaf in me.ir_leaves().replace(Default::default()) {
for leaf in me.ir {
leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
}

Expand Down Expand Up @@ -171,17 +191,16 @@ fn build_inner<'a, D: LocalDeploy<'a>>(
}
}

impl<'a, D: LocalDeploy<'a, GraphId = usize>> FlowBuilder<'a, D> {
pub fn build(&self, id: impl Quoted<'a, usize>) -> HfBuilt<'a> {
impl<'a, D: LocalDeploy<'a, GraphId = usize>> BuiltFlow<'a, D> {
pub fn emit(self, id: impl Quoted<'a, usize>) -> HfBuilt<'a> {
build_inner(self, id.splice(), false)
}

pub fn hydroflow_ir(&self) -> BTreeMap<usize, HydroflowGraph> {
let mut builders = BTreeMap::new();
let mut built_tees = HashMap::new();
let mut next_stmt_id = 0;
let borrowed_leaves = self.ir_leaves.borrow();
for leaf in borrowed_leaves.iter() {
for leaf in self.ir.iter() {
leaf.clone()
.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
}
Expand All @@ -196,17 +215,16 @@ impl<'a, D: LocalDeploy<'a, GraphId = usize>> FlowBuilder<'a, D> {
}
}

impl<'a, D: LocalDeploy<'a, GraphId = ()>> FlowBuilder<'a, D> {
pub fn build_single(&self) -> HfBuilt<'a> {
impl<'a, D: LocalDeploy<'a, GraphId = ()>> BuiltFlow<'a, D> {
pub fn emit_single(self) -> HfBuilt<'a> {
build_inner(self, parse_quote!(0), true)
}

pub fn single_hydroflow_ir(&self) -> HydroflowGraph {
let mut builders = BTreeMap::new();
let mut built_tees = HashMap::new();
let mut next_stmt_id = 0;
let borrowed_leaves = self.ir_leaves.borrow();
for leaf in borrowed_leaves.iter() {
for leaf in self.ir.iter() {
leaf.clone()
.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
}
Expand Down
4 changes: 4 additions & 0 deletions hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![feature(box_patterns)]

stageleft::stageleft_no_entry_crate!();

use std::marker::PhantomData;
Expand Down Expand Up @@ -31,6 +33,8 @@ pub use builder::FlowBuilder;

pub mod ir;

pub mod persist_pushdown;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
_phantom: PhantomData<&'a mut &'a ()>,
Expand Down
59 changes: 59 additions & 0 deletions hydroflow_plus/src/persist_pushdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use crate::ir::*;

fn predicate_pushdown_node(node: HfPlusNode) -> HfPlusNode {
match node {
HfPlusNode::Map {
f,
input: box HfPlusNode::Persist(behind_persist),
} => HfPlusNode::Persist(Box::new(predicate_pushdown_node(HfPlusNode::Map {
f,
input: behind_persist,
}))),
o => o,
}
}

fn predicate_pushdown_leaf(leaf: HfPlusLeaf) -> HfPlusLeaf {
match leaf {
HfPlusLeaf::ForEach { f, input } => {
let input = predicate_pushdown_node(*input);
HfPlusLeaf::ForEach {
f,
input: Box::new(input),
}
}
o => o,
}
}

pub fn predicate_pushdown(ir: Vec<HfPlusLeaf>) -> Vec<HfPlusLeaf> {
ir.into_iter().map(predicate_pushdown_leaf).collect()
}

#[stageleft::runtime]
#[cfg(test)]
mod tests {
use stageleft::*;

use crate::{Location, MultiGraph};

#[test]
fn predicate_pushdown_through_map() {
let flow = crate::builder::FlowBuilder::<MultiGraph>::new();
let process = flow.process(&());

process
.source_iter(q!(0..10))
.all_ticks()
.map(q!(|v| v + 1))
.for_each(q!(|n| println!("{}", n)));

let built = flow.build();

insta::assert_debug_snapshot!(&built.ir);

let pushed_down = built.optimize_with(super::predicate_pushdown);

insta::assert_debug_snapshot!(&pushed_down.ir);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow_plus/src/persist_pushdown.rs
expression: "&pushed_down.ir"
---
[
ForEach {
f: { use crate :: __staged :: persist_pushdown :: tests :: * ; | n | println ! ("{}" , n) },
input: Persist(
Map {
f: { use crate :: __staged :: persist_pushdown :: tests :: * ; | v | v + 1 },
input: Source {
source: Iter(
{ use crate :: __staged :: persist_pushdown :: tests :: * ; 0 .. 10 },
),
produces_delta: false,
location_id: 0,
},
},
),
},
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow_plus/src/persist_pushdown.rs
expression: "&built.ir"
---
[
ForEach {
f: { use crate :: __staged :: persist_pushdown :: tests :: * ; | n | println ! ("{}" , n) },
input: Map {
f: { use crate :: __staged :: persist_pushdown :: tests :: * ; | v | v + 1 },
input: Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pushdown :: tests :: * ; 0 .. 10 },
),
produces_delta: false,
location_id: 0,
},
),
},
},
]
22 changes: 12 additions & 10 deletions hydroflow_plus_test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub fn simple_cluster_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = simple_cluster(flow, &cli, &cli);
flow.build(q!(cli.meta.subgraph_id))
flow.build().emit(q!(cli.meta.subgraph_id))
}

#[stageleft::entry]
Expand All @@ -134,7 +134,7 @@ pub fn many_to_many_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = many_to_many(flow, &cli);
flow.build(q!(cli.meta.subgraph_id))
flow.build().emit(q!(cli.meta.subgraph_id))
}

#[stageleft::entry]
Expand All @@ -143,7 +143,7 @@ pub fn map_reduce_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = map_reduce(flow, &cli, &cli);
flow.build(q!(cli.meta.subgraph_id))
flow.build().emit(q!(cli.meta.subgraph_id))
}

#[stageleft::entry]
Expand All @@ -152,7 +152,7 @@ pub fn compute_pi_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = compute_pi(flow, &cli, &cli);
flow.build(q!(cli.meta.subgraph_id))
flow.build().emit(q!(cli.meta.subgraph_id))
}

#[stageleft::runtime]
Expand Down Expand Up @@ -194,7 +194,7 @@ mod tests {
}),
);

insta::assert_debug_snapshot!(builder.ir());
insta::assert_debug_snapshot!(builder.build().ir);

let mut deployment = deployment.into_inner();

Expand Down Expand Up @@ -250,7 +250,7 @@ mod tests {
}),
);

insta::assert_debug_snapshot!(builder.ir());
insta::assert_debug_snapshot!(builder.build().ir);

let mut deployment = deployment.into_inner();

Expand Down Expand Up @@ -289,10 +289,11 @@ mod tests {
&RuntimeData::new("FAKE"),
&RuntimeData::new("FAKE"),
);
let built = builder.build();

insta::assert_debug_snapshot!(builder.ir());
insta::assert_debug_snapshot!(built.ir);

for (id, ir) in builder.hydroflow_ir() {
for (id, ir) in built.hydroflow_ir() {
insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
insta::assert_display_snapshot!(ir.surface_syntax_string());
});
Expand All @@ -307,10 +308,11 @@ mod tests {
&RuntimeData::new("FAKE"),
&RuntimeData::new("FAKE"),
);
let built = builder.build();

insta::assert_debug_snapshot!(builder.ir());
insta::assert_debug_snapshot!(built.ir);

for (id, ir) in builder.hydroflow_ir() {
for (id, ir) in built.hydroflow_ir() {
insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
insta::assert_display_snapshot!(ir.surface_syntax_string());
});
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_plus_test/src/first_ten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn first_ten_runtime<'a>(
flow: &'a FlowBuilder<'a, SingleProcessGraph>,
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten(flow, &());
flow.build_single()
flow.build().emit_single()
}

pub fn first_ten_distributed<'a, D: Deploy<'a>>(
Expand All @@ -42,7 +42,7 @@ pub fn first_ten_distributed_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = first_ten_distributed(flow, &cli);
flow.build(q!(cli.meta.subgraph_id))
flow.build().emit(q!(cli.meta.subgraph_id))
}

#[stageleft::runtime]
Expand All @@ -68,7 +68,7 @@ mod tests {
}),
);

insta::assert_debug_snapshot!(builder.ir());
insta::assert_debug_snapshot!(builder.build().ir);

deployment.deploy().await.unwrap();

Expand Down
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn teed_join<'a, S: Stream<Item = u32> + Unpin + 'a>(
output.send(v).unwrap();
}));

flow.build(subgraph_id)
flow.build().emit(subgraph_id)
}

#[stageleft::entry]
Expand Down Expand Up @@ -73,7 +73,7 @@ pub fn chat_app<'a>(
output.send(t).unwrap();
}));

flow.build_single()
flow.build().emit_single()
}

#[stageleft::entry]
Expand Down Expand Up @@ -102,7 +102,7 @@ pub fn graph_reachability<'a>(
reached_out.send(v).unwrap();
}));

flow.build_single()
flow.build().emit_single()
}

#[stageleft::entry(String)]
Expand All @@ -123,7 +123,7 @@ pub fn count_elems<'a, T: 'a>(
output.send(v).unwrap();
}));

flow.build_single()
flow.build().emit_single()
}

#[stageleft::runtime]
Expand Down
Loading

0 comments on commit a5bb45b

Please sign in to comment.