From 9f990ced96d039c3d1b529ae00e80daf433c411c Mon Sep 17 00:00:00 2001 From: zprobot <1727697083@qq.com> Date: Wed, 16 Oct 2024 17:01:00 +0800 Subject: [PATCH] update: slice --- quantmsio/commands/feature_command.py | 18 +++++++--- quantmsio/core/feature.py | 49 +++++++++++++++++++++++++-- quantmsio/operate/tools.py | 3 -- 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/quantmsio/commands/feature_command.py b/quantmsio/commands/feature_command.py index 1d51d9d2..e9401bae 100644 --- a/quantmsio/commands/feature_command.py +++ b/quantmsio/commands/feature_command.py @@ -35,6 +35,11 @@ help="Folder where the Json file will be generated", required=True, ) +@click.option( + "--partitions", + help="The field used for splitting files, multiple fields are separated by ,", + required=False, +) @click.option( "--output_prefix_file", help="Prefix of the Json file needed to generate the file name", @@ -47,6 +52,7 @@ def convert_feature_file( chunksize: int, protein_file: str, output_folder: str, + partitions: str, output_prefix_file: str, ): """ @@ -56,15 +62,19 @@ def convert_feature_file( :param mztab_file: the mzTab file, this will be used to extract the protein :param chunksize: Read batch size :param output_folder: Folder where the Json file will be generated + :param partitions: The field used for splitting files, multiple fields are separated by , :param output_prefix_file: Prefix of the Json file needed to generate the file name """ if sdrf_file is None or msstats_file is None or mztab_file is None or output_folder is None: raise click.UsageError("Please provide all the required parameters") - feature_manager = Feature(mzTab_path=mztab_file, sdrf_path=sdrf_file, msstats_in_path=msstats_file) if not output_prefix_file: output_prefix_file = "" - output_path = output_folder + "/" + create_uuid_filename(output_prefix_file, ".feature.parquet") - - feature_manager.write_feature_to_file(output_path=output_path, chunksize=chunksize, protein_file=protein_file) + filename = create_uuid_filename(output_prefix_file, ".feature.parquet") + output_path = output_folder + "/" + filename + if not partitions: + feature_manager.write_feature_to_file(output_path=output_path, chunksize=chunksize, protein_file=protein_file) + else: + partitions = partitions.split(',') + feature_manager.write_features_to_file(output_folder=output_folder, filename=filename, partitions=partitions, chunksize=chunksize, protein_file=protein_file) \ No newline at end of file diff --git a/quantmsio/core/feature.py b/quantmsio/core/feature.py index 0c6c11bc..5d74f1db 100644 --- a/quantmsio/core/feature.py +++ b/quantmsio/core/feature.py @@ -271,10 +271,34 @@ def generate_feature(self, chunksize=1000000, protein_str=None): for msstats in self.transform_msstats_in(chunksize, protein_str): msstats = self.merge_msstats_and_sdrf(msstats) msstats = self.merge_msstats_and_psm(msstats, map_dict) - self.transform_feature(msstats) + self.add_additional_msg(msstats) self.convert_to_parquet_format(msstats, self._modifications) feature = self.transform_feature(msstats) yield feature + + @staticmethod + def slice(df, partitions): + cols = df.columns + if not isinstance(partitions, list): + raise Exception(f"{partitions} is not a list") + if len(partitions) == 0: + raise Exception(f"{partitions} is empty") + for partion in partitions: + if partion not in cols: + raise Exception(f"{partion} does not exist") + for key, df in df.groupby(partitions): + yield key, df + + def generate_slice_feature(self, partitions, chunksize=1000000, protein_str=None): + map_dict = self.extract_psm_msg(chunksize, protein_str) + for msstats in self.transform_msstats_in(chunksize, protein_str): + msstats = self.merge_msstats_and_sdrf(msstats) + msstats = self.merge_msstats_and_psm(msstats, map_dict) + self.add_additional_msg(msstats) + self.convert_to_parquet_format(msstats, self._modifications) + for key, df in self.slice(msstats, partitions): + feature = self.transform_feature(df) + yield key, feature @staticmethod def transform_feature(df): @@ -296,7 +320,28 @@ def write_feature_to_file( if pqwriter: pqwriter.close() - def transform_feature(self, msstats): + def write_features_to_file( + self, output_folder, filename, partitions, chunksize=1000000, protein_file=None + ): + pqwriters = {} + protein_list = extract_protein_list(protein_file) if protein_file else None + protein_str = "|".join(protein_list) if protein_list else None + for key, feature in self.generate_slice_feature(partitions, chunksize, protein_str): + folder = [output_folder] + [str(col) for col in key] + folder = os.path.join(*folder) + if not os.path.exists(folder): + os.makedirs(folder, exist_ok=True) + save_path = os.path.join(*[folder, filename]) + if not os.path.exists(save_path): + pqwriter = pq.ParquetWriter(save_path, feature.schema) + pqwriters[key] = pqwriter + pqwriters[key].write_table(feature) + + for pqwriter in pqwriters.values(): + pqwriter.close() + + + def add_additional_msg(self, msstats): msstats.loc[:, "protein_global_qvalue"] = msstats["pg_accessions"].map(self._protein_global_qvalue_map) msstats.loc[:, "peptidoform"] = msstats[["modifications", "sequence"]].apply( lambda row: get_peptidoform_proforma_version_in_mztab( diff --git a/quantmsio/operate/tools.py b/quantmsio/operate/tools.py index 28b2c334..63b794fa 100644 --- a/quantmsio/operate/tools.py +++ b/quantmsio/operate/tools.py @@ -71,9 +71,6 @@ def slice_parquet_file(df, partitions, output_folder, label): schema = FEATURE_SCHEMA else: schema = PSM_SCHEMA - for partion in partitions: - if partion not in cols: - raise Exception(f"{partion} does not exist") for key, df in df.groupby(partitions): parquet_table = pa.Table.from_pandas(df, schema=schema) folder = [output_folder] + [str(col) for col in key]