From 0d6deb81b487cc4175d0bb3ee409d5cb27684d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20M=2E=20L=C3=BCscher?= Date: Thu, 14 Nov 2024 16:01:23 +0100 Subject: [PATCH] implement job for text file splitting --- text/processing.py | 89 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) diff --git a/text/processing.py b/text/processing.py index 8e90c93d..5b84a2a2 100644 --- a/text/processing.py +++ b/text/processing.py @@ -5,13 +5,18 @@ "TailJob", "SetDifferenceJob", "WriteToTextFileJob", + "SplitTextFileJob", ] +import logging import os +import shutil +import subprocess from collections.abc import Iterable +import tempfile from typing import List, Union -from sisyphus import Job, Task, Path, global_settings as gs +from sisyphus import Job, Task, Path, global_settings as gs, toolkit as tk from sisyphus.delayed_ops import DelayedBase import i6_core.util as util @@ -305,3 +310,85 @@ def run(self): f.write(f"{line}\n") else: raise NotImplementedError + + +class SplitTextFileJob(Job): + def __init__( + self, + text_file: tk.Path, + num_lines_per_split: int, + num_text_file_lines: int, + zip_output: bool = True, + ): + """ + Job splits a text file into several smaller files. + + https://stackoverflow.com/a/45761990/2062195 + + :param text_file: Input text file to be processed. + :param num_lines_per_split: Number of lines per split. + :param num_text_file_lines: Number of lines in the input text file. + :param zip_output: compress the output files. + """ + self.in_text_file = text_file + self.num_lines_per_split = num_lines_per_split + self.num_text_file_lines = num_text_file_lines + self.zip_output = zip_output + + self.num_output_files = num_text_file_lines // num_lines_per_split + int( + bool(num_text_file_lines % num_lines_per_split) + ) + + self.out_split_text_files = { + k: self.output_path(f'split.{k:04}.{"txt.gz" if zip_output else "txt"}') + for k in range(1, self.num_output_files + 1) + } + + self.run_rqmt = {"cpu": 1, "mem": 12.0, "time": 6.0} + + def tasks(self): + yield Task("run", rqmt=self.run_rqmt) + + def run(self): + with tempfile.TemporaryDirectory() as tmp_dir: + if self.in_text_file.get_path().endswith(".gz"): + logging.info("Un-compressing file") + text_file = f"{tmp_dir}/input_file.txt" + with open(text_file, "wt") as f_in: + uncompress_cmd = ["gzip", "-cdk", self.in_text_file.get_path()] + subprocess.run(uncompress_cmd, check=True, stdout=f_in) + else: + text_file = self.in_text_file.get_path() + + logging.info("Split lines") + split_cmd = [ + "split", + "-l", + str(self.num_lines_per_split), + "--suffix-length=4", + "--numeric-suffixes=1", + "--additional-suffix=.txt", + text_file, + f"{tmp_dir}/split.", + ] + subprocess.run(split_cmd, check=True) + + for file_id in range(1, self.num_output_files + 1): + file_path = f"split.{file_id:04}.txt" + assert os.path.isfile(file_path) and os.path.getsize(file_path) > 0 + + if self.in_text_file.get_path().endswith(".gz"): + os.remove(text_file) + + if self.zip_output: + logging.info("Compressing file") + compress_cmd = ["gzip"] + [ + f"{tmp_dir}/split.{file_id:04}.txt" for file_id in range(1, self.num_output_files + 1) + ] + subprocess.run(compress_cmd, check=True) + + for file_id in range(1, self.num_output_files + 1): + shutil.move( + f"{tmp_dir}/split.{file_id:04}.txt.gz" if self.zip_output else f"split.{file_id:04}.txt", + self.out_split_text_files[file_id].get_path(), + )