Skip to content

Commit

Permalink
Merge branch 'main' into profiler
Browse files Browse the repository at this point in the history
  • Loading branch information
davidchuyaya committed Mar 27, 2024
2 parents 16679c2 + ee4e33c commit 6ba298a
Show file tree
Hide file tree
Showing 55 changed files with 748 additions and 484 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ __pycache__/
perf.data
perf.data.old
flamegraph.svg

/rustc-ice-*.txt
6 changes: 4 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
{
"rust-analyzer.runnableEnv": [
{
// Set output levels for `tracing` logging.
"env": {
// Stack backtraces.
"RUST_BACKTRACE": "full",
// Set output levels for `tracing` logging.
"RUST_LOG": "debug,hydroflow=trace",
"INSTA_FORCE_PASS": "1"
// Make sure all snapshots are written instead of just the first failure.
"INSTA_FORCE_PASS": "1",
}
}
],
Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benches/benches/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fork_join_*.hf
159 changes: 13 additions & 146 deletions benches/benches/fork_join.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::{Iter, VecHandoff};
Expand Down Expand Up @@ -82,6 +83,17 @@ fn benchmark_hydroflow(c: &mut Criterion) {
});
}

/// Benchmarks the fork-join dataflow written in Hydroflow's surface syntax.
///
/// The `import!`ed file `fork_join_20.hf` is generated by `benches/build.rs`
/// (its `fork_join()` writes `fork_join_{NUM_OPS}.hf` with `NUM_OPS = 20`).
fn benchmark_hydroflow_surface(c: &mut Criterion) {
    c.bench_function("fork_join/hydroflow/surface", |b| {
        b.iter(|| {
            // NOTE: graph construction happens inside the timed closure, so
            // build cost is included in the measurement alongside execution.
            let mut hf = hydroflow_syntax! {
                source_iter(0..NUM_INTS) -> import!("fork_join_20.hf") -> for_each(|x| { black_box(x); });
            };
            // Run until no more work is immediately available.
            hf.run_available();
        })
    });
}

