Skip to content

Commit

Permalink
update: diann
Browse files Browse the repository at this point in the history
  • Loading branch information
zprobot committed Oct 26, 2024
1 parent 16e6b35 commit 06428fc
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 172 deletions.
6 changes: 3 additions & 3 deletions quantmsio/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@
"PEP": "posterior_error_probability",
"Global.Q.Value": "global_qvalue",
"Global.PG.Q.Value": "pg_global_qvalue",
"Q.Value": "qvalue",
"PG.Q.Value": "pg_qvalue",
"Precursor.Normalised": "normalize_intensity",
"Precursor.Charge": "precursor_charge",
"Stripped.Sequence": "sequence",
"Modified.Sequence": "peptidoform",
"Genes": "gg_names",
"opt_global_spectrum_reference": "scan",
"Calculate.Precursor.Mz": "calculated_mz",
"exp_mass_to_charge": "observed_mz",
}
MAXQUANT_MAP = {
"Sequence": "sequence",
Expand Down
222 changes: 79 additions & 143 deletions quantmsio/core/diann.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import duckdb
import time
import logging
import re
import numpy as np
import pandas as pd
import os
Expand All @@ -10,83 +8,30 @@
from pathlib import Path
from pyopenms import AASequence
from pyopenms.Constants import PROTON_MASS_U
from quantmsio.core.project import create_uuid_filename
from quantmsio.operate.tools import get_ahocorasick
from quantmsio.utils.file_utils import extract_protein_list
from quantmsio.core.sdrf import SDRFHandler
from quantmsio.core.mztab import MzTab
from quantmsio.core.feature import Feature
from quantmsio.utils.pride_utils import get_peptidoform_proforma_version_in_mztab, generate_scan_number
from quantmsio.core.duckdb import DuckDB
from quantmsio.utils.pride_utils import generate_scan_number
from quantmsio.core.common import DIANN_MAP

MODIFICATION_PATTERN = re.compile(r"\((.*?)\)")


def find_modification(peptide):
"""
Identify the modification site based on the peptide containing modifications.
:param peptide: Sequences of peptides
:type peptide: str
:return: Modification sites
:rtype: str
Examples:
>>> find_modification("PEPM(UNIMOD:35)IDE")
'4-UNIMOD:35'
>>> find_modification("SM(UNIMOD:35)EWEIRDS(UNIMOD:21)EPTIDEK")
'2-UNIMOD:35,9-UNIMOD:21'
"""
peptide = str(peptide)
original_mods = MODIFICATION_PATTERN.findall(peptide)
peptide = MODIFICATION_PATTERN.sub(".", peptide)
position = [i for i, x in enumerate(peptide) if x == "."]
for j in range(1, len(position)):
position[j] -= j

for k in range(0, len(original_mods)):
original_mods[k] = str(position[k]) + "-" + original_mods[k].upper()

original_mods = ",".join(str(i) for i in original_mods) if len(original_mods) > 0 else "null"

return original_mods


class DiaNNConvert:
class DiaNNConvert(DuckDB):

def __init__(self, diann_report, sdrf_path, duckdb_max_memory="16GB", duckdb_threads=4):
self._sdrf_path = sdrf_path
self._diann_path = diann_report
self._modifications = SDRFHandler(sdrf_path).get_mods_dict()
self._duckdb_name = create_uuid_filename("report-duckdb", ".db")
self._duckdb = self.create_duckdb_from_diann_report(duckdb_max_memory, duckdb_threads)
super(DiaNNConvert, self).__init__(diann_report, duckdb_max_memory, duckdb_threads)
self.init_duckdb()
self._sdrf = SDRFHandler(sdrf_path)
self._mods_map = self._sdrf.get_mods_dict()
self._automaton = get_ahocorasick(self._mods_map)
self._sample_map = self._sdrf.get_sample_map_run()

def create_duckdb_from_diann_report(self, max_memory, worker_threads):
"""
This function creates a duckdb database from a diann report for fast performance queries. The database
is created from the tab delimited format of diann and can handle really large datasets.
:param report_path: The path to the diann report
:return: A duckdb database
"""
s = time.time()

database = duckdb.connect(self._duckdb_name)
database.execute("SET enable_progress_bar=true")
def init_duckdb(self):

if max_memory is not None:
database.execute("SET max_memory='{}'".format(max_memory))
if worker_threads is not None:
database.execute("SET worker_threads='{}'".format(worker_threads))
self._duckdb.execute("""CREATE INDEX idx_precursor_q ON report ("Precursor.Id", "Q.Value")""")
self._duckdb.execute("""CREATE INDEX idx_run ON report ("Run")""")

msg = database.execute("SELECT * FROM duckdb_settings() where name in ('worker_threads', 'max_memory')").df()
logging.info("duckdb uses {} threads.".format(str(msg["value"][0])))
logging.info("duckdb uses {} of memory.".format(str(msg["value"][1])))

database.execute("CREATE TABLE diann_report AS SELECT * FROM '{}'".format(self._diann_path))
database.execute("""CREATE INDEX idx_precursor_q ON diann_report ("Precursor.Id", "Q.Value")""")
database.execute("""CREATE INDEX idx_run ON diann_report ("Run")""")

et = time.time() - s
logging.info("Time to create duckdb database {} seconds".format(et))
return database

def get_report_from_database(self, runs: list) -> pd.DataFrame:
"""
Expand All @@ -99,7 +44,7 @@ def get_report_from_database(self, runs: list) -> pd.DataFrame:
"""
select "File.Name", "Run", "Protein.Group", "Protein.Ids", "Genes", "RT", "Predicted.RT", "RT.Start", "RT.Stop", "Precursor.Id", "Q.Value", "Global.Q.Value", "PEP",
"PG.Q.Value", "Global.PG.Q.Value", "Modified.Sequence", "Stripped.Sequence", "Precursor.Charge", "Precursor.Quantity", "Precursor.Normalised"
from diann_report
from report
where Run IN {}
""".format(
tuple(runs)
Expand All @@ -113,7 +58,7 @@ def get_report_from_database(self, runs: list) -> pd.DataFrame:
def get_masses_and_modifications_map(self):
database = self._duckdb.query(
"""
select DISTINCT "Modified.Sequence" from diann_report
select DISTINCT "Modified.Sequence" from report
"""
)
report = database.df()
Expand All @@ -131,7 +76,7 @@ def get_peptide_map_from_database(self):
FROM (
SELECT "Precursor.Id", "Q.Value","Run", ROW_NUMBER()
OVER (PARTITION BY "Precursor.Id" ORDER BY "Q.Value" ASC) AS row_num
FROM diann_report
FROM report
) AS subquery
WHERE row_num = 1;
"""
Expand All @@ -157,21 +102,21 @@ def intergrate_msg(n):
usecols=["Retention_Time", "SpectrumID", "Exp_Mass_To_Charge"],
)
group = report[report["Run"] == n].copy()
group.sort_values(by="RT.Start", inplace=True)
group.sort_values(by="rt_start", inplace=True)
target.rename(
columns={
"Retention_Time": "RT.Start",
"SpectrumID": "opt_global_spectrum_reference",
"Exp_Mass_To_Charge": "exp_mass_to_charge",
"Retention_Time": "rt_start",
"SpectrumID": "scan",
"Exp_Mass_To_Charge": "observed_mz",
},
inplace=True,
)
target["RT.Start"] = target["RT.Start"] / 60
res = pd.merge_asof(group, target, on="RT.Start", direction="nearest")
target["rt_start"] = target["rt_start"] / 60
res = pd.merge_asof(group, target, on="rt_start", direction="nearest")
return res

