Skip to content

Commit

Permalink
update: slice
Browse files Browse the repository at this point in the history
  • Loading branch information
zprobot committed Oct 16, 2024
1 parent 08013b4 commit 9f990ce
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
18 changes: 14 additions & 4 deletions quantmsio/commands/feature_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
help="Folder where the Json file will be generated",
required=True,
)
@click.option(
"--partitions",
help="The field used for splitting files, multiple fields are separated by ,",
required=False,
)
@click.option(
"--output_prefix_file",
help="Prefix of the Json file needed to generate the file name",
Expand All @@ -47,6 +52,7 @@ def convert_feature_file(
chunksize: int,
protein_file: str,
output_folder: str,
partitions: str,
output_prefix_file: str,
):
"""
Expand All @@ -56,15 +62,19 @@ def convert_feature_file(
:param mztab_file: the mzTab file, this will be used to extract the protein
:param chunksize: Read batch size
:param output_folder: Folder where the Json file will be generated
:param partitions: The field used for splitting files, multiple fields are separated by ,
:param output_prefix_file: Prefix of the Json file needed to generate the file name
"""

if sdrf_file is None or msstats_file is None or mztab_file is None or output_folder is None:
raise click.UsageError("Please provide all the required parameters")

feature_manager = Feature(mzTab_path=mztab_file, sdrf_path=sdrf_file, msstats_in_path=msstats_file)
if not output_prefix_file:
output_prefix_file = ""
output_path = output_folder + "/" + create_uuid_filename(output_prefix_file, ".feature.parquet")

feature_manager.write_feature_to_file(output_path=output_path, chunksize=chunksize, protein_file=protein_file)
filename = create_uuid_filename(output_prefix_file, ".feature.parquet")
output_path = output_folder + "/" + filename
if not partitions:
feature_manager.write_feature_to_file(output_path=output_path, chunksize=chunksize, protein_file=protein_file)
else:
partitions = partitions.split(',')
feature_manager.write_features_to_file(output_folder=output_folder, filename=filename, partitions=partitions, chunksize=chunksize, protein_file=protein_file)
49 changes: 47 additions & 2 deletions quantmsio/core/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,34 @@ def generate_feature(self, chunksize=1000000, protein_str=None):
for msstats in self.transform_msstats_in(chunksize, protein_str):
msstats = self.merge_msstats_and_sdrf(msstats)
msstats = self.merge_msstats_and_psm(msstats, map_dict)
self.transform_feature(msstats)
self.add_additional_msg(msstats)
self.convert_to_parquet_format(msstats, self._modifications)
feature = self.transform_feature(msstats)
yield feature

@staticmethod
def slice(df, partitions):
    """Lazily split ``df`` into one sub-frame per combination of partition values.

    This is a generator, so the validation below only runs once the caller
    starts iterating.

    :param df: DataFrame to split; must contain every column named in ``partitions``
    :param partitions: non-empty list of column names to group by
    :return: yields ``(key, group)`` pairs where ``key`` is always a tuple with
        one entry per partition column and ``group`` is the matching sub-frame
    :raises TypeError: if ``partitions`` is not a list
    :raises ValueError: if ``partitions`` is empty or names a missing column
    """
    if not isinstance(partitions, list):
        raise TypeError(f"{partitions} is not a list")
    if not partitions:
        raise ValueError(f"{partitions} is empty")
    columns = df.columns
    for partition in partitions:
        if partition not in columns:
            raise ValueError(f"{partition} does not exist")
    for key, group in df.groupby(partitions):
        # Older pandas releases return a bare scalar key when grouping by a
        # single-element list. Callers iterate over the key to build one
        # folder level per partition column, so a scalar string would be
        # split into characters. Always hand back a tuple.
        if not isinstance(key, tuple):
            key = (key,)
        yield key, group

def generate_slice_feature(self, partitions, chunksize=1000000, protein_str=None):
    """Stream features chunk by chunk, split by the given partition columns.

    Each msstats chunk is merged with the SDRF and PSM information, enriched
    and converted in place, then divided with ``self.slice``. Because chunks
    are processed independently, the same key can be yielded more than once
    (once per chunk in which it occurs).

    :param partitions: list of column names forwarded to ``self.slice``
    :param chunksize: batch size forwarded to the PSM and msstats readers
    :param protein_str: optional protein filter forwarded to the readers
    :return: yields ``(key, feature)`` pairs, where ``feature`` is whatever
        ``self.transform_feature`` returns for that group
    """
    psm_map = self.extract_psm_msg(chunksize, protein_str)
    for chunk in self.transform_msstats_in(chunksize, protein_str):
        merged = self.merge_msstats_and_psm(self.merge_msstats_and_sdrf(chunk), psm_map)
        # Both helpers are called for their side effects on ``merged``.
        self.add_additional_msg(merged)
        self.convert_to_parquet_format(merged, self._modifications)
        yield from (
            (group_key, self.transform_feature(group))
            for group_key, group in self.slice(merged, partitions)
        )

@staticmethod
def transform_feature(df):
Expand All @@ -296,7 +320,28 @@ def write_feature_to_file(
if pqwriter:
pqwriter.close()

def transform_feature(self, msstats):
def write_features_to_file(
    self, output_folder, filename, partitions, chunksize=1000000, protein_file=None
):
    """Write features into one Parquet file per partition-key combination.

    Each key produced by ``generate_slice_feature`` maps to the path
    ``output_folder/<key part>/.../filename``. A writer is opened the first
    time a key is seen and reused for every later chunk with the same key.

    :param output_folder: root folder under which partition folders are created
    :param filename: file name used inside every partition folder
    :param partitions: list of column names used to split the data
    :param chunksize: read batch size forwarded to ``generate_slice_feature``
    :param protein_file: optional file passed to ``extract_protein_list``; the
        resulting entries are joined with ``|`` to build the protein filter
    """
    pqwriters = {}
    protein_list = extract_protein_list(protein_file) if protein_file else None
    protein_str = "|".join(protein_list) if protein_list else None
    try:
        for key, feature in self.generate_slice_feature(partitions, chunksize, protein_str):
            pqwriter = pqwriters.get(key)
            if pqwriter is None:
                # Track writers by key, not by whether the file exists on disk:
                # a file left over from an earlier run must not cause a lookup
                # of a writer that was never created.
                key_parts = key if isinstance(key, tuple) else (key,)
                folder = os.path.join(output_folder, *[str(col) for col in key_parts])
                os.makedirs(folder, exist_ok=True)
                pqwriter = pq.ParquetWriter(os.path.join(folder, filename), feature.schema)
                pqwriters[key] = pqwriter
            pqwriter.write_table(feature)
    finally:
        # Close every writer even if a chunk fails, so files already opened
        # are finalised and their handles released.
        for pqwriter in pqwriters.values():
            pqwriter.close()


def add_additional_msg(self, msstats):
msstats.loc[:, "protein_global_qvalue"] = msstats["pg_accessions"].map(self._protein_global_qvalue_map)
msstats.loc[:, "peptidoform"] = msstats[["modifications", "sequence"]].apply(
lambda row: get_peptidoform_proforma_version_in_mztab(
Expand Down
3 changes: 0 additions & 3 deletions quantmsio/operate/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ def slice_parquet_file(df, partitions, output_folder, label):
schema = FEATURE_SCHEMA
else:
schema = PSM_SCHEMA
for partion in partitions:
if partion not in cols:
raise Exception(f"{partion} does not exist")
for key, df in df.groupby(partitions):
parquet_table = pa.Table.from_pandas(df, schema=schema)
folder = [output_folder] + [str(col) for col in key]
Expand Down

0 comments on commit 9f990ce

Please sign in to comment.