Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow writers refactor #153

Merged
merged 14 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions src/nyx/arrow_output_stream.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
#ifdef USE_ARROW

#include "arrow_output_stream.h"

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type,
#ifdef USE_ARROW

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const Nyxus::SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header) {

std::string arrow_file_type_upper = Nyxus::toupper(arrow_file_type);

if(arrow_file_path != "" && !fs::is_directory(arrow_file_path) && !(Nyxus::ends_with_substr(arrow_file_path, ".arrow") || Nyxus::ends_with_substr(arrow_file_path, ".feather") || Nyxus::ends_with_substr(arrow_file_path, ".parquet"))) {
throw std::invalid_argument("The arrow file path must end in \".arrow\"");
}

if (!(arrow_file_type_upper == "ARROW" || arrow_file_type_upper == "ARROWIPC" || arrow_file_type_upper == "PARQUET")) {
throw std::invalid_argument("The valid file types are ARROW, ARROWIPC, or PARQUET");
if (arrow_file_type != Nyxus::SaveOption::saveArrowIPC && arrow_file_type != Nyxus::SaveOption::saveParquet) {
throw std::invalid_argument("The valid save options are Nyxus::SaveOption::saveArrowIPC or Nyxus::SaveOption::saveParquet.");
}

std::string extension = (arrow_file_type_upper == "PARQUET") ? ".parquet" : ".arrow";
std::string extension = (arrow_file_type == Nyxus::SaveOption::saveParquet) ? ".parquet" : ".arrow";

if (arrow_file_path == "") {
arrow_file_path_ = "NyxusFeatures" + extension;
Expand All @@ -34,16 +32,43 @@ std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const st
}


std::shared_ptr<arrow::Table> ArrowOutputStream::get_arrow_table(const std::string& file_path, arrow::Status& table_status) {
std::shared_ptr<arrow::Table> ArrowOutputStream::get_arrow_table(const std::string& file_path) {

if (this->arrow_table_ != nullptr) return this->arrow_table_;

this->arrow_table_ = writer_->get_arrow_table(file_path, table_status);
this->arrow_table_ = writer_->get_arrow_table(file_path);

return this->arrow_table_;
}

std::string ArrowOutputStream::get_arrow_path() {
return arrow_file_path_;
}

#else

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const Nyxus::SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header) {

std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl;

return nullptr;
}


std::shared_ptr<arrow::Table> ArrowOutputStream::get_arrow_table(const std::string& file_path) {

std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl;

return nullptr;
}

std::string ArrowOutputStream::get_arrow_path() {

std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl;

return "";
}

#endif
40 changes: 36 additions & 4 deletions src/nyx/arrow_output_stream.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#pragma once

#ifdef USE_ARROW

#include <string>
#include <memory>

#include "output_writers.h"
#include "helpers/helpers.h"
#include "save_option.h"

#ifdef USE_ARROW
#include <arrow/table.h>

#if __has_include(<filesystem>)
Expand Down Expand Up @@ -36,10 +36,42 @@ class ArrowOutputStream {
std::shared_ptr<arrow::Table> arrow_table_ = nullptr;

public:
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const std::string& arrow_file_type,
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const Nyxus::SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path);
std::string get_arrow_path();
};

#else

// Replace arrow::Table with a dummy variable
namespace arrow {
using Table = bool;
};

/**
* @brief Class to write to Apache Arrow formats
*
* This class provides a place holder for the Arrow writer class when Nyxus is built without arrow.
*
*/
class ArrowOutputStream {

private:

std::string arrow_file_path_ = "";
std::shared_ptr<ApacheArrowWriter> writer_ = nullptr;
std::string arrow_output_type_ = "";
std::shared_ptr<arrow::Table> arrow_table_ = nullptr;

public:
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const Nyxus::SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path, arrow::Status& table_status);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path);
std::string get_arrow_path();
};


#endif
32 changes: 17 additions & 15 deletions src/nyx/environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -844,24 +844,24 @@ bool Environment::parse_cmdline(int argc, char **argv)
auto rawOutpTypeUC = Nyxus::toupper(rawOutpType);
if (rawOutpTypeUC != Nyxus::toupper(OT_SINGLECSV) &&
rawOutpTypeUC != Nyxus::toupper(OT_SEPCSV) &&
rawOutpTypeUC != Nyxus::toupper(OT_ARROW) &&
rawOutpTypeUC != Nyxus::toupper(OT_ARROWIPC) &&
rawOutpTypeUC != Nyxus::toupper(OT_PARQUET))
{
std::cout << "Error: valid values of " << OUTPUTTYPE << " are " << OT_SEPCSV << ", " << OT_SINGLECSV << ", " << OT_ARROW ", or" << OT_PARQUET << "." "\n";
std::cout << "Error: valid values of " << OUTPUTTYPE << " are " << OT_SEPCSV << ", " << OT_SINGLECSV << ", or" << OT_PARQUET << "." "\n";
return false;
}