fn benchmark_hydroflow_builder(c: &mut Criterion) {
c.bench_function("fork_join/hydroflow_builder", |b| {
b.iter(|| {
Expand Down Expand Up @@ -151,157 +163,12 @@ fn benchmark_timely(c: &mut Criterion) {
});
}

// fn benchmark_spinach(c: &mut Criterion) {
// c.bench_function("spinach", |b| {
// b.to_async(
// tokio::runtime::Builder::new_current_thread()
// .build()
// .unwrap(),
// )
// .iter(|| {
// async {
// use spinachflow::comp::Comp;

// type MyLatRepr =
// spinachflow::lattice::set_union::SetUnionRepr<spinachflow::tag::VEC, usize>;
// let op = <spinachflow::op::OnceOp<MyLatRepr>>::new((0..NUM_INTS).collect());

// struct Even();
// impl spinachflow::func::unary::Morphism for Even {
// type InLatRepr = MyLatRepr;
// type OutLatRepr = MyLatRepr;
// fn call<Y: spinachflow::hide::Qualifier>(
// &self,
// item: spinachflow::hide::Hide<Y, Self::InLatRepr>,
// ) -> spinachflow::hide::Hide<Y, Self::OutLatRepr> {
// item.filter(|i| 0 == i % 2)
// }
// }

// struct Odds();
// impl spinachflow::func::unary::Morphism for Odds {
// type InLatRepr = MyLatRepr;
// type OutLatRepr = MyLatRepr;
// fn call<Y: spinachflow::hide::Qualifier>(
// &self,
// item: spinachflow::hide::Hide<Y, Self::InLatRepr>,
// ) -> spinachflow::hide::Hide<Y, Self::OutLatRepr> {
// item.filter(|i| 1 == i % 2)
// }
// }

// ///// MAGIC NUMBER!!!!!!!! is NUM_OPS
// seq_macro::seq!(N in 0..20 {
// let [ op_even, op_odds ] = spinachflow::op::fixed_split::<_, 2>(op);
// let op_even = spinachflow::op::MorphismOp::new(op_even, Even());
// let op_odds = spinachflow::op::MorphismOp::new(op_odds, Odds());
// let op = spinachflow::op::MergeOp::new(op_even, op_odds);
// let op = spinachflow::op::DynOpDelta::new(Box::new(op));
// });

// let comp = spinachflow::comp::NullComp::new(op);
// spinachflow::comp::CompExt::run(&comp).await.unwrap_err();
// }
// });
// });
// }

// fn benchmark_spinach_switch(c: &mut Criterion) {
// c.bench_function("spinach w/ switch", |b| {
// b.to_async(
// tokio::runtime::Builder::new_current_thread()
// .build()
// .unwrap(),
// )
// .iter(|| {
// async {
// use spinachflow::comp::Comp;

// type MyLatRepr =
// spinachflow::lattice::set_union::SetUnionRepr<spinachflow::tag::VEC, usize>;
// let op = <spinachflow::op::OnceOp<MyLatRepr>>::new((0..NUM_INTS).collect());

// struct SwitchEvenOdd();
// impl spinachflow::func::unary::Morphism for SwitchEvenOdd {
// type InLatRepr = MyLatRepr;
// type OutLatRepr = spinachflow::lattice::pair::PairRepr<MyLatRepr, MyLatRepr>;
// fn call<Y: spinachflow::hide::Qualifier>(
// &self,
// item: spinachflow::hide::Hide<Y, Self::InLatRepr>,
// ) -> spinachflow::hide::Hide<Y, Self::OutLatRepr> {
// let (a, b) = item.switch(|i| 0 == i % 2);
// spinachflow::hide::Hide::zip(a, b)
// }
// }

// ///// MAGIC NUMBER!!!!!!!! is NUM_OPS
// seq_macro::seq!(N in 0..20 {
// let op = spinachflow::op::MorphismOp::new(op, SwitchEvenOdd());
// let ( op_even, op_odds ) = spinachflow::op::SwitchOp::new(op);
// let op = spinachflow::op::MergeOp::new(op_even, op_odds);
// let op = spinachflow::op::DynOpDelta::new(Box::new(op));
// });

// let comp = spinachflow::comp::NullComp::new(op);
// spinachflow::comp::CompExt::run(&comp).await.unwrap_err();
// }
// });
// });
// }

// fn benchmark_spinachflow_symm(c: &mut Criterion) {
// c.bench_function("spinachflow (symmetric)", |b| {
// b.to_async(
// tokio::runtime::Builder::new_current_thread()
// .build()
// .unwrap(),
// )
// .iter(|| {
// async {
// use spinachflow::futures::StreamExt;
// use spinachflow::futures::future::ready;

// let stream = spinachflow::futures::stream::iter(0..NUM_INTS);

// ///// MAGIC NUMBER!!!!!!!! is NUM_OPS
// seq_macro::seq!(N in 0..20 {
// let splitter = spinachflow::stream::Splitter::new(stream);
// let mut i = 0;
// let splits = [(); BRANCH_FACTOR].map(|_| {
// let j = i;
// i += 1;
// splitter.add_split().filter(move |x| ready(j == x % BRANCH_FACTOR))
// });
// let stream = spinachflow::stream::SelectArr::new(splits);
// let stream: std::pin::Pin<Box<dyn spinachflow::futures::Stream<Item = usize>>> = Box::pin(stream);
// });

// let mut stream = stream;
// loop {
// let item = stream.next().await;
// if item.is_none() {
// break;
// }
// }
// }
// });
// });
// }

// criterion_group!(
// name = fork_join_dataflow;
// config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
// targets = benchmark_babyflow
// );
// criterion_group!(fork_join_dataflow, benchmark_timely,);
// Register the active fork-join benchmarks; the spinach variants above are
// kept (commented out) for reference but are not currently runnable.
criterion_group!(
    fork_join_dataflow,
    benchmark_hydroflow,
    benchmark_hydroflow_surface,
    benchmark_hydroflow_builder,
    benchmark_timely,
    benchmark_raw,
    // benchmark_spinach,
    // benchmark_spinach_switch,
    // benchmark_spinachflow_symm,
);
criterion_main!(fork_join_dataflow);
37 changes: 37 additions & 0 deletions benches/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;

const NUM_OPS: usize = 20;

/// Build-script entry point: (re)generates the fork-join surface-syntax file.
///
/// A generation failure is reported on stderr but does not abort the build,
/// so the crate still compiles even if the file cannot be written.
pub fn main() {
    match fork_join() {
        Ok(()) => {}
        Err(err) => eprintln!("benches/build.rs error: {:?}", err),
    }
}

/// Writes `benches/fork_join_{NUM_OPS}.hf`: a Hydroflow surface-syntax
/// snippet describing a chain of `NUM_OPS` fork-join stages, where each
/// stage splits its input by parity and the next stage unions it back.
///
/// # Errors
/// Returns any I/O error from creating, writing, or flushing the file.
pub fn fork_join() -> std::io::Result<()> {
    let out_path = PathBuf::from_iter([
        env!("CARGO_MANIFEST_DIR"),
        "benches",
        &format!("fork_join_{}.hf", NUM_OPS),
    ]);
    let mut out = BufWriter::new(File::create(out_path)?);

    // First stage: module input flows into a tee.
    writeln!(out, "a0 = mod -> tee();")?;

    for stage in 0..NUM_OPS {
        // Every stage after the first first unions its two incoming branches.
        if stage != 0 {
            writeln!(out, "a{} = union() -> tee();", stage)?;
        }
        // Fork: even values down one branch, odd values down the other.
        writeln!(out, "a{} -> filter(|x| x % 2 == 0) -> a{};", stage, stage + 1)?;
        writeln!(out, "a{} -> filter(|x| x % 2 == 1) -> a{};", stage, stage + 1)?;
    }

    // Final join feeds back out through the module boundary.
    writeln!(out, "a{} = union() -> mod;", NUM_OPS)?;

    out.flush()
}
2 changes: 1 addition & 1 deletion docs/docs/hydroflow_plus/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Clusters solve this by providing a nearly-identical API to processes, but repre
Instantiating clusters is done using the `cluster` method on `FlowBuilder`, taking a `ClusterSpec`:
```rust
pub fn my_flow<'a, D: Deploy<'a>>(
flow: &'a FlowBuilder<'a, D>,
flow: &FlowBuilder<'a, D>,
cluster_spec: &impl ClusterSpec<'a, D>
) {
let cluster = flow.cluster(cluster_spec);
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/hydroflow_plus/process_streams.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ To create a process, we must take a `ProcessSpec` as an argument to our function

```rust
pub fn my_flow<'a, D: Deploy<'a>>(
flow: &'a FlowBuilder<'a, D>,
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
...
Expand Down
6 changes: 3 additions & 3 deletions docs/docs/hydroflow_plus/quickstart/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use hydroflow_plus::*;
use stageleft::*;

pub fn broadcast<'a, D: Deploy<'a>>(
flow: &'a FlowBuilder<'a, D>,
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>,
cluster_spec: &impl ClusterSpec<'a, D>
) {
Expand Down Expand Up @@ -56,10 +56,10 @@ use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
pub fn broadcast_runtime<'a>(
flow: &'a FlowBuilder<'a, CLIRuntime>,
flow: FlowBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
broadcast(flow, &cli, &cli);
broadcast(&flow, &cli, &cli);
flow.extract()
.optimize_default()
.with_dynamic_id(q!(cli.meta.subgraph_id))
Expand Down
12 changes: 6 additions & 6 deletions docs/docs/hydroflow_plus/quickstart/distributed.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten<'a, D: LocalDeploy<'a>>(
flow: &'a FlowBuilder<'a, D>,
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
let process = flow.process(process_spec);
Expand All @@ -26,7 +26,7 @@ use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten_distributed<'a, D: Deploy<'a>>(
flow: &'a FlowBuilder<'a, D>,
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
let process = flow.process(process_spec);
Expand All @@ -44,18 +44,18 @@ numbers
```

## The Runtime
Now that our graph spans multiple processes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with `build_single`. Instead, we must take the subgraph ID as a runtime parameter (`subgraph_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that processes can look up their network connections and instantiate the flow graph with access to it.
Now that our graph spans multiple processes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with just the optimized dataflow. Instead, we must take the subgraph ID as a runtime parameter through `with_dynamic_id` to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that processes can look up their network connections and instantiate the flow graph with access to it.

```rust title="flow/src/first_ten_distributed.rs"
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
pub fn first_ten_distributed_runtime<'a>(
flow: &'a FlowBuilder<'a, CLIRuntime>,
flow: FlowBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten_distributed(flow, &cli);
first_ten_distributed(&flow, &cli);
flow.extract()
.optimize_default()
.with_dynamic_id(q!(cli.meta.subgraph_id))
Expand All @@ -67,7 +67,7 @@ The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates
```rust title="flow/src/bin/first_ten_distributed.rs"
#[tokio::main]
async fn main() {
hydroflow_plus::util::cli::launch(
hydroflow_plus::util::cli::launch!(
|ports| flow::first_ten_distributed_runtime!(ports)
).await;
}
Expand Down
10 changes: 5 additions & 5 deletions docs/docs/hydroflow_plus/quickstart/structure.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten<'a, D: LocalDeploy<'a>>(
flow: &'a FlowBuilder<'a, D>,
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {}
```
Expand All @@ -40,7 +40,7 @@ To build a Hydroflow+ application, we need to define a dataflow that spans multi

```rust
pub fn first_ten<'a, D: LocalDeploy<'a>>(
flow: &'a FlowBuilder<'a, D>,
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
let process = flow.process(process_spec);
Expand All @@ -66,15 +66,15 @@ Next, we need to instantiate our dataflow into a runnable Rust binary. We do thi

To define the entrypoint, we use the `#[stageleft::entry]` macro, which takes the graph being built and returns a generated Hydroflow program. We define this as a new function `first_ten_runtime`. In this first example, we assume there is a single process so that we do not need to specify how the graph is laid out across processes. We specify this by using the `SingleProcessGraph` type parameter on `FlowBuilder`.

Having done that, we can use some simple defaults for "distributing" this single-process deployment. First, we use `()` as an argument to `first_ten` to choose the default process spec. Then we use the `build_single` method to generate the Hydroflow program.
Having done that, we can use some simple defaults for "distributing" this single-process deployment. First, we use `()` as an argument to `first_ten` to choose the default process spec. Then we use the `optimize_default` method to generate the Hydroflow program with default optimizations.


```rust title="flow/src/first_ten.rs"
#[stageleft::entry]
pub fn first_ten_runtime<'a>(
flow: &'a FlowBuilder<'a, SingleProcessGraph>
flow: FlowBuilder<'a, SingleProcessGraph>
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten(flow, &() /* for a single process graph */);
first_ten(&flow, &() /* for a single process graph */);
flow.extract().optimize_default() // : impl Quoted<'a, Hydroflow<'a>>
}
```
Expand Down
Loading

0 comments on commit 6ba298a

Please sign in to comment.