Skip to content

Commit

Permalink
update: command
Browse files Browse the repository at this point in the history
  • Loading branch information
zprobot committed Nov 11, 2024
1 parent 5a0b0d1 commit 490ecc5
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 56 deletions.
3 changes: 3 additions & 0 deletions docs/tools.rst
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ Example:
* Optional parameter

.. code:: shell
--protein_file Protein file that meets specific requirements (protein.txt)
--partitions The field used for splitting files, multiple fields are separated by `,`
--chunksize Read batch size
--output_prefix_file Prefix of the parquet file needed to generate the file name
Expand All @@ -168,6 +170,7 @@ Example:

.. code:: shell
--protein_file Protein file that meets specific requirements
--partitions The field used for splitting files, multiple fields are separated by `,`
--output_prefix_file Prefix of the Json file needed to generate the file name
--duckdb_max_memory The maximum amount of memory allocated by the DuckDB engine (e.g 16GB)
--duckdb_threads The number of threads for the DuckDB engine (e.g 4)
Expand Down
38 changes: 28 additions & 10 deletions quantmsio/commands/diann_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
help="Prefix of the Json file needed to generate the file name",
required=False,
)
@click.option(
"--partitions",
help="The field used for splitting files, multiple fields are separated by ,",
required=False,
)
@click.option(
"--duckdb_max_memory", help="The maximum amount of memory allocated by the DuckDB engine (e.g 4GB)", required=False
)
Expand All @@ -56,6 +61,7 @@ def diann_convert_to_parquet(
output_folder: str,
protein_file: str,
output_prefix_file: str,
partitions: str,
duckdb_max_memory: str,
duckdb_threads: int,
file_num: int,
Expand All @@ -66,6 +72,7 @@ def diann_convert_to_parquet(
mzml_info_folder: mzml info file folder
sdrf_path: sdrf file path
output_folder: Folder where the Json file will be generated
param partitions: The field used for splitting files, multiple fields are separated by ,
output_prefix_file: Prefix of the Json file needed to generate the file name
duckdb_max_memory: The maximum amount of memory allocated by the DuckDB engine (e.g 4GB)
duckdb_threads: The number of threads for the DuckDB engine (e.g 4)
Expand All @@ -79,20 +86,31 @@ def diann_convert_to_parquet(

if not output_prefix_file:
output_prefix_file = ""

feature_output_path = output_folder + "/" + create_uuid_filename(output_prefix_file, ".feature.parquet")
filename = create_uuid_filename(output_prefix_file, ".feature.parquet")
feature_output_path = output_folder + "/" + filename

dia_nn = DiaNNConvert(
diann_report=report_path,
sdrf_path=sdrf_path,
duckdb_max_memory=duckdb_max_memory,
duckdb_threads=duckdb_threads,
)

dia_nn.write_feature_to_file(
qvalue_threshold=qvalue_threshold,
mzml_info_folder=mzml_info_folder,
output_path=feature_output_path,
file_num=file_num,
protein_file=protein_file,
)
if not partitions:
dia_nn.write_feature_to_file(
qvalue_threshold=qvalue_threshold,
mzml_info_folder=mzml_info_folder,
output_path=feature_output_path,
file_num=file_num,
protein_file=protein_file,
)
else:
partitions = partitions.split(",")
dia_nn.write_features_to_file(
qvalue_threshold=qvalue_threshold,
mzml_info_folder=mzml_info_folder,
output_folder=output_folder,
filename=filename,
partitions=partitions,
file_num=file_num,
protein_file=protein_file,
)
36 changes: 31 additions & 5 deletions quantmsio/commands/maxquant_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def convert_maxquant_psm(

MQ = MaxQuant()
output_path = output_folder + "/" + create_uuid_filename(output_prefix_file, ".psm.parquet")
MQ.convert_psm_to_parquet(msms_path=msms_file, output_path=output_path, chunksize=chunksize)
MQ.write_psm_to_file(msms_path=msms_file, output_path=output_path, chunksize=chunksize)


@click.command(
Expand All @@ -71,6 +71,16 @@ def convert_maxquant_psm(
help="Folder where the parquet file will be generated",
required=True,
)
@click.option(
"--protein_file",
help="Protein file that meets specific requirements",
required=False,
)
@click.option(
"--partitions",
help="The field used for splitting files, multiple fields are separated by ,",
required=False,
)
@click.option(
"--chunksize",
help="Read batch size",
Expand All @@ -85,6 +95,8 @@ def convert_maxquant_feature(
evidence_file: str,
sdrf_file: str,
output_folder: str,
protein_file: str,
partitions: str,
chunksize: int,
output_prefix_file: str,
):
Expand All @@ -93,6 +105,7 @@ def convert_maxquant_feature(
:param evidence_file: the msms.txt file, this will be used to extract the peptide information
:param sdrf_file: the SDRF file needed to extract some of the metadata
:param output_folder: Folder where the Json file will be generated
:param partitions: The field used for splitting files, multiple fields are separated by ,
:param chunksize: Read batch size
:param output_prefix_file: Prefix of the Json file needed to generate the file name
"""
Expand All @@ -104,7 +117,20 @@ def convert_maxquant_feature(
output_prefix_file = ""

MQ = MaxQuant()
output_path = output_folder + "/" + create_uuid_filename(output_prefix_file, ".feature.parquet")
MQ.convert_feature_to_parquet(
evidence_path=evidence_file, sdrf_path=sdrf_file, output_path=output_path, chunksize=chunksize
)
filename = create_uuid_filename(output_prefix_file, ".feature.parquet")
output_path = output_folder + "/" + filename
if not partitions:
MQ.write_feature_to_file(
evidence_path=evidence_file, sdrf_path=sdrf_file, output_path=output_path, chunksize=chunksize, protein_file=protein_file
)
else:
partitions = partitions.split(",")
MQ.write_features_to_file(
evidence_path=evidence_file,
sdrf_path=sdrf_file,
output_folder = output_folder,
filename=filename,
partitions=partitions,
chunksize=chunksize,
protein_file=protein_file
)
31 changes: 24 additions & 7 deletions quantmsio/core/diann.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pyopenms import AASequence
from pyopenms.Constants import PROTON_MASS_U
from quantmsio.operate.tools import get_ahocorasick
from quantmsio.utils.file_utils import extract_protein_list
from quantmsio.utils.file_utils import extract_protein_list, save_slice_file
from quantmsio.core.sdrf import SDRFHandler
from quantmsio.core.mztab import MzTab
from quantmsio.core.feature import Feature
Expand Down Expand Up @@ -149,9 +149,6 @@ def intergrate_msg(n):

report["peptidoform"] = report["peptidoform"].map(modifications_map)

# report["scan_reference_file_name"] = report["Precursor.Id"].map(best_ref_map)
# report["scan"] = None

yield report

def add_additional_msg(self, report: pd.DataFrame):
Expand Down Expand Up @@ -223,10 +220,9 @@ def generate_feature(
s = time.time()
self.add_additional_msg(report)
Feature.convert_to_parquet_format(report)
feature = Feature.transform_feature(report)
et = time.time() - s
logging.info("Time to generate psm and feature file {} seconds".format(et))
yield feature
yield report

def write_feature_to_file(
self,
Expand All @@ -239,10 +235,31 @@ def write_feature_to_file(
protein_list = extract_protein_list(protein_file) if protein_file else None
protein_str = "|".join(protein_list) if protein_list else None
pqwriter = None
for feature in self.generate_feature(qvalue_threshold, mzml_info_folder, file_num, protein_str):
for report in self.generate_feature(qvalue_threshold, mzml_info_folder, file_num, protein_str):
feature = Feature.transform_feature(report)
if not pqwriter:
pqwriter = pq.ParquetWriter(output_path, feature.schema)
pqwriter.write_table(feature)
if pqwriter:
pqwriter.close()
self.destroy_duckdb_database()

def write_features_to_file(
    self,
    qvalue_threshold: float,
    mzml_info_folder: str,
    output_folder: str,
    filename: str,
    partitions: list,
    file_num: int = 50,
    protein_file=None,
):
    """Convert the DIA-NN report into partitioned feature parquet files.

    Each generated feature batch is sliced by *partitions* and appended to a
    per-partition parquet file under *output_folder* (one writer per key).

    :param qvalue_threshold: q-value cutoff applied when reading the report
    :param mzml_info_folder: folder containing the mzML info files
    :param output_folder: root folder where partition subfolders are created
    :param filename: parquet file name used inside each partition folder
    :param partitions: fields used for splitting features into files
    :param file_num: number of report files processed per batch
    :param protein_file: optional file restricting output to the listed proteins
    """
    pqwriters = {}
    protein_list = extract_protein_list(protein_file) if protein_file else None
    protein_str = "|".join(protein_list) if protein_list else None
    for report in self.generate_feature(qvalue_threshold, mzml_info_folder, file_num, protein_str):
        for key, df in Feature.slice(report, partitions):
            feature = Feature.transform_feature(df)
            pqwriters = save_slice_file(feature, pqwriters, output_folder, key, filename)
    for pqwriter in pqwriters.values():
        pqwriter.close()
    # Consistency with write_feature_to_file: drop the temporary DuckDB
    # database once conversion is done, otherwise it leaks on disk when
    # callers use the partitioned path.
    self.destroy_duckdb_database()
13 changes: 2 additions & 11 deletions quantmsio/core/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pyarrow as pa
import pyarrow.parquet as pq
from quantmsio.operate.tools import get_ahocorasick, get_protein_accession
from quantmsio.utils.file_utils import extract_protein_list
from quantmsio.utils.file_utils import extract_protein_list,save_slice_file
from quantmsio.core.mztab import MzTab
from quantmsio.core.psm import Psm
from quantmsio.core.sdrf import SDRFHandler
Expand Down Expand Up @@ -207,16 +207,7 @@ def write_features_to_file(
for key, feature in self.generate_slice_feature(
partitions, file_num, protein_str, duckdb_max_memory, duckdb_threads
):
folder = [output_folder] + [str(col) for col in key]
folder = os.path.join(*folder)
if not os.path.exists(folder):
os.makedirs(folder, exist_ok=True)
save_path = os.path.join(*[folder, filename])
if not os.path.exists(save_path):
pqwriter = pq.ParquetWriter(save_path, feature.schema)
pqwriters[key] = pqwriter
pqwriters[key].write_table(feature)

pqwriters = save_slice_file(feature, pqwriters, output_folder, key, filename)
for pqwriter in pqwriters.values():
pqwriter.close()

Expand Down
41 changes: 35 additions & 6 deletions quantmsio/core/maxquant.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from quantmsio.core.common import MAXQUANT_PSM_MAP, MAXQUANT_PSM_USECOLS, MAXQUANT_FEATURE_MAP, MAXQUANT_FEATURE_USECOLS
from quantmsio.core.feature import Feature
from quantmsio.core.psm import Psm
from quantmsio.utils.file_utils import extract_protein_list, save_slice_file

logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)

Expand All @@ -40,7 +41,7 @@ class MaxQuant:
def __init__(self):
pass

def iter_batch(self, file_path: str, label: str = "feature", chunksize: int = 100000):
def iter_batch(self, file_path: str, label: str = "feature", chunksize: int = 100000, protein_str:str = None):
self.mods_map = self.get_mods_map(file_path)
self._automaton = get_ahocorasick(self.mods_map)
col_df = pd.read_csv(file_path, sep="\t", nrows=1)
Expand Down Expand Up @@ -74,6 +75,8 @@ def iter_batch(self, file_path: str, label: str = "feature", chunksize: int = 10
chunksize=chunksize,
):
df.rename(columns=use_map, inplace=True)
if protein_str:
df = df[df["mp_accessions"].str.contains(f"{protein_str}", na=False)]
df = self.main_operate(df)
yield df

Expand Down Expand Up @@ -278,7 +281,7 @@ def transform_feature(self, df: pd.DataFrame):
df.loc[:, "start_ion_mobility"] = None
df.loc[:, "stop_ion_mobility"] = None

def convert_psm_to_parquet(self, msms_path: str, output_path: str, chunksize: int = 1000000):
def write_psm_to_file(self, msms_path: str, output_path: str, chunksize: int = 1000000):
pqwriter = None
for df in self.iter_batch(msms_path, "psm", chunksize=chunksize):
self.transform_psm(df)
Expand All @@ -290,14 +293,17 @@ def convert_psm_to_parquet(self, msms_path: str, output_path: str, chunksize: in
if pqwriter:
pqwriter.close()

def convert_feature_to_parquet(
self, evidence_path: str, sdrf_path: str, output_path: str, chunksize: int = 1000000
):
def _init_sdrf(self, sdrf_path: str):
    """Parse the SDRF file and cache experiment type and sample map on self."""
    handler = SDRFHandler(sdrf_path)
    self.experiment_type = handler.get_experiment_type_from_sdrf()
    self._sample_map = handler.get_sample_map_run()

def write_feature_to_file(
self, evidence_path: str, sdrf_path: str, output_path: str, chunksize: int = 1000000, protein_file=None,
):
self._init_sdrf(sdrf_path)
pqwriter = None
for df in self.iter_batch(evidence_path, chunksize=chunksize):
for df in self.iter_batch(evidence_path, chunksize=chunksize, protein_str=protein_file):
self.transform_feature(df)
Feature.convert_to_parquet_format(df)
parquet = Feature.transform_feature(df)
Expand All @@ -306,3 +312,26 @@ def convert_feature_to_parquet(
pqwriter.write_table(parquet)
if pqwriter:
pqwriter.close()

def write_features_to_file(
    self,
    evidence_path: str,
    sdrf_path: str,
    output_folder: str,
    filename: str,
    partitions: list,
    chunksize: int = 1000000,
    protein_file=None,
):
    """Convert MaxQuant evidence into partitioned feature parquet files.

    Evidence batches are transformed to the feature format, sliced by
    *partitions*, and each slice is appended to its own parquet file under
    *output_folder* (one writer per partition key).

    :param evidence_path: path to the MaxQuant evidence.txt file
    :param sdrf_path: SDRF file used to extract metadata
    :param output_folder: root folder where partition subfolders are created
    :param filename: parquet file name used inside each partition folder
    :param partitions: fields used for splitting features into files
    :param chunksize: number of evidence rows read per batch
    :param protein_file: optional file restricting output to the listed proteins
    """
    protein_list = extract_protein_list(protein_file) if protein_file else None
    protein_str = "|".join(protein_list) if protein_list else None
    self._init_sdrf(sdrf_path)
    writers = {}
    for batch in self.iter_batch(evidence_path, chunksize=chunksize, protein_str=protein_str):
        self.transform_feature(batch)
        Feature.convert_to_parquet_format(batch)
        for key, sliced in Feature.slice(batch, partitions):
            table = Feature.transform_feature(sliced)
            writers = save_slice_file(table, writers, output_folder, key, filename)
    for writer in writers.values():
        writer.close()
20 changes: 3 additions & 17 deletions quantmsio/operate/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
from quantmsio.operate.query import Query, map_spectrum_mz
from quantmsio.core.openms import OpenMSHandler
from quantmsio.utils.pride_utils import get_unanimous_name
from quantmsio.utils.file_utils import load_de_or_ae, read_large_parquet

from quantmsio.utils.file_utils import load_de_or_ae, save_slice_file, save_file

def init_save_info(parquet_path: str):
pqwriters = {}
Expand Down Expand Up @@ -69,24 +68,11 @@ def save_parquet_file(
if partitions and len(partitions) > 0:
for key, df in table.groupby(partitions):
parquet_table = pa.Table.from_pandas(df, schema=schema)
folder = [output_folder] + [str(col) for col in key]
folder = os.path.join(*folder)
if not os.path.exists(folder):
os.makedirs(folder, exist_ok=True)
save_path = os.path.join(*[folder, filename])
if not os.path.exists(save_path):
pqwriter = pq.ParquetWriter(save_path, parquet_table.schema)
pqwriters[key] = pqwriter
pqwriters[key].write_table(parquet_table)
pqwriters = save_slice_file(parquet_table, pqwriters, output_folder, key, filename)
return pqwriters, pqwriter_no_part
else:
parquet_table = pa.Table.from_pandas(table, schema=schema)
if not os.path.exists(output_folder):
os.makedirs(output_folder, exist_ok=True)
save_path = os.path.join(*[output_folder, filename])
if not pqwriter_no_part:
pqwriter_no_part = pq.ParquetWriter(save_path, parquet_table.schema)
pqwriter_no_part.write_table(parquet_table)
pqwriter_no_part = save_file(parquet_table, pqwriter_no_part, output_folder, filename)
return pqwriters, pqwriter_no_part


Expand Down
21 changes: 21 additions & 0 deletions quantmsio/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,24 @@ def calculate_buffer_size(file_path: str) -> int:
fraction_of_memory = 0.4 # Adjust as needed

return min(int(total_memory * fraction_of_memory), max_buffer_size, file_size)

def save_slice_file(parquet_table, pqwriters, output_folder, partitions, filename):
    """Append *parquet_table* to the parquet file for one partition key.

    :param parquet_table: pyarrow Table to append
    :param pqwriters: dict mapping partition keys -> open ParquetWriter
    :param output_folder: root output directory
    :param partitions: tuple of partition values; becomes the subfolder path
    :param filename: parquet file name inside the partition folder
    :return: the updated *pqwriters* dict
    """
    folder = os.path.join(output_folder, *[str(col) for col in partitions])
    os.makedirs(folder, exist_ok=True)
    save_path = os.path.join(folder, filename)
    # Key the writer cache on the partition, not on file existence: a stale
    # file left over from a previous run would otherwise leave the key
    # missing and raise KeyError on the write below.
    if partitions not in pqwriters:
        pqwriters[partitions] = pq.ParquetWriter(save_path, parquet_table.schema)
    pqwriters[partitions].write_table(parquet_table)
    return pqwriters

def save_file(parquet_table, pqwriter, output_folder, filename):
    """Append *parquet_table* to ``<output_folder>/<filename>``.

    Lazily opens a ParquetWriter on the first call (when *pqwriter* is
    falsy) and returns it so the caller can reuse and eventually close it.
    """
    os.makedirs(output_folder, exist_ok=True)
    target_path = os.path.join(output_folder, filename)
    if not pqwriter:
        # First write: bind a writer to this table's schema.
        pqwriter = pq.ParquetWriter(target_path, parquet_table.schema)
    pqwriter.write_table(parquet_table)
    return pqwriter

0 comments on commit 490ecc5

Please sign in to comment.