From ae65a4d79d80984898030627786f5bbfe64d2f11 Mon Sep 17 00:00:00 2001 From: zprobot <1727697083@qq.com> Date: Thu, 12 Dec 2024 19:40:09 +0800 Subject: [PATCH] update --- quantmsio/commands/diann_command.py | 59 ++++++++++++++++++++ quantmsio/core/ae.py | 4 +- quantmsio/core/common.py | 19 ++++++- quantmsio/core/diann.py | 85 ++++++++++++++++++++++++----- quantmsio/core/format.py | 62 +++++++++++++++++++++ quantmsio/core/maxquant.py | 64 +++++++++++++++++++--- quantmsio/operate/plots.py | 1 - quantmsio/operate/tools.py | 36 ++++++++++++ quantmsio/quantmsioc.py | 3 +- 9 files changed, 305 insertions(+), 28 deletions(-) diff --git a/quantmsio/commands/diann_command.py b/quantmsio/commands/diann_command.py index 82d947e..eb529fd 100644 --- a/quantmsio/commands/diann_command.py +++ b/quantmsio/commands/diann_command.py @@ -114,3 +114,62 @@ def diann_convert_to_parquet( file_num=file_num, protein_file=protein_file, ) + + +@click.command( + "convert-diann-pg", + short_help="Convert diann_report to pg file of quantms.io format", +) +@click.option( + "--report_path", + help="the diann report file path", + required=True, +) +@click.option( + "--output_folder", + help="Folder where the Json file will be generated", + required=True, +) +@click.option( + "--output_prefix_file", + help="Prefix of the Json file needed to generate the file name", + required=False, +) +@click.option( + "--duckdb_max_memory", help="The maximum amount of memory allocated by the DuckDB engine (e.g 4GB)", required=False +) +@click.option("--duckdb_threads", help="The number of threads for the DuckDB engine (e.g 4)", required=False) +@click.option( + "--file_num", + help="The number of files being processed at the same time", + default=100, +) +def diann_pg_convert_to_parquet( + report_path: str, + output_folder: str, + output_prefix_file: str, + duckdb_max_memory: str, + duckdb_threads: int, + file_num: int, +): + if report_path is None or output_folder is None: + raise 
click.UsageError("Please provide all the required parameters") + + if not os.path.exists(output_folder): + os.makedirs(output_folder) + + if not output_prefix_file: + output_prefix_file = "pg" + filename = create_uuid_filename(output_prefix_file, ".pg.parquet") + pg_output_path = output_folder + "/" + filename + + dia_nn = DiaNNConvert( + diann_report=report_path, + sdrf_path=None, + duckdb_max_memory=duckdb_max_memory, + duckdb_threads=duckdb_threads, + ) + dia_nn.write_pg_matrix_to_file( + output_path= pg_output_path, + file_num=file_num + ) \ No newline at end of file diff --git a/quantmsio/core/ae.py b/quantmsio/core/ae.py index 3571120..19663cd 100644 --- a/quantmsio/core/ae.py +++ b/quantmsio/core/ae.py @@ -17,7 +17,7 @@ def get_ibaq_columns(path): with open(path) as f: line = f.readline() - return line.split("\n")[0].split(",") + return line.split("\n")[0].split("\t") class AbsoluteExpressionHander: @@ -66,7 +66,7 @@ def load_ibaq_file(self, path, protein_str=None): for col in usecols: if col not in ibaq_columns: raise Exception(f"Not found {col} in ibaq file") - ibaqs = pd.read_csv(path, usecols=usecols) + ibaqs = pd.read_csv(path, usecols=usecols, sep="\t") ibaqs.rename(columns=AbsoluteExpressionHander.LABEL_MAP, inplace=True) if protein_str: ibaqs = ibaqs[ibaqs["protein"].str.contains(f"{protein_str}", na=False)] diff --git a/quantmsio/core/common.py b/quantmsio/core/common.py index 72c5036..dc25bb6 100644 --- a/quantmsio/core/common.py +++ b/quantmsio/core/common.py @@ -1,5 +1,5 @@ from quantmsio import __version__ -from quantmsio.core.format import PSM_FIELDS, FEATURE_FIELDS, IBAQ_FIELDS +from quantmsio.core.format import PSM_FIELDS, FEATURE_FIELDS, IBAQ_FIELDS, PG_FIELDS import pyarrow as pa PSM_MAP = { @@ -66,7 +66,19 @@ "Genes": "gg_names", "Run": "run", } +DIANN_PG_MAP = { + "Protein.Group": "pg_accessions", + "Protein.Names": "pg_names", + "Genes": "gg_accessions", + "Run": "reference_file_name", + "Global.PG.Q.Value": "global_qvalue", + 
'PG.Quantity': "intensity", + 'PG.Normalised': "normalize_intensity", + "PG.MaxLFQ": "lfq", + 'PG.Q.Value': "qvalue" +} DIANN_USECOLS = list(DIANN_MAP.keys()) +DIANN_PG_USECOLS = list(DIANN_PG_MAP.keys()) MAXQUANT_PSM_MAP = { "Sequence": "sequence", @@ -135,3 +147,8 @@ IBAQ_FIELDS, metadata={"description": "ibaq file in quantms.io format"}, ) +PG_SCHEMA = pa.schema( + PG_FIELDS, + metadata={"description": "PG file in quantms.io format"}, +) + diff --git a/quantmsio/core/diann.py b/quantmsio/core/diann.py index 8e332ef..ef7452e 100644 --- a/quantmsio/core/diann.py +++ b/quantmsio/core/diann.py @@ -3,9 +3,11 @@ import numpy as np import pandas as pd import os +import pyarrow as pa import pyarrow.parquet as pq import concurrent.futures from pathlib import Path +from collections import defaultdict from pyopenms import AASequence from pyopenms.Constants import PROTON_MASS_U from quantmsio.operate.tools import get_ahocorasick @@ -15,27 +17,22 @@ from quantmsio.core.feature import Feature from quantmsio.core.duckdb import DuckDB from quantmsio.utils.pride_utils import generate_scan_number -from quantmsio.core.common import DIANN_MAP, DIANN_USECOLS +from quantmsio.core.common import DIANN_MAP, DIANN_USECOLS, DIANN_PG_MAP, DIANN_PG_USECOLS, PG_SCHEMA DIANN_SQL = ", ".join([f'"{name}"' for name in DIANN_USECOLS]) - +DIANN_PG_SQL = ", ".join([f'"{name}"' for name in DIANN_PG_USECOLS]) class DiaNNConvert(DuckDB): - def __init__(self, diann_report, sdrf_path, duckdb_max_memory="16GB", duckdb_threads=4): + def __init__(self, diann_report, sdrf_path=None, duckdb_max_memory="16GB", duckdb_threads=4): super(DiaNNConvert, self).__init__(diann_report, duckdb_max_memory, duckdb_threads) - self.init_duckdb() - self._sdrf = SDRFHandler(sdrf_path) - self._mods_map = self._sdrf.get_mods_dict() - self._automaton = get_ahocorasick(self._mods_map) - self._sample_map = self._sdrf.get_sample_map_run() - - def init_duckdb(self): - - self._duckdb.execute("""CREATE INDEX idx_precursor_q ON report 
("Precursor.Id", "Q.Value")""") - self._duckdb.execute("""CREATE INDEX idx_run ON report ("Run")""") + if sdrf_path: + self._sdrf = SDRFHandler(sdrf_path) + self._mods_map = self._sdrf.get_mods_dict() + self._automaton = get_ahocorasick(self._mods_map) + self._sample_map = self._sdrf.get_sample_map_run() - def get_report_from_database(self, runs: list) -> pd.DataFrame: + def get_report_from_database(self, runs: list, sql:str = DIANN_SQL) -> pd.DataFrame: """ This function loads the report from the duckdb database for a group of ms_runs. :param runs: A list of ms_runs @@ -48,7 +45,7 @@ def get_report_from_database(self, runs: list) -> pd.DataFrame: from report where Run IN {} """.format( - DIANN_SQL, tuple(runs) + sql, tuple(runs) ) ) report = database.df() @@ -90,6 +87,46 @@ def get_peptide_map_from_database(self): logging.info("Time to load peptide map {} seconds".format(et)) return best_ref_map + def get_peptide_count(self, df): + peptide_count = defaultdict(int) + for proteins in df["pg_accessions"]: + for protein in proteins.split(";"): + peptide_count[protein] += 1 + return peptide_count + + def generate_pg_matrix(self, report): + peptide_count = self.get_peptide_count(report) + report.drop_duplicates(subset=["pg_accessions"],inplace=True) + report["gg_accessions"] = report["gg_accessions"].str.split(";") + report["pg_names"] = report["pg_names"].str.split(";") + report["reference_file_name"] = report["reference_file_name"].apply(lambda x: x.split(".")[0]) + report["pg_accessions"] = report["pg_accessions"].str.split(";") + report.loc[:,"peptides"] = report["pg_accessions"].apply( + lambda proteins: [{ + "protein_name": protein, + "peptide_count": peptide_count[protein] + } for protein in proteins] + ) + report.loc[:, "is_decoy"] = 0 + report.loc[:, "additional_intensities"] = report[["normalize_intensity","lfq"]].apply( + lambda rows: [ + { + "intensity_name": name, + "intensity_value": rows[name] + } for name in ["normalize_intensity","lfq"] + ], + axis=1, + 
) + report.loc[:, "additional_scores"] = report["qvalue"].apply( + lambda value: [{ + "score_name": "qvalue", + "score_value": value + }] + ) + report.loc[:, "contaminant"] = None + report.loc[:, "anchor_protein"] = None + return report + def main_report_df(self, qvalue_threshold: float, mzml_info_folder: str, file_num: int, protein_str: str = None): def intergrate_msg(n): nonlocal report @@ -127,6 +164,7 @@ def intergrate_msg(n): for refs in info_list: report = self.get_report_from_database(refs) report.rename(columns=DIANN_MAP, inplace=True) + report.dropna(subset=["pg_accessions"], inplace=True) if protein_str: report = report[report["pg_accessions"].str.contains(f"{protein_str}", na=False)] # restrict @@ -228,6 +266,23 @@ def generate_feature( logging.info("Time to generate psm and feature file {} seconds".format(et)) yield report + def write_pg_matrix_to_file(self, output_path:str,file_num=20): + info_list = self.get_unique_references("Run") + info_list = [info_list[i : i + file_num] for i in range(0, len(info_list), file_num)] + pqwriter = None + for refs in info_list: + report = self.get_report_from_database(refs, DIANN_PG_SQL) + report.rename(columns=DIANN_PG_MAP, inplace=True) + report.dropna(subset=["pg_accessions"], inplace=True) + for ref in refs: + df = report[report["reference_file_name"] == ref].copy() + df = self.generate_pg_matrix(df) + pg_parquet = pa.Table.from_pandas(df, schema=PG_SCHEMA) + if not pqwriter: + pqwriter = pq.ParquetWriter(output_path, pg_parquet.schema) + pqwriter.write_table(pg_parquet) + close_file(pqwriter=pqwriter) + self.destroy_duckdb_database() def write_feature_to_file( self, qvalue_threshold: float, diff --git a/quantmsio/core/format.py b/quantmsio/core/format.py index 0219c4b..24a7c02 100644 --- a/quantmsio/core/format.py +++ b/quantmsio/core/format.py @@ -256,6 +256,68 @@ pa.field("intensity", pa.float32(), metadata={"description": "intensity value"}), ] +PG_FIELDS = [ + pa.field( + "pg_accessions", + 
pa.list_(pa.string()), + metadata={"description": "Protein group accessions of all the proteins that the peptide maps to"}, + ), + pa.field( + "pg_names", + pa.list_(pa.string()), + metadata={"description": "Protein group names"}, + ), + pa.field( + "gg_accessions", + pa.list_(pa.string()), + metadata={"description": "Gene group accessions, as a string array"}, + ), + pa.field( + "reference_file_name", + pa.string(), + metadata={"description": "Spectrum file name with no path information and not including the file extension"}, + ), + pa.field( + "global_qvalue", + pa.float32(), + metadata={"description": "Global q-value of the protein group at the experiment level"}, + ), + pa.field( + "intensity", + pa.float32(), + metadata={"description": "the intensity-based abundance of the protein group in the reference file"}, + ), + pa.field( + "additional_intensities", + pa.list_(pa.struct([("intensity_name", pa.string()), ("intensity_value", pa.float32())])), + metadata={"description": "The intensity-based abundance of the peptide in the sample"}, + ), + pa.field( + "is_decoy", + pa.int32(), + metadata={"description": "Decoy indicator, 1 if the PSM is a decoy, 0 target"}, + ), + pa.field( + "contaminant", + pa.int32(), + metadata={"description": "If the protein is a contaminant"}, + ), + pa.field( + "peptides", + pa.list_(pa.struct([("protein_name", pa.string()), ("peptide_count", pa.int32())])), + metadata={"description": "Number of peptides per protein in the protein group"}, + ), + pa.field( + "anchor_protein", + pa.string(), + metadata={"description": "The anchor protein of the protein group, leading protein or representative"}, + ), + pa.field( + "additional_scores", + pa.list_(pa.struct([("score_name", pa.string()), ("score_value", pa.float32())])), + metadata={"description": "List of structures, each structure contains two fields: name and value"}, + ) +] PSM_FIELDS = PEPTIDE_FIELDS + PSM_UNIQUE_FIELDS diff --git a/quantmsio/core/maxquant.py 
b/quantmsio/core/maxquant.py index cd449ec..faa5211 100644 --- a/quantmsio/core/maxquant.py +++ b/quantmsio/core/maxquant.py @@ -6,6 +6,8 @@ import codecs import os import pyarrow.parquet as pq +from typing import List +from pathlib import Path from pyopenms import ModificationsDB from pyopenms import AASequence from quantmsio.core.sdrf import SDRFHandler @@ -41,10 +43,10 @@ class MaxQuant: def __init__(self): pass - def iter_batch(self, file_path: str, label: str = "feature", chunksize: int = 100000, protein_str: str = None): - self.mods_map = self.get_mods_map(file_path) + def extract_col_msg(self, col_df, label: str = "feature"): + line = "\t".join(col_df.columns) + self.mods_map = self.get_mods_map(line) self._automaton = get_ahocorasick(self.mods_map) - col_df = pd.read_csv(file_path, sep="\t", nrows=1) if label == "feature": intensity_normalize_names = [] intensity_names = [] @@ -67,6 +69,11 @@ def iter_batch(self, file_path: str, label: str = "feature", chunksize: int = 10 col = f"{key} Probabilities" if col in col_df.columns: use_cols.append(col) + return use_map, use_cols + + def iter_batch(self, file_path: str, label: str = "feature", chunksize: int = 100000, protein_str: str = None): + col_df = pd.read_csv(file_path, sep="\t", nrows=0) + use_map, use_cols = self.extract_col_msg(col_df, label=label) for df in pd.read_csv( file_path, sep="\t", @@ -80,6 +87,33 @@ def iter_batch(self, file_path: str, label: str = "feature", chunksize: int = 10 df = self.main_operate(df) yield df + def open_from_zip_archive(self, zip_file, file_name, **kwargs): + """Open file from zip archive.""" + with zipfile.ZipFile(zip_file) as z: + with z.open(file_name) as f: + df = pd.read_csv(f, sep="\t", low_memory=False, **kwargs) + return df + + def read_zip_file(self, zip_path: str, **kwargs): + filepath = Path(zip_path) + df = self.open_from_zip_archive(zip_path, f"{filepath.stem}/evidence.txt", **kwargs) + return df + + def iter_zip_batch(self, zip_list: List[str], label: str = 
"feature", protein_str: str = None): + for zip_file in zip_list: + col_df = self.read_zip_file(zip_file, nrows=0) + use_map, use_cols = self.extract_col_msg(col_df, label=label) + df = self.read_zip_file( + zip_file, + usecols=use_cols, + ) + df.rename(columns=use_map, inplace=True) + #df["reference_file_name"] = zip_file.split(".")[0] + if protein_str: + df = df[df["mp_accessions"].str.contains(f"{protein_str}", na=False)] + df = self.main_operate(df) + yield df + def generete_calculated_mz(self, df): uniq_p = df["peptidoform"].unique() masses_map = {k: AASequence.fromString(k).getMonoWeight() for k in uniq_p} @@ -98,11 +132,7 @@ def _transform_mod(self, match): else: return None - def get_mods_map(self, report_path): - if os.stat(report_path).st_size == 0: - raise ValueError("File is empty") - f = codecs.open(report_path, "r", "utf-8") - line = f.readline() + def get_mods_map(self, line): pattern = r"sequence\s+(.*?)\s+(Missed|Proteins)" match = re.search(pattern, line, re.DOTALL) mod_seq = match.group(1) @@ -315,6 +345,24 @@ def write_feature_to_file( pqwriter.write_table(parquet) close_file(pqwriter=pqwriter) + def write_zip_feature_to_file( + self, + zip_list, + sdrf_path: str, + output_path: str, + protein_file=None, + ): + self._init_sdrf(sdrf_path) + pqwriter = None + for df in self.iter_zip_batch(zip_list, "feature", protein_str=protein_file): + self.transform_feature(df) + Feature.convert_to_parquet_format(df) + parquet = Feature.transform_feature(df) + if not pqwriter: + pqwriter = pq.ParquetWriter(output_path, parquet.schema) + pqwriter.write_table(parquet) + close_file(pqwriter=pqwriter) + def write_features_to_file( self, evidence_path: str, diff --git a/quantmsio/operate/plots.py b/quantmsio/operate/plots.py index 07bd5cc..b7c64f2 100644 --- a/quantmsio/operate/plots.py +++ b/quantmsio/operate/plots.py @@ -7,7 +7,6 @@ import seaborn as sns from quantmsio.operate.tools import transform_ibaq - def plot_distribution_of_ibaq(ibaq_path: str, save_path: 
str = None, selected_column: str = None) -> None: """ This function plots the distribution of the protein IBAQ values. diff --git a/quantmsio/operate/tools.py b/quantmsio/operate/tools.py index 940f10e..1e98083 100644 --- a/quantmsio/operate/tools.py +++ b/quantmsio/operate/tools.py @@ -2,10 +2,12 @@ import re from collections import defaultdict import pandas as pd +import numpy as np import pyarrow as pa import pyarrow.parquet as pq from Bio import SeqIO import ahocorasick +from pyopenms import FASTAFile from quantmsio.core.common import FEATURE_SCHEMA, IBAQ_SCHEMA, IBAQ_USECOLS, PSM_SCHEMA from quantmsio.core.sdrf import SDRFHandler from quantmsio.operate.query import Query, map_spectrum_mz @@ -112,7 +114,41 @@ def map_protein_for_tsv(path: str, fasta: str, output_path: str, map_parameter: with open(output_path, "w", encoding="utf8") as f: f.write(content) +def get_peptide_map(unique_peptides, fasta): + peptide_map = defaultdict(list) + automaton = ahocorasick.Automaton() + for sequence in unique_peptides: + automaton.add_word(sequence,sequence) + automaton.make_automaton() + + fasta_proteins = list() + FASTAFile().load(fasta, fasta_proteins) + for entry in fasta_proteins: + accession = entry.identifier.split("|")[1] + for match in automaton.iter(entry.sequence): + peptide = match[1] + if accession not in peptide_map[peptide]: + peptide_map[peptide].append(accession) + return peptide_map + +def map_peptide_to_protein(parquet_file: str, fasta: str, output_folder, label="feature"): + p = Query(parquet_file) + unique_peptides = p.get_unique_peptides() + peptide_map = get_peptide_map(unique_peptides, fasta) + pqwriter = None + filename = os.path.basename(parquet_file) + for table in p.iter_chunk(batch_size=2000000): + table["pg_accessions"] = table["sequence"].map(peptide_map) + table = table[table['pg_accessions'].apply(lambda x: len(x) > 0)] + table.loc[:,"unique"] = table['pg_accessions'].apply(lambda x: 0 if len(x) > 1 else 1).astype(np.int32) + if label == 
"feature": + parquet_table = pa.Table.from_pandas(table, schema=FEATURE_SCHEMA) + pqwriter = save_file(parquet_table, pqwriter, output_folder, filename) + else: + parquet_table = pa.Table.from_pandas(table, schema=IBAQ_SCHEMA) + pqwriter = save_file(parquet_table, pqwriter, output_folder, filename) + close_file(None, pqwriter) def get_modification_details(seq: str, mods_dict: dict, automaton: any, select_mods: list = None): if "(" not in seq: return (seq, []) diff --git a/quantmsio/quantmsioc.py b/quantmsio/quantmsioc.py index 635dbe6..38f1157 100644 --- a/quantmsio/quantmsioc.py +++ b/quantmsio/quantmsioc.py @@ -9,7 +9,7 @@ from quantmsio.commands.project_command import generate_pride_project_json from quantmsio.commands.feature_command import convert_feature_file from quantmsio.commands.psm_command import convert_psm_file, compare_set_of_psms -from quantmsio.commands.diann_command import diann_convert_to_parquet +from quantmsio.commands.diann_command import diann_convert_to_parquet, diann_pg_convert_to_parquet from quantmsio.commands.ae_command import convert_ibaq_absolute from quantmsio.commands.de_command import convert_msstats_differential from quantmsio.commands.attach_file_command import attach_file_to_json @@ -38,6 +38,7 @@ def cli(): cli.add_command(convert_psm_file) cli.add_command(compare_set_of_psms) cli.add_command(diann_convert_to_parquet) +cli.add_command(diann_pg_convert_to_parquet) cli.add_command(convert_ibaq_absolute) cli.add_command(convert_msstats_differential) cli.add_command(attach_file_to_json)