Skip to content

Commit

Permalink
feat: compress first chunk with LZ4
Browse files Browse the repository at this point in the history
  • Loading branch information
rinsuki committed May 6, 2024
1 parent 7b3910b commit 767ad5d
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 29 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ axum = "0.7.2"
clap = { version = "4.4.11", features = ["derive"] }
crc32fast = "1.3.2"
flate2 = "1.0.28"
lz4 = "1.24.0"
lz4_flex = "0.11.1"
once_cell = "1.19.0"
prost = "0.12.3"
prost-types = "0.12.3"
rayon = "1.10.0"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/bradenaw/juniper v0.15.1
github.com/dgraph-io/ristretto v0.1.1
github.com/klauspost/compress v1.17.4
github.com/pierrec/lz4/v4 v4.1.21
github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.32.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
76 changes: 55 additions & 21 deletions marmounter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"archive/zip"
"bufio"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/bradenaw/juniper/xsync"
"github.com/dgraph-io/ristretto"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
pb "github.com/rinsuki/mayakashi/proto"
"github.com/winfsp/cgofuse/fuse"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -253,6 +255,15 @@ func (fs *MayakashiFS) ParseFile(file string) error {
return nil
}

if file == "showhashes" {
for _, f := range fs.Files {
if f.MarEntry != nil {
fmt.Printf("%s\t%s\n", hex.EncodeToString(f.MarEntry.Info.OriginalSha256), f.MarEntry.Info.Path)
}
}
os.Exit(0)
}

