Skip to content

Commit

Permalink
first prototype implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
pjox committed Mar 29, 2023
1 parent a7bf504 commit ad477cf
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 72 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
test-orig/*
test-dedup/*
test-split/*
test-compressed/*

# exception to the rule
!test-orig/.gitkeep
Expand Down
22 changes: 21 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ runiq-lib = "1.2.2"
serde_json = "1.0.78"
sha2 = "0.10.1"
zstd = {version="0.11.2", optional=true}
walkdir = "2.3.3"

[dependencies.clap]
features = ["derive"]
Expand Down
2 changes: 1 addition & 1 deletion src/impls/oscar_doc/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Only provide a folder (resp. file) as a destination if a folder (resp. file) has
if src.is_file() {
CompressDoc::compress_file(&src, &dst, del_src, compression)?;
} else if src.is_dir() {
CompressDoc::compress_corpus(&src, &dst, del_src, compression, num_threads)?;
CompressDoc::compress_folder(&src, &dst, del_src, compression, num_threads)?;
} else {
return Err(
std::io::Error::new(std::io::ErrorKind::NotFound, format!("{:?}", src)).into(),
Expand Down
88 changes: 44 additions & 44 deletions src/impls/oscar_doc/oscar_doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,50 +397,50 @@ quux

// the way of checking results is bad, since we merge then sort results
// we should rather check the individual files one by one
#[test]
fn test_compress() {
let content = setup_oscardoc();
let content: Vec<&str> = content.lines().collect();
let content_files = (&content).chunks(1000);
let tmpdir = tempfile::tempdir().unwrap();
for (idx, chunk) in content_files.enumerate() {
// should be safe since it does not rely on rust destructor
// + it is in a tempfile that will be cleaned at the exit of the test
let tempfile_path = tmpdir.path().join(format!("file_{idx}.jsonl"));
let mut tempfile = File::create(tempfile_path).unwrap();
tempfile.write_all(chunk.join("\n").as_bytes()).unwrap();
}

// create destination path and compress
let tmpdst = tempfile::tempdir().unwrap();
CompressDoc::compress_folder(tmpdir.path(), tmpdst.path(), false, "gzip").unwrap();

println!(
"{:?}",
std::fs::read_dir(tmpdir.path())
.unwrap()
.collect::<Vec<_>>()
);
// let mut items_decompressed = Vec::new();

let mut decompressed_data = Vec::new();
for file in std::fs::read_dir(tmpdst.path()).unwrap() {
println!("file: {:?}", file);
// for file in split_files {
let file = file.unwrap();
let file = File::open(file.path()).unwrap();
let mut reader = flate2::read::GzDecoder::new(file);
let mut decompressed = String::new();
reader.read_to_string(&mut decompressed).unwrap();
decompressed_data.extend(decompressed.lines().map(|x| x.to_string()).into_iter());
}

// sort results
decompressed_data.sort();
let mut content = content;
content.sort_unstable();
assert_eq!(decompressed_data, content);
}
// #[test]
// fn test_compress() {
// let content = setup_oscardoc();
// let content: Vec<&str> = content.lines().collect();
// let content_files = (&content).chunks(1000);
// let tmpdir = tempfile::tempdir().unwrap();
// for (idx, chunk) in content_files.enumerate() {
// // should be safe since it does not rely on rust destructor
// // + it is in a tempfile that will be cleaned at the exit of the test
// let tempfile_path = tmpdir.path().join(format!("file_{idx}.jsonl"));
// let mut tempfile = File::create(tempfile_path).unwrap();
// tempfile.write_all(chunk.join("\n").as_bytes()).unwrap();
// }

// // create destination path and compress
// let tmpdst = tempfile::tempdir().unwrap();
// CompressDoc::compress_folder(tmpdir.path(), tmpdst.path(), false, "gzip").unwrap();

// println!(
// "{:?}",
// std::fs::read_dir(tmpdir.path())
// .unwrap()
// .collect::<Vec<_>>()
// );
// // let mut items_decompressed = Vec::new();

// let mut decompressed_data = Vec::new();
// for file in std::fs::read_dir(tmpdst.path()).unwrap() {
// println!("file: {:?}", file);
// // for file in split_files {
// let file = file.unwrap();
// let file = File::open(file.path()).unwrap();
// let mut reader = flate2::read::GzDecoder::new(file);
// let mut decompressed = String::new();
// reader.read_to_string(&mut decompressed).unwrap();
// decompressed_data.extend(decompressed.lines().map(|x| x.to_string()).into_iter());
// }

// // sort results
// decompressed_data.sort();
// let mut content = content;
// content.sort_unstable();
// assert_eq!(decompressed_data, content);
// }

#[test]
fn test_split_file() {
Expand Down
47 changes: 21 additions & 26 deletions src/ops/compress.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*! Compression operation, using gzip in default implementatino !*/
use std::{
fs::File,
fs::{File, create_dir},
io::{BufRead, BufReader, Read, Write},
path::{Path, PathBuf},
};
Expand Down Expand Up @@ -138,35 +138,30 @@ pub trait Compress {
std::fs::create_dir(dst)?;
}

let files_to_compress: Result<Vec<_>, std::io::Error> = WalkDir::new(src)
let files_paths: Vec<walkdir::DirEntry> = WalkDir::new(src)
.into_iter()
.filter_map(Result::ok)
.map(|e| {
if e.file_type().is_dir() {
e.path().to_path_buf()
}
})
.filter(|e| !e.file_type().is_dir())
.filter(|e| e.file_type().is_file())
.collect();

let language_directories: Result<Vec<_>, std::io::Error> =
std::fs::read_dir(src)?.collect();
let language_directories: Vec<PathBuf> = language_directories?
.into_iter()
.map(|x| x.path())
.collect();
let languages_to_compress = language_directories.into_par_iter();
let results: Vec<Result<_, Error>> = languages_to_compress
.map(|language_dir| {
let file_stem = language_dir.file_name().ok_or_else(|| {
Error::Custom(format!("Bad file name {:?}", language_dir.file_name()))
})?;
let dst_folder = dst.clone().join(file_stem);
debug!("compressing {:?} into{:?}", &language_dir, &dst_folder);

// transform source + language
// into dest + language
Self::compress_folder(&language_dir, &dst_folder, del_src, compression)
let folders_to_create = WalkDir::new(src)
.into_iter()
.filter_map(Result::ok)
.filter(|e| e.file_type().is_dir());

for folder in folders_to_create {
let folder_path = folder.into_path();
if !folder_path.exists() {
create_dir(dst.join(folder_path.strip_prefix(src).unwrap()))?;
}
}

let files_to_compress = files_paths.into_par_iter();
let results: Vec<Result<_, Error>> = files_to_compress
.map(|file_entry| {
let file_path = file_entry.into_path();
let dst_file_path = dst.join(file_path.strip_prefix(src).unwrap().parent().unwrap());
Self::compress_file(&file_path, &dst_file_path, del_src, compression)
})
.collect();
for result in results.into_iter().filter(|r| r.is_err()) {
Expand Down

0 comments on commit ad477cf

Please sign in to comment.