diff --git a/Cargo.toml b/Cargo.toml index cab7a7ec..2074d408 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ isahc = { version = "1.7.2", optional = true } procfs = { version = "0.15.0" } [target.'cfg(target_os="windows")'.dependencies] -windows = { version = "0.27.0", features = ["alloc","Win32_Storage_FileSystem","Win32_Foundation","Win32_Security","Win32_System_IO","Win32_System_Ioctl"]} +windows = { version = "0.27.0", features = ["alloc","Win32_Storage_FileSystem","Win32_Foundation","Win32_Security","Win32_System_IO","Win32_System_Ioctl","Win32_System_Threading", "Win32_System_SystemInformation"]} windows-service = { version = "0.6.0" } raw-cpuid = { version = "10.5.0" } core_affinity = { version = "0.8.1"} diff --git a/src/sensors/mod.rs b/src/sensors/mod.rs index d64c33c6..3fa8f302 100644 --- a/src/sensors/mod.rs +++ b/src/sensors/mod.rs @@ -1048,6 +1048,10 @@ impl CPUSocket { } } + pub fn set_id(&mut self, id: u16) { + self.id = id + } + /// Adds a new Domain instance to the domains vector if and only if it doesn't exist in the vector already. fn safe_add_domain(&mut self, domain: Domain) { if !self.domains.iter().any(|d| d.id == domain.id) { diff --git a/src/sensors/msr_rapl.rs b/src/sensors/msr_rapl.rs index 3517adef..aef55c66 100644 --- a/src/sensors/msr_rapl.rs +++ b/src/sensors/msr_rapl.rs @@ -12,7 +12,11 @@ use windows::Win32::Storage::FileSystem::{ }; use windows::Win32::System::Ioctl::{FILE_DEVICE_UNKNOWN, METHOD_BUFFERED}; use windows::Win32::System::IO::DeviceIoControl; -use windows::Win32::System::Threading::SetThreadGroupAffinity; +use windows::Win32::System::Threading::{ + GetThreadGroupAffinity, GetProcessGroupAffinity, GetCurrentProcess, GetProcessInformation, + GetCurrentThread, GetActiveProcessorGroupCount, SetThreadGroupAffinity +}; +use windows::Win32::System::SystemInformation::GROUP_AFFINITY; use core_affinity::{self, CoreId}; @@ -237,94 +241,35 @@ unsafe fn send_request( impl RecordReader for CPUSocket { fn read_record(&self) -> Result> { unsafe { - let driver_name = self.sensor_data.get("DRIVER_NAME").unwrap(); - match get_handle(driver_name) { - Ok(device) => { - let mut msr_result: u64 = 0; - let ptr_result = &mut msr_result as *mut u64; - let mut core_id: u32 = 2; - // get core numbers tied to the socket - if let Some(core) = self.cpu_cores.first() { - core_id = core.id as u32; - match core_affinity::get_core_ids() { - Some(core_ids) => { - for c in core_ids { - if c.id == core.id as usize { - if core_affinity::set_for_current(c) { - warn!("Set core_affinity to {}", c.id); - } else { - warn!("Failed to set core_affinity to {}", c.id); - } - break; - } - } - }, - None => { - warn!("Could'nt get core ids from core_affinity."); - } - } - } else { - panic!("Couldn't get a CPUCore in socket {}", self.id); - } - warn!("msr: {:x}", (MSR_PKG_ENERGY_STATUS as u64)); - warn!("msr: {:b}", (MSR_PKG_ENERGY_STATUS as u64)); - warn!("core_id: {:x} {:b}", (core_id as u64), (core_id as u64)); - warn!("core_id: {:b}", ((core_id as u64) << 54)); - let src = ((core_id as u64) << 32) | (MSR_PKG_ENERGY_STATUS as u64); - let ptr = &src as *const u64; - - warn!("src: {:x}", src); - warn!("src: {:b}", src); - - warn!("*ptr: {}", *ptr); - warn!("*ptr: {:b}", *ptr); - trace!("&request: {:?} ptr (as *const u8): {:?}", &src, ptr); - - match send_request( - device, - MSR_PKG_ENERGY_STATUS, - // nouvelle version à integrer : request_code est ignoré et request doit contenir - // request_code sous forme d'un char * - ptr, - 8, - ptr_result, - size_of::(), - ) { - Ok(res) => { - debug!("{}", res); - - close_handle(device); - - let energy_unit = self - .sensor_data - .get("ENERGY_UNIT") - .unwrap() - .parse::() - .unwrap(); - - let current_power = MsrRAPLSensor::extract_rapl_current_power(msr_result, energy_unit); - warn!("current_power: {}", current_power); - - Ok(Record { - timestamp: current_system_time_since_epoch(), - unit: super::units::Unit::MicroJoule, - value: current_power, - }) + let current_thread = GetCurrentThread(); + let processorgroup_id = self.sensor_data.get("PROCESSORGROUP_ID").unwrap().parse::().unwrap(); + let mut thread_group_affinity: GROUP_AFFINITY = GROUP_AFFINITY { Mask: 255, Group: processorgroup_id, Reserved: [0,0,0] }; + let thread_affinity = GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + if thread_affinity.as_bool() { + warn!("got thead_affinity : {:?}", thread_group_affinity); + let core_id = self.cpu_cores.last().unwrap().id; //(self.cpu_cores.last().unwrap().id + self.id * self.cpu_cores.len() as u16) as usize + let newaffinity = GROUP_AFFINITY { Mask: (self.cpu_cores.len() + self.id as usize * self.cpu_cores.len() - 1) as usize, Group: processorgroup_id, Reserved: [0, 0, 0]}; + let res = SetThreadGroupAffinity(current_thread, &newaffinity, &mut thread_group_affinity); + if res.as_bool() { + warn!("Asking get_msr_value, from socket, with core_id={}", core_id); + match get_msr_value(core_id as usize, MSR_PKG_ENERGY_STATUS as u64, &self.sensor_data) { + Ok(rec) => { + return Ok(Record { timestamp: current_system_time_since_epoch(), value: rec.value, unit: super::units::Unit::MicroJoule }) }, Err(e) => { - error!("Failed to get data from send_request: {:?}", e); - close_handle(device); - Ok(Record { + error!("Could'nt get MSR value for {}: {}", MSR_PKG_ENERGY_STATUS, e); + return Ok(Record { timestamp: current_system_time_since_epoch(), - unit: super::units::Unit::MicroJoule, value: String::from("0"), + unit: super::units::Unit::MicroJoule }) } } - }, - Err(e) => { - panic!("Couldn't get driver handle : {:?}", e); + } else { + panic!("Couldn't set Thread affinity !"); } + } else { + panic!("Coudld'nt get Thread affinity !"); } } } @@ -336,6 +281,7 @@ impl RecordReader for Domain { warn!("Reading Domain {} on Core {}", self.name, usize_coreid); if let Some(msr_addr) = self.sensor_data.get("MSR_ADDR") { unsafe { + warn!("Asking, from Domain, get_msr_value with core_id={}", usize_coreid); match get_msr_value(usize_coreid, msr_addr.parse::().unwrap(), &self.sensor_data) { Ok(rec) => { return Ok(Record { @@ -374,94 +320,132 @@ impl Sensor for MsrRAPLSensor { let mut topology = Topology::new(sensor_data.clone()); let mut sys = System::new_all(); sys.refresh_all(); - - //TODO fix that to actually count the number of sockets - let mut i: u16 = 0; - let logical_cpus = sys.cpus() ; - let mut nb_cpu_sockets: u16 = 0; - let cpuid = CpuId::new(); - let mut logical_cpus_from_cpuid = 1; - match cpuid.get_extended_topology_info() { - Some(info) => { - for t in info { - if t.level_type() == TopologyType::Core { - logical_cpus_from_cpuid = t.processors(); + + unsafe { + let current_thread = GetCurrentThread(); + + let group_count = GetActiveProcessorGroupCount(); + warn!("GROUP COUNT : {}", group_count); + + for group_id in 0..group_count { + //TODO fix that to actually count the number of sockets + let logical_cpus = sys.cpus() ; + let mut nb_cpu_sockets: u16 = 0; + let cpuid = CpuId::new(); + let mut logical_cpus_from_cpuid = 1; + match cpuid.get_extended_topology_info() { + Some(info) => { + for t in info { + if t.level_type() == TopologyType::Core { + logical_cpus_from_cpuid = t.processors(); + } + } + }, + None => { + panic!("Could'nt get cpuid data."); } } - }, - None => { - panic!("Could'nt get cpuid data."); - } - } - if logical_cpus_from_cpuid <= 1 { - panic!("CpuID data is likely to be wrong."); - } - let mut no_more_sockets = false; - - match core_affinity::get_core_ids() { - Some(core_ids) => { - warn!("CPU SETUP - Cores from core_affinity, len={} : {:?}", core_ids.len(), core_ids); - warn!("CPU SETUP - Logical CPUs from sysinfo: {}", logical_cpus.len()); - while !no_more_sockets { - let start = i * logical_cpus_from_cpuid; - let stop = (i+1)*logical_cpus_from_cpuid; - warn!("Looping over {} .. {}", start, stop); - let mut current_socket = CPUSocket::new(i, vec![], vec![], String::from(""),1, sensor_data.clone()); - for c in start..stop {//core_ids { - if core_affinity::set_for_current(CoreId { id: c.into() }) { - match cpuid.get_vendor_info() { - Some(info) => { - warn!("Got CPU {:?}", info); - }, - None => { - warn!("Couldn't get cpuinfo"); - } - } - warn!("Set core_affinity to {}", c); - match cpuid.get_extended_topology_info() { - Some(info) => { - warn!("Got CPU topo info {:?}", info); - for t in info { - if t.level_type() == TopologyType::Core { - //nb_cpu_sockets = logical_cpus.len() as u16 / t.processors(); - //logical_cpus_from_cpuid = t.processors() - let x2apic_id = t.x2apic_id(); - let socket_id = (x2apic_id & 240) >> 4; // upper bits of x2apic_id are socket_id, mask them, then bit shift to get socket_id - let core_id = x2apic_id & 15; // 4 last bits of x2apic_id are the core_id (per-socket) - warn!("Found socketid={} and coreid={}", socket_id, core_id); - let mut attributes = HashMap::::new(); - let ref_core = logical_cpus.first().unwrap(); - attributes.insert(String::from("frequency"), ref_core.frequency().to_string()); - attributes.insert(String::from("name"), ref_core.name().to_string()); - attributes.insert(String::from("vendor_id"), ref_core.vendor_id().to_string()); - attributes.insert(String::from("brand"), ref_core.brand().to_string()); - warn!("Adding core id {} to socket_id {}", ((i * (logical_cpus_from_cpuid - 1)) + core_id as u16), current_socket.id); - current_socket.add_cpu_core(CPUCore::new((i * (logical_cpus_from_cpuid - 1)) + core_id as u16, attributes)); - warn!("Reviewing sockets : {:?}", topology.get_sockets_passive()); + if logical_cpus_from_cpuid <= 1 { + panic!("CpuID data is likely to be wrong."); + } + let mut i: u16 = 0; + let mut no_more_sockets = false; + warn!("Entering ProcessorGroup {}", group_id); + let newaffinity = GROUP_AFFINITY { Mask: 255, Group: group_id, Reserved: [0, 0, 0]}; + let mut thread_group_affinity: GROUP_AFFINITY = GROUP_AFFINITY { Mask: 255, Group: 0, Reserved: [0,0,0] }; + let thread_affinity = GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + warn!("Thread group affinity result : {:?}", thread_affinity); + if thread_affinity.as_bool() { + warn!("got thead_affinity : {:?}", thread_group_affinity); + let res = SetThreadGroupAffinity(current_thread, &newaffinity, &mut thread_group_affinity); + if res.as_bool() { + warn!("Have set thread affinity: {:?}", newaffinity); + match core_affinity::get_core_ids() { + Some(core_ids) => { + warn!("CPU SETUP - Cores from core_affinity, len={} : {:?}", core_ids.len(), core_ids); + warn!("CPU SETUP - Logical CPUs from sysinfo: {}", logical_cpus.len()); + while !no_more_sockets { + let start = i * logical_cpus_from_cpuid; + let stop = (i+1)*logical_cpus_from_cpuid; + warn!("Looping over {} .. {}", start, stop); + sensor_data.insert(String::from("PROCESSORGROUP_ID"), group_id.to_string()); + let mut current_socket = CPUSocket::new(i, vec![], vec![], String::from(""),1, sensor_data.clone()); + for c in start..stop {//core_ids { + if core_affinity::set_for_current(CoreId { id: c.into() }) { + match cpuid.get_vendor_info() { + Some(info) => { + warn!("Got CPU {:?}", info); + }, + None => { + warn!("Couldn't get cpuinfo"); + } + } + warn!("Set core_affinity to {}", c); + match cpuid.get_extended_topology_info() { + Some(info) => { + warn!("Got CPU topo info {:?}", info); + for t in info { + if t.level_type() == TopologyType::Core { + //nb_cpu_sockets = logical_cpus.len() as u16 / t.processors(); + //logical_cpus_from_cpuid = t.processors() + let x2apic_id = t.x2apic_id(); + let socket_id = (x2apic_id & 240) >> 4; // upper bits of x2apic_id are socket_id, mask them, then bit shift to get socket_id + current_socket.set_id(socket_id as u16); + let core_id = x2apic_id & 15; // 4 last bits of x2apic_id are the core_id (per-socket) + warn!("Found socketid={} and coreid={}", socket_id, core_id); + let mut attributes = HashMap::::new(); + let ref_core = logical_cpus.first().unwrap(); + attributes.insert(String::from("frequency"), ref_core.frequency().to_string()); + attributes.insert(String::from("name"), ref_core.name().to_string()); + attributes.insert(String::from("vendor_id"), ref_core.vendor_id().to_string()); + attributes.insert(String::from("brand"), ref_core.brand().to_string()); + warn!("Adding core id {} to socket_id {}", ((i * (logical_cpus_from_cpuid - 1)) + core_id as u16), current_socket.id); + current_socket.add_cpu_core(CPUCore::new((i * (logical_cpus_from_cpuid - 1)) + core_id as u16, attributes)); + warn!("Reviewing sockets : {:?}", topology.get_sockets_passive()); + } + } + }, + None => { + warn!("Couldn't get cpu topo info"); + } + } + } else { + no_more_sockets = true; + warn!("There's likely to be no more socket to explore."); + break; } + } + if !no_more_sockets { + warn!("inserting socket {:?}", current_socket); + topology.safe_insert_socket(current_socket); + i = i + 1; } - }, - None => { - warn!("Couldn't get cpu topo info"); + } + nb_cpu_sockets = i; + }, + None => { + panic!("Could'nt get core ids from core_affinity."); + } + } + if let Some(info) = CpuId::new().get_extended_topology_info() { + for c in info { + if c.level_type() == TopologyType::Core { + warn!("CPUID : {:?}", c); } } - } else { - no_more_sockets = true; - warn!("There's likely to be no more socket to explore."); - break; } - } - if !no_more_sockets { - warn!("inserting socket {:?}", current_socket); - topology.safe_insert_socket(current_socket); - i = i + 1; + } else { + error!("Could'nt set thread affinity !"); + let last_error = GetLastError(); + panic!("Error was : {:?}", last_error); } + } else { + warn!("Getting thread group affinity failed !"); + let last_error = GetLastError(); + panic!("Error was: {:?}", last_error); // win32 error 122 is insufficient buffer } - nb_cpu_sockets = i; - }, - None => { - panic!("Could'nt get core ids from core_affinity."); } + //let process_information = GetProcessInformation(current_process, , , ); } //nb_cpu_sockets = logical_cpus.len() as u16 / logical_cpus_from_cpuid; //let mut core_id_counter = logical_cpus.len(); @@ -526,13 +510,14 @@ impl Sensor for MsrRAPLSensor { for s in topology.get_sockets() { warn!("Inspecting CPUSocket: {:?}", s); unsafe { - let core_id = s.get_cores_passive().get(0).unwrap().id; + let core_id = s.get_cores_passive().last().unwrap().id + s.id * s.cpu_cores.len() as u16; + warn!("Asking get_msr_value, from generate_tpopo, with core_id={}", core_id); match get_msr_value(core_id as usize, MSR_DRAM_ENERGY_STATUS as u64, &sensor_data) { Ok(rec) => { warn!("Added domain Dram !"); let mut domain_sensor_data = sensor_data.clone(); domain_sensor_data.insert(String::from("MSR_ADDR"), MSR_DRAM_ENERGY_STATUS.to_string()); - domain_sensor_data.insert(String::from("CORE_ID"), core_id.to_string()); + domain_sensor_data.insert(String::from("CORE_ID"), core_id.to_string()); // nb of cores in a socket * socket_id + local_core_id s.safe_add_domain(Domain::new(2, String::from("dram"), String::from(""), 5, domain_sensor_data)) }, Err(e) => { @@ -586,35 +571,38 @@ impl Sensor for MsrRAPLSensor { } unsafe fn get_msr_value(core_id: usize, msr_addr: u64, sensor_data: &HashMap) -> Result { + let current_process = GetCurrentProcess(); + let current_thread = GetCurrentThread(); + let mut thread_group_affinity = GROUP_AFFINITY { Mask: 255, Group: 9, Reserved: [0,0,0] }; + let thread_affinity_res = GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + if thread_affinity_res.as_bool() { + warn!("Thread affinity found : {:?}", thread_group_affinity); + } else { + error!("Could'nt get thread group affinity"); + } + let mut process_group_array: [u16; 8] = [0,0,0,0,0,0,0,0]; + let mut process_group_array_len = 8; + let process_affinity_res = GetProcessGroupAffinity(current_process, &mut process_group_array_len, process_group_array.as_mut_ptr()); + if process_affinity_res.as_bool() { + warn!("Process affinity found: {:?}", process_group_array); + } else { + error!("Could'nt get process group affinity"); + error!("Error was : {:?}", GetLastError()); + } + warn!("Core ID requested to the driver : {}", core_id); match get_handle(sensor_data.get("DRIVER_NAME").unwrap()) { Ok(device) => { let mut msr_result: u64 = 0; let ptr_result = &mut msr_result as *mut u64; - let mut core_id: u32 = 0; - // get core numbers tied to the socket - match core_affinity::get_core_ids() { - Some(core_ids) => { - for c in core_ids { - if c.id == core_id as usize { - core_affinity::set_for_current(c); - warn!("Set core_affinity to {}", c.id); - break; - } - } - }, - None => { - warn!("Could'nt get core ids from core_affinity."); - } - } - //warn!("msr: {:x}", (MSR_PKG_ENERGY_STATUS as u64)); - //warn!("msr: {:b}", (MSR_PKG_ENERGY_STATUS as u64)); - //warn!("core_id: {:x} {:b}", (core_id as u64), (core_id as u64)); - //warn!("core_id: {:b}", ((core_id as u64) << 54)); - let src = ((core_id as u64) << 32) | msr_addr; + warn!("msr_addr: {:b}", msr_addr); + warn!("core_id: {:x} {:b}", (core_id as u64), (core_id as u64)); + warn!("core_id: {:b}", ((core_id as u64) << 32)); + let src = ((core_id as u64) << 32) | msr_addr; //let src = ((core_id as u64) << 32) | msr_addr; let ptr = &src as *const u64; - //warn!("src: {:x}", src); - //warn!("src: {:b}", src); + warn!("src: {:x}", src); + warn!("src: {:b}", src); + warn!("*ptr: {:b}", *ptr); //warn!("*ptr: {}", *ptr); //warn!("*ptr: {:b}", *ptr);