# query duckdb
best_ref_map = self.get_peptide_map_from_database()
#best_ref_map = self.get_peptide_map_from_database()
masses_map, modifications_map = self.get_masses_and_modifications_map()

info_list = [
Expand All @@ -182,77 +127,96 @@ def intergrate_msg(n):
info_list = [info_list[i : i + file_num] for i in range(0, len(info_list), file_num)]
for refs in info_list:
report = self.get_report_from_database(refs)
report.rename(columns=DIANN_MAP, inplace=True)
if protein_str:
report = report[report["Protein.Ids"].str.contains(f"{protein_str}", na=False)]
report = report[report["pg_accessions"].str.contains(f"{protein_str}", na=False)]
# restrict
report = report[report["Q.Value"] < qvalue_threshold]
report = report[report["qvalue"] < qvalue_threshold]
usecols = report.columns.to_list() + [
"opt_global_spectrum_reference",
"exp_mass_to_charge",
"scan",
"observed_mz",
]
with concurrent.futures.ThreadPoolExecutor(100) as executor:
results = executor.map(intergrate_msg, refs)
report = np.vstack([result.values for result in results])
report = pd.DataFrame(report, columns=usecols)

# cal value and mod
mass_vector = report["Modified.Sequence"].map(masses_map)
report["Calculate.Precursor.Mz"] = (
mass_vector + (PROTON_MASS_U * report["Precursor.Charge"].values)
) / report["Precursor.Charge"].values
report["modifications"] = report["Modified.Sequence"].apply(find_modification)
report["Modified.Sequence"] = report["Modified.Sequence"].map(modifications_map)
# pep
report["scan_reference_file_name"] = report["Precursor.Id"].map(best_ref_map)
mass_vector = report["peptidoform"].map(masses_map)
report["calculated_mz"] = (
mass_vector + (PROTON_MASS_U * report["precursor_charge"].values)
) / report["precursor_charge"].values

report["peptidoform"] = report["peptidoform"].map(modifications_map)

# report["scan_reference_file_name"] = report["Precursor.Id"].map(best_ref_map)
# report["scan"] = None
report.rename(columns=DIANN_MAP, inplace=True)
# add extra msg
report = self.add_additional_msg(report)

yield report

def add_additional_msg(self, report: pd.DataFrame) -> pd.DataFrame:
def add_additional_msg(self, report: pd.DataFrame):
"""
Perform some transformations in the report dataframe to help with the generation of the psm and feature files.
:param report: The report dataframe
:return: The report dataframe with the transformations
"""
select_mods = list(self._mods_map.keys())
report["reference_file_name"] = report["reference_file_name"].apply(lambda x: x.split(".")[0])

report.loc[:, "is_decoy"] = "0"
report[["peptidoform", "modifications"]] = report[["peptidoform"]].apply(
lambda row: MzTab.generate_modifications_details(
row["peptidoform"], self._mods_map, self._automaton, select_mods),
axis = 1,
result_type="expand"
)
report.loc[:, "channel"] = "LFQ"
report.loc[:, "unique"] = report["pg_accessions"].apply(lambda x: "0" if ";" in str(x) else "1")

report["peptidoform"] = report[["sequence", "modifications"]].apply(
lambda row: get_peptidoform_proforma_version_in_mztab(
row["sequence"], row["modifications"], self._modifications
),
axis=1,
report.loc[:, "intensities"] = report[["reference_file_name","channel","intensity"]].apply(
lambda rows: [{
"sample_accession": self._sample_map[rows["reference_file_name"]+"-"+rows["channel"]],
"channel": rows["channel"],
"intensity": rows["intensity"]
}],
axis=1
)
report.loc[:, "is_decoy"] = "0"
report.loc[:, "unique"] = report["pg_accessions"].apply(lambda x: "0" if ";" in str(x) else "1")
report["scan"] = report["scan"].apply(generate_scan_number)
report["mp_accessions"] = report["mp_accessions"].str.split(";")
report["pg_accessions"] = report["pg_accessions"].str.split(";")
report.loc[:, "anchor_protein"] = report["pg_accessions"].str[0]
report.loc[:, "gg_names"] = report["gg_names"].str.split(",")
report.loc[:, "additional_intensities"] = report["Precursor.Normalised"].apply(
lambda v: [{"name": "normalized intensity", "value": np.float32(v)}]

report.loc[:, "additional_intensities"] = report[["reference_file_name","channel","normalize_intensity"]].apply(
lambda rows: [{
"sample_accession": self._sample_map[rows["reference_file_name"]+"-"+rows["channel"]],
"channel": rows["channel"],
"additional_intensity": [{
"name": "normalize_intensity",
"value": rows["normalize_intensity"]
}]
}],
axis=1
)
report.loc[:, "additional_scores"] = report[["Q.Value", "PG.Q.Value"]].apply(
report.loc[:, "additional_scores"] = report[["qvalue", "pg_qvalue"]].apply(
lambda row: [
{"name": "qvalue", "value": row["Q.Value"]},
{"name": "pg_qvalue", "value": row["PG.Q.Value"]},
{"name": "qvalue", "value": row["qvalue"]},
{"name": "pg_qvalue", "value": row["pg_qvalue"]},
],
axis=1,
)
report.loc[:, "modification_details"] = None
report.loc[:, "scan_reference_file_name"] = None
report.loc[:, "scan"] = None
report.loc[:, "cv_params"] = None
report.loc[:, "gg_accessions"] = None
report.loc[:, "best_id_score"] = None
return report
report.loc[:, "ion_mobility"] = None
report.loc[:, "start_ion_mobility"] = None
report.loc[:, "stop_ion_mobility"] = None

def generate_feature(
self, qvalue_threshold: float, mzml_info_folder: str, file_num: int = 50, protein_str: str = None
):
for report in self.main_report_df(qvalue_threshold, mzml_info_folder, file_num, protein_str):
s = time.time()
report = self.merge_sdrf_to_feature(report)
Feature.convert_to_parquet_format(report, self._modifications)
self.add_additional_msg(report)
Feature.convert_to_parquet_format(report)
feature = Feature.transform_feature(report)
et = time.time() - s
logging.info("Time to generate psm and feature file {} seconds".format(et))
Expand All @@ -277,31 +241,3 @@ def write_feature_to_file(
pqwriter.close()
self.destroy_duckdb_database()

def destroy_duckdb_database(self):
"""
This function destroys the duckdb database, closing connection and removeing it from the filesystem.
"""
if self._duckdb_name and self._duckdb:
self._duckdb.close()
os.remove(self._duckdb_name)
self._duckdb_name = None
self._duckdb = None

def merge_sdrf_to_feature(self, report):
sdrf = Feature.transform_sdrf(self._sdrf_path)
report = pd.merge(
report,
sdrf,
left_on=["reference_file_name"],
right_on=["reference"],
how="left",
)
report.drop(
[
"reference",
"label",
],
axis=1,
inplace=True,
)
return report
17 changes: 13 additions & 4 deletions quantmsio/core/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,33 @@ def transform_feature(df):
def write_feature_to_file(
self,
output_path,
chunksize=1000000,
file_num=10,
protein_file=None,
duckdb_max_memory="16GB",
duckdb_threads=4
):
protein_list = extract_protein_list(protein_file) if protein_file else None
protein_str = "|".join(protein_list) if protein_list else None
pqwriter = None
for feature in self.generate_feature(chunksize, protein_str):
for feature in self.generate_feature(file_num, protein_str, duckdb_max_memory, duckdb_threads):
if not pqwriter:
pqwriter = pq.ParquetWriter(output_path, feature.schema)
pqwriter.write_table(feature)
if pqwriter:
pqwriter.close()

def write_features_to_file(self, output_folder, filename, partitions, chunksize=1000000, protein_file=None):
def write_features_to_file(self,
output_folder,
filename,
partitions,
file_num=10,
protein_file=None,
duckdb_max_memory="16GB",
duckdb_threads=4):
pqwriters = {}
protein_list = extract_protein_list(protein_file) if protein_file else None
protein_str = "|".join(protein_list) if protein_list else None
for key, feature in self.generate_slice_feature(partitions, chunksize, protein_str):
for key, feature in self.generate_slice_feature(partitions, file_num, protein_str, duckdb_max_memory, duckdb_threads):
folder = [output_folder] + [str(col) for col in key]
folder = os.path.join(*folder)
if not os.path.exists(folder):
Expand Down
7 changes: 6 additions & 1 deletion quantmsio/core/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,12 @@
pa.struct([
('sample_accession', pa.string()),
('channel', pa.string()),
('intensity', pa.float32())
('additional_intensity', pa.list_(
pa.struct([
("name", pa.string()),
("value", pa.float32())
])
))
])
),
metadata={
Expand Down
Loading

0 comments on commit 06428fc

Please sign in to comment.