diff --git a/python/quantmsio/quantms_io/core/diann_convert.py b/python/quantmsio/quantms_io/core/diann_convert.py index 16f86d9..5c177e8 100644 --- a/python/quantmsio/quantms_io/core/diann_convert.py +++ b/python/quantmsio/quantms_io/core/diann_convert.py @@ -15,8 +15,12 @@ from quantms_io.core.feature_in_memory import FeatureInMemory from quantms_io.core.psm import PSMHandler from collections import Counter + +import concurrent.futures + from quantms_io.utils.thread import MyThread from typing import Any, List, Tuple, Dict, Set +import duckdb import swifter MODIFICATION_PATTERN = re.compile(r"\((.*?)\)") @@ -480,8 +484,7 @@ def extract_dict_from_pep(self, pr_path: str, bset_score_map: dict) -> dict: return map_dict - def extract_from_psm_to_pep_msg(self, report_path: str, qvalue_threshold: float, folder: str, - uniq_masses: Dict, index_ref: Dict, map_dict: Dict, psm_output_path:str, chunksize: int): + def extract_from_psm_to_pep_msg(self, report_path: str, qvalue_threshold: float, folder: str,uniq_masses: Dict, index_ref: Dict, map_dict: Dict, psm_output_path:str, chunksize: int): """ The extract_from_psm_to_pep_msg method is part of the DiaNNConvert class and is responsible for extracting relevant information from a PSM (Peptide-Spectrum Match) report and converting it into a peptide message format. @@ -534,6 +537,154 @@ def intergrate_msg(folder,n,group): + ## Score at PSM level: Q.Value + out_mztab_psh = out_mztab_psh[ + [ + "Stripped.Sequence", + "Protein.Ids", + "Q.Value", + "RT.Start", + "Precursor.Charge", + "Calculate.Precursor.Mz", + "exp_mass_to_charge", + "Modified.Sequence", + "PEP", + "Global.Q.Value", + "Global.Q.Value", + "opt_global_spectrum_reference", + "ms_run", + ] + ] + out_mztab_psh.columns = [ + "sequence", + "accession", + "search_engine_score[1]", + "retention_time", + "charge", + "calc_mass_to_charge", + "exp_mass_to_charge", + "opt_global_cv_MS:1000889_peptidoform_sequence", + "opt_global_SpecEValue_score", + "opt_global_q-value", + "opt_global_q-value_score", + "opt_global_spectrum_reference", + "ms_run", + ] + + out_mztab_psh.loc[:, "opt_global_cv_MS:1002217_decoy_peptide"] = "0" + out_mztab_psh.reset_index(drop=True,inplace=True) + out_mztab_psh.loc[:, "unique"] = out_mztab_psh["accession"].swifter.apply(lambda x: "0" if ";" in str(x) else "1") + + null_col = [ + "start", + "end", + "opt_global_feature_id", + "opt_global_map_index", + ] + out_mztab_psh.loc[:, null_col] = "null" + + out_mztab_psh.loc[:, "modifications"] = out_mztab_psh["opt_global_cv_MS:1000889_peptidoform_sequence"].swifter.apply( + lambda x: find_modification(x) + ) + out_mztab_psh.loc[:, "spectra_ref"] = 'ms_run[' + out_mztab_psh["ms_run"].astype(str).values + ']:' + out_mztab_psh["opt_global_spectrum_reference"].values + + out_mztab_psh.loc[:, "opt_global_cv_MS:1000889_peptidoform_sequence"] = out_mztab_psh["opt_global_cv_MS:1000889_peptidoform_sequence"].apply( + lambda x: AASequence.fromString(x).toString() + ) + + new_cols = [col for col in out_mztab_psh.columns if not col.startswith("opt_")] + [ + col for col in out_mztab_psh.columns if col.startswith("opt_") + ] + out_mztab_psh = out_mztab_psh[new_cols] + out_mztab_psh.loc[:, 'scan_number'] = out_mztab_psh['spectra_ref'].swifter.apply(lambda x: generate_scan_number(x)) + out_mztab_psh['spectra_ref'] = out_mztab_psh['spectra_ref'].swifter.apply(lambda x: self._ms_runs[x.split(":")[0]]) + parquet_table = self.generate_psm_file(out_mztab_psh) + + if not pqwriter: + # create a parquet write object giving it an output file + pqwriter = pq.ParquetWriter(psm_output_path, parquet_table.schema) + pqwriter.write_table(parquet_table) + + spectra_count_dict = self.__get_spectra_count(out_mztab_psh, spectra_count_dict) + map_dict, psm_unique_keys = self.__extract_from_psm_to_pep_msg(out_mztab_psh, map_dict, psm_unique_keys) + if pqwriter: + pqwriter.close() + return map_dict, spectra_count_dict + + def get_report_from_database(self,report_path,runs,qvalue_threshold,uniq_masses): + database = duckdb.query( + """ + select "File.Name","Run","Protein.Group","Protein.Names","Protein.Ids","First.Protein.Description","PG.MaxLFQ","RT.Start","Global.Q.Value","Lib.Q.Value","PEP","Precursor.Normalised","Precursor.Id","Q.Value","Modified.Sequence","Stripped.Sequence","Precursor.Charge","Precursor.Quantity","Global.PG.Q.Value" from '{}' + where Run IN {} + """.format(report_path,tuple(runs)) + ) + report = database.df() + report = report[report["Q.Value"] < qvalue_threshold] + mass_vector = report["Modified.Sequence"].map(uniq_masses) + report["Calculate.Precursor.Mz"] = (mass_vector + (PROTON_MASS_U * report["Precursor.Charge"])) / report[ + "Precursor.Charge" + ] + return report + + def extract_from_psm_to_pep_msg_for_database(self, report_path: str, qvalue_threshold: float, folder: str, + uniq_masses: Dict, index_ref: Dict, map_dict: Dict, psm_output_path:str, chunksize: int): + """ + The extract_from_psm_to_pep_msg method is part of the DiaNNConvert class and is responsible for extracting + relevant information from a PSM (Peptide-Spectrum Match) report and converting it into a peptide message format. + + :param report_path: The path to the PSM report file in TSV format. + :param qvalue_threshold: The threshold value for the Q.Value score. + :param folder: The path to the folder containing additional information files. + :param uniq_masses: A dictionary mapping unique peptide sequences to their corresponding mass values. + :param index_ref: A dictionary containing index and reference mappings. + :param map_dict: A dictionary used for mapping values. + :param chunksize: The number of rows to read at a time from the PSM report file. + :return: A pandas DataFrame containing the extracted information. + """ + + psm_unique_keys = [] + spectra_count_dict = Counter() + info_list = [mzml.replace('_mzml_info.tsv','') for mzml in os.listdir(folder) if mzml.endswith('_mzml_info.tsv')] + info_list = [info_list[i:i+60] for i in range(0,len(info_list),60)] + + pqwriter = None + #for report in self.main_report_df(report_path, qvalue_threshold, chunksize, uniq_masses): + for refs in info_list: + report = self.get_report_from_database(report_path,refs,qvalue_threshold,uniq_masses) + report = report.merge(index_ref[["ms_run", "Run", "study_variable"]], on="Run", validate="many_to_one") + + + def intergrate_msg(n): + nonlocal report + nonlocal folder + files = list(Path(folder).glob(f"*{n}_mzml_info.tsv")) + if not files: + raise ValueError(f"Could not find {n} info file in {dir}") + target = pd.read_csv(files[0],sep='\t',usecols=["Retention_Time", "SpectrumID", "Exp_Mass_To_Charge"]) + group = report[report['Run']==n].copy() + group.sort_values(by="RT.Start", inplace=True) + target.rename(columns={"Retention_Time": "RT.Start", "SpectrumID": "opt_global_spectrum_reference", + "Exp_Mass_To_Charge": "exp_mass_to_charge"}, inplace=True) + target["RT.Start"] = target["RT.Start"] / 60 + res = pd.merge_asof(group, target, on="RT.Start", direction="nearest") + return res + + #out_mztab_psh = pd.DataFrame() + + with concurrent.futures.ThreadPoolExecutor(60) as executor: + results = executor.map(intergrate_msg,refs) + ''' + ThreadPool = [] + for ref in refs: + ThreadPool.append(MyThread(target=intergrate_msg, args=(folder,ref,report[report['Run']==ref].copy()))) + for p in ThreadPool: + p.start() + for p in ThreadPool: + p.join() + ''' + out_mztab_psh = pd.concat([result for result in results]) + + ## Score at PSM level: Q.Value out_mztab_psh = out_mztab_psh[ [ @@ -749,7 +900,14 @@ def generate_feature_and_psm_file(self, report_path: str, design_file: str, fast st = time.time() map_dict = self.extract_dict_from_pep(pr_path, self.peptide_best_score_dict) - map_dict, spectra_count_dict = self.extract_from_psm_to_pep_msg(report_path, + file_list = [mzml.replace('_mzml_info.tsv','') for mzml in os.listdir(mzml_info_folder) if mzml.endswith('_mzml_info.tsv')] + if len(file_list) < 200: + map_dict, spectra_count_dict = self.extract_from_psm_to_pep_msg(report_path, + qvalue_threshold, + mzml_info_folder, self.uniq_masses_map, + index_ref, map_dict, psm_output_path, chunksize) + else: + map_dict, spectra_count_dict = self.extract_from_psm_to_pep_msg_for_database(report_path, qvalue_threshold, mzml_info_folder, self.uniq_masses_map, index_ref, map_dict, psm_output_path, chunksize)