Skip to content

Commit

Permalink
Profiling counts can be processed outside of HF
Browse files Browse the repository at this point in the history
Co-authored-by: Shadaj Laddad <[email protected]>
  • Loading branch information
davidchuyaya and shadaj committed Mar 27, 2024
1 parent db53844 commit ee4e33c
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 5 deletions.
6 changes: 5 additions & 1 deletion hydroflow_plus/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,12 @@ impl<'a, D: LocalDeploy<'a>> BuiltFlow<'a, D> {
build_inner(self)
}

pub fn optimize_default(self) -> HfCompiled<'a, D::GraphId> {
pub fn with_default_optimize(self) -> BuiltFlow<'a, D> {
self.optimize_with(super::persist_pullup::persist_pullup)
}

pub fn optimize_default(self) -> HfCompiled<'a, D::GraphId> {
self.with_default_optimize()
.no_optimize()
}
}
4 changes: 2 additions & 2 deletions hydroflow_plus/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub enum HfPlusLeaf {
impl HfPlusLeaf {
pub fn transform_children(
self,
transform: impl Fn(HfPlusNode, &mut SeenTees) -> HfPlusNode,
mut transform: impl FnMut(HfPlusNode, &mut SeenTees) -> HfPlusNode,
seen_tees: &mut SeenTees,
) -> HfPlusLeaf {
match self {
Expand Down Expand Up @@ -246,7 +246,7 @@ pub type SeenTees = HashMap<*const RefCell<HfPlusNode>, Rc<RefCell<HfPlusNode>>>
impl HfPlusNode {
pub fn transform_children(
self,
transform: impl Fn(HfPlusNode, &mut SeenTees) -> HfPlusNode,
mut transform: impl FnMut(HfPlusNode, &mut SeenTees) -> HfPlusNode,
seen_tees: &mut SeenTees,
) -> HfPlusNode {
match self {
Expand Down
1 change: 1 addition & 0 deletions hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use builder::FlowBuilder;
pub mod ir;

pub mod persist_pullup;
pub mod profiler;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
Expand Down
118 changes: 118 additions & 0 deletions hydroflow_plus/src/profiler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::cell::RefCell;

use hydroflow::futures::channel::mpsc::UnboundedSender;
use stageleft::*;

use crate as hydroflow_plus;
use crate::ir::*;
use crate::RuntimeContext;

pub fn increment_counter(tick: usize, id: u32, count: &mut u64) {
*count += 1;
// println!("tick {}, id {}: counter {} incremented", tick, id, count);
}

fn quoted_any_fn<'a, F: Fn(usize) -> usize + 'a, Q: IntoQuotedMut<'a, F>>(q: Q) -> Q {
q
}

// Add a profiling node before each node to count the cardinality of its input
fn add_profiling_node<'a>(
node: HfPlusNode,
context: RuntimeContext<'a>,
counters: RuntimeData<&'a RefCell<Vec<u64>>>,
counter_queue: RuntimeData<&'a RefCell<UnboundedSender<(usize, u64)>>>,
id: &mut u32,
seen_tees: &mut SeenTees,
) -> HfPlusNode {
let my_id = *id;
*id += 1;

let child = node.transform_children(
|node, seen_tees| add_profiling_node(node, context, counters, counter_queue, id, seen_tees),
seen_tees,
);
let counters_copy1 = *&counters;
let counters_copy2 = *&counters;
let my_id_copy1 = my_id;
let my_id_copy2 = my_id;
let my_id_copy3 = my_id;
let my_id_copy4 = my_id;
HfPlusNode::Map {
f: quoted_any_fn(q!({
// Put counters on queue
counter_queue.borrow().unbounded_send((my_id_copy1 as usize, counters_copy1.borrow()[my_id_copy2 as usize])).unwrap();
counters_copy2.borrow_mut()[my_id_copy3 as usize] = 0;
move |v| {
hydroflow_plus::profiler::increment_counter(
context.current_tick(),
my_id_copy4,
&mut counters.borrow_mut()[my_id as usize],
);
v
}
}))
.splice()
.into(),
input: Box::new(child),
}
}

// Count the cardinality of each input and periodically output to a file
pub fn profiling<'a>(
ir: Vec<HfPlusLeaf>,
context: RuntimeContext<'a>,
counters: RuntimeData<&'a RefCell<Vec<u64>>>,
counter_queue: RuntimeData<&'a RefCell<UnboundedSender<(usize, u64)>>>,
) -> Vec<HfPlusLeaf> {
let mut id = 0;
let mut seen_tees = Default::default();
ir.into_iter()
.map(|l| {
l.transform_children(
|node, seen_tees| add_profiling_node(node, context, counters, counter_queue, &mut id, seen_tees),
&mut seen_tees,
)
})
.collect()
}

#[stageleft::runtime]
#[cfg(test)]
mod tests {
use hydroflow_lang::graph::WriteConfig;
use stageleft::*;

use crate::{Location, MultiGraph};

#[test]
fn predicate_pushdown_through_map() {
let flow = crate::builder::FlowBuilder::<MultiGraph>::new();
let process = flow.process(&());

process
.source_iter(q!(0..10))
.all_ticks()
.map(q!(|v| v + 1))
.for_each(q!(|n| println!("{}", n)));

let runtime_context = flow.runtime_context();
let built = flow.extract();

insta::assert_debug_snapshot!(&built.ir);

// Print mermaid
// let mut mermaid_config = WriteConfig {op_text_no_imports: true, ..Default::default()};
// for (_, ir) in built.clone().optimize_default().hydroflow_ir() {
// println!("{}", ir.to_mermaid(&mermaid_config));
// }

let counters = RuntimeData::new("Fake");
let counter_queue = RuntimeData::new("Fake");

let pushed_down =
built.optimize_with(|ir| super::profiling(ir, runtime_context, counters, counter_queue));

insta::assert_debug_snapshot!(&pushed_down.ir);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
source: hydroflow_plus/src/profiler.rs
assertion_line: 112
expression: "&pushed_down.ir"
---
[
ForEach {
f: { use crate :: __staged :: profiler :: tests :: * ; | n | println ! ("{}" , n) },
input: Map {
f: { use crate :: __staged :: profiler :: * ; let counter_queue = Fake ; let my_id = 0u32 ; let counters_copy1 = Fake ; let my_id = 32712u32 ; let counters_copy2 = Fake ; let my_id = 32712u32 ; let context = & context ; let my_id = 32712u32 ; let counters = Fake ; let my_id = 32712u32 ; { counter_queue . borrow () . unbounded_send ((my_id as usize , counters_copy1 . borrow () [my_id as usize])) . unwrap () ; counters_copy2 . borrow_mut () [my_id as usize] = 0 ; move | v | { hydroflow_plus :: profiler :: increment_counter (context . current_tick () , my_id , & mut counters . borrow_mut () [my_id as usize] ,) ; v } } },
input: Map {
f: { use crate :: __staged :: profiler :: tests :: * ; | v | v + 1 },
input: Map {
f: { use crate :: __staged :: profiler :: * ; let counter_queue = Fake ; let my_id = 1u32 ; let counters_copy1 = Fake ; let my_id = 32712u32 ; let counters_copy2 = Fake ; let my_id = 32712u32 ; let context = & context ; let my_id = 32712u32 ; let counters = Fake ; let my_id = 32712u32 ; { counter_queue . borrow () . unbounded_send ((my_id as usize , counters_copy1 . borrow () [my_id as usize])) . unwrap () ; counters_copy2 . borrow_mut () [my_id as usize] = 0 ; move | v | { hydroflow_plus :: profiler :: increment_counter (context . current_tick () , my_id , & mut counters . borrow_mut () [my_id as usize] ,) ; v } } },
input: Persist(
Map {
f: { use crate :: __staged :: profiler :: * ; let counter_queue = Fake ; let my_id = 2u32 ; let counters_copy1 = Fake ; let my_id = 32712u32 ; let counters_copy2 = Fake ; let my_id = 32712u32 ; let context = & context ; let my_id = 32712u32 ; let counters = Fake ; let my_id = 32712u32 ; { counter_queue . borrow () . unbounded_send ((my_id as usize , counters_copy1 . borrow () [my_id as usize])) . unwrap () ; counters_copy2 . borrow_mut () [my_id as usize] = 0 ; move | v | { hydroflow_plus :: profiler :: increment_counter (context . current_tick () , my_id , & mut counters . borrow_mut () [my_id as usize] ,) ; v } } },
input: Source {
source: Iter(
{ use crate :: __staged :: profiler :: tests :: * ; 0 .. 10 },
),
location_id: 0,
},
},
),
},
},
},
},
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow_plus/src/profiler.rs
assertion_line: 98
expression: "&built.ir"
---
[
ForEach {
f: { use crate :: __staged :: profiler :: tests :: * ; | n | println ! ("{}" , n) },
input: Map {
f: { use crate :: __staged :: profiler :: tests :: * ; | v | v + 1 },
input: Persist(
Source {
source: Iter(
{ use crate :: __staged :: profiler :: tests :: * ; 0 .. 10 },
),
location_id: 0,
},
),
},
},
]
8 changes: 8 additions & 0 deletions hydroflow_plus_test/examples/compute_pi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use hydro_deploy::{Deployment, Host, HydroflowCrate};
use hydroflow_plus_cli_integration::{DeployClusterSpec, DeployProcessSpec};
use stageleft::RuntimeData;
use tokio::sync::RwLock;
use hydroflow_plus::profiler::profiling;

type HostCreator = Box<dyn Fn(&mut Deployment) -> Arc<RwLock<dyn Host>>>;

Expand Down Expand Up @@ -70,6 +71,13 @@ async fn main() {
RuntimeData::new("FAKE"),
);

let runtime_context = builder.runtime_context();
dbg!(builder.extract()
.with_default_optimize()
.optimize_with(|ir| profiling(ir, runtime_context, RuntimeData::new("FAKE"), RuntimeData::new("FAKE")))
.no_optimize()
.ir());

let mut deployment = deployment.into_inner();

deployment.deploy().await.unwrap();
Expand Down
19 changes: 18 additions & 1 deletion hydroflow_plus_test/src/bin/compute_pi.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
use std::cell::RefCell;

use hydroflow_plus::futures::StreamExt;

// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let batch_size = 8192;
let counters = RefCell::new(vec![0; 8192]);

let (counter_sender, mut counter_receiver) = hydroflow_plus::futures::channel::mpsc::unbounded();
let counter_queue = RefCell::new(counter_sender);

let thread = tokio::spawn(async move {
while let Some((id, count)) = counter_receiver.next().await {
println!("node id {}: counter = {}", id, count);
}
});

hydroflow_plus::launch!(|ports| hydroflow_plus_test::cluster::compute_pi_runtime!(
ports,
&batch_size
&batch_size,
&counters,
&counter_queue,
))
.await;
}
10 changes: 9 additions & 1 deletion hydroflow_plus_test/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::cell::RefCell;
use std::time::Duration;

use futures::channel::mpsc::UnboundedSender;
use hydroflow_plus::profiler::profiling;
use hydroflow_plus::*;
use stageleft::*;

Expand Down Expand Up @@ -158,10 +161,15 @@ pub fn compute_pi_runtime<'a>(
flow: FlowBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
batch_size: RuntimeData<&'a usize>,
counters: RuntimeData<&'a RefCell<Vec<u64>>>,
counter_queue: RuntimeData<&'a RefCell<UnboundedSender<(usize, u64)>>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = compute_pi(&flow, &cli, &cli, batch_size);
let runtime_context = flow.runtime_context();
flow.extract()
.optimize_default()
.with_default_optimize()
.optimize_with(|ir| profiling(ir, runtime_context, counters, counter_queue))
.no_optimize()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}

Expand Down

0 comments on commit ee4e33c

Please sign in to comment.