Skip to content

Commit

Permalink
feat(hydroflow_plus): perf(true) added to HydroflowCrateService (hydr…
Browse files Browse the repository at this point in the history
  • Loading branch information
davidchuyaya committed Apr 12, 2024
1 parent 23d7a8f commit e6b6d40
Show file tree
Hide file tree
Showing 20 changed files with 373 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
],
"files.watcherExclude": {
"**/target": true
}
},
"rust-analyzer.showUnlinkedFileNotification": false
}
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ lto = "fat"
[profile.profile]
inherits = "release"
debug = 2
lto = "off"
strip = "none"

[profile.dev.package.website_playground]
Expand Down
8 changes: 8 additions & 0 deletions hydro_deploy/core/src/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct HydroflowCrate {
target: CrateTarget,
on: Arc<RwLock<dyn Host>>,
profile: Option<String>,
perf: bool, // Whether to wrap the binary in perf to get CPU time
args: Vec<String>,
display_name: Option<String>,
}
Expand All @@ -40,6 +41,7 @@ impl HydroflowCrate {
target: CrateTarget::Default,
on,
profile: None,
perf: false,
args: vec![],
display_name: None,
}
Expand Down Expand Up @@ -78,6 +80,11 @@ impl HydroflowCrate {
self
}

pub fn perf(mut self, perf: bool) -> Self {
self.perf = perf;
self
}

