diff --git a/src/exporters/json.rs b/src/exporters/json.rs index 6733e46f..c448cd4f 100644 --- a/src/exporters/json.rs +++ b/src/exporters/json.rs @@ -8,7 +8,6 @@ use std::{ path::{Path, PathBuf}, thread, time::{Duration, Instant}, - sync::mpsc::Receiver }; /// An Exporter that writes power consumption data of the host @@ -157,7 +156,7 @@ struct Report { impl Exporter for JsonExporter { /// Runs [iterate()] every `step` until `timeout` - fn run(&mut self, channel: &Receiver) { + fn run(&mut self) { let step = self.time_step; info!("Measurement step is: {step:?}"); diff --git a/src/exporters/mod.rs b/src/exporters/mod.rs index c5e8f427..60c6d857 100644 --- a/src/exporters/mod.rs +++ b/src/exporters/mod.rs @@ -18,14 +18,13 @@ pub mod utils; pub mod warpten; use crate::sensors::{ utils::{current_system_time_since_epoch, IProcess}, - RecordGenerator, Topology, Record + RecordGenerator, Topology, }; use chrono::Utc; use std::collections::HashMap; use std::fmt; use std::time::Duration; use utils::get_scaphandre_version; -use std::sync::mpsc::Receiver; #[cfg(feature = "containers")] use { docker_sync::{container::Container, Docker}, @@ -109,22 +108,10 @@ impl fmt::Debug for MetricValueType { /// with the structs provided by the sensor. pub trait Exporter { /// Runs the exporter. - fn run(&mut self, channel: &Receiver); + fn run(&mut self); /// The name of the kind of the exporter, for example "json". fn kind(&self) -> &str; - - fn watch_signal(&mut self, channel: &Receiver) -> Option { - match channel.try_recv() { - Ok(received) => { - info!("Received signal: {}", received); - Some(1) - }, - Err(_) => { - None - } - } - } } /// MetricGenerator is an exporter helper structure to collect Scaphandre metrics. @@ -643,7 +630,6 @@ impl MetricGenerator { description: String::from("Total swap space on the host, in bytes."), metric_value: MetricValueType::Text(metric_value.value), }); - } /// Generate socket metrics. diff --git a/src/exporters/prometheus.rs b/src/exporters/prometheus.rs index ad0e6150..9159c3a9 100644 --- a/src/exporters/prometheus.rs +++ b/src/exporters/prometheus.rs @@ -5,7 +5,7 @@ //! [scrape](https://prometheus.io/docs/prometheus/latest/getting_started). use super::utils; -use crate::current_system_time_since_epoch; +use crate::sensors::utils::current_system_time_since_epoch; use crate::exporters::{Exporter, MetricGenerator, MetricValueType}; use crate::sensors::{Sensor, Topology}; use chrono::Utc; @@ -16,9 +16,9 @@ use std::{ collections::HashMap, fmt::Write, net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::mpsc::Receiver, sync::{Arc, Mutex}, time::Duration, - sync::mpsc::Receiver }; /// Default ipv4/ipv6 address to expose the service is any @@ -73,7 +73,7 @@ impl PrometheusExporter { impl Exporter for PrometheusExporter { /// Starts an HTTP server to expose the metrics in Prometheus format. - fn run(&mut self, channel: &Receiver) { + fn run(&mut self) { info!( "{}: Starting Prometheus exporter", Utc::now().format("%Y-%m-%dT%H:%M:%S") diff --git a/src/exporters/prometheuspush.rs b/src/exporters/prometheuspush.rs index 0ff558c2..73981e4f 100644 --- a/src/exporters/prometheuspush.rs +++ b/src/exporters/prometheuspush.rs @@ -13,7 +13,6 @@ use isahc::{prelude::*, Request}; use std::fmt::Write; use std::thread; use std::time::Duration; -use std::sync::mpsc::Receiver; pub struct PrometheusPushExporter { topo: Topology, @@ -73,7 +72,7 @@ impl PrometheusPushExporter { } impl Exporter for PrometheusPushExporter { - fn run(&mut self, channel: &Receiver) { + fn run(&mut self) { info!( "{}: Starting Prometheus Push exporter", Utc::now().format("%Y-%m-%dT%H:%M:%S") @@ -97,10 +96,6 @@ impl Exporter for PrometheusPushExporter { ); loop { - if self.watch_signal(channel).is_some() { - info!("Daemon/Service has received a stop signal."); - break; - } metric_generator.topology.refresh(); metric_generator.gen_all_metrics(); let mut body = String::from(""); @@ -159,10 +154,6 @@ impl Exporter for PrometheusPushExporter { } } - if self.watch_signal(channel).is_some() { - info!("Daemon/Service has received a stop signal."); - break; - } thread::sleep(Duration::new(self.args.step, 0)); } } diff --git a/src/exporters/qemu.rs b/src/exporters/qemu.rs index 239293de..829645e6 100644 --- a/src/exporters/qemu.rs +++ b/src/exporters/qemu.rs @@ -1,8 +1,8 @@ use crate::exporters::Exporter; use crate::sensors::Topology; use crate::sensors::{utils::ProcessRecord, Sensor}; -use std::{fs, io, thread, time}; use std::sync::mpsc::Receiver; +use std::{fs, io, thread, time}; /// An Exporter that extracts power consumption data of running /// Qemu/KVM virtual machines on the host and store those data diff --git a/src/exporters/riemann.rs b/src/exporters/riemann.rs index 94843e2e..e2abadac 100644 --- a/src/exporters/riemann.rs +++ b/src/exporters/riemann.rs @@ -10,8 +10,8 @@ use riemann_client::proto::{Attribute, Event}; use riemann_client::Client; use std::collections::HashMap; use std::convert::TryFrom; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::sync::mpsc::Receiver; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; /// Riemann server default ipv4/ipv6 address const DEFAULT_IP_ADDRESS: &str = "localhost"; diff --git a/src/exporters/stdout.rs b/src/exporters/stdout.rs index ad5fe321..ce50c727 100644 --- a/src/exporters/stdout.rs +++ b/src/exporters/stdout.rs @@ -4,7 +4,6 @@ use regex::Regex; use std::fmt::Write; use std::thread; use std::time::{Duration, Instant}; -use std::sync::mpsc::Receiver; /// An Exporter that displays power consumption data of the host /// and its processes on the standard output of the terminal. @@ -54,7 +53,7 @@ pub struct ExporterArgs { impl Exporter for StdoutExporter { /// Runs [iterate()] every `step` until `timeout` - fn run(&mut self, channel: &Receiver) { + fn run(&mut self) { let time_step = Duration::from_secs(self.args.step); let time_limit = if self.args.timeout < 0 { None @@ -118,7 +117,7 @@ impl StdoutExporter { host_power_source = src.to_string() } &m.metric_value - }, + } None => &none_value, }; @@ -151,8 +150,6 @@ impl StdoutExporter { let mut to_print = format!("Socket{socket_id}\t{power_str} W |\t"); - - let domains = metrics.iter().filter(|x| { x.name == "scaph_domain_power_microwatts" && x.attributes.get("socket_id").unwrap() == &socket_id diff --git a/src/lib.rs b/src/lib.rs index 60b65c46..af59dc34 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,8 +14,6 @@ use sensors::msr_rapl; #[cfg(target_os = "linux")] use sensors::powercap_rapl; -use std::time::{Duration, SystemTime}; - /// Create a new [`Sensor`] instance with the default sensor available, /// with its default options. pub fn get_default_sensor() -> impl sensors::Sensor { @@ -30,12 +28,6 @@ pub fn get_default_sensor() -> impl sensors::Sensor { return msr_rapl::MsrRAPLSensor::new(); } -fn current_system_time_since_epoch() -> Duration { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() -} - // Copyright 2020 The scaphandre authors. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/main.rs b/src/main.rs index 187fd3fc..320c0cb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,6 @@ use clap::{command, ArgAction, Parser, Subcommand}; use colored::Colorize; use scaphandre::{exporters, sensors::Sensor}; -use std::sync::mpsc::{self, Receiver, Sender}; use std::thread; #[cfg(target_os = "linux")] @@ -21,7 +20,7 @@ use windows_service::{ service::ServiceStatus, service::ServiceType, service_control_handler::{self, ServiceControlHandlerResult}, - service_dispatcher, Result, + service_dispatcher }; #[cfg(target_os = "windows")] @@ -113,14 +112,13 @@ enum ExporterChoice { } #[cfg(target_os = "windows")] -fn my_service_main(arguments: Vec) { +fn my_service_main(_arguments: Vec) { use std::thread::JoinHandle; let graceful_period = 3; - let (tx, rx) = mpsc::channel(); let start_status = ServiceStatus { service_type: ServiceType::OWN_PROCESS, // Should match the one from system service registry - current_state: ServiceState::Running, // The new state + current_state: ServiceState::Running, // The new state controls_accepted: ServiceControlAccept::STOP, // Accept stop events when running exit_code: ServiceExitCode::Win32(0), // Used to report an error when starting or stopping only, otherwise must be zero checkpoint: 0, // Only used for pending states, otherwise must be zero @@ -134,7 +132,7 @@ fn my_service_main(arguments: Vec) { exit_code: ServiceExitCode::Win32(0), checkpoint: 0, wait_hint: Duration::default(), - process_id: None + process_id: None, }; let stoppending_status = ServiceStatus { service_type: ServiceType::OWN_PROCESS, @@ -143,18 +141,17 @@ fn my_service_main(arguments: Vec) { exit_code: ServiceExitCode::Win32(0), checkpoint: 0, wait_hint: Duration::from_secs(graceful_period), - process_id: None + process_id: None, }; - let mut thread_handle: Option> = None; - let mut stop = false; + let thread_handle: Option>; + let mut _stop = false; let event_handler = move |control_event| -> ServiceControlHandlerResult { println!("Got service control event: {:?}", control_event); match control_event { ServiceControl::Stop => { // Handle stop event and return control back to the system. - stop = true; - let _ = &tx.send(1); + _stop = true; ServiceControlHandlerResult::NoError } // All services must accept Interrogate even if it's a no-op. @@ -164,27 +161,42 @@ fn my_service_main(arguments: Vec) { }; if let Ok(system_handler) = service_control_handler::register("scaphandre", event_handler) { - // Tell the system that the service is running now and run it + // Tell the system that the service is running now and run it match system_handler.set_service_status(start_status.clone()) { Ok(status_set) => { - println!("Starting main thread, service status has been set: {:?}", status_set); - thread_handle = Some(thread::spawn(move || { parse_cli_and_run_exporter(&rx); })); - }, + println!( + "Starting main thread, service status has been set: {:?}", + status_set + ); + thread_handle = Some(thread::spawn(move || { + parse_cli_and_run_exporter(); + })); + } Err(e) => { panic!("Couldn't set Windows service status. Error: {:?}", e); } } loop { - if stop { + if _stop { // Wait for the thread to finnish, then end the current function match system_handler.set_service_status(stoppending_status.clone()) { Ok(status_set) => { println!("Stop status has been set for service: {:?}", status_set); if let Some(thr) = thread_handle { - if let Ok(_) = thr.join() { + if thr.join().is_ok() { match system_handler.set_service_status(stop_status.clone()) { - Ok(laststatus_set) => {println!("Scaphandre gracefully stopped: {:?}", laststatus_set);}, - Err(e) => {panic!("Could'nt set Stop status on scaphandre service: {:?}", e);} + Ok(laststatus_set) => { + println!( + "Scaphandre gracefully stopped: {:?}", + laststatus_set + ); + } + Err(e) => { + panic!( + "Could'nt set Stop status on scaphandre service: {:?}", + e + ); + } } } else { panic!("Joining the thread failed."); @@ -193,7 +205,7 @@ fn my_service_main(arguments: Vec) { } else { panic!("Thread handle was not initialized."); } - }, + } Err(e) => { panic!("Couldn't set Windows service status. Error: {:?}", e); } @@ -214,12 +226,10 @@ fn main() { } } - let (_, rx) = mpsc::channel(); - - parse_cli_and_run_exporter(&rx); + parse_cli_and_run_exporter(); } -fn parse_cli_and_run_exporter(channel: &Receiver) { +fn parse_cli_and_run_exporter() { let cli = Cli::parse(); loggerv::init_with_verbosity(cli.verbose.into()).expect("unable to initialize the logger"); @@ -229,7 +239,7 @@ fn parse_cli_and_run_exporter(channel: &Receiver) { print_scaphandre_header(exporter.kind()); } - exporter.run(channel); + exporter.run(); } fn build_exporter(choice: ExporterChoice, sensor: &dyn Sensor) -> Box { @@ -280,9 +290,7 @@ fn build_sensor(cli: &Cli) -> impl Sensor { }; #[cfg(target_os = "windows")] - let msr_sensor_win = || { - msr_rapl::MsrRAPLSensor::new() - }; + let msr_sensor_win = msr_rapl::MsrRAPLSensor::new; match cli.sensor.as_deref() { Some("powercap_rapl") => { diff --git a/src/sensors/mod.rs b/src/sensors/mod.rs index fd9f5518..40dd4547 100644 --- a/src/sensors/mod.rs +++ b/src/sensors/mod.rs @@ -5,7 +5,7 @@ #[cfg(target_os = "windows")] pub mod msr_rapl; -#[cfg(target_os="windows")] +#[cfg(target_os = "windows")] use msr_rapl::get_msr_value; #[cfg(target_os = "linux")] pub mod powercap_rapl; @@ -227,13 +227,10 @@ impl Topology { } } - pub fn safe_insert_socket( - &mut self, - socket: CPUSocket - ) { + pub fn safe_insert_socket(&mut self, socket: CPUSocket) { if !self.sockets.iter().any(|s| s.id == socket.id) { self.sockets.push(socket); - } + } } /// Returns a immutable reference to self.proc_tracker @@ -296,53 +293,53 @@ impl Topology { /// Generates CPUCore instances for the host and adds them /// to appropriate CPUSocket instance from self.sockets + #[cfg(target_os = "linux")] pub fn add_cpu_cores(&mut self) { if let Some(mut cores) = Topology::generate_cpu_cores() { - #[cfg(target_os = "linux")] { - while let Some(c) = cores.pop() { - let socket_id = &c - .attributes - .get("physical id") - .unwrap() - .parse::() - .unwrap(); - let socket_match = self.sockets.iter_mut().find(|x| &x.id == socket_id); - - //In VMs there might be a missmatch betwen Sockets and Cores - see Issue#133 as a first fix we just map all cores that can't be mapped to the first - let socket = match socket_match { - Some(x) => x, - None =>self.sockets.first_mut().expect("Trick: if you are running on a vm, do not forget to use --vm parameter invoking scaphandre at the command line") - }; - - if socket_id == &socket.id { - socket.add_cpu_core(c); - } else { - socket.add_cpu_core(c); - warn!("coud't not match core to socket - mapping to first socket instead - if you are not using --vm there is something wrong") - } + while let Some(c) = cores.pop() { + let socket_id = &c + .attributes + .get("physical id") + .unwrap() + .parse::() + .unwrap(); + let socket_match = self.sockets.iter_mut().find(|x| &x.id == socket_id); + + //In VMs there might be a missmatch betwen Sockets and Cores - see Issue#133 as a first fix we just map all cores that can't be mapped to the first + let socket = match socket_match { + Some(x) => x, + None =>self.sockets.first_mut().expect("Trick: if you are running on a vm, do not forget to use --vm parameter invoking scaphandre at the command line") + }; + + if socket_id == &socket.id { + socket.add_cpu_core(c); + } else { + socket.add_cpu_core(c); + warn!("coud't not match core to socket - mapping to first socket instead - if you are not using --vm there is something wrong") } } + //#[cfg(target_os = "windows")] //{ - //TODO: fix - //let nb_sockets = &self.sockets.len(); - //let mut socket_counter = 0; - //let nb_cores_per_socket = &cores.len() / nb_sockets; - //warn!("nb_cores_per_socket: {} cores_len: {} sockets_len: {}", nb_cores_per_socket, &cores.len(), &self.sockets.len()); - //for s in self.sockets.iter_mut() { - // for c in (socket_counter * nb_cores_per_socket)..((socket_counter+1) * nb_cores_per_socket) { - // match cores.pop() { - // Some(core) => { - // warn!("adding core {} to socket {}", core.id, s.id); - // s.add_cpu_core(core); - // }, - // None => { - // error!("Uneven number of CPU cores !"); - // } - // } - // } - // socket_counter = socket_counter + 1; - //} + //TODO: fix + //let nb_sockets = &self.sockets.len(); + //let mut socket_counter = 0; + //let nb_cores_per_socket = &cores.len() / nb_sockets; + //warn!("nb_cores_per_socket: {} cores_len: {} sockets_len: {}", nb_cores_per_socket, &cores.len(), &self.sockets.len()); + //for s in self.sockets.iter_mut() { + // for c in (socket_counter * nb_cores_per_socket)..((socket_counter+1) * nb_cores_per_socket) { + // match cores.pop() { + // Some(core) => { + // warn!("adding core {} to socket {}", core.id, s.id); + // s.add_cpu_core(core); + // }, + // None => { + // error!("Uneven number of CPU cores !"); + // } + // } + // } + // socket_counter = socket_counter + 1; + //} //} } else { panic!("Couldn't retrieve any CPU Core from the topology. (generate_cpu_cores)"); @@ -462,33 +459,34 @@ impl Topology { .get(self.record_buffer.len() - 2) .unwrap(); match previous_record.value.trim().parse::() { - Ok(previous_microjoules) => { - match last_record.value.trim().parse::() { - Ok(last_microjoules) => { - if previous_microjoules > last_microjoules { - return None; - } - let microjoules = last_microjoules - previous_microjoules; - let time_diff = last_record.timestamp.as_secs_f64() - - previous_record.timestamp.as_secs_f64(); - let microwatts = microjoules as f64 / time_diff; - return Some(Record::new( - last_record.timestamp, - (microwatts as u64).to_string(), - units::Unit::MicroWatt, - )); - }, - Err(e) => { - warn!( - "Could'nt get previous_microjoules - value : '{}' - error : {:?}", - previous_record.value, e - ); + Ok(previous_microjoules) => match last_record.value.trim().parse::() { + Ok(last_microjoules) => { + if previous_microjoules > last_microjoules { + return None; } - + let microjoules = last_microjoules - previous_microjoules; + let time_diff = last_record.timestamp.as_secs_f64() + - previous_record.timestamp.as_secs_f64(); + let microwatts = microjoules as f64 / time_diff; + return Some(Record::new( + last_record.timestamp, + (microwatts as u64).to_string(), + units::Unit::MicroWatt, + )); + } + Err(e) => { + warn!( + "Could'nt get previous_microjoules - value : '{}' - error : {:?}", + previous_record.value, e + ); } }, Err(e) => { - warn!("Couldn't parse previous_microjoules - value : '{}' - error : {:?}", previous_record.value.trim(), e); + warn!( + "Couldn't parse previous_microjoules - value : '{}' - error : {:?}", + previous_record.value.trim(), + e + ); } } } @@ -918,7 +916,7 @@ impl Topology { None } - #[cfg(target_os="linux")] + #[cfg(target_os = "linux")] pub fn get_rapl_psys_energy_microjoules(&self) -> Option { if let Some(psys) = self._sensor_data.get("psys") { match &fs::read_to_string(format!("{psys}/energy_uj")) { @@ -939,24 +937,30 @@ impl Topology { None } - #[cfg(target_os="windows")] + /// # Safety + /// + /// This function is unsafe rust as it calls get_msr_value function from msr_rapl sensor module. + /// It calls the msr_RAPL::MSR_PLATFORM_ENERGY_STATUS MSR address, which has been tested on several Intel x86 processors + /// but might fail on AMD (needs testing). That being said, it returns None if the msr query fails (which means if the Windows + /// driver fails.) and should not prevent from using a value coming from elsewhere, which means from another get_msr_value calls + /// targeting another msr address. + #[cfg(target_os = "windows")] pub unsafe fn get_rapl_psys_energy_microjoules(&self) -> Option { let msr_addr = msr_rapl::MSR_PLATFORM_ENERGY_STATUS; - match msr_rapl::get_msr_value(0, msr_addr.into(), &self._sensor_data) { + match get_msr_value(0, msr_addr.into(), &self._sensor_data) { Ok(res) => { return Some(Record::new( current_system_time_since_epoch(), res.value.to_string(), - units::Unit::MicroJoule + units::Unit::MicroJoule, )) - }, + } Err(e) => { debug!("get_msr_value returned error : {}", e); } } None } - } // !!!!!!!!!!!!!!!!! CPUSocket !!!!!!!!!!!!!!!!!!!!!!! diff --git a/src/sensors/msr_rapl.rs b/src/sensors/msr_rapl.rs index 3f7097bd..1a0ae715 100644 --- a/src/sensors/msr_rapl.rs +++ b/src/sensors/msr_rapl.rs @@ -1,47 +1,40 @@ use crate::sensors::utils::current_system_time_since_epoch; -use crate::sensors::{CPUSocket, Domain, Record, RecordReader, Sensor, Topology, CPUCore}; +use crate::sensors::{CPUCore, CPUSocket, Domain, Record, RecordReader, Sensor, Topology}; +use raw_cpuid::{CpuId, TopologyType}; use std::collections::HashMap; use std::error::Error; use std::mem::size_of; -use sysinfo::{System, SystemExt, CpuExt, Cpu}; -use raw_cpuid::{CpuId, TopologyType}; +use sysinfo::{CpuExt, System, SystemExt}; use windows::Win32::Foundation::{CloseHandle, GetLastError, HANDLE, INVALID_HANDLE_VALUE}; use windows::Win32::Storage::FileSystem::{ CreateFileW, FILE_FLAG_OVERLAPPED, FILE_GENERIC_READ, FILE_GENERIC_WRITE, FILE_READ_DATA, FILE_SHARE_READ, FILE_SHARE_WRITE, FILE_WRITE_DATA, OPEN_EXISTING, }; use windows::Win32::System::Ioctl::{FILE_DEVICE_UNKNOWN, METHOD_BUFFERED}; -use windows::Win32::System::IO::DeviceIoControl; +use windows::Win32::System::SystemInformation::GROUP_AFFINITY; use windows::Win32::System::Threading::{ - GetThreadGroupAffinity, GetProcessGroupAffinity, GetCurrentProcess, GetProcessInformation, - GetCurrentThread, GetActiveProcessorGroupCount, SetThreadGroupAffinity + GetActiveProcessorGroupCount, GetCurrentProcess, GetCurrentThread, GetProcessGroupAffinity, + GetThreadGroupAffinity, SetThreadGroupAffinity, }; -use windows::Win32::System::SystemInformation::GROUP_AFFINITY; +use windows::Win32::System::IO::DeviceIoControl; use core_affinity::{self, CoreId}; pub use x86::cpuid; // Intel RAPL MSRs pub use x86::msr::{ + MSR_DRAM_ENERGY_STATUS, MSR_DRAM_PERF_STATUS, MSR_PKG_ENERGY_STATUS, MSR_PKG_POWER_INFO, + MSR_PKG_POWER_LIMIT, MSR_PP0_ENERGY_STATUS, MSR_PP0_PERF_STATUS, MSR_PP1_ENERGY_STATUS, MSR_RAPL_POWER_UNIT, - MSR_PKG_POWER_LIMIT, - MSR_PKG_POWER_INFO, - MSR_PKG_ENERGY_STATUS, - MSR_DRAM_ENERGY_STATUS, - MSR_DRAM_PERF_STATUS, - MSR_PP0_ENERGY_STATUS, - MSR_PP0_PERF_STATUS, - MSR_PP1_ENERGY_STATUS, }; pub const MSR_PLATFORM_ENERGY_STATUS: u32 = 0x0000064d; -pub const MSR_PLATFORM_POWER_LIMIT: u32 = 0x0000065c ; +pub const MSR_PLATFORM_POWER_LIMIT: u32 = 0x0000065c; // AMD RAPL MSRs pub const MSR_AMD_RAPL_POWER_UNIT: u32 = 0xc0010299; pub const MSR_AMD_CORE_ENERGY_STATUS: u32 = 0xc001029a; pub const MSR_AMD_PKG_ENERGY_STATUS: u32 = 0xc001029b; - unsafe fn ctl_code(device_type: u32, request_code: u32, method: u32, access: u32) -> u32 { ((device_type) << 16) | ((access) << 14) | ((request_code) << 2) | (method) } @@ -176,7 +169,7 @@ impl MsrRAPLSensor { impl RecordReader for Topology { fn read_record(&self) -> Result> { - let mut record: Option = None; + let record: Option; unsafe { record = self.get_rapl_psys_energy_microjoules(); } @@ -189,8 +182,8 @@ impl RecordReader for Topology { match s.read_record() { Ok(rec) => { debug!("rec: {:?}", rec); - res = res + rec.value.parse::()?; - }, + res += rec.value.parse::()?; + } Err(e) => { error!("Failed to get socket record : {:?}", e); } @@ -250,32 +243,65 @@ impl RecordReader for CPUSocket { fn read_record(&self) -> Result> { unsafe { let current_thread = GetCurrentThread(); - let processorgroup_id = self.sensor_data.get("PROCESSORGROUP_ID").unwrap().parse::().unwrap(); - let mut thread_group_affinity: GROUP_AFFINITY = GROUP_AFFINITY { Mask: 255, Group: processorgroup_id, Reserved: [0,0,0] }; - let thread_affinity = GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + let processorgroup_id = self + .sensor_data + .get("PROCESSORGROUP_ID") + .unwrap() + .parse::() + .unwrap(); + let mut thread_group_affinity: GROUP_AFFINITY = GROUP_AFFINITY { + Mask: 255, + Group: processorgroup_id, + Reserved: [0, 0, 0], + }; + let thread_affinity = + GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); if thread_affinity.as_bool() { debug!("got thead_affinity : {:?}", thread_group_affinity); let core_id = self.cpu_cores.last().unwrap().id; //(self.cpu_cores.last().unwrap().id + self.id * self.cpu_cores.len() as u16) as usize - let newaffinity = GROUP_AFFINITY { Mask: (self.cpu_cores.len() + self.id as usize * self.cpu_cores.len() - 1) as usize, Group: processorgroup_id, Reserved: [0, 0, 0]}; - let res = SetThreadGroupAffinity(current_thread, &newaffinity, &mut thread_group_affinity); + let newaffinity = GROUP_AFFINITY { + Mask: self.cpu_cores.len() + self.id as usize * self.cpu_cores.len() - 1, + Group: processorgroup_id, + Reserved: [0, 0, 0], + }; + let res = SetThreadGroupAffinity( + current_thread, + &newaffinity, + &mut thread_group_affinity, + ); if res.as_bool() { - debug!("Asking get_msr_value, from socket, with core_id={}", core_id); - match get_msr_value(core_id as usize, MSR_PKG_ENERGY_STATUS as u64, &self.sensor_data) { + debug!( + "Asking get_msr_value, from socket, with core_id={}", + core_id + ); + match get_msr_value( + core_id as usize, + MSR_PKG_ENERGY_STATUS as u64, + &self.sensor_data, + ) { Ok(rec) => { - return Ok(Record { timestamp: current_system_time_since_epoch(), value: rec.value, unit: super::units::Unit::MicroJoule }) - }, + Ok(Record { + timestamp: current_system_time_since_epoch(), + value: rec.value, + unit: super::units::Unit::MicroJoule, + }) + } Err(e) => { - error!("Could'nt get MSR value for {}: {}", MSR_PKG_ENERGY_STATUS, e); - return Ok(Record { + error!( + "Could'nt get MSR value for {}: {}", + MSR_PKG_ENERGY_STATUS, e + ); + Ok(Record { timestamp: current_system_time_since_epoch(), value: String::from("0"), - unit: super::units::Unit::MicroJoule + unit: super::units::Unit::MicroJoule, }) } } } else { panic!("Couldn't set Thread affinity !"); } + //TODO add DRAM domain to result when available } else { panic!("Coudld'nt get Thread affinity !"); } @@ -289,21 +315,28 @@ impl RecordReader for Domain { debug!("Reading Domain {} on Core {}", self.name, usize_coreid); if let Some(msr_addr) = self.sensor_data.get("MSR_ADDR") { unsafe { - debug!("Asking, from Domain, get_msr_value with core_id={}", usize_coreid); - match get_msr_value(usize_coreid, msr_addr.parse::().unwrap(), &self.sensor_data) { + debug!( + "Asking, from Domain, get_msr_value with core_id={}", + usize_coreid + ); + match get_msr_value( + usize_coreid, + msr_addr.parse::().unwrap(), + &self.sensor_data, + ) { Ok(rec) => { - return Ok(Record { + Ok(Record { timestamp: current_system_time_since_epoch(), unit: super::units::Unit::MicroJoule, value: rec.value, }) - }, + } Err(e) => { error!("Could'nt get MSR value for {}: {}", msr_addr, e); - Ok(Record { + Ok(Record { timestamp: current_system_time_since_epoch(), value: String::from("0"), - unit: super::units::Unit::MicroJoule + unit: super::units::Unit::MicroJoule, }) } } @@ -337,8 +370,7 @@ impl Sensor for MsrRAPLSensor { for group_id in 0..group_count { //TODO fix that to actually count the number of sockets - let logical_cpus = sys.cpus() ; - let mut nb_cpu_sockets: u16 = 0; + let logical_cpus = sys.cpus(); let cpuid = CpuId::new(); let mut logical_cpus_from_cpuid = 1; match cpuid.get_extended_topology_info() { @@ -348,7 +380,7 @@ impl Sensor for MsrRAPLSensor { logical_cpus_from_cpuid = t.processors(); } } - }, + } None => { panic!("Could'nt get cpuid data."); } @@ -359,31 +391,62 @@ impl Sensor for MsrRAPLSensor { let mut i: u16 = 0; let mut no_more_sockets = false; debug!("Entering ProcessorGroup {}", group_id); - let newaffinity = GROUP_AFFINITY { Mask: 255, Group: group_id, Reserved: [0, 0, 0]}; - let mut thread_group_affinity: GROUP_AFFINITY = GROUP_AFFINITY { Mask: 255, Group: 0, Reserved: [0,0,0] }; - let thread_affinity = GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + let newaffinity = GROUP_AFFINITY { + Mask: 255, + Group: group_id, + Reserved: [0, 0, 0], + }; + let mut thread_group_affinity: GROUP_AFFINITY = GROUP_AFFINITY { + Mask: 255, + Group: 0, + Reserved: [0, 0, 0], + }; + let thread_affinity = + GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); debug!("Thread group affinity result : {:?}", thread_affinity); if thread_affinity.as_bool() { debug!("got thead_affinity : {:?}", thread_group_affinity); - let res = SetThreadGroupAffinity(current_thread, &newaffinity, &mut thread_group_affinity); + let res = SetThreadGroupAffinity( + current_thread, + &newaffinity, + &mut thread_group_affinity, + ); if res.as_bool() { debug!("Have set thread affinity: {:?}", newaffinity); match core_affinity::get_core_ids() { Some(core_ids) => { - debug!("CPU SETUP - Cores from core_affinity, len={} : {:?}", core_ids.len(), core_ids); - debug!("CPU SETUP - Logical CPUs from sysinfo: {}", logical_cpus.len()); + debug!( + "CPU SETUP - Cores from core_affinity, len={} : {:?}", + core_ids.len(), + core_ids + ); + debug!( + "CPU SETUP - Logical CPUs from sysinfo: {}", + logical_cpus.len() + ); while !no_more_sockets { let start = i * logical_cpus_from_cpuid; - let stop = (i+1)*logical_cpus_from_cpuid; + let stop = (i + 1) * logical_cpus_from_cpuid; debug!("Looping over {} .. {}", start, stop); - sensor_data.insert(String::from("PROCESSORGROUP_ID"), group_id.to_string()); - let mut current_socket = CPUSocket::new(i, vec![], vec![], String::from(""),1, sensor_data.clone()); - for c in start..stop {//core_ids { + sensor_data.insert( + String::from("PROCESSORGROUP_ID"), + group_id.to_string(), + ); + let mut current_socket = CPUSocket::new( + i, + vec![], + vec![], + String::from(""), + 1, + sensor_data.clone(), + ); + for c in start..stop { + //core_ids { if core_affinity::set_for_current(CoreId { id: c.into() }) { match cpuid.get_vendor_info() { Some(info) => { debug!("Got CPU {:?}", info); - }, + } None => { warn!("Couldn't get cpuinfo"); } @@ -394,43 +457,76 @@ impl Sensor for MsrRAPLSensor { debug!("Got CPU topo info {:?}", info); for t in info { if t.level_type() == TopologyType::Core { - //nb_cpu_sockets = logical_cpus.len() as u16 / t.processors(); //logical_cpus_from_cpuid = t.processors() let x2apic_id = t.x2apic_id(); let socket_id = (x2apic_id & 240) >> 4; // upper bits of x2apic_id are socket_id, mask them, then bit shift to get socket_id current_socket.set_id(socket_id as u16); let core_id = x2apic_id & 15; // 4 last bits of x2apic_id are the core_id (per-socket) - debug!("Found socketid={} and coreid={}", socket_id, core_id); - let mut attributes = HashMap::::new(); - let ref_core = logical_cpus.first().unwrap(); - attributes.insert(String::from("frequency"), ref_core.frequency().to_string()); - attributes.insert(String::from("name"), ref_core.name().to_string()); - attributes.insert(String::from("vendor_id"), ref_core.vendor_id().to_string()); - attributes.insert(String::from("brand"), ref_core.brand().to_string()); - debug!("Adding core id {} to socket_id {}", ((i * (logical_cpus_from_cpuid - 1)) + core_id as u16), current_socket.id); - current_socket.add_cpu_core(CPUCore::new((i * (logical_cpus_from_cpuid - 1)) + core_id as u16, attributes)); - debug!("Reviewing sockets : {:?}", topology.get_sockets_passive()); + debug!( + "Found socketid={} and coreid={}", + socket_id, core_id + ); + let mut attributes = + HashMap::::new(); + let ref_core = + logical_cpus.first().unwrap(); + attributes.insert( + String::from("frequency"), + ref_core.frequency().to_string(), + ); + attributes.insert( + String::from("name"), + ref_core.name().to_string(), + ); + attributes.insert( + String::from("vendor_id"), + ref_core.vendor_id().to_string(), + ); + attributes.insert( + String::from("brand"), + ref_core.brand().to_string(), + ); + debug!( + "Adding core id {} to socket_id {}", + ((i * (logical_cpus_from_cpuid + - 1)) + + core_id as u16), + current_socket.id + ); + current_socket.add_cpu_core( + CPUCore::new( + (i * (logical_cpus_from_cpuid + - 1)) + + core_id as u16, + attributes, + ), + ); + debug!( + "Reviewing sockets : {:?}", + topology.get_sockets_passive() + ); } } - }, + } None => { warn!("Couldn't get cpu topo info"); } } } else { no_more_sockets = true; - debug!("There's likely to be no more socket to explore."); + debug!( + "There's likely to be no more socket to explore." + ); break; } - } + } if !no_more_sockets { debug!("inserting socket {:?}", current_socket); topology.safe_insert_socket(current_socket); - i = i + 1; + i += 1; } } - nb_cpu_sockets = i; - }, + } None => { panic!("Could'nt get core ids from core_affinity."); } @@ -448,14 +544,13 @@ impl Sensor for MsrRAPLSensor { panic!("Error was : {:?}", last_error); } } else { - panic!("Getting thread group affinity failed !"); + error!("Getting thread group affinity failed !"); let last_error = GetLastError(); panic!("Error was: {:?}", last_error); // win32 error 122 is insufficient buffer } } //let process_information = GetProcessInformation(current_process, , , ); } - //nb_cpu_sockets = logical_cpus.len() as u16 / logical_cpus_from_cpuid; //let mut core_id_counter = logical_cpus.len(); //match cpuid.get_advanced_power_mgmt_info() { @@ -506,29 +601,38 @@ impl Sensor for MsrRAPLSensor { // warn!("Couldn't get cpu capacity info"); // } //} - //TODO: fix - //i=0; - //while i < nb_cpu_sockets { - // //topology.safe_add_domain_to_socket(i, , name, uj_counter, buffer_max_kbytes, sensor_data) - // i = i + 1; - //} //topology.add_cpu_cores(); - let mut domains = vec![]; + let mut domains = vec![]; for s in topology.get_sockets() { debug!("Inspecting CPUSocket: {:?}", s); unsafe { - let core_id = s.get_cores_passive().last().unwrap().id + s.id * s.cpu_cores.len() as u16; - debug!("Asking get_msr_value, from generate_tpopo, with core_id={}", core_id); - match get_msr_value(core_id as usize, MSR_DRAM_ENERGY_STATUS as u64, &sensor_data) { + let core_id = + s.get_cores_passive().last().unwrap().id + s.id * s.cpu_cores.len() as u16; + debug!( + "Asking get_msr_value, from generate_tpopo, with core_id={}", + core_id + ); + match get_msr_value( + core_id as usize, + MSR_DRAM_ENERGY_STATUS as u64, + &sensor_data, + ) { Ok(_rec) => { debug!("Adding domain Dram !"); let mut domain_sensor_data = sensor_data.clone(); - domain_sensor_data.insert(String::from("MSR_ADDR"), MSR_DRAM_ENERGY_STATUS.to_string()); + domain_sensor_data + .insert(String::from("MSR_ADDR"), MSR_DRAM_ENERGY_STATUS.to_string()); domain_sensor_data.insert(String::from("CORE_ID"), core_id.to_string()); // nb of cores in a socket * socket_id + local_core_id domains.push(String::from("dram")); - s.safe_add_domain(Domain::new(2, String::from("dram"), String::from(""), 5, domain_sensor_data)) - }, + s.safe_add_domain(Domain::new( + 2, + String::from("dram"), + String::from(""), + 5, + domain_sensor_data, + )) + } Err(e) => { warn!("Could'nt add Dram domain: {}", e); } @@ -537,11 +641,18 @@ impl Sensor for MsrRAPLSensor { Ok(_rec) => { debug!("Adding domain Core !"); let mut domain_sensor_data = sensor_data.clone(); - domain_sensor_data.insert(String::from("MSR_ADDR"), MSR_PP0_ENERGY_STATUS.to_string()); + domain_sensor_data + .insert(String::from("MSR_ADDR"), MSR_PP0_ENERGY_STATUS.to_string()); domain_sensor_data.insert(String::from("CORE_ID"), core_id.to_string()); domains.push(String::from("core")); - s.safe_add_domain(Domain::new(2, String::from("core"), String::from(""), 5, domain_sensor_data)) - }, + s.safe_add_domain(Domain::new( + 2, + String::from("core"), + String::from(""), + 5, + domain_sensor_data, + )) + } Err(e) => { warn!("Could'nt add Core domain: {}", e); } @@ -550,11 +661,18 @@ impl Sensor for MsrRAPLSensor { Ok(_rec) => { debug!("Adding domain Uncore !"); let mut domain_sensor_data = sensor_data.clone(); - domain_sensor_data.insert(String::from("MSR_ADDR"), MSR_PP1_ENERGY_STATUS.to_string()); + domain_sensor_data + .insert(String::from("MSR_ADDR"), MSR_PP1_ENERGY_STATUS.to_string()); domain_sensor_data.insert(String::from("CORE_ID"), core_id.to_string()); domains.push(String::from("uncore")); - s.safe_add_domain(Domain::new(2, String::from("uncore"), String::from(""), 5, domain_sensor_data)) - }, + s.safe_add_domain(Domain::new( + 2, + String::from("uncore"), + String::from(""), + 5, + domain_sensor_data, + )) + } Err(e) => { warn!("Could'nt add Uncore domain: {}", e); } @@ -573,8 +691,10 @@ impl Sensor for MsrRAPLSensor { match get_msr_value(0, MSR_PLATFORM_ENERGY_STATUS as u64, &sensor_data) { Ok(_rec) => { debug!("Adding domain Platform / PSYS !"); - topology._sensor_data.insert(String::from("psys"), String::from("")); - }, + topology + ._sensor_data + .insert(String::from("psys"), String::from("")); + } Err(e) => { warn!("Could'nt add Uncore domain: {}", e); } @@ -594,19 +714,38 @@ impl Sensor for MsrRAPLSensor { } } -pub unsafe fn get_msr_value(core_id: usize, msr_addr: u64, sensor_data: &HashMap) -> Result { +/// # Safety +/// +/// This function should is unsafe rust as it uses send_request, hence calls a DeviceIO Windows driver. +/// The safety burden actuallr resides in the DeviceIO driver that is called. Please refer to the documentation to +/// get the relationship between Scaphandre and its driver for Windows. The driver should exit smoothly if a wrong +/// MSR address is called, then this function should throw an Error. Any improper issue with the operating system would mean +/// there is an issue in the driver used behind the scene, or the way it is configured. +pub unsafe fn get_msr_value( + core_id: usize, + msr_addr: u64, + sensor_data: &HashMap, +) -> Result { let current_process = GetCurrentProcess(); let current_thread = GetCurrentThread(); - let mut thread_group_affinity = GROUP_AFFINITY { Mask: 255, Group: 9, Reserved: [0,0,0] }; - let thread_affinity_res = GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + let mut thread_group_affinity = GROUP_AFFINITY { + Mask: 255, + Group: 9, + Reserved: [0, 0, 0], + }; + let thread_affinity_res = GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); if thread_affinity_res.as_bool() { debug!("Thread affinity found : {:?}", thread_group_affinity); } else { error!("Could'nt get thread group affinity"); } - let mut process_group_array: [u16; 8] = [0,0,0,0,0,0,0,0]; + let mut process_group_array: [u16; 8] = [0, 0, 0, 0, 0, 0, 0, 0]; let mut process_group_array_len = 8; - let process_affinity_res = GetProcessGroupAffinity(current_process, &mut process_group_array_len, process_group_array.as_mut_ptr()); + let process_affinity_res = GetProcessGroupAffinity( + current_process, + &mut process_group_array_len, + process_group_array.as_mut_ptr(), + ); if process_affinity_res.as_bool() { debug!("Process affinity found: {:?}", process_group_array); } else { @@ -623,7 +762,7 @@ pub unsafe fn get_msr_value(core_id: usize, msr_addr: u64, sensor_data: &HashMap debug!("core_id: {:b}", ((core_id as u64) << 32)); let src = ((core_id as u64) << 32) | msr_addr; //let src = ((core_id as u64) << 32) | msr_addr; let ptr = &src as *const u64; - + debug!("src: {:x}", src); debug!("src: {:b}", src); debug!("*ptr: {:b}", *ptr); @@ -638,7 +777,7 @@ pub unsafe fn get_msr_value(core_id: usize, msr_addr: u64, sensor_data: &HashMap ptr_result, size_of::(), ) { - Ok(res) => { + Ok(_res) => { close_handle(device); let energy_unit = sensor_data @@ -646,7 +785,8 @@ pub unsafe fn get_msr_value(core_id: usize, msr_addr: u64, sensor_data: &HashMap .unwrap() .parse::() .unwrap(); - let current_value = MsrRAPLSensor::extract_rapl_current_power(msr_result, energy_unit); + let current_value = + MsrRAPLSensor::extract_rapl_current_power(msr_result, energy_unit); debug!("current_value: {}", current_value); Ok(Record { @@ -654,17 +794,17 @@ pub unsafe fn get_msr_value(core_id: usize, msr_addr: u64, sensor_data: &HashMap unit: super::units::Unit::MicroJoule, value: current_value, }) - }, + } Err(e) => { error!("Failed to get data from send_request: {:?}", e); close_handle(device); Err(format!("Failed to get data from send_request: {:?}", e)) } } - }, + } Err(e) => { error!("Couldn't get driver handle : {:?}", e); Err(format!("Couldn't get driver handle : {:?}", e)) } } -} \ No newline at end of file +}