
Vectorized chunk writes #23

Draft · wants to merge 32 commits into base: main
Changes from 30 commits

32 commits
5f2795e
Add benchmark to test different chunk/shard/compression/storage confi…
aliddell Nov 20, 2024
1cf276e
Add ShardBuffer class to keep track of ready-to-write chunks.
aliddell Nov 21, 2024
48fe1a6
Increase number of runs/config in benchmark.
aliddell Nov 21, 2024
1d8c605
Add ChunkIndexBuffer to ZarrV2ArrayWriter.
aliddell Nov 21, 2024
ee1dac4
Merge remote-tracking branch 'upstream/main' into vector-writes
aliddell Nov 27, 2024
f0a97e8
wip: Vectorized file write (Windows).
aliddell Nov 27, 2024
6aadf65
Changes for Linux
aliddell Nov 27, 2024
97e93c1
Vectorized file write on OSX
aliddell Dec 2, 2024
d70ca8e
Test vectorized file write with multiple writes to the same file.
aliddell Dec 2, 2024
1f95b05
Minor edits
aliddell Dec 2, 2024
d5e4b37
Remove ArrayWriter::version_()
aliddell Dec 2, 2024
f0bf009
Do-nothing compress_and_flush_() method.
aliddell Dec 2, 2024
7559cbf
Compress right into flush (wip: Zarr V2 only).
aliddell Dec 3, 2024
d0bc1d3
wip
aliddell Dec 4, 2024
5717867
Use a vector of spans to avoid copies.
aliddell Dec 4, 2024
c3c1e1c
Merge branch 'vector-writes' into vectorized-chunk-writes
aliddell Dec 5, 2024
0f71dd0
Remove dead code.
aliddell Dec 5, 2024
ed89a98
Remove benchmark.
aliddell Dec 5, 2024
ee842cb
Merge branch 'vector-writes' into vectorized-chunk-writes
aliddell Dec 5, 2024
dfc3228
Merge remote-tracking branch 'upstream/main' into vectorized-chunk-wr…
aliddell Dec 6, 2024
dbed6e7
Update and test shard writer.
aliddell Dec 10, 2024
a3e2aca
Implement and test make_dirs.
aliddell Dec 10, 2024
5d6ce8d
Rename SinkCreator methods to be more explicit.
aliddell Dec 10, 2024
7d7d399
Make `ArrayWriter::make_data_sinks_()` a pure virtual method.
aliddell Dec 10, 2024
bc4ff3b
Add missing EXPECT error message to make-dirs.cpp.
aliddell Dec 11, 2024
97ede7c
Remove `ArrayWriter::flush_impl_()`
aliddell Dec 11, 2024
72508ce
Everything except ZarrV3 compressed to filesystem working again.
aliddell Dec 11, 2024
1fc703f
We did it.
aliddell Dec 13, 2024
99a553f
Fix ragged internal dim size check on OSX.
aliddell Dec 13, 2024
f1bb9e9
Merge remote-tracking branch 'upstream/main' into vectorized-chunk-wr…
aliddell Dec 13, 2024
b6fd05d
Respond to PR comments.
aliddell Dec 18, 2024
1c26a5c
Merge remote-tracking branch 'upstream/main' into vectorized-chunk-wr…
aliddell Dec 19, 2024
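The platform-specific "Vectorized file write" commits (Windows, Linux, OSX) point at scatter/gather I/O: flushing several non-contiguous chunk buffers with a single system call. A minimal POSIX sketch of the idea, assuming `pwritev` is available — the PR's actual implementation lives in `vectorized.file.writer.cpp` and is platform-specific, so this is an illustration, not its code:

```cpp
#include <cassert>
#include <fcntl.h>
#include <string>
#include <sys/uio.h>
#include <unistd.h>
#include <vector>

// Write several non-contiguous chunk buffers to a file with one syscall.
// Returns true only if every byte was written.
bool write_vectorized(int fd, const std::vector<std::string>& chunks, off_t offset)
{
    std::vector<iovec> iov(chunks.size());
    size_t total = 0;
    for (size_t i = 0; i < chunks.size(); ++i) {
        iov[i].iov_base = const_cast<char*>(chunks[i].data());
        iov[i].iov_len = chunks[i].size();
        total += chunks[i].size();
    }
    // pwritev gathers all buffers into one contiguous write at `offset`.
    return pwritev(fd, iov.data(), static_cast<int>(iov.size()), offset) ==
           static_cast<ssize_t>(total);
}
```

On Windows the rough analogue is `WriteFileGather`, which imposes page-size and alignment requirements on the buffers — presumably why the Windows path needed its own commit.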
6 changes: 4 additions & 2 deletions src/streaming/CMakeLists.txt
@@ -29,8 +29,10 @@ add_library(${tgt}
zarrv2.array.writer.cpp
zarrv3.array.writer.hh
zarrv3.array.writer.cpp
vectorized.file.writer.hh
vectorized.file.writer.cpp
shard.writer.hh
shard.writer.cpp
platform.hh
platform.cpp
)

target_include_directories(${tgt}
165 changes: 37 additions & 128 deletions src/streaming/array.writer.cpp
@@ -75,7 +75,6 @@ zarr::downsample(const ArrayWriterConfig& config,
return true;
}

/// Writer
zarr::ArrayWriter::ArrayWriter(const ArrayWriterConfig& config,
std::shared_ptr<ThreadPool> thread_pool)
: ArrayWriter(std::move(config), thread_pool, nullptr)
@@ -138,84 +137,21 @@ zarr::ArrayWriter::is_s3_array_() const
return config_.bucket_name.has_value();
}

bool
zarr::ArrayWriter::make_data_sinks_()
{
std::string data_root;
std::function<size_t(const ZarrDimension&)> parts_along_dimension;
switch (version_()) {
case ZarrVersion_2:
parts_along_dimension = chunks_along_dimension;
data_root = config_.store_path + "/" +
std::to_string(config_.level_of_detail) + "/" +
std::to_string(append_chunk_index_);
break;
case ZarrVersion_3:
parts_along_dimension = shards_along_dimension;
data_root = config_.store_path + "/data/root/" +
std::to_string(config_.level_of_detail) + "/c" +
std::to_string(append_chunk_index_);
break;
default:
LOG_ERROR("Unsupported Zarr version");
return false;
}

SinkCreator creator(thread_pool_, s3_connection_pool_);

if (is_s3_array_()) {
if (!creator.make_data_sinks(*config_.bucket_name,
data_root,
config_.dimensions.get(),
parts_along_dimension,
data_sinks_)) {
LOG_ERROR("Failed to create data sinks in ",
data_root,
" for bucket ",
*config_.bucket_name);
return false;
}
} else if (!creator.make_data_sinks(data_root,
config_.dimensions.get(),
parts_along_dimension,
data_sinks_)) {
LOG_ERROR("Failed to create data sinks in ", data_root);
return false;
}

return true;
}

bool
zarr::ArrayWriter::make_metadata_sink_()
{
if (metadata_sink_) {
return true;
}

std::string metadata_path;
switch (version_()) {
case ZarrVersion_2:
metadata_path = config_.store_path + "/" +
std::to_string(config_.level_of_detail) +
"/.zarray";
break;
case ZarrVersion_3:
metadata_path = config_.store_path + "/meta/root/" +
std::to_string(config_.level_of_detail) +
".array.json";
break;
default:
LOG_ERROR("Unsupported Zarr version");
return false;
}
const auto metadata_path = metadata_path_();

if (is_s3_array_()) {
SinkCreator creator(thread_pool_, s3_connection_pool_);
aliddell marked this conversation as resolved.
metadata_sink_ =
creator.make_sink(*config_.bucket_name, metadata_path);
creator.make_s3_sink(*config_.bucket_name, metadata_path);
} else {
metadata_sink_ = zarr::SinkCreator::make_sink(metadata_path);
metadata_sink_ = zarr::SinkCreator::make_file_sink(metadata_path);
}

if (!metadata_sink_) {
@@ -336,60 +272,47 @@ zarr::ArrayWriter::should_flush_() const
return frames_written_ % frames_before_flush == 0;
}

void
zarr::ArrayWriter::compress_buffers_()
bool
zarr::ArrayWriter::compress_chunk_buffer_(size_t chunk_index)
{
if (!config_.compression_params.has_value()) {
return;
return true;
}

LOG_DEBUG("Compressing");
EXPECT(chunk_index < chunk_buffers_.size(),
"Chunk index out of bounds: ",
chunk_index);

BloscCompressionParams params = config_.compression_params.value();
LOG_DEBUG("Compressing chunk ", chunk_index);

const auto params = *config_.compression_params;
const auto bytes_per_px = bytes_of_type(config_.dtype);

std::scoped_lock lock(buffers_mutex_);
std::latch latch(chunk_buffers_.size());
for (auto& chunk : chunk_buffers_) {
EXPECT(thread_pool_->push_job(
[&params, buf = &chunk, bytes_per_px, &latch](
std::string& err) -> bool {
bool success = false;
const size_t bytes_of_chunk = buf->size();

try {
const auto tmp_size =
bytes_of_chunk + BLOSC_MAX_OVERHEAD;
ChunkBuffer tmp(tmp_size);
const auto nb =
blosc_compress_ctx(params.clevel,
params.shuffle,
bytes_per_px,
bytes_of_chunk,
buf->data(),
tmp.data(),
tmp_size,
params.codec_id.c_str(),
0 /* blocksize - 0:automatic */,
1);

tmp.resize(nb);
buf->swap(tmp);

success = true;
} catch (const std::exception& exc) {
err = "Failed to compress chunk: " +
std::string(exc.what());
}
latch.count_down();

return success;
}),
"Failed to push to job queue");
auto& chunk = chunk_buffers_[chunk_index];
const auto bytes_of_chunk = chunk.size();

const auto tmp_size = bytes_of_chunk + BLOSC_MAX_OVERHEAD;
ChunkBuffer tmp(tmp_size);
const auto nb = blosc_compress_ctx(params.clevel,
params.shuffle,
bytes_per_px,
bytes_of_chunk,
chunk.data(),
tmp.data(),
tmp_size,
params.codec_id.c_str(),
0 /* blocksize - 0:automatic */,
1);

if (nb <= 0) {
LOG_ERROR("Failed to compress chunk ", chunk_index);
return false;
}

// wait for all threads to finish
latch.wait();
tmp.resize(nb);
chunk.swap(tmp);

return true;
}
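The new `compress_chunk_buffer_()` above compresses a single chunk in place: allocate a temporary buffer with room for worst-case overhead, compress into it, shrink it to the actual compressed size, then swap it with the chunk. The same idiom with a stand-in compressor — `fake_compress` is a hypothetical placeholder that just copies bytes; the real call is `blosc_compress_ctx`:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

using ChunkBuffer = std::vector<std::byte>;

// Hypothetical stand-in for blosc_compress_ctx: copies the input and
// returns the "compressed" size, or <= 0 on failure (Blosc's convention).
long fake_compress(const std::byte* src, size_t n, std::byte* dst, size_t dst_cap)
{
    if (dst_cap < n) {
        return -1;
    }
    std::copy(src, src + n, dst);
    return static_cast<long>(n);
}

bool compress_chunk(ChunkBuffer& chunk, size_t max_overhead)
{
    // Worst case: the output does not shrink and costs `max_overhead` extra.
    ChunkBuffer tmp(chunk.size() + max_overhead);
    const long nb = fake_compress(chunk.data(), chunk.size(), tmp.data(), tmp.size());
    if (nb <= 0) {
        return false; // report failure instead of throwing
    }
    tmp.resize(nb);  // drop the unused overhead
    chunk.swap(tmp); // the chunk now holds the compressed bytes
    return true;
}
```

Compared to the removed `compress_buffers_()`, which pushed one thread-pool job per chunk and blocked on a latch, compressing one chunk at a time lets the caller fuse compression with the flush of that same chunk, as the "Compress right into flush" commit describes.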

void
@@ -400,15 +323,12 @@ zarr::ArrayWriter::flush_()
}

// compress buffers and write out
compress_buffers_();
CHECK(flush_impl_());
compress_and_flush_();

const auto should_rollover = should_rollover_();
if (should_rollover) {
rollover_();
}

if (should_rollover || is_finalizing_) {
rollover_();
Contributor commented:
Compared to the old functionality, rollover_() would now be called if should_rollover==false but is_finalizing==true, whereas it wouldn't have before. I just want to make sure that is intended.

Member Author (aliddell) replied:
Correct. I previously didn't want to call rollover_() when finalizing because it would uselessly increment a counter, but the alternative was to add another check on is_finalizing_ to call close_files_() -- but only when we had a ragged append dimension. This is simpler, and the increment doesn't matter.

CHECK(write_array_metadata_());
}

@@ -419,17 +339,6 @@ zarr::ArrayWriter::flush_()
bytes_to_flush_ = 0;
}

void
zarr::ArrayWriter::close_sinks_()
{
for (auto i = 0; i < data_sinks_.size(); ++i) {
EXPECT(finalize_sink(std::move(data_sinks_[i])),
"Failed to finalize sink ",
i);
}
data_sinks_.clear();
}

void
zarr::ArrayWriter::rollover_()
{
11 changes: 6 additions & 5 deletions src/streaming/array.writer.hh
@@ -78,27 +78,28 @@ class ArrayWriter

std::shared_ptr<S3ConnectionPool> s3_connection_pool_;

virtual ZarrVersion version_() const = 0;
virtual std::string data_root_() const = 0;
virtual std::string metadata_path_() const = 0;

bool is_s3_array_() const;

[[nodiscard]] bool make_data_sinks_();
[[nodiscard]] virtual bool make_data_sinks_() = 0;
[[nodiscard]] bool make_metadata_sink_();
void make_buffers_() noexcept;

bool should_flush_() const;
virtual bool should_rollover_() const = 0;

size_t write_frame_to_chunks_(std::span<const std::byte> data);
void compress_buffers_();
bool compress_chunk_buffer_(size_t chunk_index);

virtual void compress_and_flush_() = 0;
void flush_();
[[nodiscard]] virtual bool flush_impl_() = 0;
void rollover_();

[[nodiscard]] virtual bool write_array_metadata_() = 0;

void close_sinks_();
virtual void close_sinks_() = 0;

friend bool finalize_array(std::unique_ptr<ArrayWriter>&& writer);
};
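The header change expresses a template-method split: the base `ArrayWriter` keeps the orchestration (`flush_()`, `rollover_()`) while version-specific pieces (`data_root_()`, `metadata_path_()`, `make_data_sinks_()`, `compress_and_flush_()`, `close_sinks_()`) become pure virtuals, replacing the old `version_()` switch. A minimal sketch of that shape, with invented names rather than the PR's actual classes (the paths use the diff's layouts with level-of-detail and chunk index both 0):

```cpp
#include <cassert>
#include <memory>
#include <string>

// Template-method sketch: the base drives the operation; derived classes
// supply version-specific layout instead of the base switching on an enum.
class Writer
{
  public:
    virtual ~Writer() = default;

    // Orchestration lives in the base...
    std::string flush() { return "flushed " + data_root_(); }

  protected:
    // ...while layout is a pure virtual each version overrides.
    virtual std::string data_root_() const = 0;
};

class V2Writer final : public Writer
{
  protected:
    std::string data_root_() const override { return "store/0/0"; }
};

class V3Writer final : public Writer
{
  protected:
    std::string data_root_() const override { return "store/data/root/0/c0"; }
};
```

Dispatch then happens through the vtable rather than a version switch, which is what makes the "Remove `ArrayWriter::version_()`" commit possible.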