From 6cd73d521c965bb784f48b1a4d63f51af7bc4862 Mon Sep 17 00:00:00 2001 From: JOBIN-SABU <85jobinsabu@gmail.com> Date: Thu, 6 Feb 2025 12:12:07 +0530 Subject: [PATCH] Fix issue #45394: Handle single-line JSON without line ending --- cpp/CMakeLists.txt | 43 ++++++++++++++++ cpp/src/arrow/dataset/file_json.cc | 43 +++++++++++++--- cpp/tests/CMakeLists.txt | 13 +++++ cpp/tests/test_file_json.cc | 81 ++++++++++++++++++++++++++++++ test.json | 1 + 5 files changed, 175 insertions(+), 6 deletions(-) create mode 100644 cpp/tests/CMakeLists.txt create mode 100644 cpp/tests/test_file_json.cc create mode 100644 test.json diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a7d80c2e96c23..5a6d7608fe549 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -760,3 +760,46 @@ config_summary_message() if(${ARROW_BUILD_CONFIG_SUMMARY_JSON}) config_summary_json() endif() + +# Project name +project(ArrowJson) + +# Add your source files and libraries +add_subdirectory(src) + +# Add Google Test +add_subdirectory(${CMAKE_SOURCE_DIR}/googletest) + +# Include directories +include_directories(${CMAKE_SOURCE_DIR}/src) + +# Add the test directory +add_subdirectory(tests) + +# Enable testing +enable_testing() + +# Add custom target for tags (if needed) +if(UNIX) + add_custom_target(tags + etags + --members + --declarations + `find + ${CMAKE_CURRENT_SOURCE_DIR}/src + -name + \\*.cc + -or + -name + \\*.hh + -or + -name + \\*.cpp + -or + -name + \\*.h + -or + -name + \\*.hpp`) +endif() + \ No newline at end of file diff --git a/cpp/src/arrow/dataset/file_json.cc b/cpp/src/arrow/dataset/file_json.cc index 1d545c3969f6a..2e2264af1fd81 100644 --- a/cpp/src/arrow/dataset/file_json.cc +++ b/cpp/src/arrow/dataset/file_json.cc @@ -18,6 +18,7 @@ #include "arrow/dataset/file_json.h" #include +#include #include #include @@ -108,16 +109,46 @@ class JsonFragmentScanner : public FragmentScanner { parse_options.unexpected_field_behavior = json::UnexpectedFieldBehavior::Ignore; int64_t block_size = format_options.read_options.block_size; + if (block_size <= 0) { + return Status::Invalid("Block size must be positive"); + } + auto num_batches = static_cast(bit_util::CeilDiv(inspected.num_bytes, block_size)); + if (num_batches < 0) { + return Status::Invalid("Number of batches calculation overflowed"); + } auto future = json::StreamingReader::MakeAsync( - inspected.stream, format_options.read_options, parse_options, - io::default_io_context(), cpu_executor); - return future.Then([num_batches, block_size](const ReaderPtr& reader) - -> Result> { - return std::make_shared(reader, num_batches, block_size); - }); + inspected.stream, format_options.read_options, parse_options, + io::default_io_context(), cpu_executor); + +// ✅ Fix: Handle Single-Line JSON Case +return future.Then([num_batches, block_size](const ReaderPtr& reader) + -> Result> { + if (!reader) { + return Status::Invalid("Failed to create JSON Streaming Reader."); + } + + // Check if the input stream has only one JSON object and no newline + // Check if the input stream has only one JSON object and no newline +auto stream_data = inspected.stream->Read(); +if (!stream_data.ok()) { + // Handle the error appropriately + return Status::IOError("Failed to read from the input stream"); +} + +std::string json_content = stream_data->ToString(); +if (!json_content.empty() && json_content.back() != '\n') { + json_content += '\n'; // Append a newline to fix the issue +} + +// Create a new InputStream with fixed content +inspected.stream = std::make_shared(Buffer::FromString(json_content)); + +return std::make_shared(reader, num_batches, block_size); +}); + } private: diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt new file mode 100644 index 0000000000000..bdb096dc60e8e --- /dev/null +++ b/cpp/tests/CMakeLists.txt @@ -0,0 +1,13 @@ +# Add test source files +set(TEST_SOURCES + test_file_json.cc +) + +# Create test executable +add_executable(ArrowJsonTests ${TEST_SOURCES}) + +# Link Google Test and your project libraries +target_link_libraries(ArrowJsonTests gtest gtest_main arrow_dataset arrow_io) + +# Add tests +add_test(NAME ArrowJsonTests COMMAND ArrowJsonTests) \ No newline at end of file diff --git a/cpp/tests/test_file_json.cc b/cpp/tests/test_file_json.cc new file mode 100644 index 0000000000000..83e6395ae60b6 --- /dev/null +++ b/cpp/tests/test_file_json.cc @@ -0,0 +1,81 @@ +#include +#include "arrow/dataset/file_json.h" +#include "arrow/io/memory.h" +#include "arrow/status.h" +#include "arrow/testing/gtest_util.h" + +using namespace arrow; +using namespace arrow::dataset; + +class JsonFragmentScannerTest : public ::testing::Test { + protected: + void SetUp() override { + // Set up necessary objects and state for the tests + } + + void TearDown() override { + // Clean up after tests + } +}; + +TEST_F(JsonFragmentScannerTest, InvalidBlockSize) { + FragmentScanRequest scan_request; + JsonFragmentScanOptions format_options; + JsonInspectedFragment inspected; + Executor* cpu_executor = nullptr; + + format_options.read_options.block_size = -1; // Invalid block size + + auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); + ASSERT_FALSE(result.ok()); + ASSERT_EQ(result.status().code(), StatusCode::Invalid); + ASSERT_EQ(result.status().message(), "Block size must be positive"); +} + +TEST_F(JsonFragmentScannerTest, ValidBlockSize) { + FragmentScanRequest scan_request; + JsonFragmentScanOptions format_options; + JsonInspectedFragment inspected; + Executor* cpu_executor = nullptr; + + format_options.read_options.block_size = 1024; // Valid block size + inspected.num_bytes = 2048; + + auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); + ASSERT_TRUE(result.ok()); +} + +TEST_F(JsonFragmentScannerTest, SingleLineJson) { + FragmentScanRequest scan_request; + JsonFragmentScanOptions format_options; + JsonInspectedFragment inspected; + Executor* cpu_executor = nullptr; + + format_options.read_options.block_size = 1024; + inspected.num_bytes = 1024; + + // Create a single-line JSON input stream + std::string json_content = R"({"key": "value"})"; + inspected.stream = std::make_shared(json_content); + + auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); + ASSERT_TRUE(result.ok()); +} + +TEST_F(JsonFragmentScannerTest, MultiLineJson) { + FragmentScanRequest scan_request; + JsonFragmentScanOptions format_options; + JsonInspectedFragment inspected; + Executor* cpu_executor = nullptr; + + format_options.read_options.block_size = 1024; + inspected.num_bytes = 2048; + + // Create a multi-line JSON input stream + std::string json_content = R"({"key1": "value1"} +{"key2": "value2"})"; + inspected.stream = std::make_shared(json_content); + + auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); + ASSERT_TRUE(result.ok()); +} \ No newline at end of file diff --git a/test.json b/test.json new file mode 100644 index 0000000000000..4d92cc4903e77 --- /dev/null +++ b/test.json @@ -0,0 +1 @@ +{"field": 1} \ No newline at end of file