Paxos replicas implemented, blocked on error: "Invalid node id: 1"
davidchuyaya committed Apr 5, 2024
1 parent 398ed25 commit 7b47f92
Showing 7 changed files with 622 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions hydroflow_plus_test/Cargo.toml
@@ -9,6 +9,7 @@ hydroflow_plus = { path = "../hydroflow_plus", version = "^0.6.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.2.0" }
hydroflow_plus_cli_integration = { path = "../hydro_deploy/hydroflow_plus_cli_integration", version = "^0.6.0" }
serde = { version = "1", features = [ "derive" ] }
rand = "0.8.5"

hydroflow_plus_test_macro = { path = "../hydroflow_plus_test_macro" }
17 changes: 17 additions & 0 deletions hydroflow_plus_test/src/bin/paxos.rs
@@ -0,0 +1,17 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let f = 1;
let num_clients_per_node = 1; // TODO: Vary per experiment: 1, 50, or 100.
let kv_num_keys = 1;
let kv_value_size = 16;

hydroflow_plus::launch!(|ports| hydroflow_plus_test::paxos::paxos_runtime!(
ports,
&f,
&num_clients_per_node,
&kv_num_keys,
&kv_value_size,
))
.await;
}
1 change: 1 addition & 0 deletions hydroflow_plus_test/src/lib.rs
@@ -10,6 +10,7 @@ pub mod cluster;
pub mod first_ten;
pub mod negation;
pub mod networked;
pub mod paxos;

#[stageleft::entry(UnboundedReceiverStream<u32>)]
pub fn teed_join<'a, S: Stream<Item = u32> + Unpin + 'a>(
260 changes: 260 additions & 0 deletions hydroflow_plus_test/src/paxos.rs
@@ -0,0 +1,260 @@
use std::collections::HashMap;

use hydroflow_plus::*;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::*;
use stageleft::*;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
struct ClientPayload {
key: u32,
value: String,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone)]
struct ReplicaPayload { // Note: Important that seq is the first member of the struct for sorting
seq: i32,
key: u32,
value: String,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone)]
struct Ballot { // Note: Important that num comes before id, since Ord is defined lexicographically
num: u32,
id: u32,
}

#[derive(Serialize, Deserialize, Clone)]
struct P1a {
ballot: Ballot,
}

#[derive(Serialize, Deserialize, Clone)]
struct LogValue {
ballot: Ballot,
slot: i32,
value: ClientPayload,
}

#[derive(Serialize, Deserialize, Clone)]
struct P1b {
ballot: Ballot,
max_ballot: Ballot,
accepted: Vec<LogValue>,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
struct P2a {
ballot: Ballot,
slot: i32,
value: ClientPayload,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
struct P2b {
ballot: Ballot,
max_ballot: Ballot,
slot: i32,
value: ClientPayload,
}
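// P1a/P1b carry the Paxos phase 1 (prepare/promise) messages; P2a/P2b carry the phase 2 (accept/accepted) messages.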


pub fn paxos<'a, D: Deploy<'a, ClusterId = u32>>(
flow: &FlowBuilder<'a, D>,
proposers_spec: &impl ClusterSpec<'a, D>,
acceptors_spec: &impl ClusterSpec<'a, D>,
clients_spec: &impl ClusterSpec<'a, D>,
replicas_spec: &impl ClusterSpec<'a, D>,
f: RuntimeData<&'a usize>,
num_clients_per_node: RuntimeData<&'a usize>,
kv_num_keys: RuntimeData<&'a usize>, // How many unique keys are expected by the state machine kv store?
kv_value_size: RuntimeData<&'a usize>, // Number of bytes in each value in the kv store
) -> (D::Cluster, D::Cluster, D::Cluster, D::Cluster) {
let proposers = flow.cluster(proposers_spec);
let acceptors = flow.cluster(acceptors_spec);
let clients = flow.cluster(clients_spec);
let replicas = flow.cluster(replicas_spec);

/* Clients */
let clients_to_replicas = flow
.source_iter(&clients, q!([
// ReplicaPayload { seq: 1, key: 10, value: "Hello again, Berkeley!".to_string() },
// ReplicaPayload { seq: 2, key: 10, value: "Goodbye, Berkeley".to_string() },
// ReplicaPayload { seq: 0, key: 10, value: "Hello, Berkeley!".to_string() },
// ReplicaPayload { seq: 3, key: 20, value: "Hello, SF".to_string() },
// ReplicaPayload { seq: 5, key: 20, value: "Goodbye, SF".to_string() }
(1, 10, "Hello again, Berkeley!".to_string()),
(2, 10, "Goodbye, Berkeley".to_string()),
(0, 10, "Hello, Berkeley!".to_string()),
(3, 20, "Hello, SF".to_string()),
(5, 20, "Goodbye, SF".to_string())
]))
.broadcast_bincode_interleaved(&replicas);
// TODO: Results should be: {10: "Goodbye, Berkeley", 20: "Hello, SF"}. The last "Goodbye, SF" (seq 5) should not be executed because seq 4 is missing, leaving a hole in the log.

/* Replicas. All relations for replicas will be prefixed with r. */

let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = flow.cycle(&replicas);
let r_sorted_payloads = clients_to_replicas
.tick_batch()
.union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet
.sort();
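// The payloads sort by seq because seq is the first element of each (seq, key, value) tuple.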
// Create a cycle since we'll use this seq before we define it
let (r_highest_seq_complete_cycle, r_highest_seq) = flow.cycle(&replicas);
let empty_slot = flow.source_iter(&replicas, q!(vec![-1]));
// Either the max sequence number executed so far or -1. The union is needed because r_highest_seq is empty on the first tick, so the cross_product below would otherwise produce nothing.
let r_highest_seq_with_default = r_highest_seq
.union(empty_slot);
// Find the highest sequence number of any payload that can be processed in this tick. This is the payload right before a hole.
let r_highest_seq_processable_payload = r_sorted_payloads
.clone()
.cross_product(r_highest_seq_with_default)
.fold(q!(|| -1), q!(|filled_slot, (sorted_payload, highest_seq)| { // Note: This function only works if the input is sorted on seq.
let next_slot = std::cmp::max(*filled_slot, highest_seq);

*filled_slot = if sorted_payload.0 == next_slot + 1 {
sorted_payload.0
} else {
*filled_slot
};
}));
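// Worked example: with highest_seq = -1 and sorted seqs [0, 1, 2, 3, 5], filled_slot advances 0 -> 1 -> 2 -> 3 and stops at 3, since seq 4 is missing.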
// Find all payloads that can and cannot be processed in this tick.
let r_processable_payloads = r_sorted_payloads
.clone()
.cross_product(r_highest_seq_processable_payload.clone())
.filter(q!(|(sorted_payload, highest_seq)| sorted_payload.0 <= *highest_seq))
.map(q!(|(sorted_payload, _)| sorted_payload));
let r_new_non_processable_payloads = r_sorted_payloads
.clone()
.cross_product(r_highest_seq_processable_payload.clone())
.filter(q!(|(sorted_payload, highest_seq)| sorted_payload.0 > *highest_seq))
.map(q!(|(sorted_payload, _)| sorted_payload))
.defer_tick(); // Save these, we can process them once the hole has been filled
r_buffered_payloads_complete_cycle.complete(r_new_non_processable_payloads);

let r_kv_store = r_processable_payloads
.all_ticks() // Optimization: all_ticks() + fold() compiles to a persistent (static) fold, so the fold state is carried across ticks and already-folded inputs can be dropped.
.fold(q!(|| (HashMap::<u32, String>::new(), -1)), q!(|state, payload| {
let (kv_store, last_seq) = state;
kv_store.insert(payload.1, payload.2);
debug_assert!(payload.0 == *last_seq + 1, "Hole in log between seq {} and {}", *last_seq, payload.0);
*last_seq = payload.0;
println!("Replica kv store: {:?}", kv_store);
}));
// Update the highest seq for the next tick
let r_new_highest_seq = r_kv_store
.map(q!(|(_kv_store, highest_seq)| highest_seq))
.defer_tick();
r_highest_seq_complete_cycle.complete(r_new_highest_seq);

(proposers, acceptors, clients, replicas)
}

#[stageleft::entry]
pub fn paxos_runtime<'a>(
flow: FlowBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
f: RuntimeData<&'a usize>,
num_clients_per_node: RuntimeData<&'a usize>,
kv_num_keys: RuntimeData<&'a usize>,
kv_value_size: RuntimeData<&'a usize>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = paxos(&flow, &cli, &cli, &cli, &cli, f, num_clients_per_node, kv_num_keys, kv_value_size);
flow.extract()
.optimize_default()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}

#[stageleft::runtime]
#[cfg(test)]
mod tests {
use std::cell::RefCell;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus_cli_integration::{
DeployClusterSpec, DeployCrateWrapper, DeployProcessSpec,
};
use stageleft::RuntimeData;

#[tokio::test]
async fn paxos() {
let deployment = RefCell::new(Deployment::new());
let localhost = deployment.borrow_mut().Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
let f = 1;
let num_clients = 1;
let num_replicas = 1;
let (proposers, acceptors, clients, replicas) = super::paxos(
&builder,
&DeployClusterSpec::new(|| {
(0..(f+1))
.map(|_| {
deployment.borrow_mut().add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("paxos")
.profile("dev"),
)
})
.collect()
}),
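// Acceptors: standard Paxos uses 2f+1 acceptors to tolerate f failures.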
&DeployClusterSpec::new(|| {
(0..(2*f+1))
.map(|_| {
deployment.borrow_mut().add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("paxos")
.profile("dev"),
)
})
.collect()
}),
&DeployClusterSpec::new(|| {
(0..num_clients)
.map(|_| {
deployment.borrow_mut().add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("paxos")
.profile("dev"),
)
})
.collect()
}),
&DeployClusterSpec::new(|| {
(0..num_replicas)
.map(|_| {
deployment.borrow_mut().add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("paxos")
.profile("dev"),
)
})
.collect()
}),
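// Placeholder runtime arguments; the deployed paxos binary supplies the real values in src/bin/paxos.rs.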
RuntimeData::new("Fake"),
RuntimeData::new("Fake"),
RuntimeData::new("Fake"),
RuntimeData::new("Fake"),
);

// insta::assert_debug_snapshot!(builder.extract().ir());

let mut deployment = deployment.into_inner();

deployment.deploy().await.unwrap();

// let replicas_stdouts =
// futures::future::join_all(replicas.members.iter().map(|replica| replica.stdout())).await;

deployment.start().await.unwrap();

// TODO: Figure out what the printed map looks like
// for (i, stdout) in r_stdouts.into_iter().enumerate() {
// assert_eq!(
// stdout.recv().await.unwrap(),
// format!("cluster received: ({}, {})", i, j));
// }
}
}