if (rawOutpTypeUC == Nyxus::toupper(OT_ARROW) ||
rawOutpTypeUC == Nyxus::toupper(OT_ARROWIPC) ||
rawOutpTypeUC == Nyxus::toupper(OT_PARQUET))
{
useCsv = false;
arrow_output_type = rawOutpTypeUC;
}
else
{
useCsv = true;
SaveOption saveOption = [&rawOutpTypeUC](){
if (rawOutpTypeUC == Nyxus::toupper(OT_ARROWIPC)) {
return SaveOption::saveArrowIPC;
} else if (rawOutpTypeUC == Nyxus::toupper(OT_PARQUET)) {
return SaveOption::saveParquet;
} else {
return SaveOption::saveCSV;
}
}();

if (saveOption == SaveOption::saveCSV) {
separateCsv = rawOutpTypeUC == Nyxus::toupper(OT_SEPCSV);
}

Expand All @@ -873,16 +873,18 @@ bool Environment::parse_cmdline(int argc, char **argv)
std::cout << "Error: valid values of " << OUTPUTTYPE << " are " << OT_SEPCSV << ", " << OT_SINGLECSV << "\n";

// Intercept an attempt of running Nyxus with Apache options
if (rawOutpTypeUC != Nyxus::toupper(OT_ARROW) ||
rawOutpTypeUC != Nyxus::toupper(OT_ARROWIPC) ||
if (rawOutpTypeUC != Nyxus::toupper(OT_ARROWIPC) ||
rawOutpTypeUC != Nyxus::toupper(OT_PARQUET))
std::cout << "Error: Nyxus must be built with Apache Arrow enabled to use Arrow output types. Please rebuild with the flag USEARROW=ON." << std::endl;

return false;
}

useCsv = (rawOutpTypeUC == Nyxus::toupper(OT_SINGLECSV) || rawOutpTypeUC == Nyxus::toupper(OT_SEPCSV));
separateCsv = rawOutpTypeUC == Nyxus::toupper(OT_SEPCSV);

if (rawOutpTypeUC == Nyxus::toupper(OT_SINGLECSV) || rawOutpTypeUC == Nyxus::toupper(OT_SEPCSV)) {
saveOption = Nyxus::SaveOption::saveCSV;
}
#endif

//==== Check numeric parameters
Expand Down
16 changes: 6 additions & 10 deletions src/nyx/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
#include "roi_blacklist.h"
#include "cli_gabor_options.h"
#include "cli_nested_roi_options.h"
#include "save_option.h"

#include "output_writers.h"
#include "arrow_output_stream.h"

#ifdef USE_ARROW
#include "output_writers.h"
#include "arrow_output_stream.h"
#endif

#ifdef USE_GPU
#include <cuda_runtime.h>
Expand Down Expand Up @@ -89,7 +89,6 @@
// Valid values of 'OUTPUTTYPE'
#define OT_SEPCSV "separatecsv"
#define OT_SINGLECSV "singlecsv"
#define OT_ARROW "arrow"
#define OT_ARROWIPC "arrowipc"
#define OT_PARQUET "parquet"

Expand Down Expand Up @@ -118,13 +117,9 @@ class Environment: public BasicEnvironment

bool singleROI = false; // is set to 'true' parse_cmdline() if labels_dir==intensity_dir

#ifdef USE_ARROW

std::string arrow_output_type = "";
ArrowOutputStream arrow_stream;
std::shared_ptr<ApacheArrowWriter> arrow_writer = nullptr;

#endif

std::string embedded_pixel_size = "";

Expand Down Expand Up @@ -153,7 +148,8 @@ class Environment: public BasicEnvironment

std::string rawOutpType = ""; // Valid values: "separatecsv", "singlecsv", "arrow", "parquet"
bool separateCsv = true;
bool useCsv = true;

Nyxus::SaveOption saveOption;

// x- and y- resolution in pixels per centimeter
std::string rawXYRes = "";
Expand Down
10 changes: 5 additions & 5 deletions src/nyx/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "image_loader.h"
#include "results_cache.h"
#include "roi_cache.h"
#include "save_option.h"

#include "nested_feature_aggregation.h" // Nested ROI

Expand All @@ -26,6 +27,7 @@ namespace py = pybind11;
#endif

namespace Nyxus

{
// Permanent column names of the feature output table
const char colname_intensity_image[] = "intensity_image",
Expand All @@ -38,7 +40,7 @@ namespace Nyxus

bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs);
std::string getPureFname(const std::string& fpath);
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, bool arrow_output, const std::string& csvOutputDir);
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, SaveOption saveOption, const std::string& outputDir);
bool gatherRoisMetrics(const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads);
bool processTrivialRois (const std::vector<int>& trivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads, size_t memory_limit);
bool processNontrivialRois (const std::vector<int>& nontrivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads);
Expand All @@ -52,7 +54,7 @@ namespace Nyxus
bool gatherRoisMetricsInMemory (const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_image, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_image, int start_idx);
bool processIntSegImagePairInMemory (const std::string& intens_fpath, const std::string& label_fpath, int filepair_index, const std::string& intens_name, const std::string& seg_name);
int processMontage(const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intensFiles, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& labelFiles, int numReduceThreads, const std::vector<std::string>& intensity_names,
const std::vector<std::string>& seg_names, std::string& error_message, bool arrow_output=false, const std::string& outputDir="");
const std::vector<std::string>& seg_names, std::string& error_message, SaveOption saveOption, const std::string& outputDir="");
bool scanTrivialRois (const std::vector<int>& batch_labels, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_images, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_images, int start_idx);
bool processTrivialRoisInMemory (const std::vector<int>& trivRoiLabels, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_fpath, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_fpath, int start_idx, size_t memory_limit);
#endif
Expand Down Expand Up @@ -131,6 +133,4 @@ namespace Nyxus
const NestedRoiOptions::Aggregations& aggr,
int verbosity_level);

} // namespace Nyxus


} // namespace Nyxus
14 changes: 5 additions & 9 deletions src/nyx/main_nyxus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "dirs_and_files.h"
#include "environment.h"
#include "globals.h"
#include "save_option.h"
#include "arrow_output_stream.h"
#ifdef USE_GPU
bool gpu_initialize(int dev_id);
Expand Down Expand Up @@ -58,25 +59,18 @@ int main (int argc, char** argv)
// Current time stamp #1
auto startTS = getTimeStr();
VERBOSLVL1(std::cout << "\n>>> STARTING >>> " << startTS << "\n";)


