Skip to content

Commit

Permalink
implement job for text file splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
christophmluscher committed Nov 14, 2024
1 parent dca8f09 commit 0d6deb8
Showing 1 changed file with 88 additions and 1 deletion.
89 changes: 88 additions & 1 deletion text/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@
"TailJob",
"SetDifferenceJob",
"WriteToTextFileJob",
"SplitTextFileJob",
]

import logging
import os
import shutil
import subprocess
from collections.abc import Iterable
import tempfile
from typing import List, Union

from sisyphus import Job, Task, Path, global_settings as gs
from sisyphus import Job, Task, Path, global_settings as gs, toolkit as tk
from sisyphus.delayed_ops import DelayedBase

import i6_core.util as util
Expand Down Expand Up @@ -305,3 +310,85 @@ def run(self):
f.write(f"{line}\n")
else:
raise NotImplementedError


class SplitTextFileJob(Job):
def __init__(
self,
text_file: tk.Path,
num_lines_per_split: int,
num_text_file_lines: int,
zip_output: bool = True,
):
"""
Job splits a text file into several smaller files.
https://stackoverflow.com/a/45761990/2062195
:param text_file: Input text file to be processed.
:param num_lines_per_split: Number of lines per split.
:param num_text_file_lines: Number of lines in the input text file.
:param zip_output: compress the output files.
"""
self.in_text_file = text_file
self.num_lines_per_split = num_lines_per_split
self.num_text_file_lines = num_text_file_lines
self.zip_output = zip_output

self.num_output_files = num_text_file_lines // num_lines_per_split + int(
bool(num_text_file_lines % num_lines_per_split)
)

self.out_split_text_files = {
k: self.output_path(f'split.{k:04}.{"txt.gz" if zip_output else "txt"}')
for k in range(1, self.num_output_files + 1)
}

self.run_rqmt = {"cpu": 1, "mem": 12.0, "time": 6.0}

def tasks(self):
yield Task("run", rqmt=self.run_rqmt)

def run(self):
with tempfile.TemporaryDirectory() as tmp_dir:
if self.in_text_file.get_path().endswith(".gz"):
logging.info("Un-compressing file")
text_file = f"{tmp_dir}/input_file.txt"
with open(text_file, "wt") as f_in:
uncompress_cmd = ["gzip", "-cdk", self.in_text_file.get_path()]
subprocess.run(uncompress_cmd, check=True, stdout=f_in)
else:
text_file = self.in_text_file.get_path()

logging.info("Split lines")
split_cmd = [
"split",
"-l",
str(self.num_lines_per_split),
"--suffix-length=4",
"--numeric-suffixes=1",
"--additional-suffix=.txt",
text_file,
f"{tmp_dir}/split.",
]
subprocess.run(split_cmd, check=True)

for file_id in range(1, self.num_output_files + 1):
file_path = f"split.{file_id:04}.txt"
assert os.path.isfile(file_path) and os.path.getsize(file_path) > 0

if self.in_text_file.get_path().endswith(".gz"):
os.remove(text_file)

if self.zip_output:
logging.info("Compressing file")
compress_cmd = ["gzip"] + [
f"{tmp_dir}/split.{file_id:04}.txt" for file_id in range(1, self.num_output_files + 1)
]
subprocess.run(compress_cmd, check=True)

for file_id in range(1, self.num_output_files + 1):
shutil.move(
f"{tmp_dir}/split.{file_id:04}.txt.gz" if self.zip_output else f"split.{file_id:04}.txt",
self.out_split_text_files[file_id].get_path(),
)

0 comments on commit 0d6deb8

Please sign in to comment.