Skip to content

Commit

Permalink
style: clippy and fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
bpetit committed Jan 5, 2024
1 parent 1103155 commit 4fec833
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 253 deletions.
3 changes: 1 addition & 2 deletions src/exporters/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
path::{Path, PathBuf},
thread,
time::{Duration, Instant},
sync::mpsc::Receiver
};

/// An Exporter that writes power consumption data of the host
Expand Down Expand Up @@ -157,7 +156,7 @@ struct Report {

impl Exporter for JsonExporter {
/// Runs [iterate()] every `step` until `timeout`
fn run(&mut self, channel: &Receiver<u8>) {
fn run(&mut self) {
let step = self.time_step;
info!("Measurement step is: {step:?}");

Expand Down
18 changes: 2 additions & 16 deletions src/exporters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ pub mod utils;
pub mod warpten;
use crate::sensors::{
utils::{current_system_time_since_epoch, IProcess},
RecordGenerator, Topology, Record
RecordGenerator, Topology,
};
use chrono::Utc;
use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
use utils::get_scaphandre_version;
use std::sync::mpsc::Receiver;
#[cfg(feature = "containers")]
use {
docker_sync::{container::Container, Docker},
Expand Down Expand Up @@ -109,22 +108,10 @@ impl fmt::Debug for MetricValueType {
/// with the structs provided by the sensor.
pub trait Exporter {
/// Runs the exporter.
fn run(&mut self, channel: &Receiver<u8>);
fn run(&mut self);

/// The name of the kind of the exporter, for example "json".
fn kind(&self) -> &str;

fn watch_signal(&mut self, channel: &Receiver<u8>) -> Option<i32> {
match channel.try_recv() {
Ok(received) => {
info!("Received signal: {}", received);
Some(1)
},
Err(_) => {
None
}
}
}
}

/// MetricGenerator is an exporter helper structure to collect Scaphandre metrics.
Expand Down Expand Up @@ -643,7 +630,6 @@ impl MetricGenerator {
description: String::from("Total swap space on the host, in bytes."),
metric_value: MetricValueType::Text(metric_value.value),
});

}

/// Generate socket metrics.
Expand Down
6 changes: 3 additions & 3 deletions src/exporters/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! [scrape](https://prometheus.io/docs/prometheus/latest/getting_started).
use super::utils;
use crate::current_system_time_since_epoch;
use crate::sensors::utils::current_system_time_since_epoch;
use crate::exporters::{Exporter, MetricGenerator, MetricValueType};
use crate::sensors::{Sensor, Topology};
use chrono::Utc;
Expand All @@ -16,9 +16,9 @@ use std::{
collections::HashMap,
fmt::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::mpsc::Receiver,
sync::{Arc, Mutex},
time::Duration,
sync::mpsc::Receiver
};

/// Default ipv4/ipv6 address to expose the service is any
Expand Down Expand Up @@ -73,7 +73,7 @@ impl PrometheusExporter {

impl Exporter for PrometheusExporter {
/// Starts an HTTP server to expose the metrics in Prometheus format.
fn run(&mut self, channel: &Receiver<u8>) {
fn run(&mut self) {
info!(
"{}: Starting Prometheus exporter",
Utc::now().format("%Y-%m-%dT%H:%M:%S")
Expand Down
11 changes: 1 addition & 10 deletions src/exporters/prometheuspush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use isahc::{prelude::*, Request};
use std::fmt::Write;
use std::thread;
use std::time::Duration;
use std::sync::mpsc::Receiver;

pub struct PrometheusPushExporter {
topo: Topology,
Expand Down Expand Up @@ -73,7 +72,7 @@ impl PrometheusPushExporter {
}

impl Exporter for PrometheusPushExporter {
fn run(&mut self, channel: &Receiver<u8>) {
fn run(&mut self) {
info!(
"{}: Starting Prometheus Push exporter",
Utc::now().format("%Y-%m-%dT%H:%M:%S")
Expand All @@ -97,10 +96,6 @@ impl Exporter for PrometheusPushExporter {
);

loop {
if self.watch_signal(channel).is_some() {
info!("Daemon/Service has received a stop signal.");
break;
}
metric_generator.topology.refresh();
metric_generator.gen_all_metrics();
let mut body = String::from("");
Expand Down Expand Up @@ -159,10 +154,6 @@ impl Exporter for PrometheusPushExporter {
}
}

if self.watch_signal(channel).is_some() {
info!("Daemon/Service has received a stop signal.");
break;
}
thread::sleep(Duration::new(self.args.step, 0));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/exporters/qemu.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::exporters::Exporter;
use crate::sensors::Topology;
use crate::sensors::{utils::ProcessRecord, Sensor};
use std::{fs, io, thread, time};
use std::sync::mpsc::Receiver;
use std::{fs, io, thread, time};

/// An Exporter that extracts power consumption data of running
/// Qemu/KVM virtual machines on the host and store those data
Expand Down
2 changes: 1 addition & 1 deletion src/exporters/riemann.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use riemann_client::proto::{Attribute, Event};
use riemann_client::Client;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::sync::mpsc::Receiver;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Riemann server default ipv4/ipv6 address
const DEFAULT_IP_ADDRESS: &str = "localhost";
Expand Down
7 changes: 2 additions & 5 deletions src/exporters/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use regex::Regex;
use std::fmt::Write;
use std::thread;
use std::time::{Duration, Instant};
use std::sync::mpsc::Receiver;

/// An Exporter that displays power consumption data of the host
/// and its processes on the standard output of the terminal.
Expand Down Expand Up @@ -54,7 +53,7 @@ pub struct ExporterArgs {

impl Exporter for StdoutExporter {
/// Runs [iterate()] every `step` until `timeout`
fn run(&mut self, channel: &Receiver<u8>) {
fn run(&mut self) {
let time_step = Duration::from_secs(self.args.step);
let time_limit = if self.args.timeout < 0 {
None
Expand Down Expand Up @@ -118,7 +117,7 @@ impl StdoutExporter {
host_power_source = src.to_string()
}
&m.metric_value
},
}
None => &none_value,
};

Expand Down Expand Up @@ -151,8 +150,6 @@ impl StdoutExporter {

let mut to_print = format!("Socket{socket_id}\t{power_str} W |\t");



let domains = metrics.iter().filter(|x| {
x.name == "scaph_domain_power_microwatts"
&& x.attributes.get("socket_id").unwrap() == &socket_id
Expand Down
8 changes: 0 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use sensors::msr_rapl;
#[cfg(target_os = "linux")]
use sensors::powercap_rapl;

use std::time::{Duration, SystemTime};

/// Create a new [`Sensor`] instance with the default sensor available,
/// with its default options.
pub fn get_default_sensor() -> impl sensors::Sensor {
Expand All @@ -30,12 +28,6 @@ pub fn get_default_sensor() -> impl sensors::Sensor {
return msr_rapl::MsrRAPLSensor::new();
}

fn current_system_time_since_epoch() -> Duration {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
}

// Copyright 2020 The scaphandre authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
64 changes: 36 additions & 28 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use clap::{command, ArgAction, Parser, Subcommand};
use colored::Colorize;
use scaphandre::{exporters, sensors::Sensor};
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

#[cfg(target_os = "linux")]
Expand All @@ -21,7 +20,7 @@ use windows_service::{
service::ServiceStatus,
service::ServiceType,
service_control_handler::{self, ServiceControlHandlerResult},
service_dispatcher, Result,
service_dispatcher
};

#[cfg(target_os = "windows")]
Expand Down Expand Up @@ -113,14 +112,13 @@ enum ExporterChoice {
}

#[cfg(target_os = "windows")]
fn my_service_main(arguments: Vec<OsString>) {
fn my_service_main(_arguments: Vec<OsString>) {
use std::thread::JoinHandle;
let graceful_period = 3;

let (tx, rx) = mpsc::channel();
let start_status = ServiceStatus {
service_type: ServiceType::OWN_PROCESS, // Should match the one from system service registry
current_state: ServiceState::Running, // The new state
current_state: ServiceState::Running, // The new state
controls_accepted: ServiceControlAccept::STOP, // Accept stop events when running
exit_code: ServiceExitCode::Win32(0), // Used to report an error when starting or stopping only, otherwise must be zero
checkpoint: 0, // Only used for pending states, otherwise must be zero
Expand All @@ -134,7 +132,7 @@ fn my_service_main(arguments: Vec<OsString>) {
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None
process_id: None,
};
let stoppending_status = ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
Expand All @@ -143,18 +141,17 @@ fn my_service_main(arguments: Vec<OsString>) {
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::from_secs(graceful_period),
process_id: None
process_id: None,
};

let mut thread_handle: Option<JoinHandle<()>> = None;
let mut stop = false;
let thread_handle: Option<JoinHandle<()>>;
let mut _stop = false;
let event_handler = move |control_event| -> ServiceControlHandlerResult {
println!("Got service control event: {:?}", control_event);
match control_event {
ServiceControl::Stop => {
// Handle stop event and return control back to the system.
stop = true;
let _ = &tx.send(1);
_stop = true;
ServiceControlHandlerResult::NoError
}
// All services must accept Interrogate even if it's a no-op.
Expand All @@ -164,27 +161,42 @@ fn my_service_main(arguments: Vec<OsString>) {
};

if let Ok(system_handler) = service_control_handler::register("scaphandre", event_handler) {
// Tell the system that the service is running now and run it
// Tell the system that the service is running now and run it
match system_handler.set_service_status(start_status.clone()) {
Ok(status_set) => {
println!("Starting main thread, service status has been set: {:?}", status_set);
thread_handle = Some(thread::spawn(move || { parse_cli_and_run_exporter(&rx); }));
},
println!(
"Starting main thread, service status has been set: {:?}",
status_set
);
thread_handle = Some(thread::spawn(move || {
parse_cli_and_run_exporter();
}));
}
Err(e) => {
panic!("Couldn't set Windows service status. Error: {:?}", e);
}
}
loop {
if stop {
if _stop {
// Wait for the thread to finnish, then end the current function
match system_handler.set_service_status(stoppending_status.clone()) {
Ok(status_set) => {
println!("Stop status has been set for service: {:?}", status_set);
if let Some(thr) = thread_handle {
if let Ok(_) = thr.join() {
if thr.join().is_ok() {
match system_handler.set_service_status(stop_status.clone()) {
Ok(laststatus_set) => {println!("Scaphandre gracefully stopped: {:?}", laststatus_set);},
Err(e) => {panic!("Could'nt set Stop status on scaphandre service: {:?}", e);}
Ok(laststatus_set) => {
println!(
"Scaphandre gracefully stopped: {:?}",
laststatus_set
);
}
Err(e) => {
panic!(
"Could'nt set Stop status on scaphandre service: {:?}",
e
);
}
}
} else {
panic!("Joining the thread failed.");
Expand All @@ -193,7 +205,7 @@ fn my_service_main(arguments: Vec<OsString>) {
} else {
panic!("Thread handle was not initialized.");
}
},
}
Err(e) => {
panic!("Couldn't set Windows service status. Error: {:?}", e);
}
Expand All @@ -214,12 +226,10 @@ fn main() {
}
}

let (_, rx) = mpsc::channel();

parse_cli_and_run_exporter(&rx);
parse_cli_and_run_exporter();
}

fn parse_cli_and_run_exporter(channel: &Receiver<u8>) {
fn parse_cli_and_run_exporter() {
let cli = Cli::parse();
loggerv::init_with_verbosity(cli.verbose.into()).expect("unable to initialize the logger");

Expand All @@ -229,7 +239,7 @@ fn parse_cli_and_run_exporter(channel: &Receiver<u8>) {
print_scaphandre_header(exporter.kind());
}

exporter.run(channel);
exporter.run();
}

fn build_exporter(choice: ExporterChoice, sensor: &dyn Sensor) -> Box<dyn exporters::Exporter> {
Expand Down Expand Up @@ -280,9 +290,7 @@ fn build_sensor(cli: &Cli) -> impl Sensor {
};

#[cfg(target_os = "windows")]
let msr_sensor_win = || {
msr_rapl::MsrRAPLSensor::new()
};
let msr_sensor_win = msr_rapl::MsrRAPLSensor::new;

match cli.sensor.as_deref() {
Some("powercap_rapl") => {
Expand Down
Loading

0 comments on commit 4fec833

Please sign in to comment.