From 2629784f801907c650f9d1b4c27547173f26fc7f Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Mon, 16 Oct 2023 11:59:10 -0600 Subject: [PATCH 01/13] Update error handling for get_arrow_table method --- src/nyx/arrow_output_stream.cpp | 4 ++-- src/nyx/arrow_output_stream.h | 2 +- src/nyx/output_writers.cpp | 6 +++--- src/nyx/python/new_bindings_py.cpp | 13 ++++++------- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/nyx/arrow_output_stream.cpp b/src/nyx/arrow_output_stream.cpp index 9cc522fd..b46106af 100644 --- a/src/nyx/arrow_output_stream.cpp +++ b/src/nyx/arrow_output_stream.cpp @@ -34,11 +34,11 @@ std::shared_ptr ArrowOutputStream::create_arrow_file(const st } -std::shared_ptr ArrowOutputStream::get_arrow_table(const std::string& file_path, arrow::Status& table_status) { +std::shared_ptr ArrowOutputStream::get_arrow_table(const std::string& file_path) { if (this->arrow_table_ != nullptr) return this->arrow_table_; - this->arrow_table_ = writer_->get_arrow_table(file_path, table_status); + this->arrow_table_ = writer_->get_arrow_table(file_path); return this->arrow_table_; } diff --git a/src/nyx/arrow_output_stream.h b/src/nyx/arrow_output_stream.h index f99042a4..0119df23 100644 --- a/src/nyx/arrow_output_stream.h +++ b/src/nyx/arrow_output_stream.h @@ -39,7 +39,7 @@ class ArrowOutputStream { std::shared_ptr create_arrow_file(const std::string& arrow_file_type, const std::string& arrow_file_path, const std::vector& header); - std::shared_ptr get_arrow_table(const std::string& file_path, arrow::Status& table_status); + std::shared_ptr get_arrow_table(const std::string& file_path); std::string get_arrow_path(); }; #endif \ No newline at end of file diff --git a/src/nyx/output_writers.cpp b/src/nyx/output_writers.cpp index d8b1bb40..60055ba1 100644 --- a/src/nyx/output_writers.cpp +++ b/src/nyx/output_writers.cpp @@ -1,7 +1,7 @@ #ifdef USE_ARROW #include "output_writers.h" -std::shared_ptr ApacheArrowWriter::get_arrow_table(const std::string& file_path, arrow::Status& table_status) { +std::shared_ptr ApacheArrowWriter::get_arrow_table(const std::string& file_path) { if (table_ != nullptr) return table_; @@ -21,7 +21,7 @@ std::shared_ptr ApacheArrowWriter::get_arrow_table(const std::stri if (!status.ok()) { // Handle read error - table_status = status; + std::cerr << "Error creating arrow table: " << status.ToString(); return nullptr; } @@ -32,7 +32,7 @@ std::shared_ptr ApacheArrowWriter::get_arrow_table(const std::stri if (!status.ok()) { // Handle read error - table_status = status; + std::cerr << "Error creating arrow table: " << status.ToString(); return nullptr; } diff --git a/src/nyx/python/new_bindings_py.cpp b/src/nyx/python/new_bindings_py.cpp index 3c99add7..4d8005c4 100644 --- a/src/nyx/python/new_bindings_py.cpp +++ b/src/nyx/python/new_bindings_py.cpp @@ -546,16 +546,15 @@ std::string get_parquet_file_imp() { #ifdef USE_ARROW std::shared_ptr get_arrow_table_imp(const std::string& file_path) { - - arrow::Status status; - - auto table = theEnvironment.arrow_stream.get_arrow_table(file_path, status); - if (!status.ok()) { - throw std::runtime_error("Error creating Arrow table: " + status.ToString()); + auto table = theEnvironment.arrow_stream.get_arrow_table(file_path); + + if (table == nullptr) { + std::cerr << "Error creating Arrow table." << std::endl; } - + return table; + } #else From f624bac125d032762c34f246f7a9138f9a9fb576 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Tue, 17 Oct 2023 13:20:56 -0600 Subject: [PATCH 02/13] Update Arrow writers --- src/nyx/environment.cpp | 1 + src/nyx/environment.h | 4 +- src/nyx/globals.h | 7 +- src/nyx/main_nyxus.cpp | 42 ++-- src/nyx/output_writers.h | 2 +- src/nyx/python/new_bindings_py.cpp | 109 +++++++--- src/nyx/scan_fastloader_way.cpp | 307 +++++++++++++++++++++-------- 7 files changed, 335 insertions(+), 137 deletions(-) diff --git a/src/nyx/environment.cpp b/src/nyx/environment.cpp index 7519a462..371b021e 100644 --- a/src/nyx/environment.cpp +++ b/src/nyx/environment.cpp @@ -847,6 +847,7 @@ bool Environment::parse_cmdline(int argc, char **argv) { useCsv = false; arrow_output_type = rawOutpTypeUC; + use_arrow = true; } else { diff --git a/src/nyx/environment.h b/src/nyx/environment.h index a1d0f958..9d75bfe4 100644 --- a/src/nyx/environment.h +++ b/src/nyx/environment.h @@ -117,12 +117,12 @@ class Environment: public BasicEnvironment bool singleROI = false; // is set to 'true' parse_cmdline() if labels_dir==intensity_dir -#ifdef USE_ARROW + bool use_arrow = false; +#ifdef USE_ARROW std::string arrow_output_type = ""; ArrowOutputStream arrow_stream; std::shared_ptr arrow_writer = nullptr; - #endif std::string embedded_pixel_size = ""; diff --git a/src/nyx/globals.h b/src/nyx/globals.h index 777c3716..eec12564 100644 --- a/src/nyx/globals.h +++ b/src/nyx/globals.h @@ -32,7 +32,8 @@ namespace Nyxus bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs); std::string getPureFname(const std::string& fpath); - int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, bool arrow_output, const std::string& csvOutputDir); + int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, const std::string& csvOutputDir); + int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, const std::string& outputDir); bool gatherRoisMetrics(const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads); bool processTrivialRois (const std::vector& trivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads, size_t memory_limit); bool processNontrivialRois (const std::vector& nontrivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads); @@ -46,7 +47,9 @@ namespace Nyxus bool gatherRoisMetricsInMemory (const py::array_t& intens_image, const py::array_t& label_image, int start_idx); bool processIntSegImagePairInMemory (const std::string& intens_fpath, const std::string& label_fpath, int filepair_index, const std::string& intens_name, const std::string& seg_name); int processMontage(const py::array_t& intensFiles, const py::array_t& labelFiles, int numReduceThreads, const std::vector& intensity_names, - const std::vector& seg_names, std::string& error_message, bool arrow_output=false, const std::string& outputDir=""); + const std::vector& seg_names, std::string& error_message); + int processMontage(const py::array_t& intensFiles, const py::array_t& labelFiles, int numReduceThreads, const std::vector& intensity_names, + const std::vector& seg_names, std::string& error_message, const std::string& outputDir); bool scanTrivialRois (const std::vector& batch_labels, const py::array_t& intens_images, const py::array_t& label_images, int start_idx); bool processTrivialRoisInMemory (const std::vector& trivRoiLabels, const py::array_t& intens_fpath, const py::array_t& label_fpath, int start_idx, size_t memory_limit); #endif diff --git a/src/nyx/main_nyxus.cpp b/src/nyx/main_nyxus.cpp index ff7be233..e9242edf 100644 --- a/src/nyx/main_nyxus.cpp +++ b/src/nyx/main_nyxus.cpp @@ -58,26 +58,34 @@ int main (int argc, char** argv) // Current time stamp #1 auto startTS = getTimeStr(); VERBOSLVL1(std::cout << "\n>>> STARTING >>> " << startTS << "\n";) - - - bool use_arrow = false; - -#ifdef USE_ARROW - use_arrow = theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "PARQUET"; -#endif // Process the image data int min_online_roi_size = 0; - errorCode = processDataset ( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - use_arrow, - theEnvironment.useCsv, - theEnvironment.output_dir); + + if (theEnvironment.use_arrow) { + + errorCode = processDataset ( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + theEnvironment.output_dir); + + } else { + + errorCode = processDataset ( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + theEnvironment.useCsv, + theEnvironment.output_dir); + + } // Report feature extraction error, if any switch (errorCode) diff --git a/src/nyx/output_writers.h b/src/nyx/output_writers.h index d5d4e619..53e5cc8a 100644 --- a/src/nyx/output_writers.h +++ b/src/nyx/output_writers.h @@ -70,7 +70,7 @@ class ApacheArrowWriter * * @return std::shared_ptr */ - std::shared_ptr get_arrow_table(const std::string& file_path, arrow::Status& table_status); + std::shared_ptr get_arrow_table(const std::string& file_path); /** * @brief Write Nyxus data to Arrow file diff --git a/src/nyx/python/new_bindings_py.cpp b/src/nyx/python/new_bindings_py.cpp index 4d8005c4..352ed666 100644 --- a/src/nyx/python/new_bindings_py.cpp +++ b/src/nyx/python/new_bindings_py.cpp @@ -173,22 +173,36 @@ py::tuple featurize_directory_imp ( // We're good to extract features. Reset the feature results cache theResultsCache.clear(); - auto arrow_output = !pandas_output; + theEnvironment.use_arrow = !pandas_output; theEnvironment.separateCsv = false; // Process the image sdata int min_online_roi_size = 0; - errorCode = processDataset( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - arrow_output, - false, - theEnvironment.output_dir); + + if (theEnvironment.use_arrow) { + + errorCode = processDataset( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + theEnvironment.output_dir); + + } else { + + errorCode = processDataset( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + false, + theEnvironment.output_dir); + } if (errorCode) throw std::runtime_error("Error " + std::to_string(errorCode) + " occurred during dataset processing"); @@ -258,17 +272,33 @@ py::tuple featurize_montage_imp ( theResultsCache.clear(); + theEnvironment.use_arrow = !pandas_output; + // Process the image sdata std::string error_message = ""; - int errorCode = processMontage( - intensity_images, - label_images, - theEnvironment.n_reduce_threads, - intensity_names, - label_names, - error_message, - !pandas_output, - output_dir); + + int errorCode; + if (theEnvironment.use_arrow) { + + errorCode = processMontage( + intensity_images, + label_images, + theEnvironment.n_reduce_threads, + intensity_names, + label_names, + error_message, + output_dir); + + } else { + + errorCode = processMontage( + intensity_images, + label_images, + theEnvironment.n_reduce_threads, + intensity_names, + label_names, + error_message); + } if (errorCode) throw std::runtime_error("Error #" + std::to_string(errorCode) + " " + error_message + " occurred during dataset processing."); @@ -334,18 +364,37 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list theResultsCache.clear(); + theEnvironment.use_arrow = !pandas_output; + // Process the image sdata int min_online_roi_size = 0; - int errorCode = processDataset( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - !pandas_output, - false, // 'true' to save to csv - theEnvironment.output_dir); + int errorCode; + + if (theEnvironment.use_arrow) { + + errorCode = processDataset( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + theEnvironment.output_dir); + + } else { + + errorCode = processDataset( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + false, + theEnvironment.output_dir); + } + + if (errorCode) throw std::runtime_error("Error occurred during dataset processing."); diff --git a/src/nyx/scan_fastloader_way.cpp b/src/nyx/scan_fastloader_way.cpp index 9d516f53..d42e2a39 100644 --- a/src/nyx/scan_fastloader_way.cpp +++ b/src/nyx/scan_fastloader_way.cpp @@ -209,7 +209,6 @@ namespace Nyxus int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, - bool arrow_output, bool save2csv, const std::string& outputDir) { @@ -222,22 +221,6 @@ namespace Nyxus // One-time initialization init_feature_buffers(); - - // initialize arrow writer if needed - #ifdef USE_ARROW - if (arrow_output) { - - theEnvironment.arrow_stream = ArrowOutputStream(); - - try { - theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); - } catch (const std::exception &err) { - std::cout << "Error creating Arrow file: " << err.what() << std::endl; - return 1; - } - } - #endif - bool ok = true; // Iterate file pattern-filtered images of the dataset @@ -276,35 +259,18 @@ namespace Nyxus return 1; } - #ifdef USE_ARROW - if (arrow_output) { - - auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); - - if (!status.ok()) { - // Handle read error - std::cout << "Error writing Arrow file: " << status.ToString() << std::endl; - return 2; - } + if (save2csv) { + ok = save_features_2_csv(ifp, lfp, outputDir); + } else { + ok = save_features_2_buffer(theResultsCache); } - #endif - // For the non-Apache output mode, save the result for this intensity-label file pair - if (!arrow_output) { - - if (save2csv) { - ok = save_features_2_csv(ifp, lfp, outputDir); - } else { - ok = save_features_2_buffer(theResultsCache); - } - - if (ok == false) - { - std::cout << "save_features_2_csv() returned an error code" << std::endl; - return 2; - } + if (ok == false) + { + std::cout << "save_features_2_csv() returned an error code" << std::endl; + return 2; } - + theImLoader.close(); // Save nested ROI related info of this image @@ -348,23 +314,147 @@ namespace Nyxus ); } #endif + + return 0; // success + } + + int processDataset( + const std::vector& intensFiles, + const std::vector& labelFiles, + int numFastloaderThreads, + int numSensemakerThreads, + int numReduceThreads, + int min_online_roi_size, + const std::string& outputDir) + { #ifdef USE_ARROW - if (arrow_output) { - // close arrow file after use - auto status = theEnvironment.arrow_writer->close(); + + #ifdef CHECKTIMING + if (Stopwatch::inclusive()) + Stopwatch::reset(); + #endif + + // One-time initialization + init_feature_buffers(); + + theEnvironment.arrow_stream = ArrowOutputStream(); + + try { + theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); + } catch (const std::exception &err) { + std::cout << "Error creating Arrow file: " << err.what() << std::endl; + return 1; + } + + bool ok = true; + + // Iterate file pattern-filtered images of the dataset + auto nf = intensFiles.size(); + for (int i = 0; i < nf; i++) + { +#ifdef CHECKTIMING + if (Stopwatch::exclusive()) + Stopwatch::reset(); +#endif + + // Clear ROI data cached for the previous image + clear_feature_buffers(); + + auto& ifp = intensFiles[i], + & lfp = labelFiles[i]; + + // Cache the file names to be picked up by labels to know their file origin + fs::path p_int(ifp), p_seg(lfp); + theSegFname = p_seg.string(); + theIntFname = p_int.string(); + + // Scan one label-intensity pair + ok = theImLoader.open(theIntFname, theSegFname); + if (ok == false) + { + std::cout << "Terminating\n"; + return 1; + } + + // Do phased processing: prescan, trivial ROI processing, oversized ROI processing + ok = processIntSegImagePair(ifp, lfp, numFastloaderThreads, i, nf); + if (ok == false) + { + std::cout << "processIntSegImagePair() returned an error code while processing file pair " << ifp << " and " << lfp << std::endl; + return 1; + } + + auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); if (!status.ok()) { // Handle read error - std::cout << "Error closing Arrow file: " << status.ToString() << std::endl; + std::cout << "Error writing Arrow file: " << status.ToString() << std::endl; return 2; } + + theImLoader.close(); + + // Save nested ROI related info of this image + if (theEnvironment.nestedOptions.defined()) + save_nested_roi_info(nestedRoiData, uniqueLabels, roiData); + + #ifdef WITH_PYTHON_H + // Allow heyboard interrupt. + if (PyErr_CheckSignals() != 0) + { + sureprint("\nAborting per user input\n"); + throw pybind11::error_already_set(); + } + #endif + + #ifdef CHECKTIMING + if (Stopwatch::exclusive()) + { + // Detailed timing - on the screen + VERBOSLVL1(Stopwatch::print_stats()); + + // Details - also to a file + VERBOSLVL3( + fs::path p(theSegFname); + Stopwatch::save_stats(theEnvironment.output_dir + "/" + p.stem().string() + "_nyxustiming.csv"); + ); + } + #endif } - #endif - + + #ifdef CHECKTIMING + if (Stopwatch::inclusive()) + { + // Detailed timing - on the screen + VERBOSLVL1(Stopwatch::print_stats()); + + // Details - also to a file + VERBOSLVL3( + fs::path p(theSegFname); + Stopwatch::save_stats(theEnvironment.output_dir + "/inclusive_nyxustiming.csv"); + ); + } + #endif + + // close arrow file after use + auto status = theEnvironment.arrow_writer->close(); + + if (!status.ok()) { + // Handle read error + std::cout << "Error closing Arrow file: " << status.ToString() << std::endl; + return 2; + } + return 0; // success + #else + std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enable or use a different output type." << std::endl; + return 1; + #endif + } + #ifdef WITH_PYTHON_H int processMontage( @@ -373,23 +463,8 @@ namespace Nyxus int numReduceThreads, const std::vector& intensity_names, const std::vector& seg_names, - std::string& error_message, - bool arrow_output, - const std::string& outputDir) + std::string& error_message) { - #ifdef USE_ARROW - if (arrow_output) { - - theEnvironment.arrow_stream = ArrowOutputStream(); - - try { - theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); - } catch (const std::exception &err) { - error_message = err.what(); - return 1; - } - } - #endif auto intens_buffer = intensity_images.request(); auto label_buffer = label_images.request(); @@ -413,22 +488,8 @@ namespace Nyxus error_message = "processIntSegImagePairInMemory() returned an error code while processing file pair"; return 1; } - - #ifdef USE_ARROW - if (arrow_output) { - auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); - - if (!status.ok()) { - // Handle read error - error_message = "Error writing Arrow file: " + status.ToString(); - return 2; - } - } - #endif - - if (!arrow_output) - ok = save_features_2_buffer(theResultsCache); + ok = save_features_2_buffer(theResultsCache); if (ok == false) { @@ -452,22 +513,98 @@ namespace Nyxus if (PyErr_CheckSignals() != 0) throw pybind11::error_already_set(); } + + return 0; // success + } + + int processMontage( + const py::array_t& intensity_images, + const py::array_t& label_images, + int numReduceThreads, + const std::vector& intensity_names, + const std::vector& seg_names, + std::string& error_message, + const std::string& outputDir) + { #ifdef USE_ARROW - if (arrow_output) { - // close arrow file after use - auto status = theEnvironment.arrow_writer->close(); + + theEnvironment.arrow_stream = ArrowOutputStream(); + + try { + theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); + } catch (const std::exception &err) { + error_message = err.what(); + return 1; + } + + + auto intens_buffer = intensity_images.request(); + auto label_buffer = label_images.request(); + + auto width = intens_buffer.shape[1]; + auto height = intens_buffer.shape[2]; + + auto nf = intens_buffer.shape[0]; + + for (int i = 0; i < nf; i++) + { + // Clear ROI label list, ROI data, etc. + clear_feature_buffers(); + + auto image_idx = i * width * height; + + std::vector unprocessed_rois; + auto ok = processIntSegImagePairInMemory (intensity_images, label_images, image_idx, intensity_names[i], seg_names[i], unprocessed_rois); // Phased processing + if (ok == false) + { + error_message = "processIntSegImagePairInMemory() returned an error code while processing file pair"; + return 1; + } + + + auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); if (!status.ok()) { // Handle read error - error_message = "Error closing Arrow file: " + status.ToString(); + error_message = "Error writing Arrow file: " + status.ToString(); return 2; } + + if (unprocessed_rois.size() > 0) { + error_message = "The following ROIS are oversized and cannot be processed: "; + for (const auto& roi: unprocessed_rois){ + error_message += roi; + error_message += ", "; + } + + // remove trailing space and comma + error_message.pop_back(); + error_message.pop_back(); + } + + // Allow heyboard interrupt. + if (PyErr_CheckSignals() != 0) + throw pybind11::error_already_set(); + } + + // close arrow file after use + auto status = theEnvironment.arrow_writer->close(); + + if (!status.ok()) { + // Handle read error + error_message = "Error closing Arrow file: " + status.ToString(); + return 2; } - #endif return 0; // success + + #else + std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enable or use a different output type." << std::endl; + return 1; + #endif } + #endif void dump_roi_metrics(const std::string & label_fpath) From a403517efe5d55130706b8967e6ae59d98ba6ab1 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Tue, 17 Oct 2023 14:16:21 -0600 Subject: [PATCH 03/13] Add error code for using non-enabled Arrow functions --- src/nyx/main_nyxus.cpp | 2 ++ src/nyx/scan_fastloader_way.cpp | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/nyx/main_nyxus.cpp b/src/nyx/main_nyxus.cpp index e9242edf..508962bc 100644 --- a/src/nyx/main_nyxus.cpp +++ b/src/nyx/main_nyxus.cpp @@ -101,6 +101,8 @@ int main (int argc, char** argv) case 3: // Memory error std::cout << std::endl << "Memory error" << std::endl; break; + case 4: + std::cout << std::endl << "Arrow not enabled" << std::endl; default: // Any other error std::cout << std::endl << "Error #" << errorCode << std::endl; break; diff --git a/src/nyx/scan_fastloader_way.cpp b/src/nyx/scan_fastloader_way.cpp index d42e2a39..94c886b2 100644 --- a/src/nyx/scan_fastloader_way.cpp +++ b/src/nyx/scan_fastloader_way.cpp @@ -449,7 +449,7 @@ namespace Nyxus return 0; // success #else std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enable or use a different output type." << std::endl; - return 1; + return 4; #endif } @@ -528,7 +528,7 @@ namespace Nyxus const std::string& outputDir) { #ifdef USE_ARROW - + theEnvironment.arrow_stream = ArrowOutputStream(); try { @@ -600,8 +600,8 @@ namespace Nyxus return 0; // success #else - std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enable or use a different output type." << std::endl; - return 1; + error_message = "Apache Arrow functionality is not available. Please install Nyxus with Arrow enable or use a different output type."; + return 4; #endif } From 2229405b7efa0a6cd62e50fa5fd6db5fe9f6a8b3 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Thu, 19 Oct 2023 12:32:56 -0600 Subject: [PATCH 04/13] Add mock classes for Arrow writers when Arrow is not enabled --- src/nyx/arrow_output_stream.cpp | 31 +++++++++++++++-- src/nyx/arrow_output_stream.h | 35 ++++++++++++++++++- src/nyx/output_writers.cpp | 15 +++++++- src/nyx/output_writers.h | 62 +++++++++++++++++++++++++++++++-- 4 files changed, 136 insertions(+), 7 deletions(-) diff --git a/src/nyx/arrow_output_stream.cpp b/src/nyx/arrow_output_stream.cpp index b46106af..78138c1a 100644 --- a/src/nyx/arrow_output_stream.cpp +++ b/src/nyx/arrow_output_stream.cpp @@ -1,7 +1,7 @@ -#ifdef USE_ARROW - #include "arrow_output_stream.h" +#ifdef USE_ARROW + std::shared_ptr ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type, const std::string& arrow_file_path, const std::vector& header) { @@ -46,4 +46,31 @@ std::shared_ptr ArrowOutputStream::get_arrow_table(const std::stri std::string ArrowOutputStream::get_arrow_path() { return arrow_file_path_; } + +#else + +std::shared_ptr ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type, + const std::string& arrow_file_path, + const std::vector& header) { + + std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl; + + return nullptr; +} + + +std::shared_ptr ArrowOutputStream::get_arrow_table(const std::string& file_path) { + + std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl; + + return nullptr; +} + +std::string ArrowOutputStream::get_arrow_path() { + + std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl; + + return ""; +} + #endif \ No newline at end of file diff --git a/src/nyx/arrow_output_stream.h b/src/nyx/arrow_output_stream.h index 0119df23..e31ffb8a 100644 --- a/src/nyx/arrow_output_stream.h +++ b/src/nyx/arrow_output_stream.h @@ -1,6 +1,6 @@ #pragma once -#ifdef USE_ARROW + #include #include @@ -8,6 +8,7 @@ #include "output_writers.h" #include "helpers/helpers.h" +#ifdef USE_ARROW #include #if __has_include() @@ -42,4 +43,36 @@ class ArrowOutputStream { std::shared_ptr get_arrow_table(const std::string& file_path); std::string get_arrow_path(); }; + +#else + +// Replace arrow::Table with a dummy variable +namespace arrow { + using Table = bool; +}; + +/** + * @brief Class to write to Apache Arrow formats + * + * This class provides a place holder for the Arrow writer class when Nyxus is built without arrow. + * + */ +class ArrowOutputStream { + +private: + + std::string arrow_file_path_ = ""; + std::shared_ptr writer_ = nullptr; + std::string arrow_output_type_ = ""; + std::shared_ptr arrow_table_ = nullptr; + +public: + std::shared_ptr create_arrow_file(const std::string& arrow_file_type, + const std::string& arrow_file_path, + const std::vector& header); + std::shared_ptr get_arrow_table(const std::string& file_path); + std::string get_arrow_path(); +}; + + #endif \ No newline at end of file diff --git a/src/nyx/output_writers.cpp b/src/nyx/output_writers.cpp index 60055ba1..6a850cd9 100644 --- a/src/nyx/output_writers.cpp +++ b/src/nyx/output_writers.cpp @@ -1,6 +1,6 @@ -#ifdef USE_ARROW #include "output_writers.h" +#ifdef USE_ARROW std::shared_ptr ApacheArrowWriter::get_arrow_table(const std::string& file_path) { if (table_ != nullptr) return table_; @@ -412,5 +412,18 @@ std::shared_ptr WriterFactory::create_writer(const std::strin } } } +#else + + std::shared_ptr ApacheArrowWriter::get_arrow_table(const std::string& file_path) { + return nullptr; + } + + arrow::Status ApacheArrowWriter::write (const std::vector, int, std::vector>>& features) { + return arrow::Status(); + } + + arrow::Status ApacheArrowWriter::close () { + return arrow::Status(); + } #endif \ No newline at end of file diff --git a/src/nyx/output_writers.h b/src/nyx/output_writers.h index 53e5cc8a..04fcdee2 100644 --- a/src/nyx/output_writers.h +++ b/src/nyx/output_writers.h @@ -1,6 +1,10 @@ #pragma once +#include +#include +#include + #ifdef USE_ARROW #include #include @@ -13,11 +17,8 @@ #include -#include -#include #include #include -#include #include #include "helpers/helpers.h" @@ -165,5 +166,60 @@ class WriterFactory { static std::shared_ptr create_writer(const std::string &output_file, const std::vector &header); }; +#else + + +namespace arrow { + + using Table = bool; + + class Status { + + public: + + bool ok() {return false;} + + std::string ToString() {return "Apache Arrow support is not enabled. Please reinstall Nyxus with Arrow support enabled.";} + + }; + +}; + +/** + * @brief Base class for creating Apache Arrow output writers + * + * This class provides methods for the Arrow table used for writing to Arrow formats and + * provides virtual functions to overridden for writing to different formats + * + */ +class ApacheArrowWriter +{ + +private: + std::shared_ptr table_ = nullptr; + +public: + + /** + * @brief Get the arrow table object + * + * @return std::shared_ptr + */ + std::shared_ptr get_arrow_table(const std::string& file_path); + + /** + * @brief Write Nyxus data to Arrow file + * + * @param header Header data + * @param string_columns String data + * @param numeric_columns Numeric data + * @param number_of_rows Number of rows + * @return arrow::Status + */ + virtual arrow::Status write (const std::vector, int, std::vector>>& features); + + virtual arrow::Status close (); + +}; #endif From bca342ea840399610e39125267c09aa10f96546c Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Thu, 19 Oct 2023 12:33:22 -0600 Subject: [PATCH 05/13] Remove ifdef statements and use mock Arrow classes --- src/nyx/environment.h | 9 +- src/nyx/globals.h | 15 +- src/nyx/main_nyxus.cpp | 41 ++-- src/nyx/python/new_bindings_py.cpp | 112 ++++------- src/nyx/scan_fastloader_way.cpp | 312 ++++++++--------------------- 5 files changed, 152 insertions(+), 337 deletions(-) diff --git a/src/nyx/environment.h b/src/nyx/environment.h index 9d75bfe4..5f828688 100644 --- a/src/nyx/environment.h +++ b/src/nyx/environment.h @@ -8,10 +8,9 @@ #include "cli_gabor_options.h" #include "cli_nested_roi_options.h" -#ifdef USE_ARROW - #include "output_writers.h" - #include "arrow_output_stream.h" -#endif +#include "output_writers.h" +#include "arrow_output_stream.h" + #ifdef USE_GPU #include @@ -119,11 +118,9 @@ class Environment: public BasicEnvironment bool use_arrow = false; -#ifdef USE_ARROW std::string arrow_output_type = ""; ArrowOutputStream arrow_stream; std::shared_ptr arrow_writer = nullptr; -#endif std::string embedded_pixel_size = ""; diff --git a/src/nyx/globals.h b/src/nyx/globals.h index eec12564..52ad57ad 100644 --- a/src/nyx/globals.h +++ b/src/nyx/globals.h @@ -26,14 +26,15 @@ namespace py = pybind11; #endif namespace Nyxus -{ +{ extern FeatureManager theFeatureMgr; extern ImageLoader theImLoader; + enum class SaveOption {saveCSV, saveArrow, saveBuffer}; + bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs); std::string getPureFname(const std::string& fpath); - int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, const std::string& csvOutputDir); - int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, const std::string& outputDir); + int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, SaveOption saveOption, const std::string& outputDir); bool gatherRoisMetrics(const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads); bool processTrivialRois (const std::vector& trivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads, size_t memory_limit); bool processNontrivialRois (const std::vector& nontrivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads); @@ -47,9 +48,7 @@ namespace Nyxus bool gatherRoisMetricsInMemory (const py::array_t& intens_image, const py::array_t& label_image, int start_idx); bool processIntSegImagePairInMemory (const std::string& intens_fpath, const std::string& label_fpath, int filepair_index, const std::string& intens_name, const std::string& seg_name); int processMontage(const py::array_t& intensFiles, const py::array_t& labelFiles, int numReduceThreads, const std::vector& intensity_names, - const std::vector& seg_names, std::string& error_message); - int processMontage(const py::array_t& intensFiles, const py::array_t& labelFiles, int numReduceThreads, const std::vector& intensity_names, - const std::vector& seg_names, std::string& error_message, const std::string& outputDir); + const std::vector& seg_names, std::string& error_message, SaveOption saveOption, const std::string& outputDir=""); bool scanTrivialRois (const std::vector& batch_labels, const py::array_t& intens_images, const py::array_t& label_images, int start_idx); bool processTrivialRoisInMemory (const std::vector& trivRoiLabels, const py::array_t& intens_fpath, const py::array_t& label_fpath, int start_idx, size_t memory_limit); #endif @@ -130,6 +129,4 @@ namespace Nyxus const NestedRoiOptions::Aggregations& aggr, int verbosity_level); -} // namespace Nyxus - - +} // namespace Nyxus \ No newline at end of file diff --git a/src/nyx/main_nyxus.cpp b/src/nyx/main_nyxus.cpp index 508962bc..555f63be 100644 --- a/src/nyx/main_nyxus.cpp +++ b/src/nyx/main_nyxus.cpp @@ -62,30 +62,21 @@ int main (int argc, char** argv) // Process the image data int min_online_roi_size = 0; - if (theEnvironment.use_arrow) { - - errorCode = processDataset ( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - theEnvironment.output_dir); - - } else { - - errorCode = processDataset ( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - theEnvironment.useCsv, - theEnvironment.output_dir); - - } + SaveOption saveOption = [](){ + if (theEnvironment.use_arrow) return SaveOption::saveArrow; + else if (theEnvironment.useCsv) {return SaveOption::saveCSV;} + else {return SaveOption::saveBuffer;} + }(); + + errorCode = processDataset ( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + saveOption, + theEnvironment.output_dir); // Report feature extraction error, if any switch (errorCode) @@ -102,7 +93,7 @@ int main (int argc, char** argv) std::cout << std::endl << "Memory error" << std::endl; break; case 4: - std::cout << std::endl << "Arrow not enabled" << std::endl; + std::cout << std::endl << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enabled to use this functionality." << std::endl; default: // Any other error std::cout << std::endl << "Error #" << errorCode << std::endl; break; diff --git a/src/nyx/python/new_bindings_py.cpp b/src/nyx/python/new_bindings_py.cpp index 352ed666..c90cafad 100644 --- a/src/nyx/python/new_bindings_py.cpp +++ b/src/nyx/python/new_bindings_py.cpp @@ -179,30 +179,21 @@ py::tuple featurize_directory_imp ( // Process the image sdata int min_online_roi_size = 0; - - if (theEnvironment.use_arrow) { - - errorCode = processDataset( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - theEnvironment.output_dir); - - } else { - - errorCode = processDataset( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - false, - theEnvironment.output_dir); - } + + SaveOption saveOption = [](){ + if (theEnvironment.use_arrow) return SaveOption::saveArrow; + else {return SaveOption::saveBuffer;} + }(); + + errorCode = processDataset( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + saveOption, + theEnvironment.output_dir); if (errorCode) throw std::runtime_error("Error " + std::to_string(errorCode) + " occurred during dataset processing"); @@ -277,28 +268,20 @@ py::tuple featurize_montage_imp ( // Process the image sdata std::string error_message = ""; - int errorCode; - if (theEnvironment.use_arrow) { - - errorCode = processMontage( - intensity_images, - label_images, - theEnvironment.n_reduce_threads, - intensity_names, - label_names, - error_message, - output_dir); - - } else { - - errorCode = processMontage( - intensity_images, - label_images, - theEnvironment.n_reduce_threads, - intensity_names, - label_names, - error_message); - } + SaveOption saveOption = [](){ + if (theEnvironment.use_arrow) return SaveOption::saveArrow; + else {return SaveOption::saveBuffer;} + }(); + + int errorCode = processMontage( + intensity_images, + label_images, + theEnvironment.n_reduce_threads, + intensity_names, + label_names, + error_message, + saveOption, + output_dir); if (errorCode) throw std::runtime_error("Error #" + std::to_string(errorCode) + " " + error_message + " occurred during dataset processing."); @@ -370,29 +353,20 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list int min_online_roi_size = 0; int errorCode; - if (theEnvironment.use_arrow) { - - errorCode = processDataset( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - theEnvironment.output_dir); - - } else { - - errorCode = processDataset( - intensFiles, - labelFiles, - theEnvironment.n_loader_threads, - theEnvironment.n_pixel_scan_threads, - theEnvironment.n_reduce_threads, - min_online_roi_size, - false, - theEnvironment.output_dir); - } + SaveOption saveOption = [](){ + if (theEnvironment.use_arrow) return SaveOption::saveArrow; + else {return SaveOption::saveBuffer;} + }(); + + errorCode = processDataset( + intensFiles, + labelFiles, + theEnvironment.n_loader_threads, + theEnvironment.n_pixel_scan_threads, + theEnvironment.n_reduce_threads, + min_online_roi_size, + saveOption, + theEnvironment.output_dir); if (errorCode) diff --git a/src/nyx/scan_fastloader_way.cpp b/src/nyx/scan_fastloader_way.cpp index 94c886b2..0020172d 100644 --- a/src/nyx/scan_fastloader_way.cpp +++ b/src/nyx/scan_fastloader_way.cpp @@ -209,7 +209,7 @@ namespace Nyxus int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, - bool save2csv, + SaveOption saveOption, const std::string& outputDir) { @@ -221,131 +221,20 @@ namespace Nyxus // One-time initialization init_feature_buffers(); - bool ok = true; - - // Iterate file pattern-filtered images of the dataset - auto nf = intensFiles.size(); - for (int i = 0; i < nf; i++) - { -#ifdef CHECKTIMING - if (Stopwatch::exclusive()) - Stopwatch::reset(); -#endif - - // Clear ROI data cached for the previous image - clear_feature_buffers(); - - auto& ifp = intensFiles[i], - & lfp = labelFiles[i]; - // Cache the file names to be picked up by labels to know their file origin - fs::path p_int(ifp), p_seg(lfp); - theSegFname = p_seg.string(); - theIntFname = p_int.string(); + // initialize arrow writer if needed + if (saveOption == SaveOption::saveArrow) { - // Scan one label-intensity pair - ok = theImLoader.open(theIntFname, theSegFname); - if (ok == false) - { - std::cout << "Terminating\n"; - return 1; - } + theEnvironment.arrow_stream = ArrowOutputStream(); - // Do phased processing: prescan, trivial ROI processing, oversized ROI processing - ok = processIntSegImagePair(ifp, lfp, numFastloaderThreads, i, nf); - if (ok == false) - { - std::cout << "processIntSegImagePair() returned an error code while processing file pair " << ifp << " and " << lfp << std::endl; + try { + theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); + } catch (const std::exception &err) { + std::cout << "Error creating Arrow file: " << err.what() << std::endl; return 1; } - - if (save2csv) { - ok = save_features_2_csv(ifp, lfp, outputDir); - } else { - ok = save_features_2_buffer(theResultsCache); - } - - if (ok == false) - { - std::cout << "save_features_2_csv() returned an error code" << std::endl; - return 2; - } - - theImLoader.close(); - - // Save nested ROI related info of this image - if (theEnvironment.nestedOptions.defined()) - save_nested_roi_info(nestedRoiData, uniqueLabels, roiData); - - #ifdef WITH_PYTHON_H - // Allow heyboard interrupt. - if (PyErr_CheckSignals() != 0) - { - sureprint("\nAborting per user input\n"); - throw pybind11::error_already_set(); - } - #endif - - #ifdef CHECKTIMING - if (Stopwatch::exclusive()) - { - // Detailed timing - on the screen - VERBOSLVL1(Stopwatch::print_stats()); - - // Details - also to a file - VERBOSLVL3( - fs::path p(theSegFname); - Stopwatch::save_stats(theEnvironment.output_dir + "/" + p.stem().string() + "_nyxustiming.csv"); - ); - } - #endif } - #ifdef CHECKTIMING - if (Stopwatch::inclusive()) - { - // Detailed timing - on the screen - VERBOSLVL1(Stopwatch::print_stats()); - - // Details - also to a file - VERBOSLVL3( - fs::path p(theSegFname); - Stopwatch::save_stats(theEnvironment.output_dir + "/inclusive_nyxustiming.csv"); - ); - } - #endif - - return 0; // success - } - - - int processDataset( - const std::vector& intensFiles, - const std::vector& labelFiles, - int numFastloaderThreads, - int numSensemakerThreads, - int numReduceThreads, - int min_online_roi_size, - const std::string& outputDir) - { - #ifdef USE_ARROW - - #ifdef CHECKTIMING - if (Stopwatch::inclusive()) - Stopwatch::reset(); - #endif - - // One-time initialization - init_feature_buffers(); - - theEnvironment.arrow_stream = ArrowOutputStream(); - - try { - theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); - } catch (const std::exception &err) { - std::cout << "Error creating Arrow file: " << err.what() << std::endl; - return 1; - } bool ok = true; @@ -385,12 +274,32 @@ namespace Nyxus return 1; } - auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); - - if (!status.ok()) { - // Handle read error - std::cout << "Error writing Arrow file: " << status.ToString() << std::endl; - return 2; + + if (saveOption == SaveOption::saveArrow) { + + auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); + + if (!status.ok()) { + // Handle read error + std::cout << "Error writing Arrow file: " << status.ToString() << std::endl; + return 2; + } + } else if (saveOption == SaveOption::saveCSV) { + ok = save_features_2_csv(ifp, lfp, outputDir); + + if (ok == false) + { + std::cout << "save_features_2_csv() returned an error code" << std::endl; + return 2; + } + } else { + ok = save_features_2_buffer(theResultsCache); + + if (ok == false) + { + std::cout << "save_features_2_buffer() returned an error code" << std::endl; + return 2; + } } theImLoader.close(); @@ -437,86 +346,21 @@ namespace Nyxus } #endif - // close arrow file after use - auto status = theEnvironment.arrow_writer->close(); - - if (!status.ok()) { - // Handle read error - std::cout << "Error closing Arrow file: " << status.ToString() << std::endl; - return 2; - } - - return 0; // success - #else - std::cerr << "Apache Arrow functionality is not available. Please install Nyxus with Arrow enable or use a different output type." << std::endl; - return 4; - #endif - - } - - -#ifdef WITH_PYTHON_H - - int processMontage( - const py::array_t& intensity_images, - const py::array_t& label_images, - int numReduceThreads, - const std::vector& intensity_names, - const std::vector& seg_names, - std::string& error_message) - { - - auto intens_buffer = intensity_images.request(); - auto label_buffer = label_images.request(); - - auto width = intens_buffer.shape[1]; - auto height = intens_buffer.shape[2]; - - auto nf = intens_buffer.shape[0]; - - for (int i = 0; i < nf; i++) - { - // Clear ROI label list, ROI data, etc. - clear_feature_buffers(); - - auto image_idx = i * width * height; - - std::vector unprocessed_rois; - auto ok = processIntSegImagePairInMemory (intensity_images, label_images, image_idx, intensity_names[i], seg_names[i], unprocessed_rois); // Phased processing - if (ok == false) - { - error_message = "processIntSegImagePairInMemory() returned an error code while processing file pair"; - return 1; - } - - ok = save_features_2_buffer(theResultsCache); - - if (ok == false) - { - error_message = "save_features_2_buffer() failed"; + if (saveOption == SaveOption::saveArrow) { + // close arrow file after use + auto status = theEnvironment.arrow_writer->close(); + + if (!status.ok()) { + // Handle read error + std::cout << "Error closing Arrow file: " << status.ToString() << std::endl; return 2; } - - if (unprocessed_rois.size() > 0) { - error_message = "The following ROIS are oversized and cannot be processed: "; - for (const auto& roi: unprocessed_rois){ - error_message += roi; - error_message += ", "; - } - - // remove trailing space and comma - error_message.pop_back(); - error_message.pop_back(); - } - - // Allow heyboard interrupt. - if (PyErr_CheckSignals() != 0) - throw pybind11::error_already_set(); } - + return 0; // success } +#ifdef WITH_PYTHON_H int processMontage( const py::array_t& intensity_images, @@ -525,19 +369,21 @@ namespace Nyxus const std::vector& intensity_names, const std::vector& seg_names, std::string& error_message, + SaveOption saveOption, const std::string& outputDir) { - #ifdef USE_ARROW - theEnvironment.arrow_stream = ArrowOutputStream(); + if (saveOption == SaveOption::saveArrow) { - try { - theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); - } catch (const std::exception &err) { - error_message = err.what(); - return 1; - } + theEnvironment.arrow_stream = ArrowOutputStream(); + try { + theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); + } catch (const std::exception &err) { + error_message = err.what(); + return 1; + } + } auto intens_buffer = intensity_images.request(); auto label_buffer = label_images.request(); @@ -563,12 +409,26 @@ namespace Nyxus } - auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); - - if (!status.ok()) { - // Handle read error - error_message = "Error writing Arrow file: " + status.ToString(); - return 2; + if (saveOption == SaveOption::saveArrow) { + + auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); + + if (!status.ok()) { + // Handle read error + error_message = "Error writing Arrow file: " + status.ToString(); + return 2; + } + + } else { + + ok = save_features_2_buffer(theResultsCache); + + if (ok == false) + { + error_message = "save_features_2_buffer() failed"; + return 2; + } + } if (unprocessed_rois.size() > 0) { @@ -588,23 +448,20 @@ namespace Nyxus throw pybind11::error_already_set(); } - // close arrow file after use - auto status = theEnvironment.arrow_writer->close(); - - if (!status.ok()) { - // Handle read error - error_message = "Error closing Arrow file: " + status.ToString(); - return 2; + + if (saveOption == SaveOption::saveArrow) { + // close arrow file after use + auto status = theEnvironment.arrow_writer->close(); + + if (!status.ok()) { + // Handle read error + error_message = "Error closing Arrow file: " + status.ToString(); + return 2; + } } return 0; // success - - #else - error_message = "Apache Arrow functionality is not available. Please install Nyxus with Arrow enable or use a different output type."; - return 4; - #endif } - #endif void dump_roi_metrics(const std::string & label_fpath) @@ -645,5 +502,4 @@ namespace Nyxus std::cout << "... done\n"; } -} - +} \ No newline at end of file From 552641527e31fd07f217da5625cfa9e27e580e17 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Mon, 23 Oct 2023 08:18:34 -0600 Subject: [PATCH 06/13] Rename use_arrow to use_apache_writers --- src/nyx/environment.cpp | 2 +- src/nyx/environment.h | 2 +- src/nyx/main_nyxus.cpp | 2 +- src/nyx/python/new_bindings_py.cpp | 12 ++++++------ 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/nyx/environment.cpp b/src/nyx/environment.cpp index 371b021e..6719de4d 100644 --- a/src/nyx/environment.cpp +++ b/src/nyx/environment.cpp @@ -847,7 +847,7 @@ bool Environment::parse_cmdline(int argc, char **argv) { useCsv = false; arrow_output_type = rawOutpTypeUC; - use_arrow = true; + use_apache_writers = true; } else { diff --git a/src/nyx/environment.h b/src/nyx/environment.h index 5f828688..1f65c4e6 100644 --- a/src/nyx/environment.h +++ b/src/nyx/environment.h @@ -117,7 +117,7 @@ class Environment: public BasicEnvironment bool singleROI = false; // is set to 'true' parse_cmdline() if labels_dir==intensity_dir - bool use_arrow = false; + bool use_apache_writers = false; std::string arrow_output_type = ""; ArrowOutputStream arrow_stream; std::shared_ptr arrow_writer = nullptr; diff --git a/src/nyx/main_nyxus.cpp b/src/nyx/main_nyxus.cpp index 555f63be..39ee8347 100644 --- a/src/nyx/main_nyxus.cpp +++ b/src/nyx/main_nyxus.cpp @@ -63,7 +63,7 @@ int main (int argc, char** argv) int min_online_roi_size = 0; SaveOption saveOption = [](){ - if (theEnvironment.use_arrow) return SaveOption::saveArrow; + if (theEnvironment.use_apache_writers) return SaveOption::saveArrow; else if (theEnvironment.useCsv) {return SaveOption::saveCSV;} else {return SaveOption::saveBuffer;} }(); diff --git a/src/nyx/python/new_bindings_py.cpp b/src/nyx/python/new_bindings_py.cpp index c90cafad..90131e39 100644 --- a/src/nyx/python/new_bindings_py.cpp +++ b/src/nyx/python/new_bindings_py.cpp @@ -173,7 +173,7 @@ py::tuple featurize_directory_imp ( // We're good to extract features. Reset the feature results cache theResultsCache.clear(); - theEnvironment.use_arrow = !pandas_output; + theEnvironment.use_apache_writers = !pandas_output; theEnvironment.separateCsv = false; @@ -181,7 +181,7 @@ py::tuple featurize_directory_imp ( int min_online_roi_size = 0; SaveOption saveOption = [](){ - if (theEnvironment.use_arrow) return SaveOption::saveArrow; + if (theEnvironment.use_apache_writers) return SaveOption::saveArrow; else {return SaveOption::saveBuffer;} }(); @@ -263,13 +263,13 @@ py::tuple featurize_montage_imp ( theResultsCache.clear(); - theEnvironment.use_arrow = !pandas_output; + theEnvironment.use_apache_writers = !pandas_output; // Process the image sdata std::string error_message = ""; SaveOption saveOption = [](){ - if (theEnvironment.use_arrow) return SaveOption::saveArrow; + if (theEnvironment.use_apache_writers) return SaveOption::saveArrow; else {return SaveOption::saveBuffer;} }(); @@ -347,14 +347,14 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list theResultsCache.clear(); - theEnvironment.use_arrow = !pandas_output; + theEnvironment.use_apache_writers = !pandas_output; // Process the image sdata int min_online_roi_size = 0; int errorCode; SaveOption saveOption = [](){ - if (theEnvironment.use_arrow) return SaveOption::saveArrow; + if (theEnvironment.use_apache_writers) return SaveOption::saveArrow; else {return SaveOption::saveBuffer;} }(); From 04b6952cdd8a0dcd29b9561f9eb4fed9ef5f8315 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:17:29 -0600 Subject: [PATCH 07/13] Update Arrow writers to use SaveOption --- src/nyx/arrow_output_stream.cpp | 12 +++++------- src/nyx/arrow_output_stream.h | 7 +++---- src/nyx/globals.h | 2 +- src/nyx/main_nyxus.cpp | 8 +++++++- src/nyx/python/new_bindings_py.cpp | 26 +++++++++++++++++++++++--- src/nyx/scan_fastloader_way.cpp | 20 +++++++++++--------- 6 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/nyx/arrow_output_stream.cpp b/src/nyx/arrow_output_stream.cpp index 78138c1a..f909c0cd 100644 --- a/src/nyx/arrow_output_stream.cpp +++ b/src/nyx/arrow_output_stream.cpp @@ -2,21 +2,19 @@ #ifdef USE_ARROW -std::shared_ptr ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type, +std::shared_ptr ArrowOutputStream::create_arrow_file(const SaveOption& arrow_file_type, const std::string& arrow_file_path, const std::vector& header) { - - std::string arrow_file_type_upper = Nyxus::toupper(arrow_file_type); if(arrow_file_path != "" && !fs::is_directory(arrow_file_path) && !(Nyxus::ends_with_substr(arrow_file_path, ".arrow") || Nyxus::ends_with_substr(arrow_file_path, ".feather") || Nyxus::ends_with_substr(arrow_file_path, ".parquet"))) { throw std::invalid_argument("The arrow file path must end in \".arrow\""); } - if (!(arrow_file_type_upper == "ARROW" || arrow_file_type_upper == "ARROWIPC" || arrow_file_type_upper == "PARQUET")) { - throw std::invalid_argument("The valid file types are ARROW, ARROWIPC, or PARQUET"); + if (arrow_file_type != SaveOption::saveArrowIPC && arrow_file_type != SaveOption::saveParquet) { + throw std::invalid_argument("The valid save options are SaveOption::saveArrowIPC or SaveOption::saveParquet."); } - std::string extension = (arrow_file_type_upper == "PARQUET") ? ".parquet" : ".arrow"; + std::string extension = (arrow_file_type == SaveOption::saveParquet) ? ".parquet" : ".arrow"; if (arrow_file_path == "") { arrow_file_path_ = "NyxusFeatures" + extension; @@ -49,7 +47,7 @@ std::string ArrowOutputStream::get_arrow_path() { #else -std::shared_ptr ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type, +std::shared_ptr ArrowOutputStream::create_arrow_file(const SaveOption& arrow_file_type, const std::string& arrow_file_path, const std::vector& header) { diff --git a/src/nyx/arrow_output_stream.h b/src/nyx/arrow_output_stream.h index e31ffb8a..c8e873ca 100644 --- a/src/nyx/arrow_output_stream.h +++ b/src/nyx/arrow_output_stream.h @@ -1,12 +1,11 @@ #pragma once - - #include #include #include "output_writers.h" #include "helpers/helpers.h" +#include "globals.h" #ifdef USE_ARROW #include @@ -37,7 +36,7 @@ class ArrowOutputStream { std::shared_ptr arrow_table_ = nullptr; public: - std::shared_ptr create_arrow_file(const std::string& arrow_file_type, + std::shared_ptr create_arrow_file(const SaveOption& arrow_file_type, const std::string& arrow_file_path, const std::vector& header); std::shared_ptr get_arrow_table(const std::string& file_path); @@ -67,7 +66,7 @@ class ArrowOutputStream { std::shared_ptr arrow_table_ = nullptr; public: - std::shared_ptr create_arrow_file(const std::string& arrow_file_type, + std::shared_ptr create_arrow_file(const SaveOption& arrow_file_type, const std::string& arrow_file_path, const std::vector& header); std::shared_ptr get_arrow_table(const std::string& file_path); diff --git a/src/nyx/globals.h b/src/nyx/globals.h index 52ad57ad..ba5c69fc 100644 --- a/src/nyx/globals.h +++ b/src/nyx/globals.h @@ -30,7 +30,7 @@ namespace Nyxus extern FeatureManager theFeatureMgr; extern ImageLoader theImLoader; - enum class SaveOption {saveCSV, saveArrow, saveBuffer}; + enum class SaveOption {saveCSV, saveBuffer, saveArrowIPC, saveParquet}; bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs); std::string getPureFname(const std::string& fpath); diff --git a/src/nyx/main_nyxus.cpp b/src/nyx/main_nyxus.cpp index 39ee8347..aa42c354 100644 --- a/src/nyx/main_nyxus.cpp +++ b/src/nyx/main_nyxus.cpp @@ -63,7 +63,13 @@ int main (int argc, char** argv) int min_online_roi_size = 0; SaveOption saveOption = [](){ - if (theEnvironment.use_apache_writers) return SaveOption::saveArrow; + if (theEnvironment.use_apache_writers) { + if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") { + return SaveOption::saveArrowIPC; + } else { + return SaveOption::saveParquet; + } + } else if (theEnvironment.useCsv) {return SaveOption::saveCSV;} else {return SaveOption::saveBuffer;} }(); diff --git a/src/nyx/python/new_bindings_py.cpp b/src/nyx/python/new_bindings_py.cpp index 90131e39..70e5fb89 100644 --- a/src/nyx/python/new_bindings_py.cpp +++ b/src/nyx/python/new_bindings_py.cpp @@ -181,7 +181,13 @@ py::tuple featurize_directory_imp ( int min_online_roi_size = 0; SaveOption saveOption = [](){ - if (theEnvironment.use_apache_writers) return SaveOption::saveArrow; + if (theEnvironment.use_apache_writers) { + if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") { + return SaveOption::saveArrowIPC; + } else { + return SaveOption::saveParquet; + } + } else {return SaveOption::saveBuffer;} }(); @@ -269,10 +275,18 @@ py::tuple featurize_montage_imp ( std::string error_message = ""; SaveOption saveOption = [](){ - if (theEnvironment.use_apache_writers) return SaveOption::saveArrow; + if (theEnvironment.use_apache_writers) { + if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") { + return SaveOption::saveArrowIPC; + } else { + return SaveOption::saveParquet; + } + } else {return SaveOption::saveBuffer;} }(); + + int errorCode = processMontage( intensity_images, label_images, @@ -354,7 +368,13 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list int errorCode; SaveOption saveOption = [](){ - if (theEnvironment.use_apache_writers) return SaveOption::saveArrow; + if (theEnvironment.use_apache_writers) { + if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") { + return SaveOption::saveArrowIPC; + } else { + return SaveOption::saveParquet; + } + } else {return SaveOption::saveBuffer;} }(); diff --git a/src/nyx/scan_fastloader_way.cpp b/src/nyx/scan_fastloader_way.cpp index 0020172d..8fdcc9a3 100644 --- a/src/nyx/scan_fastloader_way.cpp +++ b/src/nyx/scan_fastloader_way.cpp @@ -221,14 +221,15 @@ namespace Nyxus // One-time initialization init_feature_buffers(); + bool write_apache = (saveOption == SaveOption::saveArrowIPC || saveOption == SaveOption::saveParquet); // initialize arrow writer if needed - if (saveOption == SaveOption::saveArrow) { + if (write_apache) { theEnvironment.arrow_stream = ArrowOutputStream(); try { - theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); + theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(saveOption, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); } catch (const std::exception &err) { std::cout << "Error creating Arrow file: " << err.what() << std::endl; return 1; @@ -275,7 +276,7 @@ namespace Nyxus } - if (saveOption == SaveOption::saveArrow) { + if (write_apache) { auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); @@ -346,7 +347,7 @@ namespace Nyxus } #endif - if (saveOption == SaveOption::saveArrow) { + if (write_apache) { // close arrow file after use auto status = theEnvironment.arrow_writer->close(); @@ -372,13 +373,14 @@ namespace Nyxus SaveOption saveOption, const std::string& outputDir) { + bool write_apache = (saveOption == SaveOption::saveArrowIPC || saveOption == SaveOption::saveParquet); - if (saveOption == SaveOption::saveArrow) { + if (write_apache) { theEnvironment.arrow_stream = ArrowOutputStream(); try { - theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); + theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(saveOption, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); } catch (const std::exception &err) { error_message = err.what(); return 1; @@ -409,8 +411,8 @@ namespace Nyxus } - if (saveOption == SaveOption::saveArrow) { - + if (write_apache) { + auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); if (!status.ok()) { @@ -449,7 +451,7 @@ namespace Nyxus } - if (saveOption == SaveOption::saveArrow) { + if (write_apache) { // close arrow file after use auto status = theEnvironment.arrow_writer->close(); From df0ea1157c0a77f7f64a2719e5499ff71e819e1d Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Wed, 25 Oct 2023 10:24:50 -0600 Subject: [PATCH 08/13] Move SaveOption to separate file --- src/nyx/arrow_output_stream.cpp | 10 +++++----- src/nyx/arrow_output_stream.h | 6 +++--- src/nyx/globals.h | 3 +-- src/nyx/save_option.h | 5 +++++ 4 files changed, 14 insertions(+), 10 deletions(-) create mode 100644 src/nyx/save_option.h diff --git a/src/nyx/arrow_output_stream.cpp b/src/nyx/arrow_output_stream.cpp index f909c0cd..0d27c508 100644 --- a/src/nyx/arrow_output_stream.cpp +++ b/src/nyx/arrow_output_stream.cpp @@ -2,7 +2,7 @@ #ifdef USE_ARROW -std::shared_ptr ArrowOutputStream::create_arrow_file(const SaveOption& arrow_file_type, +std::shared_ptr ArrowOutputStream::create_arrow_file(const Nyxus::SaveOption& arrow_file_type, const std::string& arrow_file_path, const std::vector& header) { @@ -10,11 +10,11 @@ std::shared_ptr ArrowOutputStream::create_arrow_file(const Sa throw std::invalid_argument("The arrow file path must end in \".arrow\""); } - if (arrow_file_type != SaveOption::saveArrowIPC && arrow_file_type != SaveOption::saveParquet) { - throw std::invalid_argument("The valid save options are SaveOption::saveArrowIPC or SaveOption::saveParquet."); + if (arrow_file_type != Nyxus::SaveOption::saveArrowIPC && arrow_file_type != Nyxus::SaveOption::saveParquet) { + throw std::invalid_argument("The valid save options are Nyxus::SaveOption::saveArrowIPC or Nyxus::SaveOption::saveParquet."); } - std::string extension = (arrow_file_type == SaveOption::saveParquet) ? ".parquet" : ".arrow"; + std::string extension = (arrow_file_type == Nyxus::SaveOption::saveParquet) ? ".parquet" : ".arrow"; if (arrow_file_path == "") { arrow_file_path_ = "NyxusFeatures" + extension; @@ -47,7 +47,7 @@ std::string ArrowOutputStream::get_arrow_path() { #else -std::shared_ptr ArrowOutputStream::create_arrow_file(const SaveOption& arrow_file_type, +std::shared_ptr ArrowOutputStream::create_arrow_file(const Nyxus::SaveOption& arrow_file_type, const std::string& arrow_file_path, const std::vector& header) { diff --git a/src/nyx/arrow_output_stream.h b/src/nyx/arrow_output_stream.h index c8e873ca..33ec7014 100644 --- a/src/nyx/arrow_output_stream.h +++ b/src/nyx/arrow_output_stream.h @@ -5,7 +5,7 @@ #include "output_writers.h" #include "helpers/helpers.h" -#include "globals.h" +#include "save_option.h" #ifdef USE_ARROW #include @@ -36,7 +36,7 @@ class ArrowOutputStream { std::shared_ptr arrow_table_ = nullptr; public: - std::shared_ptr create_arrow_file(const SaveOption& arrow_file_type, + std::shared_ptr create_arrow_file(const Nyxus::SaveOption& arrow_file_type, const std::string& arrow_file_path, const std::vector& header); std::shared_ptr get_arrow_table(const std::string& file_path); @@ -66,7 +66,7 @@ class ArrowOutputStream { std::shared_ptr arrow_table_ = nullptr; public: - std::shared_ptr create_arrow_file(const SaveOption& arrow_file_type, + std::shared_ptr create_arrow_file(const Nyxus::SaveOption& arrow_file_type, const std::string& arrow_file_path, const std::vector& header); std::shared_ptr get_arrow_table(const std::string& file_path); diff --git a/src/nyx/globals.h b/src/nyx/globals.h index ba5c69fc..e2832b7d 100644 --- a/src/nyx/globals.h +++ b/src/nyx/globals.h @@ -13,6 +13,7 @@ #include "image_loader.h" #include "results_cache.h" #include "roi_cache.h" +#include "save_option.h" #include "nested_feature_aggregation.h" // Nested ROI @@ -30,8 +31,6 @@ namespace Nyxus extern FeatureManager theFeatureMgr; extern ImageLoader theImLoader; - enum class SaveOption {saveCSV, saveBuffer, saveArrowIPC, saveParquet}; - bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs); std::string getPureFname(const std::string& fpath); int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, SaveOption saveOption, const std::string& outputDir); diff --git a/src/nyx/save_option.h b/src/nyx/save_option.h new file mode 100644 index 00000000..6b54e7ca --- /dev/null +++ b/src/nyx/save_option.h @@ -0,0 +1,5 @@ +#pragma once + +namespace Nyxus { + enum class SaveOption {saveCSV, saveBuffer, saveArrowIPC, saveParquet}; +}; \ No newline at end of file From a018d6a6f5f0f50cdeea9dace64113e700dae620 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Wed, 25 Oct 2023 10:25:03 -0600 Subject: [PATCH 09/13] Remove unused Arrow parameters --- src/nyx/environment.cpp | 33 +++++++++++++++++---------------- src/nyx/environment.h | 7 +++---- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/nyx/environment.cpp b/src/nyx/environment.cpp index 6719de4d..5ffa52ea 100644 --- a/src/nyx/environment.cpp +++ b/src/nyx/environment.cpp @@ -833,25 +833,24 @@ bool Environment::parse_cmdline(int argc, char **argv) auto rawOutpTypeUC = Nyxus::toupper(rawOutpType); if (rawOutpTypeUC != Nyxus::toupper(OT_SINGLECSV) && rawOutpTypeUC != Nyxus::toupper(OT_SEPCSV) && - rawOutpTypeUC != Nyxus::toupper(OT_ARROW) && rawOutpTypeUC != Nyxus::toupper(OT_ARROWIPC) && rawOutpTypeUC != Nyxus::toupper(OT_PARQUET)) { - std::cout << "Error: valid values of " << OUTPUTTYPE << " are " << OT_SEPCSV << ", " << OT_SINGLECSV << ", " << OT_ARROW ", or" << OT_PARQUET << "." "\n"; + std::cout << "Error: valid values of " << OUTPUTTYPE << " are " << OT_SEPCSV << ", " << OT_SINGLECSV << ", or" << OT_PARQUET << "." "\n"; return false; } - if (rawOutpTypeUC == Nyxus::toupper(OT_ARROW) || - rawOutpTypeUC == Nyxus::toupper(OT_ARROWIPC) || - rawOutpTypeUC == Nyxus::toupper(OT_PARQUET)) - { - useCsv = false; - arrow_output_type = rawOutpTypeUC; - use_apache_writers = true; - } - else - { - useCsv = true; + SaveOption saveOption = [&rawOutpTypeUC](){ + if (rawOutpTypeUC == Nyxus::toupper(OT_ARROWIPC)) { + return SaveOption::saveArrowIPC; + } else if (rawOutpTypeUC == Nyxus::toupper(OT_PARQUET)) { + return SaveOption::saveParquet; + } else { + return SaveOption::saveCSV; + } + }(); + + if (saveOption == SaveOption::saveCSV) { separateCsv = rawOutpTypeUC == Nyxus::toupper(OT_SEPCSV); } @@ -863,16 +862,18 @@ bool Environment::parse_cmdline(int argc, char **argv) std::cout << "Error: valid values of " << OUTPUTTYPE << " are " << OT_SEPCSV << ", " << OT_SINGLECSV << "\n"; // Intercept an attempt of running Nyxus with Apache options - if (rawOutpTypeUC != Nyxus::toupper(OT_ARROW) || - rawOutpTypeUC != Nyxus::toupper(OT_ARROWIPC) || + if (rawOutpTypeUC != Nyxus::toupper(OT_ARROWIPC) || rawOutpTypeUC != Nyxus::toupper(OT_PARQUET)) std::cout << "Error: Nyxus must be built with Apache Arrow enabled to use Arrow output types. Please rebuild with the flag USEARROW=ON." << std::endl; return false; } - useCsv = (rawOutpTypeUC == Nyxus::toupper(OT_SINGLECSV) || rawOutpTypeUC == Nyxus::toupper(OT_SEPCSV)); separateCsv = rawOutpTypeUC == Nyxus::toupper(OT_SEPCSV); + + if (rawOutpTypeUC == Nyxus::toupper(OT_SINGLECSV) || rawOutpTypeUC == Nyxus::toupper(OT_SEPCSV)) { + saveOption = Nyxus::SaveOption::saveCSV; + } #endif //==== Check numeric parameters diff --git a/src/nyx/environment.h b/src/nyx/environment.h index 1f65c4e6..0ce0368a 100644 --- a/src/nyx/environment.h +++ b/src/nyx/environment.h @@ -7,6 +7,7 @@ #include "roi_blacklist.h" #include "cli_gabor_options.h" #include "cli_nested_roi_options.h" +#include "save_option.h" #include "output_writers.h" #include "arrow_output_stream.h" @@ -88,7 +89,6 @@ // Valid values of 'OUTPUTTYPE' #define OT_SEPCSV "separatecsv" #define OT_SINGLECSV "singlecsv" -#define OT_ARROW "arrow" #define OT_ARROWIPC "arrowipc" #define OT_PARQUET "parquet" @@ -117,8 +117,6 @@ class Environment: public BasicEnvironment bool singleROI = false; // is set to 'true' parse_cmdline() if labels_dir==intensity_dir - bool use_apache_writers = false; - std::string arrow_output_type = ""; ArrowOutputStream arrow_stream; std::shared_ptr arrow_writer = nullptr; @@ -149,7 +147,8 @@ class Environment: public BasicEnvironment std::string rawOutpType = ""; // Valid values: "separatecsv", "singlecsv", "arrow", "parquet" bool separateCsv = true; - bool useCsv = true; + + Nyxus::SaveOption saveOption; // x- and y- resolution in pixels per centimeter std::string rawXYRes = ""; From 50f1c7dd198972e987a4b11c4d421937096d5882 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Wed, 25 Oct 2023 10:25:28 -0600 Subject: [PATCH 10/13] Use updated Arrow parameters --- src/nyx/main_nyxus.cpp | 15 +----- src/nyx/python/new_bindings_py.cpp | 76 +++++++++++------------------- src/nyx/python/nyxus/nyxus.py | 19 ++++---- 3 files changed, 39 insertions(+), 71 deletions(-) diff --git a/src/nyx/main_nyxus.cpp b/src/nyx/main_nyxus.cpp index aa42c354..42d4bfd5 100644 --- a/src/nyx/main_nyxus.cpp +++ b/src/nyx/main_nyxus.cpp @@ -3,6 +3,7 @@ #include "dirs_and_files.h" #include "environment.h" #include "globals.h" +#include "save_option.h" #include "arrow_output_stream.h" #ifdef USE_GPU bool gpu_initialize(int dev_id); @@ -62,18 +63,6 @@ int main (int argc, char** argv) // Process the image data int min_online_roi_size = 0; - SaveOption saveOption = [](){ - if (theEnvironment.use_apache_writers) { - if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") { - return SaveOption::saveArrowIPC; - } else { - return SaveOption::saveParquet; - } - } - else if (theEnvironment.useCsv) {return SaveOption::saveCSV;} - else {return SaveOption::saveBuffer;} - }(); - errorCode = processDataset ( intensFiles, labelFiles, @@ -81,7 +70,7 @@ int main (int argc, char** argv) theEnvironment.n_pixel_scan_threads, theEnvironment.n_reduce_threads, min_online_roi_size, - saveOption, + theEnvironment.saveOption, theEnvironment.output_dir); // Report feature extraction error, if any diff --git a/src/nyx/python/new_bindings_py.cpp b/src/nyx/python/new_bindings_py.cpp index 70e5fb89..643a79c7 100644 --- a/src/nyx/python/new_bindings_py.cpp +++ b/src/nyx/python/new_bindings_py.cpp @@ -140,7 +140,7 @@ py::tuple featurize_directory_imp ( const std::string &intensity_dir, const std::string &labels_dir, const std::string &file_pattern, - bool pandas_output=true, + const std::string &output_type, const std::string &arrow_file_path="") { // Check and cache the file pattern @@ -173,22 +173,17 @@ py::tuple featurize_directory_imp ( // We're good to extract features. Reset the feature results cache theResultsCache.clear(); - theEnvironment.use_apache_writers = !pandas_output; - theEnvironment.separateCsv = false; // Process the image sdata int min_online_roi_size = 0; - SaveOption saveOption = [](){ - if (theEnvironment.use_apache_writers) { - if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") { - return SaveOption::saveArrowIPC; - } else { - return SaveOption::saveParquet; - } - } - else {return SaveOption::saveBuffer;} + theEnvironment.saveOption = [&output_type](){ + if (output_type == "arrowipc") { + return SaveOption::saveArrowIPC; + } else if (output_type == "parquet") { + return SaveOption::saveParquet; + } else {return SaveOption::saveBuffer;} }(); errorCode = processDataset( @@ -198,14 +193,14 @@ py::tuple featurize_directory_imp ( theEnvironment.n_pixel_scan_threads, theEnvironment.n_reduce_threads, min_online_roi_size, - saveOption, + theEnvironment.saveOption, theEnvironment.output_dir); if (errorCode) throw std::runtime_error("Error " + std::to_string(errorCode) + " occurred during dataset processing"); // Output the result - if (pandas_output) + if (theEnvironment.saveOption == Nyxus::SaveOption::saveBuffer) { auto pyHeader = py::array(py::cast(theResultsCache.get_headerBuf())); @@ -227,18 +222,12 @@ py::tuple featurize_montage_imp ( const py::array_t& label_images, const std::vector& intensity_names, const std::vector& label_names, - bool pandas_output=true, - const std::string arrow_output_type="", + const std::string output_type="", const std::string output_dir="") { // Set the whole-slide/multi-ROI flag theEnvironment.singleROI = false; -#ifdef USE_ARROW - // Set arrow output type - theEnvironment.arrow_output_type = arrow_output_type; -#endif - auto intens_buffer = intensity_images.request(); auto label_buffer = label_images.request(); @@ -269,20 +258,15 @@ py::tuple featurize_montage_imp ( theResultsCache.clear(); - theEnvironment.use_apache_writers = !pandas_output; - // Process the image sdata std::string error_message = ""; - SaveOption saveOption = [](){ - if (theEnvironment.use_apache_writers) { - if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") { - return SaveOption::saveArrowIPC; - } else { - return SaveOption::saveParquet; - } - } - else {return SaveOption::saveBuffer;} + theEnvironment.saveOption = [&output_type](){ + if (output_type == "arrowipc") { + return SaveOption::saveArrowIPC; + } else if (output_type == "parquet") { + return SaveOption::saveParquet; + } else {return SaveOption::saveBuffer;} }(); @@ -294,13 +278,13 @@ py::tuple featurize_montage_imp ( intensity_names, label_names, error_message, - saveOption, + theEnvironment.saveOption, output_dir); if (errorCode) throw std::runtime_error("Error #" + std::to_string(errorCode) + " " + error_message + " occurred during dataset processing."); - if (pandas_output) { + if (theEnvironment.saveOption == Nyxus::SaveOption::saveBuffer) { auto pyHeader = py::array(py::cast(theResultsCache.get_headerBuf())); auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBuf())); @@ -318,10 +302,11 @@ py::tuple featurize_montage_imp ( return py::make_tuple(error_message, path); } -py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list & seg_fnames, bool single_roi, bool pandas_output=true) +py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list & seg_fnames, bool single_roi, const std::string& output_type, const std::string& output_dir) { // Set the whole-slide/multi-ROI flag theEnvironment.singleROI = single_roi; + theEnvironment.output_dir = output_dir; std::vector intensFiles, labelFiles; for (auto it = int_fnames.begin(); it != int_fnames.end(); ++it) @@ -361,21 +346,16 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list theResultsCache.clear(); - theEnvironment.use_apache_writers = !pandas_output; - // Process the image sdata int min_online_roi_size = 0; int errorCode; - SaveOption saveOption = [](){ - if (theEnvironment.use_apache_writers) { - if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") { - return SaveOption::saveArrowIPC; - } else { - return SaveOption::saveParquet; - } - } - else {return SaveOption::saveBuffer;} + theEnvironment.saveOption = [&output_type](){ + if (output_type == "arrowipc") { + return SaveOption::saveArrowIPC; + } else if (output_type == "parquet") { + return SaveOption::saveParquet; + } else {return SaveOption::saveBuffer;} }(); errorCode = processDataset( @@ -385,14 +365,14 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list theEnvironment.n_pixel_scan_threads, theEnvironment.n_reduce_threads, min_online_roi_size, - saveOption, + theEnvironment.saveOption, theEnvironment.output_dir); if (errorCode) throw std::runtime_error("Error occurred during dataset processing."); - if (pandas_output) { + if (theEnvironment.saveOption == Nyxus::SaveOption::saveBuffer) { auto pyHeader = py::array(py::cast(theResultsCache.get_headerBuf())); auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBuf())); diff --git a/src/nyx/python/nyxus/nyxus.py b/src/nyx/python/nyxus/nyxus.py index 824bd652..29f26524 100644 --- a/src/nyx/python/nyxus/nyxus.py +++ b/src/nyx/python/nyxus/nyxus.py @@ -162,7 +162,7 @@ def __init__( ) # list of valid outputs that are used throughout featurize functions - self._valid_output_types = ['pandas', 'arrow', 'arrowipc', 'parquet'] + self._valid_output_types = ['pandas', 'arrowipc', 'parquet'] def featurize_directory( self, @@ -213,7 +213,7 @@ def featurize_directory( if (output_type == 'pandas'): - header, string_data, numeric_data = featurize_directory_imp (intensity_dir, label_dir, file_pattern, True, "") + header, string_data, numeric_data = featurize_directory_imp (intensity_dir, label_dir, file_pattern, output_type, "") df = pd.concat( [ @@ -231,7 +231,7 @@ def featurize_directory( else: - featurize_directory_imp(intensity_dir, label_dir, file_pattern, False, output_path) + featurize_directory_imp(intensity_dir, label_dir, file_pattern, output_type, output_path) @@ -270,9 +270,8 @@ def featurize( Pandas DataFrame containing the requested features with one row per label per image. """ - valid_output_types = ['arrow', 'parquet', 'pandas'] - if (output_type != "" and output_type not in valid_output_types): - raise ValueError("Invalid output type: " + output_type + ". Valid options are: " + valid_output_types) + if (output_type != "" and output_type not in self._valid_output_types): + raise ValueError("Invalid output type: " + output_type + ". Valid options are: " + self._valid_output_types) # verify argument types @@ -326,7 +325,7 @@ def featurize( if (output_type == 'pandas'): - header, string_data, numeric_data, error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, True, "", "") + header, string_data, numeric_data, error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, output_Type, "") self.error_message = error_message if(error_message != ''): @@ -348,7 +347,7 @@ def featurize( else: - error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, False, output_type, output_path) + error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, output_type, output_path) self.error_message = error_message @@ -404,7 +403,7 @@ def featurize_files ( if (output_type == 'pandas'): - header, string_data, numeric_data = featurize_fname_lists_imp (intensity_files, mask_files, single_roi, True) + header, string_data, numeric_data = featurize_fname_lists_imp (intensity_files, mask_files, single_roi, output_type, "") df = pd.concat( [ @@ -422,7 +421,7 @@ def featurize_files ( else: - featurize_fname_lists_imp (intensity_files, mask_files, single_roi, False) + featurize_fname_lists_imp (intensity_files, mask_files, single_roi, output_type, output_path) From d9488ebf86510f63a115e2e4fb168b82a2c5ae85 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Wed, 25 Oct 2023 10:41:04 -0600 Subject: [PATCH 11/13] Update "arrow" type to "arrowipc" --- src/nyx/python/nyxus/nyxus.py | 2 +- tests/python/test_nyxus.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/nyx/python/nyxus/nyxus.py b/src/nyx/python/nyxus/nyxus.py index 29f26524..6b8be644 100644 --- a/src/nyx/python/nyxus/nyxus.py +++ b/src/nyx/python/nyxus/nyxus.py @@ -271,7 +271,7 @@ def featurize( per image. """ if (output_type != "" and output_type not in self._valid_output_types): - raise ValueError("Invalid output type: " + output_type + ". Valid options are: " + self._valid_output_types) + raise ValueError(f'Invalid output type: {output_type}. Valid options are: {self._valid_output_types}') # verify argument types diff --git a/tests/python/test_nyxus.py b/tests/python/test_nyxus.py index 3f948cba..d3bcc358 100644 --- a/tests/python/test_nyxus.py +++ b/tests/python/test_nyxus.py @@ -312,7 +312,7 @@ def test_make_arrow_ipc(self): features = nyx.featurize(intens, seg) - arrow_path = nyx.featurize(intens, seg, output_type="arrow") + arrow_path = nyx.featurize(intens, seg, output_type="arrowipc") arrow_array = nyx.get_arrow_memory_mapping(arrow_path) @@ -342,7 +342,7 @@ def test_arrow_ipc(self): nyx = nyxus.Nyxus (["*ALL*"]) assert nyx is not None - arrow_path = nyx.featurize(intens, seg, output_type="arrow", output_path='TestNyxusOut') + arrow_path = nyx.featurize(intens, seg, output_type="arrowipc", output_path='TestNyxusOut') if (not nyx.arrow_is_enabled()): @@ -381,7 +381,7 @@ def test_arrow_ipc_no_path(self): nyx = nyxus.Nyxus (["*ALL*"]) assert nyx is not None - arrow_path = nyx.featurize(intens, seg, output_type="arrow") + arrow_path = nyx.featurize(intens, seg, output_type="arrowipc") assert arrow_path == 'NyxusFeatures.arrow' @@ -422,7 +422,7 @@ def test_arrow_ipc_path(self): nyx = nyxus.Nyxus (["*ALL*"]) assert nyx is not None - arrow_path = nyx.featurize(intens, seg, output_type="arrow") + arrow_path = nyx.featurize(intens, seg, output_type="arrowipc") path = nyx.get_arrow_ipc_file() From dc5530f365586e31c20f9a880982dbb921be6e1a Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Wed, 25 Oct 2023 11:19:57 -0600 Subject: [PATCH 12/13] Update nyxus.py --- src/nyx/python/nyxus/nyxus.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/nyx/python/nyxus/nyxus.py b/src/nyx/python/nyxus/nyxus.py index 6b8be644..13c4888c 100644 --- a/src/nyx/python/nyxus/nyxus.py +++ b/src/nyx/python/nyxus/nyxus.py @@ -325,7 +325,7 @@ def featurize( if (output_type == 'pandas'): - header, string_data, numeric_data, error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, output_Type, "") + header, string_data, numeric_data, error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, output_type, "") self.error_message = error_message if(error_message != ''): @@ -357,9 +357,19 @@ def featurize( if (output_path.endswith('.arrow') or output_path.endswith('.parquet')): return output_path else: + if (output_path == ""): + + if (output_type == "arrowipc"): + return 'NyxusFeatures.arrow' + return 'NyxusFeatures.' + output_type + else: + + if (output_type == "arrowipc"): + return output_path + 'NyxusFeatures.arrow' + return output_path + '/NyxusFeatures.' + output_type From 228c7468b91a809b7667b78fece2df5121347d93 Mon Sep 17 00:00:00 2001 From: JesseMckinzie <72471813+JesseMckinzie@users.noreply.github.com> Date: Wed, 25 Oct 2023 11:27:28 -0600 Subject: [PATCH 13/13] Update nyxus.py --- src/nyx/python/nyxus/nyxus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nyx/python/nyxus/nyxus.py b/src/nyx/python/nyxus/nyxus.py index 13c4888c..fdf7b941 100644 --- a/src/nyx/python/nyxus/nyxus.py +++ b/src/nyx/python/nyxus/nyxus.py @@ -368,7 +368,7 @@ def featurize( else: if (output_type == "arrowipc"): - return output_path + 'NyxusFeatures.arrow' + return output_path + '/NyxusFeatures.arrow' return output_path + '/NyxusFeatures.' + output_type