/// Sets the arguments to be passed to the binary when it is launched.
pub fn args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.args.extend(args.into_iter().map(|s| s.into()));
Expand Down Expand Up @@ -111,6 +118,7 @@ impl ServiceBuilder for HydroflowCrate {
bin,
example,
self.profile,
self.perf,
None,
Some(self.args),
self.display_name,
Expand Down
4 changes: 4 additions & 0 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct HydroflowCrateService {
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
perf: bool,
features: Option<Vec<String>>,
args: Option<Vec<String>>,
display_id: Option<String>,
Expand Down Expand Up @@ -62,6 +63,7 @@ impl HydroflowCrateService {
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
perf: bool,
features: Option<Vec<String>>,
args: Option<Vec<String>>,
display_id: Option<String>,
Expand All @@ -76,6 +78,7 @@ impl HydroflowCrateService {
on,
example,
profile,
perf,
features,
args,
display_id,
Expand Down Expand Up @@ -277,6 +280,7 @@ impl Service for HydroflowCrateService {
.unwrap_or_else(|| format!("service/{}", self.id)),
built.clone(),
&args,
self.perf,
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions hydro_deploy/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub trait LaunchedHost: Send + Sync {
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
perf: bool,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>>;

async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
Expand Down
28 changes: 24 additions & 4 deletions hydro_deploy/core/src/localhost/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,39 @@ impl LaunchedHost for LaunchedLocalhost {
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
perf: bool,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
let mut child = Command::new(&binary.2);
let mut command = if perf {
println!("Profiling binary with perf");
let mut tmp = Command::new("perf");
tmp.args([
"record",
"-F",
"5",
"--call-graph",
"dwarf,64000",
"-o",
(id.clone() + ".perf.data").as_str(),
])
.arg(&binary.2)
.args(args);
tmp
} else {
let mut tmp = Command::new(&binary.2);
tmp.args(args);
tmp
};

child
command
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());

#[cfg(not(unix))]
child.kill_on_drop(true);
command.kill_on_drop(true);

let child = child.spawn()?;
let child = command.spawn()?;

Ok(Arc::new(RwLock::new(LaunchedLocalhostBinary::new(
child, id,
Expand Down
3 changes: 3 additions & 0 deletions hydro_deploy/core/src/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
perf: bool,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
let session = self.open_ssh_session().await?;

Expand Down Expand Up @@ -305,6 +306,8 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
channel
.exec(&format!("{binary_path_string}{args_string}"))
.await?;
// TODO: If perf == true, launch SSH with perf
let _ = perf;

anyhow::Ok(channel)
},
Expand Down
2 changes: 2 additions & 0 deletions hydro_deploy/hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl Deployment {
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
perf: Option<bool>,
features: Option<Vec<String>>,
args: Option<Vec<String>>,
display_id: Option<String>,
Expand All @@ -279,6 +280,7 @@ impl Deployment {
bin,
example,
profile,
perf.unwrap_or(false),
features,
args,
display_id,
Expand Down
15 changes: 15 additions & 0 deletions hydroflow_lang/src/graph/hydroflow_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,18 @@ impl HydroflowGraph {
node_id,
&*if write_config.op_short_text {
node.to_name_string()
} else if write_config.op_text_no_imports {
// Remove any lines that start with "use" (imports)
let full_text = node.to_pretty_string();
let mut output = String::new();
for sentence in full_text.split('\n') {
if sentence.trim().starts_with("use") {
continue;
}
output.push('\n');
output.push_str(sentence);
}
output.into()
} else {
node.to_pretty_string()
},
Expand Down Expand Up @@ -1619,6 +1631,9 @@ pub struct WriteConfig {
/// Op text will only be their name instead of the whole source.
#[cfg_attr(feature = "debugging", arg(long))]
pub op_short_text: bool,
/// Op text will exclude any line that starts with "use".
#[cfg_attr(feature = "debugging", arg(long))]
pub op_text_no_imports: bool,
}

/// Enum for choosing between mermaid and dot graph writing.
Expand Down
7 changes: 5 additions & 2 deletions hydroflow_plus/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,11 @@ impl<'a, D: LocalDeploy<'a>> BuiltFlow<'a, D> {
build_inner(self)
}

pub fn optimize_default(self) -> HfCompiled<'a, D::GraphId> {
pub fn with_default_optimize(self) -> BuiltFlow<'a, D> {
self.optimize_with(super::persist_pullup::persist_pullup)
.no_optimize()
}

pub fn optimize_default(self) -> HfCompiled<'a, D::GraphId> {
self.with_default_optimize().no_optimize()
}
}
4 changes: 2 additions & 2 deletions hydroflow_plus/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub enum HfPlusLeaf {
impl HfPlusLeaf {
pub fn transform_children(
self,
transform: impl Fn(HfPlusNode, &mut SeenTees) -> HfPlusNode,
mut transform: impl FnMut(HfPlusNode, &mut SeenTees) -> HfPlusNode,
seen_tees: &mut SeenTees,
) -> HfPlusLeaf {
match self {
Expand Down Expand Up @@ -246,7 +246,7 @@ pub type SeenTees = HashMap<*const RefCell<HfPlusNode>, Rc<RefCell<HfPlusNode>>>
impl HfPlusNode {
pub fn transform_children(
self,
transform: impl Fn(HfPlusNode, &mut SeenTees) -> HfPlusNode,
mut transform: impl FnMut(HfPlusNode, &mut SeenTees) -> HfPlusNode,
seen_tees: &mut SeenTees,
) -> HfPlusNode {
match self {
Expand Down
1 change: 1 addition & 0 deletions hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use builder::FlowBuilder;
pub mod ir;

pub mod persist_pullup;
pub mod profiler;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
Expand Down
114 changes: 114 additions & 0 deletions hydroflow_plus/src/profiler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::cell::RefCell;

use hydroflow::futures::channel::mpsc::UnboundedSender;
use stageleft::*;

use crate as hydroflow_plus;
use crate::ir::*;
use crate::RuntimeContext;

pub fn increment_counter(count: &mut u64) {
*count += 1;
}

fn quoted_any_fn<'a, F: Fn(usize) -> usize + 'a, Q: IntoQuotedMut<'a, F>>(q: Q) -> Q {
q
}

/// Add a profiling node before each node to count the cardinality of its input
fn add_profiling_node<'a>(
node: HfPlusNode,
_context: RuntimeContext<'a>,
counters: RuntimeData<&'a RefCell<Vec<u64>>>,
counter_queue: RuntimeData<&'a RefCell<UnboundedSender<(usize, u64)>>>,
id: &mut u32,
seen_tees: &mut SeenTees,
) -> HfPlusNode {
let my_id = *id;
*id += 1;

let child = node.transform_children(
|node, seen_tees| {
add_profiling_node(node, _context, counters, counter_queue, id, seen_tees)
},
seen_tees,
);
HfPlusNode::Map {
f: quoted_any_fn(q!({
// Put counters on queue
counter_queue
.borrow()
.unbounded_send((my_id as usize, counters.borrow()[my_id as usize]))
.unwrap();
counters.borrow_mut()[my_id as usize] = 0;
move |v| {
hydroflow_plus::profiler::increment_counter(
&mut counters.borrow_mut()[my_id as usize],
);
v
}
}))
.splice()
.into(),
input: Box::new(child),
}
}

/// Count the cardinality of each input and periodically output to a file
pub fn profiling<'a>(
ir: Vec<HfPlusLeaf>,
context: RuntimeContext<'a>,
counters: RuntimeData<&'a RefCell<Vec<u64>>>,
counter_queue: RuntimeData<&'a RefCell<UnboundedSender<(usize, u64)>>>,
) -> Vec<HfPlusLeaf> {
let mut id = 0;
let mut seen_tees = Default::default();
ir.into_iter()
.map(|l| {
l.transform_children(
|node, seen_tees| {
add_profiling_node(node, context, counters, counter_queue, &mut id, seen_tees)
},
&mut seen_tees,
)
})
.collect()
}

#[stageleft::runtime]
#[cfg(test)]
mod tests {
use stageleft::*;

use crate::MultiGraph;

#[test]
fn predicate_pushdown_through_map() {
let flow = crate::builder::FlowBuilder::<MultiGraph>::new();
let process = flow.process(&());

flow.source_iter(&process, q!(0..10))
.all_ticks()
.map(q!(|v| v + 1))
.for_each(q!(|n| println!("{}", n)));

let runtime_context = flow.runtime_context();
let built = flow.extract();

insta::assert_debug_snapshot!(&built.ir);

// Print mermaid
// let mut mermaid_config = WriteConfig {op_text_no_imports: true, ..Default::default()};
// for (_, ir) in built.clone().optimize_default().hydroflow_ir() {
// println!("{}", ir.to_mermaid(&mermaid_config));
// }

let counters = RuntimeData::new("Fake");
let counter_queue = RuntimeData::new("Fake");

let pushed_down = built
.optimize_with(|ir| super::profiling(ir, runtime_context, counters, counter_queue));

insta::assert_debug_snapshot!(&pushed_down.ir);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
---
source: hydroflow_plus/src/profiler.rs
expression: "&pushed_down.ir"
---
[
ForEach {
f: { use crate :: __staged :: profiler :: tests :: * ; | n | println ! ("{}" , n) },
input: Map {
f: { use crate :: __staged :: profiler :: * ; let counter_queue = Fake ; let my_id_copy1 = 0u32 ; let counters_copy1 = Fake ; let my_id_copy2 = 0u32 ; let counters_copy2 = Fake ; let my_id_copy3 = 0u32 ; let context = & context ; let my_id_copy4 = 0u32 ; let counters = Fake ; let my_id = 0u32 ; { counter_queue . borrow () . unbounded_send ((my_id_copy1 as usize , counters_copy1 . borrow () [my_id_copy2 as usize])) . unwrap () ; counters_copy2 . borrow_mut () [my_id_copy3 as usize] = 0 ; move | v | { hydroflow_plus :: profiler :: increment_counter (context . current_tick () , my_id_copy4 , & mut counters . borrow_mut () [my_id as usize] ,) ; v } } },
input: Map {
f: { use crate :: __staged :: profiler :: tests :: * ; | v | v + 1 },
input: Map {
f: { use crate :: __staged :: profiler :: * ; let counter_queue = Fake ; let my_id_copy1 = 1u32 ; let counters_copy1 = Fake ; let my_id_copy2 = 1u32 ; let counters_copy2 = Fake ; let my_id_copy3 = 1u32 ; let context = & context ; let my_id_copy4 = 1u32 ; let counters = Fake ; let my_id = 1u32 ; { counter_queue . borrow () . unbounded_send ((my_id_copy1 as usize , counters_copy1 . borrow () [my_id_copy2 as usize])) . unwrap () ; counters_copy2 . borrow_mut () [my_id_copy3 as usize] = 0 ; move | v | { hydroflow_plus :: profiler :: increment_counter (context . current_tick () , my_id_copy4 , & mut counters . borrow_mut () [my_id as usize] ,) ; v } } },
input: Persist(
Map {
f: { use crate :: __staged :: profiler :: * ; let counter_queue = Fake ; let my_id_copy1 = 2u32 ; let counters_copy1 = Fake ; let my_id_copy2 = 2u32 ; let counters_copy2 = Fake ; let my_id_copy3 = 2u32 ; let context = & context ; let my_id_copy4 = 2u32 ; let counters = Fake ; let my_id = 2u32 ; { counter_queue . borrow () . unbounded_send ((my_id_copy1 as usize , counters_copy1 . borrow () [my_id_copy2 as usize])) . unwrap () ; counters_copy2 . borrow_mut () [my_id_copy3 as usize] = 0 ; move | v | { hydroflow_plus :: profiler :: increment_counter (context . current_tick () , my_id_copy4 , & mut counters . borrow_mut () [my_id as usize] ,) ; v } } },
input: Source {
source: Iter(
{ use crate :: __staged :: profiler :: tests :: * ; 0 .. 10 },
),
location_id: 0,
},
},
),
},
},
},
},
]
Loading

0 comments on commit e6b6d40

Please sign in to comment.