Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow writers refactor #153

Merged
merged 14 commits into from
Oct 27, 2023
Merged
45 changes: 35 additions & 10 deletions src/nyx/arrow_output_stream.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
#ifdef USE_ARROW

#include "arrow_output_stream.h"

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type,
#ifdef USE_ARROW

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header) {

std::string arrow_file_type_upper = Nyxus::toupper(arrow_file_type);

if(arrow_file_path != "" && !fs::is_directory(arrow_file_path) && !(Nyxus::ends_with_substr(arrow_file_path, ".arrow") || Nyxus::ends_with_substr(arrow_file_path, ".feather") || Nyxus::ends_with_substr(arrow_file_path, ".parquet"))) {
throw std::invalid_argument("The arrow file path must end in \".arrow\"");
}

if (!(arrow_file_type_upper == "ARROW" || arrow_file_type_upper == "ARROWIPC" || arrow_file_type_upper == "PARQUET")) {
throw std::invalid_argument("The valid file types are ARROW, ARROWIPC, or PARQUET");
if (arrow_file_type != SaveOption::saveArrowIPC && arrow_file_type != SaveOption::saveParquet) {
throw std::invalid_argument("The valid save options are SaveOption::saveArrowIPC or SaveOption::saveParquet.");
}

std::string extension = (arrow_file_type_upper == "PARQUET") ? ".parquet" : ".arrow";
std::string extension = (arrow_file_type == SaveOption::saveParquet) ? ".parquet" : ".arrow";

if (arrow_file_path == "") {
arrow_file_path_ = "NyxusFeatures" + extension;
Expand All @@ -34,16 +32,43 @@ std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const st
}


std::shared_ptr<arrow::Table> ArrowOutputStream::get_arrow_table(const std::string& file_path, arrow::Status& table_status) {
std::shared_ptr<arrow::Table> ArrowOutputStream::get_arrow_table(const std::string& file_path) {

if (this->arrow_table_ != nullptr) return this->arrow_table_;

this->arrow_table_ = writer_->get_arrow_table(file_path, table_status);
this->arrow_table_ = writer_->get_arrow_table(file_path);

return this->arrow_table_;
}

std::string ArrowOutputStream::get_arrow_path() {
return arrow_file_path_;
}

#else

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header) {

std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl;

return nullptr;
}


std::shared_ptr<arrow::Table> ArrowOutputStream::get_arrow_table(const std::string& file_path) {

std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl;

return nullptr;
}

std::string ArrowOutputStream::get_arrow_path() {

std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl;

return "";
}

#endif
40 changes: 36 additions & 4 deletions src/nyx/arrow_output_stream.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#pragma once

#ifdef USE_ARROW

#include <string>
#include <memory>

#include "output_writers.h"
#include "helpers/helpers.h"
#include "globals.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we need this here.


#ifdef USE_ARROW
#include <arrow/table.h>

#if __has_include(<filesystem>)
Expand Down Expand Up @@ -36,10 +36,42 @@ class ArrowOutputStream {
std::shared_ptr<arrow::Table> arrow_table_ = nullptr;

public:
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const std::string& arrow_file_type,
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path);
std::string get_arrow_path();
};

#else

// Replace arrow::Table with a dummy variable
namespace arrow {
using Table = bool;
};

/**
* @brief Class to write to Apache Arrow formats
*
* This class provides a place holder for the Arrow writer class when Nyxus is built without arrow.
*
*/
class ArrowOutputStream {

private:

std::string arrow_file_path_ = "";
std::shared_ptr<ApacheArrowWriter> writer_ = nullptr;
std::string arrow_output_type_ = "";
std::shared_ptr<arrow::Table> arrow_table_ = nullptr;

public:
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path, arrow::Status& table_status);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path);
std::string get_arrow_path();
};


