Skip to content

Commit

Permalink
Profiling compute_pi with counters
Browse files Browse the repository at this point in the history
  • Loading branch information
davidchuyaya committed Mar 14, 2024
1 parent 200ef08 commit 7ac5fe7
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 7 deletions.
2 changes: 1 addition & 1 deletion hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<T> HydroCLI<T> {
}
}

async fn init_no_ack_start<T: DeserializeOwned + Default>() -> HydroCLI<T> {
pub async fn init_no_ack_start<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let trimmed = input.trim();
Expand Down
63 changes: 63 additions & 0 deletions hydroflow_plus/src/profiler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::{ir::*, RuntimeContext};
use stageleft::*;
use crate as hydroflow_plus;
use std::cell::RefCell;

pub fn increment_counter(tick: usize, id: u32, count: &mut u64) {
*count += 1;
println!("tick {}, id {}: counter {} incremented", tick, id, count);
}

fn quoted_any_fn<'a, F: Fn(usize) -> usize + 'a, Q: IntoQuotedMut<'a, F>>(q: Q) -> Q {
q
}

fn add_profiling_node<'a>(node: HfPlusNode, context: RuntimeContext<'a>, counters: RuntimeData<&'a RefCell<Vec<u64>>>, id: &mut u32, seen_tees: &mut SeenTees) -> HfPlusNode {
let my_id = *id;
*id += 1;

let child = node.transform_children(|node, seen_tees| add_profiling_node(node, context, counters, id, seen_tees), seen_tees);
let counters_copy = *&counters;
HfPlusNode::Map {
f: quoted_any_fn(q!(
{
counters_copy.borrow_mut()[my_id as usize] = 0;
move |v| { hydroflow_plus::profiler::increment_counter(context.current_tick(), my_id, &mut counters.borrow_mut()[my_id as usize]); v }
})).splice().into(),
input: Box::new(child),
}
}

pub fn profiling<'a>(ir: Vec<HfPlusLeaf>, context: RuntimeContext<'a>, counters: RuntimeData<&'a RefCell<Vec<u64>>>) -> Vec<HfPlusLeaf> {
let mut id = 0;
let mut seen_tees = Default::default();
ir.into_iter().map(|l| l.transform_children( |node, seen_tees| add_profiling_node(node, context, counters, &mut id, seen_tees), &mut seen_tees)).collect()
}

#[stageleft::runtime]
#[cfg(test)]
mod tests {
use stageleft::*;

use crate::{Location, MultiGraph};

#[test]
fn predicate_pushdown_through_map() {
let flow = crate::builder::FlowBuilder::<MultiGraph>::new();
let process = flow.process(&());

process
.source_iter(q!(0..10))
.all_ticks()
.map(q!(|v| v + 1))
.for_each(q!(|n| println!("{}", n)));

let built = flow.build();

insta::assert_debug_snapshot!(&built.ir);

let pushed_down = built.optimize_with(|ir| super::profiling(ir, flow.runtime_context()));

insta::assert_debug_snapshot!(&pushed_down.ir);
}
}
2 changes: 1 addition & 1 deletion hydroflow_plus_test/examples/compute_pi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() {
None,
)
}),
"release",
"dev",
)
} else {
let localhost = deployment.borrow_mut().Localhost();
Expand Down
16 changes: 12 additions & 4 deletions hydroflow_plus_test/src/bin/compute_pi.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
use std::cell::RefCell;
use hydroflow_plus::util::cli::init_no_ack_start;
use hydroflow_plus::util::cli::launch_flow;

// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::cluster::compute_pi_runtime!(ports)
})
.await;
// TODO: Figure out the number of counters we need
let counters = RefCell::new(vec![0; 8192]);
let ports = init_no_ack_start().await;
let flow = hydroflow_plus_test::cluster::compute_pi_runtime!(&ports, &counters);

println!("ack start");

launch_flow(flow).await;
}
3 changes: 2 additions & 1 deletion hydroflow_plus_test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,10 @@ pub fn map_reduce_runtime<'a>(
pub fn compute_pi_runtime<'a>(
flow: &'a FlowBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
counters: RuntimeData<&'a std::cell::RefCell<Vec<u64>>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = compute_pi(flow, &cli, &cli);
flow.build().emit(q!(cli.meta.subgraph_id))
flow.build().optimize_with(|ir| hydroflow_plus::profiler::profiling(ir, flow.runtime_context(), counters)).emit(q!(cli.meta.subgraph_id))
}

#[stageleft::runtime]
Expand Down

0 comments on commit 7ac5fe7

Please sign in to comment.