if shouldBreak {
break
}
Expand Down Expand Up @@ -904,7 +915,7 @@ func (fs *MayakashiFS) readInternalFromMarEntry(path string, buff []byte, offset

pool := GetFilePoolFromPath(marFileName)

if targetChunk.CompressedMethod == pb.CompressedMethod_ZSTANDARD {
if targetChunk.CompressedMethod != pb.CompressedMethod_PASSTHROUGH {
// println("zstd")
cacheKey := fmt.Sprintf("%s#%d#%d", marFileName, datStart, chunkNo)
cachedData, ok := fs.ChunkCache.Get(cacheKey)
Expand All @@ -925,16 +936,9 @@ func (fs *MayakashiFS) readInternalFromMarEntry(path string, buff []byte, offset
fs.SlowReadLog.Write([]byte(path + "\n"))
}

decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
if err != nil {
println("failed to read", err)
return -fuse.EIO
}

decoded, err = decoder.DecodeAll(compressedBytes, make([]byte, 0, int(targetChunk.OriginalLength)))
if err != nil {
println("failed to decode", err)
return -fuse.EIO
res := fs.readChunk(targetChunk, &compressedBytes, &decoded)
if res != 0 {
return res
}

fs.ChunkCache.Set(cacheKey, &ChunkCache{
Expand All @@ -955,23 +959,53 @@ func (fs *MayakashiFS) readInternalFromMarEntry(path string, buff []byte, offset
// println("ok")

return readed
} else if targetChunk.CompressedMethod == pb.CompressedMethod_PASSTHROUGH {
// println("passthrough", path)
remainsLength := int(targetChunk.OriginalLength) - int(offset-chunkStart)
if len(buff) > remainsLength {
fmt.Println("!!!OVERLOAD!!!", len(buff), remainsLength)
buff = buff[:remainsLength]
}
readed, err := pool.ReadAt(buff, datStart+(offset-chunkStart))
}
// passthrough
// println("passthrough", path)
remainsLength := int(targetChunk.OriginalLength) - int(offset-chunkStart)
if len(buff) > remainsLength {
fmt.Println("!!!OVERLOAD!!!", len(buff), remainsLength)
buff = buff[:remainsLength]
}
readed, err := pool.ReadAt(buff, datStart+(offset-chunkStart))
if err != nil {
fmt.Println("failed to read from passthrough", err)
return -fuse.EIO
}
return readed
}

// readChunk decompresses one on-disk chunk payload (*compressedBytes) into
// *decoded according to targetChunk.CompressedMethod.
//
// On success *decoded holds the decompressed bytes (verified to be exactly
// targetChunk.OriginalLength for LZ4; zstd's DecodeAll sizes it itself) and
// the return value is 0. On any failure an error is logged and -fuse.EIO is
// returned, matching the FUSE error convention used by the callers.
//
// NOTE(review): PASSTHROUGH chunks never reach this function — the caller
// handles them by reading straight from the pool — so it is treated as an
// unknown method here.
func (fs *MayakashiFS) readChunk(targetChunk *pb.ChunkInfo, compressedBytes *[]byte, decoded *[]byte) int {
	switch targetChunk.CompressedMethod {
	case pb.CompressedMethod_ZSTANDARD:
		// NOTE(review): a fresh decoder per chunk is wasteful; consider a
		// shared/pooled decoder if this shows up in profiles.
		decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
		if err != nil {
			fmt.Println("failed to create zstd decoder", err)
			return -fuse.EIO
		}
		// Close releases the decoder's goroutines and buffers; without this
		// every decompressed chunk leaks them.
		defer decoder.Close()

		*decoded, err = decoder.DecodeAll(*compressedBytes, make([]byte, 0, int(targetChunk.OriginalLength)))
		if err != nil {
			fmt.Println("failed to decode", err)
			return -fuse.EIO
		}
	case pb.CompressedMethod_LZ4:
		// LZ4 block format carries no size header, so the destination buffer
		// must be pre-sized to the known original length.
		*decoded = make([]byte, targetChunk.OriginalLength)
		decodedSize, err := lz4.UncompressBlock(*compressedBytes, *decoded)
		if err != nil {
			fmt.Println("failed to uncompress lz4 block", err)
			return -fuse.EIO
		}
		// Guard against a truncated/corrupt chunk that decompresses short.
		if uint32(decodedSize) != targetChunk.OriginalLength {
			fmt.Println("invalid decoded size", decodedSize, targetChunk.OriginalLength)
			return -fuse.EIO
		}
	default:
		fmt.Println("unknown compression method", targetChunk.CompressedMethod)
		return -fuse.EIO
	}

	return 0
}

func (fs *MayakashiFS) Mkdir(path string, mode uint32) int {
Expand Down
1 change: 1 addition & 0 deletions proto/mayakashi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "google/protobuf/timestamp.proto";
enum CompressedMethod {
PASSTHROUGH = 0;
ZSTANDARD = 1;
LZ4 = 2;
}

message FileInfo {
Expand Down
43 changes: 35 additions & 8 deletions src/cmd/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,22 @@ struct Chunk {
// using_dictionary: bool,
}

static RAYON_LOCK: Mutex<()> = Mutex::new(());

fn compress_file(input_data: &[u8]) -> Vec<Chunk> {
// 小さいファイルはサクッと読みたさそうなので適当にlz4で圧縮する
if input_data.len() <= CHUNK_SIZE {
let compressed_with_lz4 = lz4::block::compress(input_data, Some(lz4::block::CompressionMode::HIGHCOMPRESSION(12)), false).unwrap();
if input_data.len() > compressed_with_lz4.len() {
return vec![Chunk {
start: 0,
original_size: input_data.len(),
compressed: compressed_with_lz4,
compressed_method: CompressedMethod::Lz4,
// using_dictionary: false,
}];
}
}
// 入力サイズが 8MB 以下の時はチャンク毎圧縮をしない (十分に小さいためシーク時の遅さを気にする必要がない…ことにする)
if input_data.len() <= 8 * 1024 * 1024 {
// input_data を Zstandard で圧縮したもの
Expand Down Expand Up @@ -100,15 +115,22 @@ fn compress_file(input_data: &[u8]) -> Vec<Chunk> {
sources.push((i, src));
};

let lock = RAYON_LOCK.lock();

println!("start");
let chunks = sources
.par_iter()
.map(|(i, src)| {
let compressed = {
let mut buf = Vec::<u8>::with_capacity(CHUNK_SIZE * 2);
let mut encoder = zstd::Encoder::new(&mut buf, 22).unwrap();
encoder.write_all(src).unwrap();
encoder.finish().unwrap();
buf
let should_use_lz4 = *i == 0;
let compressed = match should_use_lz4 {
true => lz4::block::compress(src, Some(lz4::block::CompressionMode::HIGHCOMPRESSION(12)), false).unwrap(),
false => {
let mut buf = Vec::<u8>::with_capacity(CHUNK_SIZE * 2);
let mut encoder = zstd::Encoder::new(&mut buf, 22).unwrap();
encoder.write_all(src).unwrap();
encoder.finish().unwrap();
buf
}
};

let is_compressed = compressed.len() < (src.len() / 4 * 3);
Expand All @@ -119,7 +141,10 @@ fn compress_file(input_data: &[u8]) -> Vec<Chunk> {
start: *i,
original_size: src.len(),
compressed,
compressed_method: CompressedMethod::Zstandard,
compressed_method: match should_use_lz4 {
true => CompressedMethod::Lz4,
false => CompressedMethod::Zstandard
},
// using_dictionary: false,
}
} else {
Expand All @@ -135,13 +160,15 @@ fn compress_file(input_data: &[u8]) -> Vec<Chunk> {
})
.collect();

_ = lock;
println!("end");
return chunks;
}


pub fn main(args: Args) {
let (mut files, directories) = walk_dir(&args.input);
files.sort_by_key(|f| std::cmp::Reverse(f.size));
files.sort_by_key(|f| f.path.to_str().unwrap().to_string());
// println!("Files: {:#?}", files);

let files_count: usize = files.len();
Expand Down

0 comments on commit 767ad5d

Please sign in to comment.