#endif
1 change: 1 addition & 0 deletions src/nyx/environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ bool Environment::parse_cmdline(int argc, char **argv)
{
useCsv = false;
arrow_output_type = rawOutpTypeUC;
use_apache_writers = true;
}
else
{
Expand Down
11 changes: 4 additions & 7 deletions src/nyx/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
#include "cli_gabor_options.h"
#include "cli_nested_roi_options.h"

#ifdef USE_ARROW
#include "output_writers.h"
#include "arrow_output_stream.h"
#endif
#include "output_writers.h"
#include "arrow_output_stream.h"


#ifdef USE_GPU
#include <cuda_runtime.h>
Expand Down Expand Up @@ -117,13 +116,11 @@ class Environment: public BasicEnvironment

bool singleROI = false; // is set to 'true' parse_cmdline() if labels_dir==intensity_dir

#ifdef USE_ARROW

bool use_apache_writers = false;
std::string arrow_output_type = "";
ArrowOutputStream arrow_stream;
std::shared_ptr<ApacheArrowWriter> arrow_writer = nullptr;

#endif

std::string embedded_pixel_size = "";

Expand Down
12 changes: 6 additions & 6 deletions src/nyx/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ namespace py = pybind11;
#endif

namespace Nyxus
{
{
extern FeatureManager theFeatureMgr;
extern ImageLoader theImLoader;

enum class SaveOption {saveCSV, saveBuffer, saveArrowIPC, saveParquet};

bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs);
std::string getPureFname(const std::string& fpath);
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, bool arrow_output, const std::string& csvOutputDir);
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, SaveOption saveOption, const std::string& outputDir);
bool gatherRoisMetrics(const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads);
bool processTrivialRois (const std::vector<int>& trivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads, size_t memory_limit);
bool processNontrivialRois (const std::vector<int>& nontrivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads);
Expand All @@ -46,7 +48,7 @@ namespace Nyxus
bool gatherRoisMetricsInMemory (const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_image, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_image, int start_idx);
bool processIntSegImagePairInMemory (const std::string& intens_fpath, const std::string& label_fpath, int filepair_index, const std::string& intens_name, const std::string& seg_name);
int processMontage(const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intensFiles, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& labelFiles, int numReduceThreads, const std::vector<std::string>& intensity_names,
const std::vector<std::string>& seg_names, std::string& error_message, bool arrow_output=false, const std::string& outputDir="");
const std::vector<std::string>& seg_names, std::string& error_message, SaveOption saveOption, const std::string& outputDir="");
bool scanTrivialRois (const std::vector<int>& batch_labels, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_images, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_images, int start_idx);
bool processTrivialRoisInMemory (const std::vector<int>& trivRoiLabels, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_fpath, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_fpath, int start_idx, size_t memory_limit);
#endif
Expand Down Expand Up @@ -127,6 +129,4 @@ namespace Nyxus
const NestedRoiOptions::Aggregations& aggr,
int verbosity_level);

} // namespace Nyxus


} // namespace Nyxus
25 changes: 16 additions & 9 deletions src/nyx/main_nyxus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,30 @@ int main (int argc, char** argv)
// Current time stamp #1
auto startTS = getTimeStr();
VERBOSLVL1(std::cout << "\n>>> STARTING >>> " << startTS << "\n";)


bool use_arrow = false;

#ifdef USE_ARROW
use_arrow = theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "PARQUET";
#endif

// Process the image data
int min_online_roi_size = 0;

SaveOption saveOption = [](){
if (theEnvironment.use_apache_writers) {
if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") {
return SaveOption::saveArrowIPC;
} else {
return SaveOption::saveParquet;
}
}
else if (theEnvironment.useCsv) {return SaveOption::saveCSV;}
else {return SaveOption::saveBuffer;}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in CLI mode, I don't this SaveOption::saveBuffer is supported. It should be either arrow or csv, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, both "ARROW" and "ARROWIPC" should set saveOption to saveArrowIPC. Or may be get rid of "ARROW" option all together and be more precise.

}();

errorCode = processDataset (
intensFiles,
labelFiles,
theEnvironment.n_loader_threads,
theEnvironment.n_pixel_scan_threads,
theEnvironment.n_reduce_threads,
min_online_roi_size,
use_arrow,
theEnvironment.useCsv,
saveOption,
theEnvironment.output_dir);

// Report feature extraction error, if any
Expand All @@ -93,6 +98,8 @@ int main (int argc, char** argv)
case 3: // Memory error
std::cout << std::endl << "Memory error" << std::endl;
break;
case 4:
std::cout << std::endl << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl;
default: // Any other error
std::cout << std::endl << "Error #" << errorCode << std::endl;
break;
Expand Down
21 changes: 17 additions & 4 deletions src/nyx/output_writers.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifdef USE_ARROW
#include "output_writers.h"

std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::string& file_path, arrow::Status& table_status) {
#ifdef USE_ARROW
std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::string& file_path) {

if (table_ != nullptr) return table_;

Expand All @@ -21,7 +21,7 @@ std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::stri

if (!status.ok()) {
// Handle read error
table_status = status;
std::cerr << "Error creating arrow table: " << status.ToString();
return nullptr;
}

Expand All @@ -32,7 +32,7 @@ std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::stri

if (!status.ok()) {
// Handle read error
table_status = status;
std::cerr << "Error creating arrow table: " << status.ToString();
return nullptr;
}

Expand Down Expand Up @@ -412,5 +412,18 @@ std::shared_ptr<ApacheArrowWriter> WriterFactory::create_writer(const std::strin
}
}
}
#else

std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::string& file_path) {
return nullptr;
}

arrow::Status ApacheArrowWriter::write (const std::vector<std::tuple<std::vector<std::string>, int, std::vector<double>>>& features) {
return arrow::Status();
}

arrow::Status ApacheArrowWriter::close () {
return arrow::Status();
}

#endif
Loading