From e2a44eac89a333ad9e60ac34ec812a9f11c4caf5 Mon Sep 17 00:00:00 2001 From: "J. Sebastian Paez" Date: Fri, 17 Jan 2025 18:56:40 -0800 Subject: [PATCH 01/11] feat: use of ion mobility in quant --- crates/sage-cli/src/output.rs | 47 ++++++- crates/sage-cli/src/runner.rs | 114 +++++++++++++--- crates/sage-cloudpath/src/tdf.rs | 214 ++++++++++++++++++++++++++++++ crates/sage-cloudpath/src/util.rs | 6 +- crates/sage/src/lfq.rs | 138 +++++++++++++------ crates/sage/src/scoring.rs | 22 +-- crates/sage/src/spectrum.rs | 77 ++++++++++- crates/sage/src/tmt.rs | 4 +- 8 files changed, 533 insertions(+), 89 deletions(-) diff --git a/crates/sage-cli/src/output.rs b/crates/sage-cli/src/output.rs index 9e0fa6d2..31b483f8 100644 --- a/crates/sage-cli/src/output.rs +++ b/crates/sage-cli/src/output.rs @@ -1,10 +1,11 @@ use rayon::prelude::*; -use sage_core::spectrum::ProcessedSpectrum; +use sage_core::spectrum::{ProcessedSpectrum, MS1Spectra}; use sage_core::{scoring::Feature, tmt::TmtQuant}; + #[derive(Default)] pub struct SageResults { - pub ms1: Vec, + pub ms1: MS1Spectra, pub features: Vec, pub quant: Vec, } @@ -19,7 +20,26 @@ impl FromParallelIterator for SageResults { .reduce(SageResults::default, |mut acc, x| { acc.features.extend(x.features); acc.quant.extend(x.quant); - acc.ms1.extend(x.ms1); + match (acc.ms1, x.ms1) { + (MS1Spectra::NoMobility(mut a), MS1Spectra::NoMobility(b)) => { + a.extend(b); + acc.ms1 = MS1Spectra::NoMobility(a); + } + (MS1Spectra::WithMobility(mut a), MS1Spectra::WithMobility(b)) => { + a.extend(b); + acc.ms1 = MS1Spectra::WithMobility(a); + } + (MS1Spectra::Empty, MS1Spectra::Empty) => { + acc.ms1 = MS1Spectra::Empty; + } + (MS1Spectra::Empty, MS1Spectra::WithMobility(a)) | (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => { + acc.ms1 = MS1Spectra::WithMobility(a); + } + (MS1Spectra::Empty, MS1Spectra::NoMobility(a)) | (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => { + acc.ms1 = MS1Spectra::NoMobility(a); + } + _ => {unreachable!()} + }; acc }) } @@ -35,7 +55,26 @@ impl FromIterator for SageResults { .fold(SageResults::default(), |mut acc, x| { acc.features.extend(x.features); acc.quant.extend(x.quant); - acc.ms1.extend(x.ms1); + match (acc.ms1, x.ms1) { + (MS1Spectra::NoMobility(mut a), MS1Spectra::NoMobility(b)) => { + a.extend(b); + acc.ms1 = MS1Spectra::NoMobility(a); + } + (MS1Spectra::WithMobility(mut a), MS1Spectra::WithMobility(b)) => { + a.extend(b); + acc.ms1 = MS1Spectra::WithMobility(a); + } + (MS1Spectra::Empty, MS1Spectra::Empty) => { + acc.ms1 = MS1Spectra::Empty; + } + (MS1Spectra::Empty, MS1Spectra::WithMobility(a)) | (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => { + acc.ms1 = MS1Spectra::WithMobility(a); + } + (MS1Spectra::Empty, MS1Spectra::NoMobility(a)) | (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => { + acc.ms1 = MS1Spectra::NoMobility(a); + } + _ => {unreachable!()} + }; acc }) } diff --git a/crates/sage-cli/src/runner.rs b/crates/sage-cli/src/runner.rs index 9358ceee..ecd47344 100644 --- a/crates/sage-cli/src/runner.rs +++ b/crates/sage-cli/src/runner.rs @@ -14,7 +14,7 @@ use sage_core::mass::Tolerance; use sage_core::peptide::Peptide; use sage_core::scoring::Fragments; use sage_core::scoring::{Feature, Scorer}; -use sage_core::spectrum::{ProcessedSpectrum, SpectrumProcessor}; +use sage_core::spectrum::{MS1Spectra, ProcessedSpectrum, RawSpectrum, SpectrumProcessor}; use sage_core::tmt::TmtQuant; use std::collections::HashMap; use std::collections::HashSet; @@ -26,6 +26,43 @@ pub struct Runner { start: Instant, } +#[derive(Default)] 
+struct RawSpectrumAccumulator { + pub ms1: Vec, + pub msn: Vec, +} + +impl FromParallelIterator for RawSpectrumAccumulator { + fn from_par_iter(par_iter: I) -> Self + where + I: IntoParallelIterator, + { + let out = par_iter + .into_par_iter() + .fold( + || RawSpectrumAccumulator::default(), + |mut accum, spectrum| { + if spectrum.ms_level == 1 { + accum.ms1.push(spectrum); + } else { + accum.msn.push(spectrum); + } + accum + }, + ) + .reduce( + || RawSpectrumAccumulator::default(), + |mut a, b| { + a.ms1.extend(b.ms1); + a.msn.extend(b.msn); + a + }, + ); + + out + } +} + impl Runner { pub fn new(parameters: Search, parallel: usize) -> anyhow::Result { let mut parameters = parameters.clone(); @@ -82,11 +119,13 @@ impl Runner { } pub fn prefilter_peptides(self, parallel: usize, fasta: Fasta) -> Vec { - let spectra: Option> = - match parallel >= self.parameters.mzml_paths.len() { - true => Some(self.read_processed_spectra(&self.parameters.mzml_paths, 0, 0)), - false => None, - }; + let spectra: Option<( + MS1Spectra, + Vec>, + )> = match parallel >= self.parameters.mzml_paths.len() { + true => Some(self.read_processed_spectra(&self.parameters.mzml_paths, 0, 0)), + false => None, + }; let mut all_peptides: Vec = fasta .iter_chunks(self.parameters.database.prefilter_chunk_size) .enumerate() @@ -112,13 +151,13 @@ impl Runner { override_precursor_charge: self.parameters.override_precursor_charge, max_fragment_charge: self.parameters.max_fragment_charge, chimera: self.parameters.chimera, - report_psms: self.parameters.report_psms + 1, + report_psms: self.parameters.report_psms + 1, // Q: Why is 1 being added here? (JSPP: Feb 2024) wide_window: self.parameters.wide_window, annotate_matches: self.parameters.annotate_matches, score_type: self.parameters.score_type, }; let peptide_idxs: HashSet = match &spectra { - Some(spectra) => self.peptide_filter_processed_spectra(&scorer, spectra), + Some(spectra) => self.peptide_filter_processed_spectra(&scorer, &spectra.1), None => self .parameters .mzml_paths @@ -127,7 +166,7 @@ impl Runner { .flat_map(|(chunk_idx, chunk)| { let spectra_chunk = self.read_processed_spectra(chunk, chunk_idx, parallel); - self.peptide_filter_processed_spectra(&scorer, &spectra_chunk) + self.peptide_filter_processed_spectra(&scorer, &spectra_chunk.1) }) .collect(), } @@ -152,7 +191,7 @@ impl Runner { fn peptide_filter_processed_spectra( &self, scorer: &Scorer, - spectra: &Vec, + spectra: &Vec>, ) -> Vec { use std::sync::atomic::{AtomicUsize, Ordering}; let counter = AtomicUsize::new(0); @@ -207,13 +246,13 @@ impl Runner { fn search_processed_spectra( &self, scorer: &Scorer, - spectra: &Vec, + msn_spectra: &Vec>, ) -> Vec { use std::sync::atomic::{AtomicUsize, Ordering}; let counter = AtomicUsize::new(0); let start = Instant::now(); - let features: Vec<_> = spectra + let features: Vec<_> = msn_spectra .par_iter() .filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2) .map(|x| { @@ -238,7 +277,8 @@ impl Runner { fn complete_features( &self, - spectra: Vec, + msn_spectra: Vec>, + ms1_spectra: MS1Spectra, features: Vec, ) -> SageResults { let quant = self @@ -251,18 +291,21 @@ impl Runner { if level != 2 && level != 3 { log::warn!("TMT quant level set at {}, is this correct?", level); } - sage_core::tmt::quantify(&spectra, isobaric, Tolerance::Ppm(-20.0, 20.0), level) + sage_core::tmt::quantify(&msn_spectra, isobaric, Tolerance::Ppm(-20.0, 20.0), level) }) .unwrap_or_default(); - let ms1 = spectra.into_iter().filter(|s| s.level == 1).collect(); 
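+        // MS1 spectra are no longer filtered out of the MSn stream here; they
+        // are read and processed separately and arrive as `ms1_spectra`.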
SageResults { features, quant, - ms1, + ms1: ms1_spectra, } } + fn requires_ms1(&self) -> bool { + self.parameters.quant.lfq + } + fn process_chunk( &self, scorer: &Scorer, @@ -271,8 +314,8 @@ impl Runner { batch_size: usize, ) -> SageResults { let spectra = self.read_processed_spectra(chunk, chunk_idx, batch_size); - let features = self.search_processed_spectra(scorer, &spectra); - self.complete_features(spectra, features) + let features = self.search_processed_spectra(scorer, &spectra.1); + self.complete_features(spectra.1, spectra.0, features) } fn read_processed_spectra( @@ -280,7 +323,10 @@ impl Runner { chunk: &[String], chunk_idx: usize, batch_size: usize, - ) -> Vec { + ) -> ( + MS1Spectra, + Vec>, + ) { // Read all of the spectra at once - this can help prevent memory over-consumption issues info!( "processing files {} .. {} ", @@ -310,7 +356,7 @@ impl Runner { min_deisotope_mz.unwrap_or(0.0), ); - let spectra = chunk + let spectra: RawSpectrumAccumulator = chunk .par_iter() .enumerate() .flat_map(|(idx, path)| { @@ -320,6 +366,7 @@ impl Runner { file_id, sn, self.parameters.bruker_spectrum_processor, + self.requires_ms1(), ); match res { @@ -333,13 +380,36 @@ impl Runner { } } }) - .flat_map_iter(|spectra| spectra.into_iter().map(|s| sp.process(s))) + .flatten() + .collect(); + + let msn_spectra = spectra + .msn + .into_par_iter() + .map(|s| sp.process(s)) .collect::>(); + // Note: Empty iterators return true. + let all_contain_ims = spectra.ms1.iter().all(|x| x.mobility.is_some()); + let ms1_empty = spectra.ms1.is_empty(); + let ms1_spectra = if ms1_empty { + MS1Spectra::Empty + } else if all_contain_ims { + let spectra = spectra + .ms1 + .into_iter() + .map(|x| sp.process_with_mobility(x)) + .collect(); + MS1Spectra::WithMobility(spectra) + } else { + let spectra = spectra.ms1.into_iter().map(|s| sp.process(s)).collect(); + MS1Spectra::NoMobility(spectra) + }; + let io_time = Instant::now() - start; info!("- file IO: {:8} ms", io_time.as_millis()); - spectra + (ms1_spectra, msn_spectra) } pub fn batch_files(&self, scorer: &Scorer, batch_size: usize) -> SageResults { diff --git a/crates/sage-cloudpath/src/tdf.rs b/crates/sage-cloudpath/src/tdf.rs index 01d38e52..f7876fd1 100644 --- a/crates/sage-cloudpath/src/tdf.rs +++ b/crates/sage-cloudpath/src/tdf.rs @@ -3,7 +3,10 @@ use sage_core::{ mass::Tolerance, spectrum::{Precursor, RawSpectrum, Representation}, }; +use timsrust::readers::SpectrumReader; +use timsrust::converters::ConvertableDomain; pub use timsrust::readers::SpectrumReaderConfig as BrukerSpectrumProcessor; +use std::cmp::Ordering; pub struct TdfReader; @@ -13,11 +16,107 @@ impl TdfReader { path_name: impl AsRef, file_id: usize, bruker_spectrum_processor: BrukerSpectrumProcessor, + requires_ms1: bool, ) -> Result, timsrust::TimsRustError> { let spectrum_reader = timsrust::readers::SpectrumReader::build() .with_path(path_name.as_ref()) .with_config(bruker_spectrum_processor) .finalize()?; + let mut spectra = self.read_msn_spectra(file_id, &spectrum_reader)?; + if requires_ms1 { + let ms1s = self.read_ms1_spectra(&path_name, file_id, &spectrum_reader)?; + spectra.extend(ms1s); + } + + Ok(spectra) + } + + fn read_ms1_spectra(&self, path_name: impl AsRef, file_id: usize, spectrum_reader: &SpectrumReader) -> Result, timsrust::TimsRustError> { + let start = std::time::Instant::now(); + let frame_reader = timsrust::readers::FrameReader::new(path_name.as_ref())?; + let tdf_path = std::path::Path::new(path_name.as_ref()).join("analysis.tdf"); + let metadata = 
timsrust::readers::MetadataReader::new(tdf_path)?; + let mz_converter = metadata.mz_converter; + let ims_converter = metadata.im_converter; + + let ms1_spectra: Vec = frame_reader + .parallel_filter(|f| f.ms_level == timsrust::MSLevel::MS1) + .filter_map(|frame| match frame { + Ok(frame) => { + let mz: Vec = frame + .tof_indices + .iter() + .map(|&x| mz_converter.convert(x as f64) as f32) + .collect(); + let intensity: Vec = frame.intensities.iter().map(|&x| x as f32).collect(); + let mut imss: Vec = vec![0.0; mz.len()]; + // TODO: This is getting pretty big ... I should refactor this block. + frame.scan_offsets.windows(2).enumerate().map(|(i, w)| { + let num = w[1] - w[0]; + if num == 0 { + return None; + } + let lo = w[0]; + let hi = w[1]; + + let im = ims_converter.convert(i as f64) as f32; + Some((im, lo, hi)) + }).for_each(|x| { + if let Some((im, lo, hi)) = x { + for i in lo..hi { + imss[i] = im; + } + } + }); + + // Sort the mzs and intensities by mz + let mut indices: Vec = (0..mz.len()).collect(); + indices.sort_by(|&i, &j| { + mz[i] + .partial_cmp(&mz[j]) + .unwrap_or(std::cmp::Ordering::Equal) + }); + let sorted_mz: Vec = indices.iter().map(|&i| mz[i].clone()).collect(); + let sorted_inten: Vec = + indices.iter().map(|&i| intensity[i].clone()).collect(); + let sorted_imss: Vec = indices.iter().map(|&i| imss[i].clone()).collect(); + + // Squash the mobility dimension + let tol_ppm = 15.0; + let im_tol_pct = 2.0; + let (mz, (intensity, mobility)): (Vec, (Vec, Vec)) = dumbcentroid_frame(&sorted_mz, &sorted_inten, &sorted_imss, tol_ppm, im_tol_pct); + + let scan_start_time = frame.rt as f32 / 60.0; + let ion_injection_time = 100.0; // This is made up, in theory we can read + // if from the tdf file + let total_ion_current = sorted_inten.iter().sum::(); + let id = frame.index.to_string(); + + let spec = RawSpectrum { + file_id, + precursors: vec![], + representation: Representation::Centroid, + scan_start_time, + ion_injection_time, + mz, + ms_level: 1, + id, + intensity, + total_ion_current, + mobility: Some(mobility), + }; + Some(spec) + } + Err(x) => { + log::error!("error parsing spectrum: {:?}", x); + None + } + }).collect(); + log::info!("read {} ms1 spectra in {:#?}", ms1_spectra.len(), start.elapsed()); + Ok(ms1_spectra) + } + + fn read_msn_spectra(&self, file_id: usize, spectrum_reader: &SpectrumReader) -> Result, timsrust::TimsRustError> { let spectra: Vec = (0..spectrum_reader.len()) .into_par_iter() .filter_map(|index| match spectrum_reader.get(index) { @@ -39,6 +138,7 @@ impl TdfReader { ms_level: 2, id: dda_spectrum.index.to_string(), intensity: dda_spectrum.intensities.iter().map(|&x| x as f32).collect(), + mobility: None, }; Some(spectrum) } @@ -66,3 +166,117 @@ impl TdfReader { precursor } } + +fn dumbcentroid_frame(mz_array: &[f32], intensity_array: &[f32], ims_array: &[f32], mz_tol_ppm: f32, im_tol_pct: f32) -> (Vec, (Vec, Vec)) { + // Make sure the mz array is sorted + assert!(mz_array.windows(2).all(|x| x[0] <= x[1])); + + let arr_len = mz_array.len(); + let mut touched = vec![false; arr_len]; + let mut global_num_touched = 0; + + let mut order: Vec = (0..arr_len).collect(); + order.sort_unstable_by(|&a, &b| { + intensity_array[b] + .partial_cmp(&intensity_array[a]) + .unwrap_or(Ordering::Equal) + }); + + #[derive(Debug, Clone, Copy, Default)] + struct ImsPeak { + mz: f32, + intensity: f32, + im: f32, + } + let mut agg_buff = Vec::with_capacity(10_000.min(arr_len)); + let mut touch_buff = [false; 1000]; + + let utol = mz_tol_ppm / 1e6; + let im_tol = im_tol_pct / 
100.0; + + for &idx in &order { + if touched[idx] { + continue; + } + + let mz = mz_array[idx]; + let im = ims_array[idx]; + let da_tol = mz * utol; + let left_e = mz - da_tol; + let right_e = mz + da_tol; + + let ss_start = mz_array.partition_point(|&x| x < left_e); + let ss_end = mz_array.partition_point(|&x| x <= right_e); + + let slice_width = ss_end - ss_start; + if slice_width > 1000 { + println!("slice_width: {}", slice_width); + println!("mz: {:.4}", mz); + println!("Limits: {:.4}, {:.4}", left_e, right_e); + println!("ss_start: {}, ss_end: {}", ss_start, ss_end); + } + let local_num_touched = touched[ss_start..ss_end].iter().filter(|&&x| x).count(); + let local_num_untouched = slice_width - local_num_touched; + + if local_num_touched == slice_width { + continue; + } + + let abs_im_tol = im * im_tol; + let left_im = im - abs_im_tol; + let right_im = im + abs_im_tol; + + let mut curr_intensity = 0.0; + + let mut num_touchable = 0; + for i in ss_start..ss_end { + let im_i = ims_array[i]; + if !touched[i] && intensity_array[i] > 0.0 && im_i >= left_im && im_i <= right_im { + curr_intensity += intensity_array[i]; + num_touchable += 1; + touch_buff[i - ss_start] = true; + } + } + + if curr_intensity > 0.0 { + agg_buff.push(ImsPeak { + mz, + intensity: curr_intensity, + im, + }); + touched[ss_start..ss_end].iter_mut().zip( + touch_buff.iter_mut().take(slice_width) + ).for_each(|(t, tb)| { + *t = true; + *tb = false; + }); + global_num_touched += num_touchable; + const MAX_PEAKS: usize = 10000; + if agg_buff.len() > MAX_PEAKS { + log::debug!("Reached limit of the agg buffer at index {}/{}", idx, arr_len); + break; + } + } + + if global_num_touched == arr_len { + break; + } + } + + assert!(touch_buff.iter().all(|x| !x), "{:?}", touch_buff); + + // Drop the zeros and sort + // I could in theory truncate instead of filtering. 
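+    // Shaping the tuples as (mz, (intensity, im)) lets the single `unzip`
+    // below (via the `Extend` impl on 2-tuples) yield the nested
+    // (Vec, (Vec, Vec)) structure the caller destructures.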
+ let mut result: Vec<(f32, (f32, f32))> = agg_buff.iter() + .filter(|&x| x.mz > 0.0 && x.intensity > 0.0) + .map(|x| (x.mz, (x.intensity, x.im as f32))) + .collect(); + + result.sort_unstable_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal)); + // println!("Centroiding: Start len: {}; end len: {};", arr_len, result.len()); + // Ultra data is usually start: 40k end 10k, + // HT2 data is usually start 400k end 40k + + result.into_iter().unzip() +} + diff --git a/crates/sage-cloudpath/src/util.rs b/crates/sage-cloudpath/src/util.rs index 3bbc500e..be56dc53 100644 --- a/crates/sage-cloudpath/src/util.rs +++ b/crates/sage-cloudpath/src/util.rs @@ -43,11 +43,12 @@ pub fn read_spectra>( file_id: usize, sn: Option, bruker_processor: BrukerSpectrumProcessor, + requires_ms1: bool, ) -> Result, Error> { match identify_format(path.as_ref()) { FileFormat::MzML => read_mzml(path, file_id, sn), FileFormat::MGF => read_mgf(path, file_id), - FileFormat::TDF => read_tdf(path, file_id, bruker_processor), + FileFormat::TDF => read_tdf(path, file_id, bruker_processor, requires_ms1), FileFormat::Unidentified => panic!("Unable to get type for '{}'", path.as_ref()), // read_mzml(path, file_id, sn), } } @@ -69,8 +70,9 @@ pub fn read_tdf>( s: S, file_id: usize, bruker_spectrum_processor: BrukerSpectrumProcessor, + requires_ms1: bool, ) -> Result, Error> { - let res = crate::tdf::TdfReader.parse(s, file_id, bruker_spectrum_processor); + let res = crate::tdf::TdfReader.parse(s, file_id, bruker_spectrum_processor, requires_ms1); match res { Ok(t) => Ok(t), Err(e) => Err(Error::TDF(e)), diff --git a/crates/sage/src/lfq.rs b/crates/sage/src/lfq.rs index 783f73c7..4987d133 100644 --- a/crates/sage/src/lfq.rs +++ b/crates/sage/src/lfq.rs @@ -2,7 +2,8 @@ use crate::database::{binary_search_slice, IndexedDatabase, PeptideIx}; use crate::mass::{composition, Composition, Tolerance, NEUTRON}; use crate::ml::{matrix::Matrix, retention_alignment::Alignment}; use crate::scoring::Feature; -use crate::spectrum::ProcessedSpectrum; +use crate::spectrum::{MS1Spectra, ProcessedSpectrum}; +use crate::spectrum; use dashmap::DashMap; use rayon::prelude::*; use serde::{Deserialize, Serialize}; @@ -68,6 +69,7 @@ pub struct PrecursorRange { pub rt: f32, pub mass_lo: f32, pub mass_hi: f32, + pub mobility: f32, pub charge: u8, pub isotope: usize, pub peptide: PeptideIx, @@ -78,8 +80,8 @@ pub struct PrecursorRange { /// Create a data structure analogous to [`IndexedDatabase`] - instaed of /// storing fragment masses binned by precursor mass, store MS1 precursors /// binned by RT - This should enable rapid quantification as well -pub struct FeatureMap { - pub ranges: Vec, +pub struct FeatureMap { + pub ranges: Vec, pub min_rts: Vec, pub bin_size: usize, pub settings: LfqSettings, @@ -89,7 +91,7 @@ pub fn build_feature_map( settings: LfqSettings, precursor_charge: (u8, u8), features: &[Feature], -) -> FeatureMap { +) -> FeatureMap { let map: DashMap = DashMap::default(); features .iter() @@ -112,6 +114,7 @@ pub fn build_feature_map( charge: feat.charge, isotope: 0, file_id: feat.file_id, + mobility: feat.ims, decoy: false, }, ); @@ -163,7 +166,7 @@ pub fn build_feature_map( .par_chunks_mut(16 * 1024) .map(|chunk| { // There should always be at least one item in the chunk! 
- // we know the chunk is already sorted by fragment_mz too, so this is minimum value + // we know the chunk is already sorted by retention time too, so this is minimum value let min = chunk[0].rt; chunk.par_sort_unstable_by(|a, b| a.mass_lo.total_cmp(&b.mass_lo)); min @@ -178,8 +181,8 @@ pub fn build_feature_map( } } -struct Query<'a> { - ranges: &'a [PrecursorRange], +struct Query<'a, T> { + ranges: &'a [T], page_lo: usize, page_hi: usize, bin_size: usize, @@ -187,8 +190,8 @@ struct Query<'a> { max_rt: f32, } -impl FeatureMap { - fn rt_slice(&self, rt: f32, rt_tol: f32) -> Query<'_> { +impl FeatureMap { + fn rt_slice(&self, rt: f32, rt_tol: f32) -> Query<'_, T> { let (page_lo, page_hi) = binary_search_slice( &self.min_rts, |rt, x| rt.total_cmp(x), @@ -205,50 +208,91 @@ impl FeatureMap { min_rt: rt - rt_tol, } } +} +impl FeatureMap { /// Run label-free quantification module pub fn quantify( &self, db: &IndexedDatabase, - spectra: &[ProcessedSpectrum], + spectra: &MS1Spectra, alignments: &[Alignment], ) -> HashMap<(PrecursorId, bool), (Peak, Vec), fnv::FnvBuildHasher> { let scores: DashMap<(PrecursorId, bool), Grid, fnv::FnvBuildHasher> = DashMap::default(); log::info!("tracing MS1 features"); - spectra - .par_iter() - .filter(|s| s.level == 1) - .for_each(|spectrum| { - let a = alignments[spectrum.file_id]; - let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept; - let query = self.rt_slice(rt, RT_TOL); - - for peak in &spectrum.peaks { - for entry in query.mass_lookup(peak.mass) { - let id = match self.settings.combine_charge_states { - true => PrecursorId::Combined(entry.peptide), - false => PrecursorId::Charged((entry.peptide, entry.charge)), - }; - - let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| { - let p = &db[entry.peptide]; - let composition = p - .sequence - .iter() - .map(|r| composition(*r)) - .sum::(); - let dist = crate::isotopes::peptide_isotopes( - composition.carbon, - composition.sulfur, - ); - Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE) - }); - - grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity); + + // TODO: find a good way to abstract this ... I think a macro would be the way to go. 
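+        // The two populated arms below are identical except that the
+        // ion-mobility arm also filters candidate precursors by mobility
+        // (`mass_mobility_lookup` instead of `mass_lookup`).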
+ match spectra { + MS1Spectra::NoMobility(spectra) => { + spectra.par_iter().for_each(|spectrum| { + let a = alignments[spectrum.file_id]; + let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept; + let query = self.rt_slice(rt, RT_TOL); + + for peak in &spectrum.peaks { + for entry in query.mass_lookup(peak.mass) { + let id = match self.settings.combine_charge_states { + true => PrecursorId::Combined(entry.peptide), + false => PrecursorId::Charged((entry.peptide, entry.charge)), + }; + + let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| { + let p = &db[entry.peptide]; + let composition = p + .sequence + .iter() + .map(|r| composition(*r)) + .sum::(); + let dist = crate::isotopes::peptide_isotopes( + composition.carbon, + composition.sulfur, + ); + Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE) + }); + + grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity); + } + } + })}, + MS1Spectra::WithMobility(spectra) => { + spectra.par_iter().for_each(|spectrum|{ + let a = alignments[spectrum.file_id]; + let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept; + let query = self.rt_slice(rt, RT_TOL); + + for peak in &spectrum.peaks { + for entry in query.mass_mobility_lookup(peak.mass, peak.mobility) { + let id = match self.settings.combine_charge_states { + true => PrecursorId::Combined(entry.peptide), + false => PrecursorId::Charged((entry.peptide, entry.charge)), + }; + + let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| { + let p = &db[entry.peptide]; + let composition = p + .sequence + .iter() + .map(|r| composition(*r)) + .sum::(); + let dist = crate::isotopes::peptide_isotopes( + composition.carbon, + composition.sulfur, + ); + Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE) + }); + + grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity); + } } - } - }); + + })}, + MS1Spectra::Empty => { + // Should never be called if no MS1 spectra are present + log::warn!("no MS1 spectra found for quantification"); + } + + }; log::info!("integrating MS1 features"); @@ -608,7 +652,7 @@ fn convolve(slice: &[f64], kernel: &[f64]) -> Vec { .collect() } -impl<'a> Query<'a> { +impl<'a> Query<'a, PrecursorRange> { pub fn mass_lookup(&self, mass: f32) -> impl Iterator { (self.page_lo..self.page_hi).flat_map(move |page| { let left_idx = page * self.bin_size; @@ -636,4 +680,12 @@ impl<'a> Query<'a> { }) }) } + + pub fn mass_mobility_lookup(&self, mass: f32, mobility: f32) -> impl Iterator { + self.mass_lookup(mass).filter(move |precursor| { + // TODO: replace this magic number with a patameter. 
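+            // NB: 0.01 is an absolute window on the 1/K0 value; a later patch
+            // in this series ("parameter propagation for mobility tolerance")
+            // exposes it as the `mobility_pct_tolerance` LFQ setting.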
+ precursor.mobility >= (mobility - 0.01) + && precursor.mobility <= (mobility + 0.01) + }) + } } diff --git a/crates/sage/src/scoring.rs b/crates/sage/src/scoring.rs index 44afbdd6..2ae1242f 100644 --- a/crates/sage/src/scoring.rs +++ b/crates/sage/src/scoring.rs @@ -2,7 +2,7 @@ use crate::database::{IndexedDatabase, PeptideIx}; use crate::heap::bounded_min_heapify; use crate::ion_series::{IonSeries, Kind}; use crate::mass::{Tolerance, NEUTRON, PROTON}; -use crate::spectrum::{Precursor, ProcessedSpectrum}; +use crate::spectrum::{Peak, Precursor, ProcessedSpectrum}; use serde::{Deserialize, Serialize}; use std::ops::AddAssign; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -245,7 +245,7 @@ fn max_fragment_charge(max_fragment_charge: Option, precursor_charge: u8) -> impl<'db> Scorer<'db> { pub fn quick_score( &self, - query: &ProcessedSpectrum, + query: &ProcessedSpectrum, prefilter_low_memory: bool, ) -> Vec { assert_eq!( @@ -284,7 +284,7 @@ impl<'db> Scorer<'db> { } } - pub fn score(&self, query: &ProcessedSpectrum) -> Vec { + pub fn score(&self, query: &ProcessedSpectrum) -> Vec { assert_eq!( query.level, 2, "internal bug, trying to score a non-MS2 scan!" @@ -321,7 +321,7 @@ impl<'db> Scorer<'db> { /// in sorted order. fn matched_peaks_with_isotope( &self, - query: &ProcessedSpectrum, + query: &ProcessedSpectrum, precursor_mass: f32, precursor_charge: u8, precursor_tol: Tolerance, @@ -369,7 +369,7 @@ impl<'db> Scorer<'db> { fn matched_peaks( &self, - query: &ProcessedSpectrum, + query: &ProcessedSpectrum, precursor_mass: f32, precursor_charge: u8, precursor_tol: Tolerance, @@ -401,7 +401,7 @@ impl<'db> Scorer<'db> { } } - fn initial_hits(&self, query: &ProcessedSpectrum, precursor: &Precursor) -> InitialHits { + fn initial_hits(&self, query: &ProcessedSpectrum, precursor: &Precursor) -> InitialHits { // Sage operates on masses without protons; [M] instead of [MH+] let mz = precursor.mz - PROTON; @@ -448,7 +448,7 @@ impl<'db> Scorer<'db> { } /// Score a single [`ProcessedSpectrum`] against the database - pub fn score_standard(&self, query: &ProcessedSpectrum) -> Vec { + pub fn score_standard(&self, query: &ProcessedSpectrum) -> Vec { let precursor = query.precursors.first().unwrap_or_else(|| { panic!("missing MS1 precursor for {}", query.id); }); @@ -463,7 +463,7 @@ impl<'db> Scorer<'db> { /// best PSMs ([`Feature`]) fn build_features( &self, - query: &ProcessedSpectrum, + query: &ProcessedSpectrum, precursor: &Precursor, hits: &InitialHits, report_psms: usize, @@ -576,7 +576,7 @@ impl<'db> Scorer<'db> { } /// Remove peaks matching a PSM from a query spectrum - fn remove_matched_peaks(&self, query: &mut ProcessedSpectrum, psm: &Feature) { + fn remove_matched_peaks(&self, query: &mut ProcessedSpectrum, psm: &Feature) { let peptide = &self.db[psm.peptide_idx]; let fragments = self .db @@ -612,7 +612,7 @@ impl<'db> Scorer<'db> { /// Return multiple PSMs for each spectra - first is the best match, second PSM is the best match /// after all theoretical peaks assigned to the best match are removed, etc - pub fn score_chimera_fast(&self, query: &ProcessedSpectrum) -> Vec { + pub fn score_chimera_fast(&self, query: &ProcessedSpectrum) -> Vec { let precursor = query.precursors.first().unwrap_or_else(|| { panic!("missing MS1 precursor for {}", query.id); }); @@ -641,7 +641,7 @@ impl<'db> Scorer<'db> { /// Calculate full hyperscore for a given PSM fn score_candidate( &self, - query: &ProcessedSpectrum, + query: &ProcessedSpectrum, pre_score: &PreScore, ) -> (Score, Option) { let mut score = 
Score { diff --git a/crates/sage/src/spectrum.rs b/crates/sage/src/spectrum.rs index e8fb4c0d..d2a9ff02 100644 --- a/crates/sage/src/spectrum.rs +++ b/crates/sage/src/spectrum.rs @@ -1,5 +1,6 @@ use crate::database::binary_search_slice; use crate::mass::{Tolerance, NEUTRON, PROTON}; +use crate::spectrum; /// A charge-less peak at monoisotopic mass #[derive(PartialEq, Copy, Clone, Default, Debug)] @@ -24,6 +25,31 @@ impl Ord for Peak { } } +/// A charge-less peak at monoisotopic mass with ion mobility +#[derive(PartialEq, Copy, Clone, Default, Debug)] +pub struct IMPeak { + pub intensity: f32, + pub mass: f32, + pub mobility: f32, // I would use f16 but its not stable +} + +impl Eq for IMPeak {} + +impl PartialOrd for IMPeak { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for IMPeak { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.intensity + .total_cmp(&other.intensity) + .then_with(|| self.mass.total_cmp(&other.mass)) + .then_with(|| self.mobility.total_cmp(&other.mobility)) + } +} + /// A de-isotoped peak, that might have some charge state information #[derive(PartialEq, PartialOrd, Debug, Copy, Clone)] pub struct Deisotoped { @@ -55,7 +81,7 @@ pub struct Precursor { } #[derive(Clone, Default, Debug)] -pub struct ProcessedSpectrum { +pub struct ProcessedSpectrum { /// MSn level pub level: u8, /// Scan ID @@ -69,7 +95,7 @@ pub struct ProcessedSpectrum { /// Selected ions for precursors, if `level > 1` pub precursors: Vec, /// MS peaks, sorted by mass in ascending order - pub peaks: Vec, + pub peaks: Vec, /// Total ion current pub total_ion_current: f32, } @@ -97,6 +123,8 @@ pub struct RawSpectrum { pub mz: Vec, /// Intensity array pub intensity: Vec, + /// Mobility array + pub mobility: Option>, } impl RawSpectrum { @@ -122,6 +150,18 @@ impl Default for Representation { } } +pub enum MS1Spectra { + NoMobility(Vec>), + WithMobility(Vec>), + Empty, +} + +impl Default for MS1Spectra { + fn default() -> Self { + Self::Empty + } +} + /// Binary search followed by linear search to select the most intense peak within `tolerance` window /// * `offset` - this parameter allows for a static adjustment to the lower and upper bounds of the search window. 
/// Sage subtracts a proton (and assumes z=1) for all experimental peaks, and stores all fragments as monoisotopic @@ -237,14 +277,13 @@ pub fn path_compression(peaks: &mut [Deisotoped]) { } } -impl ProcessedSpectrum { +impl ProcessedSpectrum { pub fn extract_ms1_precursor(&self) -> Option<(f32, u8)> { let precursor = self.precursors.first()?; let charge = precursor.charge?; let mass = (precursor.mz - PROTON) * charge as f32; Some((mass, charge)) } - pub fn in_isolation_window(&self, mz: f32) -> Option { let precursor = self.precursors.first()?; let (lo, hi) = precursor.isolation_window?.bounds(precursor.mz - PROTON); @@ -327,7 +366,7 @@ impl SpectrumProcessor { } } - pub fn process(&self, spectrum: RawSpectrum) -> ProcessedSpectrum { + pub fn process(&self, spectrum: RawSpectrum) -> ProcessedSpectrum { let mut peaks = match spectrum.ms_level { 2 => self.process_ms2(self.deisotope, &spectrum), _ => spectrum @@ -341,6 +380,34 @@ impl SpectrumProcessor { .collect::>(), }; + peaks.sort_by(|a, b| a.mass.total_cmp(&b.mass)); + let total_ion_current = peaks.iter().map(|peak| peak.intensity).sum::(); + + ProcessedSpectrum { + level: spectrum.ms_level, + id: spectrum.id, + file_id: spectrum.file_id, + scan_start_time: spectrum.scan_start_time, + ion_injection_time: spectrum.ion_injection_time, + precursors: spectrum.precursors, + peaks, + total_ion_current, + } + } + + pub fn process_with_mobility(&self, spectrum: RawSpectrum) -> ProcessedSpectrum { + assert!(spectrum.ms_level == 1, "Logic error, mobility processing should only be used for MS1"); + let mut peaks = spectrum + .mz + .iter() + .zip(spectrum.intensity.iter().zip(spectrum.mobility.unwrap().iter())) + .map(|(&mass, (&intensity, &mobility))| { + let mass = (mass - PROTON) * 1.0; + IMPeak { mass, intensity, mobility } + }) + .collect::>(); + + peaks.sort_by(|a, b| a.mass.total_cmp(&b.mass)); let total_ion_current = peaks.iter().map(|peak| peak.intensity).sum::(); diff --git a/crates/sage/src/tmt.rs b/crates/sage/src/tmt.rs index f63120cd..00b6030d 100644 --- a/crates/sage/src/tmt.rs +++ b/crates/sage/src/tmt.rs @@ -183,7 +183,7 @@ pub struct Quant<'ms3> { /// Quanitified TMT reporter ion intensities pub intensities: Vec>, /// MS3 spectrum - pub spectrum: &'ms3 ProcessedSpectrum, + pub spectrum: &'ms3 ProcessedSpectrum, } /// Return a vector containing the peaks closest to the m/zs defined in @@ -305,7 +305,7 @@ pub struct TmtQuant { /// * `isobaric_tolerance`: specify label tolerance /// * `level`: MSn level to extract isobaric peaks from pub fn quantify( - spectra: &[ProcessedSpectrum], + spectra: &[ProcessedSpectrum], isobaric_labels: &Isobaric, isobaric_tolerance: Tolerance, level: u8, From 74d3a3d2d4b34920571319023e93c592e9cdfb3d Mon Sep 17 00:00:00 2001 From: "J. 
Sebastian Paez" Date: Fri, 17 Jan 2025 22:45:05 -0800 Subject: [PATCH 02/11] chore: better notice of potential overflow in centroiding and cargo fmt --- crates/sage-cli/src/output.rs | 23 +++-- crates/sage-cli/src/runner.rs | 4 +- crates/sage-cloudpath/src/tdf.rs | 141 +++++++++++++++++++++---------- crates/sage/src/spectrum.rs | 21 +++-- 4 files changed, 130 insertions(+), 59 deletions(-) diff --git a/crates/sage-cli/src/output.rs b/crates/sage-cli/src/output.rs index 31b483f8..8414dffc 100644 --- a/crates/sage-cli/src/output.rs +++ b/crates/sage-cli/src/output.rs @@ -1,8 +1,7 @@ use rayon::prelude::*; -use sage_core::spectrum::{ProcessedSpectrum, MS1Spectra}; +use sage_core::spectrum::{MS1Spectra, ProcessedSpectrum}; use sage_core::{scoring::Feature, tmt::TmtQuant}; - #[derive(Default)] pub struct SageResults { pub ms1: MS1Spectra, @@ -32,13 +31,17 @@ impl FromParallelIterator for SageResults { (MS1Spectra::Empty, MS1Spectra::Empty) => { acc.ms1 = MS1Spectra::Empty; } - (MS1Spectra::Empty, MS1Spectra::WithMobility(a)) | (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => { + (MS1Spectra::Empty, MS1Spectra::WithMobility(a)) + | (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => { acc.ms1 = MS1Spectra::WithMobility(a); } - (MS1Spectra::Empty, MS1Spectra::NoMobility(a)) | (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => { + (MS1Spectra::Empty, MS1Spectra::NoMobility(a)) + | (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => { acc.ms1 = MS1Spectra::NoMobility(a); } - _ => {unreachable!()} + _ => { + unreachable!() + } }; acc }) @@ -67,13 +70,17 @@ impl FromIterator for SageResults { (MS1Spectra::Empty, MS1Spectra::Empty) => { acc.ms1 = MS1Spectra::Empty; } - (MS1Spectra::Empty, MS1Spectra::WithMobility(a)) | (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => { + (MS1Spectra::Empty, MS1Spectra::WithMobility(a)) + | (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => { acc.ms1 = MS1Spectra::WithMobility(a); } - (MS1Spectra::Empty, MS1Spectra::NoMobility(a)) | (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => { + (MS1Spectra::Empty, MS1Spectra::NoMobility(a)) + | (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => { acc.ms1 = MS1Spectra::NoMobility(a); } - _ => {unreachable!()} + _ => { + unreachable!() + } }; acc }) diff --git a/crates/sage-cli/src/runner.rs b/crates/sage-cli/src/runner.rs index ecd47344..56a58f7e 100644 --- a/crates/sage-cli/src/runner.rs +++ b/crates/sage-cli/src/runner.rs @@ -106,10 +106,10 @@ impl Runner { }; info!( - "generated {} fragments, {} peptides in {}ms", + "generated {} fragments, {} peptides in {:#?}", database.fragments.len(), database.peptides.len(), - (Instant::now() - start).as_millis() + (start.elapsed()) ); Ok(Self { database, diff --git a/crates/sage-cloudpath/src/tdf.rs b/crates/sage-cloudpath/src/tdf.rs index f7876fd1..f8370668 100644 --- a/crates/sage-cloudpath/src/tdf.rs +++ b/crates/sage-cloudpath/src/tdf.rs @@ -3,10 +3,10 @@ use sage_core::{ mass::Tolerance, spectrum::{Precursor, RawSpectrum, Representation}, }; -use timsrust::readers::SpectrumReader; +use std::cmp::Ordering; use timsrust::converters::ConvertableDomain; +use timsrust::readers::SpectrumReader; pub use timsrust::readers::SpectrumReaderConfig as BrukerSpectrumProcessor; -use std::cmp::Ordering; pub struct TdfReader; @@ -31,7 +31,12 @@ impl TdfReader { Ok(spectra) } - fn read_ms1_spectra(&self, path_name: impl AsRef, file_id: usize, spectrum_reader: &SpectrumReader) -> Result, timsrust::TimsRustError> { + fn read_ms1_spectra( + &self, + path_name: impl AsRef, + file_id: usize, + 
spectrum_reader: &SpectrumReader, + ) -> Result, timsrust::TimsRustError> { let start = std::time::Instant::now(); let frame_reader = timsrust::readers::FrameReader::new(path_name.as_ref())?; let tdf_path = std::path::Path::new(path_name.as_ref()).join("analysis.tdf"); @@ -51,23 +56,28 @@ impl TdfReader { let intensity: Vec = frame.intensities.iter().map(|&x| x as f32).collect(); let mut imss: Vec = vec![0.0; mz.len()]; // TODO: This is getting pretty big ... I should refactor this block. - frame.scan_offsets.windows(2).enumerate().map(|(i, w)| { - let num = w[1] - w[0]; - if num == 0 { - return None; - } - let lo = w[0]; - let hi = w[1]; - - let im = ims_converter.convert(i as f64) as f32; - Some((im, lo, hi)) - }).for_each(|x| { - if let Some((im, lo, hi)) = x { - for i in lo..hi { - imss[i] = im; + frame + .scan_offsets + .windows(2) + .enumerate() + .map(|(i, w)| { + let num = w[1] - w[0]; + if num == 0 { + return None; } - } - }); + let lo = w[0]; + let hi = w[1]; + + let im = ims_converter.convert(i as f64) as f32; + Some((im, lo, hi)) + }) + .for_each(|x| { + if let Some((im, lo, hi)) = x { + for i in lo..hi { + imss[i] = im; + } + } + }); // Sort the mzs and intensities by mz let mut indices: Vec = (0..mz.len()).collect(); @@ -84,11 +94,18 @@ impl TdfReader { // Squash the mobility dimension let tol_ppm = 15.0; let im_tol_pct = 2.0; - let (mz, (intensity, mobility)): (Vec, (Vec, Vec)) = dumbcentroid_frame(&sorted_mz, &sorted_inten, &sorted_imss, tol_ppm, im_tol_pct); + let (mz, (intensity, mobility)): (Vec, (Vec, Vec)) = + dumbcentroid_frame( + &sorted_mz, + &sorted_inten, + &sorted_imss, + tol_ppm, + im_tol_pct, + ); let scan_start_time = frame.rt as f32 / 60.0; let ion_injection_time = 100.0; // This is made up, in theory we can read - // if from the tdf file + // if from the tdf file let total_ion_current = sorted_inten.iter().sum::(); let id = frame.index.to_string(); @@ -111,12 +128,21 @@ impl TdfReader { log::error!("error parsing spectrum: {:?}", x); None } - }).collect(); - log::info!("read {} ms1 spectra in {:#?}", ms1_spectra.len(), start.elapsed()); + }) + .collect(); + log::info!( + "read {} ms1 spectra in {:#?}", + ms1_spectra.len(), + start.elapsed() + ); Ok(ms1_spectra) } - fn read_msn_spectra(&self, file_id: usize, spectrum_reader: &SpectrumReader) -> Result, timsrust::TimsRustError> { + fn read_msn_spectra( + &self, + file_id: usize, + spectrum_reader: &SpectrumReader, + ) -> Result, timsrust::TimsRustError> { let spectra: Vec = (0..spectrum_reader.len()) .into_par_iter() .filter_map(|index| match spectrum_reader.get(index) { @@ -167,7 +193,13 @@ impl TdfReader { } } -fn dumbcentroid_frame(mz_array: &[f32], intensity_array: &[f32], ims_array: &[f32], mz_tol_ppm: f32, im_tol_pct: f32) -> (Vec, (Vec, Vec)) { +fn dumbcentroid_frame( + mz_array: &[f32], + intensity_array: &[f32], + ims_array: &[f32], + mz_tol_ppm: f32, + im_tol_pct: f32, +) -> (Vec, (Vec, Vec)) { // Make sure the mz array is sorted assert!(mz_array.windows(2).all(|x| x[0] <= x[1])); @@ -189,7 +221,8 @@ fn dumbcentroid_frame(mz_array: &[f32], intensity_array: &[f32], ims_array: &[f3 im: f32, } let mut agg_buff = Vec::with_capacity(10_000.min(arr_len)); - let mut touch_buff = [false; 1000]; + const TOUCH_BUFF_SIZE: usize = 1000; + let mut touch_buff = [false; TOUCH_BUFF_SIZE]; let utol = mz_tol_ppm / 1e6; let im_tol = im_tol_pct / 100.0; @@ -205,15 +238,26 @@ fn dumbcentroid_frame(mz_array: &[f32], intensity_array: &[f32], ims_array: &[f3 let left_e = mz - da_tol; let right_e = mz + da_tol; - let 
ss_start = mz_array.partition_point(|&x| x < left_e); - let ss_end = mz_array.partition_point(|&x| x <= right_e); - - let slice_width = ss_end - ss_start; - if slice_width > 1000 { - println!("slice_width: {}", slice_width); - println!("mz: {:.4}", mz); - println!("Limits: {:.4}, {:.4}", left_e, right_e); - println!("ss_start: {}, ss_end: {}", ss_start, ss_end); + let mut ss_start = mz_array.partition_point(|&x| x < left_e); + let mut ss_end = mz_array.partition_point(|&x| x <= right_e); + + let mut slice_width = ss_end - ss_start; + if slice_width >= 1000 { + // It is EXCEEDINGLY UNLIKELY that more than 1000 points + // will be aggregated or in the range of a single mz peak. + // Here we just handle those edge cases by making sure the 'center' + // will be aggregated. + + let new_ss_start = idx.saturating_sub(TOUCH_BUFF_SIZE / 2); + let new_ss_end = new_ss_start + TOUCH_BUFF_SIZE - 1; + // TODO: make a better warning message here. + log::warn!( + "More than {} points are in the mz range of a point, limiting span", + TOUCH_BUFF_SIZE + ); + ss_start = new_ss_start; + ss_end = new_ss_end; + slice_width = ss_end - ss_start; } let local_num_touched = touched[ss_start..ss_end].iter().filter(|&&x| x).count(); let local_num_untouched = slice_width - local_num_touched; @@ -244,16 +288,25 @@ fn dumbcentroid_frame(mz_array: &[f32], intensity_array: &[f32], ims_array: &[f3 intensity: curr_intensity, im, }); - touched[ss_start..ss_end].iter_mut().zip( - touch_buff.iter_mut().take(slice_width) - ).for_each(|(t, tb)| { - *t = true; - *tb = false; - }); + touched[ss_start..ss_end] + .iter_mut() + .zip(touch_buff.iter_mut().take(slice_width)) + .for_each(|(t, tb)| { + *t = true; + *tb = false; + }); global_num_touched += num_touchable; const MAX_PEAKS: usize = 10000; if agg_buff.len() > MAX_PEAKS { - log::debug!("Reached limit of the agg buffer at index {}/{}", idx, arr_len); + let curr_loc_int = intensity_array[idx]; + if curr_loc_int > 200.0 { + log::debug!( + "Reached limit of the agg buffer at index {}/{} curr int={}", + idx, + arr_len, + curr_loc_int + ); + } break; } } @@ -267,7 +320,8 @@ fn dumbcentroid_frame(mz_array: &[f32], intensity_array: &[f32], ims_array: &[f3 // Drop the zeros and sort // I could in theory truncate instead of filtering. 
- let mut result: Vec<(f32, (f32, f32))> = agg_buff.iter() + let mut result: Vec<(f32, (f32, f32))> = agg_buff + .iter() .filter(|&x| x.mz > 0.0 && x.intensity > 0.0) .map(|x| (x.mz, (x.intensity, x.im as f32))) .collect(); @@ -279,4 +333,3 @@ fn dumbcentroid_frame(mz_array: &[f32], intensity_array: &[f32], ims_array: &[f3 result.into_iter().unzip() } - diff --git a/crates/sage/src/spectrum.rs b/crates/sage/src/spectrum.rs index d2a9ff02..af36b874 100644 --- a/crates/sage/src/spectrum.rs +++ b/crates/sage/src/spectrum.rs @@ -277,7 +277,7 @@ pub fn path_compression(peaks: &mut [Deisotoped]) { } } -impl ProcessedSpectrum { +impl ProcessedSpectrum { pub fn extract_ms1_precursor(&self) -> Option<(f32, u8)> { let precursor = self.precursors.first()?; let charge = precursor.charge?; @@ -396,17 +396,28 @@ impl SpectrumProcessor { } pub fn process_with_mobility(&self, spectrum: RawSpectrum) -> ProcessedSpectrum { - assert!(spectrum.ms_level == 1, "Logic error, mobility processing should only be used for MS1"); + assert!( + spectrum.ms_level == 1, + "Logic error, mobility processing should only be used for MS1" + ); let mut peaks = spectrum .mz .iter() - .zip(spectrum.intensity.iter().zip(spectrum.mobility.unwrap().iter())) + .zip( + spectrum + .intensity + .iter() + .zip(spectrum.mobility.unwrap().iter()), + ) .map(|(&mass, (&intensity, &mobility))| { let mass = (mass - PROTON) * 1.0; - IMPeak { mass, intensity, mobility } + IMPeak { + mass, + intensity, + mobility, + } }) .collect::>(); - peaks.sort_by(|a, b| a.mass.total_cmp(&b.mass)); let total_ion_current = peaks.iter().map(|peak| peak.intensity).sum::(); From 916cf2dba10ea89ac4c6424e3afc2899835f7f85 Mon Sep 17 00:00:00 2001 From: "J. Sebastian Paez" Date: Fri, 17 Jan 2025 22:47:50 -0800 Subject: [PATCH 03/11] feat: speedup on database gen (digest grouping) and fmt --- crates/sage/src/database.rs | 39 ++++++++--- crates/sage/src/enzyme.rs | 48 +++++++++++-- crates/sage/src/fasta.rs | 10 +-- crates/sage/src/lfq.rs | 133 ++++++++++++++++++------------------ crates/sage/src/peptide.rs | 14 +++- 5 files changed, 156 insertions(+), 88 deletions(-) diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index 7153cef1..00dbae2b 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -1,4 +1,4 @@ -use crate::enzyme::{Enzyme, EnzymeParameters}; +use crate::enzyme::{group_digests, Enzyme, EnzymeParameters}; use crate::fasta::Fasta; use crate::ion_series::{IonSeries, Kind}; use crate::mass::Tolerance; @@ -166,6 +166,15 @@ impl Parameters { // and missed cleavages, if applicable. 
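+        // Identical digests coming from different proteins are grouped below,
+        // so each unique sequence is modified and indexed once rather than
+        // once per protein.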
let digests = fasta.digest(&enzyme); + log::trace!("grouping digests"); + let start_num = digests.len(); + let digests = group_digests(digests); + log::trace!( + "grouped {} digests into {} groups", + start_num, + digests.len() + ); + let mods = self .variable_mods .iter() @@ -175,9 +184,9 @@ impl Parameters { let targets: DashSet<_, FnvBuildHasher> = DashSet::default(); digests .par_iter() - .filter(|digest| !digest.decoy) + .filter(|digest| !digest.reference.decoy) .for_each(|digest| { - targets.insert(digest.sequence.clone().into_bytes()); + targets.insert(digest.reference.sequence.clone().into_bytes()); }); log::trace!("modifying peptides"); @@ -212,6 +221,7 @@ impl Parameters { pub fn reorder_peptides(target_decoys: &mut Vec) { log::trace!("sorting and deduplicating peptides"); + let init_size = target_decoys.len(); // This is equivalent to a stable sort target_decoys.par_sort_unstable_by(|a, b| { a.monoisotopic @@ -219,7 +229,9 @@ impl Parameters { .then_with(|| a.initial_sort(b)) }); target_decoys.dedup_by(|remove, keep| { - if remove.sequence == keep.sequence + if remove.monoisotopic == keep.monoisotopic + // if remove.sequence == keep.sequence + && remove.sequence == keep.sequence && remove.modifications == keep.modifications && remove.nterm == keep.nterm && remove.cterm == keep.cterm @@ -237,9 +249,20 @@ impl Parameters { target_decoys .par_iter_mut() .for_each(|peptide| peptide.proteins.sort_unstable()); + + let num_dropped = init_size - target_decoys.len(); + let tot_proteins = target_decoys + .iter() + .map(|x| x.proteins.len()) + .sum::(); + log::trace!( + "dropped {} t/d pairs, remaining {}, tot_proteins: {}", + num_dropped, + target_decoys.len(), + tot_proteins + ); } - // pub fn build(self) -> Result> { pub fn build(self, fasta: Fasta) -> IndexedDatabase { let target_decoys = self.digest(&fasta); self.build_from_peptides(target_decoys) @@ -612,11 +635,11 @@ mod test { fasta.targets, vec![ ( - Arc::new("sp|AAAAA".to_string()), + Arc::from("sp|AAAAA".to_string()), "MEWKLEQSMREQALLKAQLTQLK".into() ), ( - Arc::new("sp|BBBBB".to_string()), + Arc::from("sp|BBBBB".to_string()), "RMEWKLEQSMREQALLKAQLTQLK".into() ), ] @@ -665,7 +688,7 @@ mod test { // All peptides are shared except for the protein N-term mod for peptide in &peptides[..4] { - assert_eq!(peptide.proteins.len(), 2); + assert_eq!(peptide.proteins.len(), 2, "{:?}", peptide); } // Ensure that this mod is uniquely called as the first protein assert_eq!( diff --git a/crates/sage/src/enzyme.rs b/crates/sage/src/enzyme.rs index 88c7ab5f..d6ff4d37 100644 --- a/crates/sage/src/enzyme.rs +++ b/crates/sage/src/enzyme.rs @@ -18,13 +18,49 @@ pub struct Digest { /// Cleaved peptide sequence pub sequence: String, /// Protein accession - pub protein: Arc, + pub protein: Arc, /// Missed cleavages pub missed_cleavages: u8, /// Is this an N-terminal peptide of the protein? 
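+    /// (stored as a `Position`, which also distinguishes internal peptides)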
pub position: Position, } +pub struct DigestGroup { + pub reference: Digest, + pub proteins: Vec>, +} + +pub fn group_digests(mut digests: Vec) -> Vec { + let mut groups = Vec::new(); + digests.sort_unstable_by(|a, b| { + a.position + .cmp(&b.position) + .then(a.decoy.cmp(&b.decoy)) + .then(a.sequence.cmp(&b.sequence)) + }); + let mut curr_group = DigestGroup { + reference: digests[0].clone(), + proteins: Vec::new(), + }; + for digest in digests { + if digest.decoy == curr_group.reference.decoy + && digest.position == curr_group.reference.position + && digest.sequence == curr_group.reference.sequence + { + curr_group.proteins.push(digest.protein); + } else { + curr_group.proteins.sort_unstable(); + groups.push(curr_group); + curr_group = DigestGroup { + reference: digest.clone(), + proteins: vec![digest.protein], + }; + } + } + groups.push(curr_group); + groups +} + #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)] pub enum Position { Nterm, @@ -255,7 +291,7 @@ impl EnzymeParameters { sites.to_vec() } - pub fn digest(&self, sequence: &str, protein: Arc) -> Vec { + pub fn digest(&self, sequence: &str, protein: Arc) -> Vec { let n = sequence.len(); let mut digests = Vec::new(); let mut sites = self.cleavage_sites(sequence); @@ -329,7 +365,7 @@ mod test { sequence: "MADEEK".into(), missed_cleavages: 0, position: Position::Nterm, - protein: Arc::new(String::default()), + protein: Arc::from(String::default()), }, Digest { decoy: false, @@ -337,7 +373,7 @@ mod test { sequence: "MADEEK".into(), missed_cleavages: 0, position: Position::Nterm, - protein: Arc::new(String::default()), + protein: Arc::from(String::default()), }, ]; @@ -352,7 +388,7 @@ mod test { sequence: "MADEEK".into(), missed_cleavages: 0, position: Position::Nterm, - protein: Arc::new(String::default()), + protein: Arc::from(String::default()), }, Digest { decoy: false, @@ -360,7 +396,7 @@ mod test { sequence: "MADEEK".into(), missed_cleavages: 0, position: Position::Internal, - protein: Arc::new(String::default()), + protein: Arc::from(String::default()), }, ]; diff --git a/crates/sage/src/fasta.rs b/crates/sage/src/fasta.rs index ec11ecad..5b2ebbc3 100644 --- a/crates/sage/src/fasta.rs +++ b/crates/sage/src/fasta.rs @@ -4,7 +4,7 @@ use std::sync::Arc; #[derive(Clone)] pub struct Fasta { - pub targets: Vec<(Arc, String)>, + pub targets: Vec<(Arc, String)>, decoy_tag: String, // Should we ignore decoys in the fasta database // and generate them internally? 
@@ -27,8 +27,8 @@ impl Fasta { let line = line.trim(); if let Some(id) = line.strip_prefix('>') { if !s.is_empty() { - let acc: Arc = - Arc::new(last_id.split_ascii_whitespace().next().unwrap().to_string()); + let acc: Arc = + Arc::from(last_id.split_ascii_whitespace().next().unwrap().to_string()); let seq = std::mem::take(&mut s); if !acc.contains(&decoy_tag) || !generate_decoys { targets.push((acc, seq)); @@ -41,8 +41,8 @@ impl Fasta { } if !s.is_empty() { - let acc: Arc = - Arc::new(last_id.split_ascii_whitespace().next().unwrap().to_string()); + let acc: Arc = + Arc::from(last_id.split_ascii_whitespace().next().unwrap().to_string()); if !acc.contains(&decoy_tag) || !generate_decoys { targets.push((acc, s)); } diff --git a/crates/sage/src/lfq.rs b/crates/sage/src/lfq.rs index 4987d133..e31396c4 100644 --- a/crates/sage/src/lfq.rs +++ b/crates/sage/src/lfq.rs @@ -2,8 +2,8 @@ use crate::database::{binary_search_slice, IndexedDatabase, PeptideIx}; use crate::mass::{composition, Composition, Tolerance, NEUTRON}; use crate::ml::{matrix::Matrix, retention_alignment::Alignment}; use crate::scoring::Feature; -use crate::spectrum::{MS1Spectra, ProcessedSpectrum}; use crate::spectrum; +use crate::spectrum::{MS1Spectra, ProcessedSpectrum}; use dashmap::DashMap; use rayon::prelude::*; use serde::{Deserialize, Serialize}; @@ -190,7 +190,7 @@ struct Query<'a, T> { max_rt: f32, } -impl FeatureMap { +impl FeatureMap { fn rt_slice(&self, rt: f32, rt_tol: f32) -> Query<'_, T> { let (page_lo, page_hi) = binary_search_slice( &self.min_rts, @@ -224,74 +224,70 @@ impl FeatureMap { // TODO: find a good way to abstract this ... I think a macro would be the way to go. match spectra { - MS1Spectra::NoMobility(spectra) => { - spectra.par_iter().for_each(|spectrum| { - let a = alignments[spectrum.file_id]; - let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept; - let query = self.rt_slice(rt, RT_TOL); - - for peak in &spectrum.peaks { - for entry in query.mass_lookup(peak.mass) { - let id = match self.settings.combine_charge_states { - true => PrecursorId::Combined(entry.peptide), - false => PrecursorId::Charged((entry.peptide, entry.charge)), - }; - - let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| { - let p = &db[entry.peptide]; - let composition = p - .sequence - .iter() - .map(|r| composition(*r)) - .sum::(); - let dist = crate::isotopes::peptide_isotopes( - composition.carbon, - composition.sulfur, - ); - Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE) - }); - - grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity); - } + MS1Spectra::NoMobility(spectra) => spectra.par_iter().for_each(|spectrum| { + let a = alignments[spectrum.file_id]; + let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept; + let query = self.rt_slice(rt, RT_TOL); + + for peak in &spectrum.peaks { + for entry in query.mass_lookup(peak.mass) { + let id = match self.settings.combine_charge_states { + true => PrecursorId::Combined(entry.peptide), + false => PrecursorId::Charged((entry.peptide, entry.charge)), + }; + + let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| { + let p = &db[entry.peptide]; + let composition = p + .sequence + .iter() + .map(|r| composition(*r)) + .sum::(); + let dist = crate::isotopes::peptide_isotopes( + composition.carbon, + composition.sulfur, + ); + Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE) + }); + + grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity); } - })}, - 
MS1Spectra::WithMobility(spectra) => { - spectra.par_iter().for_each(|spectrum|{ - let a = alignments[spectrum.file_id]; - let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept; - let query = self.rt_slice(rt, RT_TOL); - - for peak in &spectrum.peaks { - for entry in query.mass_mobility_lookup(peak.mass, peak.mobility) { - let id = match self.settings.combine_charge_states { - true => PrecursorId::Combined(entry.peptide), - false => PrecursorId::Charged((entry.peptide, entry.charge)), - }; - - let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| { - let p = &db[entry.peptide]; - let composition = p - .sequence - .iter() - .map(|r| composition(*r)) - .sum::(); - let dist = crate::isotopes::peptide_isotopes( - composition.carbon, - composition.sulfur, - ); - Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE) - }); - - grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity); - } + } + }), + MS1Spectra::WithMobility(spectra) => spectra.par_iter().for_each(|spectrum| { + let a = alignments[spectrum.file_id]; + let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept; + let query = self.rt_slice(rt, RT_TOL); + + for peak in &spectrum.peaks { + for entry in query.mass_mobility_lookup(peak.mass, peak.mobility) { + let id = match self.settings.combine_charge_states { + true => PrecursorId::Combined(entry.peptide), + false => PrecursorId::Charged((entry.peptide, entry.charge)), + }; + + let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| { + let p = &db[entry.peptide]; + let composition = p + .sequence + .iter() + .map(|r| composition(*r)) + .sum::(); + let dist = crate::isotopes::peptide_isotopes( + composition.carbon, + composition.sulfur, + ); + Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE) + }); + + grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity); } - - })}, + } + }), MS1Spectra::Empty => { // Should never be called if no MS1 spectra are present log::warn!("no MS1 spectra found for quantification"); } - }; log::info!("integrating MS1 features"); @@ -681,11 +677,14 @@ impl<'a> Query<'a, PrecursorRange> { }) } - pub fn mass_mobility_lookup(&self, mass: f32, mobility: f32) -> impl Iterator { + pub fn mass_mobility_lookup( + &self, + mass: f32, + mobility: f32, + ) -> impl Iterator { self.mass_lookup(mass).filter(move |precursor| { // TODO: replace this magic number with a patameter. - precursor.mobility >= (mobility - 0.01) - && precursor.mobility <= (mobility + 0.01) + precursor.mobility >= (mobility - 0.01) && precursor.mobility <= (mobility + 0.01) }) } } diff --git a/crates/sage/src/peptide.rs b/crates/sage/src/peptide.rs index 7c731fa6..4c1ec260 100644 --- a/crates/sage/src/peptide.rs +++ b/crates/sage/src/peptide.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use crate::modification::ModificationSpecificity; use crate::{ - enzyme::{Digest, Position}, + enzyme::{Digest, DigestGroup, Position}, mass::{monoisotopic, H2O}, }; use fnv::FnvHashSet; @@ -27,7 +27,7 @@ pub struct Peptide { /// Where is this peptide located in the protein? 
pub position: Position, - pub proteins: Vec>, + pub proteins: Vec>, } impl Peptide { @@ -344,6 +344,16 @@ pub enum PeptideError { InvalidSequence(String), } +impl TryFrom for Peptide { + type Error = PeptideError; + + fn try_from(value: DigestGroup) -> Result { + let mut pep = Peptide::try_from(value.reference)?; + pep.proteins = value.proteins; + Ok(pep) + } +} + impl TryFrom for Peptide { type Error = PeptideError; From 8fb36f808c839b84909aa9365311eee9463e6331 Mon Sep 17 00:00:00 2001 From: "J. Sebastian Paez" Date: Sat, 18 Jan 2025 00:06:15 -0800 Subject: [PATCH 04/11] chore: removed unnecessary generic and lint --- crates/sage-cli/src/output.rs | 2 +- crates/sage-cloudpath/src/tdf.rs | 18 ++++++------------ crates/sage/src/lfq.rs | 21 ++++++++++----------- 3 files changed, 17 insertions(+), 24 deletions(-) diff --git a/crates/sage-cli/src/output.rs b/crates/sage-cli/src/output.rs index 8414dffc..a92ffb45 100644 --- a/crates/sage-cli/src/output.rs +++ b/crates/sage-cli/src/output.rs @@ -1,5 +1,5 @@ use rayon::prelude::*; -use sage_core::spectrum::{MS1Spectra, ProcessedSpectrum}; +use sage_core::spectrum::MS1Spectra; use sage_core::{scoring::Feature, tmt::TmtQuant}; #[derive(Default)] diff --git a/crates/sage-cloudpath/src/tdf.rs b/crates/sage-cloudpath/src/tdf.rs index f8370668..6942b3c6 100644 --- a/crates/sage-cloudpath/src/tdf.rs +++ b/crates/sage-cloudpath/src/tdf.rs @@ -86,10 +86,10 @@ impl TdfReader { .partial_cmp(&mz[j]) .unwrap_or(std::cmp::Ordering::Equal) }); - let sorted_mz: Vec = indices.iter().map(|&i| mz[i].clone()).collect(); + let sorted_mz: Vec = indices.iter().map(|&i| mz[i]).collect(); let sorted_inten: Vec = - indices.iter().map(|&i| intensity[i].clone()).collect(); - let sorted_imss: Vec = indices.iter().map(|&i| imss[i].clone()).collect(); + indices.iter().map(|&i| intensity[i]).collect(); + let sorted_imss: Vec = indices.iter().map(|&i| imss[i]).collect(); // Squash the mobility dimension let tol_ppm = 15.0; @@ -179,14 +179,8 @@ impl TdfReader { fn parse_precursor(dda_precursor: timsrust::Precursor) -> Precursor { let mut precursor: Precursor = Precursor::default(); precursor.mz = dda_precursor.mz as f32; - precursor.charge = match dda_precursor.charge { - Some(x) => Some(x as u8), - None => None, - }; - precursor.intensity = match dda_precursor.intensity { - Some(x) => Some(x as f32), - None => None, - }; + precursor.charge = dda_precursor.charge.map(|x| x as u8); + precursor.intensity = dda_precursor.intensity.map(|x| x as f32); precursor.spectrum_ref = Option::from(dda_precursor.frame_index.to_string()); precursor.inverse_ion_mobility = Option::from(dda_precursor.im as f32); precursor @@ -323,7 +317,7 @@ fn dumbcentroid_frame( let mut result: Vec<(f32, (f32, f32))> = agg_buff .iter() .filter(|&x| x.mz > 0.0 && x.intensity > 0.0) - .map(|x| (x.mz, (x.intensity, x.im as f32))) + .map(|x| (x.mz, (x.intensity, x.im))) .collect(); result.sort_unstable_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal)); diff --git a/crates/sage/src/lfq.rs b/crates/sage/src/lfq.rs index e31396c4..8e698553 100644 --- a/crates/sage/src/lfq.rs +++ b/crates/sage/src/lfq.rs @@ -2,8 +2,7 @@ use crate::database::{binary_search_slice, IndexedDatabase, PeptideIx}; use crate::mass::{composition, Composition, Tolerance, NEUTRON}; use crate::ml::{matrix::Matrix, retention_alignment::Alignment}; use crate::scoring::Feature; -use crate::spectrum; -use crate::spectrum::{MS1Spectra, ProcessedSpectrum}; +use crate::spectrum::MS1Spectra; use dashmap::DashMap; use rayon::prelude::*; use 
serde::{Deserialize, Serialize}; @@ -80,8 +79,8 @@ pub struct PrecursorRange { /// Create a data structure analogous to [`IndexedDatabase`] - instaed of /// storing fragment masses binned by precursor mass, store MS1 precursors /// binned by RT - This should enable rapid quantification as well -pub struct FeatureMap { - pub ranges: Vec, +pub struct FeatureMap { + pub ranges: Vec, pub min_rts: Vec, pub bin_size: usize, pub settings: LfqSettings, @@ -91,7 +90,7 @@ pub fn build_feature_map( settings: LfqSettings, precursor_charge: (u8, u8), features: &[Feature], -) -> FeatureMap { +) -> FeatureMap { let map: DashMap = DashMap::default(); features .iter() @@ -181,8 +180,8 @@ pub fn build_feature_map( } } -struct Query<'a, T> { - ranges: &'a [T], +struct Query<'a> { + ranges: &'a [PrecursorRange], page_lo: usize, page_hi: usize, bin_size: usize, @@ -190,8 +189,8 @@ struct Query<'a, T> { max_rt: f32, } -impl FeatureMap { - fn rt_slice(&self, rt: f32, rt_tol: f32) -> Query<'_, T> { +impl FeatureMap { + fn rt_slice(&self, rt: f32, rt_tol: f32) -> Query<'_> { let (page_lo, page_hi) = binary_search_slice( &self.min_rts, |rt, x| rt.total_cmp(x), @@ -210,7 +209,7 @@ impl FeatureMap { } } -impl FeatureMap { +impl FeatureMap { /// Run label-free quantification module pub fn quantify( &self, @@ -648,7 +647,7 @@ fn convolve(slice: &[f64], kernel: &[f64]) -> Vec { .collect() } -impl<'a> Query<'a, PrecursorRange> { +impl Query<'_> { pub fn mass_lookup(&self, mass: f32) -> impl Iterator { (self.page_lo..self.page_hi).flat_map(move |page| { let left_idx = page * self.bin_size; From 677c6f03493e0631e10a950a1562cbcff56c4188 Mon Sep 17 00:00:00 2001 From: "J. Sebastian Paez" Date: Sat, 18 Jan 2025 17:02:03 -0800 Subject: [PATCH 05/11] feat: parameter propagation for mobility tolerance --- CHANGELOG.md | 2 + crates/sage-cli/src/input.rs | 9 ++ crates/sage-cli/src/output.rs | 8 +- crates/sage-cli/src/runner.rs | 20 ++-- crates/sage-cloudpath/src/tdf.rs | 122 ++++++++++++---------- crates/sage/src/database.rs | 17 ++- crates/sage/src/lfq.rs | 15 ++- crates/sage/src/mass.rs | 7 ++ crates/sage/src/ml/linear_discriminant.rs | 4 +- 9 files changed, 120 insertions(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 798faa41..b0a4a080 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [v0.15.0-alpha] (unreleased) ### Added +- Initial support for LFQ on data with ion mobility. +- Speedup on the generation of databases when large number of peptides are redundant. 
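The new LFQ-on-ion-mobility support above hinges on a percent-based tolerance window (a `Tolerance::Pct` variant added to `crates/sage/src/mass.rs` later in this patch). Below is a minimal, self-contained sketch of that bounds arithmetic; the two-variant enum is a reduction for illustration, not the real `Tolerance` type:

```rust
// Illustrative reduction of the `Tolerance` enum from `mass.rs`;
// only the variants needed for this example are included.
enum Tolerance {
    Ppm(f32, f32),
    Pct(f32, f32),
}

impl Tolerance {
    fn bounds(&self, center: f32) -> (f32, f32) {
        match *self {
            // parts-per-million window around `center` (used for masses)
            Tolerance::Ppm(lo, hi) => (
                center + center * lo / 1_000_000.0,
                center + center * hi / 1_000_000.0,
            ),
            // percent window around `center` (used for ion mobility)
            Tolerance::Pct(lo, hi) => (
                center + center * lo / 100.0,
                center + center * hi / 100.0,
            ),
        }
    }
}

fn main() {
    // Default mobility_pct_tolerance of 1.0 (%) around a 1/K0 of 0.9:
    let (lo, hi) = Tolerance::Pct(-1.0, 1.0).bounds(0.9);
    assert!((lo - 0.891).abs() < 1e-4);
    assert!((hi - 0.909).abs() < 1e-4);
    println!("mobility window: [{lo:.3}, {hi:.3}]");

    // For comparison, a 5 ppm mass window around 500 Th:
    let (mlo, mhi) = Tolerance::Ppm(-5.0, 5.0).bounds(500.0);
    println!("mass window: [{mlo:.4}, {mhi:.4}]");
}
```

With the default `mobility_pct_tolerance` of 1.0, a precursor observed at 1/K0 = 0.9 is matched anywhere inside roughly [0.891, 0.909].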
- Initial support for searching diaPASEF data
- `override_precursor_charge` setting that forces multiple charge states to be searched

### Breaking Changes
diff --git a/crates/sage-cli/src/input.rs b/crates/sage-cli/src/input.rs
index f19c8a66..0b3ac198 100644
--- a/crates/sage-cli/src/input.rs
+++ b/crates/sage-cli/src/input.rs
@@ -80,6 +80,7 @@ pub struct LfqOptions {
     pub integration: Option<IntegrationStrategy>,
     pub spectral_angle: Option<f64>,
     pub ppm_tolerance: Option<f32>,
+    pub mobility_pct_tolerance: Option<f32>,
     pub combine_charge_states: Option<bool>,
 }
 
@@ -91,6 +92,7 @@ impl From<LfqOptions> for LfqSettings {
             integration: value.integration.unwrap_or(default.integration),
             spectral_angle: value.spectral_angle.unwrap_or(default.spectral_angle).abs(),
             ppm_tolerance: value.ppm_tolerance.unwrap_or(default.ppm_tolerance).abs(),
+            mobility_pct_tolerance: value.mobility_pct_tolerance.unwrap_or(default.mobility_pct_tolerance),
             combine_charge_states: value
                 .combine_charge_states
                 .unwrap_or(default.combine_charge_states),
@@ -98,6 +100,12 @@ impl From<LfqOptions> for LfqSettings {
         if settings.ppm_tolerance > 20.0 {
             log::warn!("lfq_settings.ppm_tolerance is higher than expected");
         }
+        if settings.mobility_pct_tolerance > 4.0 {
+            log::warn!("lfq_settings.mobility_pct_tolerance is higher than expected");
+        }
+        if settings.mobility_pct_tolerance < 0.05 {
+            log::warn!("lfq_settings.mobility_pct_tolerance is lower than expected");
+        }
         if settings.spectral_angle < 0.50 {
             log::warn!("lfq_settings.spectral_angle is lower than expected");
         }
@@ -221,6 +229,7 @@ impl Input {
 fn check_tolerances(tolerance: &Tolerance) {
     let (lo, hi) = match tolerance {
         Tolerance::Ppm(lo, hi) => (*lo, *hi),
+        Tolerance::Pct(_, _) => unreachable!("Pct tolerance should never be used on mz"),
         Tolerance::Da(lo, hi) => (*lo, *hi),
     };
     if hi.abs() > lo.abs() {
diff --git a/crates/sage-cli/src/output.rs b/crates/sage-cli/src/output.rs
index a92ffb45..1efdcaab 100644
--- a/crates/sage-cli/src/output.rs
+++ b/crates/sage-cli/src/output.rs
@@ -40,7 +40,11 @@ impl FromParallelIterator<SageResults> for SageResults {
                     acc.ms1 = MS1Spectra::NoMobility(a);
                 }
                 _ => {
-                    unreachable!()
+                    // In theory this can happen if someone searches files of
+                    // different types together, mixing ones that support IMS
+                    // with ones that don't.
+                    // ... I don't think this should be run-time recoverable.
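The `unreachable!` that follows guards the one case the results accumulator cannot reconcile. Here is a cut-down sketch of the merge rule it protects, using a hypothetical simplified enum whose `Vec<u32>` payloads stand in for the real spectra collections: `Empty` merges with anything, matching variants concatenate, and mixed with-/without-mobility inputs panic.

```rust
// Hypothetical, simplified stand-in for the real MS1Spectra enum.
#[derive(Debug, PartialEq)]
enum MS1Spectra {
    Empty,
    NoMobility(Vec<u32>),
    WithMobility(Vec<u32>),
}

// Merge rule applied when reducing per-file results into one accumulator.
fn merge(a: MS1Spectra, b: MS1Spectra) -> MS1Spectra {
    use MS1Spectra::*;
    match (a, b) {
        // An empty side defers to the other side.
        (Empty, x) | (x, Empty) => x,
        // Like variants concatenate their spectra.
        (NoMobility(mut a), NoMobility(b)) => {
            a.extend(b);
            NoMobility(a)
        }
        (WithMobility(mut a), WithMobility(b)) => {
            a.extend(b);
            WithMobility(a)
        }
        // Mixing files with and without ion mobility is a hard error.
        _ => unreachable!("Found combination of MS1 spectra with and without mobility."),
    }
}

fn main() {
    let merged = merge(MS1Spectra::Empty, MS1Spectra::NoMobility(vec![1, 2]));
    assert_eq!(merged, MS1Spectra::NoMobility(vec![1, 2]));
}
```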
+                    unreachable!("Found combination of MS1 spectra with and without mobility.")
                 }
             };
             acc
@@ -79,7 +83,7 @@ impl FromIterator<SageResults> for SageResults {
                     acc.ms1 = MS1Spectra::NoMobility(a);
                 }
                 _ => {
-                    unreachable!()
+                    unreachable!("Found combination of MS1 spectra with and without mobility.")
                 }
             };
             acc
diff --git a/crates/sage-cli/src/runner.rs b/crates/sage-cli/src/runner.rs
index 56a58f7e..ad503830 100644
--- a/crates/sage-cli/src/runner.rs
+++ b/crates/sage-cli/src/runner.rs
@@ -37,10 +37,12 @@ impl FromParallelIterator<RawSpectrum> for RawSpectrumAccumulator {
     where
         I: IntoParallelIterator<Item = RawSpectrum>,
     {
-        let out = par_iter
+
+
+        par_iter
             .into_par_iter()
             .fold(
-                || RawSpectrumAccumulator::default(),
+                RawSpectrumAccumulator::default,
                 |mut accum, spectrum| {
                     if spectrum.ms_level == 1 {
                         accum.ms1.push(spectrum);
@@ -51,15 +53,13 @@ impl FromParallelIterator<RawSpectrum> for RawSpectrumAccumulator {
                 },
             )
             .reduce(
-                || RawSpectrumAccumulator::default(),
+                RawSpectrumAccumulator::default,
                 |mut a, b| {
                     a.ms1.extend(b.ms1);
                     a.msn.extend(b.msn);
                     a
                 },
-            );
-
-        out
+            )
     }
 }
 
@@ -393,8 +393,10 @@ impl Runner {
         let all_contain_ims = spectra.ms1.iter().all(|x| x.mobility.is_some());
         let ms1_empty = spectra.ms1.is_empty();
         let ms1_spectra = if ms1_empty {
+            log::trace!("no MS1 spectra found");
             MS1Spectra::Empty
         } else if all_contain_ims {
+            log::trace!("Processing MS1 spectra with IMS");
             let spectra = spectra
                 .ms1
                 .into_iter()
@@ -402,6 +404,7 @@ impl Runner {
                 .collect();
             MS1Spectra::WithMobility(spectra)
         } else {
+            log::trace!("Processing MS1 spectra without IMS");
             let spectra = spectra.ms1.into_iter().map(|s| sp.process(s)).collect();
             MS1Spectra::NoMobility(spectra)
         };
@@ -481,6 +484,7 @@ impl Runner {
         let areas = alignments.and_then(|alignments| {
             if self.parameters.quant.lfq {
+                log::trace!("performing LFQ");
                 let mut areas = sage_core::lfq::build_feature_map(
                     self.parameters.quant.lfq_settings,
                     self.parameters.precursor_charge,
@@ -875,9 +879,7 @@ impl Runner {
             );
             record.push_field(
                 itoa::Buffer::new()
                     .format(
-                        (feature.charge < 2 || feature.charge > 6)
-                            .then_some(feature.charge)
-                            .unwrap_or(0),
+                        if feature.charge < 2 || feature.charge > 6 { feature.charge } else { 0 },
                     )
                     .as_bytes(),
             );
diff --git a/crates/sage-cloudpath/src/tdf.rs b/crates/sage-cloudpath/src/tdf.rs
index 6942b3c6..18312715 100644
--- a/crates/sage-cloudpath/src/tdf.rs
+++ b/crates/sage-cloudpath/src/tdf.rs
@@ -187,6 +187,17 @@ impl TdfReader {
     }
 }
 
+
+/// Centroiding of the IM-containing spectra
+///
+/// This is a very rudimentary centroiding algorithm but... it seems to work well.
+/// It iteratively goes over the peaks in decreasing intensity order and
+/// accumulates the intensity of the peaks surrounding the peak (sort of
+/// like the first pass in dbscan).
+///
+/// This dramatically reduces the number of peaks in the spectra,
+/// which saves a ton of memory and time when doing LFQ, since we
+/// iterate over each peak.
 fn dumbcentroid_frame(
     mz_array: &[f32],
     intensity_array: &[f32],
     ims_array: &[f32],
     mz_tol_ppm: f32,
     im_tol_pct: f32,
 ) -> (Vec<f32>, (Vec<f32>, Vec<f32>)) {
     // Make sure the mz array is sorted
-    assert!(mz_array.windows(2).all(|x| x[0] <= x[1]));
+    // In theory I could use the type system to enforce this, but I don't
+    // think it is worth it; it's not that slow and it's simple.
+ assert!(mz_array.windows(2).all(|x| x[0] <= x[1]), "mz_array is not sorted"); let arr_len = mz_array.len(); - let mut touched = vec![false; arr_len]; - let mut global_num_touched = 0; + let mut global_num_included = 0; let mut order: Vec = (0..arr_len).collect(); order.sort_unstable_by(|&a, &b| { @@ -208,21 +220,23 @@ fn dumbcentroid_frame( .unwrap_or(Ordering::Equal) }); - #[derive(Debug, Clone, Copy, Default)] + #[derive(Clone, Copy)] struct ImsPeak { mz: f32, intensity: f32, im: f32, } let mut agg_buff = Vec::with_capacity(10_000.min(arr_len)); - const TOUCH_BUFF_SIZE: usize = 1000; - let mut touch_buff = [false; TOUCH_BUFF_SIZE]; + // TODO: Optimize the size of this buffer. + const INCLUDE_BUFF_SIZE: usize = 1000; + let mut included = vec![false; arr_len]; + let mut include_buff = [false; INCLUDE_BUFF_SIZE]; let utol = mz_tol_ppm / 1e6; let im_tol = im_tol_pct / 100.0; for &idx in &order { - if touched[idx] { + if included[idx] { continue; } @@ -242,23 +256,19 @@ fn dumbcentroid_frame( // Here we just handle those edge cases by making sure the 'center' // will be aggregated. - let new_ss_start = idx.saturating_sub(TOUCH_BUFF_SIZE / 2); - let new_ss_end = new_ss_start + TOUCH_BUFF_SIZE - 1; + let new_ss_start = idx.saturating_sub(INCLUDE_BUFF_SIZE / 2); + let new_ss_end = new_ss_start + INCLUDE_BUFF_SIZE - 1; // TODO: make a better warning message here. log::warn!( "More than {} points are in the mz range of a point, limiting span", - TOUCH_BUFF_SIZE + INCLUDE_BUFF_SIZE ); ss_start = new_ss_start; ss_end = new_ss_end; slice_width = ss_end - ss_start; } - let local_num_touched = touched[ss_start..ss_end].iter().filter(|&&x| x).count(); - let local_num_untouched = slice_width - local_num_touched; - - if local_num_touched == slice_width { - continue; - } + let local_num_included = included[ss_start..ss_end].iter().filter(|&&x| x).count(); + let local_num_unincluded = slice_width - local_num_included; let abs_im_tol = im * im_tol; let left_im = im - abs_im_tol; @@ -266,64 +276,60 @@ fn dumbcentroid_frame( let mut curr_intensity = 0.0; - let mut num_touchable = 0; + let mut num_includable = 0; for i in ss_start..ss_end { let im_i = ims_array[i]; - if !touched[i] && intensity_array[i] > 0.0 && im_i >= left_im && im_i <= right_im { + if !included[i] && intensity_array[i] > 0.0 && im_i >= left_im && im_i <= right_im { curr_intensity += intensity_array[i]; - num_touchable += 1; - touch_buff[i - ss_start] = true; + num_includable += 1; + include_buff[i - ss_start] = true; } } - if curr_intensity > 0.0 { - agg_buff.push(ImsPeak { - mz, - intensity: curr_intensity, - im, + agg_buff.push(ImsPeak { + mz, + intensity: curr_intensity, + im, + }); + included[ss_start..ss_end] + .iter_mut() + .zip(include_buff.iter_mut().take(slice_width)) + .for_each(|(t, tb)| { + *t = true; + *tb = false; }); - touched[ss_start..ss_end] - .iter_mut() - .zip(touch_buff.iter_mut().take(slice_width)) - .for_each(|(t, tb)| { - *t = true; - *tb = false; - }); - global_num_touched += num_touchable; - const MAX_PEAKS: usize = 10000; - if agg_buff.len() > MAX_PEAKS { - let curr_loc_int = intensity_array[idx]; - if curr_loc_int > 200.0 { - log::debug!( - "Reached limit of the agg buffer at index {}/{} curr int={}", - idx, - arr_len, - curr_loc_int - ); - } - break; + global_num_included += num_includable; + const MAX_PEAKS: usize = 10000; + if agg_buff.len() > MAX_PEAKS { + let curr_loc_int = intensity_array[idx]; + if curr_loc_int > 200.0 { + log::debug!( + "Reached limit of the agg buffer at index {}/{} curr int={}", + idx, 
+ arr_len, + curr_loc_int + ); } + break; } - if global_num_touched == arr_len { + if global_num_included == arr_len { break; } } - assert!(touch_buff.iter().all(|x| !x), "{:?}", touch_buff); - - // Drop the zeros and sort - // I could in theory truncate instead of filtering. - let mut result: Vec<(f32, (f32, f32))> = agg_buff - .iter() - .filter(|&x| x.mz > 0.0 && x.intensity > 0.0) - .map(|x| (x.mz, (x.intensity, x.im))) - .collect(); + // This just makes sure that I am correctly reseting + // the buffer on each iteration. + assert!(include_buff.iter().all(|x| !x), "{:?}", include_buff); - result.sort_unstable_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal)); + agg_buff.sort_unstable_by(|a, b| a.mz.partial_cmp(&b.mz).unwrap()); // println!("Centroiding: Start len: {}; end len: {};", arr_len, result.len()); // Ultra data is usually start: 40k end 10k, - // HT2 data is usually start 400k end 40k + // HT2 data is usually start 400k end 40k, limiting to 10k + // rarely leaves peaks with intensity > 200 ... ive never seen + // it happen. - result.into_iter().unzip() + agg_buff.into_iter() + .map(|x| (x.mz, (x.intensity, x.im))) + .unzip() } diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index 00dbae2b..506190ad 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -230,7 +230,6 @@ impl Parameters { }); target_decoys.dedup_by(|remove, keep| { if remove.monoisotopic == keep.monoisotopic - // if remove.sequence == keep.sequence && remove.sequence == keep.sequence && remove.modifications == keep.modifications && remove.nterm == keep.nterm @@ -251,15 +250,10 @@ impl Parameters { .for_each(|peptide| peptide.proteins.sort_unstable()); let num_dropped = init_size - target_decoys.len(); - let tot_proteins = target_decoys - .iter() - .map(|x| x.proteins.len()) - .sum::(); log::trace!( - "dropped {} t/d pairs, remaining {}, tot_proteins: {}", + "dropped {} t/d pairs, remaining {}", num_dropped, target_decoys.len(), - tot_proteins ); } @@ -286,14 +280,14 @@ impl Parameters { .flat_map(|kind| IonSeries::new(peptide, *kind).enumerate()) .filter(|(ion_idx, ion)| { // Don't store b1, b2, y1, y2 ions for preliminary scoring - let ion_idx_filter = match ion.kind { + + match ion.kind { Kind::A | Kind::B | Kind::C => (ion_idx + 1) > self.min_ion_index, Kind::X | Kind::Y | Kind::Z => { peptide.sequence.len().saturating_sub(1) - ion_idx > self.min_ion_index } - }; - ion_idx_filter + } }) .map(move |(_, ion)| Theoretical { peptide_index: PeptideIx(idx as u32), @@ -481,7 +475,7 @@ pub struct IndexedQuery<'d> { pub pre_idx_hi: usize, } -impl<'d> IndexedQuery<'d> { +impl IndexedQuery<'_> { /// Search for a specified `fragment_mz` within the database pub fn page_search(&self, fragment_mz: f32, charge: u8) -> impl Iterator { let mass = fragment_mz * charge as f32; @@ -490,6 +484,7 @@ impl<'d> IndexedQuery<'d> { // - relative tolerance needs to be proportionally decreased let tol = match self.fragment_tol { Tolerance::Ppm(lo, hi) => Tolerance::Ppm(lo / charge as f32, hi / charge as f32), + Tolerance::Pct(_, _) => unreachable!("Pct tolerance should never be used on mz"), Tolerance::Da(_, _) => self.fragment_tol, }; diff --git a/crates/sage/src/lfq.rs b/crates/sage/src/lfq.rs index 8e698553..f6f80713 100644 --- a/crates/sage/src/lfq.rs +++ b/crates/sage/src/lfq.rs @@ -48,6 +48,7 @@ pub struct LfqSettings { pub integration: IntegrationStrategy, pub spectral_angle: f64, pub ppm_tolerance: f32, + pub mobility_pct_tolerance: f32, pub combine_charge_states: bool, } @@ 
-58,6 +59,7 @@ impl Default for LfqSettings { integration: IntegrationStrategy::Sum, spectral_angle: 0.70, ppm_tolerance: 5.0, + mobility_pct_tolerance: 1.0, combine_charge_states: true, } } @@ -68,7 +70,8 @@ pub struct PrecursorRange { pub rt: f32, pub mass_lo: f32, pub mass_hi: f32, - pub mobility: f32, + pub mobility_lo: f32, + pub mobility_hi: f32, pub charge: u8, pub isotope: usize, pub peptide: PeptideIx, @@ -103,6 +106,9 @@ pub fn build_feature_map( // } else { // feat.calcmass // }; + let (mobility_lo, mobility_hi) = + Tolerance::Pct(-settings.mobility_pct_tolerance, settings.mobility_pct_tolerance) + .bounds(feat.ims); map.insert( feat.peptide_idx, PrecursorRange { @@ -113,7 +119,8 @@ pub fn build_feature_map( charge: feat.charge, isotope: 0, file_id: feat.file_id, - mobility: feat.ims, + mobility_lo, + mobility_hi, decoy: false, }, ); @@ -172,6 +179,8 @@ pub fn build_feature_map( }) .collect::>(); + log::trace!("building feature map"); + println!("First feature map entry: {:?}", ranges.first()); FeatureMap { ranges, min_rts, @@ -683,7 +692,7 @@ impl Query<'_> { ) -> impl Iterator { self.mass_lookup(mass).filter(move |precursor| { // TODO: replace this magic number with a patameter. - precursor.mobility >= (mobility - 0.01) && precursor.mobility <= (mobility + 0.01) + (precursor.mobility_hi >= mobility) && (precursor.mobility_lo <= mobility) }) } } diff --git a/crates/sage/src/mass.rs b/crates/sage/src/mass.rs index e168ea95..c3b50eb4 100644 --- a/crates/sage/src/mass.rs +++ b/crates/sage/src/mass.rs @@ -11,6 +11,7 @@ pub const NH3: f32 = 17.026548; #[serde(rename_all = "lowercase")] pub enum Tolerance { Ppm(f32, f32), + Pct(f32, f32), Da(f32, f32), } @@ -24,6 +25,11 @@ impl Tolerance { let delta_hi = center * hi / 1_000_000.0; (center + delta_lo, center + delta_hi) } + Tolerance::Pct(lo, hi) => { + let delta_lo = center * lo / 100.0; + let delta_hi = center * hi / 100.0; + (center + delta_lo, center + delta_hi) + } Tolerance::Da(lo, hi) => (center + lo, center + hi), } } @@ -44,6 +50,7 @@ impl Mul for Tolerance { fn mul(self, rhs: f32) -> Self::Output { match self { Tolerance::Ppm(lo, hi) => Tolerance::Ppm(lo * rhs, hi * rhs), + Tolerance::Pct(lo, hi) => Tolerance::Pct(lo * rhs, hi * rhs), Tolerance::Da(lo, hi) => Tolerance::Da(lo * rhs, hi * rhs), } } diff --git a/crates/sage/src/ml/linear_discriminant.rs b/crates/sage/src/ml/linear_discriminant.rs index b399b82c..57745945 100644 --- a/crates/sage/src/ml/linear_discriminant.rs +++ b/crates/sage/src/ml/linear_discriminant.rs @@ -42,7 +42,7 @@ const FEATURE_NAMES: [&str; FEATURES] = [ struct Features<'a>(&'a [f64]); -impl<'a> std::fmt::Debug for Features<'a> { +impl std::fmt::Debug for Features<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_map() .entries(FEATURE_NAMES.iter().zip(self.0)) @@ -131,11 +131,13 @@ pub fn score_psms(scores: &mut [Feature], precursor_tol: Tolerance) -> Option<() let mass_error = match precursor_tol { Tolerance::Ppm(_, _) => |feat: &Feature| feat.delta_mass as f64, + Tolerance::Pct(_, _) => unreachable!("Pct tolerance should never be used on mz"), Tolerance::Da(_, _) => |feat: &Feature| (feat.expmass - feat.calcmass) as f64, }; let (bw_adjust, bin_size) = match precursor_tol { Tolerance::Ppm(lo, hi) => (2.0f64, (hi - lo).max(100.0)), + Tolerance::Pct(_, _) => unreachable!("Pct tolerance should never be used on mz"), Tolerance::Da(lo, hi) => (0.1f64, (hi - lo).max(1000.0)), }; From 939796301026a11e42d648e514e10ed96585e0c2 Mon Sep 17 00:00:00 2001 From: "J. 
Sebastian Paez" Date: Sat, 18 Jan 2025 17:09:35 -0800 Subject: [PATCH 06/11] oops: removed print debug line --- crates/sage/src/lfq.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/sage/src/lfq.rs b/crates/sage/src/lfq.rs index f6f80713..5c523a8e 100644 --- a/crates/sage/src/lfq.rs +++ b/crates/sage/src/lfq.rs @@ -180,7 +180,6 @@ pub fn build_feature_map( .collect::>(); log::trace!("building feature map"); - println!("First feature map entry: {:?}", ranges.first()); FeatureMap { ranges, min_rts, From f508f0e3f0ca3ea64846d6b94b54c5f0047a13bd Mon Sep 17 00:00:00 2001 From: "J. Sebastian Paez" Date: Sat, 18 Jan 2025 17:51:38 -0800 Subject: [PATCH 07/11] chore: code cleanup and timsrust update --- crates/sage-cloudpath/Cargo.toml | 2 +- crates/sage-cloudpath/src/tdf.rs | 70 ++++++++++++++++++-------------- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/crates/sage-cloudpath/Cargo.toml b/crates/sage-cloudpath/Cargo.toml index bd0a98c1..d0090c7c 100644 --- a/crates/sage-cloudpath/Cargo.toml +++ b/crates/sage-cloudpath/Cargo.toml @@ -23,7 +23,7 @@ log = "0.4.0" once_cell = "1.0" tokio = { version = "1.0", features = ["fs", "io-util", "rt", "macros"] } quick-xml = { version = "0.31.0", features = ["async-tokio"] } -timsrust = { version = "0.4.1"} +timsrust = { version = "0.4.2"} rayon = "1.5" reqwest = { version = "0.11", features = ["json", "rustls-tls"], default-features = false } regex = "1.6" diff --git a/crates/sage-cloudpath/src/tdf.rs b/crates/sage-cloudpath/src/tdf.rs index 18312715..103bed4d 100644 --- a/crates/sage-cloudpath/src/tdf.rs +++ b/crates/sage-cloudpath/src/tdf.rs @@ -4,7 +4,7 @@ use sage_core::{ spectrum::{Precursor, RawSpectrum, Representation}, }; use std::cmp::Ordering; -use timsrust::converters::ConvertableDomain; +use timsrust::converters::{ConvertableDomain, Scan2ImConverter}; use timsrust::readers::SpectrumReader; pub use timsrust::readers::SpectrumReaderConfig as BrukerSpectrumProcessor; @@ -24,7 +24,7 @@ impl TdfReader { .finalize()?; let mut spectra = self.read_msn_spectra(file_id, &spectrum_reader)?; if requires_ms1 { - let ms1s = self.read_ms1_spectra(&path_name, file_id, &spectrum_reader)?; + let ms1s = self.read_ms1_spectra(&path_name, file_id)?; spectra.extend(ms1s); } @@ -35,7 +35,6 @@ impl TdfReader { &self, path_name: impl AsRef, file_id: usize, - spectrum_reader: &SpectrumReader, ) -> Result, timsrust::TimsRustError> { let start = std::time::Instant::now(); let frame_reader = timsrust::readers::FrameReader::new(path_name.as_ref())?; @@ -53,31 +52,11 @@ impl TdfReader { .iter() .map(|&x| mz_converter.convert(x as f64) as f32) .collect(); - let intensity: Vec = frame.intensities.iter().map(|&x| x as f32).collect(); - let mut imss: Vec = vec![0.0; mz.len()]; - // TODO: This is getting pretty big ... I should refactor this block. 
- frame - .scan_offsets - .windows(2) - .enumerate() - .map(|(i, w)| { - let num = w[1] - w[0]; - if num == 0 { - return None; - } - let lo = w[0]; - let hi = w[1]; - - let im = ims_converter.convert(i as f64) as f32; - Some((im, lo, hi)) - }) - .for_each(|x| { - if let Some((im, lo, hi)) = x { - for i in lo..hi { - imss[i] = im; - } - } - }); + let corr_factor = frame.intensity_correction_factor as f32; + let intensity: Vec = frame.intensities.iter().map(|&x| x as f32 * corr_factor).collect(); + let imss = Self::expand_mobility(&frame.scan_offsets, &ims_converter); + assert_eq!(mz.len(), intensity.len(), "{:?}", frame); + assert_eq!(mz.len(), imss.len(), "{:?}", frame); // Sort the mzs and intensities by mz let mut indices: Vec = (0..mz.len()).collect(); @@ -103,7 +82,7 @@ impl TdfReader { im_tol_pct, ); - let scan_start_time = frame.rt as f32 / 60.0; + let scan_start_time = frame.rt_in_seconds as f32 / 60.0; let ion_injection_time = 100.0; // This is made up, in theory we can read // if from the tdf file let total_ion_current = sorted_inten.iter().sum::(); @@ -185,6 +164,37 @@ impl TdfReader { precursor.inverse_ion_mobility = Option::from(dda_precursor.im as f32); precursor } + + fn expand_mobility(scan_offsets: &[usize], ims_converter: &Scan2ImConverter) -> Vec { + let capacity = match scan_offsets.last() { + Some(&x) => x, + None => return vec![], + }; + let mut imss: Vec = vec![0.0; capacity]; + // TODO: This is getting pretty big ... I should refactor this block. + scan_offsets + .windows(2) + .enumerate() + .map(|(i, w)| { + let num = w[1] - w[0]; + if num == 0 { + return None; + } + let lo = w[0]; + let hi = w[1]; + + let im = ims_converter.convert(i as f64) as f32; + Some((im, lo, hi)) + }) + .for_each(|x| { + if let Some((im, lo, hi)) = x { + for i in lo..hi { + imss[i] = im; + } + } + }); + imss + } } @@ -267,8 +277,6 @@ fn dumbcentroid_frame( ss_end = new_ss_end; slice_width = ss_end - ss_start; } - let local_num_included = included[ss_start..ss_end].iter().filter(|&&x| x).count(); - let local_num_unincluded = slice_width - local_num_included; let abs_im_tol = im * im_tol; let left_im = im - abs_im_tol; From e625e2994a59ee14edcc58f8dcfff26a428c3f20 Mon Sep 17 00:00:00 2001 From: "J. 
Sebastian Paez" Date: Sun, 19 Jan 2025 22:54:20 -0800 Subject: [PATCH 08/11] fix,refactor: error on excessive removal of points when centroiding ims data --- crates/sage-cloudpath/src/tdf.rs | 96 +++++++++++--------------------- crates/sage/src/spectrum.rs | 1 - 2 files changed, 34 insertions(+), 63 deletions(-) diff --git a/crates/sage-cloudpath/src/tdf.rs b/crates/sage-cloudpath/src/tdf.rs index 103bed4d..96a4ec68 100644 --- a/crates/sage-cloudpath/src/tdf.rs +++ b/crates/sage-cloudpath/src/tdf.rs @@ -58,7 +58,7 @@ impl TdfReader { assert_eq!(mz.len(), intensity.len(), "{:?}", frame); assert_eq!(mz.len(), imss.len(), "{:?}", frame); - // Sort the mzs and intensities by mz + // Sort the mzs, imss and intensities by mz let mut indices: Vec = (0..mz.len()).collect(); indices.sort_by(|&i, &j| { mz[i] @@ -75,9 +75,9 @@ impl TdfReader { let im_tol_pct = 2.0; let (mz, (intensity, mobility)): (Vec, (Vec, Vec)) = dumbcentroid_frame( - &sorted_mz, - &sorted_inten, - &sorted_imss, + sorted_mz, + sorted_inten, + sorted_imss, tol_ppm, im_tol_pct, ); @@ -85,7 +85,7 @@ impl TdfReader { let scan_start_time = frame.rt_in_seconds as f32 / 60.0; let ion_injection_time = 100.0; // This is made up, in theory we can read // if from the tdf file - let total_ion_current = sorted_inten.iter().sum::(); + let total_ion_current = intensity.iter().sum::(); let id = frame.index.to_string(); let spec = RawSpectrum { @@ -209,9 +209,9 @@ impl TdfReader { /// which saves a ton of memory and time when doing LFQ, since we /// iterate over each peak. fn dumbcentroid_frame( - mz_array: &[f32], - intensity_array: &[f32], - ims_array: &[f32], + mz_array: Vec, + mut intensity_array: Vec, + ims_array: Vec, mz_tol_ppm: f32, im_tol_pct: f32, ) -> (Vec, (Vec, Vec)) { @@ -236,19 +236,32 @@ fn dumbcentroid_frame( intensity: f32, im: f32, } - let mut agg_buff = Vec::with_capacity(10_000.min(arr_len)); - // TODO: Optimize the size of this buffer. - const INCLUDE_BUFF_SIZE: usize = 1000; - let mut included = vec![false; arr_len]; - let mut include_buff = [false; INCLUDE_BUFF_SIZE]; + const MAX_PEAKS: usize = 10_000; + let mut agg_buff = Vec::with_capacity(MAX_PEAKS.min(arr_len)); let utol = mz_tol_ppm / 1e6; let im_tol = im_tol_pct / 100.0; for &idx in &order { - if included[idx] { + if intensity_array[idx] <= 0.0 { + // In theory ... if I set the intensity as mutable + // I could remove the use of the 'included' vector + // and just set to -1.0 the intensity of the peaks + // I have already included. continue; } + if agg_buff.len() > MAX_PEAKS { + let curr_loc_int = intensity_array[idx]; + if curr_loc_int > 200.0 { + log::debug!( + "Reached limit of the agg buffer at index {}/{} curr int={}", + idx, + arr_len, + curr_loc_int + ); + } + break; + } let mz = mz_array[idx]; let im = ims_array[idx]; @@ -256,27 +269,8 @@ fn dumbcentroid_frame( let left_e = mz - da_tol; let right_e = mz + da_tol; - let mut ss_start = mz_array.partition_point(|&x| x < left_e); - let mut ss_end = mz_array.partition_point(|&x| x <= right_e); - - let mut slice_width = ss_end - ss_start; - if slice_width >= 1000 { - // It is EXCEEDINGLY UNLIKELY that more than 1000 points - // will be aggregated or in the range of a single mz peak. - // Here we just handle those edge cases by making sure the 'center' - // will be aggregated. - - let new_ss_start = idx.saturating_sub(INCLUDE_BUFF_SIZE / 2); - let new_ss_end = new_ss_start + INCLUDE_BUFF_SIZE - 1; - // TODO: make a better warning message here. 
- log::warn!( - "More than {} points are in the mz range of a point, limiting span", - INCLUDE_BUFF_SIZE - ); - ss_start = new_ss_start; - ss_end = new_ss_end; - slice_width = ss_end - ss_start; - } + let ss_start = mz_array.partition_point(|&x| x < left_e); + let ss_end = mz_array.partition_point(|&x| x <= right_e); let abs_im_tol = im * im_tol; let left_im = im - abs_im_tol; @@ -287,55 +281,33 @@ fn dumbcentroid_frame( let mut num_includable = 0; for i in ss_start..ss_end { let im_i = ims_array[i]; - if !included[i] && intensity_array[i] > 0.0 && im_i >= left_im && im_i <= right_im { + if (intensity_array[i] > 0.0) && im_i >= left_im && im_i <= right_im { curr_intensity += intensity_array[i]; + intensity_array[i] = -1.0; num_includable += 1; - include_buff[i - ss_start] = true; } } + assert!(num_includable > 0, "At least 'itself' should be included"); + agg_buff.push(ImsPeak { mz, intensity: curr_intensity, im, }); - included[ss_start..ss_end] - .iter_mut() - .zip(include_buff.iter_mut().take(slice_width)) - .for_each(|(t, tb)| { - *t = true; - *tb = false; - }); global_num_included += num_includable; - const MAX_PEAKS: usize = 10000; - if agg_buff.len() > MAX_PEAKS { - let curr_loc_int = intensity_array[idx]; - if curr_loc_int > 200.0 { - log::debug!( - "Reached limit of the agg buffer at index {}/{} curr int={}", - idx, - arr_len, - curr_loc_int - ); - } - break; - } if global_num_included == arr_len { break; } } - // This just makes sure that I am correctly reseting - // the buffer on each iteration. - assert!(include_buff.iter().all(|x| !x), "{:?}", include_buff); - agg_buff.sort_unstable_by(|a, b| a.mz.partial_cmp(&b.mz).unwrap()); // println!("Centroiding: Start len: {}; end len: {};", arr_len, result.len()); // Ultra data is usually start: 40k end 10k, // HT2 data is usually start 400k end 40k, limiting to 10k // rarely leaves peaks with intensity > 200 ... ive never seen - // it happen. + // it happen. -JSP 2025-Jan agg_buff.into_iter() .map(|x| (x.mz, (x.intensity, x.im))) diff --git a/crates/sage/src/spectrum.rs b/crates/sage/src/spectrum.rs index af36b874..e5f2849f 100644 --- a/crates/sage/src/spectrum.rs +++ b/crates/sage/src/spectrum.rs @@ -1,6 +1,5 @@ use crate::database::binary_search_slice; use crate::mass::{Tolerance, NEUTRON, PROTON}; -use crate::spectrum; /// A charge-less peak at monoisotopic mass #[derive(PartialEq, Copy, Clone, Default, Debug)] From b6f641fa98537eedce17b6bf0e7e60dafd20e613 Mon Sep 17 00:00:00 2001 From: "J. 
Sebastian Paez" Date: Tue, 21 Jan 2025 15:56:15 -0800 Subject: [PATCH 09/11] feat,format: pushdown of parallel reading and cargo fmt --- crates/sage-cli/src/input.rs | 4 +- crates/sage-cli/src/runner.rs | 124 +++++++++++++++++++----------- crates/sage-cloudpath/src/lib.rs | 1 + crates/sage-cloudpath/src/tdf.rs | 20 +++-- crates/sage-cloudpath/src/util.rs | 48 ++++++++---- crates/sage/src/database.rs | 2 +- crates/sage/src/lfq.rs | 8 +- 7 files changed, 137 insertions(+), 70 deletions(-) diff --git a/crates/sage-cli/src/input.rs b/crates/sage-cli/src/input.rs index 0b3ac198..cf7f14e7 100644 --- a/crates/sage-cli/src/input.rs +++ b/crates/sage-cli/src/input.rs @@ -92,7 +92,9 @@ impl From for LfqSettings { integration: value.integration.unwrap_or(default.integration), spectral_angle: value.spectral_angle.unwrap_or(default.spectral_angle).abs(), ppm_tolerance: value.ppm_tolerance.unwrap_or(default.ppm_tolerance).abs(), - mobility_pct_tolerance: value.mobility_pct_tolerance.unwrap_or(default.mobility_pct_tolerance), + mobility_pct_tolerance: value + .mobility_pct_tolerance + .unwrap_or(default.mobility_pct_tolerance), combine_charge_states: value .combine_charge_states .unwrap_or(default.combine_charge_states), diff --git a/crates/sage-cli/src/runner.rs b/crates/sage-cli/src/runner.rs index ad503830..8d76a5e6 100644 --- a/crates/sage-cli/src/runner.rs +++ b/crates/sage-cli/src/runner.rs @@ -5,7 +5,7 @@ use anyhow::Context; use csv::ByteRecord; use log::info; use rayon::prelude::*; -use sage_cloudpath::CloudPath; +use sage_cloudpath::{CloudPath, FileFormat}; use sage_core::database::{IndexedDatabase, Parameters, PeptideIx}; use sage_core::fasta::Fasta; use sage_core::ion_series::Kind; @@ -32,37 +32,53 @@ struct RawSpectrumAccumulator { pub msn: Vec, } +impl RawSpectrumAccumulator { + pub fn fold_op(mut self, rhs: RawSpectrum) -> Self { + if rhs.ms_level == 1 { + self.ms1.push(rhs); + } else { + self.msn.push(rhs); + } + self + } + + pub fn reduce(mut self, other: Self) -> Self { + self.ms1.extend(other.ms1); + self.msn.extend(other.msn); + self + } +} + impl FromParallelIterator for RawSpectrumAccumulator { fn from_par_iter(par_iter: I) -> Self where I: IntoParallelIterator, { - - par_iter .into_par_iter() .fold( RawSpectrumAccumulator::default, - |mut accum, spectrum| { - if spectrum.ms_level == 1 { - accum.ms1.push(spectrum); - } else { - accum.msn.push(spectrum); - } - accum - }, + RawSpectrumAccumulator::fold_op, ) .reduce( RawSpectrumAccumulator::default, - |mut a, b| { - a.ms1.extend(b.ms1); - a.msn.extend(b.msn); - a - }, + RawSpectrumAccumulator::reduce, ) } } +impl FromIterator for RawSpectrumAccumulator { + fn from_iter(iter: I) -> Self + where + I: IntoIterator, + { + iter.into_iter().fold( + RawSpectrumAccumulator::default(), + RawSpectrumAccumulator::fold_op, + ) + } +} + impl Runner { pub fn new(parameters: Search, parallel: usize) -> anyhow::Result { let mut parameters = parameters.clone(); @@ -356,32 +372,50 @@ impl Runner { min_deisotope_mz.unwrap_or(0.0), ); - let spectra: RawSpectrumAccumulator = chunk - .par_iter() - .enumerate() - .flat_map(|(idx, path)| { - let file_id = chunk_idx * batch_size + idx; - let res = sage_cloudpath::util::read_spectra( - path, - file_id, - sn, - self.parameters.bruker_spectrum_processor, - self.requires_ms1(), - ); + // If the file format supports parallel reading, then we can read + // then it is faster to read each file in series. (since each spectra + // will be processed internally in parallel). 
+        let file_serial_read = chunk
+            .iter()
+            .all(|path| FileFormat::from(path.as_ref()).within_file_parallel());
+        log::trace!("file serial read: {}", file_serial_read);
+        let inner_closure = |(idx, path)| {
+            let file_id = chunk_idx * batch_size + idx;
+            let res = sage_cloudpath::util::read_spectra(
+                path,
+                file_id,
+                sn,
+                self.parameters.bruker_spectrum_processor,
+                self.requires_ms1(),
+            );
 
-            match res {
-                Ok(s) => {
-                    log::trace!("- {}: read {} spectra", path, s.len());
-                    Ok(s)
-                }
-                Err(e) => {
-                    log::error!("- {}: {}", path, e);
-                    Err(e)
-                }
+            match res {
+                Ok(s) => {
+                    log::trace!("- {}: read {} spectra", path, s.len());
+                    Ok(s)
                 }
-            })
-            .flatten()
-            .collect();
+                Err(e) => {
+                    log::error!("- {}: {}", path, e);
+                    Err(e)
+                }
+            }
+        };
+
+        let spectra: RawSpectrumAccumulator = if file_serial_read {
+            chunk
+                .iter()
+                .enumerate()
+                .flat_map(inner_closure)
+                .flatten()
+                .collect()
+        } else {
+            chunk
+                .par_iter()
+                .enumerate()
+                .flat_map(inner_closure)
+                .flatten()
+                .collect()
+        };
 
         let msn_spectra = spectra
             .msn
             .map(|s| sp.process(s))
             .collect::<Vec<_>>();
 
+        // If all the MS1 spectra contain IMS then we use the IMS;
+        // otherwise we don't.
         // Note: Empty iterators return true.
         let all_contain_ims = spectra.ms1.iter().all(|x| x.mobility.is_some());
         let ms1_empty = spectra.ms1.is_empty();
         let ms1_spectra = if ms1_empty {
             log::trace!("no MS1 spectra found");
             MS1Spectra::Empty
         } else if all_contain_ims {
@@ -878,9 +914,11 @@ impl Runner {
             );
             record.push_field(
                 itoa::Buffer::new()
-                    .format(
-                        if feature.charge < 2 || feature.charge > 6 { feature.charge } else { 0 },
-                    )
+                    .format(if feature.charge < 2 || feature.charge > 6 {
+                        feature.charge
+                    } else {
+                        0
+                    })
                     .as_bytes(),
             );
             record.push_field(itoa::Buffer::new().format(feature.peptide_len).as_bytes());
diff --git a/crates/sage-cloudpath/src/lib.rs b/crates/sage-cloudpath/src/lib.rs
index 5cc53444..dcebad02 100644
--- a/crates/sage-cloudpath/src/lib.rs
+++ b/crates/sage-cloudpath/src/lib.rs
@@ -10,6 +10,7 @@ pub mod mgf;
 pub mod mzml;
 pub mod tdf;
 pub mod util;
+pub use util::FileFormat;
 
 #[cfg(feature = "parquet")]
 pub mod parquet;
diff --git a/crates/sage-cloudpath/src/tdf.rs b/crates/sage-cloudpath/src/tdf.rs
index 96a4ec68..8f76ea34 100644
--- a/crates/sage-cloudpath/src/tdf.rs
+++ b/crates/sage-cloudpath/src/tdf.rs
@@ -53,7 +53,11 @@ impl TdfReader {
             .map(|&x| mz_converter.convert(x as f64) as f32)
             .collect();
         let corr_factor = frame.intensity_correction_factor as f32;
-        let intensity: Vec<f32> = frame.intensities.iter().map(|&x| x as f32 * corr_factor).collect();
+        let intensity: Vec<f32> = frame
+            .intensities
+            .iter()
+            .map(|&x| x as f32 * corr_factor)
+            .collect();
         let imss = Self::expand_mobility(&frame.scan_offsets, &ims_converter);
         assert_eq!(mz.len(), intensity.len(), "{:?}", frame);
         assert_eq!(mz.len(), imss.len(), "{:?}", frame);
@@ -66,8 +70,7 @@ impl TdfReader {
                 .unwrap_or(std::cmp::Ordering::Equal)
         });
         let sorted_mz: Vec<f32> = indices.iter().map(|&i| mz[i]).collect();
-        let sorted_inten: Vec<f32> =
-            indices.iter().map(|&i| intensity[i]).collect();
+        let sorted_inten: Vec<f32> = indices.iter().map(|&i| intensity[i]).collect();
         let sorted_imss: Vec<f32> = indices.iter().map(|&i| imss[i]).collect();
 
         // Squash the mobility dimension
@@ -197,14 +200,13 @@ impl TdfReader {
     }
 }
 
-
 /// Centroiding of the IM-containing spectra
 ///
 /// This is a very rudimentary centroiding algorithm but... it seems to work well.
 /// It iteratively goes over the peaks in decreasing intensity order and
 /// accumulates the intensity of the peaks surrounding the peak (sort of
 /// like the first pass in dbscan).
-///
+///
 /// This dramatically reduces the number of peaks in the spectra,
 /// which saves a ton of memory and time when doing LFQ, since we
 /// iterate over each peak.
@@ -218,7 +220,10 @@ fn dumbcentroid_frame(
     // Make sure the mz array is sorted
     // In theory I could use the type system to enforce this, but I don't
     // think it is worth it; it's not that slow and it's simple.
-    assert!(mz_array.windows(2).all(|x| x[0] <= x[1]), "mz_array is not sorted");
+    assert!(
+        mz_array.windows(2).all(|x| x[0] <= x[1]),
+        "mz_array is not sorted"
+    );
 
     let arr_len = mz_array.len();
     let mut global_num_included = 0;
@@ -309,7 +314,8 @@ fn dumbcentroid_frame(
     // rarely leaves peaks with intensity > 200 ... ive never seen
     // it happen. -JSP 2025-Jan
 
-    agg_buff.into_iter()
+    agg_buff
+        .into_iter()
         .map(|x| (x.mz, (x.intensity, x.im)))
         .unzip()
 }
diff --git a/crates/sage-cloudpath/src/util.rs b/crates/sage-cloudpath/src/util.rs
index be56dc53..9885ff49 100644
--- a/crates/sage-cloudpath/src/util.rs
+++ b/crates/sage-cloudpath/src/util.rs
@@ -4,13 +4,44 @@ use serde::Serialize;
 use tokio::io::AsyncReadExt;
 
 #[derive(Debug, PartialEq, Eq)]
-enum FileFormat {
+pub enum FileFormat {
     MzML,
     MGF,
     TDF,
     Unidentified,
 }
 
+impl FileFormat {
+    /// Does this file format support parallel reading?
+    /// By this I mean that there is 'within file' parallelism that
+    /// would make it faster than reading multiple files in parallel
+    /// (i.e. is giving 4 cores to 1 file at a time faster than
+    /// giving 1 core each to 4 files read at the same time?).
+    pub fn within_file_parallel(&self) -> bool {
+        match self {
+            FileFormat::MzML => false,
+            FileFormat::MGF => false,
+            FileFormat::TDF => true,
+            FileFormat::Unidentified => false,
+        }
+    }
+}
+
+impl From<&str> for FileFormat {
+    fn from(s: &str) -> Self {
+        let path_lower = s.to_lowercase();
+        if path_lower.ends_with(".mgf.gz") || path_lower.ends_with(".mgf") {
+            FileFormat::MGF
+        } else if is_bruker(&path_lower) {
+            FileFormat::TDF
+        } else if path_lower.ends_with(".mzml.gz") || path_lower.ends_with(".mzml") {
+            FileFormat::MzML
+        } else {
+            FileFormat::Unidentified
+        }
+    }
+}
+
 const BRUKER_EXTENSIONS: [&str; 5] = [".d", ".tdf", ".tdf_bin", "ms2", "raw"];
 
 fn is_bruker(path: &str) -> bool {
@@ -25,19 +56,6 @@ fn is_bruker(path: &str) -> bool {
     })
 }
 
-fn identify_format(s: &str) -> FileFormat {
-    let path_lower = s.to_lowercase();
-    if path_lower.ends_with(".mgf.gz") || path_lower.ends_with(".mgf") {
-        FileFormat::MGF
-    } else if is_bruker(&path_lower) {
-        FileFormat::TDF
-    } else if path_lower.ends_with(".mzml.gz") || path_lower.ends_with(".mzml") {
-        FileFormat::MzML
-    } else {
-        FileFormat::Unidentified
-    }
-}
-
 pub fn read_spectra<S: AsRef<str>>(
     path: S,
     file_id: usize,
@@ -45,7 +63,7 @@ pub fn read_spectra<S: AsRef<str>>(
     bruker_processor: BrukerSpectrumProcessor,
     requires_ms1: bool,
 ) -> Result<Vec<RawSpectrum>, Error> {
-    match identify_format(path.as_ref()) {
+    match FileFormat::from(path.as_ref()) {
         FileFormat::MzML => read_mzml(path, file_id, sn),
         FileFormat::MGF => read_mgf(path, file_id),
         FileFormat::TDF => read_tdf(path, file_id, bruker_processor, requires_ms1),
diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs
index 506190ad..4046f2e8 100644
--- a/crates/sage/src/database.rs
+++ b/crates/sage/src/database.rs
@@ -280,7 +280,7 @@ impl Parameters {
                 .flat_map(|kind| IonSeries::new(peptide, *kind).enumerate())
                 .filter(|(ion_idx, ion)| {
                     // Don't store b1, b2, y1, y2 ions for preliminary scoring
-
+
                     match ion.kind {
                         Kind::A | Kind::B | Kind::C => (ion_idx + 1) > self.min_ion_index,
Kind::X | Kind::Y | Kind::Z => { diff --git a/crates/sage/src/lfq.rs b/crates/sage/src/lfq.rs index 5c523a8e..0a0ef60f 100644 --- a/crates/sage/src/lfq.rs +++ b/crates/sage/src/lfq.rs @@ -106,9 +106,11 @@ pub fn build_feature_map( // } else { // feat.calcmass // }; - let (mobility_lo, mobility_hi) = - Tolerance::Pct(-settings.mobility_pct_tolerance, settings.mobility_pct_tolerance) - .bounds(feat.ims); + let (mobility_lo, mobility_hi) = Tolerance::Pct( + -settings.mobility_pct_tolerance, + settings.mobility_pct_tolerance, + ) + .bounds(feat.ims); map.insert( feat.peptide_idx, PrecursorRange { From 43cc4d4f7f83f2abc3c7b45c8e578cf4d9ae7769 Mon Sep 17 00:00:00 2001 From: "J. Sebastian Paez" Date: Tue, 21 Jan 2025 16:26:26 -0800 Subject: [PATCH 10/11] fix: updated tests for raw format id --- crates/sage-cloudpath/src/util.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/sage-cloudpath/src/util.rs b/crates/sage-cloudpath/src/util.rs index 9885ff49..8b46590f 100644 --- a/crates/sage-cloudpath/src/util.rs +++ b/crates/sage-cloudpath/src/util.rs @@ -167,12 +167,12 @@ mod test { #[test] fn test_identify_format() { - assert_eq!(identify_format("foo.mzml"), FileFormat::MzML); - assert_eq!(identify_format("foo.mzML"), FileFormat::MzML); - assert_eq!(identify_format("foo.mgf"), FileFormat::MGF); - assert_eq!(identify_format("foo.mgf.gz"), FileFormat::MGF); - assert_eq!(identify_format("foo.tdf"), FileFormat::TDF); - assert_eq!(identify_format("./tomato/foo.d"), FileFormat::TDF); - assert_eq!(identify_format("./tomato/foo.d/"), FileFormat::TDF); + assert_eq!(FileFormat::from("foo.mzml"), FileFormat::MzML); + assert_eq!(FileFormat::from("foo.mzML"), FileFormat::MzML); + assert_eq!(FileFormat::from("foo.mgf"), FileFormat::MGF); + assert_eq!(FileFormat::from("foo.mgf.gz"), FileFormat::MGF); + assert_eq!(FileFormat::from("foo.tdf"), FileFormat::TDF); + assert_eq!(FileFormat::from("./tomato/foo.d"), FileFormat::TDF); + assert_eq!(FileFormat::from("./tomato/foo.d/"), FileFormat::TDF); } } From 95257917f83cf49d69f1c0504a2dfd3ad1e393f7 Mon Sep 17 00:00:00 2001 From: "J. Sebastian Paez" Date: Fri, 14 Feb 2025 09:21:37 -0800 Subject: [PATCH 11/11] chore: updated outdated comment --- crates/sage-cloudpath/src/tdf.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/sage-cloudpath/src/tdf.rs b/crates/sage-cloudpath/src/tdf.rs index 8f76ea34..21969814 100644 --- a/crates/sage-cloudpath/src/tdf.rs +++ b/crates/sage-cloudpath/src/tdf.rs @@ -249,10 +249,6 @@ fn dumbcentroid_frame( for &idx in &order { if intensity_array[idx] <= 0.0 { - // In theory ... if I set the intensity as mutable - // I could remove the use of the 'included' vector - // and just set to -1.0 the intensity of the peaks - // I have already included. continue; } if agg_buff.len() > MAX_PEAKS {
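Taken together, the format-detection changes in this series let the runner choose between per-file and within-file parallelism. A self-contained sketch of that decision; the stub enum mirrors `util.rs`, and the simplified suffix check stands in for the real `is_bruker` helper:

```rust
// Stub mirroring the now-public FileFormat in util.rs; the suffix test
// below is a simplification of the real is_bruker() logic.
#[derive(Debug, PartialEq, Eq)]
enum FileFormat {
    MzML,
    MGF,
    TDF,
    Unidentified,
}

impl From<&str> for FileFormat {
    fn from(s: &str) -> Self {
        let p = s.to_lowercase();
        if p.ends_with(".mgf.gz") || p.ends_with(".mgf") {
            FileFormat::MGF
        } else if p.ends_with(".d") || p.ends_with(".d/") || p.ends_with(".tdf") {
            FileFormat::TDF
        } else if p.ends_with(".mzml.gz") || p.ends_with(".mzml") {
            FileFormat::MzML
        } else {
            FileFormat::Unidentified
        }
    }
}

impl FileFormat {
    // TDF readers parallelize within a file, so such files are best read
    // one at a time with all cores; everything else is read with
    // per-file parallelism.
    fn within_file_parallel(&self) -> bool {
        matches!(self, FileFormat::TDF)
    }
}

fn main() {
    let paths = ["run1.d", "run2.d"];
    let file_serial_read = paths
        .iter()
        .all(|p| FileFormat::from(*p).within_file_parallel());
    assert!(file_serial_read);
    assert_eq!(FileFormat::from("foo.mzML"), FileFormat::MzML);
}
```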