Skip to content

Commit

Permalink
refactor cluster stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Alw3ys committed Dec 18, 2023
1 parent 7f8a2d3 commit ef0a18d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 54 deletions.
2 changes: 1 addition & 1 deletion dosei/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub async fn start_server(config: &'static Config) -> anyhow::Result<()> {
let pool = Pool::<Postgres>::connect(&env::var("DATABASE_URL")?).await?;
let shared_pool = Arc::new(pool);
info!("Successfully connected to Postgres");
cluster::start_node(config);
cluster::start_cluster(config)?;
cron::start_job_manager(config, Arc::clone(&shared_pool));
let app = Router::new()
.route("/envs/:owner_id", routing::post(secret::api_set_envs))
Expand Down
69 changes: 53 additions & 16 deletions dosei/src/server/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,35 @@
use crate::config;
use crate::config::Config;
use dosei_proto::ProtoChannel;
use dosei_proto::{cron_job, ping};
use log::{error, info};
use once_cell::sync::Lazy;
use prost::Message;
use std::error::Error;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tokio::time::sleep;

/// Process-wide, lazily-initialized registry of replicas known to this node.
/// Wrapped in `Arc<Mutex<..>>` for shared mutable access from async tasks
/// (presumably updated by `start_node`'s accept loop — body not visible here;
/// TODO confirm).
static CLUSTER_INFO: Lazy<Arc<Mutex<ClusterInfo>>> = Lazy::new(|| {
  Arc::new(Mutex::new(ClusterInfo {
    // Starts empty; replicas register themselves via ping messages.
    replicas: Vec::new(),
  }))
});

#[derive(Debug, Clone)]
pub struct ClusterInfo {
pub replicas: Vec<ping::Ping>,
}

impl ClusterInfo {
pub fn add_or_update_replica(&mut self, replica: ping::Ping) {
match self.replicas.iter_mut().find(|r| r.id == replica.id) {
Some(existing_replica) => {
*existing_replica = replica;
pub fn start_cluster(config: &'static Config) -> anyhow::Result<()> {
start_node(config);
if config.is_replica() {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(1)).await;
update_status(config).await.unwrap();
}
None => {
self.replicas.push(replica);
}
}
});
}
Ok(())
}

pub fn start_node(config: &'static Config) {
Expand Down Expand Up @@ -85,3 +84,41 @@ pub fn start_node(config: &'static Config) {
}
});
}

async fn update_status(config: &'static Config) -> Result<(), Box<dyn Error>> {
let node_info = ping::Ping {
id: config.node_info.id.to_string(),
node_type: i32::from(config.node_info.node_type),
address: config.address.to_string(),
version: config::VERSION.to_string(),
};

let mut buf = Vec::with_capacity(node_info.encoded_len() + 1);
buf.push(ping::Ping::PROTO_ID);

node_info.encode(&mut buf)?;

let primary_node_address = config.get_primary_node_address().to_string();
let mut stream = TcpStream::connect(primary_node_address).await?;

stream.write_all(&buf).await?;
Ok(())
}

/// Membership view of the cluster as seen by this node.
#[derive(Debug, Clone)]
pub struct ClusterInfo {
  /// Most recent ping received from each known replica.
  pub replicas: Vec<ping::Ping>,
}

impl ClusterInfo {
  /// Records `replica` in the membership list: overwrites the stored entry
  /// whose `id` matches, or appends it when the id is new.
  pub fn add_or_update_replica(&mut self, replica: ping::Ping) {
    if let Some(slot) = self.replicas.iter_mut().find(|r| r.id == replica.id) {
      *slot = replica;
    } else {
      self.replicas.push(replica);
    }
  }
}
37 changes: 0 additions & 37 deletions dosei/src/server/cron.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::config;
use crate::config::Config;
use crate::schema::{CronJob, Job};
use axum::http::StatusCode;
Expand All @@ -13,32 +12,19 @@ use bollard::system::EventsOptions;
use bollard::Docker;
use chrono::Utc;
use cron::Schedule;
use dosei_proto::{ping, ProtoChannel};
use futures_util::stream::StreamExt;
use gcp_auth::AuthenticationManager;
use log::{error, info};
use prost::Message;
use serde::Deserialize;
use sqlx::{Pool, Postgres};
use std::collections::HashMap;
use std::error::Error;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
use uuid::Uuid;

pub fn start_job_manager(config: &'static Config, pool: Arc<Pool<Postgres>>) {
if config.is_replica() {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(1)).await;
update_status(config).await.unwrap();
}
});
}
tokio::spawn(async move {
loop {
run_jobs(config, Arc::clone(&pool)).await;
Expand All @@ -50,29 +36,6 @@ pub fn start_job_manager(config: &'static Config, pool: Arc<Pool<Postgres>>) {
});
}

/// Reports this node's identity and liveness to the cluster's primary node
/// by sending a length-prefixed-style frame: one discriminator byte followed
/// by a protobuf-encoded `ping::Ping`.
///
/// # Errors
/// Fails if protobuf encoding, the TCP connection, or the write fails.
async fn update_status(config: &'static Config) -> Result<(), Box<dyn Error>> {
  // Identity/liveness payload describing this node.
  let node_info = ping::Ping {
    id: config.node_info.id.to_string(),
    node_type: i32::from(config.node_info.node_type),
    address: config.address.to_string(),
    version: config::VERSION.to_string(),
  };

  // Reserve one extra byte for the protocol discriminator pushed below.
  let mut buf = Vec::with_capacity(node_info.encoded_len() + 1);
  buf.push(ping::Ping::PROTO_ID);

  // Serialize the Ping payload into the buffer (after the discriminator).
  node_info.encode(&mut buf)?;

  // Connect to the primary node.
  let primary_node_address = config.get_primary_node_address().to_string();
  let mut stream = TcpStream::connect(primary_node_address).await?;

  // Write the serialized frame.
  stream.write_all(&buf).await?;
  Ok(())
}

#[derive(Deserialize)]
pub struct CreateJobBody {
schedule: String,
Expand Down

0 comments on commit ef0a18d

Please sign in to comment.