bool use_arrow = false;

#ifdef USE_ARROW
use_arrow = theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "PARQUET";
#endif

// Process the image data
int min_online_roi_size = 0;

errorCode = processDataset (
intensFiles,
labelFiles,
theEnvironment.n_loader_threads,
theEnvironment.n_pixel_scan_threads,
theEnvironment.n_reduce_threads,
min_online_roi_size,
use_arrow,
theEnvironment.useCsv,
theEnvironment.saveOption,
theEnvironment.output_dir);

// Report feature extraction error, if any
Expand All @@ -93,6 +87,8 @@ int main (int argc, char** argv)
case 3: // Memory error
std::cout << std::endl << "Memory error" << std::endl;
break;
case 4:
std::cout << std::endl << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl;
default: // Any other error
std::cout << std::endl << "Error #" << errorCode << std::endl;
break;
Expand Down
21 changes: 17 additions & 4 deletions src/nyx/output_writers.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifdef USE_ARROW
#include "output_writers.h"

std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::string& file_path, arrow::Status& table_status) {
#ifdef USE_ARROW
std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::string& file_path) {

if (table_ != nullptr) return table_;

Expand All @@ -21,7 +21,7 @@ std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::stri

if (!status.ok()) {
// Handle read error
table_status = status;
std::cerr << "Error creating arrow table: " << status.ToString();
return nullptr;
}

Expand All @@ -32,7 +32,7 @@ std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::stri

if (!status.ok()) {
// Handle read error
table_status = status;
std::cerr << "Error creating arrow table: " << status.ToString();
return nullptr;
}

Expand Down Expand Up @@ -412,5 +412,18 @@ std::shared_ptr<ApacheArrowWriter> WriterFactory::create_writer(const std::strin
}
}
}
#else

std::shared_ptr<arrow::Table> ApacheArrowWriter::get_arrow_table(const std::string& file_path) {
return nullptr;
}

arrow::Status ApacheArrowWriter::write (const std::vector<std::tuple<std::vector<std::string>, int, std::vector<double>>>& features) {
return arrow::Status();
}

arrow::Status ApacheArrowWriter::close () {
return arrow::Status();
}

#endif
Loading